Merge pull request #425 from gnodet/i424

The IPC server shuts down after one minute during the build
This commit is contained in:
Guillaume Nodet
2021-06-04 09:57:16 +02:00
committed by GitHub
3 changed files with 44 additions and 27 deletions

View File

@@ -197,8 +197,7 @@ public class IpcClient {
f.complete(s);
}
} catch (Exception e) {
// ignore
close();
close(e);
}
}
@@ -224,12 +223,16 @@ public class IpcClient {
}
}
synchronized void close() {
void close() {
close(new IOException("Closing"));
}
synchronized void close(Throwable e) {
if (socket != null) {
try {
socket.close();
} catch (IOException t) {
// ignore
e.addSuppressed(t);
}
socket = null;
input = null;
@@ -239,12 +242,11 @@ public class IpcClient {
receiver.interrupt();
try {
receiver.join(1000);
} catch (InterruptedException e) {
// ignore
} catch (InterruptedException t) {
e.addSuppressed(t);
}
}
Throwable t = new IOException("Closing");
responses.values().forEach(f -> f.completeExceptionally(t));
responses.values().forEach(f -> f.completeExceptionally(e));
responses.clear();
}
@@ -257,8 +259,8 @@ public class IpcClient {
}
return response.get(1);
} catch (Exception e) {
close();
throw new RuntimeException("Unable to create new context", e);
close(e);
throw new RuntimeException("Unable to create new sync context", e);
}
}
@@ -273,8 +275,8 @@ public class IpcClient {
throw new IOException("Unexpected response: " + response);
}
} catch (Exception e) {
close();
throw new RuntimeException("Unable to perform lock", e);
close(e);
throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e);
}
}
@@ -285,8 +287,8 @@ public class IpcClient {
throw new IOException("Unexpected response: " + response);
}
} catch (Exception e) {
close();
throw new RuntimeException("Unable to perform lock", e);
close(e);
throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e);
}
}

View File

@@ -98,13 +98,17 @@ public class IpcServer {
}
}
private static void debug(String msg, Object... args) {
//System.out.printf("[ipc] [debug] " + msg + "\n", args);
}
private static void info(String msg, Object... args) {
System.out.printf(msg + "\n", args);
System.out.printf("[ipc] [info] " + msg + "\n", args);
}
private static void error(String msg, Throwable t) {
System.err.println(msg);
t.printStackTrace();
System.out.println("[ipc] [error] " + msg);
t.printStackTrace(System.out);
}
private static void run(Runnable runnable) {
@@ -133,8 +137,8 @@ public class IpcServer {
}
private void client(Socket socket) {
info("Client connected");
clients.incrementAndGet();
int c = clients.incrementAndGet();
info("New client connected (%d connected)", c);
use();
Map<String, Context> clientContexts = new ConcurrentHashMap<>();
try {
@@ -150,6 +154,7 @@ public class IpcServer {
if (request.isEmpty()) {
throw new IOException("Received invalid request");
}
use();
String contextId;
Context context;
String command = request.remove(0);
@@ -163,6 +168,7 @@ public class IpcServer {
contexts.put(context.id, context);
clientContexts.put(context.id, context);
synchronized (output) {
debug("Created context %s", context.id);
output.writeInt(requestId);
output.writeInt(2);
output.writeUTF(RESPONSE_CONTEXT);
@@ -177,11 +183,12 @@ public class IpcServer {
contextId = request.remove(0);
context = contexts.get(contextId);
if (context == null) {
throw new IOException("Unknown context: " + contextId);
throw new IOException("Unknown context: " + contextId + ". Known contexts = " + contexts.keySet());
}
context.lock(request).thenRun(() -> {
try {
synchronized (output) {
debug("Locking in context %s", context.id);
output.writeInt(requestId);
output.writeInt(1);
output.writeUTF(RESPONSE_ACQUIRE);
@@ -191,8 +198,9 @@ public class IpcServer {
try {
socket.close();
} catch (IOException ioException) {
// ignore
e.addSuppressed(ioException);
}
error("Error writing lock response", e);
}
});
break;
@@ -204,10 +212,11 @@ public class IpcServer {
context = contexts.remove(contextId);
clientContexts.remove(contextId);
if (context == null) {
throw new IOException("Unknown context: " + contextId);
throw new IOException("Unknown context: " + contextId + ". Known contexts = " + contexts.keySet());
}
context.unlock();
synchronized (output) {
debug("Closing context %s", context.id);
output.writeInt(requestId);
output.writeInt(1);
output.writeUTF(RESPONSE_CLOSE);

View File

@@ -16,7 +16,9 @@
package org.mvndaemon.mvnd.sync;
import java.util.Collection;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.eclipse.aether.SyncContext;
import org.eclipse.aether.artifact.Artifact;
@@ -27,18 +29,22 @@ import org.eclipse.aether.metadata.Metadata;
*/
class IpcSyncContext implements SyncContext {
IpcClient client;
boolean shared;
String contextId;
final IpcClient client;
final boolean shared;
final String contextId;
final AtomicBoolean closed = new AtomicBoolean();
IpcSyncContext(IpcClient client, boolean shared) {
this.client = client;
this.shared = shared;
this.contextId = client.newContext(shared);
this.contextId = Objects.requireNonNull(client.newContext(shared));
}
@Override
public void acquire(Collection<? extends Artifact> artifacts, Collection<? extends Metadata> metadatas) {
if (closed.get()) {
throw new IllegalStateException("Already closed");
}
Collection<String> keys = new TreeSet<>();
stream(artifacts).map(this::getKey).forEach(keys::add);
stream(metadatas).map(this::getKey).forEach(keys::add);
@@ -50,7 +56,7 @@ class IpcSyncContext implements SyncContext {
@Override
public void close() {
if (contextId != null) {
if (closed.compareAndSet(false, true)) {
client.unlock(contextId);
}
}