diff --git a/README.md b/README.md
index eefc3e38..107eee4b 100644
--- a/README.md
+++ b/README.md
@@ -33,29 +33,29 @@
```
└── dbswitch
- ├── dbswitch-common // dbswitch通用定义模块
- ├── dbswitch-core // dbswitch迁移同步实现类
- ├── dbswitch-product // dbswitch数据库方言
- ├── dbswitch-product-mysql // -> mysql方言实现类
- ├── dbswitch-product-oracle // -> oracle方言实现类
- ├── dbswitch-product-sqlserver// -> sqlserver方言实现类
- ├── dbswitch-product-postgres // -> postgres方言实现类
- ├── dbswitch-product-dm // -> dm方言实现类
- ├── dbswitch-product-kingbase // -> kingbase方言实现类
- ├── dbswitch-product-oscar // -> oscar方言实现类
- ├── dbswitch-product-gbase // -> gbase方言实现类
- ├── dbswitch-product-mariadb // -> mariadb方言实现类
- ├── dbswitch-product-openguass// -> openguass方言实现类
- ├── dbswitch-product-db2 // -> db2方言实现类
- ├── dbswitch-product-sybase // -> sybase方言实现类
- ├── dbswitch-product-hive // -> hive方言实现类
- ├── dbswitch-product-sqlite // -> sqlite方言实现类
- ├── dbswitch-product-clickhouse// -> clickhouse方言实现类
- ├── dbswitch-product-mongodb // -> mongodb方言实现类
- ├── dbswitch-data // 工具入口模块,读取配置文件中的参数执行异构迁移同步
- ├── dbswitch-admin // 在以上模块的基础上引入Quartz的调度服务与接口
- ├── dbswitch-admin-ui // 基于Vue2的前段WEB交互页面
- ├── dbswitch-dist // 基于maven-assembly-plugin插件的项目打包模块
+ ├── dbswitch-common // dbswitch通用定义模块
+ ├── dbswitch-core // dbswitch迁移同步实现类
+ ├── dbswitch-product // dbswitch数据库方言
+ ├── dbswitch-product-mysql // -> mysql方言实现类
+ ├── dbswitch-product-oracle // -> oracle方言实现类
+ ├── dbswitch-product-sqlserver // -> sqlserver方言实现类
+ ├── dbswitch-product-postgres // -> postgres方言实现类
+ ├── dbswitch-product-dm // -> dm方言实现类
+ ├── dbswitch-product-kingbase // -> kingbase方言实现类
+ ├── dbswitch-product-oscar // -> oscar方言实现类
+ ├── dbswitch-product-gbase // -> gbase方言实现类
+ ├── dbswitch-product-mariadb // -> mariadb方言实现类
+ ├── dbswitch-product-openguass // -> openguass方言实现类
+ ├── dbswitch-product-db2 // -> db2方言实现类
+ ├── dbswitch-product-sybase // -> sybase方言实现类
+ ├── dbswitch-product-hive // -> hive方言实现类
+ ├── dbswitch-product-sqlite // -> sqlite方言实现类
+ ├── dbswitch-product-clickhouse // -> clickhouse方言实现类
+ ├── dbswitch-product-mongodb // -> mongodb方言实现类
+ ├── dbswitch-data // 工具入口模块,读取配置文件中的参数执行异构迁移同步
+ ├── dbswitch-admin // 在以上模块的基础上引入Quartz的调度服务与接口
+ ├── dbswitch-admin-ui // 基于Vue2的前段WEB交互页面
+ ├── dbswitch-dist // 基于maven-assembly-plugin插件的项目打包模块
```
## 二、编译打包
@@ -132,28 +132,28 @@ dbswitch:
source:
# source database connection information
## support multiple source database connection
- - url: jdbc:oracle:thin:@172.17.2.10:1521:ORCL
- driver-class-name: 'oracle.jdbc.driver.OracleDriver'
- driver-path: D:/Workspace/dbswitch/driver/oracle/oracle-12c
- username: 'system'
- password: '123456'
- # source database configuration parameters
- ## fetch size for query source database
- fetch-size: 10000
- ## schema name for query source schemas, separate by ','
- source-schema: 'TANG'
- ## table type which include or exclude,option: TABLE,VIEW
- table-type: 'TABLE'
- ## table name include from table lists, separate by ','
- source-includes: ''
- ## table name exclude from table lists, separate by ','
- source-excludes: ''
- ## table name convert mapper by regular expression
- regex-table-mapper:
- - from-pattern: '^'
- to-value: 'T_'
- ## columns name convert mapper by regular expression like regex-table-mapper
- regex-column-mapper:
+ url: jdbc:oracle:thin:@172.17.2.10:1521:ORCL
+ driver-class-name: 'oracle.jdbc.driver.OracleDriver'
+ driver-path: D:/Workspace/dbswitch/driver/oracle/oracle-12c
+ username: 'system'
+ password: '123456'
+ # source database configuration parameters
+ ## fetch size for query source database
+ fetch-size: 10000
+ ## schema name for query source schemas, separate by ','
+ source-schema: 'TANG'
+ ## table type which include or exclude,option: TABLE,VIEW
+ table-type: 'TABLE'
+ ## table name include from table lists, separate by ','
+ source-includes: ''
+ ## table name exclude from table lists, separate by ','
+ source-excludes: ''
+ ## table name convert mapper by regular expression
+ regex-table-mapper:
+ - from-pattern: '^'
+ to-value: 'T_'
+ ## columns name convert mapper by regular expression like regex-table-mapper
+ regex-column-mapper:
target:
# target database connection information
@@ -183,21 +183,21 @@ dbswitch:
| 配置参数 | 配置说明 | 示例 | 备注 |
| :------| :------ | :------ | :------ |
-| dbswitch.source[i].url | 来源端JDBC连接的URL | jdbc:oracle:thin:@10.17.1.158:1521:ORCL | 可为:oracle/mysql/mariadb/sqlserver/postgresql/db2/dm/kingbase8/highgo |
-| dbswitch.source[i].driver-class-name | 来源端数据库的驱动类名称 | oracle.jdbc.driver.OracleDriver | 对应数据库的驱动类 |
-| dbswitch.source[i].driver-path | 来源端数据库的驱动JAR所在目录 | D:/Workspace/dbswitch/driver/oracle/oracle-12c | 对应数据库的驱动JAR所在目录 |
-| dbswitch.source[i].username | 来源端连接帐号名 | test | 无 |
-| dbswitch.source[i].password | 来源端连接帐号密码 | 123456 | 无 |
-| dbswitch.source[i].fetch-size | 来源端数据库查询时的fetch_size设置 | 10000 | 需要大于100有效 |
-| dbswitch.source[i].source-schema | 来源端的schema名称 | dbo,test | 多个之间用英文逗号分隔 |
-| dbswitch.source[i].table-type | 来源端表的类型 | TABLE | 可选值为:TABLE、VIEW ,分别代表物理表和试图表 |
-| dbswitch.source[i].source-includes | 来源端schema下的表中需要包含的表名称 | users1,orgs1 | 支持多个表(多个之间用英文逗号分隔);支持支持正则表达式(不能含有逗号) |
-| dbswitch.source[i].source-excludes | 来源端schema下的表中需要过滤的表名称 | users,orgs | 不包含的表名称,多个之间用英文逗号分隔 |
-| dbswitch.source[i].regex-table-mapper | 基于正则表达式的表名称映射关系 | [{"from-pattern": "^","to-value": "T_"}] | 为list类型,元素存在顺序关系 |
-| dbswitch.source[i].regex-column-mapper | 基于正则表达式的字段名映射关系 | [{"from-pattern": "$","to-value": "_x"}] | 为list类型,元素存在顺序关系 |
+| dbswitch.source.url | 来源端JDBC连接的URL | jdbc:oracle:thin:@10.17.1.158:1521:ORCL | 可为:oracle/mysql/mariadb/sqlserver/postgresql/db2/dm/kingbase8/highgo |
+| dbswitch.source.driver-class-name | 来源端数据库的驱动类名称 | oracle.jdbc.driver.OracleDriver | 对应数据库的驱动类 |
+| dbswitch.source.driver-path | 来源端数据库的驱动JAR所在目录 | D:/Workspace/dbswitch/driver/oracle/oracle-12c | 对应数据库的驱动JAR所在目录 |
+| dbswitch.source.username | 来源端连接帐号名 | test | 无 |
+| dbswitch.source.password | 来源端连接帐号密码 | 123456 | 无 |
+| dbswitch.source.fetch-size | 来源端数据库查询时的fetch_size设置 | 10000 | 需要大于100有效 |
+| dbswitch.source.source-schema | 来源端的schema名称 | dbo,test | 多个之间用英文逗号分隔 |
+| dbswitch.source.table-type | 来源端表的类型 | TABLE | 可选值为:TABLE、VIEW ,分别代表物理表和试图表 |
+| dbswitch.source.source-includes | 来源端schema下的表中需要包含的表名称 | users1,orgs1 | 支持多个表(多个之间用英文逗号分隔);支持支持正则表达式(不能含有逗号) |
+| dbswitch.source.source-excludes | 来源端schema下的表中需要过滤的表名称 | users,orgs | 不包含的表名称,多个之间用英文逗号分隔 |
+| dbswitch.source.regex-table-mapper | 基于正则表达式的表名称映射关系 | [{"from-pattern": "^","to-value": "T_"}] | 为list类型,元素存在顺序关系 |
+| dbswitch.source.regex-column-mapper | 基于正则表达式的字段名映射关系 | [{"from-pattern": "$","to-value": "_x"}] | 为list类型,元素存在顺序关系 |
| dbswitch.target.url | 目的端JDBC连接的URL | jdbc:postgresql://10.17.1.90:5432/study | 可为:oracle/sqlserver/postgresql/greenplum,mysql/mariadb/db2/dm/kingbase8/highgo也支持,但字段类型兼容性问题比较多 |
| dbswitch.target.driver-class-name |目的端数据库的驱动类名称 | org.postgresql.Driver | 对应数据库的驱动类 |
-| dbswitch.source[i].driver-path | 目的端数据库的驱动JAR所在目录 | D:/Workspace/dbswitch/driver/postgresql/postgresql-11.4 | 对应数据库的驱动JAR所在目录 |
+| dbswitch.target.driver-path | 目的端数据库的驱动JAR所在目录 | D:/Workspace/dbswitch/driver/postgresql/postgresql-11.4 | 对应数据库的驱动JAR所在目录 |
| dbswitch.target.username | 目的端连接帐号名 | test | 无 |
| dbswitch.target.password | 目的端连接帐号密码 | 123456 | 无 |
| dbswitch.target.target-schema | 目的端的schema名称 | public | 目的端的schema名称只能有且只有一个 |
@@ -208,22 +208,19 @@ dbswitch:
| dbswitch.target.writer-engine-insert | 是否使用insert写入数据 | false | 可选值为:true为insert写入、false为copy写入,只针对目的端数据库为PostgreSQL/Greenplum的有效 |
| dbswitch.target.change-data-sync | 是否启用增量变更同步,dbswitch.target.target-drop为false时且表有主键情况下有效,千万级以上数据量建议设为false | false | 可选值为:true、false |
-
**注意:**
-
-- (1)支持源端为多个数据源类型,如果```dbswitch.source[i]```为数组类型,i为编号,从0开始的整数;
-
-- (2)如果```dbswitch.source[i].source-includes```不为空,则按照包含表的方式来执行;
-- (3)如果```dbswitch.source[i].source-includes```为空,则按照```dbswitch.source[i].source-excludes```排除表的方式来执行。
+- (1)如果```dbswitch.source.source-includes```不为空,则按照包含表的方式来执行;
-- (4)如果```dbswitch.target.target-drop=false```,```dbswitch.target.change-data-synch=true```;时会对有主键表启用增量变更方式同步
+- (2)如果```dbswitch.source.source-includes```为空,则按照```dbswitch.source.source-excludes```排除表的方式来执行。
-- (5)对于```regex-table-mapper```和```regex-column-mappe```,为基于正则表达式替换的表名映射和字段名映射,均可以为空(代表原名映射,即源的表t_a映射到目的端也为t_a)
+- (3)如果```dbswitch.target.target-drop=false```,```dbswitch.target.change-data-synch=true```;时会对有主键表启用增量变更方式同步
+
+- (4)对于```regex-table-mapper```和```regex-column-mappe```,为基于正则表达式替换的表名映射和字段名映射,均可以为空(代表原名映射,即源的表t_a映射到目的端也为t_a)
> 提示:如果要将源端所有表名(或者字段名)添加前缀,可以配置```"from-pattern": "^","to-value": "T_"```;
-- (6)支持的数据库产品及其JDBC驱动连接示例如下:
+- (5)支持的数据库产品及其JDBC驱动连接示例如下:
**MySQL/MariaDB数据库**
@@ -275,7 +272,7 @@ jdbc连接地址:jdbc:db2://172.17.2.10:50000/testdb:driverType=4;fullyMateria
jdbc驱动名称:com.ibm.db2.jcc.DB2Driver
```
-**达梦DMDB数据库**
+**达梦DM数据库**
```
jdbc连接地址:jdbc:dm://172.17.2.10:5236
@@ -319,7 +316,7 @@ jdbc驱动名称:org.apache.hive.jdbc.HiveDriver
注意:当前只支持hive version 3.x的账号密码认证方式。
-**OpenGuass数据库**
+**OpenGauss数据库**
```
jdbc连接地址:dbc:opengauss://172.17.2.10:5866/test
@@ -344,9 +341,9 @@ jdbc驱动名称:org.sqlite.JDBC
> (a) 本地文件方式:jdbc:sqlite:/tmp/test.db , 该方式适用于dbswitch为实体机器部署的场景。
>
> (b) 远程文件方式: jdbc:sqlite::resource:http://172.17.2.12:8080/test.db ,该方式适用于容器方式部署的场景, 搭建文件服务器的方法可使
-> 用如下docker方式快速部署(/home/sqlites为服务器上存放sqlite数据库文件的目录):
+> 用如下docker方式快速部署(/home/files为服务器上存放sqlite数据库文件的目录):
>
-> ```docker run -d --name http_file_server -p 8080:8080 -v /home/sqlites:/data inrgihc/http_file_server:latest```
+> ```docker run -d --name http_file_server -p 8080:8080 -v /home/files:/data inrgihc/http_file_server:latest```
>
> 说明:远程服务器文件将会被下载到本地System.getProperty("java.io.tmpdir")所指定的目录下(linux为/tmp/,Windows为C:/temp/),并以
> sqlite-jdbc-tmp-{XXX}.db的方式进行文件命名,其中{XXX}为文件网络地址(例如上述为http://192.168.31.57:8080/test.db) 的字符串哈希值,
@@ -563,16 +560,21 @@ cd dbswitch && mvn clean install
#### (3)、代码集成开发
```
-// 构建任务执行的线程池
-AsyncTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
-taskExecutor.setXXXX();
+// 构建并行读取任务执行的线程池
+AsyncTaskExecutor taskReadExecutor=new ThreadPoolTaskExecutor();
+taskReadExecutor.setXXXX();
+
+// 构建并行写入任务执行的线程池
+AsyncTaskExecutor taskWriteExecutor=new ThreadPoolTaskExecutor();
+taskWriteExecutor.setXXXX();
// 构造dbswitch所需的配置参数,参数说明请参考第三章第1小节
-DbswichProperties properties = new DbswichProperties();
-properties.setXXXX();
+DbswichPropertiesConfiguration properties = new DbswichPropertiesConfiguration();
+properties.setSource(XXX);
+properties.setTarget(YYY);
-// 将参数传递给dbswitch启动同步方式执行
-MigrationService service = new MigrationService(properties, taskExecutor);
+// 将参数传递给dbswitch启动迁移同步方式执行
+MigrationService service = new MigrationService(properties, taskReadExecutor, taskWriteExecutor);
service.run();
```
diff --git a/build-docker/build_and_push_image.sh b/build-docker/build_and_push_image.sh
index 5a7a9430..a2dc3fd5 100644
--- a/build-docker/build_and_push_image.sh
+++ b/build-docker/build_and_push_image.sh
@@ -2,7 +2,7 @@
set -e
-DBSWITCH_VERSION=1.8.2
+DBSWITCH_VERSION=1.9.0
BUILD_DOCKER_DIR="$( cd "$( dirname "$0" )" && pwd )"
PROJECT_ROOT_DIR=$( dirname "$BUILD_DOCKER_DIR")
DOCKER_DBSWITCH_DIR=$BUILD_DOCKER_DIR/dbswitch
@@ -14,6 +14,7 @@ cd $PROJECT_ROOT_DIR && sh docker-maven-build.sh && cd -
cd $BUILD_DOCKER_DIR \
&& tar zxvf $PROJECT_ROOT_DIR/target/dbswitch-release-${DBSWITCH_VERSION}.tar.gz -C /tmp \
&& cp /tmp/dbswitch-release-${DBSWITCH_VERSION}/lib/* ${BUILD_DOCKER_DIR}/dbswitch/dbswitch-release/lib/ \
+ && cp /tmp/dbswitch-release-${DBSWITCH_VERSION}/ext/* ${BUILD_DOCKER_DIR}/dbswitch/dbswitch-release/ext/ \
&& cp -r /tmp/dbswitch-release-${DBSWITCH_VERSION}/drivers/* ${BUILD_DOCKER_DIR}/dbswitch/dbswitch-release/drivers/ \
&& rm -rf /tmp/dbswitch-release-*
diff --git a/build-docker/dbswitch/dbswitch-release/bin/startup.sh b/build-docker/dbswitch/dbswitch-release/bin/startup.sh
index 6dbbfea0..18fcdfdb 100644
--- a/build-docker/dbswitch/dbswitch-release/bin/startup.sh
+++ b/build-docker/dbswitch/dbswitch-release/bin/startup.sh
@@ -18,6 +18,7 @@ APP_HOME="$(cd "$(dirname ${APP_HOME})"; pwd)"
APP_BIN_PATH=$APP_HOME/bin
APP_LIB_PATH=$APP_HOME/lib
+APP_EXT_PATH=$APP_HOME/ext
APP_CONF_PATH=$APP_HOME/conf
export APP_DRIVERS_PATH=$APP_HOME/drivers
@@ -45,5 +46,6 @@ for i in $APP_LIB_PATH/*.jar
do
CLASSPATH="$i:$CLASSPATH"
done
+CLASSPATH="$CLASSPATH:$APP_EXT_PATH/*"
$JAVA -cp $CLASSPATH $JVMFLAGS $APP_MAIN $APP_CONF_PATH
diff --git a/build-docker/dbswitch/dbswitch-release/ext/.gitkeep b/build-docker/dbswitch/dbswitch-release/ext/.gitkeep
new file mode 100644
index 00000000..e69de29b
diff --git a/build-docker/install/docker-compose.yml b/build-docker/install/docker-compose.yml
index e2b9c0e0..478282a0 100644
--- a/build-docker/install/docker-compose.yml
+++ b/build-docker/install/docker-compose.yml
@@ -19,7 +19,7 @@ services:
start_period: 30s
dbswitch:
container_name: dbswitch_webui
- image: inrgihc/dbswitch:1.8.2
+ image: inrgihc/dbswitch:1.9.0
environment:
MYSQLDB_HOST: dbswitch_mysqldb
MYSQLDB_PORT: 3306
diff --git a/dbswitch-admin/pom.xml b/dbswitch-admin/pom.xml
index cd0d9331..a05d1ecc 100644
--- a/dbswitch-admin/pom.xml
+++ b/dbswitch-admin/pom.xml
@@ -5,7 +5,7 @@
com.gitee.dbswitch
dbswitch-parent
- 1.8.2
+ 1.9.0
dbswitch-admin
diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/config/ExecutorConfig.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/config/ExecutorConfig.java
index 346f8e5f..90b8098d 100644
--- a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/config/ExecutorConfig.java
+++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/config/ExecutorConfig.java
@@ -10,26 +10,49 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration("dbswitchExecutorConfig")
public class ExecutorConfig {
- public final static String TASK_EXECUTOR_BEAN_NAME = "migrationTaskExecutor";
+ public final static String TASK_READ_EXECUTOR_BEAN_NAME = "readerTaskExecutor";
+ public final static String TASK_WRITE_EXECUTOR_BEAN_NAME = "writerTaskExecutor";
/**
- * 创建一个异步任务执行ThreadPoolTaskExecutor
+ * 创建一个异步读取任务线程池
*
* @return ThreadPoolTaskExecutor
*/
- @Bean(TASK_EXECUTOR_BEAN_NAME)
- public AsyncTaskExecutor createTableMigrationTaskExecutor() {
+ @Bean(TASK_READ_EXECUTOR_BEAN_NAME)
+ public AsyncTaskExecutor createReaderTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
- taskExecutor.setCorePoolSize(DataSourceUtils.MAX_THREAD_COUNT);
- taskExecutor.setMaxPoolSize(DataSourceUtils.MAX_THREAD_COUNT);
+ taskExecutor.setCorePoolSize(DataSourceUtils.MAX_THREAD_COUNT / 2);
+ taskExecutor.setMaxPoolSize(DataSourceUtils.MAX_THREAD_COUNT / 2);
taskExecutor.setQueueCapacity(10000);
taskExecutor.setKeepAliveSeconds(1800);
taskExecutor.setDaemon(true);
taskExecutor.setThreadGroupName("dbswitch");
- taskExecutor.setThreadNamePrefix("dbswitch-migration-");
- taskExecutor.setBeanName(TASK_EXECUTOR_BEAN_NAME);
+ taskExecutor.setThreadNamePrefix("dbswitch-reader-");
+ taskExecutor.setBeanName(TASK_READ_EXECUTOR_BEAN_NAME);
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
+
+ /**
+ * 创建一个异步写入任务线程池
+ *
+ * @return ThreadPoolTaskExecutor
+ */
+ @Bean(TASK_WRITE_EXECUTOR_BEAN_NAME)
+ public AsyncTaskExecutor createWriterTaskExecutor() {
+ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+ taskExecutor.setCorePoolSize(DataSourceUtils.MAX_THREAD_COUNT / 2);
+ taskExecutor.setMaxPoolSize(DataSourceUtils.MAX_THREAD_COUNT / 2);
+ taskExecutor.setQueueCapacity(10000);
+ taskExecutor.setKeepAliveSeconds(1800);
+ taskExecutor.setDaemon(true);
+ taskExecutor.setThreadGroupName("dbswitch");
+ taskExecutor.setThreadNamePrefix("dbswitch-writer-");
+ taskExecutor.setBeanName(TASK_WRITE_EXECUTOR_BEAN_NAME);
+ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+ taskExecutor.initialize();
+ return taskExecutor;
+ }
+
}
diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/execution/ExecuteJobTaskRunnable.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/execution/ExecuteJobTaskRunnable.java
index c4033c09..506859d7 100644
--- a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/execution/ExecuteJobTaskRunnable.java
+++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/execution/ExecuteJobTaskRunnable.java
@@ -21,12 +21,13 @@ import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity;
import com.gitee.dbswitch.admin.logback.LogbackAppenderRegister;
import com.gitee.dbswitch.admin.type.JobStatusEnum;
import com.gitee.dbswitch.common.entity.MdcKeyValue;
-import com.gitee.dbswitch.data.config.DbswichProperties;
+import com.gitee.dbswitch.data.config.DbswichPropertiesConfiguration;
import com.gitee.dbswitch.data.service.MigrationService;
import com.gitee.dbswitch.data.util.JsonUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.sql.Timestamp;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -54,9 +55,12 @@ public class ExecuteJobTaskRunnable implements Runnable {
private AssignmentJobDAO assignmentJobDAO;
- private AsyncTaskExecutor migrationTaskExecutor;
+ private AsyncTaskExecutor readerTaskExecutor;
- @Getter private Long taskId;
+ private AsyncTaskExecutor writerTaskExecutor;
+
+ @Getter
+ private Long taskId;
private Integer schedule;
@@ -66,8 +70,10 @@ public class ExecuteJobTaskRunnable implements Runnable {
this.assignmentTaskDAO = SpringUtil.getBean(AssignmentTaskDAO.class);
this.assignmentConfigDAO = SpringUtil.getBean(AssignmentConfigDAO.class);
this.assignmentJobDAO = SpringUtil.getBean(AssignmentJobDAO.class);
- this.migrationTaskExecutor = SpringUtil.getBean(
- ExecutorConfig.TASK_EXECUTOR_BEAN_NAME, AsyncTaskExecutor.class);
+ this.readerTaskExecutor = SpringUtil.getBean(
+ ExecutorConfig.TASK_READ_EXECUTOR_BEAN_NAME, AsyncTaskExecutor.class);
+ this.writerTaskExecutor = SpringUtil.getBean(
+ ExecutorConfig.TASK_WRITE_EXECUTOR_BEAN_NAME, AsyncTaskExecutor.class);
this.taskId = taskId;
this.schedule = schedule;
this.keyName = keyName;
@@ -110,8 +116,8 @@ public class ExecuteJobTaskRunnable implements Runnable {
task.getContent());
try {
- DbswichProperties properties = JsonUtils.toBeanObject(
- task.getContent(), DbswichProperties.class);
+ DbswichPropertiesConfiguration properties = JsonUtils.toBeanObject(
+ task.getContent(), DbswichPropertiesConfiguration.class);
if (!assignmentConfigEntity.getFirstFlag()) {
if (!assignmentConfigEntity.getTargetOnlyCreate()) {
properties.getTarget().setTargetDrop(false);
@@ -123,7 +129,7 @@ public class ExecuteJobTaskRunnable implements Runnable {
properties.getTarget().setTargetDrop(true);
}
- migrationService = new MigrationService(properties, migrationTaskExecutor);
+ migrationService = new MigrationService(properties, readerTaskExecutor, writerTaskExecutor);
if (interrupted) {
log.info("Quartz task id:{} interrupted when prepare stage", taskId);
return;
@@ -147,11 +153,19 @@ public class ExecuteJobTaskRunnable implements Runnable {
} catch (Throwable e) {
assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue());
assignmentJobEntity.setErrorLog(ExceptionUtil.stacktraceToString(e));
- log.info("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}",
- task.getId(), assignmentJobEntity.getId(), task.getName(), e);
+ log.info("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}, Message: {}",
+ task.getId(), assignmentJobEntity.getId(), task.getName(), e.getMessage());
} finally {
- assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis()));
- assignmentJobDAO.updateSelective(assignmentJobEntity);
+ AssignmentJobEntity latestJobEntity = assignmentJobDAO.getById(assignmentJobEntity.getId());
+ if (Objects.nonNull(latestJobEntity)) {
+ // 注意,这里有可能用户手动取消任务后,直接删除了任务和这个作业,导致查询不到了
+ latestJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis()));
+ latestJobEntity.setErrorLog(assignmentJobEntity.getErrorLog());
+ if (JobStatusEnum.CANCEL.getValue() != latestJobEntity.getStatus()) {
+ latestJobEntity.setStatus(assignmentJobEntity.getStatus().intValue());
+ }
+ assignmentJobDAO.updateSelective(latestJobEntity);
+ }
}
} finally {
lock.unlock();
diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/listener/DbswitchAdminStartedEventListener.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/listener/DbswitchAdminStartedEventListener.java
index 81b66892..9fad5def 100644
--- a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/listener/DbswitchAdminStartedEventListener.java
+++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/listener/DbswitchAdminStartedEventListener.java
@@ -12,7 +12,11 @@ public class DbswitchAdminStartedEventListener implements ApplicationListener
com.gitee.dbswitch
dbswitch-parent
- 1.8.2
+ 1.9.0
dbswitch-common
diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/JarClassLoader.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/JarClassLoader.java
index 57309776..2fb4aef8 100644
--- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/JarClassLoader.java
+++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/JarClassLoader.java
@@ -19,6 +19,11 @@ import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
+/**
+ * 数据库驱动jar的ClassLoader
+ *
+ * @author tang
+ */
public class JarClassLoader extends URLClassLoader {
public JarClassLoader(String path, ClassLoader parent) {
@@ -42,7 +47,7 @@ public class JarClassLoader extends URLClassLoader {
for (String path : dirs) {
urls.addAll(doGetURLs(path));
}
- if(urls.isEmpty()){
+ if (urls.isEmpty()) {
throw new RuntimeException("No jar file found from path :" + paths + "!");
}
return urls.toArray(new URL[0]);
diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/LoggingSupplier.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/LoggingSupplier.java
index 48a659de..0ed77bd2 100644
--- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/LoggingSupplier.java
+++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/LoggingSupplier.java
@@ -20,6 +20,10 @@ public class LoggingSupplier extends AbstractLogging implements Supplier {
this.command = command;
}
+ public Supplier getCommand() {
+ return command;
+ }
+
@Override
public T get() {
try {
diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/PatternMapper.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/PatternMapper.java
index d6bf82f5..877973ce 100644
--- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/PatternMapper.java
+++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/PatternMapper.java
@@ -13,6 +13,8 @@ import java.util.Objects;
/**
* 基于正则表达式的批评替换实体定义
+ *
+ * @author tang
*/
public class PatternMapper {
diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/PrintablePerfStat.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/PrintablePerfStat.java
new file mode 100644
index 00000000..1ade61c7
--- /dev/null
+++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/entity/PrintablePerfStat.java
@@ -0,0 +1,20 @@
+// Copyright tang. All rights reserved.
+// https://gitee.com/inrgihc/dbswitch
+//
+// Use of this source code is governed by a BSD-style license
+//
+// Author: tang (inrgihc@126.com)
+// Date : 2020/1/2
+// Location: beijing , china
+/////////////////////////////////////////////////////////////
+package com.gitee.dbswitch.common.entity;
+
+/**
+ * 可打印的统计信息接口
+ *
+ * @author tang
+ */
+public abstract class PrintablePerfStat {
+
+ public abstract String getPrintableString();
+}
diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventListener.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventListener.java
index b2049ba9..a39af637 100644
--- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventListener.java
+++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventListener.java
@@ -10,12 +10,6 @@
package com.gitee.dbswitch.common.event;
public interface TaskEventListener extends java.util.EventListener {
-
- /**
- * The event callback
- *
- * @param event object
- * @return event result
- */
+
Object event(ListenedEvent event);
}
\ No newline at end of file
diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/type/CaseConvertEnum.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/type/CaseConvertEnum.java
index f910e0ba..5a014783 100644
--- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/type/CaseConvertEnum.java
+++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/type/CaseConvertEnum.java
@@ -11,6 +11,8 @@ package com.gitee.dbswitch.common.type;
/**
* 处理大小写转换的枚举类
+ *
+ * @author tang
*/
public enum CaseConvertEnum {
NONE(s -> s),
diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/type/TableIndexEnum.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/type/TableIndexEnum.java
index 3c765a28..052ee1a2 100644
--- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/type/TableIndexEnum.java
+++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/type/TableIndexEnum.java
@@ -9,6 +9,11 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.common.type;
+/**
+ * 表的索引类型枚举定义
+ *
+ * @author tang
+ */
public enum TableIndexEnum {
NORMAL("普通索引"),
diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/JdbcUrlUtils.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/JdbcUrlUtils.java
index 02a54e3c..56edfab1 100644
--- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/JdbcUrlUtils.java
+++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/JdbcUrlUtils.java
@@ -21,8 +21,6 @@ import lombok.experimental.UtilityClass;
* JDBC-URL参数提取工具类
*
* @author tang
- * @date 2021-11-20 22:54:21
- * @since 1.0
*/
@UtilityClass
public final class JdbcUrlUtils {
diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/MachineInfoUtils.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/MachineInfoUtils.java
new file mode 100644
index 00000000..dde9b2c5
--- /dev/null
+++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/MachineInfoUtils.java
@@ -0,0 +1,76 @@
+package com.gitee.dbswitch.common.util;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.OperatingSystemMXBean;
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Locale;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 机器统计信息
+ */
+@Slf4j
+@UtilityClass
+public class MachineInfoUtils {
+
+ private static NumberFormat fmtI = new DecimalFormat("###,###", new DecimalFormatSymbols(Locale.ENGLISH));
+
+ /**
+ * 获取操作系统信息
+ *
+ * @return String
+ */
+ public static String getOSInfo() {
+ OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+ StringBuffer sb = new StringBuffer();
+ sb.append("Operation System Information :" + "\r\n");
+ sb.append("OS Name: " + os.getName() + "\r\n");
+ sb.append("OS Core Arch : " + os.getArch() + "\r\n");
+ sb.append("Available CPU Count: " + os.getAvailableProcessors() + "\r\n");
+ sb.append("System Avg Load: " + os.getSystemLoadAverage() + "\r\n");
+ sb.append("JAVA Version: " + System.getProperty("java.version") + "\r\n");
+ return sb.toString();
+ }
+
+ /**
+ * 日志打印JVM信息
+ *
+ * @return String
+ */
+ public static void printJVMInfo() {
+ List pools = ManagementFactory.getMemoryPoolMXBeans();
+ for (MemoryPoolMXBean pool : pools) {
+ final String kind = pool.getType().name();
+ final MemoryUsage usage = pool.getUsage();
+ log.info("model:" + getKindName(kind)
+ + ", name:" + pool.getName()
+ + ", init:" + bytesToMB(usage.getInit())
+ + ", used:" + bytesToMB(usage.getUsed())
+ + ", available:" + bytesToMB(usage.getCommitted())
+ + ", max:" + bytesToMB(usage.getMax()));
+ }
+ }
+
+ protected static String getKindName(String kind) {
+ if ("NON_HEAP".equals(kind)) {
+ return "NonHeap";
+ } else {
+ return "Heap";
+ }
+ }
+
+ protected static String bytesToMB(long bytes) {
+ return fmtI.format((bytes / 1024 / 1024)) + " MB";
+ }
+
+ public static void main(String[] args) {
+ log.info(getOSInfo());
+ printJVMInfo();
+ }
+}
diff --git a/dbswitch-core/pom.xml b/dbswitch-core/pom.xml
index 190fcf83..9e69f589 100644
--- a/dbswitch-core/pom.xml
+++ b/dbswitch-core/pom.xml
@@ -5,7 +5,7 @@
com.gitee.dbswitch
dbswitch-parent
- 1.8.2
+ 1.9.0
dbswitch-core
diff --git a/dbswitch-core/src/main/java/com/gitee/dbswitch/calculate/DefaultChangeCalculatorService.java b/dbswitch-core/src/main/java/com/gitee/dbswitch/calculate/DefaultChangeCalculatorService.java
index 8c7061a3..974c8cbb 100644
--- a/dbswitch-core/src/main/java/com/gitee/dbswitch/calculate/DefaultChangeCalculatorService.java
+++ b/dbswitch-core/src/main/java/com/gitee/dbswitch/calculate/DefaultChangeCalculatorService.java
@@ -16,6 +16,7 @@ import com.gitee.dbswitch.common.util.JdbcTypesUtils;
import com.gitee.dbswitch.common.util.ObjectCastUtils;
import com.gitee.dbswitch.provider.ProductProviderFactory;
import com.gitee.dbswitch.provider.query.TableDataQueryProvider;
+import com.gitee.dbswitch.provider.transform.RecordTransformProvider;
import com.gitee.dbswitch.service.DefaultMetadataService;
import com.gitee.dbswitch.service.MetadataService;
import java.sql.ResultSet;
@@ -282,18 +283,22 @@ public final class DefaultChangeCalculatorService implements RecordRowChangeCalc
log.debug("###### Enter CDC calculate now");
}
+ RecordTransformProvider transformer = task.getTransformer();
+
// 进入核心比较计算算法区域
RowChangeTypeEnum flagField = null;
Object[] outputRow;
Object[] one = getRowData(rsold.getResultSet());
- Object[] two = getRowData(rsnew.getResultSet());
+ Object[] two = transformer.doTransform(task.getNewSchemaName(), task.getNewTableName(),
+ queryFieldColumn, getRowData(rsnew.getResultSet()));
while (true) {
if (one == null && two == null) {
break;
} else if (one == null && two != null) {
flagField = RowChangeTypeEnum.VALUE_INSERT;
outputRow = two;
- two = getRowData(rsnew.getResultSet());
+ two = transformer.doTransform(task.getNewSchemaName(), task.getNewTableName(),
+ queryFieldColumn, getRowData(rsnew.getResultSet()));
} else if (one != null && two == null) {
flagField = RowChangeTypeEnum.VALUE_DELETED;
outputRow = one;
@@ -311,7 +316,8 @@ public final class DefaultChangeCalculatorService implements RecordRowChangeCalc
}
one = getRowData(rsold.getResultSet());
- two = getRowData(rsnew.getResultSet());
+ two = transformer.doTransform(task.getNewSchemaName(), task.getNewTableName(),
+ queryFieldColumn, getRowData(rsnew.getResultSet()));
} else {
if (compare < 0) {
flagField = RowChangeTypeEnum.VALUE_DELETED;
@@ -320,7 +326,8 @@ public final class DefaultChangeCalculatorService implements RecordRowChangeCalc
} else {
flagField = RowChangeTypeEnum.VALUE_INSERT;
outputRow = two;
- two = getRowData(rsnew.getResultSet());
+ two = transformer.doTransform(task.getNewSchemaName(), task.getNewTableName(), queryFieldColumn,
+ getRowData(rsnew.getResultSet()));
}
}
}
diff --git a/dbswitch-core/src/main/java/com/gitee/dbswitch/calculate/TaskParamEntity.java b/dbswitch-core/src/main/java/com/gitee/dbswitch/calculate/TaskParamEntity.java
index fbbf03ae..b0be2f24 100644
--- a/dbswitch-core/src/main/java/com/gitee/dbswitch/calculate/TaskParamEntity.java
+++ b/dbswitch-core/src/main/java/com/gitee/dbswitch/calculate/TaskParamEntity.java
@@ -9,6 +9,7 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.calculate;
+import com.gitee.dbswitch.provider.transform.RecordTransformProvider;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -75,4 +76,10 @@ public class TaskParamEntity {
@NonNull
@Builder.Default
private Map columnsMap = Collections.emptyMap();
+
+ /**
+ * 值转换器
+ */
+ @NonNull
+ private RecordTransformProvider transformer;
}
diff --git a/dbswitch-core/src/main/java/com/gitee/dbswitch/core/exchange/AbstractBatchExchanger.java b/dbswitch-core/src/main/java/com/gitee/dbswitch/core/exchange/AbstractBatchExchanger.java
new file mode 100644
index 00000000..9773ae0e
--- /dev/null
+++ b/dbswitch-core/src/main/java/com/gitee/dbswitch/core/exchange/AbstractBatchExchanger.java
@@ -0,0 +1,66 @@
+// Copyright tang. All rights reserved.
+// https://gitee.com/inrgihc/dbswitch
+//
+// Use of this source code is governed by a BSD-style license
+//
+// Author: tang (inrgihc@126.com)
+// Date : 2020/1/2
+// Location: beijing , china
+/////////////////////////////////////////////////////////////
+package com.gitee.dbswitch.core.exchange;
+
+import com.gitee.dbswitch.common.util.ExamineUtils;
+import com.gitee.dbswitch.core.robot.RobotReader;
+import com.gitee.dbswitch.core.robot.RobotWriter;
+import org.springframework.core.task.AsyncTaskExecutor;
+
+public abstract class AbstractBatchExchanger {
+
+ private MemChannel memChannel;
+ private AsyncTaskExecutor readThreadExecutor;
+ private AsyncTaskExecutor writeThreadExecutor;
+
+ public AbstractBatchExchanger(AsyncTaskExecutor readExecutor, AsyncTaskExecutor writeExecutor) {
+ ExamineUtils.checkNotNull(readExecutor, "readExecutor");
+ ExamineUtils.checkNotNull(writeExecutor, "writeExecutor");
+ this.memChannel = MemChannel.createNewChannel();
+ this.readThreadExecutor = readExecutor;
+ this.writeThreadExecutor = writeExecutor;
+ }
+
+ public MemChannel getMemChannel() {
+ return memChannel;
+ }
+
+ public int getChannelWaitingNum() {
+ return memChannel.size();
+ }
+
+ public void exchange(RobotReader reader, RobotWriter writer) {
+ // 为reader和writer配置数据传输隧道
+ reader.setChannel(this.memChannel);
+ writer.setChannel(this.memChannel);
+
+ // 初始化reader和writer
+ reader.init(readThreadExecutor);
+ writer.init(writeThreadExecutor);
+
+ // 启动reader和writer的并行工作
+ writer.work();
+ reader.work();
+
+ // writer会等待reader执行完
+ writer.waitForFinish();
+
+ // 收集统计信息
+ Throwable throwable = collectPerfStats(reader, writer);
+ if (null != throwable) {
+ if (throwable instanceof RuntimeException) {
+ throw (RuntimeException) throwable;
+ }
+ throw new RuntimeException(throwable);
+ }
+ }
+
+ protected abstract Throwable collectPerfStats(RobotReader reader, RobotWriter writer);
+}
diff --git a/dbswitch-core/src/main/java/com/gitee/dbswitch/core/exchange/BatchElement.java b/dbswitch-core/src/main/java/com/gitee/dbswitch/core/exchange/BatchElement.java
new file mode 100644
index 00000000..ebf11004
--- /dev/null
+++ b/dbswitch-core/src/main/java/com/gitee/dbswitch/core/exchange/BatchElement.java
@@ -0,0 +1,32 @@
+// Copyright tang. All rights reserved.
+// https://gitee.com/inrgihc/dbswitch
+//
+// Use of this source code is governed by a BSD-style license
+//
+// Author: tang (inrgihc@126.com)
+// Date : 2020/1/2
+// Location: beijing , china
+/////////////////////////////////////////////////////////////
+package com.gitee.dbswitch.core.exchange;
+
+import java.util.List;
+import java.util.function.BiFunction;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class BatchElement {
+
+ private String tableNameMapString;
+
+ private BiFunction, List