代码优化调整

This commit is contained in:
inrgihc
2021-06-09 22:42:55 +08:00
parent 9fd0230b9a
commit a997b9dfcc
5 changed files with 133 additions and 96 deletions

View File

@@ -11,58 +11,54 @@ package com.gitee.dbswitch.data.config;
import java.util.ArrayList;
import java.util.List;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Properties属性映射配置
*
* @author tang
* 属性映射配置
*
* @author tang
*/
@Configuration
@Data
@ToString
@ConfigurationProperties(prefix = "dbswitch", ignoreInvalidFields=false, ignoreUnknownFields = false)
@PropertySource("classpath:config.properties")
@ConfigurationProperties(prefix = "dbswitch", ignoreInvalidFields = false, ignoreUnknownFields = false)
@PropertySource(
value = {"classpath:config.properties", "classpath:config.yml"},
ignoreResourceNotFound=true,
factory = DbswitchPropertySourceFactory.class)
public class DbswichProperties {
private List<SourceDataSourceProperties> source = new ArrayList<>();
private List<SourceDataSourceProperties> source = new ArrayList<>();
private TargetDataSourceProperties target = new TargetDataSourceProperties();
private TargetDataSourceProperties target = new TargetDataSourceProperties();
@Data
@NoArgsConstructor
public static class SourceDataSourceProperties {
private String url;
private String driverClassName;
private String username;
private String password;
@Data
public static class SourceDataSourceProperties {
private String url;
private String driverClassName;
private String username;
private String password;
private Integer fetchSize=5000;
private String sourceSchema="";
private String prefixTable="";
private String sourceIncludes="";
private String sourceExcludes="";
}
private Integer fetchSize = 5000;
private String sourceSchema = "";
private String prefixTable = "";
private String sourceIncludes = "";
private String sourceExcludes = "";
}
@Data
@NoArgsConstructor
public static class TargetDataSourceProperties {
private String url;
private String driverClassName;
private String username;
private String password;
@Data
public static class TargetDataSourceProperties {
private String url;
private String driverClassName;
private String username;
private String password;
private String targetSchema="";
private Boolean targetDrop=Boolean.TRUE;
private Boolean createTableAutoIncrement=Boolean.FALSE;
private Boolean writerEngineInsert=Boolean.FALSE;
private Boolean changeDataSynch=Boolean.FALSE;
}
private String targetSchema = "";
private Boolean targetDrop = Boolean.TRUE;
private Boolean createTableAutoIncrement = Boolean.FALSE;
private Boolean writerEngineInsert = Boolean.FALSE;
private Boolean changeDataSynch = Boolean.FALSE;
}
}

View File

@@ -0,0 +1,45 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.data.config;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.env.PropertySource;
import org.springframework.core.io.support.DefaultPropertySourceFactory;
import org.springframework.core.io.support.EncodedResource;
import java.io.IOException;
import java.util.Properties;
/**
* 同时支持.properties和.yaml两种配置类型
*
* @author tang
*/
public class DbswitchPropertySourceFactory extends DefaultPropertySourceFactory {
private static final String suffixYml = ".yml";
private static final String suffixYaml = ".yaml";
@Override
public PropertySource<?> createPropertySource(String name, EncodedResource resource) throws IOException {
String sourceName = name != null ? name : resource.getResource().getFilename();
if (!resource.getResource().exists()) {
return new PropertiesPropertySource(sourceName, new Properties());
} else if (sourceName.endsWith(suffixYml) || sourceName.endsWith(suffixYaml)) {
YamlPropertiesFactoryBean factory = new YamlPropertiesFactoryBean();
factory.setResources(resource.getResource());
factory.afterPropertiesSet();
return new PropertiesPropertySource(sourceName, factory.getObject());
} else {
return super.createPropertySource(name, resource);
}
}
}

View File

@@ -1,30 +0,0 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.data.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.gitee.dbswitch.core.service.IMetaDataService;
import com.gitee.dbswitch.core.service.impl.MigrationMetaDataServiceImpl;
/**
* 配置类
*
* @author tang
*
*/
@Configuration
public class PropertiesConfig {
@Bean
public IMetaDataService getMetaDataService() {
return new MigrationMetaDataServiceImpl();
}
}

View File

@@ -51,7 +51,7 @@ import java.util.function.Supplier;
@Slf4j
public class MigrationHandler implements Supplier<Long> {
private final long MAX_CACHE_BYTES_SIZE = 512 * 1024 * 1024;
private final long MAX_CACHE_BYTES_SIZE = 64 * 1024 * 1024;
private int fetchSize = 100;
private TableDescription tableDescription;

View File

@@ -14,16 +14,15 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import com.gitee.dbswitch.core.service.impl.MigrationMetaDataServiceImpl;
import com.gitee.dbswitch.data.domain.PerfStat;
import com.gitee.dbswitch.data.handler.MigrationHandler;
import com.gitee.dbswitch.data.util.BytesUnitUtils;
import com.gitee.dbswitch.data.util.DataSouceUtils;
import com.gitee.dbswitch.data.util.StrUtils;
import com.gitee.dbswitch.core.service.impl.MigrationMetaDataServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
@@ -44,11 +43,20 @@ import lombok.extern.slf4j.Slf4j;
@Service
public class MainService {
/**
* JSON序列化工具
*/
private ObjectMapper jackson = new ObjectMapper();
/**
* 配置参数属性
*/
@Autowired
private DbswichProperties properties;
/**
* 性能统计记录表
*/
private List<PerfStat> perfStats;
public MainService() {
@@ -112,40 +120,28 @@ public class MainService {
if (useExcludeTables) {
if (!filters.contains(tableName)) {
Supplier<Long> supplier = () -> MigrationHandler.createInstance(td, properties, indexInternal, sourceDataSource, targetDataSource).get();
Function<Throwable, Long> exceptFunction = (e) -> {
log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e);
numberOfFailures.incrementAndGet();
throw new RuntimeException(e);
};
Consumer<Long> finishConsumer = (r) -> totalBytesSize.addAndGet(r.longValue());
CompletableFuture<Void> future = CompletableFuture.supplyAsync(supplier).exceptionally(exceptFunction).thenAccept(finishConsumer);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(
getMigrateHandler(td, properties, indexInternal, sourceDataSource, targetDataSource))
.exceptionally(getExceptHandler(td, numberOfFailures))
.thenAccept((r) -> totalBytesSize.addAndGet(r.longValue()));
futures.add(future);
}
} else {
if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0).contains("?"))) {
if (Pattern.matches(includes.get(0), tableName)) {
Supplier<Long> supplier = () -> MigrationHandler.createInstance(td, properties, indexInternal, sourceDataSource, targetDataSource).get();
Function<Throwable, Long> exceptFunction = (e) -> {
log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e);
numberOfFailures.incrementAndGet();
throw new RuntimeException(e);
};
Consumer<Long> finishConsumer = (r) -> totalBytesSize.addAndGet(r.longValue());
CompletableFuture<Void> future = CompletableFuture.supplyAsync(supplier).exceptionally(exceptFunction).thenAccept(finishConsumer);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(
getMigrateHandler(td, properties, indexInternal, sourceDataSource, targetDataSource))
.exceptionally(getExceptHandler(td, numberOfFailures))
.thenAccept((r) -> totalBytesSize.addAndGet(r.longValue()));
futures.add(future);
}
} else if (includes.contains(tableName)) {
Supplier<Long> supplier = () -> MigrationHandler.createInstance(td, properties, indexInternal, sourceDataSource, targetDataSource).get();
Function<Throwable, Long> exceptFunction = (e) -> {
log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e);
numberOfFailures.incrementAndGet();
throw new RuntimeException(e);
};
Consumer<Long> finishConsumer = (r) -> totalBytesSize.addAndGet(r.longValue());
CompletableFuture<Void> future = CompletableFuture.supplyAsync(supplier).exceptionally(exceptFunction).thenAccept(finishConsumer);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(
getMigrateHandler(td, properties, indexInternal, sourceDataSource, targetDataSource))
.exceptionally(getExceptHandler(td, numberOfFailures))
.thenAccept((r) -> totalBytesSize.addAndGet(r.longValue()));
futures.add(future);
}
@@ -157,10 +153,10 @@ public class MainService {
}
//CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
futures.forEach(i->i.join());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
log.info("#### Complete data migration for the [ {} ] data source:\ntotal count={}\nfailure count={}\ntotal bytes size={}",
sourcePropertiesIndex, futures.size(), numberOfFailures.get(), BytesUnitUtils.bytesSizeToHuman(totalBytesSize.get()));
perfStats.add(new PerfStat(sourcePropertiesIndex, futures.size(), numberOfFailures.get(), totalBytesSize.get()));
++sourcePropertiesIndex;
@@ -183,4 +179,34 @@ public class MainService {
}
}
/**
* 单表迁移处理方法
*
* @param td 表描述上下文
* @param properties 属性配置参数
* @param indexInternal 源端索引号
* @param sds 源端的DataSource数据源
* @param tds 目的端的DataSource数据源
* @return
*/
private Supplier<Long> getMigrateHandler(TableDescription td, DbswichProperties properties, Integer indexInternal,
HikariDataSource sds, HikariDataSource tds) {
return () -> MigrationHandler.createInstance(td, properties, indexInternal, sds, tds).get();
}
/**
* 异常处理函数方法
*
* @param td 表描述上下文
* @param numberOfFailures 失败记录数
* @return
*/
private Function<Throwable, Long> getExceptHandler(TableDescription td, AtomicInteger numberOfFailures) {
return (e) -> {
log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(), td.getTableName(), e);
numberOfFailures.incrementAndGet();
throw new RuntimeException(e);
};
}
}