修正代码错误

This commit is contained in:
inrgihc
2024-01-13 19:55:31 +08:00
parent 74d0f25d29
commit e7ed5dbf6e
8 changed files with 173 additions and 28 deletions

View File

@@ -0,0 +1,92 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.common.util;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.URLUtil;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.security.CodeSource;
import java.security.ProtectionDomain;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
/**
* 获取项目的版本号工具类
*/
@Slf4j
@UtilityClass
public final class PomVersionUtils {
private static final String PREFIX = "version=";
public static String getProjectVersion() {
Class<?> clazz = PomVersionUtils.class;
String resourcePath = clazz.getResource("").toString();
if (resourcePath.startsWith("file:")) {
return getProjectVersionFromFile(resourcePath);
} else if (resourcePath.startsWith("jar:")) {
return getProjectVersionFromJar(clazz);
} else {
return null;
}
}
private static String getProjectVersionFromFile(String classPath) {
String basePath = classPath.substring(0, classPath.indexOf("/classes/"));
basePath = URLUtil.decode(FileUtil.normalize(basePath));
File propertiesFile = Paths.get(basePath, "maven-archiver", "pom.properties").toFile();
if (propertiesFile.exists()) {
return extractPomVersion(FileUtil.getInputStream(propertiesFile));
}
return null;
}
private static String getProjectVersionFromJar(Class<?> clazz) {
ProtectionDomain protectionDomain = clazz.getProtectionDomain();
CodeSource codeSource = protectionDomain.getCodeSource();
try (JarFile jarFile = new JarFile(codeSource.getLocation().getPath())) {
Enumeration<JarEntry> entries = jarFile.entries();
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
if (entry.getName().startsWith("META-INF/maven/") && entry.getName().endsWith("/pom.properties")) {
return extractPomVersion(jarFile.getInputStream(entry));
}
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
private static String extractPomVersion(InputStream inputStream) {
String line;
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
while ((line = bufferedReader.readLine()) != null) {
if (line.startsWith(PREFIX)) {
return line.substring(PREFIX.length());
}
}
} catch (IOException e) {
}
return null;
}
public static void main(String[] args) {
System.out.println(getProjectVersion());
}
}

View File

@@ -98,10 +98,5 @@ public class WrapCommonDataSource implements CloseableDataSource {
}
public void close() {
try {
urlClassLoader.close();
} catch (IOException e) {
log.warn(e.getMessage(), e);
}
}
}

View File

@@ -30,4 +30,5 @@ public class WriterTaskParam implements TaskParam {
private MemChannel memChannel;
private RobotReader robotReader;
private boolean concurrentWrite;
}

View File

@@ -50,7 +50,13 @@ public class WriterTaskThread extends TaskProcessor<WriterTaskResult> {
if (null != elem) {
try {
Long ret = Long.valueOf(elem.getArg2().size());
elem.getHandler().apply(elem.getArg1(), elem.getArg2(), log);
if (this.taskParam.isConcurrentWrite()) {
elem.getHandler().apply(elem.getArg1(), elem.getArg2(), log);
} else {
synchronized (this.taskParam) {
elem.getHandler().apply(elem.getArg1(), elem.getArg2(), log);
}
}
Long count = taskResult.getPerf().get(elem.getTableNameMapString());
Long total = ret + Optional.ofNullable(count).orElse(0L);
taskResult.getPerf().put(elem.getTableNameMapString(), total);

View File

@@ -38,16 +38,19 @@ public class DefaultWriterRobot extends RobotWriter<WriterTaskResult> {
private final MdcKeyValue mdcKeyValue;
private final RobotReader robotReader;
private final int writeThreadNum; // 4 <= writeThreadNum <= 8
private final int writeThreadNum;
private final boolean supportConcurrentWrite;
private AsyncTaskExecutor threadExecutor;
private List<Supplier> writeTaskThreads;
private List<CompletableFuture> futures;
public DefaultWriterRobot(MdcKeyValue mdcKeyValue, RobotReader robotReader, int writeThreadNum) {
public DefaultWriterRobot(MdcKeyValue mdcKeyValue, RobotReader robotReader, int writeThreadNum,
boolean concurrentWrite) {
ExamineUtils.checkArgument(writeThreadNum > 0, "writeThreadNum(%s) must >0 ", writeThreadNum);
this.mdcKeyValue = mdcKeyValue;
this.robotReader = robotReader;
this.writeThreadNum = writeThreadNum;
this.supportConcurrentWrite = concurrentWrite;
}
@Override
@@ -64,6 +67,7 @@ public class DefaultWriterRobot extends RobotWriter<WriterTaskResult> {
.builder()
.robotReader(robotReader)
.memChannel(robotReader.getChannel())
.concurrentWrite(supportConcurrentWrite)
.build();
for (int i = 0; i < writeThreadNum; ++i) {
if (Objects.nonNull(mdcKeyValue)) {

View File

@@ -9,7 +9,6 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.data.service;
import cn.hutool.system.SystemUtil;
import com.gitee.dbswitch.common.entity.CloseableDataSource;
import com.gitee.dbswitch.common.entity.LoggingRunnable;
import com.gitee.dbswitch.common.entity.MdcKeyValue;
@@ -20,6 +19,7 @@ import com.gitee.dbswitch.core.robot.RobotWriter;
import com.gitee.dbswitch.data.config.DbswichPropertiesConfiguration;
import com.gitee.dbswitch.data.entity.GlobalParamConfigProperties;
import com.gitee.dbswitch.data.util.DataSourceUtils;
import com.gitee.dbswitch.data.util.MachineUtils;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
@@ -106,22 +106,23 @@ public class MigrationService {
* 执行主逻辑
*/
private void doRun() {
StopWatch watch = new StopWatch();
watch.start();
log.info("dbswitch data service is started....");
log.info(SystemUtil.getOsInfo().toString());
log.info(SystemUtil.getJvmInfo().toString());
log.info(SystemUtil.getRuntimeInfo().toString());
log.info("Task run environment information:\n{}", MachineUtils.getPrintInformation());
//log.info("input configuration \n{}", JsonUtils.toJsonString(configuration));
GlobalParamConfigProperties globalParam = configuration.getConfig();
int maxQueueSize = globalParam.getChannelQueueSize();
int writeThreadNum = globalParam.getWriteThreadNum();
boolean concurrentWrite = DataSourceUtils.supportConcurrentWrite(configuration.getTarget());
StopWatch watch = new StopWatch();
watch.start();
AbstractBatchExchanger exchanger = new DefaultBatchExchanger(readExecutor, writeExecutor, maxQueueSize, perfStats);
try (CloseableDataSource targetDataSource = DataSourceUtils.createTargetDataSource(configuration.getTarget())) {
try (CloseableDataSource sourceDataSource = DataSourceUtils.createSourceDataSource(configuration.getSource())) {
robotReader = new DefaultReaderRobot(mdcKeyValue, configuration, sourceDataSource, targetDataSource);
robotWriter = new DefaultWriterRobot(mdcKeyValue, robotReader, globalParam.getWriteThreadNum());
robotWriter = new DefaultWriterRobot(mdcKeyValue, robotReader, writeThreadNum, concurrentWrite);
boolean success = executeSqlScripts(targetDataSource, configuration.getTarget().getBeforeSqlScripts());
try {
exchanger.exchange(robotReader, robotWriter);

View File

@@ -45,8 +45,7 @@ public final class DataSourceUtils {
public static final int MAX_THREAD_COUNT = 10;
public static final int MAX_TIMEOUT_MS = 60000;
private static Object mutexForMap = new Object();
private static Map<String, URLClassLoader> classLoaderMap = new ConcurrentHashMap<>();
private static final Map<String, URLClassLoader> classLoaderMap = new ConcurrentHashMap<>();
/**
* 创建于指定数据库连接描述符的连接池
@@ -115,13 +114,8 @@ public final class DataSourceUtils {
} else if (!ds.getJdbcUrl().contains("jdbc:jest://")) {
ds.setConnectionTestQuery("SELECT 1");
}
if (properties.getDriverClassName().contains("sqlite")) {
ds.setMaximumPoolSize(1);
ds.setMinimumIdle(1);
} else {
ds.setMaximumPoolSize(MAX_THREAD_COUNT);
ds.setMinimumIdle(MAX_THREAD_COUNT);
}
ds.setMaximumPoolSize(MAX_THREAD_COUNT);
ds.setMinimumIdle(MAX_THREAD_COUNT);
ds.setMaxLifetime(properties.getMaxLifeTime());
ds.setConnectionTimeout(properties.getConnectionTimeout());
ds.setIdleTimeout(MAX_TIMEOUT_MS);
@@ -173,6 +167,11 @@ public final class DataSourceUtils {
return new WrapCommonDataSource(dataSource, urlClassLoader);
}
public static boolean supportConcurrentWrite(TargetDataSourceProperties properties) {
return !properties.getDriverClassName().contains("sqlite")
&& !properties.getUrl().contains("jdbc:sqlite:");
}
private static InvisibleDataSource createInvisibleDataSource(
ClassLoader cl,
String jdbcUrl,
@@ -211,13 +210,14 @@ public final class DataSourceUtils {
}
}
private static URLClassLoader getOrCreateClassLoader(String path, ClassLoader parent) {
private static URLClassLoader getOrCreateClassLoader(
String path, ClassLoader parent) {
URLClassLoader urlClassLoader = classLoaderMap.get(path);
if (null == urlClassLoader) {
synchronized (mutexForMap) {
synchronized (DataSourceUtils.class) {
urlClassLoader = classLoaderMap.get(path);
if (null == urlClassLoader) {
log.info("Create Jar ClassLoader from path: {}", path);
log.info("Create jar classLoader from path: {}", path);
urlClassLoader = new JarClassLoader(path, parent);
classLoaderMap.put(path, urlClassLoader);
}

View File

@@ -0,0 +1,46 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.data.util;
import cn.hutool.core.io.FileUtil;
import cn.hutool.system.JvmInfo;
import cn.hutool.system.OsInfo;
import cn.hutool.system.RuntimeInfo;
import cn.hutool.system.SystemUtil;
import com.gitee.dbswitch.common.util.PomVersionUtils;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@UtilityClass
public final class MachineUtils {
public static String getPrintInformation() {
OsInfo osInfo = SystemUtil.getOsInfo();
JvmInfo jvmInfo = SystemUtil.getJvmInfo();
RuntimeInfo runtimeInfo = SystemUtil.getRuntimeInfo();
StringBuilder sb = new StringBuilder();
sb.append("\tOS Arch:\t" + osInfo.getArch());
sb.append("\n\tOS Name:\t" + osInfo.getName());
sb.append("\n\tOS Version:\t" + osInfo.getVersion());
sb.append("\n\tJavaVM Name:\t" + jvmInfo.getName());
sb.append("\n\tJavaVM Version:\t" + jvmInfo.getVersion());
sb.append("\n\tJavaVM Vendor:\t" + jvmInfo.getVendor());
sb.append("\n\tJavaVM Info:\t" + jvmInfo.getInfo());
sb.append("\n\tSystem Max Memory: \t" + FileUtil.readableFileSize(runtimeInfo.getMaxMemory()));
sb.append("\n\tSystem Total Memory: \t" + FileUtil.readableFileSize(runtimeInfo.getTotalMemory()));
sb.append("\n\tSystem Free Memory: \t" + FileUtil.readableFileSize(runtimeInfo.getFreeMemory()));
sb.append("\n\tSystem Usable Memory: \t" + FileUtil.readableFileSize(runtimeInfo.getUsableMemory()));
sb.append("\n\tRelease dbswitch Version: \t" + PomVersionUtils.getProjectVersion());
return sb.toString();
}
}