Implement build cancelation, fixes #127

This commit is contained in:
Guillaume Nodet
2020-11-09 12:56:18 +01:00
committed by Peter Palaga
parent 02ef4a2fc2
commit 0800aeb791
12 changed files with 168 additions and 78 deletions

View File

@@ -32,7 +32,6 @@ import org.jboss.fuse.mvnd.common.DaemonException.ConnectException;
import org.jboss.fuse.mvnd.common.DaemonException.StaleAddressException;
import org.jboss.fuse.mvnd.common.DaemonInfo;
import org.jboss.fuse.mvnd.common.Message;
import org.jboss.fuse.mvnd.common.Message.Prompt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +85,14 @@ public class DaemonClientConnection implements Closeable {
}
throw new DaemonException.ConnectException("Could not dispatch a message to the daemon.", e);
}
// in case we dispatch a cancelation request, also forward it to the main thread to exit asap
try {
if (message.getType() == Message.CANCEL_BUILD) {
queue.put(message);
}
} catch (InterruptedException e) {
throw new DaemonException.InterruptedException(e);
}
}
public List<Message> receive() throws ConnectException, StaleAddressException {
@@ -130,10 +137,6 @@ public class DaemonClientConnection implements Closeable {
if (m == null) {
break;
}
if (m.getType() == Message.PROMPT) {
final Prompt prompt = (Prompt) m;
m = prompt.withCallback(response -> dispatch(prompt.response(response)));
}
queue.put(m);
}
} catch (Exception e) {

View File

@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import org.fusesource.jansi.Ansi;
import org.jboss.fuse.mvnd.common.BuildProperties;
import org.jboss.fuse.mvnd.common.DaemonException;
import org.jboss.fuse.mvnd.common.DaemonInfo;
import org.jboss.fuse.mvnd.common.DaemonRegistry;
import org.jboss.fuse.mvnd.common.Environment;
@@ -32,13 +33,13 @@ import org.jboss.fuse.mvnd.common.Message.BuildException;
import org.jboss.fuse.mvnd.common.OsUtils;
import org.jboss.fuse.mvnd.common.logging.ClientOutput;
import org.jboss.fuse.mvnd.common.logging.TerminalOutput;
import org.jline.utils.AttributedString;
import org.jline.utils.AttributedStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultClient implements Client {
public static final int CANCEL_TIMEOUT = 10 * 1000;
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClient.class);
private final DaemonParameters parameters;
@@ -62,7 +63,13 @@ public class DefaultClient implements Client {
}
try (TerminalOutput output = new TerminalOutput(logFile)) {
new DefaultClient(new DaemonParameters()).execute(output, args);
try {
new DefaultClient(new DaemonParameters()).execute(output, args);
} catch (DaemonException.InterruptedException e) {
final AttributedStyle s = new AttributedStyle().bold().foreground(AttributedStyle.RED);
String str = new AttributedString(System.lineSeparator() + "Canceled by user", s).toAnsi();
output.accept(Message.display(str));
}
}
}
@@ -157,7 +164,7 @@ public class DefaultClient implements Client {
if (stop) {
DaemonInfo[] dis = registry.getAll().toArray(new DaemonInfo[0]);
if (dis.length > 0) {
output.accept(Message.log("Stopping " + dis.length + " running daemons"));
output.accept(Message.display("Stopping " + dis.length + " running daemons"));
for (DaemonInfo di : dis) {
try {
ProcessHandle.of(di.getPid()).ifPresent(ProcessHandle::destroyForcibly);
@@ -191,6 +198,7 @@ public class DefaultClient implements Client {
final DaemonConnector connector = new DaemonConnector(parameters, registry);
try (DaemonClientConnection daemon = connector.connect(output)) {
output.setDeamonDispatch(daemon::dispatch);
output.accept(Message.buildStatus("Connected to daemon"));
daemon.dispatch(new Message.BuildRequest(
@@ -203,24 +211,20 @@ public class DefaultClient implements Client {
while (true) {
final List<Message> messages = daemon.receive();
for (int i = 0; i < messages.size(); i++) {
Message m = messages.get(i);
output.accept(messages);
for (Message m : messages) {
switch (m.getType()) {
case Message.BUILD_EXCEPTION: {
output.accept(messages.subList(0, i + 1));
case Message.CANCEL_BUILD:
return new DefaultResult(argv,
new InterruptedException("The build was canceled"));
case Message.BUILD_EXCEPTION:
final BuildException e = (BuildException) m;
return new DefaultResult(argv,
new Exception(e.getClassName() + ": " + e.getMessage() + "\n" + e.getStackTrace()));
}
case Message.BUILD_STOPPED: {
output.accept(messages.subList(0, i));
case Message.BUILD_STOPPED:
return new DefaultResult(argv, null);
}
default:
break;
}
}
output.accept(messages);
}
}
}

View File

@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
public abstract class Message {
public static final int BUILD_REQUEST = 0;
@@ -44,10 +43,12 @@ public abstract class Message {
public static final int PROMPT_RESPONSE = 12;
public static final int BUILD_STATUS = 13;
public static final int KEYBOARD_INPUT = 14;
public static final int CANCEL_BUILD = 15;
public static final SimpleMessage KEEP_ALIVE_SINGLETON = new SimpleMessage(KEEP_ALIVE);
public static final SimpleMessage STOP_SINGLETON = new SimpleMessage(STOP);
public static final SimpleMessage BUILD_STOPPED_SINGLETON = new SimpleMessage(BUILD_STOPPED);
public static final SimpleMessage CANCEL_BUILD_SINGLETON = new SimpleMessage(CANCEL_BUILD);
final int type;
@@ -87,6 +88,8 @@ public abstract class Message {
return PromptResponse.read(input);
case BUILD_STATUS:
return StringMessage.read(BUILD_STATUS, input);
case CANCEL_BUILD:
return SimpleMessage.CANCEL_BUILD_SINGLETON;
}
throw new IllegalStateException("Unexpected message type: " + type);
}
@@ -454,6 +457,7 @@ public abstract class Message {
output.writeInt(projectCount);
output.writeInt(maxThreads);
}
}
public static class BuildMessage extends Message {
@@ -511,6 +515,8 @@ public abstract class Message {
return "BuildStopped";
case STOP:
return "Stop";
case CANCEL_BUILD:
return "BuildCanceled";
default:
throw new IllegalStateException("Unexpected type " + type);
}
@@ -607,7 +613,6 @@ public abstract class Message {
final String uid;
final String message;
final boolean password;
final Consumer<String> callback;
public static Prompt read(DataInputStream input) throws IOException {
String projectId = Message.readUTF(input);
@@ -618,16 +623,11 @@ public abstract class Message {
}
public Prompt(String projectId, String uid, String message, boolean password) {
this(projectId, uid, message, password, null);
}
public Prompt(String projectId, String uid, String message, boolean password, Consumer<String> callback) {
super(PROMPT);
this.projectId = projectId;
this.uid = uid;
this.message = message;
this.password = password;
this.callback = callback;
}
public String getProjectId() {
@@ -665,18 +665,10 @@ public abstract class Message {
output.writeBoolean(password);
}
public Prompt withCallback(Consumer<String> callback) {
return new Prompt(projectId, uid, message, password, callback);
}
public PromptResponse response(String message) {
return new PromptResponse(projectId, uid, message);
}
public Consumer<String> getCallback() {
return callback;
}
}
public static class PromptResponse extends Message {
@@ -737,6 +729,10 @@ public abstract class Message {
return new StringMessage(BUILD_STATUS, payload);
}
public static Display display(String message) {
return new Display(null, message);
}
public static BuildMessage log(String message) {
return new BuildMessage(null, message);
}

View File

@@ -16,12 +16,16 @@
package org.jboss.fuse.mvnd.common.logging;
import java.util.List;
import java.util.function.Consumer;
import org.jboss.fuse.mvnd.common.Message;
/**
* A sink for various kinds of events sent by the daemon.
*/
public interface ClientOutput extends AutoCloseable {
void setDeamonDispatch(Consumer<Message> sink);
void accept(Message message);
void accept(List<Message> messages);

View File

@@ -39,7 +39,6 @@ import org.jboss.fuse.mvnd.common.Message.BuildEvent;
import org.jboss.fuse.mvnd.common.Message.BuildException;
import org.jboss.fuse.mvnd.common.Message.BuildMessage;
import org.jboss.fuse.mvnd.common.Message.BuildStarted;
import org.jboss.fuse.mvnd.common.Message.SimpleMessage;
import org.jboss.fuse.mvnd.common.Message.StringMessage;
import org.jline.terminal.Size;
import org.jline.terminal.Terminal;
@@ -60,6 +59,7 @@ public class TerminalOutput implements ClientOutput {
public static final int CTRL_M = 'M' & 0x1f;
private final Terminal terminal;
private final Terminal.SignalHandler previousIntHandler;
private final Display display;
private final LinkedHashMap<String, Project> projects = new LinkedHashMap<>();
private final ClientLog log;
@@ -70,6 +70,8 @@ public class TerminalOutput implements ClientOutput {
private final long start;
private final ReadWriteLock readInput = new ReentrantReadWriteLock();
/** A sink for sending messages back to the daemon */
private volatile Consumer<Message> daemonDispatch;
private volatile String name;
private volatile int totalProjects;
private volatile int maxThreads;
@@ -97,6 +99,14 @@ public class TerminalOutput implements ClientOutput {
this.start = System.currentTimeMillis();
this.terminal = TerminalBuilder.terminal();
terminal.enterRawMode();
Thread mainThread = Thread.currentThread();
daemonDispatch = m -> {
if (m == Message.CANCEL_BUILD_SINGLETON) {
mainThread.interrupt();
}
};
this.previousIntHandler = terminal.handle(Terminal.Signal.INT,
sig -> daemonDispatch.accept(Message.CANCEL_BUILD_SINGLETON));
this.display = new Display(terminal, false);
this.log = logFile == null ? new MessageCollector() : new FileLog(logFile);
final Thread r = new Thread(this::readInputLoop);
@@ -104,6 +114,11 @@ public class TerminalOutput implements ClientOutput {
this.reader = r;
}
@Override
public void setDeamonDispatch(Consumer<Message> daemonDispatch) {
this.daemonDispatch = daemonDispatch;
}
@Override
public void accept(Message entry) {
assert "main".equals(Thread.currentThread().getName());
@@ -115,13 +130,12 @@ public class TerminalOutput implements ClientOutput {
@Override
public void accept(List<Message> entries) {
assert "main".equals(Thread.currentThread().getName());
boolean update = true;
for (Message entry : entries) {
update &= doAccept(entry);
}
if (update) {
update();
if (!doAccept(entry)) {
return;
}
}
update();
}
private boolean doAccept(Message entry) {
@@ -133,6 +147,19 @@ public class TerminalOutput implements ClientOutput {
this.maxThreads = bs.getMaxThreads();
break;
}
case Message.CANCEL_BUILD: {
projects.values().stream().flatMap(p -> p.log.stream()).forEach(log);
clearDisplay();
try {
log.close();
} catch (IOException e1) {
throw new RuntimeException(e1);
}
final AttributedStyle s = new AttributedStyle().bold().foreground(AttributedStyle.RED);
new AttributedString("The build was canceled", s).println(terminal);
terminal.flush();
return false;
}
case Message.BUILD_EXCEPTION: {
final BuildException e = (BuildException) entry;
final String msg;
@@ -192,7 +219,11 @@ public class TerminalOutput implements ClientOutput {
case Message.DISPLAY: {
Message.Display d = (Message.Display) entry;
display.update(Collections.emptyList(), 0);
terminal.writer().printf("[%s] %s%n", d.getProjectId(), d.getMessage());
if (d.getProjectId() != null) {
terminal.writer().printf("[%s] %s%n", d.getProjectId(), d.getMessage());
} else {
terminal.writer().printf("%s%n", d.getMessage());
}
break;
}
case Message.PROMPT: {
@@ -208,8 +239,8 @@ public class TerminalOutput implements ClientOutput {
if (c < 0) {
break;
} else if (c == '\n' || c == '\r') {
prompt.getCallback().accept(sb.toString());
terminal.writer().println();
daemonDispatch.accept(prompt.response(sb.toString()));
break;
} else if (c == 127) {
if (sb.length() > 0) {
@@ -232,24 +263,15 @@ public class TerminalOutput implements ClientOutput {
}
case Message.BUILD_MESSAGE: {
BuildMessage bm = (BuildMessage) entry;
if (closing) {
try {
closed.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.err.println(bm.getMessage());
if (bm.getProjectId() != null) {
Project prj = projects.computeIfAbsent(bm.getProjectId(), Project::new);
prj.log.add(bm.getMessage());
} else {
if (bm.getProjectId() != null) {
Project prj = projects.computeIfAbsent(bm.getProjectId(), Project::new);
prj.log.add(bm.getMessage());
} else {
log.accept(bm.getMessage());
}
log.accept(bm.getMessage());
}
break;
}
case Message.KEYBOARD_INPUT:
case Message.KEYBOARD_INPUT: {
char keyStroke = ((StringMessage) entry).getPayload().charAt(0);
switch (keyStroke) {
case '+':
@@ -268,6 +290,10 @@ public class TerminalOutput implements ClientOutput {
}
break;
}
default:
throw new IllegalStateException("Unexpected message " + entry);
}
return true;
}
@@ -322,8 +348,9 @@ public class TerminalOutput implements ClientOutput {
public void close() throws Exception {
closing = true;
reader.interrupt();
accept(SimpleMessage.BUILD_STOPPED_SINGLETON);
log.close();
reader.join();
terminal.handle(Terminal.Signal.INT, previousIntHandler);
terminal.close();
closed.countDown();
if (exception != null) {

View File

@@ -88,6 +88,10 @@ class ProjectExecutorService {
executor.shutdown();
}
public void cancel() {
executor.shutdownNow();
}
// hook to allow pausing executor during unit tests
protected void beforeExecute(Thread t, Runnable r) {
}

View File

@@ -70,16 +70,41 @@ public class SmartBuilder implements Builder {
private final LifecycleModuleBuilder moduleBuilder;
private volatile SmartBuilderImpl builder;
private volatile boolean canceled;
private static SmartBuilder INSTANCE;
public static SmartBuilder cancel() {
SmartBuilder builder = INSTANCE;
if (builder != null) {
builder.doCancel();
}
return builder;
}
@Inject
public SmartBuilder(LifecycleModuleBuilder moduleBuilder) {
this.moduleBuilder = moduleBuilder;
INSTANCE = this;
}
void doCancel() {
canceled = true;
SmartBuilderImpl b = builder;
if (b != null) {
b.cancel();
}
}
public void doneCancel() {
canceled = false;
}
@Override
public void build(final MavenSession session, final ReactorContext reactorContext,
public synchronized void build(final MavenSession session, final ReactorContext reactorContext,
ProjectBuildList projectBuilds, final List<TaskSegment> taskSegments,
ReactorBuildStatus reactorBuildStatus) throws ExecutionException, InterruptedException {
List<String> list = new ArrayList<>();
String providerScript = null;
@@ -165,9 +190,16 @@ public class SmartBuilder implements Builder {
List<Map.Entry<TaskSegment, ReactorBuildStats>> allstats = new ArrayList<>();
for (TaskSegment taskSegment : taskSegments) {
Set<MavenProject> projects = projectBuilds.getByTaskSegment(taskSegment).getProjects();
ReactorBuildStats stats = new SmartBuilderImpl(moduleBuilder, session, reactorContext, taskSegment, projects, graph)
.build();
allstats.add(new AbstractMap.SimpleEntry<>(taskSegment, stats));
if (canceled) {
return;
}
builder = new SmartBuilderImpl(moduleBuilder, session, reactorContext, taskSegment, projects, graph);
try {
ReactorBuildStats stats = builder.build();
allstats.add(new AbstractMap.SimpleEntry<>(taskSegment, stats));
} finally {
builder = null;
}
}
if (session.getResult().hasExceptions()) {

View File

@@ -157,6 +157,10 @@ class SmartBuilderImpl {
executor.shutdown();
}
public void cancel() {
executor.cancel();
}
private void submitAll(Set<MavenProject> readyProjects) {
List<ProjectBuildTask> tasks = new ArrayList<>();
for (MavenProject project : readyProjects) {

View File

@@ -41,6 +41,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.maven.cli.DaemonMavenCli;
import org.apache.maven.execution.MavenSession;
import org.jboss.fuse.mvnd.builder.SmartBuilder;
import org.jboss.fuse.mvnd.common.DaemonConnection;
import org.jboss.fuse.mvnd.common.DaemonException;
import org.jboss.fuse.mvnd.common.DaemonExpirationStatus;
@@ -61,7 +62,9 @@ import org.jboss.fuse.mvnd.logging.smart.AbstractLoggingSpy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.jboss.fuse.mvnd.common.DaemonState.Broken;
import static org.jboss.fuse.mvnd.common.DaemonState.Busy;
import static org.jboss.fuse.mvnd.common.DaemonState.Canceled;
import static org.jboss.fuse.mvnd.common.DaemonState.StopRequested;
import static org.jboss.fuse.mvnd.common.DaemonState.Stopped;
@@ -267,7 +270,6 @@ public class Server implements AutoCloseable, Runnable {
condition.await();
break;
case Canceled:
LOGGER.debug("cancel requested.");
cancelNow();
break;
case Broken:
@@ -355,17 +357,12 @@ public class Server implements AutoCloseable, Runnable {
private void cancelNow() {
long time = System.currentTimeMillis() + CANCEL_TIMEOUT;
// LOGGER.debug("Cancel requested: will wait for daemon to become idle.");
// try {
// cancellationToken.cancel();
// } catch (Exception ex) {
// LOGGER.error("Cancel processing failed. Will continue.", ex);
// }
LOGGER.debug("Cancel requested: will wait for daemon to become idle.");
final SmartBuilder builder = SmartBuilder.cancel();
stateLock.lock();
try {
long rem;
while ((rem = System.currentTimeMillis() - time) > 0) {
while ((rem = time - System.currentTimeMillis()) > 0) {
try {
switch (getState()) {
case Idle:
@@ -391,6 +388,9 @@ public class Server implements AutoCloseable, Runnable {
stopNow("cancel requested but timed out");
} finally {
stateLock.unlock();
if (builder != null) {
builder.doneCancel();
}
}
}
@@ -447,12 +447,20 @@ public class Server implements AutoCloseable, Runnable {
break;
}
LOGGER.info("Received message: {}", message);
synchronized (recvQueue) {
recvQueue.put(message);
recvQueue.notifyAll();
if (message == Message.CANCEL_BUILD_SINGLETON) {
updateState(DaemonState.Canceled);
return;
} else {
synchronized (recvQueue) {
recvQueue.put(message);
recvQueue.notifyAll();
}
}
}
} catch (DaemonException.RecoverableMessageIOException t) {
updateState(Canceled);
} catch (Throwable t) {
updateState(Broken);
LOGGER.error("Error receiving events", t);
}
});

View File

@@ -19,6 +19,7 @@ import java.io.IOError;
import org.apache.maven.execution.MavenSession;
import org.jboss.fuse.mvnd.common.Message;
import org.jboss.fuse.mvnd.common.Message.BuildEvent;
import org.jboss.fuse.mvnd.common.Message.BuildStarted;
import org.jboss.fuse.mvnd.common.logging.TerminalOutput;
public class MavenLoggingSpy extends AbstractLoggingSpy {
@@ -32,7 +33,7 @@ public class MavenLoggingSpy extends AbstractLoggingSpy {
protected void onStartSession(MavenSession session) {
try {
output = new TerminalOutput(null);
output.accept(new Message.BuildStarted(
output.accept(new BuildStarted(
session.getTopLevelProject().getName(),
session.getAllProjects().size(),
session.getRequest().getDegreeOfConcurrency()));

View File

@@ -17,6 +17,7 @@ package org.jboss.fuse.mvnd.assertj;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.jboss.fuse.mvnd.common.Message;
@@ -24,11 +25,17 @@ import org.jboss.fuse.mvnd.common.logging.ClientOutput;
public class TestClientOutput implements ClientOutput {
private final List<Message> messages = new ArrayList<>();
protected Consumer<Message> daemonDispatch;
@Override
public void close() throws Exception {
}
@Override
public void setDeamonDispatch(Consumer<Message> daemonDispatch) {
this.daemonDispatch = daemonDispatch;
}
@Override
public void accept(Message message) {
messages.add(message);
@@ -43,7 +50,7 @@ public class TestClientOutput implements ClientOutput {
@Override
public void describeTerminal() {
accept(Message.log("Test terminal"));
accept(Message.display("Test terminal"));
}
public List<Message> getMessages() {

View File

@@ -44,7 +44,7 @@ public class InteractiveTest {
@Override
public void accept(Message m) {
if (m instanceof Prompt) {
((Prompt) m).getCallback().accept("0.1.0-SNAPSHOT");
daemonDispatch.accept(((Prompt) m).response("0.1.0-SNAPSHOT"));
}
super.accept(m);
}