Support redirecting input from client to daemon, #541 (#581)

The implementation currently switches on the redirection when the daemon actually starts reading the System.in stream using InputStream.read() or InputStream.available().
This commit is contained in:
Guillaume Nodet
2022-12-13 23:59:35 +01:00
committed by GitHub
parent 2a8d18d119
commit 2b8076d1ed
3 changed files with 178 additions and 3 deletions

View File

@@ -25,18 +25,23 @@ import static org.mvndaemon.mvnd.common.DaemonState.StopRequested;
import static org.mvndaemon.mvnd.common.DaemonState.Stopped;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
@@ -47,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.maven.cli.DaemonMavenCli;
@@ -482,6 +488,8 @@ public class Server implements AutoCloseable, Runnable {
final BlockingQueue<Message> sendQueue = new PriorityBlockingQueue<>(64, Message.getMessageComparator());
final BlockingQueue<Message> recvQueue = new LinkedBlockingDeque<>();
final BuildEventListener buildEventListener = new ClientDispatcher(sendQueue);
final DaemonInputStream daemonInputStream =
new DaemonInputStream(projectId -> sendQueue.add(Message.requestInput(projectId)));
try (ProjectBuildLogAppender logAppender = new ProjectBuildLogAppender(buildEventListener)) {
LOGGER.info("Executing request");
@@ -529,6 +537,8 @@ public class Server implements AutoCloseable, Runnable {
if (message == Message.BareMessage.CANCEL_BUILD_SINGLETON) {
updateState(Canceled);
return;
} else if (message instanceof Message.InputData) {
daemonInputStream.addInputData(((Message.InputData) message).getData());
} else {
synchronized (recvQueue) {
recvQueue.put(message);
@@ -581,6 +591,7 @@ public class Server implements AutoCloseable, Runnable {
}
}
});
System.setIn(daemonInputStream);
System.setOut(new LoggingOutputStream(s -> sendQueue.add(Message.out(s))).printStream());
System.setErr(new LoggingOutputStream(s -> sendQueue.add(Message.err(s))).printStream());
int exitCode = cli.main(
@@ -650,4 +661,67 @@ public class Server implements AutoCloseable, Runnable {
public String toString() {
return info.toString();
}
static class DaemonInputStream extends InputStream {
private final Consumer<String> startReadingFromProject;
private final LinkedList<byte[]> datas = new LinkedList<>();
private int pos = -1;
private String projectReading = null;
DaemonInputStream(Consumer<String> startReadingFromProject) {
this.startReadingFromProject = startReadingFromProject;
}
@Override
public int available() throws IOException {
synchronized (datas) {
String projectId = ProjectBuildLogAppender.getProjectId();
if (!Objects.equals(projectId, projectReading)) {
projectReading = projectId;
startReadingFromProject.accept(projectId);
}
return datas.stream().mapToInt(a -> a.length).sum() - Math.max(pos, 0);
}
}
@Override
public int read() throws IOException {
synchronized (datas) {
String projectId = ProjectBuildLogAppender.getProjectId();
if (!Objects.equals(projectId, projectReading)) {
projectReading = projectId;
startReadingFromProject.accept(projectId);
// TODO: start a 10ms timer to turn data off
}
for (; ; ) {
if (datas.isEmpty()) {
try {
datas.wait();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted");
}
pos = -1;
continue;
}
byte[] curData = datas.getFirst();
if (pos >= curData.length) {
datas.removeFirst();
pos = -1;
continue;
}
if (pos < 0) {
pos = 0;
}
return curData[pos++];
}
}
}
public void addInputData(String data) {
synchronized (datas) {
datas.add(data.getBytes(Charset.forName(System.getProperty("file.encoding"))));
datas.notifyAll();
}
}
}
}