Provide a local (semaphore based) sync context and use it as the default (#480)

* Provide a local (semaphore based) sync context and use it as the default
* Use a ReentrantLock
* Update daemon/src/main/java/org/mvndaemon/mvnd/sync/MvndSyncContextFactory.java
Co-authored-by: Peter Palaga <ppalaga@redhat.com>
* Improve lock acquisition
Co-authored-by: Peter Palaga <ppalaga@redhat.com>
This commit is contained in:
Guillaume Nodet
2021-10-20 10:23:13 +02:00
committed by GitHub
parent 0a77ed06ba
commit 69c975fcd0
2 changed files with 74 additions and 1 deletions

View File

@@ -257,7 +257,7 @@ public enum Environment {
/**
* The SyncContextFactory to use (can be either 'noop' or 'ipc' for a server-wide factory).
*/
MVND_SYNC_CONTEXT_FACTORY("mvnd.syncContextFactory", null, "noop", OptionType.BOOLEAN, Flags.OPTIONAL);
MVND_SYNC_CONTEXT_FACTORY("mvnd.syncContextFactory", null, "local", OptionType.BOOLEAN, Flags.OPTIONAL);
static Properties properties;

View File

@@ -15,10 +15,17 @@
*/
package org.mvndaemon.mvnd.sync;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import javax.inject.Named;
import javax.inject.Singleton;
import org.eclipse.aether.RepositorySystemSession;
@@ -36,6 +43,7 @@ public class MvndSyncContextFactory implements SyncContextFactory {
public static final String FACTORY_NOOP = "noop";
public static final String FACTORY_IPC = "ipc";
public static final String FACTORY_LOCAL = "local";
public static final String IPC_SYNC_CONTEXT_FACTORY = "org.mvndaemon.mvnd.sync.IpcSyncContextFactory";
@@ -45,6 +53,7 @@ public class MvndSyncContextFactory implements SyncContextFactory {
public MvndSyncContextFactory() {
try {
Map<String, SyncContextFactory> map = new HashMap<>();
map.put(FACTORY_LOCAL, new LocalSyncContextFactory());
map.put(FACTORY_NOOP, new NoopSyncContextFactory());
Class<? extends SyncContextFactory> factoryClass = (Class<? extends SyncContextFactory>) getClass().getClassLoader()
.loadClass(IPC_SYNC_CONTEXT_FACTORY);
@@ -80,4 +89,68 @@ public class MvndSyncContextFactory implements SyncContextFactory {
};
}
}
private static class LocalSyncContextFactory implements SyncContextFactory {
final Map<String, Lock> locks = new ConcurrentHashMap<>();
@Override
public SyncContext newInstance(RepositorySystemSession session, boolean shared) {
return new LocalSyncContext();
}
private class LocalSyncContext implements SyncContext {
private final Deque<String> locked = new ArrayDeque<>();
@Override
public void acquire(Collection<? extends Artifact> artifacts, Collection<? extends Metadata> metadatas) {
stream(artifacts).map(this::getKey).sorted().forEach(this::acquire);
stream(metadatas).map(this::getKey).sorted().forEach(this::acquire);
}
private void acquire(String key) {
try {
getLock(key).lock();
locked.add(key);
} catch (Exception e) {
close();
throw new IllegalStateException("Could not acquire lock for '" + key + "'", e);
}
}
@Override
public void close() {
String key;
while ((key = locked.poll()) != null) {
getLock(key).unlock();
}
}
private Lock getLock(String key) {
return locks.computeIfAbsent(key, k -> new ReentrantLock());
}
private <T> Stream<T> stream(Collection<T> col) {
return col != null ? col.stream() : Stream.empty();
}
private String getKey(Artifact a) {
return "artifact:" + a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getBaseVersion();
}
private String getKey(Metadata m) {
StringBuilder key = new StringBuilder("metadata:");
if (!m.getGroupId().isEmpty()) {
key.append(m.getGroupId());
if (!m.getArtifactId().isEmpty()) {
key.append(':').append(m.getArtifactId());
if (!m.getVersion().isEmpty()) {
key.append(':').append(m.getVersion());
}
}
}
return key.toString();
}
}
}
}