v1.6.16:修复issue I5X9ED、I5XNB7、I5XNLZ等问题

This commit is contained in:
inrgihc
2022-10-25 21:27:58 +08:00
parent f7bf8f134e
commit c6ec7ef22e
5 changed files with 52 additions and 38 deletions

View File

@@ -78,6 +78,7 @@ public class MigrationHandler implements Supplier<Long> {
// 目的端
private final HikariDataSource targetDataSource;
private ProductTypeEnum targetProductType;
private Set<String> targetExistTables;
private String targetSchemaName;
private String targetTableName;
private List<ColumnDescription> targetColumnDescriptions;
@@ -90,15 +91,17 @@ public class MigrationHandler implements Supplier<Long> {
DbswichProperties properties,
Integer sourcePropertiesIndex,
HikariDataSource sds,
HikariDataSource tds) {
return new MigrationHandler(td, properties, sourcePropertiesIndex, sds, tds);
HikariDataSource tds,
Set<String> targetExistTables) {
return new MigrationHandler(td, properties, sourcePropertiesIndex, sds, tds, targetExistTables);
}
private MigrationHandler(TableDescription td,
DbswichProperties properties,
Integer sourcePropertiesIndex,
HikariDataSource sds,
HikariDataSource tds) {
HikariDataSource tds,
Set<String> targetExistTables) {
this.sourceSchemaName = td.getSchemaName();
this.sourceTableName = td.getTableName();
this.properties = properties;
@@ -110,6 +113,7 @@ public class MigrationHandler implements Supplier<Long> {
fetchSize = sourceProperties.getFetchSize();
}
this.targetExistTables = targetExistTables;
// 获取映射转换后新的表名
this.targetSchemaName = properties.getTarget().getTargetSchema();
this.targetTableName = PatterNameUtils.getFinalName(td.getTableName(),
@@ -252,14 +256,7 @@ public class MigrationHandler implements Supplier<Long> {
throw new RuntimeException("task is interrupted");
}
IMetaDataByDatasourceService metaDataByDatasourceService =
new MetaDataByDataSourceServiceImpl(targetDataSource, targetProductType);
List<String> targetTableNames = metaDataByDatasourceService
.queryTableList(targetSchemaName)
.stream().map(TableDescription::getTableName)
.collect(Collectors.toList());
if (!targetTableNames.contains(targetSchemaName)) {
if (!targetExistTables.contains(targetTableName)) {
// 当目标端不存在该表时,则生成建表语句并创建
List<String> sqlCreateTable = sourceMetaDataService.getDDLCreateTableSQL(
targetProductType,
@@ -289,6 +286,8 @@ public class MigrationHandler implements Supplier<Long> {
// 判断是否具备变化量同步的条件1两端表结构一致且都有一样的主键字段(2)MySQL使用Innodb引擎
if (properties.getTarget().getChangeDataSync()) {
// 根据主键情况判断同步的方式:增量同步或覆盖同步
IMetaDataByDatasourceService metaDataByDatasourceService =
new MetaDataByDataSourceServiceImpl(targetDataSource, targetProductType);
List<String> dbTargetPks = metaDataByDatasourceService.queryTablePrimaryKeys(
targetSchemaName, targetTableName);

View File

@@ -24,12 +24,14 @@ import com.zaxxer.hikari.HikariDataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Service;
@@ -103,6 +105,9 @@ public class MigrationService {
//log.info("Application properties configuration \n{}", properties);
try (HikariDataSource targetDataSource = DataSourceUtils.createTargetDataSource(properties.getTarget())) {
IMetaDataByDatasourceService tdsService = new MetaDataByDataSourceServiceImpl(targetDataSource);
Set<String> tablesAlreadyExist = tdsService.queryTableList(properties.getTarget().getTargetSchema())
.stream().map(TableDescription::getTableName).collect(Collectors.toSet());
int sourcePropertiesIndex = 0;
int totalTableCount = 0;
List<SourceDataSourceProperties> sourcesProperties = properties.getSource();
@@ -160,19 +165,19 @@ public class MigrationService {
if (useExcludeTables) {
if (!filters.contains(tableName)) {
futures.add(
makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource, tablesAlreadyExist,
numberOfFailures, totalBytesSize));
}
} else {
if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0).contains("?"))) {
if (Pattern.matches(includes.get(0), tableName)) {
futures.add(
makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource, tablesAlreadyExist,
numberOfFailures, totalBytesSize));
}
} else if (includes.contains(tableName)) {
futures.add(
makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource, tablesAlreadyExist,
numberOfFailures, totalBytesSize));
}
}
@@ -222,6 +227,7 @@ public class MigrationService {
* @param indexInternal 源端索引号
* @param sds 源端的DataSource数据源
* @param tds 目的端的DataSource数据源
* @param exists 目的端已经存在的表名列表
* @param numberOfFailures 失败的数量
* @param totalBytesSize 同步的字节大小
* @return CompletableFuture<Void>
@@ -231,10 +237,11 @@ public class MigrationService {
Integer indexInternal,
HikariDataSource sds,
HikariDataSource tds,
Set<String> exists,
AtomicInteger numberOfFailures,
AtomicLong totalBytesSize) {
return CompletableFuture
.supplyAsync(getMigrateHandler(td, indexInternal, sds, tds), this.taskExecutor)
.supplyAsync(getMigrateHandler(td, indexInternal, sds, tds, exists), this.taskExecutor)
.exceptionally(getExceptHandler(td, numberOfFailures))
.thenAccept(totalBytesSize::addAndGet);
}
@@ -246,14 +253,16 @@ public class MigrationService {
* @param indexInternal 源端索引号
* @param sds 源端的DataSource数据源
* @param tds 目的端的DataSource数据源
* @param exists 目的端已经存在的表名列表
* @return Supplier<Long>
*/
private Supplier<Long> getMigrateHandler(
TableDescription td,
Integer indexInternal,
HikariDataSource sds,
HikariDataSource tds) {
MigrationHandler instance = MigrationHandler.createInstance(td, properties, indexInternal, sds, tds);
HikariDataSource tds,
Set<String> exists) {
MigrationHandler instance = MigrationHandler.createInstance(td, properties, indexInternal, sds, tds, exists);
migrationHandlers.add(instance);
return instance;
}