Add a global synccontext implementation

This commit is contained in:
Guillaume Nodet
2021-04-26 18:17:30 +02:00
parent 69da77be0e
commit 3aa356433d
10 changed files with 989 additions and 0 deletions

View File

@@ -92,6 +92,7 @@
<configuration>
<compilerArgs>
<arg>--add-exports=java.base/sun.nio.ch=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.module=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>

View File

@@ -63,6 +63,9 @@
<artifact id="org.mvndaemon.mvnd:mvnd-client:${project.version}">
<exclusion id="*:*"/>
</artifact>
<artifact id="org.mvndaemon.mvnd:mvnd-synccontext:${project.version}">
<exclusion id="*:*"/>
</artifact>
<artifact id="org.mvndaemon.mvnd:mvnd-agent:${project.version}"/>
<artifact id="org.mvndaemon.mvnd:mvnd-helper-agent:${project.version}"/>
</artifactSet>

View File

@@ -46,6 +46,7 @@
<junit.jupiter.version>5.6.0</junit.jupiter.version>
<logback.version>1.2.3</logback.version>
<maven.version>3.8.1</maven.version>
<maven.resolver.version>1.6.2</maven.resolver.version>
<slf4j.version>1.7.29</slf4j.version>
<!-- plugin versions a..z -->
@@ -75,6 +76,7 @@
<module>common</module>
<module>client</module>
<module>daemon</module>
<module>sync</module>
<module>dist</module>
<module>integration-tests</module>
</modules>

91
sync/pom.xml Normal file
View File

@@ -0,0 +1,91 @@
<!--
Copyright 2021 the original author or authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.mvndaemon.mvnd</groupId>
<artifactId>mvnd</artifactId>
<version>0.5.0-SNAPSHOT</version>
</parent>
<artifactId>mvnd-synccontext</artifactId>
<packaging>jar</packaging>
<name>Maven Daemon - IPC Sync Context</name>
<dependencies>
<dependency>
<groupId>org.apache.maven.resolver</groupId>
<artifactId>maven-resolver-api</artifactId>
<version>${maven.resolver.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.resolver</groupId>
<artifactId>maven-resolver-impl</artifactId>
<version>${maven.resolver.version}</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
<version>${jakarta.inject.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>io.takari.maven.plugins</groupId>
<artifactId>takari-lifecycle-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>sisu-index</goal>
</goals>
<phase>process-classes</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,288 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mvndaemon.mvnd.sync.IpcMessages.REQUEST_ACQUIRE;
import static org.mvndaemon.mvnd.sync.IpcMessages.REQUEST_CLOSE;
import static org.mvndaemon.mvnd.sync.IpcMessages.REQUEST_CONTEXT;
import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_ACQUIRE;
import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_CLOSE;
import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_CONTEXT;
/**
* Client side implementation.
* The client instance is bound to a given maven repository.
*/
public class IpcClient {
Path repository;
Socket socket;
DataOutputStream output;
DataInputStream input;
Thread receiver;
AtomicInteger requestId = new AtomicInteger();
Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
IpcClient(Path repository) {
this.repository = repository;
}
synchronized Socket ensureInitialized() throws IOException {
if (socket == null) {
socket = createClient();
input = new DataInputStream(socket.getInputStream());
output = new DataOutputStream(socket.getOutputStream());
receiver = new Thread(this::receive);
receiver.setDaemon(true);
receiver.start();
}
return socket;
}
Socket createClient() throws IOException {
InetAddress loopback = InetAddress.getLoopbackAddress();
Path lockFile = repository.resolve(".maven-resolver-ipc-lock").toAbsolutePath().normalize();
if (!Files.isRegularFile(lockFile)) {
if (!Files.isDirectory(lockFile.getParent())) {
Files.createDirectories(lockFile.getParent());
}
}
try (RandomAccessFile raf = new RandomAccessFile(lockFile.toFile(), "rw")) {
try (FileLock lock = raf.getChannel().lock()) {
String line = raf.readLine();
if (line != null) {
int port = Integer.parseInt(line);
if (port > 0) {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(loopback, port));
return socket;
} catch (IOException e) {
// ignore
}
}
}
ServerSocket ss = new ServerSocket();
ss.bind(new InetSocketAddress(loopback, 0));
int tmpport = ss.getLocalPort();
int rand = new Random().nextInt();
List<String> args = new ArrayList<>();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {
javaHome = System.getProperty("java.home");
}
boolean win = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
String javaCmd = win ? "bin\\java.exe" : "bin/java";
String java = Paths.get(javaHome).resolve(javaCmd).toAbsolutePath().toString();
args.add(java);
String classpath;
String className = getClass().getName().replace('.', '/') + ".class";
String url = getClass().getClassLoader().getResource(className).toString();
if (url.startsWith("jar:")) {
classpath = url.substring("jar:".length(), url.indexOf("!/"));
} else if (url.startsWith("file:")) {
classpath = url.substring("file:".length(), url.indexOf(className));
} else {
throw new IllegalStateException();
}
args.add("-cp");
args.add(classpath);
args.add(IpcServer.class.getName());
args.add(Integer.toString(tmpport));
args.add(Integer.toString(rand));
ProcessBuilder processBuilder = new ProcessBuilder();
ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(new File(win ? "NUL" : "/dev/null"));
discard = ProcessBuilder.Redirect.INHERIT;
Process process = processBuilder
.directory(lockFile.getParent().toFile())
.command(args)
.redirectOutput(discard)
.redirectError(discard)
.start();
Future<int[]> future = ForkJoinPool.commonPool().submit(() -> {
Socket s = ss.accept();
DataInputStream dis = new DataInputStream(s.getInputStream());
int rand2 = dis.readInt();
int port2 = dis.readInt();
return new int[] { rand2, port2 };
});
int[] res = future.get(5, TimeUnit.SECONDS);
if (rand != res[0]) {
process.destroyForcibly();
}
ss.close();
int port = res[1];
Socket socket = new Socket();
socket.connect(new InetSocketAddress(loopback, port));
raf.seek(0);
raf.writeBytes(port + "\n");
return socket;
} catch (Exception e) {
throw new RuntimeException("Unable to create and connect to lock server", e);
}
}
}
void receive() {
try {
while (true) {
int id = input.readInt();
int sz = input.readInt();
List<String> s = new ArrayList<>(sz);
for (int i = 0; i < sz; i++) {
s.add(input.readUTF());
}
CompletableFuture<List<String>> f = responses.remove(id);
if (f == null || s.isEmpty()) {
throw new IllegalStateException("Protocol error");
}
f.complete(s);
}
} catch (Exception e) {
// ignore
close();
}
}
List<String> send(List<String> request) throws IOException {
ensureInitialized();
int id = requestId.incrementAndGet();
CompletableFuture<List<String>> response = new CompletableFuture<>();
responses.put(id, response);
synchronized (output) {
output.writeInt(id);
output.writeInt(request.size());
for (String s : request) {
output.writeUTF(s);
}
output.flush();
}
try {
return response.get();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
} catch (ExecutionException e) {
throw new IOException("Execution error", e);
}
}
synchronized void close() {
if (socket != null) {
try {
socket.close();
} catch (IOException t) {
// ignore
}
socket = null;
input = null;
output = null;
}
if (receiver != null) {
receiver.interrupt();
try {
receiver.join(1000);
} catch (InterruptedException e) {
// ignore
}
}
Throwable t = new IOException("Closing");
responses.values().forEach(f -> f.completeExceptionally(t));
responses.clear();
}
String newContext(boolean shared) {
try {
List<String> response = send(Arrays.asList(
REQUEST_CONTEXT, Boolean.toString(shared)));
if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
throw new IOException("Unexpected response: " + response);
}
return response.get(1);
} catch (Exception e) {
close();
throw new RuntimeException("Unable to create new context", e);
}
}
void lock(String contextId, Collection<String> keys) {
try {
List<String> req = new ArrayList<>(keys.size() + 2);
req.add(REQUEST_ACQUIRE);
req.add(contextId);
req.addAll(keys);
List<String> response = send(req);
if (response.size() != 1 || !RESPONSE_ACQUIRE.equals(response.get(0))) {
throw new IOException("Unexpected response: " + response);
}
} catch (Exception e) {
close();
throw new RuntimeException("Unable to perform lock", e);
}
}
void unlock(String contextId) {
try {
List<String> response = send(Arrays.asList(REQUEST_CLOSE, contextId));
if (response.size() != 1 || !RESPONSE_CLOSE.equals(response.get(0))) {
throw new IOException("Unexpected response: " + response);
}
} catch (Exception e) {
close();
throw new RuntimeException("Unable to perform lock", e);
}
}
@Override
public String toString() {
return "IpcClient{"
+ "repository=" + repository
+ '}';
}
}

View File

@@ -0,0 +1,30 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
/**
* Constants used for the inter-process communication protocol.
*/
public class IpcMessages {
public static final String REQUEST_CONTEXT = "request-context";
public static final String REQUEST_ACQUIRE = "request-acquire";
public static final String REQUEST_CLOSE = "request-close";
public static final String RESPONSE_CONTEXT = "response-context";
public static final String RESPONSE_ACQUIRE = "response-acquire";
public static final String RESPONSE_CLOSE = "response-close";
}

View File

@@ -0,0 +1,347 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mvndaemon.mvnd.sync.IpcMessages.REQUEST_ACQUIRE;
import static org.mvndaemon.mvnd.sync.IpcMessages.REQUEST_CLOSE;
import static org.mvndaemon.mvnd.sync.IpcMessages.REQUEST_CONTEXT;
import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_ACQUIRE;
import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_CLOSE;
import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_CONTEXT;
/**
* Implementation of the server side.
* The server instance is bound to a given maven repository.
*/
public class IpcServer {
static final long IDLE_TIMEOUT = TimeUnit.SECONDS.toNanos(60);
private final ServerSocket serverSocket;
private final AtomicInteger clients = new AtomicInteger();
private final AtomicInteger counter = new AtomicInteger();
private final Map<String, Lock> locks = new ConcurrentHashMap<>();
private final Map<String, Context> contexts = new ConcurrentHashMap<>();
private volatile long lastUsed;
private volatile boolean closing;
public IpcServer() throws IOException {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
}
public static void main(String[] args) throws Exception {
// When spawning a new process, the child process is create within
// the same process group. This means that a few signals are sent
// to the whole group. This is the case for SIGINT (Ctrl-C) and
// SIGTSTP (Ctrl-Z) which are both sent to all the processed in the
// group when initiated from the controlling terminal.
// This is only a problem when the client creates the daemon, but
// without ignoring those signals, a client being interrupted will
// also interrupt and kill the daemon.
try {
sun.misc.Signal.handle(new sun.misc.Signal("INT"), sun.misc.SignalHandler.SIG_IGN);
if (System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win")) {
sun.misc.Signal.handle(new sun.misc.Signal("TSTP"), sun.misc.SignalHandler.SIG_IGN);
}
} catch (Throwable t) {
System.err.println("Unable to ignore INT and TSTP signals");
t.printStackTrace();
}
int tmpPort = Integer.parseInt(args[0]);
int rand = Integer.parseInt(args[1]);
IpcServer server = new IpcServer();
run(server::run);
int port = server.getPort();
try (Socket s = new Socket()) {
s.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), tmpPort));
try (DataOutputStream dos = new DataOutputStream(s.getOutputStream())) {
dos.writeInt(rand);
dos.writeInt(port);
dos.flush();
}
}
}
private static void info(String msg, Object... args) {
System.out.printf(msg + "\n", args);
}
private static void error(String msg, Throwable t) {
System.err.println(msg);
t.printStackTrace();
}
private static void run(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.start();
}
public int getPort() {
return serverSocket.getLocalPort();
}
public void run() {
try {
info("IpcServer started on port %d", getPort());
use();
run(this::expirationCheck);
while (!closing) {
Socket socket = this.serverSocket.accept();
run(() -> client(socket));
}
} catch (Throwable t) {
if (!closing) {
error("Error running sync server loop", t);
}
}
}
private void client(Socket socket) {
info("Client connected");
clients.incrementAndGet();
use();
Map<String, Context> clientContexts = new ConcurrentHashMap<>();
try {
DataInputStream input = new DataInputStream(socket.getInputStream());
DataOutputStream output = new DataOutputStream(socket.getOutputStream());
while (!closing) {
int requestId = input.readInt();
int sz = input.readInt();
List<String> request = new ArrayList<>(sz);
for (int i = 0; i < sz; i++) {
request.add(input.readUTF());
}
if (request.isEmpty()) {
throw new IOException("Received invalid request");
}
String contextId;
Context context;
String command = request.remove(0);
switch (command) {
case REQUEST_CONTEXT:
if (request.size() != 1) {
throw new IOException("Expected one argument for " + command + " but got " + request);
}
boolean shared = Boolean.parseBoolean(request.remove(0));
context = new Context(shared);
contexts.put(context.id, context);
clientContexts.put(context.id, context);
synchronized (output) {
output.writeInt(requestId);
output.writeInt(2);
output.writeUTF(RESPONSE_CONTEXT);
output.writeUTF(context.id);
output.flush();
}
break;
case REQUEST_ACQUIRE:
if (request.size() < 1) {
throw new IOException("Expected at least one argument for " + command + " but got " + request);
}
contextId = request.remove(0);
context = contexts.get(contextId);
if (context == null) {
throw new IOException("Unknown context: " + contextId);
}
context.lock(request).thenRun(() -> {
try {
synchronized (output) {
output.writeInt(requestId);
output.writeInt(1);
output.writeUTF(RESPONSE_ACQUIRE);
output.flush();
}
} catch (IOException e) {
try {
socket.close();
} catch (IOException ioException) {
// ignore
}
}
});
break;
case REQUEST_CLOSE:
if (request.size() != 1) {
throw new IOException("Expected one argument for " + command + " but got " + request);
}
contextId = request.remove(0);
context = contexts.remove(contextId);
clientContexts.remove(contextId);
if (context == null) {
throw new IOException("Unknown context: " + contextId);
}
context.unlock();
synchronized (output) {
output.writeInt(requestId);
output.writeInt(1);
output.writeUTF(RESPONSE_CLOSE);
output.flush();
}
break;
default:
throw new IOException("Unknown request: " + request.get(0));
}
}
} catch (Throwable t) {
error("Error processing request", t);
} finally {
info("Client disconnecting...");
clientContexts.values().forEach(context -> {
contexts.remove(context.id);
context.unlock();
});
try {
socket.close();
} catch (IOException ioException) {
// ignore
}
info("%d clients left", clients.decrementAndGet());
}
}
private void use() {
lastUsed = System.nanoTime();
}
private void expirationCheck() {
while (true) {
long current = System.nanoTime();
if (current - lastUsed > IDLE_TIMEOUT) {
close();
break;
}
}
}
private void close() {
closing = true;
try {
serverSocket.close();
} catch (IOException e) {
error("Error closing server socket", e);
}
}
static class Waiter {
final Context context;
final CompletableFuture<Void> future;
Waiter(Context context, CompletableFuture<Void> future) {
this.context = context;
this.future = future;
}
}
static class Lock {
final String key;
List<Context> holders;
List<Waiter> waiters;
Lock(String key) {
this.key = key;
}
public synchronized CompletableFuture<Void> lock(Context context) {
if (holders == null) {
holders = new ArrayList<>();
}
if (holders.isEmpty() || holders.get(0).shared && context.shared) {
holders.add(context);
return CompletableFuture.completedFuture(null);
}
if (waiters == null) {
waiters = new ArrayList<>();
}
CompletableFuture<Void> future = new CompletableFuture<>();
waiters.add(new Waiter(context, future));
return future;
}
public synchronized void unlock(Context context) {
if (holders.remove(context)) {
while (waiters != null && !waiters.isEmpty()
&& (holders.isEmpty() || holders.get(0).shared && waiters.get(0).context.shared)) {
Waiter waiter = waiters.remove(0);
holders.add(waiter.context);
waiter.future.complete(null);
}
} else if (waiters != null) {
for (Iterator<Waiter> it = waiters.iterator(); it.hasNext();) {
Waiter waiter = it.next();
if (waiter.context == context) {
it.remove();
waiter.future.cancel(false);
}
}
}
}
}
class Context {
final String id;
final boolean shared;
final List<String> locks = new CopyOnWriteArrayList<>();
Context(boolean shared) {
this.id = String.format("%08x", counter.incrementAndGet());
this.shared = shared;
}
public CompletableFuture<?> lock(List<String> keys) {
locks.addAll(keys);
CompletableFuture<?>[] futures = keys.stream()
.map(k -> IpcServer.this.locks.computeIfAbsent(k, Lock::new))
.map(l -> l.lock(this))
.toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(futures);
}
public void unlock() {
locks.stream()
.map(k -> IpcServer.this.locks.computeIfAbsent(k, Lock::new))
.forEach(l -> l.unlock(this));
}
}
}

View File

@@ -0,0 +1,88 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
import java.util.Collection;
import java.util.TreeSet;
import java.util.stream.Stream;
import org.eclipse.aether.SyncContext;
import org.eclipse.aether.artifact.Artifact;
import org.eclipse.aether.metadata.Metadata;
/**
* The SyncContext implementation.
*/
class IpcSyncContext implements SyncContext {
IpcClient client;
boolean shared;
String contextId;
IpcSyncContext(IpcClient client, boolean shared) {
this.client = client;
this.shared = shared;
this.contextId = client.newContext(shared);
}
@Override
public void acquire(Collection<? extends Artifact> artifacts, Collection<? extends Metadata> metadatas) {
Collection<String> keys = new TreeSet<>();
stream(artifacts).map(this::getKey).forEach(keys::add);
stream(metadatas).map(this::getKey).forEach(keys::add);
if (keys.isEmpty()) {
return;
}
client.lock(contextId, keys);
}
@Override
public void close() {
if (contextId != null) {
client.unlock(contextId);
}
}
@Override
public String toString() {
return "IpcSyncContext{"
+ "client=" + client
+ ", shared=" + shared
+ ", contextId='" + contextId + '\''
+ '}';
}
private <T> Stream<T> stream(Collection<T> col) {
return col != null ? col.stream() : Stream.empty();
}
private String getKey(Artifact a) {
return "artifact:" + a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getBaseVersion();
}
private String getKey(Metadata m) {
StringBuilder key = new StringBuilder("metadata:");
if (!m.getGroupId().isEmpty()) {
key.append(m.getGroupId());
if (!m.getArtifactId().isEmpty()) {
key.append(':').append(m.getArtifactId());
if (!m.getVersion().isEmpty()) {
key.append(':').append(m.getVersion());
}
}
}
return key.toString();
}
}

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PreDestroy;
import javax.annotation.Priority;
import javax.inject.Named;
import javax.inject.Singleton;
import org.eclipse.aether.RepositorySystemSession;
import org.eclipse.aether.SyncContext;
import org.eclipse.aether.impl.SyncContextFactory;
/**
* The SyncContextFactory implementation.
*/
@Named
@Priority(Integer.MAX_VALUE)
@Singleton
public class IpcSyncContextFactory implements SyncContextFactory {
private final Map<Path, IpcClient> clients = new ConcurrentHashMap<>();
@Override
public SyncContext newInstance(RepositorySystemSession session, boolean shared) {
Path repository = session.getLocalRepository().getBasedir().toPath();
IpcClient client = clients.computeIfAbsent(repository, IpcClient::new);
return new IpcSyncContext(client, shared);
}
@PreDestroy
void close() {
clients.values().forEach(IpcClient::close);
}
@Override
public String toString() {
return "IpcSyncContextFactory{"
+ "clients=" + clients
+ '}';
}
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
import java.io.File;
import java.util.Collections;
import org.eclipse.aether.DefaultRepositorySystemSession;
import org.eclipse.aether.SyncContext;
import org.eclipse.aether.artifact.Artifact;
import org.eclipse.aether.artifact.DefaultArtifact;
import org.eclipse.aether.impl.SyncContextFactory;
import org.eclipse.aether.internal.impl.SimpleLocalRepositoryManagerFactory;
import org.eclipse.aether.repository.LocalRepository;
import org.eclipse.aether.repository.LocalRepositoryManager;
import org.junit.jupiter.api.Test;
public class IpcSyncContextTest {
@Test
public void testContextSimple() throws Exception {
SyncContextFactory factory = new IpcSyncContextFactory();
DefaultRepositorySystemSession session = new DefaultRepositorySystemSession();
LocalRepository repository = new LocalRepository(new File("target/test-repo"));
LocalRepositoryManager localRepositoryManager = new SimpleLocalRepositoryManagerFactory()
.newInstance(session, repository);
session.setLocalRepositoryManager(localRepositoryManager);
Artifact artifact = new DefaultArtifact("myGroup", "myArtifact", "jar", "0.1");
try (SyncContext context = factory.newInstance(session, false)) {
context.acquire(Collections.singleton(artifact), null);
Thread.sleep(50);
}
}
@Test
public void testContext() throws Exception {
SyncContextFactory factory = new IpcSyncContextFactory();
DefaultRepositorySystemSession session = new DefaultRepositorySystemSession();
LocalRepository repository = new LocalRepository(new File("target/test-repo"));
LocalRepositoryManager localRepositoryManager = new SimpleLocalRepositoryManagerFactory()
.newInstance(session, repository);
session.setLocalRepositoryManager(localRepositoryManager);
Artifact artifact = new DefaultArtifact("myGroup", "myArtifact", "jar", "0.1");
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
try (SyncContext context = factory.newInstance(session, false)) {
System.out.println("Trying to lock from " + context);
context.acquire(Collections.singleton(artifact), null);
System.out.println("Lock acquired from " + context);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Unlock from " + context);
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
}
}