From e7ed5dbf6e62441ed56d5ccd98707aa931102b49 Mon Sep 17 00:00:00 2001 From: inrgihc Date: Sat, 13 Jan 2024 19:55:31 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E4=BB=A3=E7=A0=81=E9=94=99?= =?UTF-8?q?=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dbswitch/common/util/PomVersionUtils.java | 92 +++++++++++++++++++ .../data/domain/WrapCommonDataSource.java | 5 - .../dbswitch/data/domain/WriterTaskParam.java | 1 + .../data/handler/WriterTaskThread.java | 8 +- .../data/service/DefaultWriterRobot.java | 8 +- .../data/service/MigrationService.java | 17 ++-- .../dbswitch/data/util/DataSourceUtils.java | 24 ++--- .../dbswitch/data/util/MachineUtils.java | 46 ++++++++++ 8 files changed, 173 insertions(+), 28 deletions(-) create mode 100644 dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/PomVersionUtils.java create mode 100644 dbswitch-data/src/main/java/com/gitee/dbswitch/data/util/MachineUtils.java diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/PomVersionUtils.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/PomVersionUtils.java new file mode 100644 index 00000000..60180d2c --- /dev/null +++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/PomVersionUtils.java @@ -0,0 +1,92 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// +package com.gitee.dbswitch.common.util; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.util.URLUtil; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.file.Paths; +import java.security.CodeSource; +import java.security.ProtectionDomain; +import java.util.Enumeration; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; + +/** + * 获取项目的版本号工具类 + */ +@Slf4j +@UtilityClass +public final class PomVersionUtils { + + private static final String PREFIX = "version="; + + public static String getProjectVersion() { + Class clazz = PomVersionUtils.class; + String resourcePath = clazz.getResource("").toString(); + if (resourcePath.startsWith("file:")) { + return getProjectVersionFromFile(resourcePath); + } else if (resourcePath.startsWith("jar:")) { + return getProjectVersionFromJar(clazz); + } else { + return null; + } + } + + private static String getProjectVersionFromFile(String classPath) { + String basePath = classPath.substring(0, classPath.indexOf("/classes/")); + basePath = URLUtil.decode(FileUtil.normalize(basePath)); + File propertiesFile = Paths.get(basePath, "maven-archiver", "pom.properties").toFile(); + if (propertiesFile.exists()) { + return extractPomVersion(FileUtil.getInputStream(propertiesFile)); + } + return null; + } + + private static String getProjectVersionFromJar(Class clazz) { + ProtectionDomain protectionDomain = clazz.getProtectionDomain(); + CodeSource codeSource = protectionDomain.getCodeSource(); + try (JarFile jarFile = new JarFile(codeSource.getLocation().getPath())) { + Enumeration entries = jarFile.entries(); + while (entries.hasMoreElements()) { + JarEntry entry = entries.nextElement(); + if (entry.getName().startsWith("META-INF/maven/") && entry.getName().endsWith("/pom.properties")) { + return extractPomVersion(jarFile.getInputStream(entry)); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + private static String extractPomVersion(InputStream inputStream) { + String line; + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { + while ((line = bufferedReader.readLine()) != null) { + if (line.startsWith(PREFIX)) { + return line.substring(PREFIX.length()); + } + } + } catch (IOException e) { + } + return null; + } + + public static void main(String[] args) { + System.out.println(getProjectVersion()); + } +} diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/WrapCommonDataSource.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/WrapCommonDataSource.java index 61d0d615..d405d124 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/WrapCommonDataSource.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/WrapCommonDataSource.java @@ -98,10 +98,5 @@ public class WrapCommonDataSource implements CloseableDataSource { } public void close() { - try { - urlClassLoader.close(); - } catch (IOException e) { - log.warn(e.getMessage(), e); - } } } diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/WriterTaskParam.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/WriterTaskParam.java index 17222629..eca992b5 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/WriterTaskParam.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/WriterTaskParam.java @@ -30,4 +30,5 @@ public class WriterTaskParam implements TaskParam { private MemChannel memChannel; private RobotReader robotReader; + private boolean concurrentWrite; } diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/WriterTaskThread.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/WriterTaskThread.java index f46718a7..91f34903 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/WriterTaskThread.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/WriterTaskThread.java @@ -50,7 +50,13 @@ public class WriterTaskThread extends TaskProcessor { if (null != elem) { try { Long ret = Long.valueOf(elem.getArg2().size()); - elem.getHandler().apply(elem.getArg1(), elem.getArg2(), log); + if (this.taskParam.isConcurrentWrite()) { + elem.getHandler().apply(elem.getArg1(), elem.getArg2(), log); + } else { + synchronized (this.taskParam) { + elem.getHandler().apply(elem.getArg1(), elem.getArg2(), log); + } + } Long count = taskResult.getPerf().get(elem.getTableNameMapString()); Long total = ret + Optional.ofNullable(count).orElse(0L); taskResult.getPerf().put(elem.getTableNameMapString(), total); diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/DefaultWriterRobot.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/DefaultWriterRobot.java index 5d0c5d51..3d390ef5 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/DefaultWriterRobot.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/DefaultWriterRobot.java @@ -38,16 +38,19 @@ public class DefaultWriterRobot extends RobotWriter { private final MdcKeyValue mdcKeyValue; private final RobotReader robotReader; - private final int writeThreadNum; // 4 <= writeThreadNum <= 8 + private final int writeThreadNum; + private final boolean supportConcurrentWrite; private AsyncTaskExecutor threadExecutor; private List writeTaskThreads; private List futures; - public DefaultWriterRobot(MdcKeyValue mdcKeyValue, RobotReader robotReader, int writeThreadNum) { + public DefaultWriterRobot(MdcKeyValue mdcKeyValue, RobotReader robotReader, int writeThreadNum, + boolean concurrentWrite) { ExamineUtils.checkArgument(writeThreadNum > 0, "writeThreadNum(%s) must >0 ", writeThreadNum); this.mdcKeyValue = mdcKeyValue; this.robotReader = robotReader; this.writeThreadNum = writeThreadNum; + this.supportConcurrentWrite = concurrentWrite; } @Override @@ -64,6 +67,7 @@ public class DefaultWriterRobot extends RobotWriter { .builder() .robotReader(robotReader) .memChannel(robotReader.getChannel()) + .concurrentWrite(supportConcurrentWrite) .build(); for (int i = 0; i < writeThreadNum; ++i) { if (Objects.nonNull(mdcKeyValue)) { diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MigrationService.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MigrationService.java index bb409114..2860338a 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MigrationService.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MigrationService.java @@ -9,7 +9,6 @@ ///////////////////////////////////////////////////////////// package com.gitee.dbswitch.data.service; -import cn.hutool.system.SystemUtil; import com.gitee.dbswitch.common.entity.CloseableDataSource; import com.gitee.dbswitch.common.entity.LoggingRunnable; import com.gitee.dbswitch.common.entity.MdcKeyValue; @@ -20,6 +19,7 @@ import com.gitee.dbswitch.core.robot.RobotWriter; import com.gitee.dbswitch.data.config.DbswichPropertiesConfiguration; import com.gitee.dbswitch.data.entity.GlobalParamConfigProperties; import com.gitee.dbswitch.data.util.DataSourceUtils; +import com.gitee.dbswitch.data.util.MachineUtils; import java.sql.Connection; import java.sql.Statement; import java.util.ArrayList; @@ -106,22 +106,23 @@ public class MigrationService { * 执行主逻辑 */ private void doRun() { - StopWatch watch = new StopWatch(); - watch.start(); - log.info("dbswitch data service is started...."); - log.info(SystemUtil.getOsInfo().toString()); - log.info(SystemUtil.getJvmInfo().toString()); - log.info(SystemUtil.getRuntimeInfo().toString()); + log.info("Task run environment information:\n{}", MachineUtils.getPrintInformation()); //log.info("input configuration \n{}", JsonUtils.toJsonString(configuration)); GlobalParamConfigProperties globalParam = configuration.getConfig(); int maxQueueSize = globalParam.getChannelQueueSize(); + int writeThreadNum = globalParam.getWriteThreadNum(); + boolean concurrentWrite = DataSourceUtils.supportConcurrentWrite(configuration.getTarget()); + + StopWatch watch = new StopWatch(); + watch.start(); + AbstractBatchExchanger exchanger = new DefaultBatchExchanger(readExecutor, writeExecutor, maxQueueSize, perfStats); try (CloseableDataSource targetDataSource = DataSourceUtils.createTargetDataSource(configuration.getTarget())) { try (CloseableDataSource sourceDataSource = DataSourceUtils.createSourceDataSource(configuration.getSource())) { robotReader = new DefaultReaderRobot(mdcKeyValue, configuration, sourceDataSource, targetDataSource); - robotWriter = new DefaultWriterRobot(mdcKeyValue, robotReader, globalParam.getWriteThreadNum()); + robotWriter = new DefaultWriterRobot(mdcKeyValue, robotReader, writeThreadNum, concurrentWrite); boolean success = executeSqlScripts(targetDataSource, configuration.getTarget().getBeforeSqlScripts()); try { exchanger.exchange(robotReader, robotWriter); diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/util/DataSourceUtils.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/util/DataSourceUtils.java index a70ccba6..1d4c506d 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/util/DataSourceUtils.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/util/DataSourceUtils.java @@ -45,8 +45,7 @@ public final class DataSourceUtils { public static final int MAX_THREAD_COUNT = 10; public static final int MAX_TIMEOUT_MS = 60000; - private static Object mutexForMap = new Object(); - private static Map classLoaderMap = new ConcurrentHashMap<>(); + private static final Map classLoaderMap = new ConcurrentHashMap<>(); /** * 创建于指定数据库连接描述符的连接池 @@ -115,13 +114,8 @@ public final class DataSourceUtils { } else if (!ds.getJdbcUrl().contains("jdbc:jest://")) { ds.setConnectionTestQuery("SELECT 1"); } - if (properties.getDriverClassName().contains("sqlite")) { - ds.setMaximumPoolSize(1); - ds.setMinimumIdle(1); - } else { - ds.setMaximumPoolSize(MAX_THREAD_COUNT); - ds.setMinimumIdle(MAX_THREAD_COUNT); - } + ds.setMaximumPoolSize(MAX_THREAD_COUNT); + ds.setMinimumIdle(MAX_THREAD_COUNT); ds.setMaxLifetime(properties.getMaxLifeTime()); ds.setConnectionTimeout(properties.getConnectionTimeout()); ds.setIdleTimeout(MAX_TIMEOUT_MS); @@ -173,6 +167,11 @@ public final class DataSourceUtils { return new WrapCommonDataSource(dataSource, urlClassLoader); } + public static boolean supportConcurrentWrite(TargetDataSourceProperties properties) { + return !properties.getDriverClassName().contains("sqlite") + && !properties.getUrl().contains("jdbc:sqlite:"); + } + private static InvisibleDataSource createInvisibleDataSource( ClassLoader cl, String jdbcUrl, @@ -211,13 +210,14 @@ public final class DataSourceUtils { } } - private static URLClassLoader getOrCreateClassLoader(String path, ClassLoader parent) { + private static URLClassLoader getOrCreateClassLoader( + String path, ClassLoader parent) { URLClassLoader urlClassLoader = classLoaderMap.get(path); if (null == urlClassLoader) { - synchronized (mutexForMap) { + synchronized (DataSourceUtils.class) { urlClassLoader = classLoaderMap.get(path); if (null == urlClassLoader) { - log.info("Create Jar ClassLoader from path: {}", path); + log.info("Create jar classLoader from path: {}", path); urlClassLoader = new JarClassLoader(path, parent); classLoaderMap.put(path, urlClassLoader); } diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/util/MachineUtils.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/util/MachineUtils.java new file mode 100644 index 00000000..1b2da9bc --- /dev/null +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/util/MachineUtils.java @@ -0,0 +1,46 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// +package com.gitee.dbswitch.data.util; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.system.JvmInfo; +import cn.hutool.system.OsInfo; +import cn.hutool.system.RuntimeInfo; +import cn.hutool.system.SystemUtil; +import com.gitee.dbswitch.common.util.PomVersionUtils; +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@UtilityClass +public final class MachineUtils { + + public static String getPrintInformation() { + OsInfo osInfo = SystemUtil.getOsInfo(); + JvmInfo jvmInfo = SystemUtil.getJvmInfo(); + RuntimeInfo runtimeInfo = SystemUtil.getRuntimeInfo(); + + StringBuilder sb = new StringBuilder(); + sb.append("\tOS Arch:\t" + osInfo.getArch()); + sb.append("\n\tOS Name:\t" + osInfo.getName()); + sb.append("\n\tOS Version:\t" + osInfo.getVersion()); + sb.append("\n\tJavaVM Name:\t" + jvmInfo.getName()); + sb.append("\n\tJavaVM Version:\t" + jvmInfo.getVersion()); + sb.append("\n\tJavaVM Vendor:\t" + jvmInfo.getVendor()); + sb.append("\n\tJavaVM Info:\t" + jvmInfo.getInfo()); + sb.append("\n\tSystem Max Memory: \t" + FileUtil.readableFileSize(runtimeInfo.getMaxMemory())); + sb.append("\n\tSystem Total Memory: \t" + FileUtil.readableFileSize(runtimeInfo.getTotalMemory())); + sb.append("\n\tSystem Free Memory: \t" + FileUtil.readableFileSize(runtimeInfo.getFreeMemory())); + sb.append("\n\tSystem Usable Memory: \t" + FileUtil.readableFileSize(runtimeInfo.getUsableMemory())); + sb.append("\n\tRelease dbswitch Version: \t" + PomVersionUtils.getProjectVersion()); + + return sb.toString(); + } +}