Incremental synchronization driven by an increasing-value column

This commit is contained in:
inrgihc
2025-04-21 22:31:55 +08:00
parent a046c510ff
commit 15d55ed864
57 changed files with 952 additions and 230 deletions

View File

@@ -9,11 +9,13 @@
/////////////////////////////////////////////////////////////
package org.dromara.dbswitch.data.entity;
import org.dromara.dbswitch.common.entity.PatternMapper;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.dromara.dbswitch.common.type.ProductTypeEnum;
import lombok.Data;
import org.dromara.dbswitch.common.entity.PatternMapper;
import org.dromara.dbswitch.common.type.ProductTypeEnum;
/**
* Source-side parameter configuration
@@ -32,11 +34,15 @@ public class SourceDataSourceProperties {
private Long connectionTimeout = TimeUnit.SECONDS.toMillis(60);
private Long maxLifeTime = TimeUnit.MINUTES.toMillis(60);
private String beforeSqlScripts;
private String afterSqlScripts;
private Integer fetchSize = 5000;
private String sourceSchema = "";
private String tableType = "TABLE";
private String sourceIncludes = "";
private String sourceExcludes = "";
private List<PatternMapper> regexTableMapper;
private List<PatternMapper> regexColumnMapper;
private Map<String, String> incrTableColumns = Collections.emptyMap();
private List<PatternMapper> regexTableMapper = Collections.emptyList();
private List<PatternMapper> regexColumnMapper = Collections.emptyList();
}
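
The new incrTableColumns map keys a source table name to the column whose ever-increasing values drive incremental sync (see the YAML example in the last file of this commit). A minimal sketch of the lookup the reader thread performs; the helper name resolveIncrementColumn is hypothetical, not part of the commit:

    // Hypothetical helper: returns the configured increment column for a
    // source table, or null when the table falls back to the other sync modes.
    static String resolveIncrementColumn(SourceDataSourceProperties props, String tableName) {
        // e.g. incr-table-columns: { t_test_table: id } binds into this map
        return props.getIncrTableColumns().get(tableName);
    }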

View File

@@ -10,40 +10,6 @@
package org.dromara.dbswitch.data.handler;
import cn.hutool.core.io.unit.DataSizeUtil;
import org.dromara.dbswitch.core.calculate.DefaultChangeCalculatorService;
import org.dromara.dbswitch.core.calculate.RecordRowChangeCalculator;
import org.dromara.dbswitch.core.calculate.RecordRowHandler;
import org.dromara.dbswitch.core.calculate.RowChangeTypeEnum;
import org.dromara.dbswitch.core.calculate.TaskParamEntity;
import org.dromara.dbswitch.common.consts.Constants;
import org.dromara.dbswitch.common.entity.CloseableDataSource;
import org.dromara.dbswitch.common.entity.ResultSetWrapper;
import org.dromara.dbswitch.common.type.CaseConvertEnum;
import org.dromara.dbswitch.common.type.ProductTypeEnum;
import org.dromara.dbswitch.common.util.DatabaseAwareUtils;
import org.dromara.dbswitch.common.util.JdbcTypesUtils;
import org.dromara.dbswitch.common.util.PatterNameUtils;
import org.dromara.dbswitch.core.basic.exchange.BatchElement;
import org.dromara.dbswitch.core.basic.exchange.MemChannel;
import org.dromara.dbswitch.core.basic.task.TaskProcessor;
import org.dromara.dbswitch.data.config.DbswichPropertiesConfiguration;
import org.dromara.dbswitch.data.domain.ReaderTaskParam;
import org.dromara.dbswitch.data.domain.ReaderTaskResult;
import org.dromara.dbswitch.data.entity.SourceDataSourceProperties;
import org.dromara.dbswitch.data.entity.TargetDataSourceProperties;
import org.dromara.dbswitch.core.provider.ProductFactoryProvider;
import org.dromara.dbswitch.core.provider.ProductProviderFactory;
import org.dromara.dbswitch.core.provider.manage.TableManageProvider;
import org.dromara.dbswitch.core.provider.meta.MetadataProvider;
import org.dromara.dbswitch.core.provider.query.TableDataQueryProvider;
import org.dromara.dbswitch.core.provider.sync.TableDataSynchronizeProvider;
import org.dromara.dbswitch.core.provider.transform.RecordTransformProvider;
import org.dromara.dbswitch.core.provider.write.TableDataWriteProvider;
import org.dromara.dbswitch.core.schema.ColumnDescription;
import org.dromara.dbswitch.core.schema.TableDescription;
import org.dromara.dbswitch.core.schema.SourceProperties;
import org.dromara.dbswitch.core.service.DefaultMetadataService;
import org.dromara.dbswitch.core.service.MetadataService;
import com.google.common.collect.Lists;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -58,8 +24,44 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dbswitch.common.consts.Constants;
import org.dromara.dbswitch.common.entity.CloseableDataSource;
import org.dromara.dbswitch.common.entity.IncrementPoint;
import org.dromara.dbswitch.common.entity.ResultSetWrapper;
import org.dromara.dbswitch.common.type.CaseConvertEnum;
import org.dromara.dbswitch.common.type.ProductTypeEnum;
import org.dromara.dbswitch.common.util.DatabaseAwareUtils;
import org.dromara.dbswitch.common.util.JdbcTypesUtils;
import org.dromara.dbswitch.common.util.PatterNameUtils;
import org.dromara.dbswitch.core.basic.exchange.BatchElement;
import org.dromara.dbswitch.core.basic.exchange.MemChannel;
import org.dromara.dbswitch.core.basic.task.TaskProcessor;
import org.dromara.dbswitch.core.calculate.DefaultChangeCalculatorService;
import org.dromara.dbswitch.core.calculate.RecordRowChangeCalculator;
import org.dromara.dbswitch.core.calculate.RecordRowHandler;
import org.dromara.dbswitch.core.calculate.RowChangeTypeEnum;
import org.dromara.dbswitch.core.calculate.TaskParamEntity;
import org.dromara.dbswitch.core.provider.ProductFactoryProvider;
import org.dromara.dbswitch.core.provider.ProductProviderFactory;
import org.dromara.dbswitch.core.provider.manage.TableManageProvider;
import org.dromara.dbswitch.core.provider.meta.MetadataProvider;
import org.dromara.dbswitch.core.provider.query.TableDataQueryProvider;
import org.dromara.dbswitch.core.provider.sync.TableDataSynchronizeProvider;
import org.dromara.dbswitch.core.provider.transform.RecordTransformProvider;
import org.dromara.dbswitch.core.provider.write.TableDataWriteProvider;
import org.dromara.dbswitch.core.schema.ColumnDescription;
import org.dromara.dbswitch.core.schema.ColumnValue;
import org.dromara.dbswitch.core.schema.SourceProperties;
import org.dromara.dbswitch.core.schema.TableDescription;
import org.dromara.dbswitch.core.service.DefaultMetadataService;
import org.dromara.dbswitch.core.service.MetadataService;
import org.dromara.dbswitch.data.config.DbswichPropertiesConfiguration;
import org.dromara.dbswitch.data.domain.ReaderTaskParam;
import org.dromara.dbswitch.data.domain.ReaderTaskResult;
import org.dromara.dbswitch.data.entity.SourceDataSourceProperties;
import org.dromara.dbswitch.data.entity.TargetDataSourceProperties;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.StringUtils;
/**
* Data-reading thread body (reads a single table)
@@ -87,6 +89,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
private String sourceTableRemarks;
private List<ColumnDescription> sourceColumnDescriptions;
private List<String> sourcePrimaryKeys;
private Map<String, String> incrTableColumns;
// Target side
private final CloseableDataSource targetDataSource;
@@ -101,8 +104,8 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
private String tableNameMapString;
// Statistics
AtomicLong totalBytes = new AtomicLong(0);
AtomicLong totalCount = new AtomicLong(0);
private AtomicLong totalBytes = new AtomicLong(0);
private AtomicLong totalCount = new AtomicLong(0);
private CountDownLatch robotCountDownLatch;
@@ -118,6 +121,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
this.targetProductType = this.targetProperties.getType();
this.sourceSchemaName = this.sourceProperties.getSourceSchema();
this.sourceTableName = this.tableDescription.getTableName();
this.incrTableColumns = this.sourceProperties.getIncrTableColumns();
this.targetExistTables = taskParam.getTargetExistTables();
this.robotCountDownLatch = taskParam.getCountDownLatch();
}
@@ -128,7 +132,6 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
fetchSize = sourceProperties.getFetchSize();
}
if (this.targetProductType.isLikeHive()) {
// !! hive does not support uppercase table or column names
properties.getTarget().setTableNameCase(CaseConvertEnum.LOWER);
@@ -202,7 +205,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
for (int i = 0; i < sourceColumnDescriptions.size(); ++i) {
String sourceColumnName = sourceColumnDescriptions.get(i).getFieldName();
String targetColumnName = targetColumnDescriptions.get(i).getFieldName();
if (StringUtils.hasLength(targetColumnName)) {
if (StringUtils.isNotBlank(targetColumnName)) {
columnMapperPairs.add(String.format("%s --> %s", sourceColumnName, targetColumnName));
mapChecker.put(sourceColumnName, targetColumnName);
} else {
@@ -272,7 +275,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
List<String> sqlCreateTable = sourceMetaDataService.getDDLCreateTableSQL(
targetMetaProvider,
targetColumnDescriptions.stream()
.filter(column -> StringUtils.hasLength(column.getFieldName()))
.filter(column -> StringUtils.isNotBlank(column.getFieldName()))
.collect(Collectors.toList()),
targetPrimaryKeys,
targetSchemaName,
@@ -321,7 +324,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
List<String> sqlCreateTable = sourceMetaDataService.getDDLCreateTableSQL(
targetMetaProvider,
targetColumnDescriptions.stream()
.filter(column -> StringUtils.hasLength(column.getFieldName()))
.filter(column -> StringUtils.isNotBlank(column.getFieldName()))
.collect(Collectors.toList()),
targetPrimaryKeys,
targetSchemaName,
@@ -341,9 +344,29 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
return doFullCoverSynchronize(targetWriter, targetTableManager, sourceQuerier, transformProvider);
}
// Preconditions for change-data sync: (1) both sides share the same table structure and primary key fields; (2) MySQL tables use the InnoDB engine
if (properties.getTarget().getChangeDataSync()) {
// Choose the sync method based on primary keys: incremental sync or full-cover sync
if (incrTableColumns.containsKey(sourceTableName)) {
// Handle incremental sync on the configured increment column
String incrSourceColumnName = this.incrTableColumns.get(sourceTableName);
String incrTargetColumnName = mapChecker.get(incrSourceColumnName);
if (org.apache.commons.lang3.StringUtils.isBlank(incrTargetColumnName)) {
throw new RuntimeException("增量字段在目标端表中不存在");
}
MetadataService service = new DefaultMetadataService(targetDataSource, targetProductType);
ColumnValue columnValue = service.queryIncrementPoint(targetSchemaName, targetTableName, incrTargetColumnName);
IncrementPoint incrPoint = IncrementPoint.EMPTY;
if (null != columnValue) {
if (!JdbcTypesUtils.isInteger(columnValue.getJdbcType())
&& !JdbcTypesUtils.isDateTime(columnValue.getJdbcType())) {
throw new RuntimeException("增量字段必须为整型或时间类型");
}
incrPoint = new IncrementPoint(incrSourceColumnName, columnValue.getValue());
}
log.info("Table: {}.{} has increment column: {}", sourceSchemaName, sourceTableName, incrSourceColumnName);
return doFullCoverSynchronize(targetWriter, targetTableManager, sourceQuerier, transformProvider, incrPoint);
} else if (properties.getTarget().getChangeDataSync()) {
log.info("Check table: {}.{} can whether use change data sync", sourceSchemaName, sourceTableName);
// Preconditions for change-data sync: (1) both sides share the same table structure and primary key fields; (2) MySQL tables use the InnoDB engine
// Choose the sync method based on primary keys: change-data sync or full-cover sync
MetadataService metaDataByDatasourceService =
new DefaultMetadataService(targetDataSource, targetProductType);
List<String> dbTargetPks = metaDataByDatasourceService.queryTablePrimaryKeys(
@@ -357,12 +380,13 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
targetSchemaName, targetTableName, targetDataSource)) {
return doFullCoverSynchronize(targetWriter, targetTableManager, sourceQuerier, transformProvider);
} else {
return doIncreaseSynchronize(targetSynchronizer, transformProvider);
return doChangeSynchronize(targetSynchronizer, transformProvider);
}
} else {
return doFullCoverSynchronize(targetWriter, targetTableManager, sourceQuerier, transformProvider);
}
} else {
log.info("Table: {}.{} with target value changeDataSync=false", sourceSchemaName, sourceTableName);
return doFullCoverSynchronize(targetWriter, targetTableManager, sourceQuerier, transformProvider);
}
}
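
The branch above asks the target side for the current high-water mark via MetadataService.queryIncrementPoint, whose implementation is outside this diff. A plausible reading, assuming it simply selects the maximum value of the increment column; this is a sketch, not the project's actual provider code, and the ColumnValue constructor shown is an assumption:

    // Sketch: read the target table's current maximum of the increment column.
    // A null result (empty table) makes the caller keep IncrementPoint.EMPTY,
    // which doFullCoverSynchronize treats as a full re-copy with truncate.
    ColumnValue queryIncrementPoint(JdbcTemplate jdbcTemplate, String schema, String table, String column) {
        String sql = String.format("SELECT MAX(%s) FROM %s.%s", column, schema, table);
        return jdbcTemplate.query(sql, (ResultSetExtractor<ColumnValue>) rs -> {
            if (!rs.next() || rs.getObject(1) == null) {
                return null;
            }
            // assumed constructor: (int jdbcType, Object value)
            return new ColumnValue(rs.getMetaData().getColumnType(1), rs.getObject(1));
        });
    }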
@@ -379,6 +403,21 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
*/
private ReaderTaskResult doFullCoverSynchronize(TableDataWriteProvider tableWriter, TableManageProvider tableManager,
TableDataQueryProvider sourceQuerier, RecordTransformProvider transformer) {
return doFullCoverSynchronize(tableWriter, tableManager, sourceQuerier, transformer, IncrementPoint.EMPTY);
}
/**
* Perform full-cover synchronization; a non-empty increment point switches to incremental mode
*
* @param tableWriter    target table data writer
* @param tableManager   target table manage provider
* @param sourceQuerier  source table data query provider
* @param transformer    record transform provider
* @param incrementPoint increment high-water mark; IncrementPoint.EMPTY means truncate and re-copy
* @return ReaderTaskResult
*/
private ReaderTaskResult doFullCoverSynchronize(TableDataWriteProvider tableWriter, TableManageProvider tableManager,
TableDataQueryProvider sourceQuerier, RecordTransformProvider transformer, IncrementPoint incrementPoint) {
final int BATCH_SIZE = fetchSize;
List<String> sourceFields = new ArrayList<>();
@@ -394,16 +433,22 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
// Prepare the write operation on the target side
tableWriter.prepareWrite(targetSchemaName, targetTableName, targetFields);
// Truncate the target table's data
tableManager.truncateTableData(targetSchemaName, targetTableName);
// In incremental mode the target table's data must not be truncated
if (IncrementPoint.EMPTY == incrementPoint) {
// Truncate the target table's data
tableManager.truncateTableData(targetSchemaName, targetTableName);
}
// Query source data and write it to the target
sourceQuerier.setQueryFetchSize(BATCH_SIZE);
ResultSetWrapper srs = sourceQuerier.queryTableData(
sourceSchemaName, sourceTableName, sourceFields
sourceSchemaName, sourceTableName, sourceFields, incrementPoint
);
String syncMethod = (IncrementPoint.EMPTY == incrementPoint)
? "FullCoverSync" : "IncrementSync";
List<Object[]> cache = new LinkedList<>();
long cacheBytes = 0;
try (ResultSet rs = srs.getResultSet()) {
@@ -435,8 +480,8 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2, logger) -> {
long ret = tableWriter.write(arg1, arg2);
logger.info("[FullCoverSync] handle write table [{}] batch record count: {}, the bytes size: {}",
tableNameMapString, ret, DataSizeUtil.format(finalCacheBytes));
logger.info("[{}] handle write table [{}] batch record count: {}, the bytes size: {}",
syncMethod, tableNameMapString, ret, DataSizeUtil.format(finalCacheBytes));
return ret;
})
.arg1(Lists.newArrayList(targetFields))
@@ -456,8 +501,8 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2, logger) -> {
long ret = tableWriter.write(arg1, arg2);
logger.info("[FullCoverSync] handle write table [{}] batch record count: {}, the bytes size: {}",
tableNameMapString, ret, DataSizeUtil.format(finalCacheBytes));
logger.info("[{}] handle write table [{}] batch record count: {}, the bytes size: {}",
syncMethod, tableNameMapString, ret, DataSizeUtil.format(finalCacheBytes));
return ret;
})
.arg1(Lists.newArrayList(targetFields))
@@ -468,10 +513,10 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
totalBytes.addAndGet(cacheBytes);
}
log.info("[FullCoverSync] handle read table [{}] total record count: {}, total bytes = {}",
tableNameMapString, totalCount.get(), DataSizeUtil.format(totalBytes.get()));
log.info("[{}] handle read table [{}] total record count: {}, total bytes = {}",
syncMethod, tableNameMapString, totalCount.get(), DataSizeUtil.format(totalBytes.get()));
} catch (Throwable e) {
log.warn("[FullCoverSync] handle read table [{}] error: {}", e.getMessage());
log.warn("[{}] handle read table [{}] error: {}", syncMethod, e.getMessage());
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
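
queryTableData now receives the IncrementPoint as a fourth argument; the provider implementations are not in this hunk, but the natural effect of a non-empty point is a predicate restricting the scan to rows beyond the high-water mark. A hedged sketch of that SQL construction, where the accessor names and the lack of identifier quoting are assumptions:

    // Sketch: extend the full-table SELECT with an increment predicate when the
    // point carries a column name and the last value seen on the target side.
    String buildQuerySql(String schema, String table, List<String> fields, IncrementPoint point) {
        String sql = "SELECT " + String.join(", ", fields) + " FROM " + schema + "." + table;
        if (IncrementPoint.EMPTY != point) {
            sql += " WHERE " + point.getColumnName() + " > ?";  // bind point.getValue()
        }
        return sql;
    }

With a strict > comparison, source rows sharing the boundary value with already-copied rows are skipped; if the increment column is not unique, >= combined with upsert-style writes would be the safer contract.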
@@ -498,7 +543,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
* @param transformer record transform provider
* @return ReaderTaskResult
*/
private ReaderTaskResult doIncreaseSynchronize(TableDataSynchronizeProvider synchronizer,
private ReaderTaskResult doChangeSynchronize(TableDataSynchronizeProvider synchronizer,
RecordTransformProvider transformer) {
final int BATCH_SIZE = fetchSize;
@@ -539,7 +584,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
calculator.setCheckJdbcType(false);
// Execute the actual change-data sync process
log.info("[IncreaseSync] Handle table by compare [{}] data now ... ", tableNameMapString);
log.info("[ChangeSync] Handle table by compare [{}] data now ... ", tableNameMapString);
calculator.executeCalculate(param, new RecordRowHandler() {
private long countInsert = 0;
@@ -627,7 +672,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
doUpdate(fields);
}
log.info("[IncreaseSync] Handle table by compare [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
log.info("[ChangeSync] Handle table by compare [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
tableNameMapString, countTotal, countInsert, countUpdate, countDelete);
}
@@ -637,7 +682,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2, logger) -> {
long ret = synchronizer.executeInsert(arg2);
logger.info("[IncreaseSync] Handle write table [{}] record Insert count: {}",
logger.info("[ChangeSync] Handle write table [{}] record Insert count: {}",
tableNameMapString, ret);
return ret;
})
@@ -654,7 +699,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2, logger) -> {
long ret = synchronizer.executeUpdate(arg2);
logger.info("[IncreaseSync] Handle write table [{}] record Update count: {}",
logger.info("[ChangeSync] Handle write table [{}] record Update count: {}",
tableNameMapString, ret);
return ret;
})
@@ -671,7 +716,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2, logger) -> {
long ret = synchronizer.executeDelete(arg2);
logger.info("[IncreaseSync] Handle write table [{}] record Delete count: {}",
logger.info("[ChangeSync] Handle write table [{}] record Delete count: {}",
tableNameMapString, ret);
return ret;
})

View File

@@ -9,6 +9,13 @@
/////////////////////////////////////////////////////////////
package org.dromara.dbswitch.data.service;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dbswitch.common.entity.CloseableDataSource;
import org.dromara.dbswitch.common.entity.LoggingRunnable;
import org.dromara.dbswitch.common.entity.MdcKeyValue;
@@ -20,13 +27,6 @@ import org.dromara.dbswitch.data.config.DbswichPropertiesConfiguration;
import org.dromara.dbswitch.data.entity.GlobalParamConfigProperties;
import org.dromara.dbswitch.data.util.DataSourceUtils;
import org.dromara.dbswitch.data.util.MachineUtils;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.jdbc.datasource.init.ScriptUtils;
import org.springframework.stereotype.Service;
@@ -124,12 +124,19 @@ public class MigrationService implements Runnable {
try (CloseableDataSource sourceDataSource = DataSourceUtils.createSourceDataSource(configuration.getSource())) {
robotReader = new DefaultReaderRobot(mdcKeyValue, configuration, sourceDataSource, targetDataSource);
robotWriter = new DefaultWriterRobot(mdcKeyValue, robotReader, writeThreadNum, concurrentWrite);
boolean success = executeSqlScripts(targetDataSource, configuration.getTarget().getBeforeSqlScripts());
boolean sourceSuccess = executeSqlScripts(sourceDataSource, configuration.getSource().getBeforeSqlScripts());
try {
exchanger.exchange(robotReader, robotWriter);
boolean targetSuccess = executeSqlScripts(targetDataSource, configuration.getTarget().getBeforeSqlScripts());
try {
exchanger.exchange(robotReader, robotWriter);
} finally {
if (targetSuccess) {
executeSqlScripts(targetDataSource, configuration.getTarget().getAfterSqlScripts());
}
}
} finally {
if (success) {
executeSqlScripts(targetDataSource, configuration.getTarget().getAfterSqlScripts());
if (sourceSuccess) {
executeSqlScripts(sourceDataSource, configuration.getSource().getAfterSqlScripts());
}
}
}
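
The reworked nesting in MigrationService fixes the script ordering and makes each after-script symmetric with its before-script. Condensed to its control flow, with local names shortened for readability but otherwise taken from the diff:

    boolean sourceOk = executeSqlScripts(sourceDataSource, sourceBeforeScripts);
    try {
        boolean targetOk = executeSqlScripts(targetDataSource, targetBeforeScripts);
        try {
            exchanger.exchange(robotReader, robotWriter);  // the actual migration
        } finally {
            if (targetOk) {
                executeSqlScripts(targetDataSource, targetAfterScripts);
            }
        }
    } finally {
        if (sourceOk) {
            executeSqlScripts(sourceDataSource, sourceAfterScripts);
        }
    }

An after-script runs only if its own before-script succeeded, and the source after-script runs last, even when the exchange throws.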
@@ -160,7 +167,7 @@ public class MigrationService implements Runnable {
}
}
private boolean executeSqlScripts(CloseableDataSource targetDataSource, String sqlScripts) {
private boolean executeSqlScripts(CloseableDataSource closeableDataSource, String sqlScripts) {
if (StringUtils.isBlank(sqlScripts) || StringUtils.isBlank(sqlScripts.trim())) {
return true;
}
@@ -171,7 +178,7 @@ public class MigrationService implements Runnable {
sqlList);
if (!sqlList.isEmpty()) {
try {
try (Connection connection = targetDataSource.getConnection();
try (Connection connection = closeableDataSource.getConnection();
Statement statement = connection.createStatement()) {
for (String sql : sqlList) {
log.info("Execute sql : {}", sql);

View File

@@ -20,6 +20,9 @@ dbswitch:
source-includes: ''
## table names to exclude from the table list, separated by ','
source-excludes: ''
## per-table increment column used for incremental data synchronization
incr-table-columns:
t_test_table: id
## table name convert mapper by regular expression
regex-table-mapper:
- from-pattern: '^'
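
With this configuration, t_test_table is synchronized incrementally on its id column: each run reads the maximum id already present in the target table and copies only source rows above it, instead of truncating and rewriting the whole table. Per the checks in ReaderTaskThread, the column must map to an existing target column and be of an integer or date-time type. Tables absent from incr-table-columns keep the existing behavior: change-data sync when the target's changeDataSync option is enabled and the primary keys allow it, full-cover sync otherwise.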