From d44e3201e06e9306fb41f2c0d6a4764fee59e487 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Thu, 12 Nov 2020 00:18:06 +0100 Subject: [PATCH] Fix the readInputLoop so that messages are all delivered and processed in the main thread --- .../fuse/mvnd/client/DaemonClientConnection.java | 8 ++++++++ .../org/jboss/fuse/mvnd/client/DefaultClient.java | 3 ++- .../jboss/fuse/mvnd/common/logging/ClientOutput.java | 4 +++- .../fuse/mvnd/common/logging/TerminalOutput.java | 11 +++++++++-- .../org/jboss/fuse/mvnd/assertj/TestClientOutput.java | 6 +++++- 5 files changed, 27 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/org/jboss/fuse/mvnd/client/DaemonClientConnection.java b/client/src/main/java/org/jboss/fuse/mvnd/client/DaemonClientConnection.java index 588d21fa..3603e040 100644 --- a/client/src/main/java/org/jboss/fuse/mvnd/client/DaemonClientConnection.java +++ b/client/src/main/java/org/jboss/fuse/mvnd/client/DaemonClientConnection.java @@ -129,6 +129,14 @@ public class DaemonClientConnection implements Closeable { } } + public void enqueue(Message message) { + try { + queue.put(message); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + protected void doReceive() { try { while (running.get()) { diff --git a/client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java b/client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java index ada0dcca..97efd64f 100644 --- a/client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java +++ b/client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java @@ -199,7 +199,8 @@ public class DefaultClient implements Client { final DaemonConnector connector = new DaemonConnector(parameters, registry); try (DaemonClientConnection daemon = connector.connect(output)) { - output.setDeamonDispatch(daemon::dispatch); + output.setDaemonDispatch(daemon::dispatch); + output.setDaemonReceive(daemon::enqueue); output.accept(Message.buildStatus("Connected to daemon")); daemon.dispatch(new Message.BuildRequest( diff --git a/common/src/main/java/org/jboss/fuse/mvnd/common/logging/ClientOutput.java b/common/src/main/java/org/jboss/fuse/mvnd/common/logging/ClientOutput.java index fb5b5912..16246c1c 100644 --- a/common/src/main/java/org/jboss/fuse/mvnd/common/logging/ClientOutput.java +++ b/common/src/main/java/org/jboss/fuse/mvnd/common/logging/ClientOutput.java @@ -24,7 +24,9 @@ import org.jboss.fuse.mvnd.common.Message; */ public interface ClientOutput extends AutoCloseable { - void setDeamonDispatch(Consumer sink); + void setDaemonDispatch(Consumer sink); + + void setDaemonReceive(Consumer sink); void accept(Message message); diff --git a/common/src/main/java/org/jboss/fuse/mvnd/common/logging/TerminalOutput.java b/common/src/main/java/org/jboss/fuse/mvnd/common/logging/TerminalOutput.java index 84e53532..4d9df2de 100644 --- a/common/src/main/java/org/jboss/fuse/mvnd/common/logging/TerminalOutput.java +++ b/common/src/main/java/org/jboss/fuse/mvnd/common/logging/TerminalOutput.java @@ -69,6 +69,8 @@ public class TerminalOutput implements ClientOutput { /** A sink for sending messages back to the daemon */ private volatile Consumer daemonDispatch; + /** A sink for queuing messages to the main queue */ + private volatile Consumer daemonReceive; /* * The following non-final fields are read/written from the main thread only. @@ -131,10 +133,15 @@ public class TerminalOutput implements ClientOutput { } @Override - public void setDeamonDispatch(Consumer daemonDispatch) { + public void setDaemonDispatch(Consumer daemonDispatch) { this.daemonDispatch = daemonDispatch; } + @Override + public void setDaemonReceive(Consumer daemonReceive) { + this.daemonReceive = daemonReceive; + } + @Override public void accept(Message entry) { assert "main".equals(Thread.currentThread().getName()); @@ -356,7 +363,7 @@ public class TerminalOutput implements ClientOutput { break; } if (c == '+' || c == '-' || c == CTRL_L || c == CTRL_M || c == CTRL_B) { - accept(Message.keyboardInput((char) c)); + daemonReceive.accept(Message.keyboardInput((char) c)); } readInput.readLock().unlock(); } diff --git a/integration-tests/src/test/java/org/jboss/fuse/mvnd/assertj/TestClientOutput.java b/integration-tests/src/test/java/org/jboss/fuse/mvnd/assertj/TestClientOutput.java index f1498725..d6fb74c4 100644 --- a/integration-tests/src/test/java/org/jboss/fuse/mvnd/assertj/TestClientOutput.java +++ b/integration-tests/src/test/java/org/jboss/fuse/mvnd/assertj/TestClientOutput.java @@ -32,10 +32,14 @@ public class TestClientOutput implements ClientOutput { } @Override - public void setDeamonDispatch(Consumer daemonDispatch) { + public void setDaemonDispatch(Consumer daemonDispatch) { this.daemonDispatch = daemonDispatch; } + @Override + public void setDaemonReceive(Consumer sink) { + } + @Override public void accept(Message message) { messages.add(message);