v1.6.15:配置任务执行的线程池

This commit is contained in:
inrgihc
2022-10-01 21:23:06 +08:00
parent e71cb3d3cc
commit 957a65a6a7
28 changed files with 179 additions and 84 deletions

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-parent</artifactId>
<version>1.6.15</version>
<version>1.6.16</version>
</parent>
<artifactId>dbswitch-data</artifactId>

View File

@@ -0,0 +1,36 @@
package com.gitee.dbswitch.data.config;
import com.gitee.dbswitch.data.util.DataSourceUtils;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration("dbswitchTaskExecutorConfig")
public class TaskExecutorConfig {

  /**
   * Bean name of the table-migration executor; referenced by name where the
   * executor is injected (e.g. the MigrationService constructor parameter).
   */
  // Fixed modifier order: "static final" is the canonical Java order ("final static" compiles
  // but violates the recommended modifier ordering).
  public static final String TASK_EXECUTOR_BEAN_NAME = "tableMigrationExecutor";

  /**
   * Creates the asynchronous {@link ThreadPoolTaskExecutor} used to run per-table
   * data-migration tasks.
   *
   * @return the initialized executor, exposed as an {@link AsyncTaskExecutor} bean
   */
  @Bean(TASK_EXECUTOR_BEAN_NAME)
  public AsyncTaskExecutor createTableMigrationTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // Fixed-size pool: core == max == MAX_THREAD_COUNT, which this commit also uses
    // as the HikariCP maximum pool size in DataSourceUtils — presumably so every
    // worker thread can hold a database connection (TODO confirm intent).
    taskExecutor.setCorePoolSize(DataSourceUtils.MAX_THREAD_COUNT);
    taskExecutor.setMaxPoolSize(DataSourceUtils.MAX_THREAD_COUNT);
    taskExecutor.setQueueCapacity(10000);
    taskExecutor.setKeepAliveSeconds(1800);
    // NOTE(review): daemon threads do not keep the JVM alive, so callers must block
    // on task completion themselves (MigrationService joins its CompletableFutures).
    taskExecutor.setDaemon(true);
    taskExecutor.setThreadGroupName("dbswitch");
    taskExecutor.setThreadNamePrefix("dbswitch-migration-");
    taskExecutor.setBeanName(TASK_EXECUTOR_BEAN_NAME);
    // When the queue is full, run the task on the submitting thread rather than
    // rejecting it — provides natural back-pressure instead of a RejectedExecutionException.
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    taskExecutor.initialize();
    return taskExecutor;
  }
}

View File

@@ -20,7 +20,7 @@ public class TargetDataSourceProperties {
private String username;
private String password;
private Long connectionTimeout = TimeUnit.SECONDS.toMillis(60);
private Long maxLifeTime = TimeUnit.MINUTES.toMillis(60);
private Long maxLifeTime = TimeUnit.MINUTES.toMillis(30);
private String targetSchema = "";
private Boolean targetDrop = Boolean.TRUE;

View File

@@ -10,7 +10,6 @@
package com.gitee.dbswitch.data.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.gitee.dbswitch.common.type.DBTableType;
import com.gitee.dbswitch.common.util.DbswitchStrUtils;
import com.gitee.dbswitch.core.model.TableDescription;
import com.gitee.dbswitch.core.service.IMetaDataByDatasourceService;
@@ -24,6 +23,7 @@ import com.gitee.dbswitch.data.util.DataSourceUtils;
import com.zaxxer.hikari.HikariDataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -31,9 +31,9 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import org.springframework.util.StringUtils;
/**
* 数据迁移主逻辑类
@@ -59,13 +59,19 @@ public class MigrationService {
*/
private final DbswichProperties properties;
/**
* 任务执行线程池
*/
private final AsyncTaskExecutor taskExecutor;
/**
* 构造函数
*
* @param properties 配置信息
*/
public MigrationService(DbswichProperties properties) {
this.properties = properties;
public MigrationService(DbswichProperties properties, AsyncTaskExecutor tableMigrationExecutor) {
this.properties = Objects.requireNonNull(properties, "properties is null");
this.taskExecutor = Objects.requireNonNull(tableMigrationExecutor, "taskExecutor is null");
}
/**
@@ -78,21 +84,18 @@ public class MigrationService {
log.info("dbswitch data service is started....");
//log.info("Application properties configuration \n{}", properties);
try (HikariDataSource targetDataSource = DataSourceUtils
.createTargetDataSource(properties.getTarget())) {
try (HikariDataSource targetDataSource = DataSourceUtils.createTargetDataSource(properties.getTarget())) {
int sourcePropertiesIndex = 0;
int totalTableCount = 0;
List<SourceDataSourceProperties> sourcesProperties = properties.getSource();
for (SourceDataSourceProperties sourceProperties : sourcesProperties) {
try (HikariDataSource sourceDataSource = DataSourceUtils
.createSourceDataSource(sourceProperties)) {
try (HikariDataSource sourceDataSource = DataSourceUtils.createSourceDataSource(sourceProperties)) {
IMetaDataByDatasourceService
sourceMetaDataService = new MetaDataByDataSourceServiceImpl(sourceDataSource);
// 判断处理的策略:是排除还是包含
List<String> includes = DbswitchStrUtils
.stringToList(sourceProperties.getSourceIncludes());
List<String> includes = DbswitchStrUtils.stringToList(sourceProperties.getSourceIncludes());
log.info("Includes tables is :{}", jackson.writeValueAsString(includes));
List<String> filters = DbswitchStrUtils
.stringToList(sourceProperties.getSourceExcludes());
@@ -138,8 +141,7 @@ public class MigrationService {
numberOfFailures, totalBytesSize));
}
} else {
if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0)
.contains("?"))) {
if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0).contains("?"))) {
if (Pattern.matches(includes.get(0), tableName)) {
futures.add(
makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
@@ -158,20 +160,15 @@ public class MigrationService {
}
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
log.info(
"#### Complete data migration for the [ {} ] data source:\ntotal count={}\nfailure count={}\ntotal bytes size={}",
sourcePropertiesIndex, futures.size(), numberOfFailures.get(),
BytesUnitUtils.bytesSizeToHuman(totalBytesSize.get()));
perfStats.add(new PerfStat(sourcePropertiesIndex, futures.size(),
numberOfFailures.get(), totalBytesSize.get()));
++sourcePropertiesIndex;
totalTableCount += futures.size();
} catch (InterruptedException e) {
log.warn(" ### Thread is interrupted , exit execute task now ......");
throw e;
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
log.info(
"#### Complete data migration for the [ {} ] data source:\ntotal count={}\nfailure count={}\ntotal bytes size={}",
sourcePropertiesIndex, futures.size(), numberOfFailures.get(),
BytesUnitUtils.bytesSizeToHuman(totalBytesSize.get()));
perfStats.add(new PerfStat(sourcePropertiesIndex, futures.size(),
numberOfFailures.get(), totalBytesSize.get()));
++sourcePropertiesIndex;
totalTableCount += futures.size();
}
}
log.info("service run all success, total migrate table count={} ", totalTableCount);
@@ -212,7 +209,8 @@ public class MigrationService {
HikariDataSource tds,
AtomicInteger numberOfFailures,
AtomicLong totalBytesSize) {
return CompletableFuture.supplyAsync(getMigrateHandler(td, indexInternal, sds, tds))
return CompletableFuture
.supplyAsync(getMigrateHandler(td, indexInternal, sds, tds), this.taskExecutor)
.exceptionally(getExceptHandler(td, numberOfFailures))
.thenAccept(totalBytesSize::addAndGet);
}
@@ -245,8 +243,8 @@ public class MigrationService {
TableDescription td,
AtomicInteger numberOfFailures) {
return (e) -> {
log.error("Error migration for table: {}.{}, error message:", td.getSchemaName(),
td.getTableName(), e);
log.error("Error migration for table: {}.{}, error message: {}",
td.getSchemaName(), td.getTableName(), e.getMessage());
numberOfFailures.incrementAndGet();
throw new RuntimeException(e);
};

View File

@@ -24,6 +24,9 @@ import org.springframework.jdbc.core.JdbcTemplate;
@Slf4j
public final class DataSourceUtils {
public static final int MAX_THREAD_COUNT = 10;
public static final int MAX_TIMEOUT_MS = 60000;
/**
* 创建于指定数据库连接描述符的连接池
*
@@ -46,11 +49,11 @@ public final class DataSourceUtils {
} else {
ds.setConnectionTestQuery("SELECT 1");
}
ds.setMaximumPoolSize(8);
ds.setMinimumIdle(5);
ds.setMaximumPoolSize(MAX_THREAD_COUNT);
ds.setMinimumIdle(MAX_THREAD_COUNT);
ds.setMaxLifetime(properties.getMaxLifeTime());
ds.setConnectionTimeout(properties.getConnectionTimeout());
ds.setIdleTimeout(60000);
ds.setIdleTimeout(MAX_TIMEOUT_MS);
return ds;
}
@@ -79,11 +82,11 @@ public final class DataSourceUtils {
} else {
ds.setConnectionTestQuery("SELECT 1");
}
ds.setMaximumPoolSize(8);
ds.setMinimumIdle(5);
ds.setMaximumPoolSize(MAX_THREAD_COUNT);
ds.setMinimumIdle(MAX_THREAD_COUNT);
ds.setMaxLifetime(properties.getMaxLifeTime());
ds.setConnectionTimeout(properties.getConnectionTimeout());
ds.setIdleTimeout(60000);
ds.setIdleTimeout(MAX_TIMEOUT_MS);
// 如果是Greenplum数据库这里需要关闭会话的查询优化器
if (properties.getDriverClassName().contains("postgresql")) {