Use unix sockets and native compilation for the ipc sync context

This commit is contained in:
Guillaume Nodet
2021-07-21 17:23:21 +02:00
parent 34b62a8cc7
commit 4b75fec3e1
12 changed files with 667 additions and 94 deletions

View File

@@ -89,7 +89,76 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<id>default</id>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>jdk16</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<release>16</release>
<multiReleaseOutput>true</multiReleaseOutput>
<compileSourceRoots>
<root>${project.basedir}/src/main/java16</root>
</compileSourceRoots>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<archive>
<manifestFile>src/main/resources/META-INF/MANIFEST.MF</manifestFile>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>native</id>
<build>
<plugins>
<plugin>
<groupId>org.graalvm.nativeimage</groupId>
<artifactId>native-image-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>native-image</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
<configuration>
<skip>false</skip>
<mainClass>org.mvndaemon.mvnd.sync.IpcServer</mainClass>
<imageName>mvnd-sync</imageName>
<buildArgs>
--no-server
--no-fallback
--allow-incomplete-classpath
-H:-ParseRuntimeOptions
-ea
</buildArgs>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@@ -0,0 +1,53 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
/**
* Trivial ByteChannel wrapper to avoid the read/write synchronization which
* happens when the channel implements SelectableChannel.
*/
public class ByteChannelWrapper implements ByteChannel {
private final ByteChannel socket;
public ByteChannelWrapper(ByteChannel socket) {
this.socket = socket;
}
@Override
public int read(ByteBuffer dst) throws IOException {
return socket.read(dst);
}
@Override
public int write(ByteBuffer src) throws IOException {
return socket.write(src);
}
@Override
public boolean isOpen() {
return socket.isOpen();
}
@Override
public void close() throws IOException {
socket.close();
}
}

View File

@@ -22,11 +22,13 @@ import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.nio.channels.ByteChannel;
import java.nio.channels.Channels;
import java.nio.channels.FileLock;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -36,6 +38,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,6 +55,7 @@ import static org.mvndaemon.mvnd.sync.IpcMessages.REQUEST_CONTEXT;
import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_ACQUIRE;
import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_CLOSE;
import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_CONTEXT;
import static org.mvndaemon.mvnd.sync.IpcServer.FAMILY_PROP;
/**
* Client side implementation.
@@ -60,32 +64,39 @@ import static org.mvndaemon.mvnd.sync.IpcMessages.RESPONSE_CONTEXT;
public class IpcClient {
Path repository;
Socket socket;
Path syncServerPath;
SocketChannel socket;
DataOutputStream output;
DataInputStream input;
Thread receiver;
AtomicInteger requestId = new AtomicInteger();
Map<Integer, CompletableFuture<List<String>>> responses = new ConcurrentHashMap<>();
IpcClient(Path repository) {
IpcClient(Path repository, Path syncServerPath) {
this.repository = repository;
this.syncServerPath = syncServerPath;
}
synchronized Socket ensureInitialized() throws IOException {
synchronized void ensureInitialized() throws IOException {
if (socket == null) {
socket = createClient();
input = new DataInputStream(socket.getInputStream());
output = new DataOutputStream(socket.getOutputStream());
ByteChannel wrapper = SocketHelper.wrapChannel(socket);
input = new DataInputStream(Channels.newInputStream(wrapper));
output = new DataOutputStream(Channels.newOutputStream(wrapper));
receiver = new Thread(this::receive);
receiver.setDaemon(true);
receiver.start();
}
return socket;
}
Socket createClient() throws IOException {
InetAddress loopback = InetAddress.getLoopbackAddress();
Path lockFile = repository.resolve(".maven-resolver-ipc-lock").toAbsolutePath().normalize();
SocketChannel createClient() throws IOException {
String familyProp = System.getProperty(FAMILY_PROP);
StandardProtocolFamily family = familyProp != null
? StandardProtocolFamily.valueOf(familyProp)
: JavaVersion.getJavaSpec() >= 16.0f ? StandardProtocolFamily.UNIX : StandardProtocolFamily.INET;
Path lockFile = repository.resolve(".maven-resolver-ipc-lock-" + family.name().toLowerCase())
.toAbsolutePath().normalize();
if (!Files.isRegularFile(lockFile)) {
if (!Files.isDirectory(lockFile.getParent())) {
Files.createDirectories(lockFile.getParent());
@@ -96,56 +107,78 @@ public class IpcClient {
try (FileLock lock = raf.getChannel().lock()) {
String line = raf.readLine();
if (line != null) {
int port = Integer.parseInt(line);
if (port > 0) {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(loopback, port));
return socket;
} catch (IOException e) {
// ignore
}
try {
SocketAddress address = SocketHelper.socketAddressFromString(line);
return SocketChannel.open(address);
} catch (IOException e) {
// ignore
}
}
ServerSocket ss = new ServerSocket();
ss.bind(new InetSocketAddress(loopback, 0));
int tmpport = ss.getLocalPort();
int rand = new Random().nextInt();
String noFork = System.getProperty(IpcServer.NO_FORK_PROP);
ServerSocketChannel ss = SocketHelper.openServerSocket(family);
String tmpaddr = SocketHelper.socketAddressToString(ss.getLocalAddress());
String rand = Long.toHexString(new Random().nextLong());
String noNative = System.getProperty(IpcServer.NO_NATIVE_PROP);
Closeable close;
if (Boolean.parseBoolean(noFork)) {
IpcServer server = IpcServer.runServer(tmpport, rand);
close = server::close;
if (Boolean.parseBoolean(noNative)) {
String noFork = System.getProperty(IpcServer.NO_FORK_PROP);
if (Boolean.parseBoolean(noFork)) {
IpcServer server = IpcServer.runServer(family, tmpaddr, rand);
close = server::close;
} else {
List<String> args = new ArrayList<>();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {
javaHome = System.getProperty("java.home");
}
boolean win = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
String javaCmd = win ? "bin\\java.exe" : "bin/java";
String java = Paths.get(javaHome).resolve(javaCmd).toAbsolutePath().toString();
args.add(java);
String classpath;
String className = getClass().getName().replace('.', '/') + ".class";
String url = getClass().getClassLoader().getResource(className).toString();
if (url.startsWith("jar:")) {
classpath = url.substring("jar:".length(), url.indexOf("!/"));
} else if (url.startsWith("file:")) {
classpath = url.substring("file:".length(), url.indexOf(className));
} else {
throw new IllegalStateException();
}
args.add("-cp");
args.add(classpath);
String timeout = System.getProperty(IpcServer.IDLE_TIMEOUT_PROP);
if (timeout != null) {
args.add("-D" + IpcServer.IDLE_TIMEOUT_PROP + "=" + timeout);
}
args.add(IpcServer.class.getName());
args.add(family.name());
args.add(tmpaddr);
args.add(rand);
ProcessBuilder processBuilder = new ProcessBuilder();
ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(new File(win ? "NUL" : "/dev/null"));
discard = ProcessBuilder.Redirect.INHERIT;
Process process = processBuilder
.directory(lockFile.getParent().toFile())
.command(args)
.redirectOutput(discard)
.redirectError(discard)
.start();
close = process::destroyForcibly;
}
} else {
List<String> args = new ArrayList<>();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {
javaHome = System.getProperty("java.home");
}
boolean win = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
String javaCmd = win ? "bin\\java.exe" : "bin/java";
String java = Paths.get(javaHome).resolve(javaCmd).toAbsolutePath().toString();
args.add(java);
String classpath;
String className = getClass().getName().replace('.', '/') + ".class";
String url = getClass().getClassLoader().getResource(className).toString();
if (url.startsWith("jar:")) {
classpath = url.substring("jar:".length(), url.indexOf("!/"));
} else if (url.startsWith("file:")) {
classpath = url.substring("file:".length(), url.indexOf(className));
} else {
throw new IllegalStateException();
}
args.add("-cp");
args.add(classpath);
String syncCmd = win ? "mvnd-sync.exe" : "mvnd-sync";
args.add(syncServerPath.resolve(syncCmd).toString());
String timeout = System.getProperty(IpcServer.IDLE_TIMEOUT_PROP);
if (timeout != null) {
args.add("-D" + IpcServer.IDLE_TIMEOUT_PROP + "=" + timeout);
}
args.add(IpcServer.class.getName());
args.add(Integer.toString(tmpport));
args.add(Integer.toString(rand));
args.add(family.name());
args.add(tmpaddr);
args.add(rand);
ProcessBuilder processBuilder = new ProcessBuilder();
ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(new File(win ? "NUL" : "/dev/null"));
discard = ProcessBuilder.Redirect.INHERIT;
@@ -159,14 +192,14 @@ public class IpcClient {
}
ExecutorService es = Executors.newSingleThreadExecutor();
Future<int[]> future = es.submit(() -> {
Socket s = ss.accept();
DataInputStream dis = new DataInputStream(s.getInputStream());
int rand2 = dis.readInt();
int port2 = dis.readInt();
return new int[] { rand2, port2 };
Future<String[]> future = es.submit(() -> {
SocketChannel s = ss.accept();
DataInputStream dis = new DataInputStream(Channels.newInputStream(s));
String rand2 = dis.readUTF();
String addr2 = dis.readUTF();
return new String[] { rand2, addr2 };
});
int[] res;
String[] res;
try {
res = future.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
@@ -176,17 +209,16 @@ public class IpcClient {
es.shutdownNow();
ss.close();
}
if (rand != res[0]) {
if (!Objects.equals(rand, res[0])) {
close.close();
throw new IllegalStateException("IpcServer did not respond with the correct random");
}
int port = res[1];
Socket socket = new Socket();
socket.connect(new InetSocketAddress(loopback, port));
SocketAddress addr = SocketHelper.socketAddressFromString(res[1]);
SocketChannel socket = SocketChannel.open(addr);
raf.seek(0);
raf.writeBytes(port + "\n");
raf.writeBytes(res[1] + "\n");
return socket;
} catch (Exception e) {
throw new RuntimeException("Unable to create and connect to lock server", e);
@@ -313,7 +345,16 @@ public class IpcClient {
public String toString() {
return "IpcClient{"
+ "repository=" + repository + ','
+ "port=" + (socket != null ? socket.getPort() : 0)
+ "address=" + (socket != null ? getAddress() : 0)
+ '}';
}
private String getAddress() {
try {
return SocketHelper.socketAddressToString(socket.getLocalAddress());
} catch (IOException e) {
return "[not bound]";
}
}
}

View File

@@ -18,10 +18,12 @@ package org.mvndaemon.mvnd.sync;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.nio.channels.ByteChannel;
import java.nio.channels.Channels;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -51,10 +53,14 @@ public class IpcServer {
public static final String IDLE_TIMEOUT_PROP = "ipcsync.idle.timeout";
public static final String FAMILY_PROP = "ipcsync.family";
public static final String NO_NATIVE_PROP = "ipcsync.nonative";
static final long IDLE_TIMEOUT = TimeUnit.SECONDS.toNanos(60);
private final ServerSocket serverSocket;
private final Map<Socket, Thread> clients = new HashMap<>();
private final ServerSocketChannel serverSocket;
private final Map<SocketChannel, Thread> clients = new HashMap<>();
private final AtomicInteger counter = new AtomicInteger();
private final Map<String, Lock> locks = new ConcurrentHashMap<>();
private final Map<String, Context> contexts = new ConcurrentHashMap<>();
@@ -62,9 +68,8 @@ public class IpcServer {
private volatile long lastUsed;
private volatile boolean closing;
public IpcServer() throws IOException {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
public IpcServer(StandardProtocolFamily family) throws IOException {
serverSocket = SocketHelper.openServerSocket(family);
long timeout = IDLE_TIMEOUT;
String str = System.getProperty(IDLE_TIMEOUT_PROP);
if (str != null) {
@@ -101,22 +106,23 @@ public class IpcServer {
error("Unable to ignore INT and TSTP signals", t);
}
int tmpPort = Integer.parseInt(args[0]);
int rand = Integer.parseInt(args[1]);
String family = args[0];
String tmpAddress = args[1];
String rand = args[2];
runServer(tmpPort, rand);
runServer(StandardProtocolFamily.valueOf(family), tmpAddress, rand);
}
static IpcServer runServer(int tmpPort, int rand) throws IOException {
IpcServer server = new IpcServer();
static IpcServer runServer(StandardProtocolFamily family, String tmpAddress, String rand) throws IOException {
IpcServer server = new IpcServer(family);
run(server::run);
int port = server.getPort();
try (Socket s = new Socket()) {
s.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), tmpPort));
try (DataOutputStream dos = new DataOutputStream(s.getOutputStream())) {
dos.writeInt(rand);
dos.writeInt(port);
String address = SocketHelper.socketAddressToString(server.getLocalAddress());
SocketAddress socketAddress = SocketHelper.socketAddressFromString(tmpAddress);
SocketHelper.checkFamily(family, socketAddress);
try (SocketChannel socket = SocketChannel.open(socketAddress)) {
try (DataOutputStream dos = new DataOutputStream(Channels.newOutputStream(socket))) {
dos.writeUTF(rand);
dos.writeUTF(address);
dos.flush();
}
}
@@ -142,17 +148,17 @@ public class IpcServer {
thread.start();
}
public int getPort() {
return serverSocket.getLocalPort();
public SocketAddress getLocalAddress() throws IOException {
return serverSocket.getLocalAddress();
}
public void run() {
try {
info("IpcServer started on port %d", getPort());
info("IpcServer started at %s", getLocalAddress().toString());
use();
run(this::expirationCheck);
while (!closing) {
Socket socket = this.serverSocket.accept();
SocketChannel socket = this.serverSocket.accept();
run(() -> client(socket));
}
} catch (Throwable t) {
@@ -162,7 +168,7 @@ public class IpcServer {
}
}
private void client(Socket socket) {
private void client(SocketChannel socket) {
int c;
synchronized (clients) {
clients.put(socket, Thread.currentThread());
@@ -172,8 +178,9 @@ public class IpcServer {
use();
Map<String, Context> clientContexts = new ConcurrentHashMap<>();
try {
DataInputStream input = new DataInputStream(socket.getInputStream());
DataOutputStream output = new DataOutputStream(socket.getOutputStream());
ByteChannel wrapper = SocketHelper.wrapChannel(socket);
DataInputStream input = new DataInputStream(Channels.newInputStream(wrapper));
DataOutputStream output = new DataOutputStream(Channels.newOutputStream(wrapper));
while (!closing) {
int requestId = input.readInt();
int sz = input.readInt();

View File

@@ -16,6 +16,7 @@
package org.mvndaemon.mvnd.sync;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PreDestroy;
@@ -39,7 +40,9 @@ public class IpcSyncContextFactory implements SyncContextFactory {
@Override
public SyncContext newInstance(RepositorySystemSession session, boolean shared) {
Path repository = session.getLocalRepository().getBasedir().toPath();
IpcClient client = clients.computeIfAbsent(repository, IpcClient::new);
String mvndHome = System.getProperty("mvnd.home");
Path syncPath = mvndHome != null ? Paths.get(mvndHome).resolve("bin") : null;
IpcClient client = clients.computeIfAbsent(repository, r -> new IpcClient(r, syncPath));
return new IpcSyncContext(client, shared);
}

View File

@@ -0,0 +1,28 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
public class JavaVersion {
private static float javaSpec = 0.0f;
public static float getJavaSpec() {
if (javaSpec <= 0.0f) {
javaSpec = Float.parseFloat(System.getProperty("java.specification.version"));
}
return javaSpec;
}
}

View File

@@ -0,0 +1,148 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.net.UnknownHostException;
import java.nio.channels.ByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.util.Objects;
import static java.net.StandardProtocolFamily.INET;
import static java.net.StandardProtocolFamily.INET6;
import static java.util.Objects.requireNonNull;
public class SocketHelper {
public static void checkFamily(StandardProtocolFamily family, SocketAddress address) {
Objects.requireNonNull(family);
Objects.requireNonNull(address);
if (address instanceof InetSocketAddress) {
InetSocketAddress isa = (InetSocketAddress) address;
InetAddress ia = isa.getAddress();
if (ia != null
&& !(ia instanceof Inet4Address && family == INET)
&& !(ia instanceof Inet6Address && family == INET6)) {
throw new IllegalArgumentException(
"Socket address '" + address + "' does not match required family '" + family + "'");
}
// } else if (address instanceof UnixDomainSocketAddress) {
// if (family != StandardProtocolFamily.UNIX) {
// throw new IllegalArgumentException("Socket address '" + address + "' does not match required family '" + family + "'");
// }
} else {
throw new IllegalArgumentException(
"Socket address '" + address + "' does not match required family '" + family + "'");
}
}
public static SocketAddress socketAddressFromString(String str) {
if (str.startsWith("inet:")) {
String s = str.substring("inet:".length());
int ic = s.lastIndexOf(':');
String ia = s.substring(0, ic);
int is = ia.indexOf('/');
String h = ia.substring(0, is);
String a = ia.substring(is + 1);
String p = s.substring(ic + 1);
InetAddress addr;
if ("<unresolved>".equals(a)) {
return InetSocketAddress.createUnresolved(h, Integer.parseInt(p));
} else {
if (a.indexOf('.') > 0) {
String[] as = a.split("\\.");
if (as.length != 4) {
throw new IllegalArgumentException("Unsupported address: " + str);
}
byte[] ab = new byte[4];
for (int i = 0; i < 4; i++) {
ab[i] = (byte) Integer.parseInt(as[i]);
}
try {
addr = InetAddress.getByAddress(h.isEmpty() ? null : h, ab);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("Unsupported address: " + str, e);
}
} else {
throw new IllegalArgumentException("Unsupported address: " + str);
}
return new InetSocketAddress(addr, Integer.parseInt(p));
}
// } else if (str.startsWith("unix:")) {
// return UnixDomainSocketAddress.of(str.substring("unix:".length()));
} else {
throw new IllegalArgumentException("Unsupported address: " + str);
}
}
public static String socketAddressToString(SocketAddress address) {
if (address instanceof InetSocketAddress) {
InetSocketAddress isa = (InetSocketAddress) address;
String host = isa.getHostString();
InetAddress addr = isa.getAddress();
int port = isa.getPort();
String formatted;
if (addr == null) {
formatted = host + "/<unresolved>";
} else {
formatted = addr.toString();
if (addr instanceof Inet6Address) {
int i = formatted.lastIndexOf("/");
formatted = formatted.substring(0, i + 1) + "[" + formatted.substring(i + 1) + "]";
}
}
return "inet:" + formatted + ":" + port;
// } else if (address instanceof UnixDomainSocketAddress) {
// return "unix:" + address;
} else {
throw new IllegalArgumentException("Unsupported address: " + address);
}
}
public static ServerSocketChannel openServerSocket(StandardProtocolFamily family) throws IOException {
return ServerSocketChannel.open(/*family*/).bind(getLoopbackAddress(family), 0);
}
private static SocketAddress getLoopbackAddress(StandardProtocolFamily family) {
try {
Objects.requireNonNull(family);
switch (family) {
case INET:
return new InetSocketAddress(Inet4Address.getByAddress(new byte[] { 127, 0, 0, 1 }), 0);
case INET6:
return new InetSocketAddress(
Inet6Address.getByAddress(new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }), 0);
// case UNIX:
// return null;
default:
throw new IllegalArgumentException("Unsupported family: " + family);
}
} catch (UnknownHostException e) {
throw new IllegalArgumentException("Unsupported family: " + family, e);
}
}
public static ByteChannel wrapChannel(ByteChannel channel) {
return new ByteChannelWrapper(channel);
}
}

View File

@@ -0,0 +1,147 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.net.UnixDomainSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.ByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.util.Objects;
import static java.net.StandardProtocolFamily.INET;
import static java.net.StandardProtocolFamily.INET6;
import static java.net.StandardProtocolFamily.UNIX;
public class SocketHelper {
public static void checkFamily(StandardProtocolFamily family, SocketAddress address) {
Objects.requireNonNull(family);
Objects.requireNonNull(address);
if (address instanceof InetSocketAddress) {
InetSocketAddress isa = (InetSocketAddress) address;
InetAddress ia = isa.getAddress();
if (ia != null
&& !(ia instanceof Inet4Address && family == INET)
&& !(ia instanceof Inet6Address && family == INET6)) {
throw new IllegalArgumentException("Socket address '" + address + "' does not match required family '" + family + "'");
}
} else if (address instanceof UnixDomainSocketAddress) {
if (family != UNIX) {
throw new IllegalArgumentException("Socket address '" + address + "' does not match required family '" + family + "'");
}
} else {
throw new IllegalArgumentException("Socket address '" + address + "' does not match required family '" + family + "'");
}
}
public static SocketAddress socketAddressFromString(String str) {
if (str.startsWith("inet:")) {
String s = str.substring("inet:".length());
int ic = s.lastIndexOf(':');
String ia = s.substring(0, ic);
int is = ia.indexOf('/');
String h = ia.substring(0, is);
String a = ia.substring(is + 1);
String p = s.substring(ic + 1);
InetAddress addr;
if ("<unresolved>".equals(a)) {
return InetSocketAddress.createUnresolved(h, Integer.parseInt(p));
} else {
if (a.indexOf('.') > 0) {
String[] as = a.split("\\.");
if (as.length != 4) {
throw new IllegalArgumentException("Unsupported address: " + str);
}
byte[] ab = new byte[4];
for (int i = 0; i < 4; i++) {
ab[i] = (byte) Integer.parseInt(as[i]);
}
try {
addr = InetAddress.getByAddress(h.isEmpty() ? null : h, ab);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("Unsupported address: " + str, e);
}
} else {
throw new IllegalArgumentException("Unsupported address: " + str);
}
return new InetSocketAddress(addr, Integer.parseInt(p));
}
} else if (str.startsWith("unix:")) {
return UnixDomainSocketAddress.of(str.substring("unix:".length()));
} else {
throw new IllegalArgumentException("Unsupported address: " + str);
}
}
public static String socketAddressToString(SocketAddress address) {
if (address instanceof InetSocketAddress) {
InetSocketAddress isa = (InetSocketAddress) address;
String host = isa.getHostString();
InetAddress addr = isa.getAddress();
int port = isa.getPort();
String formatted;
if (addr == null) {
formatted = host + "/<unresolved>";
} else {
formatted = addr.toString();
if (addr instanceof Inet6Address) {
int i = formatted.lastIndexOf("/");
formatted = formatted.substring(0, i + 1) + "[" + formatted.substring(i + 1) + "]";
}
}
return "inet:" + formatted + ":" + port;
} else if (address instanceof UnixDomainSocketAddress) {
return "unix:" + address;
} else {
throw new IllegalArgumentException("Unsupported address: " + address);
}
}
public static ServerSocketChannel openServerSocket(StandardProtocolFamily family) throws IOException {
return ServerSocketChannel.open(family).bind(getLoopbackAddress(family), 0);
}
private static SocketAddress getLoopbackAddress(StandardProtocolFamily family) {
try {
Objects.requireNonNull(family);
switch (family) {
case INET:
return new InetSocketAddress(Inet4Address.getByAddress(new byte[] { 127, 0, 0, 1 }), 0);
case INET6:
return new InetSocketAddress(Inet6Address.getByAddress(new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }), 0);
case UNIX:
return null;
default:
throw new IllegalArgumentException("Unsupported family: " + family);
}
} catch (UnknownHostException e) {
throw new IllegalArgumentException("Unsupported family: " + family, e);
}
}
public static ByteChannel wrapChannel(ByteChannel channel) {
return new ByteChannelWrapper(channel);
}
}

View File

@@ -0,0 +1,2 @@
Manifest-Version: 1.0
Multi-Release: true

View File

@@ -38,6 +38,8 @@ public class IpcSyncContextTest {
@BeforeAll
static void setup() {
System.setProperty(IpcServer.IDLE_TIMEOUT_PROP, "5");
System.setProperty(IpcServer.FAMILY_PROP, "INET");
System.setProperty(IpcServer.NO_NATIVE_PROP, "true");
}
@AfterAll

View File

@@ -0,0 +1,69 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mvndaemon.mvnd.sync;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardProtocolFamily;
import java.net.UnknownHostException;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class SocketHelperTest {
@Test
void testIpv4NullHost() throws UnknownHostException {
InetSocketAddress i4a = new InetSocketAddress(
InetAddress.getByAddress(null, new byte[] { (byte) 192, (byte) 168, 0, 1 }), 8080);
assertEquals("inet:/192.168.0.1:8080", SocketHelper.socketAddressToString(i4a));
assertEquals(i4a, SocketHelper.socketAddressFromString("inet:/192.168.0.1:8080"));
}
@Test
void testIpv4DummyHost() throws UnknownHostException {
InetSocketAddress i4a = new InetSocketAddress(
InetAddress.getByAddress("dummy.org", new byte[] { (byte) 192, (byte) 168, 0, 1 }), 8080);
assertEquals("inet:dummy.org/192.168.0.1:8080", SocketHelper.socketAddressToString(i4a));
assertEquals(i4a, SocketHelper.socketAddressFromString("inet:dummy.org/192.168.0.1:8080"));
}
@Test
void testIpv4Loopback() throws UnknownHostException {
InetSocketAddress i4a = new InetSocketAddress(8080);
assertEquals("inet:0.0.0.0/0.0.0.0:8080", SocketHelper.socketAddressToString(i4a));
assertEquals(i4a, SocketHelper.socketAddressFromString("inet:0.0.0.0/0.0.0.0:8080"));
}
@Test
void testIpv4Unresolved() throws UnknownHostException {
InetSocketAddress i4a = InetSocketAddress.createUnresolved("google.com", 8080);
assertEquals("inet:google.com/<unresolved>:8080", SocketHelper.socketAddressToString(i4a));
assertEquals(i4a, SocketHelper.socketAddressFromString("inet:google.com/<unresolved>:8080"));
}
@Test
void testCheckInetAddress() {
String family = "INET";
String address = "inet:/127.0.0.1:8192";
SocketHelper.checkFamily(StandardProtocolFamily.valueOf(family),
SocketHelper.socketAddressFromString(address));
}
}