diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswichProperties.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswichProperties.java index ce073ff8..e6fb23fe 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswichProperties.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswichProperties.java @@ -11,58 +11,54 @@ package com.gitee.dbswitch.data.config; import java.util.ArrayList; import java.util.List; - -import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import lombok.Data; -import lombok.NoArgsConstructor; /** - * Properties属性映射配置 - * - * @author tang + * 属性映射配置 * + * @author tang */ @Configuration @Data -@ToString -@ConfigurationProperties(prefix = "dbswitch", ignoreInvalidFields=false, ignoreUnknownFields = false) -@PropertySource("classpath:config.properties") +@ConfigurationProperties(prefix = "dbswitch", ignoreInvalidFields = false, ignoreUnknownFields = false) +@PropertySource( + value = {"classpath:config.properties", "classpath:config.yml"}, + ignoreResourceNotFound=true, + factory = DbswitchPropertySourceFactory.class) public class DbswichProperties { - private List source = new ArrayList<>(); + private List source = new ArrayList<>(); - private TargetDataSourceProperties target = new TargetDataSourceProperties(); + private TargetDataSourceProperties target = new TargetDataSourceProperties(); - @Data - @NoArgsConstructor - public static class SourceDataSourceProperties { - private String url; - private String driverClassName; - private String username; - private String password; + @Data + public static class SourceDataSourceProperties { + private String url; + private String driverClassName; + private String username; + private String password; - private Integer fetchSize=5000; - private String sourceSchema=""; - private String prefixTable=""; - private String sourceIncludes=""; - private String sourceExcludes=""; - } + private Integer fetchSize = 5000; + private String sourceSchema = ""; + private String prefixTable = ""; + private String sourceIncludes = ""; + private String sourceExcludes = ""; + } - @Data - @NoArgsConstructor - public static class TargetDataSourceProperties { - private String url; - private String driverClassName; - private String username; - private String password; + @Data + public static class TargetDataSourceProperties { + private String url; + private String driverClassName; + private String username; + private String password; - private String targetSchema=""; - private Boolean targetDrop=Boolean.TRUE; - private Boolean createTableAutoIncrement=Boolean.FALSE; - private Boolean writerEngineInsert=Boolean.FALSE; - private Boolean changeDataSynch=Boolean.FALSE; - } + private String targetSchema = ""; + private Boolean targetDrop = Boolean.TRUE; + private Boolean createTableAutoIncrement = Boolean.FALSE; + private Boolean writerEngineInsert = Boolean.FALSE; + private Boolean changeDataSynch = Boolean.FALSE; + } } diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswitchPropertySourceFactory.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswitchPropertySourceFactory.java new file mode 100644 index 00000000..96ea2a64 --- /dev/null +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/DbswitchPropertySourceFactory.java @@ -0,0 +1,45 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// +package com.gitee.dbswitch.data.config; + +import org.springframework.beans.factory.config.YamlPropertiesFactoryBean; +import org.springframework.core.env.PropertiesPropertySource; +import org.springframework.core.env.PropertySource; +import org.springframework.core.io.support.DefaultPropertySourceFactory; +import org.springframework.core.io.support.EncodedResource; +import java.io.IOException; +import java.util.Properties; + +/** + * 同时支持.properties和.yaml两种配置类型 + * + * @author tang + */ +public class DbswitchPropertySourceFactory extends DefaultPropertySourceFactory { + + private static final String suffixYml = ".yml"; + private static final String suffixYaml = ".yaml"; + + @Override + public PropertySource createPropertySource(String name, EncodedResource resource) throws IOException { + String sourceName = name != null ? name : resource.getResource().getFilename(); + if (!resource.getResource().exists()) { + return new PropertiesPropertySource(sourceName, new Properties()); + } else if (sourceName.endsWith(suffixYml) || sourceName.endsWith(suffixYaml)) { + YamlPropertiesFactoryBean factory = new YamlPropertiesFactoryBean(); + factory.setResources(resource.getResource()); + factory.afterPropertiesSet(); + return new PropertiesPropertySource(sourceName, factory.getObject()); + } else { + return super.createPropertySource(name, resource); + } + } + +} diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/PropertiesConfig.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/PropertiesConfig.java deleted file mode 100644 index 10fce7a9..00000000 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/config/PropertiesConfig.java +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright tang. All rights reserved. -// https://gitee.com/inrgihc/dbswitch -// -// Use of this source code is governed by a BSD-style license -// -// Author: tang (inrgihc@126.com) -// Date : 2020/1/2 -// Location: beijing , china -///////////////////////////////////////////////////////////// -package com.gitee.dbswitch.data.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import com.gitee.dbswitch.core.service.IMetaDataService; -import com.gitee.dbswitch.core.service.impl.MigrationMetaDataServiceImpl; - -/** - * 配置类 - * - * @author tang - * - */ -@Configuration -public class PropertiesConfig { - - @Bean - public IMetaDataService getMetaDataService() { - return new MigrationMetaDataServiceImpl(); - } -} diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/MigrationHandler.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/MigrationHandler.java index def73c77..0d744e11 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/MigrationHandler.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/handler/MigrationHandler.java @@ -51,7 +51,7 @@ import java.util.function.Supplier; @Slf4j public class MigrationHandler implements Supplier { - private final long MAX_CACHE_BYTES_SIZE = 512 * 1024 * 1024; + private final long MAX_CACHE_BYTES_SIZE = 64 * 1024 * 1024; private int fetchSize = 100; private TableDescription tableDescription; diff --git a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MainService.java b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MainService.java index 5a0c78e1..6e2f589c 100644 --- a/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MainService.java +++ b/dbswitch-data/src/main/java/com/gitee/dbswitch/data/service/MainService.java @@ -14,16 +14,15 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; -import com.gitee.dbswitch.core.service.impl.MigrationMetaDataServiceImpl; import com.gitee.dbswitch.data.domain.PerfStat; import com.gitee.dbswitch.data.handler.MigrationHandler; import com.gitee.dbswitch.data.util.BytesUnitUtils; import com.gitee.dbswitch.data.util.DataSouceUtils; import com.gitee.dbswitch.data.util.StrUtils; +import com.gitee.dbswitch.core.service.impl.MigrationMetaDataServiceImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StopWatch; @@ -44,11 +43,20 @@ import lombok.extern.slf4j.Slf4j; @Service public class MainService { + /** + * JSON序列化工具 + */ private ObjectMapper jackson = new ObjectMapper(); + /** + * 配置参数属性 + */ @Autowired private DbswichProperties properties; + /** + * 性能统计记录表 + */ private List perfStats; public MainService() { @@ -112,40 +120,28 @@ public class MainService { if (useExcludeTables) { if (!filters.contains(tableName)) { - Supplier supplier = () -> MigrationHandler.createInstance(td, properties, indexInternal, sourceDataSource, targetDataSource).get(); - Function exceptFunction = (e) -> { - log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e); - numberOfFailures.incrementAndGet(); - throw new RuntimeException(e); - }; - Consumer finishConsumer = (r) -> totalBytesSize.addAndGet(r.longValue()); - CompletableFuture future = CompletableFuture.supplyAsync(supplier).exceptionally(exceptFunction).thenAccept(finishConsumer); + CompletableFuture future = CompletableFuture.supplyAsync( + getMigrateHandler(td, properties, indexInternal, sourceDataSource, targetDataSource)) + .exceptionally(getExceptHandler(td, numberOfFailures)) + .thenAccept((r) -> totalBytesSize.addAndGet(r.longValue())); futures.add(future); } } else { if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0).contains("?"))) { if (Pattern.matches(includes.get(0), tableName)) { - Supplier supplier = () -> MigrationHandler.createInstance(td, properties, indexInternal, sourceDataSource, targetDataSource).get(); - Function exceptFunction = (e) -> { - log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e); - numberOfFailures.incrementAndGet(); - throw new RuntimeException(e); - }; - Consumer finishConsumer = (r) -> totalBytesSize.addAndGet(r.longValue()); - CompletableFuture future = CompletableFuture.supplyAsync(supplier).exceptionally(exceptFunction).thenAccept(finishConsumer); + CompletableFuture future = CompletableFuture.supplyAsync( + getMigrateHandler(td, properties, indexInternal, sourceDataSource, targetDataSource)) + .exceptionally(getExceptHandler(td, numberOfFailures)) + .thenAccept((r) -> totalBytesSize.addAndGet(r.longValue())); futures.add(future); } } else if (includes.contains(tableName)) { - Supplier supplier = () -> MigrationHandler.createInstance(td, properties, indexInternal, sourceDataSource, targetDataSource).get(); - Function exceptFunction = (e) -> { - log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e); - numberOfFailures.incrementAndGet(); - throw new RuntimeException(e); - }; - Consumer finishConsumer = (r) -> totalBytesSize.addAndGet(r.longValue()); - CompletableFuture future = CompletableFuture.supplyAsync(supplier).exceptionally(exceptFunction).thenAccept(finishConsumer); + CompletableFuture future = CompletableFuture.supplyAsync( + getMigrateHandler(td, properties, indexInternal, sourceDataSource, targetDataSource)) + .exceptionally(getExceptHandler(td, numberOfFailures)) + .thenAccept((r) -> totalBytesSize.addAndGet(r.longValue())); futures.add(future); } @@ -157,10 +153,10 @@ public class MainService { } - //CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get(); - futures.forEach(i->i.join()); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join(); log.info("#### Complete data migration for the [ {} ] data source:\ntotal count={}\nfailure count={}\ntotal bytes size={}", sourcePropertiesIndex, futures.size(), numberOfFailures.get(), BytesUnitUtils.bytesSizeToHuman(totalBytesSize.get())); + perfStats.add(new PerfStat(sourcePropertiesIndex, futures.size(), numberOfFailures.get(), totalBytesSize.get())); ++sourcePropertiesIndex; @@ -183,4 +179,34 @@ public class MainService { } } + /** + * 单表迁移处理方法 + * + * @param td 表描述上下文 + * @param properties 属性配置参数 + * @param indexInternal 源端索引号 + * @param sds 源端的DataSource数据源 + * @param tds 目的端的DataSource数据源 + * @return + */ + private Supplier getMigrateHandler(TableDescription td, DbswichProperties properties, Integer indexInternal, + HikariDataSource sds, HikariDataSource tds) { + return () -> MigrationHandler.createInstance(td, properties, indexInternal, sds, tds).get(); + } + + /** + * 异常处理函数方法 + * + * @param td 表描述上下文 + * @param numberOfFailures 失败记录数 + * @return + */ + private Function getExceptHandler(TableDescription td, AtomicInteger numberOfFailures) { + return (e) -> { + log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e); + numberOfFailures.incrementAndGet(); + throw new RuntimeException(e); + }; + } + }