diff --git a/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcClient.java b/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcClient.java index 7fb71856..56ac67eb 100644 --- a/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcClient.java +++ b/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcClient.java @@ -197,8 +197,7 @@ public class IpcClient { f.complete(s); } } catch (Exception e) { - // ignore - close(); + close(e); } } @@ -224,12 +223,16 @@ public class IpcClient { } } - synchronized void close() { + void close() { + close(new IOException("Closing")); + } + + synchronized void close(Throwable e) { if (socket != null) { try { socket.close(); } catch (IOException t) { - // ignore + e.addSuppressed(t); } socket = null; input = null; @@ -239,12 +242,11 @@ public class IpcClient { receiver.interrupt(); try { receiver.join(1000); - } catch (InterruptedException e) { - // ignore + } catch (InterruptedException t) { + e.addSuppressed(t); } } - Throwable t = new IOException("Closing"); - responses.values().forEach(f -> f.completeExceptionally(t)); + responses.values().forEach(f -> f.completeExceptionally(e)); responses.clear(); } @@ -257,8 +259,8 @@ public class IpcClient { } return response.get(1); } catch (Exception e) { - close(); - throw new RuntimeException("Unable to create new context", e); + close(e); + throw new RuntimeException("Unable to create new sync context", e); } } @@ -273,8 +275,8 @@ public class IpcClient { throw new IOException("Unexpected response: " + response); } } catch (Exception e) { - close(); - throw new RuntimeException("Unable to perform lock", e); + close(e); + throw new RuntimeException("Unable to perform lock (contextId = " + contextId + ")", e); } } @@ -285,8 +287,8 @@ public class IpcClient { throw new IOException("Unexpected response: " + response); } } catch (Exception e) { - close(); - throw new RuntimeException("Unable to perform lock", e); + close(e); + throw new RuntimeException("Unable to unlock (contextId = " + contextId + ")", e); } } diff --git a/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcServer.java b/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcServer.java index 96ed1d85..07aa0e91 100644 --- a/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcServer.java +++ b/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcServer.java @@ -98,13 +98,17 @@ public class IpcServer { } } + private static void debug(String msg, Object... args) { + //System.out.printf("[ipc] [debug] " + msg + "\n", args); + } + private static void info(String msg, Object... args) { - System.out.printf(msg + "\n", args); + System.out.printf("[ipc] [info] " + msg + "\n", args); } private static void error(String msg, Throwable t) { - System.err.println(msg); - t.printStackTrace(); + System.out.println("[ipc] [error] " + msg); + t.printStackTrace(System.out); } private static void run(Runnable runnable) { @@ -133,8 +137,8 @@ public class IpcServer { } private void client(Socket socket) { - info("Client connected"); - clients.incrementAndGet(); + int c = clients.incrementAndGet(); + info("New client connected (%d connected)", c); use(); Map clientContexts = new ConcurrentHashMap<>(); try { @@ -150,6 +154,7 @@ public class IpcServer { if (request.isEmpty()) { throw new IOException("Received invalid request"); } + use(); String contextId; Context context; String command = request.remove(0); @@ -163,6 +168,7 @@ public class IpcServer { contexts.put(context.id, context); clientContexts.put(context.id, context); synchronized (output) { + debug("Created context %s", context.id); output.writeInt(requestId); output.writeInt(2); output.writeUTF(RESPONSE_CONTEXT); @@ -177,11 +183,12 @@ public class IpcServer { contextId = request.remove(0); context = contexts.get(contextId); if (context == null) { - throw new IOException("Unknown context: " + contextId); + throw new IOException("Unknown context: " + contextId + ". Known contexts = " + contexts.keySet()); } context.lock(request).thenRun(() -> { try { synchronized (output) { + debug("Locking in context %s", context.id); output.writeInt(requestId); output.writeInt(1); output.writeUTF(RESPONSE_ACQUIRE); @@ -191,8 +198,9 @@ public class IpcServer { try { socket.close(); } catch (IOException ioException) { - // ignore + e.addSuppressed(ioException); } + error("Error writing lock response", e); } }); break; @@ -204,10 +212,11 @@ public class IpcServer { context = contexts.remove(contextId); clientContexts.remove(contextId); if (context == null) { - throw new IOException("Unknown context: " + contextId); + throw new IOException("Unknown context: " + contextId + ". Known contexts = " + contexts.keySet()); } context.unlock(); synchronized (output) { + debug("Closing context %s", context.id); output.writeInt(requestId); output.writeInt(1); output.writeUTF(RESPONSE_CLOSE); diff --git a/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcSyncContext.java b/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcSyncContext.java index 5bd3797a..9fcb8da2 100644 --- a/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcSyncContext.java +++ b/sync/src/main/java/org/mvndaemon/mvnd/sync/IpcSyncContext.java @@ -16,7 +16,9 @@ package org.mvndaemon.mvnd.sync; import java.util.Collection; +import java.util.Objects; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.eclipse.aether.SyncContext; import org.eclipse.aether.artifact.Artifact; @@ -27,18 +29,22 @@ import org.eclipse.aether.metadata.Metadata; */ class IpcSyncContext implements SyncContext { - IpcClient client; - boolean shared; - String contextId; + final IpcClient client; + final boolean shared; + final String contextId; + final AtomicBoolean closed = new AtomicBoolean(); IpcSyncContext(IpcClient client, boolean shared) { this.client = client; this.shared = shared; - this.contextId = client.newContext(shared); + this.contextId = Objects.requireNonNull(client.newContext(shared)); } @Override public void acquire(Collection artifacts, Collection metadatas) { + if (closed.get()) { + throw new IllegalStateException("Already closed"); + } Collection keys = new TreeSet<>(); stream(artifacts).map(this::getKey).forEach(keys::add); stream(metadatas).map(this::getKey).forEach(keys::add); @@ -50,7 +56,7 @@ class IpcSyncContext implements SyncContext { @Override public void close() { - if (contextId != null) { + if (closed.compareAndSet(false, true)) { client.unlock(contextId); } }