version for 1.6.3

inrgihc
2022-01-23 00:53:48 +08:00
parent df66d0e1b4
commit 6fea38b0e4
281 changed files with 11250 additions and 10915 deletions

View File

@@ -5,7 +5,7 @@
   <parent>
     <groupId>com.gitee.dbswitch</groupId>
     <artifactId>dbswitch-parent</artifactId>
-    <version>1.6.2</version>
+    <version>1.6.3</version>
   </parent>
   <artifactId>dbswitch-data</artifactId>

View File

@@ -9,8 +9,7 @@
 /////////////////////////////////////////////////////////////
 package com.gitee.dbswitch.data;

-import com.gitee.dbswitch.data.config.DbswichProperties;
-import com.gitee.dbswitch.data.core.MainService;
+import com.gitee.dbswitch.data.service.MigrationService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.Banner;
 import org.springframework.boot.SpringApplication;
@@ -34,15 +33,12 @@ public class DataSyncApplication {
     springApplication.setBannerMode(Banner.Mode.OFF);
     ConfigurableApplicationContext applicationContext = springApplication.run(args);
     try {
-      DbswichProperties properties = applicationContext.getBean(DbswichProperties.class);
-      MainService mainService = new MainService(properties);
-      mainService.run();
+      applicationContext.getBean(MigrationService.class).run();
     } catch (Exception e) {
       log.error("error:", e);
     } finally {
       applicationContext.close();
     }
   }
 }

View File

@@ -11,10 +11,10 @@ package com.gitee.dbswitch.data.config;

 import java.util.ArrayList;
 import java.util.List;
+import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.PropertySource;
-import lombok.Data;

 /**
  * Property mapping configuration
@@ -23,42 +23,44 @@ import lombok.Data;
  */
 @Configuration
 @Data
-@ConfigurationProperties(prefix = "dbswitch", ignoreInvalidFields = false, ignoreUnknownFields = false)
+@ConfigurationProperties(prefix = "dbswitch", ignoreUnknownFields = false)
 @PropertySource(
-    value = {"classpath:config.properties", "classpath:config.yml"},
-    ignoreResourceNotFound=true,
-    factory = DbswitchPropertySourceFactory.class)
+    value = {"classpath:config.properties", "classpath:config.yml"},
+    ignoreResourceNotFound = true,
+    factory = DbswitchPropertySourceFactory.class)
 public class DbswichProperties {

   private List<SourceDataSourceProperties> source = new ArrayList<>();
   private TargetDataSourceProperties target = new TargetDataSourceProperties();

   @Data
   public static class SourceDataSourceProperties {

     private String url;
     private String driverClassName;
     private String username;
     private String password;
     private Integer fetchSize = 5000;
     private String sourceSchema = "";
     private String prefixTable = "";
     private String sourceIncludes = "";
     private String sourceExcludes = "";
   }

   @Data
   public static class TargetDataSourceProperties {

     private String url;
     private String driverClassName;
     private String username;
     private String password;
     private String targetSchema = "";
     private Boolean targetDrop = Boolean.TRUE;
     private Boolean createTableAutoIncrement = Boolean.FALSE;
     private Boolean writerEngineInsert = Boolean.FALSE;
-    private Boolean changeDataSynch = Boolean.FALSE;
+    private Boolean changeDataSync = Boolean.FALSE;
   }
 }

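Since DbswichProperties is a @Configuration bean bound via @ConfigurationProperties, any Spring-managed class can take it as a constructor dependency. A small, hypothetical consumer (not part of this commit) for illustration:

import com.gitee.dbswitch.data.config.DbswichProperties;
import org.springframework.stereotype.Service;

@Service
public class DemoPropertiesConsumer {

  private final DbswichProperties properties;

  public DemoPropertiesConsumer(DbswichProperties properties) {
    this.properties = properties;
  }

  public void printEndpoints() {
    // dbswitch.source[0].url, dbswitch.target.url, ... bind into these objects
    properties.getSource().forEach(s -> System.out.println(s.getUrl()));
    System.out.println(properties.getTarget().getUrl());
  }
}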
View File

@@ -9,13 +9,13 @@
 /////////////////////////////////////////////////////////////
 package com.gitee.dbswitch.data.config;

+import java.io.IOException;
+import java.util.Properties;
 import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
 import org.springframework.core.env.PropertiesPropertySource;
 import org.springframework.core.env.PropertySource;
 import org.springframework.core.io.support.DefaultPropertySourceFactory;
 import org.springframework.core.io.support.EncodedResource;
-import java.io.IOException;
-import java.util.Properties;

 /**
  * Supports both .properties and .yaml configuration file types
@@ -24,22 +24,23 @@ import java.util.Properties;
  */
 public class DbswitchPropertySourceFactory extends DefaultPropertySourceFactory {

   private static final String suffixYml = ".yml";
   private static final String suffixYaml = ".yaml";

   @Override
-  public PropertySource<?> createPropertySource(String name, EncodedResource resource) throws IOException {
+  public PropertySource<?> createPropertySource(String name, EncodedResource resource)
+      throws IOException {
     String sourceName = name != null ? name : resource.getResource().getFilename();
     if (!resource.getResource().exists()) {
       return new PropertiesPropertySource(sourceName, new Properties());
     } else if (sourceName.endsWith(suffixYml) || sourceName.endsWith(suffixYaml)) {
       YamlPropertiesFactoryBean factory = new YamlPropertiesFactoryBean();
       factory.setResources(resource.getResource());
       factory.afterPropertiesSet();
       return new PropertiesPropertySource(sourceName, factory.getObject());
     } else {
       return super.createPropertySource(name, resource);
     }
   }
 }

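Spring's @PropertySource cannot read YAML by itself, which is the gap this factory fills: .yml/.yaml resources are routed through YamlPropertiesFactoryBean, missing resources yield an empty source, and everything else falls back to the default properties handling. A minimal usage sketch, assuming a hypothetical demo.yml on the classpath:

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

@Configuration
@PropertySource(
    value = "classpath:demo.yml",  // hypothetical resource name
    ignoreResourceNotFound = true,
    factory = DbswitchPropertySourceFactory.class)
public class DemoYamlConfig {
  // keys from demo.yml become visible to Spring's Environment
}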
View File

@@ -16,21 +16,22 @@ import lombok.Data;
 /**
  * Statistics information
  *
- * @Author: tang
+ * @author tang
  */
 @Data
 @AllArgsConstructor
 public class PerfStat {

   private Integer index;
   private Integer total;
   private Integer failure;
   private Long bytes;

   @Override
   public String toString() {
     return "Data Source Index: \t" + index + "\n" +
         "Total Tables Count: \t" + total + "\n" +
         "Failure Tables count: \t" + failure + "\n" +
         "Total Transfer Size: \t" + BytesUnitUtils.bytesSizeToHuman(bytes) + "\n";
   }
 }

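With hypothetical values (data source index 0, 12 tables, 1 failure, roughly 123 MB transferred), toString() would render as:

Data Source Index:      0
Total Tables Count:     12
Failure Tables count:   1
Total Transfer Size:    117.74MB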
View File

@@ -9,8 +9,7 @@
 /////////////////////////////////////////////////////////////
 package com.gitee.dbswitch.data.handler;

-import org.ehcache.sizeof.SizeOf;
-import com.gitee.dbswitch.common.constant.DatabaseTypeEnum;
+import com.gitee.dbswitch.common.type.DatabaseTypeEnum;
 import com.gitee.dbswitch.common.util.CommonUtils;
 import com.gitee.dbswitch.core.model.ColumnDescription;
 import com.gitee.dbswitch.core.model.TableDescription;
@@ -19,22 +18,20 @@ import com.gitee.dbswitch.core.service.impl.MigrationMetaDataServiceImpl;
 import com.gitee.dbswitch.data.config.DbswichProperties;
 import com.gitee.dbswitch.data.util.BytesUnitUtils;
 import com.gitee.dbswitch.data.util.JdbcTemplateUtils;
-import com.gitee.dbswitch.dbchange.ChangeCaculatorService;
+import com.gitee.dbswitch.dbchange.ChangeCalculatorService;
 import com.gitee.dbswitch.dbchange.IDatabaseChangeCaculator;
 import com.gitee.dbswitch.dbchange.IDatabaseRowHandler;
 import com.gitee.dbswitch.dbchange.RecordChangeTypeEnum;
-import com.gitee.dbswitch.dbchange.pojo.TaskParamBean;
+import com.gitee.dbswitch.dbchange.TaskParamEntity;
 import com.gitee.dbswitch.dbcommon.database.DatabaseOperatorFactory;
 import com.gitee.dbswitch.dbcommon.database.IDatabaseOperator;
-import com.gitee.dbswitch.dbcommon.pojo.StatementResultSet;
+import com.gitee.dbswitch.dbcommon.domain.StatementResultSet;
 import com.gitee.dbswitch.dbcommon.util.JdbcMetaDataUtils;
 import com.gitee.dbswitch.dbsynch.DatabaseSynchronizeFactory;
 import com.gitee.dbswitch.dbsynch.IDatabaseSynchronize;
 import com.gitee.dbswitch.dbwriter.DatabaseWriterFactory;
 import com.gitee.dbswitch.dbwriter.IDatabaseWriter;
 import com.zaxxer.hikari.HikariDataSource;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.jdbc.core.JdbcTemplate;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -42,6 +39,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.ehcache.sizeof.SizeOf;
+import org.springframework.jdbc.core.JdbcTemplate;

 /**
  * Single-table migration logic, executed within one thread
@@ -51,320 +51,353 @@ import java.util.function.Supplier;
 @Slf4j
 public class MigrationHandler implements Supplier<Long> {

   private final long MAX_CACHE_BYTES_SIZE = 64 * 1024 * 1024;

   private int fetchSize = 100;
-  private TableDescription tableDescription;
-  private DbswichProperties properties;
-  private DbswichProperties.SourceDataSourceProperties sourceProperties;
-  private HikariDataSource sourceDataSource;
-  private IMetaDataService sourceMetaDataSerice;
-  private HikariDataSource targetDataSource;
+  private final TableDescription tableDescription;
+  private final DbswichProperties properties;
+  private final DbswichProperties.SourceDataSourceProperties sourceProperties;
+  private final HikariDataSource sourceDataSource;
+  private final IMetaDataService sourceMetaDataService;
+  private final HikariDataSource targetDataSource;

   public static MigrationHandler createInstance(TableDescription td,
       DbswichProperties properties,
       Integer sourcePropertiesIndex,
       HikariDataSource sds,
       HikariDataSource tds) {
     return new MigrationHandler(td, properties, sourcePropertiesIndex, sds, tds);
   }

   private MigrationHandler(TableDescription td,
       DbswichProperties properties,
       Integer sourcePropertiesIndex,
       HikariDataSource sds,
       HikariDataSource tds) {
     this.tableDescription = td;
     this.properties = properties;
     this.sourceProperties = properties.getSource().get(sourcePropertiesIndex);
     this.sourceDataSource = sds;
-    this.sourceMetaDataSerice = new MigrationMetaDataServiceImpl();
+    this.sourceMetaDataService = new MigrationMetaDataServiceImpl();
     this.targetDataSource = tds;

     if (sourceProperties.getFetchSize() >= fetchSize) {
       fetchSize = sourceProperties.getFetchSize();
     }

-    this.sourceMetaDataSerice.setDatabaseConnection(JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource));
+    this.sourceMetaDataService
+        .setDatabaseConnection(JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource));
   }

   @Override
   public Long get() {
-    log.info("Migrate table for {}.{} ", tableDescription.getSchemaName(), tableDescription.getTableName());
+    log.info("Begin Migrate table for {}.{} ", tableDescription.getSchemaName(),
+        tableDescription.getTableName());

     JdbcTemplate targetJdbcTemplate = new JdbcTemplate(targetDataSource);
     DatabaseTypeEnum targetDatabaseType = JdbcTemplateUtils
         .getDatabaseProduceName(targetDataSource);
-    IDatabaseWriter writer = DatabaseWriterFactory.createDatabaseWriter(targetDataSource, properties.getTarget().getWriterEngineInsert().booleanValue());
+    IDatabaseWriter writer = DatabaseWriterFactory.createDatabaseWriter(targetDataSource,
+        properties.getTarget().getWriterEngineInsert());

-    if (properties.getTarget().getTargetDrop().booleanValue()) {
+    if (properties.getTarget().getTargetDrop()) {
       /*
        When dbswitch.target.datasource-target-drop=true is configured, run the drop table
        statement first and then the create table statement.
       */

       // drop the table first
       try {
         IDatabaseOperator targetOperator = DatabaseOperatorFactory
             .createDatabaseOperator(targetDataSource);
         targetOperator.dropTable(properties.getTarget().getTargetSchema(),
             sourceProperties.getPrefixTable() + tableDescription.getTableName());
       } catch (Exception e) {
-        log.info("Target Table {}.{} is not exits!", properties.getTarget().getTargetSchema(),
+        log.info("Target Table {}.{} does not exist!", properties.getTarget().getTargetSchema(),
             sourceProperties.getPrefixTable() + tableDescription.getTableName());
       }

       // then create the table
-      List<ColumnDescription> columnDescs = sourceMetaDataSerice.queryTableColumnMeta(sourceProperties.getUrl(),
-          sourceProperties.getUsername(), sourceProperties.getPassword(), tableDescription.getSchemaName(),
-          tableDescription.getTableName());
-      List<String> primaryKeys = sourceMetaDataSerice.queryTablePrimaryKeys(sourceProperties.getUrl(),
-          sourceProperties.getUsername(), sourceProperties.getPassword(), tableDescription.getSchemaName(),
-          tableDescription.getTableName());
-      String sqlCreateTable = sourceMetaDataSerice.getDDLCreateTableSQL(targetDatabaseType, columnDescs, primaryKeys,
-          properties.getTarget().getTargetSchema(),
-          sourceProperties.getPrefixTable() + tableDescription.getTableName(),
-          properties.getTarget().getCreateTableAutoIncrement().booleanValue());
+      List<ColumnDescription> columnDescriptions = sourceMetaDataService
+          .queryTableColumnMeta(sourceProperties.getUrl(),
+              sourceProperties.getUsername(), sourceProperties.getPassword(),
+              tableDescription.getSchemaName(),
+              tableDescription.getTableName());
+      List<String> primaryKeys = sourceMetaDataService
+          .queryTablePrimaryKeys(sourceProperties.getUrl(),
+              sourceProperties.getUsername(), sourceProperties.getPassword(),
+              tableDescription.getSchemaName(),
+              tableDescription.getTableName());
+      String sqlCreateTable = sourceMetaDataService
+          .getDDLCreateTableSQL(targetDatabaseType, columnDescriptions, primaryKeys,
+              properties.getTarget().getTargetSchema(),
+              sourceProperties.getPrefixTable() + tableDescription.getTableName(),
+              properties.getTarget().getCreateTableAutoIncrement());
       targetJdbcTemplate.execute(sqlCreateTable);
       log.info("Execute SQL: \n{}", sqlCreateTable);

       return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
     } else {
       // Check whether change-data sync is possible: (1) both sides have the same table structure
       // and the same primary key fields; (2) a MySQL target uses the InnoDB engine.
-      if (properties.getTarget().getChangeDataSynch().booleanValue()) {
+      if (properties.getTarget().getChangeDataSync()) {
         // Decide the sync mode based on the primary keys: incremental or full cover
         JdbcMetaDataUtils mds = new JdbcMetaDataUtils(sourceDataSource);
         JdbcMetaDataUtils mdt = new JdbcMetaDataUtils(targetDataSource);
         List<String> pks1 = mds.queryTablePrimaryKeys(tableDescription.getSchemaName(),
             tableDescription.getTableName());
         List<String> pks2 = mdt.queryTablePrimaryKeys(properties.getTarget().getTargetSchema(),
             sourceProperties.getPrefixTable() + tableDescription.getTableName());

         if (!pks1.isEmpty() && !pks2.isEmpty() && pks1.containsAll(pks2) && pks2
             .containsAll(pks1)) {
           if (targetDatabaseType == DatabaseTypeEnum.MYSQL
-              && !JdbcTemplateUtils.isMysqlInodbStorageEngine(properties.getTarget().getTargetSchema(),
-                  sourceProperties.getPrefixTable() + tableDescription.getTableName(), targetDataSource)) {
+              && !JdbcTemplateUtils
+              .isMysqlInnodbStorageEngine(properties.getTarget().getTargetSchema(),
+                  sourceProperties.getPrefixTable() + tableDescription.getTableName(),
+                  targetDataSource)) {
             return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource,
                 writer);
           } else {
             List<String> fields = mds.queryTableColumnName(tableDescription.getSchemaName(),
                 tableDescription.getTableName());
             return doIncreaseSynchronize(tableDescription, sourceProperties, sourceDataSource,
                 writer, pks1, fields);
           }
         } else {
           return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource,
               writer);
         }
       } else {
         return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer);
       }
     }
   }

   /**
    * Full-cover synchronization
    *
    * @param tableDescription description of the table; it may be a view or a physical table
    * @param writer           the writer for the target side
    */
   private Long doFullCoverSynchronize(TableDescription tableDescription,
       DbswichProperties.SourceDataSourceProperties sourceProperties,
       HikariDataSource sourceDataSource,
       IDatabaseWriter writer) {
     final int BATCH_SIZE = fetchSize;

     // Prepare the write operation on the target side
     writer.prepareWrite(properties.getTarget().getTargetSchema(),
         sourceProperties.getPrefixTable() + tableDescription.getTableName());

     // Truncate the data of the target table
     IDatabaseOperator targetOperator = DatabaseOperatorFactory
         .createDatabaseOperator(writer.getDataSource());
     targetOperator.truncateTableData(properties.getTarget().getTargetSchema(),
         sourceProperties.getPrefixTable() + tableDescription.getTableName());

     // Query the source data and write it to the target
     IDatabaseOperator sourceOperator = DatabaseOperatorFactory
         .createDatabaseOperator(sourceDataSource);
     sourceOperator.setFetchSize(BATCH_SIZE);

     DatabaseTypeEnum sourceDatabaseType = JdbcTemplateUtils
         .getDatabaseProduceName(sourceDataSource);
     String fullTableName = CommonUtils.getTableFullNameByDatabase(sourceDatabaseType,
         tableDescription.getSchemaName(), tableDescription.getTableName());
     Map<String, Integer> columnMetaData = JdbcTemplateUtils
         .getColumnMetaData(new JdbcTemplate(sourceDataSource), fullTableName);

     List<String> fields = new ArrayList<>(columnMetaData.keySet());
     StatementResultSet srs = sourceOperator
         .queryTableData(tableDescription.getSchemaName(), tableDescription.getTableName(), fields);

     List<Object[]> cache = new LinkedList<>();
     long cacheBytes = 0;
     long totalCount = 0;
     long totalBytes = 0;
-    try (ResultSet rs = srs.getResultset();) {
+    try (ResultSet rs = srs.getResultset()) {
       while (rs.next()) {
         Object[] record = new Object[fields.size()];
         for (int i = 1; i <= fields.size(); ++i) {
           try {
             record[i - 1] = rs.getObject(i);
           } catch (Exception e) {
             log.warn("!!! Read data from table [ {} ] use function ResultSet.getObject() error",
                 fullTableName, e);
             record[i - 1] = null;
           }
         }

         cache.add(record);
         cacheBytes += SizeOf.newInstance().deepSizeOf(record);
         ++totalCount;

         if (cache.size() >= BATCH_SIZE || cacheBytes >= MAX_CACHE_BYTES_SIZE) {
           long ret = writer.write(fields, cache);
-          log.info("[FullCoverSynch] handle table [{}] data count: {}, the batch bytes sie: {}", fullTableName, ret, BytesUnitUtils.bytesSizeToHuman(cacheBytes));
+          log.info("[FullCoverSync] handle table [{}] data count: {}, the batch bytes size: {}",
+              fullTableName, ret, BytesUnitUtils.bytesSizeToHuman(cacheBytes));
           cache.clear();
           totalBytes += cacheBytes;
           cacheBytes = 0;
         }
       }

       if (cache.size() > 0) {
         long ret = writer.write(fields, cache);
-        log.info("[FullCoverSynch] handle table [{}] data count: {}, last batch bytes sie: {}", fullTableName, ret, BytesUnitUtils.bytesSizeToHuman(cacheBytes));
+        log.info("[FullCoverSync] handle table [{}] data count: {}, last batch bytes size: {}",
+            fullTableName, ret, BytesUnitUtils.bytesSizeToHuman(cacheBytes));
         cache.clear();
         totalBytes += cacheBytes;
       }

-      log.info("[FullCoverSynch] handle table [{}] total data count:{}, total bytes={}", fullTableName, totalCount, BytesUnitUtils.bytesSizeToHuman(totalBytes));
+      log.info("[FullCoverSync] handle table [{}] total data count:{}, total bytes={}",
+          fullTableName, totalCount, BytesUnitUtils.bytesSizeToHuman(totalBytes));
     } catch (Exception e) {
       throw new RuntimeException(e);
     } finally {
       srs.close();
     }

     return totalBytes;
   }

   /**
    * Change-data (incremental) synchronization
    *
    * @param tableDescription description of the table; here it must be a physical table
    * @param writer           the writer for the target side
    */
   private Long doIncreaseSynchronize(TableDescription tableDescription,
       DbswichProperties.SourceDataSourceProperties sourceProperties,
       HikariDataSource sourceDataSource,
       IDatabaseWriter writer, List<String> pks, List<String> fields) {
     final int BATCH_SIZE = fetchSize;

     DatabaseTypeEnum sourceDatabaseType = JdbcTemplateUtils
         .getDatabaseProduceName(sourceDataSource);
     String fullTableName = CommonUtils.getTableFullNameByDatabase(sourceDatabaseType,
         tableDescription.getSchemaName(),
         sourceProperties.getPrefixTable() + tableDescription.getTableName());

-    TaskParamBean.TaskParamBeanBuilder taskBuilder = TaskParamBean.builder();
+    TaskParamEntity.TaskParamEntityBuilder taskBuilder = TaskParamEntity.builder();
     taskBuilder.oldDataSource(writer.getDataSource());
     taskBuilder.oldSchemaName(properties.getTarget().getTargetSchema());
     taskBuilder.oldTableName(sourceProperties.getPrefixTable() + tableDescription.getTableName());
     taskBuilder.newDataSource(sourceDataSource);
     taskBuilder.newSchemaName(tableDescription.getSchemaName());
     taskBuilder.newTableName(tableDescription.getTableName());
     taskBuilder.fieldColumns(fields);

-    TaskParamBean param = taskBuilder.build();
+    TaskParamEntity param = taskBuilder.build();

     IDatabaseSynchronize synchronizer = DatabaseSynchronizeFactory
         .createDatabaseWriter(writer.getDataSource());
     synchronizer.prepare(param.getOldSchemaName(), param.getOldTableName(), fields, pks);

-    IDatabaseChangeCaculator calculator = new ChangeCaculatorService();
+    IDatabaseChangeCaculator calculator = new ChangeCalculatorService();
     calculator.setFetchSize(BATCH_SIZE);
     calculator.setRecordIdentical(false);
     calculator.setCheckJdbcType(false);

     AtomicLong totalBytes = new AtomicLong(0);

     // Run the actual change-data synchronization
     calculator.executeCalculate(param, new IDatabaseRowHandler() {

       private long countInsert = 0;
       private long countUpdate = 0;
       private long countDelete = 0;
       private long countTotal = 0;
       private long cacheBytes = 0;
-      private List<Object[]> cacheInsert = new LinkedList<Object[]>();
-      private List<Object[]> cacheUpdate = new LinkedList<Object[]>();
-      private List<Object[]> cacheDelete = new LinkedList<Object[]>();
+      private final List<Object[]> cacheInsert = new LinkedList<>();
+      private final List<Object[]> cacheUpdate = new LinkedList<>();
+      private final List<Object[]> cacheDelete = new LinkedList<>();

       @Override
       public void handle(List<String> fields, Object[] record, RecordChangeTypeEnum flag) {
         if (flag == RecordChangeTypeEnum.VALUE_INSERT) {
           cacheInsert.add(record);
           countInsert++;
         } else if (flag == RecordChangeTypeEnum.VALUE_CHANGED) {
           cacheUpdate.add(record);
           countUpdate++;
         } else {
           cacheDelete.add(record);
           countDelete++;
         }

         cacheBytes += SizeOf.newInstance().deepSizeOf(record);
         totalBytes.addAndGet(cacheBytes);
         countTotal++;
         checkFull(fields);
       }

       /**
        * Check whether the cache is full; if so, flush the pending changes
        *
        * @param fields the list of fields being synchronized
        */
       private void checkFull(List<String> fields) {
         if (cacheInsert.size() >= BATCH_SIZE || cacheUpdate.size() >= BATCH_SIZE
             || cacheDelete.size() >= BATCH_SIZE || cacheBytes >= MAX_CACHE_BYTES_SIZE) {
           if (cacheDelete.size() > 0) {
             doDelete(fields);
           }

           if (cacheUpdate.size() > 0) {
             doUpdate(fields);
           }

           if (cacheInsert.size() > 0) {
             doInsert(fields);
           }

-          log.info("[IncreaseSynch] Handle table [{}] data one batch size: {}", fullTableName, BytesUnitUtils.bytesSizeToHuman(cacheBytes));
+          log.info("[IncreaseSync] Handle table [{}] data one batch size: {}", fullTableName,
+              BytesUnitUtils.bytesSizeToHuman(cacheBytes));
           cacheBytes = 0;
         }
       }

       @Override
       public void destroy(List<String> fields) {
         if (cacheDelete.size() > 0) {
           doDelete(fields);
         }

         if (cacheUpdate.size() > 0) {
           doUpdate(fields);
         }

         if (cacheInsert.size() > 0) {
           doInsert(fields);
         }

-        log.info("[IncreaseSynch] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ", fullTableName, countTotal,
-            countInsert, countUpdate, countDelete);
+        log.info("[IncreaseSync] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
+            fullTableName, countTotal, countInsert, countUpdate, countDelete);
       }

       private void doInsert(List<String> fields) {
         long ret = synchronizer.executeInsert(cacheInsert);
-        log.info("[IncreaseSynch] Handle table [{}] data Insert count: {}", fullTableName, ret);
+        log.info("[IncreaseSync] Handle table [{}] data Insert count: {}", fullTableName, ret);
         cacheInsert.clear();
       }

       private void doUpdate(List<String> fields) {
         long ret = synchronizer.executeUpdate(cacheUpdate);
-        log.info("[IncreaseSynch] Handle table [{}] data Update count: {}", fullTableName, ret);
+        log.info("[IncreaseSync] Handle table [{}] data Update count: {}", fullTableName, ret);
         cacheUpdate.clear();
       }

       private void doDelete(List<String> fields) {
         long ret = synchronizer.executeDelete(cacheDelete);
-        log.info("[IncreaseSynch] Handle table [{}] data Delete count: {}", fullTableName, ret);
+        log.info("[IncreaseSync] Handle table [{}] data Delete count: {}", fullTableName, ret);
         cacheDelete.clear();
       }
     });

     return totalBytes.get();
   }
 }

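To summarize the branching in get() above: full-cover synchronization is the fallback whenever change-data synchronization is not safe. A condensed, hypothetical helper (not part of the commit) restating the same conditions:

import java.util.List;

final class SyncModeSketch {

  // Mirrors the checks in MigrationHandler.get(): incremental sync requires the
  // feature flag, identical primary keys on both sides, and InnoDB for a MySQL target.
  static boolean canUseIncrementalSync(boolean changeDataSync,
      List<String> sourcePks, List<String> targetPks,
      boolean targetIsMysql, boolean targetUsesInnodb) {
    if (!changeDataSync) {
      return false;
    }
    boolean samePrimaryKeys = !sourcePks.isEmpty() && !targetPks.isEmpty()
        && sourcePks.containsAll(targetPks) && targetPks.containsAll(sourcePks);
    if (!samePrimaryKeys) {
      return false;
    }
    return !targetIsMysql || targetUsesInnodb;
  }
}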
View File

@@ -7,7 +7,7 @@
 // Date : 2020/1/2
 // Location: beijing , china
 /////////////////////////////////////////////////////////////
-package com.gitee.dbswitch.data.core;
+package com.gitee.dbswitch.data.service;

 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.gitee.dbswitch.core.model.TableDescription;
@@ -17,7 +17,7 @@ import com.gitee.dbswitch.data.config.DbswichProperties;
 import com.gitee.dbswitch.data.domain.PerfStat;
 import com.gitee.dbswitch.data.handler.MigrationHandler;
 import com.gitee.dbswitch.data.util.BytesUnitUtils;
-import com.gitee.dbswitch.data.util.DataSouceUtils;
+import com.gitee.dbswitch.data.util.DataSourceUtils;
 import com.gitee.dbswitch.data.util.JdbcTemplateUtils;
 import com.gitee.dbswitch.data.util.StrUtils;
 import com.zaxxer.hikari.HikariDataSource;
@@ -30,6 +30,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
 import org.springframework.util.StopWatch;

 /**
@@ -38,7 +39,8 @@ import org.springframework.util.StopWatch;
  * @author tang
  */
 @Slf4j
-public class MainService {
+@Service
+public class MigrationService {

   /**
    * JSON serialization tool
@@ -60,7 +62,7 @@ public class MainService {
    *
    * @param properties the configuration properties
    */
-  public MainService(DbswichProperties properties) {
+  public MigrationService(DbswichProperties properties) {
     this.properties = properties;
   }
@@ -74,14 +76,14 @@ public class MainService {
     log.info("dbswitch data service is started....");
     //log.info("Application properties configuration \n{}", properties);

-    try (HikariDataSource targetDataSource = DataSouceUtils
+    try (HikariDataSource targetDataSource = DataSourceUtils
         .createTargetDataSource(properties.getTarget())) {

       int sourcePropertiesIndex = 0;
       int totalTableCount = 0;
       List<DbswichProperties.SourceDataSourceProperties> sourcesProperties = properties.getSource();

       for (DbswichProperties.SourceDataSourceProperties sourceProperties : sourcesProperties) {
-        try (HikariDataSource sourceDataSource = DataSouceUtils
+        try (HikariDataSource sourceDataSource = DataSourceUtils
             .createSourceDataSource(sourceProperties)) {

           IMetaDataService sourceMetaDataService = new MigrationMetaDataServiceImpl();
@@ -148,17 +150,20 @@ public class MainService {
         }

-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
-        log.info(
-            "#### Complete data migration for the [ {} ] data source:\ntotal count={}\nfailure count={}\ntotal bytes size={}",
-            sourcePropertiesIndex, futures.size(), numberOfFailures.get(),
-            BytesUnitUtils.bytesSizeToHuman(totalBytesSize.get()));
-        perfStats.add(new PerfStat(sourcePropertiesIndex, futures.size(), numberOfFailures.get(),
-            totalBytesSize.get()));
-        ++sourcePropertiesIndex;
-        totalTableCount += futures.size();
+        try {
+          CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
+          log.info(
+              "#### Complete data migration for the [ {} ] data source:\ntotal count={}\nfailure count={}\ntotal bytes size={}",
+              sourcePropertiesIndex, futures.size(), numberOfFailures.get(),
+              BytesUnitUtils.bytesSizeToHuman(totalBytesSize.get()));
+          perfStats.add(new PerfStat(sourcePropertiesIndex, futures.size(),
+              numberOfFailures.get(), totalBytesSize.get()));
+          ++sourcePropertiesIndex;
+          totalTableCount += futures.size();
+        } catch (InterruptedException e) {
+          log.warn(" ### Thread is interrupted , exit execute task now ......");
+          throw e;
+        }
       }
     }

     log.info("service run all success, total migrate table count={} ", totalTableCount);
@@ -192,9 +197,13 @@ public class MainService {
    * @param totalBytesSize the number of bytes synchronized
    * @return CompletableFuture<Void>
    */
-  private CompletableFuture<Void> makeFutureTask(TableDescription td, Integer indexInternal,
-      HikariDataSource sds, HikariDataSource tds,
-      AtomicInteger numberOfFailures, AtomicLong totalBytesSize) {
+  private CompletableFuture<Void> makeFutureTask(
+      TableDescription td,
+      Integer indexInternal,
+      HikariDataSource sds,
+      HikariDataSource tds,
+      AtomicInteger numberOfFailures,
+      AtomicLong totalBytesSize) {
     return CompletableFuture.supplyAsync(getMigrateHandler(td, indexInternal, sds, tds))
         .exceptionally(getExceptHandler(td, numberOfFailures))
         .thenAccept(totalBytesSize::addAndGet);
@@ -209,8 +218,11 @@ public class MainService {
    * @param tds the target-side DataSource
    * @return Supplier<Long>
    */
-  private Supplier<Long> getMigrateHandler(TableDescription td, Integer indexInternal,
-      HikariDataSource sds, HikariDataSource tds) {
+  private Supplier<Long> getMigrateHandler(
+      TableDescription td,
+      Integer indexInternal,
+      HikariDataSource sds,
+      HikariDataSource tds) {
     return () -> MigrationHandler.createInstance(td, properties, indexInternal, sds, tds).get();
   }
@@ -221,7 +233,8 @@ public class MainService {
    * @param numberOfFailures the failure counter
    * @return Function<Throwable, Long>
    */
-  private Function<Throwable, Long> getExceptHandler(TableDescription td,
+  private Function<Throwable, Long> getExceptHandler(
+      TableDescription td,
       AtomicInteger numberOfFailures) {
     return (e) -> {
       log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(),

View File

@@ -9,33 +9,34 @@ import java.text.DecimalFormat;
  */
 public final class BytesUnitUtils {

   public static String bytesSizeToHuman(long size) {
     /** Constant for GB */
     long GB = 1024 * 1024 * 1024;
     /** Constant for MB */
     long MB = 1024 * 1024;
     /** Constant for KB */
     long KB = 1024;

     /** Format with two decimal places */
     DecimalFormat df = new DecimalFormat("0.00");
     String resultSize = "0.00";

     if (size / GB >= 1) {
       // the current byte value is at least 1GB
       resultSize = df.format(size / (float) GB) + "GB ";
     } else if (size / MB >= 1) {
       // the current byte value is at least 1MB
       resultSize = df.format(size / (float) MB) + "MB ";
     } else if (size / KB >= 1) {
       // the current byte value is at least 1KB
       resultSize = df.format(size / (float) KB) + "KB ";
     } else {
       resultSize = size + "B ";
     }
     return resultSize;
   }

   private BytesUnitUtils() {
   }
 }

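A quick sanity check of the thresholds (inputs chosen for illustration; each result carries the trailing space from the format above):

import com.gitee.dbswitch.data.util.BytesUnitUtils;

public class BytesUnitUtilsDemo {

  public static void main(String[] args) {
    System.out.println(BytesUnitUtils.bytesSizeToHuman(512L));        // 512B
    System.out.println(BytesUnitUtils.bytesSizeToHuman(10L * 1024));  // 10.00KB
    System.out.println(BytesUnitUtils.bytesSizeToHuman(123456789L));  // 117.74MB
  }
}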
View File

@@ -1,103 +0,0 @@
-package com.gitee.dbswitch.data.util;
-
-import com.gitee.dbswitch.data.config.DbswichProperties;
-import com.zaxxer.hikari.HikariDataSource;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.jdbc.core.JdbcTemplate;
-import java.util.Objects;
-
-/**
- * DataSource utility class
- *
- * @author tang
- * @date 2021/6/8 22:00
- */
-@Slf4j
-public final class DataSouceUtils {
-
-  /**
-   * Create a connection pool for the given database connection descriptor
-   *
-   * @param description the database connection descriptor
-   * @return a HikariDataSource connection pool
-   */
-  public static HikariDataSource createSourceDataSource(DbswichProperties.SourceDataSourceProperties description) {
-    HikariDataSource ds = new HikariDataSource();
-    ds.setPoolName("The_Source_DB_Connection");
-    ds.setJdbcUrl(description.getUrl());
-    ds.setDriverClassName(description.getDriverClassName());
-    ds.setUsername(description.getUsername());
-    ds.setPassword(description.getPassword());
-    if (description.getDriverClassName().contains("oracle")) {
-      ds.setConnectionTestQuery("SELECT 'Hello' from DUAL");
-    } else if (description.getDriverClassName().contains("db2")) {
-      ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
-    } else {
-      ds.setConnectionTestQuery("SELECT 1");
-    }
-    ds.setMaximumPoolSize(8);
-    ds.setMinimumIdle(5);
-    ds.setConnectionTimeout(60000);
-    ds.setIdleTimeout(60000);
-    return ds;
-  }
-
-  /**
-   * Create a connection pool for the given database connection descriptor
-   *
-   * @param description the database connection descriptor
-   * @return a HikariDataSource connection pool
-   */
-  public static HikariDataSource createTargetDataSource(DbswichProperties.TargetDataSourceProperties description) {
-    HikariDataSource ds = new HikariDataSource();
-    ds.setPoolName("The_Target_DB_Connection");
-    ds.setJdbcUrl(description.getUrl());
-    ds.setDriverClassName(description.getDriverClassName());
-    ds.setUsername(description.getUsername());
-    ds.setPassword(description.getPassword());
-    if (description.getDriverClassName().contains("oracle")) {
-      ds.setConnectionTestQuery("SELECT 'Hello' from DUAL");
-    } else if (description.getDriverClassName().contains("db2")) {
-      ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
-    } else {
-      ds.setConnectionTestQuery("SELECT 1");
-    }
-    ds.setMaximumPoolSize(8);
-    ds.setMinimumIdle(5);
-    ds.setConnectionTimeout(30000);
-    ds.setIdleTimeout(60000);
-
-    // For a Greenplum database, the session's query optimizer has to be turned off here
-    if (description.getDriverClassName().contains("postgresql")) {
-      org.springframework.jdbc.datasource.DriverManagerDataSource dataSource = new org.springframework.jdbc.datasource.DriverManagerDataSource();
-      dataSource.setDriverClassName(description.getDriverClassName());
-      dataSource.setUrl(description.getUrl());
-      dataSource.setUsername(description.getUsername());
-      dataSource.setPassword(description.getPassword());
-      JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
-      String versionString = jdbcTemplate.queryForObject("SELECT version()", String.class);
-      if (Objects.nonNull(versionString) && versionString.contains("Greenplum")) {
-        log.info("#### Target database is Greenplum Cluster, Close Optimizer now: set optimizer to 'off' ");
-        ds.setConnectionInitSql("set optimizer to 'off'");
-      }
-    }
-
-    return ds;
-  }
-
-  /**
-   * Close a HikariDataSource
-   *
-   * @param dataSource the data source to close
-   */
-  public static void closeHikariDataSource(HikariDataSource dataSource) {
-    try {
-      dataSource.close();
-    } catch (Exception e) {
-      log.warn("Close data source error:", e);
-    }
-  }
-
-  private DataSouceUtils() {
-  }
-}

View File

@@ -0,0 +1,95 @@
+package com.gitee.dbswitch.data.util;
+
+import com.gitee.dbswitch.data.config.DbswichProperties;
+import com.zaxxer.hikari.HikariDataSource;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+/**
+ * DataSource utility class
+ *
+ * @author tang
+ */
+@Slf4j
+public final class DataSourceUtils {
+
+  /**
+   * Create a connection pool for the given database connection descriptor
+   *
+   * @param description the database connection descriptor
+   * @return a HikariDataSource connection pool
+   */
+  public static HikariDataSource createSourceDataSource(
+      DbswichProperties.SourceDataSourceProperties description) {
+    HikariDataSource ds = new HikariDataSource();
+    ds.setPoolName("The_Source_DB_Connection");
+    ds.setJdbcUrl(description.getUrl());
+    ds.setDriverClassName(description.getDriverClassName());
+    ds.setUsername(description.getUsername());
+    ds.setPassword(description.getPassword());
+    if (description.getDriverClassName().contains("oracle")) {
+      ds.setConnectionTestQuery("SELECT 'Hello' from DUAL");
+      // https://blog.csdn.net/qq_20960159/article/details/78593936
+      System.getProperties().setProperty("oracle.jdbc.J2EE13Compliant", "true");
+    } else if (description.getDriverClassName().contains("db2")) {
+      ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
+    } else {
+      ds.setConnectionTestQuery("SELECT 1");
+    }
+    ds.setMaximumPoolSize(8);
+    ds.setMinimumIdle(5);
+    ds.setConnectionTimeout(60000);
+    ds.setIdleTimeout(60000);
+    return ds;
+  }
+
+  /**
+   * Create a connection pool for the given database connection descriptor
+   *
+   * @param description the database connection descriptor
+   * @return a HikariDataSource connection pool
+   */
+  public static HikariDataSource createTargetDataSource(
+      DbswichProperties.TargetDataSourceProperties description) {
+    HikariDataSource ds = new HikariDataSource();
+    ds.setPoolName("The_Target_DB_Connection");
+    ds.setJdbcUrl(description.getUrl());
+    ds.setDriverClassName(description.getDriverClassName());
+    ds.setUsername(description.getUsername());
+    ds.setPassword(description.getPassword());
+    if (description.getDriverClassName().contains("oracle")) {
+      ds.setConnectionTestQuery("SELECT 'Hello' from DUAL");
+    } else if (description.getDriverClassName().contains("db2")) {
+      ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
+    } else {
+      ds.setConnectionTestQuery("SELECT 1");
+    }
+    ds.setMaximumPoolSize(8);
+    ds.setMinimumIdle(5);
+    ds.setConnectionTimeout(30000);
+    ds.setIdleTimeout(60000);
+
+    // For a Greenplum database, the session's query optimizer has to be turned off here
+    if (description.getDriverClassName().contains("postgresql")) {
+      org.springframework.jdbc.datasource.DriverManagerDataSource dataSource = new org.springframework.jdbc.datasource.DriverManagerDataSource();
+      dataSource.setDriverClassName(description.getDriverClassName());
+      dataSource.setUrl(description.getUrl());
+      dataSource.setUsername(description.getUsername());
+      dataSource.setPassword(description.getPassword());
+      JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
+      String versionString = jdbcTemplate.queryForObject("SELECT version()", String.class);
+      if (Objects.nonNull(versionString) && versionString.contains("Greenplum")) {
+        log.info(
+            "#### Target database is Greenplum Cluster, Close Optimizer now: set optimizer to 'off' ");
+        ds.setConnectionInitSql("set optimizer to 'off'");
+      }
+    }
+
+    return ds;
+  }
+
+  private DataSourceUtils() {
+  }
+}

View File

@@ -9,121 +9,110 @@
 /////////////////////////////////////////////////////////////
 package com.gitee.dbswitch.data.util;

+import com.gitee.dbswitch.common.type.DatabaseTypeEnum;
+import com.gitee.dbswitch.dbcommon.util.DatabaseAwareUtils;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
 import java.util.Map;
 import javax.sql.DataSource;
 import org.springframework.boot.jdbc.DatabaseDriver;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.ConnectionCallback;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.support.JdbcUtils;
-import com.gitee.dbswitch.common.constant.DatabaseTypeEnum;
-import com.gitee.dbswitch.dbcommon.util.DatabaseAwareUtils;

 /**
  * Utility class for working with JdbcTemplate
  *
  * @author tang
  */
 public final class JdbcTemplateUtils {

   private JdbcTemplateUtils() {
   }

   /**
    * Get the database type
    *
    * @param dataSource the data source
    * @return DatabaseTypeEnum the database type
    */
   public static DatabaseTypeEnum getDatabaseProduceName(DataSource dataSource) {
     String productName = DatabaseAwareUtils.getDatabaseNameByDataSource(dataSource);
     if (productName.equalsIgnoreCase("Greenplum")) {
       return DatabaseTypeEnum.GREENPLUM;
     } else if (productName.equalsIgnoreCase("SQLServer")) {
       return DatabaseTypeEnum.SQLSERVER;
     } else if (productName.equalsIgnoreCase("DM")) {
       return DatabaseTypeEnum.DM;
     } else if (productName.equalsIgnoreCase("Kingbase")) {
       return DatabaseTypeEnum.KINGBASE;
     } else {
       DatabaseDriver databaseDriver = DatabaseDriver.fromProductName(productName);
       if (DatabaseDriver.MARIADB == databaseDriver) {
         return DatabaseTypeEnum.MARIADB;
       } else if (DatabaseDriver.MYSQL == databaseDriver) {
         return DatabaseTypeEnum.MYSQL;
       } else if (DatabaseDriver.ORACLE == databaseDriver) {
         return DatabaseTypeEnum.ORACLE;
       } else if (DatabaseDriver.POSTGRESQL == databaseDriver) {
         return DatabaseTypeEnum.POSTGRESQL;
       } else if (DatabaseDriver.DB2 == databaseDriver) {
         return DatabaseTypeEnum.DB2;
       } else {
-        throw new RuntimeException(String.format("Unsupport database type by product name [%s]", productName));
+        throw new RuntimeException(
+            String.format("Unsupported database type by product name [%s]", productName));
       }
     }
   }

   /**
    * Get the column metadata of a table
    *
    * @param sourceJdbcTemplate JdbcTemplate
    * @param fullTableName      the full table name
    * @return Map<String, Integer>
    */
-  public static Map<String, Integer> getColumnMetaData(JdbcTemplate sourceJdbcTemplate, String fullTableName) {
-    final String sql = String.format("select * from %s where 1=2", fullTableName);
-    Map<String, Integer> columnMetaData = new HashMap<String, Integer>();
-    Boolean ret = sourceJdbcTemplate.execute(new ConnectionCallback<Boolean>() {
-
-      @Override
-      public Boolean doInConnection(Connection conn) throws SQLException, DataAccessException {
-        Statement stmt = null;
-        ResultSet rs = null;
-        try {
-          stmt = conn.createStatement();
-          rs = stmt.executeQuery(sql);
-          ResultSetMetaData rsMetaData = rs.getMetaData();
-          for (int i = 0, len = rsMetaData.getColumnCount(); i < len; i++) {
-            columnMetaData.put(rsMetaData.getColumnName(i + 1), rsMetaData.getColumnType(i + 1));
-          }
-          return true;
-        } catch (Exception e) {
-          throw new RuntimeException(
-              String.format("Failed to get column metadata for table %s. Please ask the DBA to check the database and table.", fullTableName), e);
-        } finally {
-          JdbcUtils.closeResultSet(rs);
-          JdbcUtils.closeStatement(stmt);
-        }
-      }
-    });
-    if (ret.booleanValue()) {
-      return columnMetaData;
-    }
-    return null;
-  }
+  public static Map<String, Integer> getColumnMetaData(
+      JdbcTemplate sourceJdbcTemplate,
+      String fullTableName) {
+    final String sql = String.format("select * from %s where 1=2", fullTableName);
+    Map<String, Integer> columnMetaData = new HashMap<>();
+    sourceJdbcTemplate.execute((Connection connection) -> {
+      try (Statement stmt = connection.createStatement();
+          ResultSet rs = stmt.executeQuery(sql)) {
+        ResultSetMetaData rsMetaData = rs.getMetaData();
+        for (int i = 0, len = rsMetaData.getColumnCount(); i < len; i++) {
+          columnMetaData.put(rsMetaData.getColumnName(i + 1), rsMetaData.getColumnType(i + 1));
+        }
+        return true;
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Failed to get column metadata for table %s. Please ask the DBA to check the database and table.", fullTableName), e);
+      }
+    });
+
+    return columnMetaData;
+  }

   /**
    * Check whether a MySQL table's storage engine is InnoDB
    *
    * @param schemaName the schema name
    * @param tableName  the table name
    * @param dataSource the data source
    * @return true if the storage engine is InnoDB, false otherwise
    */
-  public static boolean isMysqlInodbStorageEngine(String shemaName, String tableName, DataSource dataSource) {
-    String sql = "SELECT count(*) as total FROM information_schema.tables WHERE table_schema=? AND table_name=? AND ENGINE='InnoDB'";
-    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
-    return jdbcTemplate.queryForObject(sql, new Object[]{shemaName, tableName}, Integer.class) > 0;
-  }
+  public static boolean isMysqlInnodbStorageEngine(
+      String schemaName,
+      String tableName,
+      DataSource dataSource) {
+    String sql = "SELECT count(*) as total FROM information_schema.tables "
+        + "WHERE table_schema=? AND table_name=? AND ENGINE='InnoDB'";
+    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
+    return jdbcTemplate.queryForObject(sql, new Object[]{schemaName, tableName}, Integer.class) > 0;
+  }
 }

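A hypothetical caller combining the two public helpers (the connection settings are placeholders):

import com.gitee.dbswitch.common.type.DatabaseTypeEnum;
import com.gitee.dbswitch.data.util.JdbcTemplateUtils;
import com.zaxxer.hikari.HikariDataSource;

public class JdbcTemplateUtilsDemo {

  public static void main(String[] args) {
    HikariDataSource ds = new HikariDataSource();
    ds.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/demo"); // placeholder connection
    ds.setUsername("root");
    ds.setPassword("secret");

    DatabaseTypeEnum type = JdbcTemplateUtils.getDatabaseProduceName(ds);
    boolean innodb = JdbcTemplateUtils.isMysqlInnodbStorageEngine("demo", "tab_user", ds);
    System.out.println(type + ", InnoDB=" + innodb);
    ds.close();
  }
}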
View File

@@ -1,9 +1,9 @@
 package com.gitee.dbswitch.data.util;

-import org.apache.commons.lang3.StringUtils;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.commons.lang3.StringUtils;

 /**
  * String utility class
@@ -13,23 +13,23 @@ import java.util.List;
  */
 public final class StrUtils {

   /**
    * Split a string into a list by comma
    *
-   * @param s the string to split
+   * @param str the string to split
    * @return List
    */
-  public static List<String> stringToList(String s) {
-    if (!StringUtils.isEmpty(s)) {
-      String[] strs = s.split(",");
+  public static List<String> stringToList(String str) {
+    if (!StringUtils.isEmpty(str)) {
+      String[] strs = str.split(",");
       if (strs.length > 0) {
         return new ArrayList<>(Arrays.asList(strs));
       }
     }
     return new ArrayList<>();
   }

   private StrUtils() {
   }
 }

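For example (a hypothetical call, also showing the empty-input behavior):

import com.gitee.dbswitch.data.util.StrUtils;
import java.util.List;

public class StrUtilsDemo {

  public static void main(String[] args) {
    List<String> tables = StrUtils.stringToList("tab_user,tab_order");
    System.out.println(tables);                     // [tab_user, tab_order]
    System.out.println(StrUtils.stringToList(""));  // [] (empty input yields an empty list)
  }
}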
View File

@@ -34,7 +34,7 @@ dbswitch:
       ## whether create table support auto increment for primary key field
       create-table-auto-increment: false
       ## whether use insert engine to write data for target database
-      ## Only usefull for PostgreSQL/Greenplum database
+      ## Only useful for PostgreSQL/Greenplum database
       writer-engine-insert: false
       ## whether use change data synchronize to target database table
-      change-data-synch: true
+      change-data-sync: true