!65 代码优化

* Merge branch from dev
This commit is contained in:
inrgihc
2021-11-23 14:18:44 +00:00
parent 39a16020ce
commit df66d0e1b4
10 changed files with 1031 additions and 674 deletions

View File

@@ -16,18 +16,20 @@ import lombok.Getter;
@AllArgsConstructor
public enum ResultCode {
SUCCESS(0,"操作成功"),
ERROR_INTERNAL_ERROR(1,"内部错误"),
ERROR_INVALID_ARGUMENT(2,"无效参数"),
ERROR_RESOURCE_NOT_EXISTS(3,"资源不存在"),
ERROR_RESOURCE_ALREADY_EXISTS(4,"资源已存在"),
ERROR_RESOURCE_NOT_DEPLOY(5,"资源未发布"),
ERROR_RESOURCE_HAS_DEPLOY(6,"资源已发布"),
ERROR_USER_NOT_EXISTS(7,"用户不存在"),
ERROR_USER_PASSWORD_WRONG(8,"密码错误"),
SUCCESS(0, "操作成功"),
ERROR_INTERNAL_ERROR(1, "内部错误"),
ERROR_INVALID_ARGUMENT(2, "无效参数"),
ERROR_RESOURCE_NOT_EXISTS(3, "资源不存在"),
ERROR_RESOURCE_ALREADY_EXISTS(4, "资源已存在"),
ERROR_RESOURCE_NOT_DEPLOY(5, "资源未发布"),
ERROR_RESOURCE_HAS_DEPLOY(6, "资源已发布"),
ERROR_USER_NOT_EXISTS(7, "用户不存在"),
ERROR_USER_PASSWORD_WRONG(8, "密码错误"),
ERROR_INVALID_JDBC_URL(9, "JDBC连接的URL格式不正确"),
ERROR_CANNOT_CONNECT_REMOTE(10, "远程地址不可达"),
ERROR_ACCESS_FORBIDDEN(403,"无效的登陆凭证"),
ERROR_TOKEN_EXPIRED(404,"登录凭证已失效"),
ERROR_ACCESS_FORBIDDEN(403, "无效的登陆凭证"),
ERROR_TOKEN_EXPIRED(404, "登录凭证已失效"),
;
private int code;

View File

@@ -23,6 +23,7 @@ import com.gitee.dbswitch.admin.model.response.DatabaseTypeDetailResponse;
import com.gitee.dbswitch.admin.model.response.DbConnectionDetailResponse;
import com.gitee.dbswitch.admin.model.response.DbConnectionNameResponse;
import com.gitee.dbswitch.admin.type.SupportDbTypeEnum;
import com.gitee.dbswitch.admin.util.JDBCURL;
import com.gitee.dbswitch.admin.util.PageUtil;
import com.gitee.dbswitch.common.constant.DatabaseTypeEnum;
import com.gitee.dbswitch.core.service.IMetaDataService;
@@ -31,8 +32,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Service;
@Service
@@ -48,7 +51,7 @@ public class DbConnectionService {
detail.setId(type.getId());
detail.setType(type.getName().toUpperCase());
detail.setDriver(type.getDriver());
detail.setTemplate(type.getTemplate());
detail.setTemplate(StringUtils.join(type.getUrl(), ","));
lists.add(detail);
}
@@ -80,11 +83,38 @@ public class DbConnectionService {
return Result.failed(ResultCode.ERROR_RESOURCE_NOT_EXISTS, "id=" + id);
}
DatabaseTypeEnum dbType = DatabaseTypeEnum.valueOf(dbConn.getType().getName().toUpperCase());
IMetaDataService metaDataService = new MigrationMetaDataServiceImpl();
metaDataService.setDatabaseConnection(dbType);
metaDataService.testQuerySQL(dbConn.getUrl(), dbConn.getUsername(), dbConn.getPassword(),
dbConn.getType().getSql());
SupportDbTypeEnum supportDbType = SupportDbTypeEnum
.valueOf(dbConn.getType().getName().toUpperCase());
for (String pattern : supportDbType.getUrl()) {
final Matcher matcher = JDBCURL.getPattern(pattern).matcher(dbConn.getUrl());
if (!matcher.matches()) {
if (1 == supportDbType.getUrl().length) {
return Result.failed(ResultCode.ERROR_INVALID_JDBC_URL, dbConn.getUrl());
} else {
continue;
}
}
String host = matcher.group("host");
String port = matcher.group("port");
if (null == port) {
port = String.valueOf(supportDbType.getPort());
}
if (!JDBCURL.reachable(host, port)) {
return Result.failed(ResultCode.ERROR_CANNOT_CONNECT_REMOTE, dbConn.getUrl());
}
DatabaseTypeEnum prd = DatabaseTypeEnum.valueOf(dbConn.getType().getName().toUpperCase());
IMetaDataService metaDataService = new MigrationMetaDataServiceImpl();
metaDataService.setDatabaseConnection(prd);
metaDataService.testQuerySQL(
dbConn.getUrl(),
dbConn.getUsername(),
dbConn.getPassword(),
dbConn.getType().getSql()
);
}
return Result.success();
}

View File

@@ -17,28 +17,31 @@ import org.springframework.util.StringUtils;
@AllArgsConstructor
public enum SupportDbTypeEnum {
MYSQL(1, "mysql", "com.mysql.jdbc.Driver", "/* ping */ SELECT 1",
"jdbc:mysql://{host}:{port>/{name}?useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true&tinyInt1isBit=false"),
MARIADB(2, "mariadb", "org.mariadb.jdbc.Driver", "SELECT 1",
"jdbc:mariadb://{host}:{port}/{name}?useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true&tinyInt1isBit=false"),
ORACLE(3, "oracle", "oracle.jdbc.driver.OracleDriver", "SELECT 'Hello' from DUAL",
"jdbc:oracle:thin:@{host}:{port}:{name}"),
SQLSERVER(4, "sqlserver", "com.microsoft.sqlserver.jdbc.SQLServerDriver", "SELECT 1+2 as a",
"jdbc:sqlserver://{host}:{port};DatabaseName={name}"),
POSTGRESQL(5, "postgresql", "org.postgresql.Driver", "SELECT 1",
"jdbc:postgresql://{host}:{port}/{name}"),
DB2(6, "db2", "com.ibm.db2.jcc.DB2Driver", "SELECT 1 FROM SYSIBM.SYSDUMMY1",
"jdbc:db2://{host}:{port}/{name}:driverType=4;fullyMaterializeLobData=true;fullyMaterializeInputStreams=true;progressiveStreaming=2;progresssiveLocators=2;"),
DM(7, "dm", "dm.jdbc.driver.DmDriver", "SELECT 'Hello' from DUAL", "jdbc:dm://{host}:{port}"),
KINGBASE(8, "kingbase", "com.kingbase8.Driver", "SELECT 1",
"jdbc:kingbase8://{host}:{port}/{name}"),
MYSQL(1, "mysql", "com.mysql.jdbc.Driver", 3306, "/* ping */ SELECT 1",
new String[]{"jdbc:mysql://{host}[:{port}]/[{database}][\\?{params}]"}),
MARIADB(2, "mariadb", "org.mariadb.jdbc.Driver", 3306, "SELECT 1",
new String[]{"jdbc:mariadb://{host}[:{port}]/[{database}][\\?{params}]"}),
ORACLE(3, "oracle", "oracle.jdbc.driver.OracleDriver", 1521, "SELECT 'Hello' from DUAL",
new String[]{"jdbc:oracle:thin:@{host}:{port}:{name}",
"jdbc:oracle:thin:@//{host}[:{port}]/{name}"}),
SQLSERVER(4, "sqlserver", "com.microsoft.sqlserver.jdbc.SQLServerDriver", 1433, "SELECT 1+2 as a",
new String[]{"jdbc:sqlserver://{host}[:{port}][;databaseName={database}][;{params}]"}),
POSTGRESQL(5, "postgresql", "org.postgresql.Driver", 5432, "SELECT 1",
new String[]{"jdbc:postgresql://{host}[:{port}]/[{database}][\\?{params}]"}),
DB2(6, "db2", "com.ibm.db2.jcc.DB2Driver", 50000, "SELECT 1 FROM SYSIBM.SYSDUMMY1",
new String[]{"jdbc:db2://{host}:{port}/{database}[:{params}]"}),
DM(7, "dm", "dm.jdbc.driver.DmDriver", 5236, "SELECT 'Hello' from DUAL",
new String[]{"jdbc:dm://{host}:{port}[/{database}][\\?{params}]"}),
KINGBASE(8, "kingbase", "com.kingbase8.Driver", 54321, "SELECT 1",
new String[]{"jdbc:kingbase8://{host}[:{port}]/[{database}][\\?{params}]"}),
;
private int id;
private String name;
private String driver;
private int port;
private String sql;
private String template;
private String[] url;
public static SupportDbTypeEnum of(String name) {
if (!StringUtils.isEmpty(name)) {

View File

@@ -0,0 +1,297 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.admin.util;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* JDBC-URL参数提取工具类
*
* @author tang
* @date 2021-11-20 22:54:21
* @since 1.0
*/
public class JDBCURL {
public static final String PROP_HOST = "host"; //$NON-NLS-1$
public static final String PROP_PORT = "port"; //$NON-NLS-1$
public static final String PROP_DATABASE = "database"; //$NON-NLS-1$
public static final String PROP_SERVER = "server"; //$NON-NLS-1$
public static final String PROP_PARAMS = "params"; //$NON-NLS-1$
public static final String PROP_FOLDER = "folder"; //$NON-NLS-1$
public static final String PROP_FILE = "file"; //$NON-NLS-1$
public static final String PROP_USER = "user"; //$NON-NLS-1$
public static final String PROP_PASSWORD = "password"; //$NON-NLS-1$
private static String getPropertyRegex(String property) {
switch (property) {
case PROP_FOLDER:
case PROP_FILE:
case PROP_PARAMS:
return ".+?";
default:
return "[\\\\w\\\\-_.~]+";
}
}
private static String replaceAll(String input, String regex, Function<Matcher, String> replacer) {
final Matcher matcher = Pattern.compile(regex).matcher(input);
final StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matcher.appendReplacement(sb, replacer.apply(matcher));
}
matcher.appendTail(sb);
return sb.toString();
}
public static Pattern getPattern(String sampleUrl) {
String pattern = sampleUrl;
pattern = replaceAll(pattern, "\\[(.*?)]", m -> "\\\\E(?:\\\\Q" + m.group(1) + "\\\\E)?\\\\Q");
pattern = replaceAll(pattern, "\\{(.*?)}",
m -> "\\\\E(\\?<\\\\Q" + m.group(1) + "\\\\E>" + getPropertyRegex(m.group(1)) + ")\\\\Q");
pattern = "^\\Q" + pattern + "\\E$";
return Pattern.compile(pattern);
}
/**
* 根据主机地址与端口号检查可达性
*
* @param host 主机地址
* @param port 端口号
* @return 成功返回true否则为false
*/
public static boolean reachable(String host, String port) {
try {
InetAddress address = InetAddress.getByName(host);
if (!address.isReachable(1500)) {
return false;
}
try (Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(host, Integer.parseInt(port)), 1500);
}
} catch (IOException e) {
return false;
}
return true;
}
/**
* 测试代码
*
* @param args
*/
public static void main(String[] args) {
// 1、teradata数据库
// jdbc:teradata://localhost/DATABASE=test,DBS_PORT=1234,CLIENT_CHARSET=EUC_CN,TMODE=TERA,CHARSET=ASCII,LOB_SUPPORT=true
final Matcher matcher0 = JDBCURL
.getPattern("jdbc:teradata://{host}/DATABASE={database},DBS_PORT={port}[,{params}]")
.matcher(
"jdbc:teradata://localhost/DATABASE=test,DBS_PORT=1234,CLIENT_CHARSET=EUC_CN,TMODE=TERA,CHARSET=ASCII,LOB_SUPPORT=true");
if (matcher0.matches()) {
System.out.println("teradata host:" + matcher0.group("host"));
System.out.println("teradata port:" + matcher0.group("port"));
System.out.println("teradata database:" + matcher0.group("database"));
String params = matcher0.group("params");
if (null != params) {
String[] pairs = params.split(",");
for (String pair : pairs) {
System.out.println("teradata params:" + pair);
}
}
} else {
System.out.println("error for teradata!");
}
// 2、PostgreSQL数据库
// jdbc:postgresql://localhost:5432/dvdrental?currentSchema=test&ssl=true
// https://jdbc.postgresql.org/documentation/head/connect.html
final Matcher matcher1 = JDBCURL
.getPattern("jdbc:postgresql://{host}[:{port}]/[{database}][\\?{params}]")
.matcher("jdbc:postgresql://localhost:5432/dvdrental?currentSchema=test&ssl=true");
if (matcher1.matches()) {
System.out.println("postgresql host:" + matcher1.group("host"));
System.out.println("postgresql port:" + matcher1.group("port"));
System.out.println("postgresql database:" + matcher1.group("database"));
String params = matcher1.group("params");
if (null != params) {
String[] pairs = params.split("&");
for (String pair : pairs) {
System.out.println("postgresql params:" + pair);
}
}
} else {
System.out.println("error for postgresql!");
}
// 3、Oracle数据库
// oracle sid 方式
final Matcher matcher2 = JDBCURL.getPattern("jdbc:oracle:thin:@{host}[:{port}]:{sid}")
.matcher("jdbc:oracle:thin:@localhost:1521:orcl");
if (matcher2.matches()) {
System.out.println("oracle sid host:" + matcher2.group("host"));
System.out.println("oracle sid port:" + matcher2.group("port"));
System.out.println("oracle sid name:" + matcher2.group("sid"));
} else {
System.out.println("error for oracle sid!");
}
// oracle service name 方式
final Matcher matcher2_1 = JDBCURL.getPattern("jdbc:oracle:thin:@//{host}[:{port}]/{name}")
.matcher("jdbc:oracle:thin:@//localhost:1521/orcl.city.com");
if (matcher2_1.matches()) {
System.out.println("oracle ServiceName host:" + matcher2_1.group("host"));
System.out.println("oracle ServiceName port:" + matcher2_1.group("port"));
System.out.println("oracle ServiceName name:" + matcher2_1.group("name"));
} else {
System.out.println("error for oracle ServiceName!");
}
// oracle TNSName 方式不支持
// jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.16.91)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=orcl)))
// ..............................
// 4、MySQL数据库
// jdbc:mysql://172.17.2.10:3306/test?useUnicode=true&useSSL=false
final Matcher matcher3 = JDBCURL
.getPattern("jdbc:mysql://{host}[:{port}]/[{database}][\\?{params}]")
.matcher("jdbc:mysql://localhost:3306/test_demo?useUnicode=true&useSSL=false");
if (matcher3.matches()) {
System.out.println("mysql host:" + matcher3.group("host"));
System.out.println("mysql port:" + matcher3.group("port"));
System.out.println("mysql database:" + matcher3.group("database"));
String params = matcher3.group("params");
if (null != params) {
String[] pairs = params.split("&");
for (String pair : pairs) {
System.out.println("mysql params:" + pair);
}
}
} else {
System.out.println("error for mysql!");
}
// 5、MariaDB数据库
// 同Mysql的jdbc-url
final Matcher matcher4 = JDBCURL
.getPattern("jdbc:mariadb://{host}[:{port}]/[{database}][\\?{params}]")
.matcher("jdbc:mariadb://localhost:3306/test_demo");
if (matcher4.matches()) {
System.out.println("mariadb host:" + matcher4.group("host"));
System.out.println("mariadb port:" + matcher4.group("port"));
System.out.println("mariadb database:" + matcher4.group("database"));
String params = matcher4.group("params");
if (null != params) {
String[] pairs = params.split("&");
for (String pair : pairs) {
System.out.println("mysql params:" + pair);
}
}
} else {
System.out.println("error for mariadb!");
}
// 6、Microsoft SQLServer数据库
// jdbc:sqlserver://localhost:1433;databaseName=AdventureWorks;user=MyUserName;password=123456;
final Matcher matcher5 = JDBCURL
.getPattern("jdbc:sqlserver://{host}[:{port}][;databaseName={database}][;{params}]")
.matcher("jdbc:sqlserver://localhost:1433;databaseName=master;user=MyUserName");
if (matcher5.matches()) {
System.out.println("sqlserver host:" + matcher5.group("host"));
System.out.println("sqlserver port:" + matcher5.group("port"));
System.out.println("sqlserver database:" + matcher5.group("database"));
String params = matcher5.group("params");
if (null != params) {
String[] pairs = params.split(";");
for (String pair : pairs) {
System.out.println("sqlserver params:" + pair);
}
}
} else {
System.out.println("error for sqlserver!");
}
// 7、人大金仓数据库
// 同postgresql的jdbc-url
final Matcher matcher6 = JDBCURL
.getPattern("jdbc:kingbase8://{host}[:{port}]/[{database}][\\?{params}]")
.matcher("jdbc:kingbase8://localhost:54321/sample");
if (matcher6.matches()) {
System.out.println("kingbase8 host:" + matcher6.group("host"));
System.out.println("kingbase8 port:" + matcher6.group("port"));
System.out.println("kingbase8 database:" + matcher6.group("database"));
String params = matcher6.group("params");
if (null != params) {
String[] pairs = params.split("&");
for (String pair : pairs) {
System.out.println("mysql params:" + pair);
}
}
} else {
System.out.println("error for kingbase8!");
}
// 8、达梦数据库
// jdbc:dm://localhost:5236/user?param=hello
final Matcher matcher7 = JDBCURL.getPattern("jdbc:dm://{host}:{port}[/{database}][\\?{params}]")
.matcher("jdbc:dm://localhost:5236");
if (matcher7.matches()) {
System.out.println("dm host:" + matcher7.group("host"));
System.out.println("dm port:" + matcher7.group("port"));
System.out.println("dm database:" + matcher7.group("database"));
String params = matcher7.group("params");
if (null != params) {
String[] pairs = params.split("&");
for (String pair : pairs) {
System.out.println("dm params:" + pair);
}
}
} else {
System.out.println("error for dm!");
}
// 9、DB2数据库
// jdbc:db2://localhost:50000/testdb:driverType=4;fullyMaterializeLobData=true;fullyMaterializeInputStreams=true;progressiveStreaming=2;progresssiveLocators=2;
final Matcher matcher8 = JDBCURL.getPattern("jdbc:db2://{host}:{port}/{database}[:{params}]")
.matcher("jdbc:db2://localhost:50000/testdb:driverType=4;fullyMaterializeLobData=true");
if (matcher8.matches()) {
System.out.println("db2 host:" + matcher8.group("host"));
System.out.println("db2 port:" + matcher8.group("port"));
System.out.println("db2 database:" + matcher8.group("database"));
String params = matcher8.group("params");
if (null != params) {
String[] pairs = params.split(";");
for (String pair : pairs) {
System.out.println("mysql params:" + pair);
}
}
} else {
System.out.println("error for db2!");
}
// 9、SQLite数据库
// jdbc:sqlite:/tmp/phone.db
final Matcher matcher9 = JDBCURL.getPattern("jdbc:sqlite:{file}")
.matcher("jdbc:sqlite:D:\\Project\\Test\\phone.db");
if (matcher9.matches()) {
System.out.println("sqlite file:" + matcher9.group("file"));
} else {
System.out.println("error for sqlite!");
}
}
}

View File

@@ -9,474 +9,498 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.dbchange;
import com.gitee.dbswitch.dbchange.pojo.TaskParamBean;
import com.gitee.dbswitch.dbchange.util.JdbcTypesUtils;
import com.gitee.dbswitch.dbcommon.constant.Constants;
import com.gitee.dbswitch.dbcommon.database.DatabaseOperatorFactory;
import com.gitee.dbswitch.dbcommon.database.IDatabaseOperator;
import com.gitee.dbswitch.dbcommon.pojo.StatementResultSet;
import com.gitee.dbswitch.dbcommon.util.JdbcMetaDataUtils;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.springframework.util.SerializationUtils;
import com.gitee.dbswitch.dbcommon.constant.Constants;
import com.gitee.dbswitch.dbcommon.database.DatabaseOperatorFactory;
import com.gitee.dbswitch.dbcommon.database.IDatabaseOperator;
import com.gitee.dbswitch.dbcommon.pojo.StatementResultSet;
import com.gitee.dbswitch.dbchange.pojo.TaskParamBean;
import com.gitee.dbswitch.dbchange.util.JdbcTypesUtils;
import com.gitee.dbswitch.dbcommon.util.JdbcMetaDataUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.SerializationUtils;
/**
* 数据变化量计算核心类
*
* @author tang
*
* @author tang
*/
@Slf4j
public final class ChangeCaculatorService implements IDatabaseChangeCaculator {
/** 是否记录不变化的记录 */
private boolean recordIdentical;
/**
* 是否记录不变化的记录
*/
private boolean recordIdentical;
/** 是否进行jdbc数据type检查 */
private boolean checkJdbcType;
/**
* 是否进行jdbc数据type检查
*/
private boolean checkJdbcType;
/** 批量读取数据的行数大小 */
private int queryFetchSize;
/**
* 批量读取数据的行数大小
*/
private int queryFetchSize;
public ChangeCaculatorService() {
this(false, true);
}
public ChangeCaculatorService() {
this(false, true);
}
public ChangeCaculatorService(boolean recordIdentical, boolean checkJdbcType) {
this.recordIdentical = recordIdentical;
this.checkJdbcType = checkJdbcType;
this.queryFetchSize = Constants.DEFAULT_FETCH_SIZE;
}
public ChangeCaculatorService(boolean recordIdentical, boolean checkJdbcType) {
this.recordIdentical = recordIdentical;
this.checkJdbcType = checkJdbcType;
this.queryFetchSize = Constants.DEFAULT_FETCH_SIZE;
}
@Override
public boolean isRecordIdentical() {
return this.recordIdentical;
}
@Override
public boolean isRecordIdentical() {
return this.recordIdentical;
}
@Override
public void setRecordIdentical(boolean recordOrNot) {
this.recordIdentical = recordOrNot;
}
@Override
public void setRecordIdentical(boolean recordOrNot) {
this.recordIdentical = recordOrNot;
}
@Override
public boolean isCheckJdbcType() {
return this.checkJdbcType;
}
@Override
public boolean isCheckJdbcType() {
return this.checkJdbcType;
}
@Override
public void setCheckJdbcType(boolean checkOrNot) {
this.checkJdbcType = checkOrNot;
}
@Override
public void setCheckJdbcType(boolean checkOrNot) {
this.checkJdbcType = checkOrNot;
}
@Override
public int getFetchSize() {
return this.queryFetchSize;
}
@Override
public int getFetchSize() {
return this.queryFetchSize;
}
@Override
public void setFetchSize(int size) {
if (size < Constants.MINIMUM_FETCH_SIZE) {
throw new IllegalArgumentException("设置的批量处理行数的大小fetchSize不得小于" + Constants.MINIMUM_FETCH_SIZE);
}
@Override
public void setFetchSize(int size) {
if (size < Constants.MINIMUM_FETCH_SIZE) {
throw new IllegalArgumentException(
"设置的批量处理行数的大小fetchSize不得小于" + Constants.MINIMUM_FETCH_SIZE);
}
this.queryFetchSize = size;
}
this.queryFetchSize = size;
}
@Override
public void executeCalculate(@NonNull TaskParamBean task, @NonNull IDatabaseRowHandler handler) {
@Override
public void executeCalculate(@NonNull TaskParamBean task, @NonNull IDatabaseRowHandler handler) {
if (log.isDebugEnabled()) {
log.debug("###### Begin execute calculate table CDC data now");
}
if (log.isDebugEnabled()) {
log.debug("###### Begin execute calculate table CDC data now");
}
boolean useOwnFieldsColumns = (task.getFieldColumns() != null && !task.getFieldColumns().isEmpty());
boolean useOwnFieldsColumns = (task.getFieldColumns() != null && !task.getFieldColumns()
.isEmpty());
// 检查新旧两张表的主键字段与比较字段
JdbcMetaDataUtils oldMd = new JdbcMetaDataUtils(task.getOldDataSource());
JdbcMetaDataUtils newMd = new JdbcMetaDataUtils(task.getNewDataSource());
List<String> fieldsPrimaryKeyOld = oldMd.queryTablePrimaryKeys(task.getOldSchemaName(), task.getOldTableName());
List<String> fieldsAllColumnOld = oldMd.queryTableColumnName(task.getOldSchemaName(), task.getOldTableName());
List<String> fieldsPrimaryKeyNew = newMd.queryTablePrimaryKeys(task.getNewSchemaName(), task.getNewTableName());
List<String> fieldsAllColumnNew = newMd.queryTableColumnName(task.getNewSchemaName(), task.getNewTableName());
// 检查新旧两张表的主键字段与比较字段
JdbcMetaDataUtils oldMd = new JdbcMetaDataUtils(task.getOldDataSource());
JdbcMetaDataUtils newMd = new JdbcMetaDataUtils(task.getNewDataSource());
List<String> fieldsPrimaryKeyOld = oldMd
.queryTablePrimaryKeys(task.getOldSchemaName(), task.getOldTableName());
List<String> fieldsAllColumnOld = oldMd
.queryTableColumnName(task.getOldSchemaName(), task.getOldTableName());
List<String> fieldsPrimaryKeyNew = newMd
.queryTablePrimaryKeys(task.getNewSchemaName(), task.getNewTableName());
List<String> fieldsAllColumnNew = newMd
.queryTableColumnName(task.getNewSchemaName(), task.getNewTableName());
if (fieldsPrimaryKeyOld.isEmpty() || fieldsPrimaryKeyNew.isEmpty()) {
throw new RuntimeException("计算变化量的表中存在无主键的表");
}
if (fieldsPrimaryKeyOld.isEmpty() || fieldsPrimaryKeyNew.isEmpty()) {
throw new RuntimeException("计算变化量的表中存在无主键的表");
}
boolean same = (fieldsPrimaryKeyOld.containsAll(fieldsPrimaryKeyNew)
&& fieldsPrimaryKeyNew.containsAll(fieldsPrimaryKeyOld));
if (!same) {
throw new RuntimeException("两个表的主键不相同");
}
boolean same = (fieldsPrimaryKeyOld.containsAll(fieldsPrimaryKeyNew)
&& fieldsPrimaryKeyNew.containsAll(fieldsPrimaryKeyOld));
if (!same) {
throw new RuntimeException("两个表的主键不相同");
}
if (useOwnFieldsColumns) {
if (!fieldsAllColumnOld.containsAll(task.getFieldColumns())
|| !fieldsAllColumnNew.containsAll(task.getFieldColumns())) {
throw new RuntimeException("指定的字段列不完全在两个表中存在");
}
} else {
same = (fieldsAllColumnOld.containsAll(fieldsPrimaryKeyNew)
&& fieldsAllColumnNew.containsAll(fieldsAllColumnOld));
if (!same) {
throw new RuntimeException("两个表的字段不相同");
}
}
if (useOwnFieldsColumns) {
if (!fieldsAllColumnOld.containsAll(task.getFieldColumns())
|| !fieldsAllColumnNew.containsAll(task.getFieldColumns())) {
throw new RuntimeException("指定的字段列不完全在两个表中存在");
}
} else {
same = (fieldsAllColumnOld.containsAll(fieldsPrimaryKeyNew)
&& fieldsAllColumnNew.containsAll(fieldsAllColumnOld));
if (!same) {
throw new RuntimeException("两个表的字段不相同");
}
}
if (useOwnFieldsColumns) {
// 如果自己配置了字段列表
same = (task.getFieldColumns().containsAll(fieldsPrimaryKeyNew)
&& task.getFieldColumns().containsAll(fieldsPrimaryKeyOld));
if (!same) {
throw new RuntimeException("提供的比较字段中未包含主键");
}
if (useOwnFieldsColumns) {
// 如果自己配置了字段列表
same = (task.getFieldColumns().containsAll(fieldsPrimaryKeyNew)
&& task.getFieldColumns().containsAll(fieldsPrimaryKeyOld));
if (!same) {
throw new RuntimeException("提供的比较字段中未包含主键");
}
same = (fieldsAllColumnOld.containsAll(task.getFieldColumns())
&& fieldsAllColumnNew.containsAll(task.getFieldColumns()));
if (!same) {
throw new RuntimeException("提供的比较字段中存在表中不存在的字段");
}
}
same = (fieldsAllColumnOld.containsAll(task.getFieldColumns())
&& fieldsAllColumnNew.containsAll(task.getFieldColumns()));
if (!same) {
throw new RuntimeException("提供的比较字段中存在表中不存在的字段");
}
}
// 计算除主键外的比较字段
List<String> fieldsOfCompareValue = new ArrayList<>();
if (useOwnFieldsColumns) {
fieldsOfCompareValue.addAll(task.getFieldColumns());
} else {
fieldsOfCompareValue.addAll(fieldsAllColumnOld);
}
fieldsOfCompareValue.removeAll(fieldsPrimaryKeyOld);
// 计算除主键外的比较字段
List<String> fieldsOfCompareValue = new ArrayList<>();
if (useOwnFieldsColumns) {
fieldsOfCompareValue.addAll(task.getFieldColumns());
} else {
fieldsOfCompareValue.addAll(fieldsAllColumnOld);
}
fieldsOfCompareValue.removeAll(fieldsPrimaryKeyOld);
// 构造查询列字段
List<String> queryFieldColumn;
if (useOwnFieldsColumns) {
queryFieldColumn = task.getFieldColumns();
} else {
queryFieldColumn = fieldsAllColumnOld;
}
// 构造查询列字段
List<String> queryFieldColumn;
if (useOwnFieldsColumns) {
queryFieldColumn = task.getFieldColumns();
} else {
queryFieldColumn = fieldsAllColumnOld;
}
StatementResultSet rsold = null;
StatementResultSet rsnew = null;
StatementResultSet rsold = null;
StatementResultSet rsnew = null;
try {
// 提取新旧两表数据的结果集(按主键排序后的)
IDatabaseOperator oldQuery = DatabaseOperatorFactory.createDatabaseOperator(task.getOldDataSource());
oldQuery.setFetchSize(this.queryFetchSize);
IDatabaseOperator newQuery = DatabaseOperatorFactory.createDatabaseOperator(task.getNewDataSource());
newQuery.setFetchSize(this.queryFetchSize);
try {
// 提取新旧两表数据的结果集(按主键排序后的)
IDatabaseOperator oldQuery = DatabaseOperatorFactory
.createDatabaseOperator(task.getOldDataSource());
oldQuery.setFetchSize(this.queryFetchSize);
IDatabaseOperator newQuery = DatabaseOperatorFactory
.createDatabaseOperator(task.getNewDataSource());
newQuery.setFetchSize(this.queryFetchSize);
if (log.isDebugEnabled()) {
log.debug("###### Query data from two table now");
}
if (log.isDebugEnabled()) {
log.debug("###### Query data from two table now");
}
rsold = oldQuery.queryTableData(task.getOldSchemaName(), task.getOldTableName(), queryFieldColumn,
fieldsPrimaryKeyOld);
rsnew = newQuery.queryTableData(task.getNewSchemaName(), task.getNewTableName(), queryFieldColumn,
fieldsPrimaryKeyNew);
ResultSetMetaData metaData = rsold.getResultset().getMetaData();
rsold = oldQuery
.queryTableData(task.getOldSchemaName(), task.getOldTableName(), queryFieldColumn,
fieldsPrimaryKeyOld);
rsnew = newQuery
.queryTableData(task.getNewSchemaName(), task.getNewTableName(), queryFieldColumn,
fieldsPrimaryKeyNew);
ResultSetMetaData metaData = rsold.getResultset().getMetaData();
if (log.isDebugEnabled()) {
log.debug("###### Check data validate now");
}
if (log.isDebugEnabled()) {
log.debug("###### Check data validate now");
}
// 检查结果集源信息是否一直
int oldcnt = rsold.getResultset().getMetaData().getColumnCount();
int newcnt = rsnew.getResultset().getMetaData().getColumnCount();
if (oldcnt != newcnt) {
throw new RuntimeException(String.format("两个表的字段总个数不相等,即:%d!=%d", oldcnt, newcnt));
} else {
for (int k = 1; k < metaData.getColumnCount(); ++k) {
String key1 = rsold.getResultset().getMetaData().getColumnLabel(k);
if (null == key1) {
key1 = rsold.getResultset().getMetaData().getColumnName(k);
}
// 检查结果集源信息是否一直
int oldcnt = rsold.getResultset().getMetaData().getColumnCount();
int newcnt = rsnew.getResultset().getMetaData().getColumnCount();
if (oldcnt != newcnt) {
throw new RuntimeException(String.format("两个表的字段总个数不相等,即:%d!=%d", oldcnt, newcnt));
} else {
for (int k = 1; k < metaData.getColumnCount(); ++k) {
String key1 = rsold.getResultset().getMetaData().getColumnLabel(k);
if (null == key1) {
key1 = rsold.getResultset().getMetaData().getColumnName(k);
}
String key2 = rsnew.getResultset().getMetaData().getColumnLabel(k);
if (null == key2) {
key2 = rsnew.getResultset().getMetaData().getColumnName(k);
}
String key2 = rsnew.getResultset().getMetaData().getColumnLabel(k);
if (null == key2) {
key2 = rsnew.getResultset().getMetaData().getColumnName(k);
}
if (!key1.equals(key2)) {
throw new RuntimeException(String.format("字段名称 [Index=%d] 不同,因 %s!=%s !", k, key1, key2));
}
if (!key1.equals(key2)) {
throw new RuntimeException(
String.format("字段名称 [Index=%d] 不同,因 %s!=%s !", k, key1, key2));
}
if (checkJdbcType) {
int type1 = rsold.getResultset().getMetaData().getColumnType(k);
int type2 = rsnew.getResultset().getMetaData().getColumnType(k);
if (type1 != type2) {
throw new RuntimeException(String.format("字段 [name=%s] 的数据类型不同,因 %s!=%s !", key1,
JdbcTypesUtils.resolveTypeName(type1), JdbcTypesUtils.resolveTypeName(type2)));
}
}
if (checkJdbcType) {
int type1 = rsold.getResultset().getMetaData().getColumnType(k);
int type2 = rsnew.getResultset().getMetaData().getColumnType(k);
if (type1 != type2) {
throw new RuntimeException(String.format("字段 [name=%s] 的数据类型不同,因 %s!=%s !", key1,
JdbcTypesUtils.resolveTypeName(type1), JdbcTypesUtils.resolveTypeName(type2)));
}
}
}
}
}
}
// 计算主键字段序列在结果集中的索引号
int[] keyNumbers = new int[fieldsPrimaryKeyOld.size()];
for (int i = 0; i < keyNumbers.length; ++i) {
String fn = fieldsPrimaryKeyOld.get(i);
keyNumbers[i] = getIndexOfField(fn, metaData);
}
// 计算主键字段序列在结果集中的索引号
int[] keyNumbers = new int[fieldsPrimaryKeyOld.size()];
for (int i = 0; i < keyNumbers.length; ++i) {
String fn = fieldsPrimaryKeyOld.get(i);
keyNumbers[i] = getIndexOfField(fn, metaData);
}
// 计算比较(非主键)字段序列在结果集中的索引号
int[] valNumbers = new int[fieldsOfCompareValue.size()];
for (int i = 0; i < valNumbers.length; ++i) {
String fn = fieldsOfCompareValue.get(i);
valNumbers[i] = getIndexOfField(fn, metaData);
}
// 计算比较(非主键)字段序列在结果集中的索引号
int[] valNumbers = new int[fieldsOfCompareValue.size()];
for (int i = 0; i < valNumbers.length; ++i) {
String fn = fieldsOfCompareValue.get(i);
valNumbers[i] = getIndexOfField(fn, metaData);
}
// 初始化计算结果数据字段列信息
List<String> targetColumns = new ArrayList<>();
for (int k = 1; k <= metaData.getColumnCount(); ++k) {
String key = metaData.getColumnLabel(k);
if (null == key) {
key = metaData.getColumnName(k);
}
targetColumns.add(key);
}
// 初始化计算结果数据字段列信息
List<String> targetColumns = new ArrayList<>();
for (int k = 1; k <= metaData.getColumnCount(); ++k) {
String key = metaData.getColumnLabel(k);
if (null == key) {
key = metaData.getColumnName(k);
}
targetColumns.add(key);
}
if (log.isDebugEnabled()) {
log.debug("###### Enter CDC calculate now");
}
if (log.isDebugEnabled()) {
log.debug("###### Enter CDC calculate now");
}
// 进入核心比较计算算法区域
RecordChangeTypeEnum flagField = null;
Object[] outputRow;
Object[] one = getRowData(rsold.getResultset());
Object[] two = getRowData(rsnew.getResultset());
while (true) {
if (one == null && two == null) {
break;
} else if (one == null && two != null) {
flagField = RecordChangeTypeEnum.VALUE_INSERT;
outputRow = two;
two = getRowData(rsnew.getResultset());
} else if (one != null && two == null) {
flagField = RecordChangeTypeEnum.VALUE_DELETED;
outputRow = one;
one = getRowData(rsold.getResultset());
} else {
int compare = this.compare(one, two, keyNumbers, metaData);
if (0 == compare) {
int compareValues = this.compare(one, two, valNumbers, metaData);
if (compareValues == 0) {
flagField = RecordChangeTypeEnum.VALUE_IDENTICAL;
outputRow = one;
} else {
flagField = RecordChangeTypeEnum.VALUE_CHANGED;
outputRow = two;
}
// 进入核心比较计算算法区域
RecordChangeTypeEnum flagField = null;
Object[] outputRow;
Object[] one = getRowData(rsold.getResultset());
Object[] two = getRowData(rsnew.getResultset());
while (true) {
if (one == null && two == null) {
break;
} else if (one == null && two != null) {
flagField = RecordChangeTypeEnum.VALUE_INSERT;
outputRow = two;
two = getRowData(rsnew.getResultset());
} else if (one != null && two == null) {
flagField = RecordChangeTypeEnum.VALUE_DELETED;
outputRow = one;
one = getRowData(rsold.getResultset());
} else {
int compare = this.compare(one, two, keyNumbers, metaData);
if (0 == compare) {
int compareValues = this.compare(one, two, valNumbers, metaData);
if (compareValues == 0) {
flagField = RecordChangeTypeEnum.VALUE_IDENTICAL;
outputRow = one;
} else {
flagField = RecordChangeTypeEnum.VALUE_CHANGED;
outputRow = two;
}
one = getRowData(rsold.getResultset());
two = getRowData(rsnew.getResultset());
} else {
if (compare < 0) {
flagField = RecordChangeTypeEnum.VALUE_DELETED;
outputRow = one;
one = getRowData(rsold.getResultset());
} else {
flagField = RecordChangeTypeEnum.VALUE_INSERT;
outputRow = two;
two = getRowData(rsnew.getResultset());
}
}
}
one = getRowData(rsold.getResultset());
two = getRowData(rsnew.getResultset());
} else {
if (compare < 0) {
flagField = RecordChangeTypeEnum.VALUE_DELETED;
outputRow = one;
one = getRowData(rsold.getResultset());
} else {
flagField = RecordChangeTypeEnum.VALUE_INSERT;
outputRow = two;
two = getRowData(rsnew.getResultset());
}
}
}
if (!this.recordIdentical && RecordChangeTypeEnum.VALUE_IDENTICAL == flagField) {
continue;
}
if (!this.recordIdentical && RecordChangeTypeEnum.VALUE_IDENTICAL == flagField) {
continue;
}
// 这里对计算的单条记录结果进行处理
handler.handle(Collections.unmodifiableList(targetColumns), outputRow, flagField);
}
// 这里对计算的单条记录结果进行处理
handler.handle(Collections.unmodifiableList(targetColumns), outputRow, flagField);
}
if (log.isDebugEnabled()) {
log.debug("###### Calculate CDC Over now");
}
if (log.isDebugEnabled()) {
log.debug("###### Calculate CDC Over now");
}
// 结束返回前的回调
handler.destroy(Collections.unmodifiableList(targetColumns));
// 结束返回前的回调
handler.destroy(Collections.unmodifiableList(targetColumns));
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
if (null != rsold) {
rsold.close();
}
if (null != rsnew) {
rsnew.close();
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
if (null != rsold) {
rsold.close();
}
if (null != rsnew) {
rsnew.close();
}
}
}
}
/**
* 获取字段的索引号
*
* @param key 字段名
* @param metaData 结果集的元信息
* @return 字段的索引号
* @throws SQLException
*/
private int getIndexOfField(String key, ResultSetMetaData metaData) throws SQLException {
for (int k = 1; k <= metaData.getColumnCount(); ++k) {
String fieldName = metaData.getColumnLabel(k);
if (null == fieldName) {
fieldName = metaData.getColumnName(k);
}
/**
* 获取字段的索引号
*
* @param key 字段名
* @param metaData 结果集的元信息
* @return 字段的索引号
* @throws SQLException
*/
private int getIndexOfField(String key, ResultSetMetaData metaData) throws SQLException {
for (int k = 1; k <= metaData.getColumnCount(); ++k) {
String fieldName = metaData.getColumnLabel(k);
if (null == fieldName) {
fieldName = metaData.getColumnName(k);
}
if (fieldName.equals(key)) {
return k - 1;
}
}
if (fieldName.equals(key)) {
return k - 1;
}
}
return -1;
}
return -1;
}
/**
* 记录比较
*
* @param obj1 记录1
* @param obj2 记录2
* @param fieldnrs 待比较的字段索引号
* @param metaData 记录集的元信息
* @return 比较的结果0-11
* @throws SQLException
*/
private int compare(Object[] obj1, Object[] obj2, int[] fieldnrs, ResultSetMetaData metaData) throws SQLException {
if (obj1.length != obj2.length) {
throw new RuntimeException("Invalid compare object list !");
}
/**
* 记录比较
*
* @param obj1 记录1
* @param obj2 记录2
* @param fieldnrs 待比较的字段索引号
* @param metaData 记录集的元信息
* @return 比较的结果0-11
* @throws SQLException
*/
private int compare(Object[] obj1, Object[] obj2, int[] fieldnrs, ResultSetMetaData metaData)
throws SQLException {
if (obj1.length != obj2.length) {
throw new RuntimeException("Invalid compare object list !");
}
for (int fieldnr : fieldnrs) {
int jdbcType = metaData.getColumnType(fieldnr + 1);
Object o1 = obj1[fieldnr];
Object o2 = obj2[fieldnr];
for (int fieldnr : fieldnrs) {
int jdbcType = metaData.getColumnType(fieldnr + 1);
Object o1 = obj1[fieldnr];
Object o2 = obj2[fieldnr];
int cmp = typeCompare(jdbcType, o1, o2);
if (cmp != 0) {
return cmp;
}
}
int cmp = typeCompare(jdbcType, o1, o2);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
return 0;
}
/**
* 字段值对象比较,将对象转换为字节数组来比较实现
*
* @param type 字段的JDBC数据类型
* @param o1 对象1
* @param o2 对象2
* @return 0为相等-1为小于1为大于
*/
private int typeCompare(int type, Object o1, Object o2) {
boolean n1 = (o1 == null);
boolean n2 = (o2 == null);
if (n1 && !n2) {
return -1;
}
if (!n1 && n2) {
return 1;
}
if (n1 && n2) {
return 0;
}
/**
* 字段值对象比较,将对象转换为字节数组来比较实现
*
* @param type 字段的JDBC数据类型
* @param o1 对象1
* @param o2 对象2
* @return 0为相等-1为小于1为大于
*/
private int typeCompare(int type, Object o1, Object o2) {
boolean n1 = (o1 == null);
boolean n2 = (o2 == null);
if (n1 && !n2) {
return -1;
}
if (!n1 && n2) {
return 1;
}
if (n1 && n2) {
return 0;
}
/**
* <p>
* 这里要比较的两个对象o1与o2可能类型不同但值相同例如 Integer o1=12Long o2=12;
* </p>
* <p>
* 但是这种不属于同一类的比较情况不应出现: String o1="12", Integer o2=12;
* </p>
*/
if (JdbcTypesUtils.isString(type)) {
String s1 = String.valueOf(o1);
String s2 = String.valueOf(o2);
return s1.compareTo(s2);
} else if (JdbcTypesUtils.isNumeric(type) && o1 instanceof java.lang.Number && o2 instanceof java.lang.Number) {
java.lang.Number s1 = (java.lang.Number) o1;
java.lang.Number s2 = (java.lang.Number) o2;
return Double.compare(s1.doubleValue(), s2.doubleValue());
} else if (JdbcTypesUtils.isInteger(type) && o1 instanceof java.lang.Number && o2 instanceof java.lang.Number) {
java.lang.Number s1 = (java.lang.Number) o1;
java.lang.Number s2 = (java.lang.Number) o2;
return Long.compare(s1.longValue(), s2.longValue());
} else if (JdbcTypesUtils.isDateTime(type)) {
if (o1 instanceof java.sql.Time && o2 instanceof java.sql.Time) {
java.sql.Time t1 = (java.sql.Time) o1;
java.sql.Time t2 = (java.sql.Time) o2;
return t1.compareTo(t2);
} else if (o1 instanceof java.sql.Timestamp && o2 instanceof java.sql.Timestamp) {
java.sql.Timestamp t1 = (java.sql.Timestamp) o1;
java.sql.Timestamp t2 = (java.sql.Timestamp) o2;
return t1.compareTo(t2);
} else if (o1 instanceof java.sql.Date && o2 instanceof java.sql.Date) {
java.sql.Date t1 = (java.sql.Date) o1;
java.sql.Date t2 = (java.sql.Date) o2;
return t1.compareTo(t2);
} else {
String s1 = String.valueOf(o1);
String s2 = String.valueOf(o2);
return s1.compareTo(s2);
}
} else {
return compareTo(SerializationUtils.serialize(o1), SerializationUtils.serialize(o2));
}
}
/**
* <p>
* 这里要比较的两个对象o1与o2可能类型不同但值相同例如 Integer o1=12Long o2=12;
* </p>
* <p>
* 但是这种不属于同一类的比较情况不应出现: String o1="12", Integer o2=12;
* </p>
*/
if (JdbcTypesUtils.isString(type)) {
String s1 = String.valueOf(o1);
String s2 = String.valueOf(o2);
return s1.compareTo(s2);
} else if (JdbcTypesUtils.isNumeric(type) && o1 instanceof java.lang.Number
&& o2 instanceof java.lang.Number) {
java.lang.Number s1 = (java.lang.Number) o1;
java.lang.Number s2 = (java.lang.Number) o2;
return Double.compare(s1.doubleValue(), s2.doubleValue());
} else if (JdbcTypesUtils.isInteger(type) && o1 instanceof java.lang.Number
&& o2 instanceof java.lang.Number) {
java.lang.Number s1 = (java.lang.Number) o1;
java.lang.Number s2 = (java.lang.Number) o2;
return Long.compare(s1.longValue(), s2.longValue());
} else if (JdbcTypesUtils.isDateTime(type)) {
if (o1 instanceof java.sql.Time && o2 instanceof java.sql.Time) {
java.sql.Time t1 = (java.sql.Time) o1;
java.sql.Time t2 = (java.sql.Time) o2;
return t1.compareTo(t2);
} else if (o1 instanceof java.sql.Timestamp && o2 instanceof java.sql.Timestamp) {
java.sql.Timestamp t1 = (java.sql.Timestamp) o1;
java.sql.Timestamp t2 = (java.sql.Timestamp) o2;
return t1.compareTo(t2);
} else if (o1 instanceof java.sql.Date && o2 instanceof java.sql.Date) {
java.sql.Date t1 = (java.sql.Date) o1;
java.sql.Date t2 = (java.sql.Date) o2;
return t1.compareTo(t2);
} else {
String s1 = String.valueOf(o1);
String s2 = String.valueOf(o2);
return s1.compareTo(s2);
}
} else {
try {
return compareTo(SerializationUtils.serialize(o1), SerializationUtils.serialize(o2));
} catch (Exception e) {
log.warn("CDC compare field value failed, return 0 instead,{}", e.getMessage());
return 0;
}
}
}
/**
* 字节数组的比较
*
* @param s1 字节数组1
* @param s2 字节数组2
* @return 0为相等-1为小于1为大于
*/
public int compareTo(byte[] s1, byte[] s2) {
int len1 = s1.length;
int len2 = s2.length;
int lim = Math.min(len1, len2);
byte[] v1 = s1;
byte[] v2 = s2;
/**
* 字节数组的比较
*
* @param s1 字节数组1
* @param s2 字节数组2
* @return 0为相等-1为小于1为大于
*/
public int compareTo(byte[] s1, byte[] s2) {
int len1 = s1.length;
int len2 = s2.length;
int lim = Math.min(len1, len2);
byte[] v1 = s1;
byte[] v2 = s2;
int k = 0;
while (k < lim) {
byte c1 = v1[k];
byte c2 = v2[k];
if (c1 != c2) {
return c1 - c2;
}
k++;
}
return len1 - len2;
}
int k = 0;
while (k < lim) {
byte c1 = v1[k];
byte c2 = v2[k];
if (c1 != c2) {
return c1 - c2;
}
k++;
}
return len1 - len2;
}
/**
* 从结果集中取出一条记录
*
* @param rs 记录集
* @return 一条记录到记录结尾时返回null
* @throws SQLException
*/
private Object[] getRowData(ResultSet rs) throws SQLException {
ResultSetMetaData metaData = rs.getMetaData();
Object[] rowData = null;
/**
* 从结果集中取出一条记录
*
* @param rs 记录集
* @return 一条记录到记录结尾时返回null
* @throws SQLException
*/
private Object[] getRowData(ResultSet rs) throws SQLException {
ResultSetMetaData metaData = rs.getMetaData();
Object[] rowData = null;
if (rs.next()) {
rowData = new Object[metaData.getColumnCount()];
for (int j = 1; j <= metaData.getColumnCount(); ++j) {
rowData[j - 1] = rs.getObject(j);
}
}
if (rs.next()) {
rowData = new Object[metaData.getColumnCount()];
for (int j = 1; j <= metaData.getColumnCount(); ++j) {
rowData[j - 1] = rs.getObject(j);
}
}
return rowData;
}
return rowData;
}
}

View File

@@ -13,59 +13,58 @@ import com.gitee.dbswitch.dbchange.pojo.TaskParamBean;
/**
* 变化量计算器接口定义
*
* @author tang
*
* @author tang
*/
public interface IDatabaseChangeCaculator {
/**
* 是否记录无变化的数据
*
* @return 是否记录无变化的数据
*/
public boolean isRecordIdentical();
/**
* 是否记录无变化的数据
*
* @return 是否记录无变化的数据
*/
boolean isRecordIdentical();
/**
* 设置是否记录无变化的数据
*
* @param recordOrNot 是否记录无变化的数据
*/
public void setRecordIdentical(boolean recordOrNot);
/**
* 是否进行Jdbc的数据类型检查
*
* @return 是否进行检查
*/
public boolean isCheckJdbcType();
/**
* 设置是否记录无变化的数据
*
* @param recordOrNot 是否记录无变化的数据
*/
void setRecordIdentical(boolean recordOrNot);
/**
* 设置是否进行Jdbc的数据类型检查
*
* @param checkOrNot 是否进行检查
*/
public void setCheckJdbcType(boolean checkOrNot);
/**
* 是否进行Jdbc的数据类型检查
*
* @return 是否进行检查
*/
boolean isCheckJdbcType();
/**
* 获取JDBC驱动批量读取数据的行数大小
*
* @return 批量行数大小
*/
public int getFetchSize();
/**
* 设置是否进行Jdbc的数据类型检查
*
* @param checkOrNot 是否进行检查
*/
void setCheckJdbcType(boolean checkOrNot);
/**
* 设置JDBC驱动批量读取数据的行数大小
*
* @param size 批量行数大小
*/
public void setFetchSize(int size);
/**
* 获取JDBC驱动批量读取数据的行数大小
*
* @return 批量行数大小
*/
int getFetchSize();
/**
* 执行变化量计算任务
*
* @param task 任务描述实体对象
* @param handler 计算结果回调处理器
*/
public void executeCalculate(TaskParamBean task, IDatabaseRowHandler handler);
/**
* 设置JDBC驱动批量读取数据的行数大小
*
* @param size 批量行数大小
*/
void setFetchSize(int size);
/**
* 执行变化量计算任务
*
* @param task 任务描述实体对象
* @param handler 计算结果回调处理器
*/
void executeCalculate(TaskParamBean task, IDatabaseRowHandler handler);
}

View File

@@ -13,25 +13,24 @@ import java.util.List;
/**
* 计算结果行记录处理器
*
* @author tang
*
* @author tang
*/
public interface IDatabaseRowHandler {
/**
* 行数据处理
*
* @param fields 字段名称列表,该列表只读
* @param record 一条数据记实录
* @param flag 数据变化状态
*/
public void handle(List<String> fields, Object[] record, RecordChangeTypeEnum flag);
/**
* 行数据处理
*
* @param fields 字段名称列表,该列表只读
* @param record 一条数据记实录
* @param flag 数据变化状态
*/
void handle(List<String> fields, Object[] record, RecordChangeTypeEnum flag);
/**
* 计算结束通知
*
* @param fields 字段名称列表,该列表只读
*/
public void destroy(List<String> fields);
/**
* 计算结束通知
*
* @param fields 字段名称列表,该列表只读
*/
void destroy(List<String> fields);
}

View File

@@ -11,49 +11,51 @@ package com.gitee.dbswitch.dbchange;
/**
* 记录变化状态枚举类
*
* @author tang
*
* @author tang
*/
public enum RecordChangeTypeEnum {
/**
* 未变标识
*/
VALUE_IDENTICAL(0, "identical"),
/**
* 更新标识
*/
VALUE_CHANGED(1, "update"),
/**
* 插入标识
*/
VALUE_INSERT(2, "insert"),
/**
* 删除标识
*/
VALUE_DELETED(3, "delete")
;
/**
* 未变标识
*/
VALUE_IDENTICAL(0, "identical"),
/** index */
private Integer index;
/** 状态标记 */
private String status;
/**
* 更新标识
*/
VALUE_CHANGED(1, "update"),
RecordChangeTypeEnum(int idx, String flag) {
this.index = idx;
this.status = flag;
}
/**
* 插入标识
*/
VALUE_INSERT(2, "insert"),
public int getIndex() {
return index;
}
/**
* 删除标识
*/
VALUE_DELETED(3, "delete");
public String getStatus() {
return this.status;
}
/**
* index
*/
private Integer index;
/**
* 状态标记
*/
private String status;
RecordChangeTypeEnum(int idx, String flag) {
this.index = idx;
this.status = flag;
}
public int getIndex() {
return index;
}
public String getStatus() {
return this.status;
}
}

View File

@@ -19,53 +19,53 @@ import lombok.AllArgsConstructor;
/**
* 任务参数实体类定义
*
* @author tang
*
* @author tang
*/
@Data
@Builder
@AllArgsConstructor
public class TaskParamBean {
/**
* 老表的数据源
*/
@NonNull
DataSource oldDataSource;
/**
* 老表的schema名
*/
@NonNull
private String oldSchemaName;
/**
* 老表的数据源
*/
@NonNull
DataSource oldDataSource;
/**
* 老表的table
*/
@NonNull
private String oldTableName;
/**
* 老表的schema
*/
@NonNull
private String oldSchemaName;
/**
* 新表的数据源
*/
@NonNull
DataSource newDataSource;
/**
* 老表的table名
*/
@NonNull
private String oldTableName;
/**
* 新表的schema名
*/
@NonNull
private String newSchemaName;
/**
* 新表的数据源
*/
@NonNull
DataSource newDataSource;
/**
* 新表的table
*/
@NonNull
private String newTableName;
/**
* 字段列
*/
@Nullable
private List<String> fieldColumns;
/**
* 新表的schema
*/
@NonNull
private String newSchemaName;
/**
* 新表的table名
*/
@NonNull
private String newTableName;
/**
* 字段列
*/
@Nullable
private List<String> fieldColumns;
}

View File

@@ -16,126 +16,127 @@ import java.util.Map;
/**
* JDBC的数据类型相关工具类
*
* @author tang
*
* @author tang
*/
public final class JdbcTypesUtils {
private static final Map<Integer, String> TYPE_NAMES = new HashMap<>();
private static final Map<Integer, String> TYPE_NAMES = new HashMap<>();
static {
try {
for (Field field : Types.class.getFields()) {
TYPE_NAMES.put((Integer) field.get(null), field.getName());
}
} catch (Exception ex) {
throw new IllegalStateException("Failed to resolve JDBC Types constants", ex);
}
}
static {
try {
for (Field field : Types.class.getFields()) {
TYPE_NAMES.put((Integer) field.get(null), field.getName());
}
} catch (Exception ex) {
throw new IllegalStateException("Failed to resolve JDBC Types constants", ex);
}
}
/**
* 将JDBC的整型类型转换成文本类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return JDBC的文本类型
*/
public static String resolveTypeName(int sqlType) {
return TYPE_NAMES.get(sqlType);
}
/**
* 将JDBC的整型类型转换成文本类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return JDBC的文本类型
*/
public static String resolveTypeName(int sqlType) {
return TYPE_NAMES.get(sqlType);
}
/**
* 判断是否为JDCB的浮点数类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isNumeric(int sqlType) {
// 5
return (Types.DECIMAL == sqlType || Types.DOUBLE == sqlType || Types.FLOAT == sqlType
|| Types.NUMERIC == sqlType || Types.REAL == sqlType);
}
/**
* 判断是否为JDCB的浮点数类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isNumeric(int sqlType) {
// 5
return (Types.DECIMAL == sqlType || Types.DOUBLE == sqlType || Types.FLOAT == sqlType
|| Types.NUMERIC == sqlType || Types.REAL == sqlType);
}
/**
* 判断是否为JDCB的整型类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isInteger(int sqlType) {
// 5
return (Types.BIT == sqlType || Types.BIGINT == sqlType || Types.INTEGER == sqlType || Types.SMALLINT == sqlType
|| Types.TINYINT == sqlType);
}
/**
* 判断是否为JDCB的字符文本类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isString(int sqlType) {
// 10
return (Types.CHAR == sqlType || Types.NCHAR == sqlType || Types.VARCHAR == sqlType
|| Types.LONGVARCHAR == sqlType || Types.NVARCHAR == sqlType || Types.LONGNVARCHAR == sqlType
|| Types.CLOB == sqlType || Types.NCLOB == sqlType || Types.SQLXML == sqlType
|| Types.ROWID == sqlType);
}
/**
* 判断是否为JDCB的整型类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isInteger(int sqlType) {
// 5
return (Types.BIT == sqlType || Types.BIGINT == sqlType || Types.INTEGER == sqlType
|| Types.SMALLINT == sqlType
|| Types.TINYINT == sqlType);
}
/**
* 判断是否为JDCB的时间类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isDateTime(int sqlType) {
// 5
return (Types.DATE == sqlType || Types.TIME == sqlType || Types.TIMESTAMP == sqlType
|| Types.TIME_WITH_TIMEZONE == sqlType || Types.TIMESTAMP_WITH_TIMEZONE == sqlType);
}
/**
* 判断是否为JDCB的字符文本类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isString(int sqlType) {
// 10
return (Types.CHAR == sqlType || Types.NCHAR == sqlType || Types.VARCHAR == sqlType
|| Types.LONGVARCHAR == sqlType || Types.NVARCHAR == sqlType
|| Types.LONGNVARCHAR == sqlType
|| Types.CLOB == sqlType || Types.NCLOB == sqlType || Types.SQLXML == sqlType
|| Types.ROWID == sqlType);
}
/**
* 判断是否为JDCB的布尔类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isBoolean(int sqlType) {
// 1
return (Types.BOOLEAN == sqlType);
}
/**
* 判断是否为JDCB的时间类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isDateTime(int sqlType) {
// 5
return (Types.DATE == sqlType || Types.TIME == sqlType || Types.TIMESTAMP == sqlType
|| Types.TIME_WITH_TIMEZONE == sqlType || Types.TIMESTAMP_WITH_TIMEZONE == sqlType);
}
/**
* 判断是否为JDCB的二进制类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isBinary(int sqlType) {
// 4
return (Types.BINARY == sqlType || Types.VARBINARY == sqlType || Types.BLOB == sqlType
|| Types.LONGVARBINARY == sqlType);
}
public static boolean isTextable(int sqlType) {
return isNumeric(sqlType) || isString(sqlType) || isDateTime(sqlType) || isBoolean(sqlType) ;
}
/**
* 判断是否为JDCB的布尔类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isBoolean(int sqlType) {
// 1
return (Types.BOOLEAN == sqlType);
}
// 其他类型如下9个
// JAVA_OBJECT
// OTHER
// NULL
// DISTINCT
// STRUCT
// ARRAY
// REF
// DATALINK
// REF_CURSOR
/**
* 构造函数私有化
*/
private JdbcTypesUtils() {
}
/**
* 判断是否为JDCB的二进制类型
*
* @param sqlType jdbc的整型类型详见:{@codejava.sql.Types }
* @return true为是否则为false
*/
public static boolean isBinary(int sqlType) {
// 4
return (Types.BINARY == sqlType || Types.VARBINARY == sqlType || Types.BLOB == sqlType
|| Types.LONGVARBINARY == sqlType);
}
public static boolean isTextable(int sqlType) {
return isNumeric(sqlType) || isString(sqlType) || isDateTime(sqlType) || isBoolean(sqlType);
}
// 其他类型如下9个
// JAVA_OBJECT
// OTHER
// NULL
// DISTINCT
// STRUCT
// ARRAY
// REF
// DATALINK
// REF_CURSOR
/**
* 构造函数私有化
*/
private JdbcTypesUtils() {
}
}