Implement a keep-alive strategy to detect stales connections, fixes #47

This commit is contained in:
Guillaume Nodet
2020-10-21 23:06:24 +02:00
parent 7dbb371a06
commit 22f608d5b4
19 changed files with 571 additions and 88 deletions

View File

@@ -57,6 +57,7 @@ import org.jboss.fuse.mvnd.common.Message.BuildEvent.Type;
import org.jboss.fuse.mvnd.common.Message.BuildException;
import org.jboss.fuse.mvnd.common.Message.BuildMessage;
import org.jboss.fuse.mvnd.common.Message.BuildRequest;
import org.jboss.fuse.mvnd.common.Message.KeepAliveMessage;
import org.jboss.fuse.mvnd.common.Message.MessageSerializer;
import org.jboss.fuse.mvnd.daemon.DaemonExpiration.DaemonExpirationResult;
import org.jboss.fuse.mvnd.daemon.DaemonExpiration.DaemonExpirationStrategy;
@@ -72,7 +73,6 @@ public class Server implements AutoCloseable, Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);
public static final int CANCEL_TIMEOUT = 10 * 1000;
public static final int DEFAULT_IDLE_TIMEOUT = (int) TimeUnit.HOURS.toMillis(3);
private final String uid;
private final ServerSocketChannel socket;
@@ -96,9 +96,9 @@ public class Server implements AutoCloseable, Runnable {
registry = new DaemonRegistry(layout.registry());
socket = ServerSocketChannel.open().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
final int idleTimeout = Environment.DAEMON_IDLE_TIMEOUT
final int idleTimeout = Environment.DAEMON_IDLE_TIMEOUT_MS
.systemProperty()
.orDefault(() -> String.valueOf(DEFAULT_IDLE_TIMEOUT))
.orDefault(() -> String.valueOf(Environment.DEFAULT_IDLE_TIMEOUT))
.asInt();
executor = Executors.newScheduledThreadPool(1);
strategy = DaemonExpiration.master();
@@ -389,6 +389,8 @@ public class Server implements AutoCloseable, Runnable {
private void handle(DaemonConnection<Message> connection, BuildRequest buildRequest) {
updateState(Busy);
try {
int keepAlive = Environment.DAEMON_KEEP_ALIVE_MS.systemProperty().asInt();
LOGGER.info("Executing request");
CliRequest req = new CliRequestBuilder()
.arguments(buildRequest.getArgs())
@@ -403,11 +405,22 @@ public class Server implements AutoCloseable, Runnable {
AbstractLoggingSpy.instance(loggingSpy);
Thread pumper = new Thread(() -> {
try {
boolean flushed = true;
while (true) {
Message m = queue.poll();
if (m == null) {
connection.flush();
m = queue.take();
Message m;
if (flushed) {
m = queue.poll(keepAlive, TimeUnit.MILLISECONDS);
if (m == null) {
m = new KeepAliveMessage();
}
flushed = false;
} else {
m = queue.poll();
if (m == null) {
connection.flush();
flushed = true;
continue;
}
}
if (m == STOP) {
connection.flush();
@@ -462,6 +475,8 @@ public class Server implements AutoCloseable, Runnable {
return 97;
} else if (m == STOP) {
return 99;
} else if (m instanceof KeepAliveMessage) {
return 100;
} else {
throw new IllegalStateException();
}