Make sure the IpcClient recreates the server if the context creation fails, fixes #446

This commit is contained in:
Guillaume Nodet
2021-07-06 14:52:11 +02:00
committed by GitHub
4 changed files with 151 additions and 64 deletions

View File

@@ -48,7 +48,9 @@ public class LockingEventSpy extends AbstractEventSpy {
private Lock getLock(ExecutionEvent event) { private Lock getLock(ExecutionEvent event) {
SessionData data = event.getSession().getRepositorySession().getData(); SessionData data = event.getSession().getRepositorySession().getData();
Map<MavenProject, Lock> locks = (Map) data.get(LOCKS_KEY); Map<MavenProject, Lock> locks = (Map) data.get(LOCKS_KEY);
// initialize the value if not already done (in case of a concurrent access) to the method
if (locks == null) { if (locks == null) {
// the call to data.set(k, null, v) is effectively a call to data.putIfAbsent(k, v)
data.set(LOCKS_KEY, null, new ConcurrentHashMap<>()); data.set(LOCKS_KEY, null, new ConcurrentHashMap<>());
locks = (Map) data.get(LOCKS_KEY); locks = (Map) data.get(LOCKS_KEY);
} }
@@ -58,15 +60,15 @@ public class LockingEventSpy extends AbstractEventSpy {
@Override @Override
public void onEvent(Object event) throws Exception { public void onEvent(Object event) throws Exception {
if (event instanceof ExecutionEvent) { if (event instanceof ExecutionEvent) {
ExecutionEvent ee = (ExecutionEvent) event; ExecutionEvent executionEvent = (ExecutionEvent) event;
switch (ee.getType()) { switch (executionEvent.getType()) {
case ProjectStarted: case ProjectStarted:
case ForkedProjectStarted: { case ForkedProjectStarted: {
Lock lock = getLock(ee); Lock lock = getLock(executionEvent);
if (!lock.tryLock()) { if (!lock.tryLock()) {
logger.warn("Suspending concurrent execution of project " + ee.getProject()); logger.warn("Suspending concurrent execution of project '{}'", executionEvent.getProject());
getLock(ee).lockInterruptibly(); lock.lockInterruptibly();
logger.warn("Resuming execution of project " + ee.getProject()); logger.warn("Resuming execution of project '{}'", executionEvent.getProject());
} }
break; break;
} }
@@ -74,7 +76,7 @@ public class LockingEventSpy extends AbstractEventSpy {
case ProjectFailed: case ProjectFailed:
case ForkedProjectSucceeded: case ForkedProjectSucceeded:
case ForkedProjectFailed: case ForkedProjectFailed:
getLock(ee).unlock(); getLock(executionEvent).unlock();
break; break;
} }
} }

View File

@@ -15,6 +15,7 @@
*/ */
package org.mvndaemon.mvnd.sync; package org.mvndaemon.mvnd.sync;
import java.io.Closeable;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.File; import java.io.File;
@@ -111,43 +112,51 @@ public class IpcClient {
int tmpport = ss.getLocalPort(); int tmpport = ss.getLocalPort();
int rand = new Random().nextInt(); int rand = new Random().nextInt();
List<String> args = new ArrayList<>(); String noFork = System.getProperty(IpcServer.NO_FORK_PROP);
String javaHome = System.getenv("JAVA_HOME"); Closeable close;
if (javaHome == null) { if (Boolean.parseBoolean(noFork)) {
javaHome = System.getProperty("java.home"); IpcServer server = IpcServer.runServer(tmpport, rand);
} close = server::close;
boolean win = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
String javaCmd = win ? "bin\\java.exe" : "bin/java";
String java = Paths.get(javaHome).resolve(javaCmd).toAbsolutePath().toString();
args.add(java);
String classpath;
String className = getClass().getName().replace('.', '/') + ".class";
String url = getClass().getClassLoader().getResource(className).toString();
if (url.startsWith("jar:")) {
classpath = url.substring("jar:".length(), url.indexOf("!/"));
} else if (url.startsWith("file:")) {
classpath = url.substring("file:".length(), url.indexOf(className));
} else { } else {
throw new IllegalStateException(); List<String> args = new ArrayList<>();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {
javaHome = System.getProperty("java.home");
}
boolean win = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
String javaCmd = win ? "bin\\java.exe" : "bin/java";
String java = Paths.get(javaHome).resolve(javaCmd).toAbsolutePath().toString();
args.add(java);
String classpath;
String className = getClass().getName().replace('.', '/') + ".class";
String url = getClass().getClassLoader().getResource(className).toString();
if (url.startsWith("jar:")) {
classpath = url.substring("jar:".length(), url.indexOf("!/"));
} else if (url.startsWith("file:")) {
classpath = url.substring("file:".length(), url.indexOf(className));
} else {
throw new IllegalStateException();
}
args.add("-cp");
args.add(classpath);
String timeout = System.getProperty(IpcServer.IDLE_TIMEOUT_PROP);
if (timeout != null) {
args.add("-D" + IpcServer.IDLE_TIMEOUT_PROP + "=" + timeout);
}
args.add(IpcServer.class.getName());
args.add(Integer.toString(tmpport));
args.add(Integer.toString(rand));
ProcessBuilder processBuilder = new ProcessBuilder();
ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(new File(win ? "NUL" : "/dev/null"));
discard = ProcessBuilder.Redirect.INHERIT;
Process process = processBuilder
.directory(lockFile.getParent().toFile())
.command(args)
.redirectOutput(discard)
.redirectError(discard)
.start();
close = process::destroyForcibly;
} }
args.add("-cp");
args.add(classpath);
String timeout = System.getProperty(IpcServer.IDLE_TIMEOUT_PROP);
if (timeout != null) {
args.add("-D" + IpcServer.IDLE_TIMEOUT_PROP + "=" + timeout);
}
args.add(IpcServer.class.getName());
args.add(Integer.toString(tmpport));
args.add(Integer.toString(rand));
ProcessBuilder processBuilder = new ProcessBuilder();
ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(new File(win ? "NUL" : "/dev/null"));
discard = ProcessBuilder.Redirect.INHERIT;
Process process = processBuilder
.directory(lockFile.getParent().toFile())
.command(args)
.redirectOutput(discard)
.redirectError(discard)
.start();
ExecutorService es = Executors.newSingleThreadExecutor(); ExecutorService es = Executors.newSingleThreadExecutor();
Future<int[]> future = es.submit(() -> { Future<int[]> future = es.submit(() -> {
@@ -161,14 +170,14 @@ public class IpcClient {
try { try {
res = future.get(5, TimeUnit.SECONDS); res = future.get(5, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
process.destroyForcibly(); close.close();
throw e; throw e;
} finally { } finally {
es.shutdownNow(); es.shutdownNow();
ss.close(); ss.close();
} }
if (rand != res[0]) { if (rand != res[0]) {
process.destroyForcibly(); close.close();
throw new IllegalStateException("IpcServer did not respond with the correct random"); throw new IllegalStateException("IpcServer did not respond with the correct random");
} }
@@ -255,17 +264,21 @@ public class IpcClient {
} }
String newContext(boolean shared) { String newContext(boolean shared) {
try { RuntimeException error = new RuntimeException("Unable to create new sync context");
List<String> response = send(Arrays.asList( for (int i = 0; i < 2; i++) {
REQUEST_CONTEXT, Boolean.toString(shared))); try {
if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) { List<String> response = send(Arrays.asList(
throw new IOException("Unexpected response: " + response); REQUEST_CONTEXT, Boolean.toString(shared)));
if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
throw new IOException("Unexpected response: " + response);
}
return response.get(1);
} catch (Exception e) {
close(e);
error.addSuppressed(e);
} }
return response.get(1);
} catch (Exception e) {
close(e);
throw new RuntimeException("Unable to create new sync context", e);
} }
throw error;
} }
void lock(String contextId, Collection<String> keys) { void lock(String contextId, Collection<String> keys) {
@@ -299,7 +312,8 @@ public class IpcClient {
@Override @Override
public String toString() { public String toString() {
return "IpcClient{" return "IpcClient{"
+ "repository=" + repository + "repository=" + repository + ','
+ "port=" + (socket != null ? socket.getPort() : 0)
+ '}'; + '}';
} }
} }

View File

@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@@ -46,12 +47,14 @@ import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_CONTEXT;
*/ */
public class IpcServer { public class IpcServer {
public static final String NO_FORK_PROP = "ipcsync.nofork";
public static final String IDLE_TIMEOUT_PROP = "ipcsync.idle.timeout"; public static final String IDLE_TIMEOUT_PROP = "ipcsync.idle.timeout";
static final long IDLE_TIMEOUT = TimeUnit.SECONDS.toNanos(60); static final long IDLE_TIMEOUT = TimeUnit.SECONDS.toNanos(60);
private final ServerSocket serverSocket; private final ServerSocket serverSocket;
private final AtomicInteger clients = new AtomicInteger(); private final Map<Socket, Thread> clients = new HashMap<>();
private final AtomicInteger counter = new AtomicInteger(); private final AtomicInteger counter = new AtomicInteger();
private final Map<String, Lock> locks = new ConcurrentHashMap<>(); private final Map<String, Lock> locks = new ConcurrentHashMap<>();
private final Map<String, Context> contexts = new ConcurrentHashMap<>(); private final Map<String, Context> contexts = new ConcurrentHashMap<>();
@@ -66,8 +69,13 @@ public class IpcServer {
String str = System.getProperty(IDLE_TIMEOUT_PROP); String str = System.getProperty(IDLE_TIMEOUT_PROP);
if (str != null) { if (str != null) {
try { try {
TimeUnit unit = TimeUnit.SECONDS;
if (str.endsWith("ms")) {
unit = TimeUnit.MILLISECONDS;
str = str.substring(0, str.length() - 2);
}
long dur = Long.parseLong(str); long dur = Long.parseLong(str);
timeout = TimeUnit.SECONDS.toNanos(dur); timeout = unit.toNanos(dur);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
error("Property " + IDLE_TIMEOUT_PROP + " specified with invalid value: " + str, e); error("Property " + IDLE_TIMEOUT_PROP + " specified with invalid value: " + str, e);
} }
@@ -96,6 +104,10 @@ public class IpcServer {
int tmpPort = Integer.parseInt(args[0]); int tmpPort = Integer.parseInt(args[0]);
int rand = Integer.parseInt(args[1]); int rand = Integer.parseInt(args[1]);
runServer(tmpPort, rand);
}
static IpcServer runServer(int tmpPort, int rand) throws IOException {
IpcServer server = new IpcServer(); IpcServer server = new IpcServer();
run(server::run); run(server::run);
int port = server.getPort(); int port = server.getPort();
@@ -108,6 +120,8 @@ public class IpcServer {
dos.flush(); dos.flush();
} }
} }
return server;
} }
private static void debug(String msg, Object... args) { private static void debug(String msg, Object... args) {
@@ -149,7 +163,11 @@ public class IpcServer {
} }
private void client(Socket socket) { private void client(Socket socket) {
int c = clients.incrementAndGet(); int c;
synchronized (clients) {
clients.put(socket, Thread.currentThread());
c = clients.size();
}
info("New client connected (%d connected)", c); info("New client connected (%d connected)", c);
use(); use();
Map<String, Context> clientContexts = new ConcurrentHashMap<>(); Map<String, Context> clientContexts = new ConcurrentHashMap<>();
@@ -240,9 +258,13 @@ public class IpcServer {
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
error("Error processing request", t); if (!closing) {
error("Error processing request", t);
}
} finally { } finally {
info("Client disconnecting..."); if (!closing) {
info("Client disconnecting...");
}
clientContexts.values().forEach(context -> { clientContexts.values().forEach(context -> {
contexts.remove(context.id); contexts.remove(context.id);
context.unlock(); context.unlock();
@@ -252,7 +274,13 @@ public class IpcServer {
} catch (IOException ioException) { } catch (IOException ioException) {
// ignore // ignore
} }
info("%d clients left", clients.decrementAndGet()); synchronized (clients) {
clients.remove(socket);
c = clients.size();
}
if (!closing) {
info("%d clients left", c);
}
} }
} }
@@ -264,19 +292,28 @@ public class IpcServer {
while (true) { while (true) {
long current = System.nanoTime(); long current = System.nanoTime();
if (current - lastUsed > idleTimeout) { if (current - lastUsed > idleTimeout) {
info("IpcServer expired, closing");
close(); close();
break; break;
} }
} }
} }
private void close() { void close() {
closing = true; closing = true;
try { try {
serverSocket.close(); serverSocket.close();
} catch (IOException e) { } catch (IOException e) {
error("Error closing server socket", e); error("Error closing server socket", e);
} }
clients.forEach((s, t) -> {
try {
s.close();
} catch (IOException e) {
// ignore
}
t.interrupt();
});
} }
static class Waiter { static class Waiter {

View File

@@ -28,9 +28,13 @@ import org.eclipse.aether.repository.LocalRepositoryManager;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IpcSyncContextTest { public class IpcSyncContextTest {
private static final Logger LOGGER = LoggerFactory.getLogger(IpcSyncContextTest.class);
@BeforeAll @BeforeAll
static void setup() { static void setup() {
System.setProperty(IpcServer.IDLE_TIMEOUT_PROP, "5"); System.setProperty(IpcServer.IDLE_TIMEOUT_PROP, "5");
@@ -73,15 +77,15 @@ public class IpcSyncContextTest {
for (int i = 0; i < threads.length; i++) { for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> { threads[i] = new Thread(() -> {
try (SyncContext context = factory.newInstance(session, false)) { try (SyncContext context = factory.newInstance(session, false)) {
System.out.println("Trying to lock from " + context); LOGGER.info("Trying to lock from {}", context);
context.acquire(Collections.singleton(artifact), null); context.acquire(Collections.singleton(artifact), null);
System.out.println("Lock acquired from " + context); LOGGER.info("Lock acquired from {}", context);
try { try {
Thread.sleep(50); Thread.sleep(50);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("Unlock from " + context); LOGGER.info("Unlock from {}", context);
} }
}); });
threads[i].start(); threads[i].start();
@@ -91,4 +95,34 @@ public class IpcSyncContextTest {
thread.join(); thread.join();
} }
} }
@Test
void testTimeoutAndConnect() throws Exception {
System.setProperty(IpcServer.IDLE_TIMEOUT_PROP, "50ms");
System.setProperty(IpcServer.NO_FORK_PROP, "true");
try {
SyncContextFactory factory = new IpcSyncContextFactory();
DefaultRepositorySystemSession session = new DefaultRepositorySystemSession();
LocalRepository repository = new LocalRepository(new File("target/test-repo"));
LocalRepositoryManager localRepositoryManager = new SimpleLocalRepositoryManagerFactory()
.newInstance(session, repository);
session.setLocalRepositoryManager(localRepositoryManager);
Artifact artifact = new DefaultArtifact("myGroup", "myArtifact", "jar", "0.1");
for (int i = 0; i < 10; i++) {
LOGGER.info("[client] Creating sync context");
try (SyncContext context = factory.newInstance(session, false)) {
LOGGER.info("[client] Sync context created: {}", context.toString());
context.acquire(Collections.singleton(artifact), null);
}
LOGGER.info("[client] Sync context closed");
Thread.sleep(100);
}
} finally {
System.clearProperty(IpcServer.IDLE_TIMEOUT_PROP);
System.clearProperty(IpcServer.NO_FORK_PROP);
}
}
} }