代码优化升级

This commit is contained in:
inrgihc
2021-06-09 12:04:11 +08:00
parent 6e8d2cf05e
commit 265d038736
17 changed files with 133 additions and 94 deletions

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-common</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-core</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-data</artifactId>

View File

@@ -11,6 +11,8 @@ package com.gitee.dbswitch.data.config;
import java.util.ArrayList;
import java.util.List;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
@@ -25,6 +27,7 @@ import lombok.NoArgsConstructor;
*/
@Configuration
@Data
@ToString
@ConfigurationProperties(prefix = "dbswitch", ignoreInvalidFields=false, ignoreUnknownFields = false)
@PropertySource("classpath:config.properties")
public class DbswichProperties {

View File

@@ -0,0 +1,36 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.data.domain;
import com.gitee.dbswitch.data.util.BytesUnitUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
 * Per-data-source migration statistics: which source it is, how many tables
 * were processed, how many failed, and the total bytes transferred.
 *
 * @Author: tang
 */
@Data
@AllArgsConstructor
public class PerfStat {

  // Ordinal of the source data source this stat belongs to.
  private Integer index;
  // Count of tables attempted for this source.
  private Integer total;
  // Count of tables that failed to migrate.
  private Integer failure;
  // Total transferred payload size, in bytes.
  private Long bytes;

  /**
   * Renders a human-readable, multi-line summary; byte size is formatted
   * via BytesUnitUtils (e.g. KB/MB/GB).
   */
  @Override
  public String toString() {
    StringBuilder summary = new StringBuilder();
    summary.append("Data Source Index: \t").append(index).append('\n');
    summary.append("Total Tables Count: \t").append(total).append('\n');
    summary.append("Failure Tables count: \t").append(failure).append('\n');
    summary.append("Total Transfer Size: \t").append(BytesUnitUtils.bytesSizeToHuman(bytes));
    return summary.toString();
  }
}

View File

@@ -1,3 +1,12 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.data.handler;
import com.carrotsearch.sizeof.RamUsageEstimator;
@@ -31,6 +40,8 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
/**
* 在一个线程内的单表迁移处理逻辑
@@ -38,10 +49,11 @@ import java.util.Map;
* @author tang
*/
@Slf4j
public class MigrationHandler implements Runnable {
public class MigrationHandler implements Callable<Long> {
private final long MAX_CACHE_BYTES_SIZE = 512 * 1024 * 1024;
private int fetchSize = 100;
private TableDescription tableDescription;
private DbswichProperties properties;
private DbswichProperties.SourceDataSourceProperties sourceProperties;
@@ -61,11 +73,15 @@ public class MigrationHandler implements Runnable {
this.sourceMetaDataSerice = new MigrationMetaDataServiceImpl();
this.targetDataSource = tds;
if (sourceProperties.getFetchSize() >= fetchSize) {
fetchSize = sourceProperties.getFetchSize();
}
this.sourceMetaDataSerice.setDatabaseConnection(JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource));
}
@Override
public void run() {
public Long call() {
log.info("Migrate table for {}.{} ", tableDescription.getSchemaName(), tableDescription.getTableName());
JdbcTemplate targetJdbcTemplate = new JdbcTemplate(targetDataSource);
@@ -102,7 +118,7 @@ public class MigrationHandler implements Runnable {
targetJdbcTemplate.execute(sqlCreateTable);
log.info("Execute SQL: \n{}", sqlCreateTable);
this.doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
} else {
// 判断是否具备变化量同步的条件1两端表结构一致且都有一样的主键字段(2)MySQL使用Innodb引擎
if (properties.getTarget().getChangeDataSynch().booleanValue()) {
@@ -118,16 +134,16 @@ public class MigrationHandler implements Runnable {
if (targetDatabaseType == DatabaseTypeEnum.MYSQL
&& !JdbcTemplateUtils.isMysqlInodbStorageEngine(properties.getTarget().getTargetSchema(),
sourceProperties.getPrefixTable() + tableDescription.getTableName(), targetDataSource)) {
this.doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
} else {
List<String> fields = mds.queryTableColumnName(tableDescription.getSchemaName(), tableDescription.getTableName());
this.doIncreaseSynchronize(tableDescription, sourceProperties, sourceDataSource, writer, pks1, fields);
return doIncreaseSynchronize(tableDescription, sourceProperties, sourceDataSource, writer, pks1, fields);
}
} else {
this.doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
}
} else {
this.doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
}
}
}
@@ -138,14 +154,10 @@ public class MigrationHandler implements Runnable {
* @param tableDescription 表的描述信息,可能是视图表,可能是物理表
* @param writer 目的端的写入器
*/
private void doFullCoverSynchronize(TableDescription tableDescription,
private Long doFullCoverSynchronize(TableDescription tableDescription,
DbswichProperties.SourceDataSourceProperties sourceProperties,
HikariDataSource sourceDataSource,
IDatabaseWriter writer) {
int fetchSize = 100;
if (sourceProperties.getFetchSize() >= fetchSize) {
fetchSize = sourceProperties.getFetchSize();
}
final int BATCH_SIZE = fetchSize;
// 准备目的端的数据写入操作
@@ -166,15 +178,13 @@ public class MigrationHandler implements Runnable {
fullTableName);
List<String> fields = new ArrayList<>(columnMetaData.keySet());
StatementResultSet srs = sourceOperator.queryTableData(tableDescription.getSchemaName(),
tableDescription.getTableName(), fields);
StatementResultSet srs = sourceOperator.queryTableData(tableDescription.getSchemaName(), tableDescription.getTableName(), fields);
List<Object[]> cache = new LinkedList<>();
long cacheBytes = 0;
long totalCount = 0;
long totalBytes = 0;
try {
ResultSet rs = srs.getResultset();
try (ResultSet rs = srs.getResultset();) {
while (rs.next()) {
Object[] record = new Object[fields.size()];
for (int i = 1; i <= fields.size(); ++i) {
@@ -192,7 +202,7 @@ public class MigrationHandler implements Runnable {
if (cache.size() >= BATCH_SIZE || cacheBytes >= MAX_CACHE_BYTES_SIZE) {
long ret = writer.write(fields, cache);
log.info("[FullCoverSynch] handle table [{}] data count: {}, batch bytes sie: {}", fullTableName, ret, BytesUnitUtils.bytesSizeToHuman(cacheBytes));
log.info("[FullCoverSynch] handle table [{}] data count: {}, the batch bytes sie: {}", fullTableName, ret, BytesUnitUtils.bytesSizeToHuman(cacheBytes));
cache.clear();
totalBytes += cacheBytes;
cacheBytes = 0;
@@ -206,12 +216,14 @@ public class MigrationHandler implements Runnable {
totalBytes += cacheBytes;
}
log.info("[FullCoverSynch] handle table [{}] total data count:{} ,total bytes={}", fullTableName, totalCount, BytesUnitUtils.bytesSizeToHuman(totalBytes));
log.info("[FullCoverSynch] handle table [{}] total data count:{}, total bytes={}", fullTableName, totalCount, BytesUnitUtils.bytesSizeToHuman(totalBytes));
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
srs.close();
}
return totalBytes;
}
/**
@@ -220,13 +232,9 @@ public class MigrationHandler implements Runnable {
* @param tableDescription 表的描述信息,这里只能是物理表
* @param writer 目的端的写入器
*/
private void doIncreaseSynchronize(TableDescription tableDescription,
private Long doIncreaseSynchronize(TableDescription tableDescription,
DbswichProperties.SourceDataSourceProperties sourceProperties, HikariDataSource sourceDataSource,
IDatabaseWriter writer, List<String> pks, List<String> fields) {
int fetchSize = 100;
if (sourceProperties.getFetchSize() >= fetchSize) {
fetchSize = sourceProperties.getFetchSize();
}
final int BATCH_SIZE = fetchSize;
DatabaseTypeEnum sourceDatabaseType = JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource);
@@ -252,13 +260,15 @@ public class MigrationHandler implements Runnable {
calculator.setRecordIdentical(false);
calculator.setCheckJdbcType(false);
AtomicLong totalBytes=new AtomicLong(0);
// 执行实际的变化同步过程
calculator.executeCalculate(param, new IDatabaseRowHandler() {
private long countInsert = 0;
private long countUpdate = 0;
private long countDelete = 0;
private long count = 0;
private long countTotal = 0;
private long cacheBytes = 0;
private List<Object[]> cacheInsert = new LinkedList<Object[]>();
private List<Object[]> cacheUpdate = new LinkedList<Object[]>();
@@ -278,7 +288,8 @@ public class MigrationHandler implements Runnable {
}
cacheBytes += RamUsageEstimator.sizeOf(record);
count++;
totalBytes.addAndGet(cacheBytes);
countTotal++;
checkFull(fields);
}
@@ -321,7 +332,7 @@ public class MigrationHandler implements Runnable {
doUpdate(fields);
}
log.info("[IncreaseSynch] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ", fullTableName, count,
log.info("[IncreaseSynch] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ", fullTableName, countTotal,
countInsert, countUpdate, countDelete);
}
@@ -344,6 +355,8 @@ public class MigrationHandler implements Runnable {
}
});
return totalBytes.get();
}
}

View File

@@ -13,11 +13,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import javax.sql.DataSource;
import com.gitee.dbswitch.core.service.impl.MigrationMetaDataServiceImpl;
import com.gitee.dbswitch.data.domain.PerfStat;
import com.gitee.dbswitch.data.handler.MigrationHandler;
import com.gitee.dbswitch.data.util.BytesUnitUtils;
import com.gitee.dbswitch.data.util.DataSouceUtils;
import com.gitee.dbswitch.data.util.StrUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -37,7 +41,7 @@ import lombok.extern.slf4j.Slf4j;
* @author tang
*/
@Slf4j
@Service("MainService")
@Service
public class MainService {
private ObjectMapper jackson = new ObjectMapper();
@@ -45,6 +49,12 @@ public class MainService {
@Autowired
private DbswichProperties properties;
private List<PerfStat> perfStats;
public MainService(){
perfStats=new ArrayList<>();
}
/**
* 执行主逻辑
*/
@@ -52,10 +62,10 @@ public class MainService {
StopWatch watch = new StopWatch();
watch.start();
log.info("service is running....");
log.info("service is started....");
try {
//log.info("Application properties configuration \n{}", jackson.writeValueAsString(properties));
//log.info("Application properties configuration \n{}", properties);
HikariDataSource targetDataSource = DataSouceUtils.createTargetDataSource(properties.getTarget());
@@ -66,7 +76,9 @@ public class MainService {
for (DbswichProperties.SourceDataSourceProperties sourceProperties : sourcesProperties) {
HikariDataSource sourceDataSource = DataSouceUtils.createSourceDataSource(sourceProperties);
IMetaDataService sourceMetaDataService = getMetaDataService(sourceDataSource);
IMetaDataService sourceMetaDataService = new MigrationMetaDataServiceImpl();
sourceMetaDataService.setDatabaseConnection(JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource));
// 判断处理的策略:是排除还是包含
List<String> includes = StrUtils.stringToList(sourceProperties.getSourceIncludes());
@@ -86,8 +98,9 @@ public class MainService {
List<String> schemas = StrUtils.stringToList(sourceProperties.getSourceSchema());
log.info("Source schema names is :{}", jackson.writeValueAsString(schemas));
AtomicInteger numberOfFailures = new AtomicInteger();
AtomicInteger numberOfFailures = new AtomicInteger(0);
AtomicLong totalBytesSize=new AtomicLong(0L);
final int indexInternal=sourcePropertiesIndex;
for (String schema : schemas) {
// 读取源库指定schema里所有的表
List<TableDescription> tableList = sourceMetaDataService.queryTableList(sourceProperties.getUrl(),
@@ -97,44 +110,26 @@ public class MainService {
} else {
for (TableDescription td : tableList) {
String tableName = td.getTableName();
Supplier<Long> supplier = () -> new MigrationHandler(td, properties, indexInternal, sourceDataSource, targetDataSource).call();
Function<Throwable, Long> exceptFunction = (e) -> {
log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e);
numberOfFailures.incrementAndGet();
throw new RuntimeException(e);
};
Consumer<Long> finishConsumer = (r) -> totalBytesSize.addAndGet(r.longValue());
CompletableFuture<Void> future = CompletableFuture.supplyAsync(supplier).exceptionally(exceptFunction).thenAccept(finishConsumer);
if (useExcludeTables) {
if (!filters.contains(tableName)) {
futures.add(CompletableFuture.runAsync(
new MigrationHandler(td, properties, sourcePropertiesIndex, sourceDataSource, targetDataSource)
).exceptionally(
(e) -> {
log.error("Error migration table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e);
numberOfFailures.incrementAndGet();
throw new RuntimeException(e);
}
)
);
futures.add(future);
}
} else {
if (includes.size() == 1 && includes.get(0).contains("*")) {
if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0).contains("?"))) {
if (Pattern.matches(includes.get(0), tableName)) {
futures.add(CompletableFuture.runAsync(
new MigrationHandler(td, properties, sourcePropertiesIndex, sourceDataSource, targetDataSource)
).exceptionally(
(e) -> {
log.error("Error migration table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e);
numberOfFailures.incrementAndGet();
throw new RuntimeException(e);
}
)
);
futures.add(future);
}
} else if (includes.contains(tableName)) {
futures.add(CompletableFuture.runAsync(
new MigrationHandler(td, properties, sourcePropertiesIndex, sourceDataSource, targetDataSource)
).exceptionally(
(e) -> {
log.error("Error migration table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e);
numberOfFailures.incrementAndGet();
throw new RuntimeException(e);
}
)
);
futures.add(future);
}
}
@@ -144,10 +139,10 @@ public class MainService {
}
CompletableFuture<Void> allFuture=CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
allFuture.get();
log.info("#### Complete data migration for the [ {} ] data source ,total count={}, failure count={}",
sourcePropertiesIndex, futures.size(), numberOfFailures);
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
log.info("#### Complete data migration for the [ {} ] data source:\ntotal count={}\nfailure count={}\ntotal bytes size={}",
sourcePropertiesIndex, futures.size(), numberOfFailures.get(), BytesUnitUtils.bytesSizeToHuman(totalBytesSize.get()));
perfStats.add(new PerfStat(sourcePropertiesIndex,futures.size(),numberOfFailures.get(),totalBytesSize.get()));
DataSouceUtils.closeHikariDataSource(sourceDataSource);
++sourcePropertiesIndex;
@@ -159,19 +154,11 @@ public class MainService {
} finally {
watch.stop();
log.info("total elipse = {} s", watch.getTotalTimeSeconds());
System.out.println("===================================");
System.out.println(String.format("total elipse time:\t %f s", watch.getTotalTimeSeconds()));
this.perfStats.stream().forEach(st -> System.out.println(st));
System.out.println("===================================");
}
}
/**
* 获取MetaDataService对象
*
* @param dataSource
* @return IMetaDataService
*/
private IMetaDataService getMetaDataService(DataSource dataSource) {
IMetaDataService metaDataService = new MigrationMetaDataServiceImpl();
metaDataService.setDatabaseConnection(JdbcTemplateUtils.getDatabaseProduceName(dataSource));
return metaDataService;
}
}

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-dbchange</artifactId>

View File

@@ -3,7 +3,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-dbcommon</artifactId>

View File

@@ -3,7 +3,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-dbsynch</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-dbwriter</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-pgwriter</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-sql</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>dbswitch-webapi</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</parent>
<artifactId>package-tool</artifactId>

View File

@@ -11,7 +11,7 @@
<groupId>com.gitee</groupId>
<artifactId>dbswitch</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
<packaging>pom</packaging>
<name>dbswitch</name>
<description>database switch project</description>

View File

@@ -1,6 +1,6 @@
@echo off
set APP_VERSION=1.5.5
set APP_VERSION=1.5.6
echo "Clean Project ..."
call mvn clean -f pom.xml