diff --git a/common/src/main/java/org/mvndaemon/mvnd/common/Message.java b/common/src/main/java/org/mvndaemon/mvnd/common/Message.java index 6c7de2b7..2878f3e3 100644 --- a/common/src/main/java/org/mvndaemon/mvnd/common/Message.java +++ b/common/src/main/java/org/mvndaemon/mvnd/common/Message.java @@ -1054,30 +1054,38 @@ public abstract class Message { public static class RequestInput extends Message { private String projectId; + private int bytesToRead; public static RequestInput read(DataInputStream input) throws IOException { String projectId = readUTF(input); - return new RequestInput(projectId); + int bytesToRead = input.readInt(); + return new RequestInput(projectId, bytesToRead); } - public RequestInput(String projectId) { + public RequestInput(String projectId, int bytesToRead) { super(REQUEST_INPUT); this.projectId = projectId; + this.bytesToRead = bytesToRead; } public String getProjectId() { return projectId; } + public int getBytesToRead() { + return bytesToRead; + } + @Override public String toString() { - return "RequestInput{" + "projectId='" + projectId + '\'' + '}'; + return "RequestInput{" + "projectId='" + projectId + '\'' + ", bytesToRead=" + bytesToRead + '}'; } @Override public void write(DataOutputStream output) throws IOException { super.write(output); writeUTF(output, projectId); + output.writeInt(bytesToRead); } } @@ -1099,9 +1107,13 @@ public abstract class Message { return data; } + public boolean isEof() { + return data == null; + } + @Override public String toString() { - return "InputResponse{" + "data='" + data + "\'" + '}'; + return "InputResponse{" + (data == null ? "eof" : "data='" + data + "'") + '}'; } @Override @@ -1119,14 +1131,18 @@ public abstract class Message { return new StringMessage(BUILD_STATUS, payload); } - public static RequestInput requestInput(String projectId) { - return new RequestInput(projectId); + public static RequestInput requestInput(String projectId, int bytesToRead) { + return new RequestInput(projectId, bytesToRead); } public static InputData inputResponse(String data) { return new InputData(data); } + public static InputData inputEof() { + return new InputData(null); + } + public static StringMessage out(String message) { return new StringMessage(PRINT_OUT, message); } diff --git a/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalInputHandler.java b/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalInputHandler.java new file mode 100644 index 00000000..2be40d88 --- /dev/null +++ b/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalInputHandler.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.mvndaemon.mvnd.common.logging; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.jline.terminal.Terminal; +import org.mvndaemon.mvnd.common.Message; + +/** + * Handles terminal input in a clean, thread-safe manner using a producer-consumer pattern. + * + * This class is responsible for: + * 1. Reading input from the terminal based on different types of requests: + * - Project input: Reading raw input for a specific project + * - Prompts: Handling interactive prompts with user feedback + * - Control keys: Monitoring for special control keys in non-dumb terminals + * + * 2. Managing input state through InputRequest objects which specify: + * - The type of input needed (project input, prompt, or control keys) + * - The project requiring input + * - How many bytes to read + * + * 3. Converting input to appropriate Message objects and sending them to either: + * - daemonDispatch: for prompt responses + * - daemonReceive: for project input and control keys + * + * The class detects end-of-stream conditions (EOF) and communicates them back through + * the message system, which is crucial for handling piped input (e.g., cat file | mvnd ...). + * + * Input handling differs based on terminal type: + * - Normal terminals: Handle all input types including control keys + * - Dumb terminals: Only handle project input and prompts, ignore control keys + */ +public class TerminalInputHandler implements AutoCloseable { + private final Terminal terminal; + private final BlockingQueue inputRequests; + private volatile boolean closing; + private final Thread inputThread; + private final boolean dumb; + private volatile int maxThreads; + + private volatile Consumer daemonDispatch; + private volatile Consumer daemonReceive; + + private static class InputRequest { + final String projectId; // null for control keys + final Message.Prompt prompt; // non-null only for prompt requests + final boolean isControlKey; // true for control key listening + final int bytesToRead; // max number of bytes to read + + private InputRequest(String projectId, Message.Prompt prompt, boolean isControlKey, int bytesToRead) { + this.projectId = projectId; + this.prompt = prompt; + this.isControlKey = isControlKey; + this.bytesToRead = bytesToRead; + } + + static InputRequest forProject(String projectId, int bytesToRead) { + return new InputRequest(projectId, null, false, bytesToRead); + } + + static InputRequest forPrompt(Message.Prompt prompt) { + return new InputRequest(prompt.getProjectId(), prompt, false, 0); + } + + static InputRequest forControlKeys() { + return new InputRequest(null, null, true, 0); + } + } + + public TerminalInputHandler(Terminal terminal, boolean dumb) { + this.terminal = terminal; + this.inputRequests = new LinkedBlockingQueue<>(); + this.dumb = dumb; + + // Always create input thread as we always need to handle prompts and project input + this.inputThread = new Thread(() -> { + try { + while (!closing) { + InputRequest request = inputRequests.poll(10, TimeUnit.MILLISECONDS); + if (request == null) { + // No active request + if (!dumb) { + // Only listen for control keys in non-dumb mode + handleControlKeys(); + } + } else if (request.prompt != null) { + // Always handle prompts + handlePrompt(request.prompt); + } else if (request.projectId != null) { + // Always handle project input + handleProjectInput(request.projectId, request.bytesToRead); + } else if (!dumb && request.isControlKey) { + // Only handle control keys in non-dumb mode + handleControlKeys(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (IOException e) { + // Handle terminal IO exception + } + }); + inputThread.setDaemon(true); + inputThread.start(); + } + + private void handleProjectInput(String projectId, int bytesToRead) throws IOException { + if (daemonReceive == null) { + return; + } + char[] buf = bytesToRead > 0 ? new char[bytesToRead] : new char[8192]; + int idx = 0; + int timeout = 10; // Initial timeout for first read + + while ((bytesToRead < 0 || idx < bytesToRead) && idx < buf.length) { + int c = terminal.reader().read(timeout); + if (c < 0) { + // End of stream reached + daemonReceive.accept(Message.inputEof()); + break; + } + buf[idx++] = (char) c; + timeout = idx > 0 ? 1 : 10; // Shorter timeout after first char + } + + if (idx > 0) { + String data = String.valueOf(buf, 0, idx); + daemonReceive.accept(Message.inputResponse(data)); + } + } + + private void handleControlKeys() throws IOException { + if (daemonReceive == null) { + return; + } + int c = terminal.reader().read(10); + if (c != -1 && isControlKey(c)) { + daemonReceive.accept(Message.keyboardInput((char) c)); + } + } + + private void handlePrompt(Message.Prompt prompt) throws IOException { + if (daemonDispatch == null) { + return; + } + if (prompt.getMessage() != null) { + String msg = formatPromptMessage(prompt); + terminal.writer().print(msg); + } + terminal.flush(); + + StringBuilder sb = new StringBuilder(); + while (true) { + int c = terminal.reader().read(); + if (c < 0) { + break; + } else if (c == '\n' || c == '\r') { + terminal.writer().println(); + daemonDispatch.accept(prompt.response(sb.toString())); + break; + } else if (c == 127) { // Backspace + if (sb.length() > 0) { + sb.setLength(sb.length() - 1); + terminal.writer().write("\b \b"); + terminal.writer().flush(); + } + } else { + terminal.writer().print((char) c); + terminal.writer().flush(); + sb.append((char) c); + } + } + // After prompt is handled, go back to control key listening only if not dumb + if (!dumb) { + inputRequests.offer(InputRequest.forControlKeys()); + } + } + + private boolean isControlKey(int c) { + return c == TerminalOutput.KEY_PLUS + || c == TerminalOutput.KEY_MINUS + || c == TerminalOutput.KEY_CTRL_L + || c == TerminalOutput.KEY_CTRL_M + || c == TerminalOutput.KEY_CTRL_B; + } + + private String formatPromptMessage(Message.Prompt prompt) { + return (maxThreads > 1) + ? String.format("[%s] %s", prompt.getProjectId(), prompt.getMessage()) + : prompt.getMessage(); + } + + public void setDaemonDispatch(Consumer daemonDispatch) { + this.daemonDispatch = daemonDispatch; + } + + public void setDaemonReceive(Consumer daemonReceive) { + this.daemonReceive = daemonReceive; + } + + public void setMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + } + + public void requestProjectInput(String projectId, int bytesToRead) { + inputRequests.clear(); // Clear any pending requests + inputRequests.offer(InputRequest.forProject(projectId, bytesToRead)); + } + + public void requestPrompt(Message.Prompt prompt) { + inputRequests.clear(); // Clear any pending requests + inputRequests.offer(InputRequest.forPrompt(prompt)); + } + + @Override + public void close() { + closing = true; + if (inputThread != null) { + inputThread.interrupt(); + } + } +} diff --git a/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java b/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java index 89708f00..bf322f67 100644 --- a/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java +++ b/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java @@ -19,7 +19,6 @@ package org.mvndaemon.mvnd.common.logging; import java.io.IOException; -import java.io.InterruptedIOException; import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -32,9 +31,6 @@ import java.util.Deque; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.stream.Collector; import java.util.stream.Collectors; @@ -102,19 +98,16 @@ public class TerminalOutput implements ClientOutput { private final ArrayList failures = new ArrayList<>(); private final LinkedHashMap projects = new LinkedHashMap<>(); private final ClientLog log; - private final Thread reader; private volatile Exception exception; private volatile boolean closing; private final long start; - private final ReadWriteLock readInput = new ReentrantReadWriteLock(); private final boolean dumb; + private final TerminalInputHandler inputHandler; /** A sink for sending messages back to the daemon */ private volatile Consumer daemonDispatch; /** A sink for queuing messages to the main queue */ private volatile Consumer daemonReceive; - /** The project id which is trying to read the input stream */ - private volatile String projectReadingInput; /* * The following non-final fields are read/written from the main thread only. @@ -173,14 +166,7 @@ public class TerminalOutput implements ClientOutput { Terminal.Signal.INT, sig -> daemonDispatch.accept(Message.BareMessage.CANCEL_BUILD_SINGLETON)); this.display = new Display(terminal, false); this.log = logFile == null ? new MessageCollector() : new FileLog(logFile); - if (!dumb) { - final Thread r = new Thread(this::readInputLoop); - r.setDaemon(true); - r.start(); - this.reader = r; - } else { - this.reader = null; - } + this.inputHandler = new TerminalInputHandler(terminal, this.dumb); } @Override @@ -191,11 +177,13 @@ public class TerminalOutput implements ClientOutput { @Override public void setDaemonDispatch(Consumer daemonDispatch) { this.daemonDispatch = daemonDispatch; + this.inputHandler.setDaemonDispatch(daemonDispatch); } @Override public void setDaemonReceive(Consumer daemonReceive) { this.daemonReceive = daemonReceive; + this.inputHandler.setDaemonReceive(daemonReceive); } @Override @@ -226,6 +214,7 @@ public class TerminalOutput implements ClientOutput { final int totalProjectsDigits = (int) (Math.log10(totalProjects) + 1); this.projectsDoneFomat = "%" + totalProjectsDigits + "d"; this.maxThreads = bs.getMaxThreads(); + this.inputHandler.setMaxThreads(maxThreads); this.artifactIdFormat = "%-" + bs.getArtifactIdDisplayLength() + "s "; final int maxThreadsDigits = (int) (Math.log10(maxThreads) + 1); this.threadsFormat = "%" + (maxThreadsDigits * 3 + 2) + "s"; @@ -343,42 +332,8 @@ public class TerminalOutput implements ClientOutput { terminal.writer().println(""); break; } - readInput.writeLock().lock(); - try { - clearDisplay(); - if (prompt.getMessage() != null) { - String msg = (maxThreads > 1) - ? String.format("[%s] %s", prompt.getProjectId(), prompt.getMessage()) - : prompt.getMessage(); - terminal.writer().print(msg); - } - terminal.flush(); - StringBuilder sb = new StringBuilder(); - while (true) { - int c = terminal.reader().read(); - if (c < 0) { - break; - } else if (c == '\n' || c == '\r') { - terminal.writer().println(); - daemonDispatch.accept(prompt.response(sb.toString())); - break; - } else if (c == 127) { - if (sb.length() > 0) { - sb.setLength(sb.length() - 1); - terminal.writer().write("\b \b"); - terminal.writer().flush(); - } - } else { - terminal.writer().print((char) c); - terminal.writer().flush(); - sb.append((char) c); - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - readInput.writeLock().unlock(); - } + clearDisplay(); + inputHandler.requestPrompt(prompt); break; } case Message.BUILD_LOG_MESSAGE: { @@ -456,7 +411,7 @@ public class TerminalOutput implements ClientOutput { } case Message.REQUEST_INPUT: { RequestInput ri = (RequestInput) entry; - projectReadingInput = ri.getProjectId(); + inputHandler.requestProjectInput(ri.getProjectId(), ri.getBytesToRead()); break; } case Message.INPUT_DATA: { @@ -502,45 +457,6 @@ public class TerminalOutput implements ClientOutput { return terminal; } - void readInputLoop() { - try { - while (!closing) { - if (readInput.readLock().tryLock(10, TimeUnit.MILLISECONDS)) { - if (projectReadingInput != null) { - char[] buf = new char[256]; - int idx = 0; - while (idx < buf.length) { - int c = terminal.reader().read(idx > 0 ? 1 : 10); - if (c < 0) { - break; - } - buf[idx++] = (char) c; - } - if (idx > 0) { - String data = String.valueOf(buf, 0, idx); - daemonReceive.accept(Message.inputResponse(data)); - } - } else { - int c = terminal.reader().read(10); - if (c == -1) { - break; - } - if (c == KEY_PLUS || c == KEY_MINUS || c == KEY_CTRL_L || c == KEY_CTRL_M || c == KEY_CTRL_B) { - daemonReceive.accept(Message.keyboardInput((char) c)); - } - } - readInput.readLock().unlock(); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (InterruptedIOException e) { - Thread.currentThread().interrupt(); - } catch (IOException e) { - this.exception = e; - } - } - private void clearDisplay() { if (!noBuffering && !dumb) { display.update(Collections.emptyList(), 0); @@ -560,9 +476,7 @@ public class TerminalOutput implements ClientOutput { @Override public void close() throws Exception { closing = true; - if (reader != null) { - reader.interrupt(); - } + inputHandler.close(); log.close(); terminal.handle(Terminal.Signal.INT, previousIntHandler); terminal.close(); @@ -781,13 +695,11 @@ public class TerminalOutput implements ClientOutput { .style(AttributedStyle.BOLD) .append(String.format( threadsFormat, - new StringBuilder(threadsFormat.length()) - .append(projectsCount) - .append('/') - .append(Math.max(0, projectsCount - dispLines)) - .append('/') - .append(maxThreads) - .toString())) + String.valueOf(projectsCount) + + '/' + + Math.max(0, projectsCount - dispLines) + + '/' + + maxThreads)) .style(AttributedStyle.DEFAULT); /* Progress */ @@ -801,7 +713,7 @@ public class TerminalOutput implements ClientOutput { .append('%') .style(AttributedStyle.DEFAULT); - } else if (buildStatus != null) { + } else { asb.style(AttributedStyle.BOLD).append(buildStatus).style(AttributedStyle.DEFAULT); } diff --git a/daemon/src/main/java/org/apache/maven/cli/DaemonMavenInvoker.java b/daemon/src/main/java/org/apache/maven/cli/DaemonMavenInvoker.java index 69be4ab7..53773d11 100644 --- a/daemon/src/main/java/org/apache/maven/cli/DaemonMavenInvoker.java +++ b/daemon/src/main/java/org/apache/maven/cli/DaemonMavenInvoker.java @@ -54,14 +54,13 @@ public class DaemonMavenInvoker extends ResidentMavenInvoker { if (context.coloredOutput != null) { builder.color(context.coloredOutput); } + // we do want to pause input + builder.paused(true); }, terminal -> doConfigureWithTerminal(context, terminal)); context.terminal = MessageUtils.getTerminal(); context.closeables.add(MessageUtils::systemUninstall); MessageUtils.registerShutdownHook(); - if (context.coloredOutput != null) { - MessageUtils.setColorEnabled(context.coloredOutput); - } } @Override diff --git a/daemon/src/main/java/org/mvndaemon/mvnd/daemon/DaemonInputStream.java b/daemon/src/main/java/org/mvndaemon/mvnd/daemon/DaemonInputStream.java new file mode 100644 index 00000000..6843a910 --- /dev/null +++ b/daemon/src/main/java/org/mvndaemon/mvnd/daemon/DaemonInputStream.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.mvndaemon.mvnd.daemon; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.nio.charset.Charset; +import java.util.LinkedList; +import java.util.Objects; +import java.util.function.BiConsumer; + +import org.apache.maven.logging.ProjectBuildLogAppender; + +/** + * An InputStream implementation that manages input for Maven daemon processes. + * + * This class implements a buffered input stream that: + * 1. Tracks which project is currently reading input using ProjectBuildLogAppender + * 2. Requests input from the client when needed through a callback + * 3. Buffers received input data in memory + * + * Key behaviors: + * - Input is requested through startReadingFromProject callback whenever: + * a) The reading project changes + * b) The input buffer is empty and more data is needed + * - The callback receives both the project ID and the number of bytes requested + * - Data is added to the buffer through addInputData, which can be called from another thread + * - EOF is signaled by calling addInputData with null + * + * The stream coordinates between multiple threads: + * - Reader thread(s): Calling read() methods to get input + * - Writer thread: Calling addInputData to provide input data + * + * Synchronization: + * - All buffer access is synchronized on the datas collection + * - Readers wait when no data is available using datas.wait() + * - Writers notify readers when new data arrives using datas.notifyAll() + * + * This implementation is particularly important for: + * 1. Handling piped input (e.g., cat file | mvnd ...) + * 2. Supporting interactive input during builds + * 3. Managing input across multiple project builds + */ +class DaemonInputStream extends InputStream { + private final BiConsumer startReadingFromProject; + private final LinkedList datas = new LinkedList<>(); + private final Charset charset; + private int pos = -1; + private String projectReading = null; + private volatile boolean eof = false; + + DaemonInputStream(BiConsumer startReadingFromProject) { + this.startReadingFromProject = startReadingFromProject; + this.charset = Charset.forName(System.getProperty("file.encoding")); + } + + @Override + public int available() throws IOException { + synchronized (datas) { + String projectId = ProjectBuildLogAppender.getProjectId(); + if (!eof && !Objects.equals(projectId, projectReading)) { + projectReading = projectId; + startReadingFromProject.accept(projectId, 1); + } + return datas.stream().mapToInt(a -> a.length).sum() - Math.max(pos, 0); + } + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int read = read(b, 0, 1); + if (read == 1) { + return b[0]; + } + return -1; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + synchronized (datas) { + if (eof && datas.isEmpty()) { + return -1; // Return EOF if we've reached the end and no more data + } + String projectId = ProjectBuildLogAppender.getProjectId(); + if (!Objects.equals(projectId, projectReading)) { + projectReading = projectId; + } + int read = 0; + while (read < len) { + if (datas.isEmpty()) { + if (eof) { + return read > 0 ? read : -1; // Exit properly on EOF + } + if (read > 0) { + break; + } + // Always notify we need input when waiting for data + startReadingFromProject.accept(projectReading, len - read); + try { + datas.wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted"); + } + pos = -1; + continue; + } + byte[] curData = datas.getFirst(); + if (pos >= curData.length) { + datas.removeFirst(); + pos = -1; + continue; + } + if (pos < 0) { + pos = 0; + } + b[off + read++] = curData[pos++]; + } + return read; + } + } + + public void addInputData(String data) { + synchronized (datas) { + if (data == null) { + eof = true; + } else { + datas.add(data.getBytes(charset)); + } + datas.notifyAll(); + } + } +} diff --git a/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java b/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java index a8bcc956..4fcc4d78 100644 --- a/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java +++ b/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java @@ -20,22 +20,18 @@ package org.mvndaemon.mvnd.daemon; import java.io.IOException; import java.io.InputStream; -import java.io.InterruptedIOException; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.SecureRandom; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -46,7 +42,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -513,9 +508,11 @@ public class Server implements AutoCloseable, Runnable { final BlockingQueue sendQueue = new PriorityBlockingQueue<>(64, Message.getMessageComparator()); final BlockingQueue recvQueue = new LinkedBlockingDeque<>(); final BuildEventListener buildEventListener = new ClientDispatcher(sendQueue); - final DaemonInputStream daemonInputStream = - new DaemonInputStream(projectId -> sendQueue.add(Message.requestInput(projectId))); + final DaemonInputStream daemonInputStream = new DaemonInputStream( + (projectId, bytesToRead) -> sendQueue.add(Message.requestInput(projectId, bytesToRead))); + InputStream in = System.in; try { + System.setIn(daemonInputStream); LOGGER.info("Executing request"); @@ -639,6 +636,7 @@ public class Server implements AutoCloseable, Runnable { } catch (Throwable t) { LOGGER.error("Error while building project", t); } finally { + System.setIn(in); if (!noDaemon) { LOGGER.info("Daemon back to idle"); updateState(DaemonState.Idle); @@ -688,67 +686,4 @@ public class Server implements AutoCloseable, Runnable { public String toString() { return info.toString(); } - - static class DaemonInputStream extends InputStream { - private final Consumer startReadingFromProject; - private final LinkedList datas = new LinkedList<>(); - private int pos = -1; - private String projectReading = null; - - DaemonInputStream(Consumer startReadingFromProject) { - this.startReadingFromProject = startReadingFromProject; - } - - @Override - public int available() throws IOException { - synchronized (datas) { - String projectId = ProjectBuildLogAppender.getProjectId(); - if (!Objects.equals(projectId, projectReading)) { - projectReading = projectId; - startReadingFromProject.accept(projectId); - } - return datas.stream().mapToInt(a -> a.length).sum() - Math.max(pos, 0); - } - } - - @Override - public int read() throws IOException { - synchronized (datas) { - String projectId = ProjectBuildLogAppender.getProjectId(); - if (!Objects.equals(projectId, projectReading)) { - projectReading = projectId; - startReadingFromProject.accept(projectId); - // TODO: start a 10ms timer to turn data off - } - for (; ; ) { - if (datas.isEmpty()) { - try { - datas.wait(); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted"); - } - pos = -1; - continue; - } - byte[] curData = datas.getFirst(); - if (pos >= curData.length) { - datas.removeFirst(); - pos = -1; - continue; - } - if (pos < 0) { - pos = 0; - } - return curData[pos++]; - } - } - } - - public void addInputData(String data) { - synchronized (datas) { - datas.add(data.getBytes(Charset.forName(System.getProperty("file.encoding")))); - datas.notifyAll(); - } - } - } } diff --git a/integration-tests/src/test/java/org/mvndaemon/mvnd/it/InputStreamNativeIT.java b/integration-tests/src/test/java/org/mvndaemon/mvnd/it/InputStreamNativeIT.java new file mode 100644 index 00000000..78df741e --- /dev/null +++ b/integration-tests/src/test/java/org/mvndaemon/mvnd/it/InputStreamNativeIT.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.mvndaemon.mvnd.it; + +import javax.inject.Inject; + +import java.io.IOException; + +import org.junit.jupiter.api.Test; +import org.mvndaemon.mvnd.assertj.TestClientOutput; +import org.mvndaemon.mvnd.client.Client; +import org.mvndaemon.mvnd.client.DaemonParameters; +import org.mvndaemon.mvnd.common.Message; +import org.mvndaemon.mvnd.junit.MvndNativeTest; + +@MvndNativeTest(projectDir = "src/test/projects/input-stream") +class InputStreamNativeIT { + + @Inject + Client client; + + @Inject + DaemonParameters parameters; + + @Test + void installPluginAndTest() throws IOException, InterruptedException { + final TestClientOutput output = new TestClientOutput() { + int input = 0; + + @Override + public void accept(Message message) { + if (message instanceof Message.RequestInput) { + if (input++ < 10) { + daemonDispatch.accept(Message.inputResponse("0123456789\n")); + } else { + daemonDispatch.accept(Message.inputEof()); + } + } + if (!(message instanceof Message.TransferEvent)) { + super.accept(message); + } + } + }; + client.execute(output, "install").assertSuccess(); + + client.execute(output, "org.mvndaemon.mvnd.test.input-stream:echo-maven-plugin:echo"); + } +} diff --git a/integration-tests/src/test/java/org/mvndaemon/mvnd/it/InputStreamTest.java b/integration-tests/src/test/java/org/mvndaemon/mvnd/it/InputStreamTest.java new file mode 100644 index 00000000..c037880d --- /dev/null +++ b/integration-tests/src/test/java/org/mvndaemon/mvnd/it/InputStreamTest.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.mvndaemon.mvnd.it; + +import org.mvndaemon.mvnd.junit.MvndTest; + +@MvndTest(projectDir = "src/test/projects/input-stream") +class InputStreamTest extends InputStreamNativeIT {} diff --git a/integration-tests/src/test/projects/input-stream/.mvn/.gitkeep b/integration-tests/src/test/projects/input-stream/.mvn/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/integration-tests/src/test/projects/input-stream/pom.xml b/integration-tests/src/test/projects/input-stream/pom.xml new file mode 100644 index 00000000..1ce75a56 --- /dev/null +++ b/integration-tests/src/test/projects/input-stream/pom.xml @@ -0,0 +1,85 @@ + + + + 4.0.0 + + org.mvndaemon.mvnd.test.input-stream + echo-maven-plugin + 1.0-SNAPSHOT + maven-plugin + + echo-maven-plugin Maven Plugin + + + ${maven.version} + + + + UTF-8 + 8 + 8 + 3.9.9 + + + + + org.apache.maven + maven-plugin-api + ${maven.version} + provided + + + org.apache.maven + maven-core + ${maven.version} + provided + + + org.apache.maven + maven-artifact + ${maven.version} + provided + + + org.apache.maven.plugin-tools + maven-plugin-annotations + 3.13.1 + provided + + + + + + + org.apache.maven.plugins + maven-plugin-plugin + + + + + + mojo-descriptor + + descriptor + + + + + + + diff --git a/integration-tests/src/test/projects/input-stream/src/main/java/org/apache/maven/its/EchoMojo.java b/integration-tests/src/test/projects/input-stream/src/main/java/org/apache/maven/its/EchoMojo.java new file mode 100644 index 00000000..e789fb91 --- /dev/null +++ b/integration-tests/src/test/projects/input-stream/src/main/java/org/apache/maven/its/EchoMojo.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.maven.its; + +import org.apache.maven.plugin.AbstractMojo; +import org.apache.maven.plugin.MojoExecutionException; + +import org.apache.maven.plugins.annotations.Mojo; +import org.apache.maven.plugins.annotations.Parameter; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Scanner; + +/** + * Goal which copy stdin to stdout. + */ +@Mojo( name = "echo", requiresProject = false ) +public class EchoMojo + extends AbstractMojo +{ + + public void execute() + throws MojoExecutionException + { + getLog().info("Reading from standard input. Type 'exit' to stop."); + + try (Scanner scanner = new Scanner(System.in)) { + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + if ("exit".equalsIgnoreCase(line.trim())) { + getLog().info("Exiting..."); + break; + } + System.out.println(line); + } + } + } +}