支持指定表名前缀

This commit is contained in:
inrgihc
2021-03-01 18:23:26 +08:00
parent c64ecc459e
commit 23ae5e159c
3 changed files with 35 additions and 31 deletions

View File

@@ -43,6 +43,7 @@ public class DbswichProperties {
private Integer fetchSize=5000;
private String sourceSchema="";
private String prefixTable="";
private String sourceIncludes="";
private String sourceExcludes="";
}

View File

@@ -78,48 +78,48 @@ public class MainService {
properties.getTarget().getWriterEngineInsert().booleanValue());
log.info("service is running....");
//log.info("Application properties configuration :{}", jackson.writeValueAsString(this.properties));
//log.info("Application properties configuration :{}", jackson.writeValueAsString(properties));
List<DbswichProperties.SourceDataSourceProperties> sources = properties.getSource();
List<DbswichProperties.SourceDataSourceProperties> sourcesProperties = properties.getSource();
for (DbswichProperties.SourceDataSourceProperties source : sources) {
HikariDataSource sourceDataSource = this.createSourceDataSource(source);
for (DbswichProperties.SourceDataSourceProperties sourceProperties : sourcesProperties) {
HikariDataSource sourceDataSource = this.createSourceDataSource(sourceProperties);
DatabaseTypeEnum sourceDatabaseType = JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource);
metaDataService.setDatabaseConnection(sourceDatabaseType);
metaDataService.setDatabaseConnection(JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource));
// 判断处理的策略:是排除还是包含
List<String> includes = stringToList(source.getSourceIncludes());
List<String> includes = stringToList(sourceProperties.getSourceIncludes());
log.info("Includes tables is :{}", jackson.writeValueAsString(includes));
List<String> filters = stringToList(source.getSourceExcludes());
List<String> filters = stringToList(sourceProperties.getSourceExcludes());
log.info("Filter tables is :{}", jackson.writeValueAsString(filters));
boolean useExcludeTables = includes.isEmpty();
if (useExcludeTables) {
log.info("!!!! Use dbswitch.source[i].source-excludes to filter tables");
log.info("!!!! Use dbswitch.source[{}].source-excludes to filter tables", sourcesProperties.indexOf(sourceProperties));
} else {
log.info("!!!! Use dbswitch.source[i].source-includes to filter tables");
log.info("!!!! Use dbswitch.source[{}].source-includes to filter tables", sourcesProperties.indexOf(sourceProperties));
}
List<String> schemas = stringToList(source.getSourceSchema());
List<String> schemas = stringToList(sourceProperties.getSourceSchema());
log.info("Source schema names is :{}", jackson.writeValueAsString(schemas));
for (String schema : schemas) {
// 读取源库指定schema里所有的表
List<TableDescription> tableList = metaDataService.queryTableList(source.getUrl(),
source.getUsername(), source.getPassword(), schema);
List<TableDescription> tableList = metaDataService.queryTableList(sourceProperties.getUrl(),
sourceProperties.getUsername(), sourceProperties.getPassword(), schema);
if (tableList.isEmpty()) {
log.warn("### Find table list empty for shema={}", schema);
log.warn("### Find source database table list empty for shema={}", schema);
} else {
int finished = 0;
for (TableDescription td : tableList) {
String tableName = td.getTableName();
if (useExcludeTables) {
if (!filters.contains(tableName)) {
this.doDataMigration(td, source, sourceDataSource, writer);
this.doDataMigration(td, sourceProperties, sourceDataSource, writer);
}
} else {
if (includes.contains(tableName)) {
this.doDataMigration(td, source, sourceDataSource, writer);
this.doDataMigration(td, sourceProperties, sourceDataSource, writer);
}
}
@@ -156,7 +156,7 @@ public class MainService {
private void doDataMigration(TableDescription tableDescription,
DbswichProperties.SourceDataSourceProperties sourceProperties, HikariDataSource sourceDataSource,
IDatabaseWriter writer) {
log.info("migration table for {} ", tableDescription.getTableName());
log.info("Migrate table for {}.{} ", tableDescription.getSchemaName(), tableDescription.getTableName());
JdbcTemplate targetJdbcTemplate = new JdbcTemplate(writer.getDataSource());
DatabaseTypeEnum targetDatabaseType = JdbcTemplateUtils.getDatabaseProduceName(writer.getDataSource());
@@ -168,12 +168,12 @@ public class MainService {
// 先drop表
try {
IDatabaseOperator targetOperator = DatabaseOperatorFactory
.createDatabaseOperator(writer.getDataSource());
targetOperator.dropTable(properties.getTarget().getTargetSchema(), tableDescription.getTableName());
IDatabaseOperator targetOperator = DatabaseOperatorFactory.createDatabaseOperator(writer.getDataSource());
targetOperator.dropTable(properties.getTarget().getTargetSchema(),
sourceProperties.getPrefixTable() + tableDescription.getTableName());
} catch (Exception e) {
log.info("Target Table {}.{} is not exits!", properties.getTarget().getTargetSchema(),
tableDescription.getTableName());
sourceProperties.getPrefixTable() + tableDescription.getTableName());
}
// 然后create表
@@ -184,7 +184,8 @@ public class MainService {
sourceProperties.getUsername(), sourceProperties.getPassword(), tableDescription.getSchemaName(),
tableDescription.getTableName());
String sqlCreateTable = metaDataService.getDDLCreateTableSQL(targetDatabaseType, columnDescs, primaryKeys,
properties.getTarget().getTargetSchema(), tableDescription.getTableName(),
properties.getTarget().getTargetSchema(),
sourceProperties.getPrefixTable() + tableDescription.getTableName(),
properties.getTarget().getCreateTableAutoIncrement().booleanValue());
targetJdbcTemplate.execute(sqlCreateTable);
log.info("Execute SQL: \n{}", sqlCreateTable);
@@ -193,7 +194,7 @@ public class MainService {
} else {
// 判断是否具备变化量同步的条件1两端表结构一致且都有一样的主键字段(2)MySQL使用Innodb引擎
if (properties.getTarget().getChangeDataSynch().booleanValue()) {
// 根据主键情况判断推送的方式
// 根据主键情况判断同步的方式:增量同步或覆盖同步
JdbcMetaDataUtils mds = new JdbcMetaDataUtils(sourceDataSource);
JdbcMetaDataUtils mdt = new JdbcMetaDataUtils(writer.getDataSource());
List<String> pks1 = mds.queryTablePrimaryKeys(tableDescription.getSchemaName(),
@@ -237,11 +238,11 @@ public class MainService {
final int BATCH_SIZE = fetchSize;
// 准备目的端的数据写入操作
writer.prepareWrite(properties.getTarget().getTargetSchema(), tableDescription.getTableName());
writer.prepareWrite(properties.getTarget().getTargetSchema(), sourceProperties.getPrefixTable() + tableDescription.getTableName());
// 清空目的端表的数据
IDatabaseOperator targetOperator = DatabaseOperatorFactory.createDatabaseOperator(writer.getDataSource());
targetOperator.truncateTableData(properties.getTarget().getTargetSchema(), tableDescription.getTableName());
targetOperator.truncateTableData(properties.getTarget().getTargetSchema(), sourceProperties.getPrefixTable() + tableDescription.getTableName());
// 查询源端数据并写入目的端
IDatabaseOperator sourceOperator = DatabaseOperatorFactory.createDatabaseOperator(sourceDataSource);
@@ -313,12 +314,12 @@ public class MainService {
DatabaseTypeEnum sourceDatabaseType = JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource);
String fullTableName = CommonUtils.getTableFullNameByDatabase(sourceDatabaseType,
tableDescription.getSchemaName(), tableDescription.getTableName());
tableDescription.getSchemaName(), sourceProperties.getPrefixTable() + tableDescription.getTableName());
TaskParamBean.TaskParamBeanBuilder taskBuilder = TaskParamBean.builder();
taskBuilder.oldDataSource(writer.getDataSource());
taskBuilder.oldSchemaName(properties.getTarget().getTargetSchema());
taskBuilder.oldTableName(tableDescription.getTableName());
taskBuilder.oldTableName(sourceProperties.getPrefixTable() + tableDescription.getTableName());
taskBuilder.newDataSource(sourceDataSource);
taskBuilder.newSchemaName(tableDescription.getSchemaName());
taskBuilder.newTableName(tableDescription.getTableName());
@@ -327,7 +328,7 @@ public class MainService {
TaskParamBean param = taskBuilder.build();
IDatabaseSynchronize synch = DatabaseSynchronizeFactory.createDatabaseWriter(writer.getDataSource());
synch.prepare(param.getOldSchemaName(), param.getOldTableName(), fields, pks);
synch.prepare(param.getOldSchemaName(), sourceProperties.getPrefixTable() + param.getOldTableName(), fields, pks);
IDatabaseChangeCaculator changeCaculator = new ChangeCaculatorService();
changeCaculator.setFetchSize(BATCH_SIZE);
@@ -482,7 +483,7 @@ public class MainService {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
String versionString = jdbcTemplate.queryForObject("SELECT version()", String.class);
if (Objects.nonNull(versionString) && versionString.contains("Greenplum")) {
log.info("#### Target database is Greenplum Clusterclose optimizer now: set optimizer to 'off' ");
log.info("#### Target database is Greenplum Cluster, Close Optimizer now: set optimizer to 'off' ");
ds.setConnectionInitSql("set optimizer to 'off'");
}
}

View File

@@ -1,5 +1,5 @@
# source database connection information
## support MySQL/MariaDB/DB2/DM/Kingbase/Oracle/SQLServer/PostgreSQL/Greenplum
## support MySQL/MariaDB/DB2/DM/Kingbase8/Oracle/SQLServer/PostgreSQL/Greenplum
dbswitch.source[0].url= jdbc:oracle:thin:@172.17.20.58:1521:ORCL
dbswitch.source[0].driver-class-name= oracle.jdbc.driver.OracleDriver
dbswitch.source[0].username= tang
@@ -9,13 +9,15 @@ dbswitch.source[0].password= tang
dbswitch.source[0].fetch-size=10000
## schema name for query source database
dbswitch.source[0].source-schema=TANG
## prefix of table name for target name
dbswitch.source[0].prefix-table=TA_
## table name include from table lists
dbswitch.source[0].source-includes=
## table name exclude from table lists
dbswitch.source[0].source-excludes=
# target database connection information
## support Oracle/PostgreSQL/Greenplum/DM/Kingbase
## Best support for Oracle/PostgreSQL/Greenplum/DM/Kingbase8
dbswitch.target.url= jdbc:postgresql://172.17.20.44:5432/study
dbswitch.target.driver-class-name= org.postgresql.Driver
dbswitch.target.username= tang