diff --git a/dbswitch-common/pom.xml b/dbswitch-common/pom.xml index fd1a79e7..d6a8b9b7 100644 --- a/dbswitch-common/pom.xml +++ b/dbswitch-common/pom.xml @@ -5,7 +5,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-common diff --git a/dbswitch-core/pom.xml b/dbswitch-core/pom.xml index 1bf27188..a9a72eee 100644 --- a/dbswitch-core/pom.xml +++ b/dbswitch-core/pom.xml @@ -5,7 +5,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-core diff --git a/dbswitch-data/pom.xml b/dbswitch-data/pom.xml index 110b308f..8a7ca69f 100644 --- a/dbswitch-data/pom.xml +++ b/dbswitch-data/pom.xml @@ -5,7 +5,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-data diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswichProperties.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswichProperties.java index 52f158db..ce073ff8 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswichProperties.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswichProperties.java @@ -11,6 +11,8 @@ package com.gitee.dbswitch.data.config; import java.util.ArrayList; import java.util.List; + +import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; @@ -25,6 +27,7 @@ import lombok.NoArgsConstructor; */ @Configuration @Data +@ToString @ConfigurationProperties(prefix = "dbswitch", ignoreInvalidFields=false, ignoreUnknownFields = false) @PropertySource("classpath:config.properties") public class DbswichProperties { diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/PerfStat.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/PerfStat.java new file mode 100644 index 00000000..2ce6dab5 --- /dev/null +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/domain/PerfStat.java @@ -0,0 +1,36 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// +package com.gitee.dbswitch.data.domain; + +import com.gitee.dbswitch.data.util.BytesUnitUtils; +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * 统计信息 + * + * @Author: tang + */ +@Data +@AllArgsConstructor +public class PerfStat { + private Integer index; + private Integer total; + private Integer failure; + private Long bytes; + + @Override + public String toString() { + return "Data Source Index: \t" + index + "\n" + + "Total Tables Count: \t" + total + "\n" + + "Failure Tables count: \t" + failure + "\n" + + "Total Transfer Size: \t" + BytesUnitUtils.bytesSizeToHuman(bytes); + } +} diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/MigrationHandler.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/MigrationHandler.java index d38fc939..4276457e 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/MigrationHandler.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/MigrationHandler.java @@ -1,3 +1,12 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// package com.gitee.dbswitch.data.handler; import com.carrotsearch.sizeof.RamUsageEstimator; @@ -31,6 +40,8 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; /** * 在一个线程内的单表迁移处理逻辑 @@ -38,10 +49,11 @@ import java.util.Map; * @author tang */ @Slf4j -public class MigrationHandler implements Runnable { +public class MigrationHandler implements Callable { private final long MAX_CACHE_BYTES_SIZE = 512 * 1024 * 1024; + private int fetchSize = 100; private TableDescription tableDescription; private DbswichProperties properties; private DbswichProperties.SourceDataSourceProperties sourceProperties; @@ -61,11 +73,15 @@ public class MigrationHandler implements Runnable { this.sourceMetaDataSerice = new MigrationMetaDataServiceImpl(); this.targetDataSource = tds; + if (sourceProperties.getFetchSize() >= fetchSize) { + fetchSize = sourceProperties.getFetchSize(); + } + this.sourceMetaDataSerice.setDatabaseConnection(JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource)); } @Override - public void run() { + public Long call() { log.info("Migrate table for {}.{} ", tableDescription.getSchemaName(), tableDescription.getTableName()); JdbcTemplate targetJdbcTemplate = new JdbcTemplate(targetDataSource); @@ -102,7 +118,7 @@ public class MigrationHandler implements Runnable { targetJdbcTemplate.execute(sqlCreateTable); log.info("Execute SQL: \n{}", sqlCreateTable); - this.doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer); + return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer); } else { // 判断是否具备变化量同步的条件:(1)两端表结构一致,且都有一样的主键字段;(2)MySQL使用Innodb引擎; if (properties.getTarget().getChangeDataSynch().booleanValue()) { @@ -118,16 +134,16 @@ public class MigrationHandler implements Runnable { if (targetDatabaseType == DatabaseTypeEnum.MYSQL && !JdbcTemplateUtils.isMysqlInodbStorageEngine(properties.getTarget().getTargetSchema(), sourceProperties.getPrefixTable() + tableDescription.getTableName(), targetDataSource)) { - this.doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer); + return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer); } else { List fields = mds.queryTableColumnName(tableDescription.getSchemaName(), tableDescription.getTableName()); - this.doIncreaseSynchronize(tableDescription, sourceProperties, sourceDataSource, writer, pks1, fields); + return doIncreaseSynchronize(tableDescription, sourceProperties, sourceDataSource, writer, pks1, fields); } } else { - this.doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer); + return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer); } } else { - this.doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer); + return doFullCoverSynchronize(tableDescription, sourceProperties, sourceDataSource, writer); } } } @@ -138,14 +154,10 @@ public class MigrationHandler implements Runnable { * @param tableDescription 表的描述信息,可能是视图表,可能是物理表 * @param writer 目的端的写入器 */ - private void doFullCoverSynchronize(TableDescription tableDescription, + private Long doFullCoverSynchronize(TableDescription tableDescription, DbswichProperties.SourceDataSourceProperties sourceProperties, HikariDataSource sourceDataSource, IDatabaseWriter writer) { - int fetchSize = 100; - if (sourceProperties.getFetchSize() >= fetchSize) { - fetchSize = sourceProperties.getFetchSize(); - } final int BATCH_SIZE = fetchSize; // 准备目的端的数据写入操作 @@ -166,15 +178,13 @@ public class MigrationHandler implements Runnable { fullTableName); List fields = new ArrayList<>(columnMetaData.keySet()); - StatementResultSet srs = sourceOperator.queryTableData(tableDescription.getSchemaName(), - tableDescription.getTableName(), fields); + StatementResultSet srs = sourceOperator.queryTableData(tableDescription.getSchemaName(), tableDescription.getTableName(), fields); List cache = new LinkedList<>(); long cacheBytes = 0; long totalCount = 0; long totalBytes = 0; - try { - ResultSet rs = srs.getResultset(); + try (ResultSet rs = srs.getResultset();) { while (rs.next()) { Object[] record = new Object[fields.size()]; for (int i = 1; i <= fields.size(); ++i) { @@ -192,7 +202,7 @@ public class MigrationHandler implements Runnable { if (cache.size() >= BATCH_SIZE || cacheBytes >= MAX_CACHE_BYTES_SIZE) { long ret = writer.write(fields, cache); - log.info("[FullCoverSynch] handle table [{}] data count: {}, batch bytes sie: {}", fullTableName, ret, BytesUnitUtils.bytesSizeToHuman(cacheBytes)); + log.info("[FullCoverSynch] handle table [{}] data count: {}, the batch bytes sie: {}", fullTableName, ret, BytesUnitUtils.bytesSizeToHuman(cacheBytes)); cache.clear(); totalBytes += cacheBytes; cacheBytes = 0; @@ -206,12 +216,14 @@ public class MigrationHandler implements Runnable { totalBytes += cacheBytes; } - log.info("[FullCoverSynch] handle table [{}] total data count:{} ,total bytes={}", fullTableName, totalCount, BytesUnitUtils.bytesSizeToHuman(totalBytes)); + log.info("[FullCoverSynch] handle table [{}] total data count:{}, total bytes={}", fullTableName, totalCount, BytesUnitUtils.bytesSizeToHuman(totalBytes)); } catch (Exception e) { throw new RuntimeException(e); } finally { srs.close(); } + + return totalBytes; } /** @@ -220,13 +232,9 @@ public class MigrationHandler implements Runnable { * @param tableDescription 表的描述信息,这里只能是物理表 * @param writer 目的端的写入器 */ - private void doIncreaseSynchronize(TableDescription tableDescription, + private Long doIncreaseSynchronize(TableDescription tableDescription, DbswichProperties.SourceDataSourceProperties sourceProperties, HikariDataSource sourceDataSource, IDatabaseWriter writer, List pks, List fields) { - int fetchSize = 100; - if (sourceProperties.getFetchSize() >= fetchSize) { - fetchSize = sourceProperties.getFetchSize(); - } final int BATCH_SIZE = fetchSize; DatabaseTypeEnum sourceDatabaseType = JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource); @@ -252,13 +260,15 @@ public class MigrationHandler implements Runnable { calculator.setRecordIdentical(false); calculator.setCheckJdbcType(false); + AtomicLong totalBytes=new AtomicLong(0); + // 执行实际的变化同步过程 calculator.executeCalculate(param, new IDatabaseRowHandler() { private long countInsert = 0; private long countUpdate = 0; private long countDelete = 0; - private long count = 0; + private long countTotal = 0; private long cacheBytes = 0; private List cacheInsert = new LinkedList(); private List cacheUpdate = new LinkedList(); @@ -278,7 +288,8 @@ public class MigrationHandler implements Runnable { } cacheBytes += RamUsageEstimator.sizeOf(record); - count++; + totalBytes.addAndGet(cacheBytes); + countTotal++; checkFull(fields); } @@ -321,7 +332,7 @@ public class MigrationHandler implements Runnable { doUpdate(fields); } - log.info("[IncreaseSynch] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ", fullTableName, count, + log.info("[IncreaseSynch] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ", fullTableName, countTotal, countInsert, countUpdate, countDelete); } @@ -344,6 +355,8 @@ public class MigrationHandler implements Runnable { } }); + + return totalBytes.get(); } } diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MainService.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MainService.java index c6b58db5..5ecc9d91 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MainService.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MainService.java @@ -13,11 +13,15 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Pattern; -import javax.sql.DataSource; - import com.gitee.dbswitch.core.service.impl.MigrationMetaDataServiceImpl; +import com.gitee.dbswitch.data.domain.PerfStat; import com.gitee.dbswitch.data.handler.MigrationHandler; +import com.gitee.dbswitch.data.util.BytesUnitUtils; import com.gitee.dbswitch.data.util.DataSouceUtils; import com.gitee.dbswitch.data.util.StrUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -37,7 +41,7 @@ import lombok.extern.slf4j.Slf4j; * @author tang */ @Slf4j -@Service("MainService") +@Service public class MainService { private ObjectMapper jackson = new ObjectMapper(); @@ -45,6 +49,12 @@ public class MainService { @Autowired private DbswichProperties properties; + private List perfStats; + + public MainService(){ + perfStats=new ArrayList<>(); + } + /** * 执行主逻辑 */ @@ -52,10 +62,10 @@ public class MainService { StopWatch watch = new StopWatch(); watch.start(); - log.info("service is running...."); + log.info("service is started...."); try { - //log.info("Application properties configuration \n{}", jackson.writeValueAsString(properties)); + //log.info("Application properties configuration \n{}", properties); HikariDataSource targetDataSource = DataSouceUtils.createTargetDataSource(properties.getTarget()); @@ -66,7 +76,9 @@ public class MainService { for (DbswichProperties.SourceDataSourceProperties sourceProperties : sourcesProperties) { HikariDataSource sourceDataSource = DataSouceUtils.createSourceDataSource(sourceProperties); - IMetaDataService sourceMetaDataService = getMetaDataService(sourceDataSource); + IMetaDataService sourceMetaDataService = new MigrationMetaDataServiceImpl(); + + sourceMetaDataService.setDatabaseConnection(JdbcTemplateUtils.getDatabaseProduceName(sourceDataSource)); // 判断处理的策略:是排除还是包含 List includes = StrUtils.stringToList(sourceProperties.getSourceIncludes()); @@ -86,8 +98,9 @@ public class MainService { List schemas = StrUtils.stringToList(sourceProperties.getSourceSchema()); log.info("Source schema names is :{}", jackson.writeValueAsString(schemas)); - AtomicInteger numberOfFailures = new AtomicInteger(); - + AtomicInteger numberOfFailures = new AtomicInteger(0); + AtomicLong totalBytesSize=new AtomicLong(0L); + final int indexInternal=sourcePropertiesIndex; for (String schema : schemas) { // 读取源库指定schema里所有的表 List tableList = sourceMetaDataService.queryTableList(sourceProperties.getUrl(), @@ -97,44 +110,26 @@ public class MainService { } else { for (TableDescription td : tableList) { String tableName = td.getTableName(); + Supplier supplier = () -> new MigrationHandler(td, properties, indexInternal, sourceDataSource, targetDataSource).call(); + Function exceptFunction = (e) -> { + log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e); + numberOfFailures.incrementAndGet(); + throw new RuntimeException(e); + }; + Consumer finishConsumer = (r) -> totalBytesSize.addAndGet(r.longValue()); + CompletableFuture future = CompletableFuture.supplyAsync(supplier).exceptionally(exceptFunction).thenAccept(finishConsumer); + if (useExcludeTables) { if (!filters.contains(tableName)) { - futures.add(CompletableFuture.runAsync( - new MigrationHandler(td, properties, sourcePropertiesIndex, sourceDataSource, targetDataSource) - ).exceptionally( - (e) -> { - log.error("Error migration table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e); - numberOfFailures.incrementAndGet(); - throw new RuntimeException(e); - } - ) - ); + futures.add(future); } } else { - if (includes.size() == 1 && includes.get(0).contains("*")) { + if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0).contains("?"))) { if (Pattern.matches(includes.get(0), tableName)) { - futures.add(CompletableFuture.runAsync( - new MigrationHandler(td, properties, sourcePropertiesIndex, sourceDataSource, targetDataSource) - ).exceptionally( - (e) -> { - log.error("Error migration table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e); - numberOfFailures.incrementAndGet(); - throw new RuntimeException(e); - } - ) - ); + futures.add(future); } } else if (includes.contains(tableName)) { - futures.add(CompletableFuture.runAsync( - new MigrationHandler(td, properties, sourcePropertiesIndex, sourceDataSource, targetDataSource) - ).exceptionally( - (e) -> { - log.error("Error migration table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e); - numberOfFailures.incrementAndGet(); - throw new RuntimeException(e); - } - ) - ); + futures.add(future); } } @@ -144,10 +139,10 @@ public class MainService { } - CompletableFuture allFuture=CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})); - allFuture.get(); - log.info("#### Complete data migration for the [ {} ] data source ,total count={}, failure count={}", - sourcePropertiesIndex, futures.size(), numberOfFailures); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join(); + log.info("#### Complete data migration for the [ {} ] data source:\ntotal count={}\nfailure count={}\ntotal bytes size={}", + sourcePropertiesIndex, futures.size(), numberOfFailures.get(), BytesUnitUtils.bytesSizeToHuman(totalBytesSize.get())); + perfStats.add(new PerfStat(sourcePropertiesIndex,futures.size(),numberOfFailures.get(),totalBytesSize.get())); DataSouceUtils.closeHikariDataSource(sourceDataSource); ++sourcePropertiesIndex; @@ -159,19 +154,11 @@ public class MainService { } finally { watch.stop(); log.info("total elipse = {} s", watch.getTotalTimeSeconds()); + System.out.println("==================================="); + System.out.println(String.format("total elipse time:\t %f s", watch.getTotalTimeSeconds())); + this.perfStats.stream().forEach(st -> System.out.println(st)); + System.out.println("==================================="); } } - /** - * 获取MetaDataService对象 - * - * @param dataSource - * @return IMetaDataService - */ - private IMetaDataService getMetaDataService(DataSource dataSource) { - IMetaDataService metaDataService = new MigrationMetaDataServiceImpl(); - metaDataService.setDatabaseConnection(JdbcTemplateUtils.getDatabaseProduceName(dataSource)); - return metaDataService; - } - } diff --git a/dbswitch-dbchange/pom.xml b/dbswitch-dbchange/pom.xml index 64568c03..62b30fc5 100644 --- a/dbswitch-dbchange/pom.xml +++ b/dbswitch-dbchange/pom.xml @@ -5,7 +5,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-dbchange diff --git a/dbswitch-dbcommon/pom.xml b/dbswitch-dbcommon/pom.xml index bc1d70b3..b29892ad 100644 --- a/dbswitch-dbcommon/pom.xml +++ b/dbswitch-dbcommon/pom.xml @@ -3,7 +3,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-dbcommon diff --git a/dbswitch-dbsynch/pom.xml b/dbswitch-dbsynch/pom.xml index ce738521..dba52918 100644 --- a/dbswitch-dbsynch/pom.xml +++ b/dbswitch-dbsynch/pom.xml @@ -3,7 +3,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-dbsynch diff --git a/dbswitch-dbwriter/pom.xml b/dbswitch-dbwriter/pom.xml index 7a1e4d52..0e2ae6bc 100644 --- a/dbswitch-dbwriter/pom.xml +++ b/dbswitch-dbwriter/pom.xml @@ -5,7 +5,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-dbwriter diff --git a/dbswitch-pgwriter/pom.xml b/dbswitch-pgwriter/pom.xml index fe585da9..313326bb 100644 --- a/dbswitch-pgwriter/pom.xml +++ b/dbswitch-pgwriter/pom.xml @@ -5,7 +5,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-pgwriter diff --git a/dbswitch-sql/pom.xml b/dbswitch-sql/pom.xml index 4bd1d737..3ee91bbe 100644 --- a/dbswitch-sql/pom.xml +++ b/dbswitch-sql/pom.xml @@ -5,7 +5,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-sql diff --git a/dbswitch-webapi/pom.xml b/dbswitch-webapi/pom.xml index d390f238..7526c537 100644 --- a/dbswitch-webapi/pom.xml +++ b/dbswitch-webapi/pom.xml @@ -5,7 +5,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 dbswitch-webapi diff --git a/package-tool/pom.xml b/package-tool/pom.xml index 71084a78..9890a689 100644 --- a/package-tool/pom.xml +++ b/package-tool/pom.xml @@ -5,7 +5,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 package-tool diff --git a/pom.xml b/pom.xml index 89c18b37..d48c1868 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.gitee dbswitch - 1.5.5 + 1.5.6 pom dbswitch database switch project diff --git a/version.cmd b/version.cmd index 2e015fdb..9b741ba7 100644 --- a/version.cmd +++ b/version.cmd @@ -1,6 +1,6 @@ @echo off -set APP_VERSION=1.5.5 +set APP_VERSION=1.5.6 echo "Clean Project ..." call mvn clean -f pom.xml