diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/common/response/ResultCode.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/common/response/ResultCode.java index a9c671b2..31fb641a 100644 --- a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/common/response/ResultCode.java +++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/common/response/ResultCode.java @@ -16,18 +16,20 @@ import lombok.Getter; @AllArgsConstructor public enum ResultCode { - SUCCESS(0,"操作成功"), - ERROR_INTERNAL_ERROR(1,"内部错误"), - ERROR_INVALID_ARGUMENT(2,"无效参数"), - ERROR_RESOURCE_NOT_EXISTS(3,"资源不存在"), - ERROR_RESOURCE_ALREADY_EXISTS(4,"资源已存在"), - ERROR_RESOURCE_NOT_DEPLOY(5,"资源未发布"), - ERROR_RESOURCE_HAS_DEPLOY(6,"资源已发布"), - ERROR_USER_NOT_EXISTS(7,"用户不存在"), - ERROR_USER_PASSWORD_WRONG(8,"密码错误"), + SUCCESS(0, "操作成功"), + ERROR_INTERNAL_ERROR(1, "内部错误"), + ERROR_INVALID_ARGUMENT(2, "无效参数"), + ERROR_RESOURCE_NOT_EXISTS(3, "资源不存在"), + ERROR_RESOURCE_ALREADY_EXISTS(4, "资源已存在"), + ERROR_RESOURCE_NOT_DEPLOY(5, "资源未发布"), + ERROR_RESOURCE_HAS_DEPLOY(6, "资源已发布"), + ERROR_USER_NOT_EXISTS(7, "用户不存在"), + ERROR_USER_PASSWORD_WRONG(8, "密码错误"), + ERROR_INVALID_JDBC_URL(9, "JDBC连接的URL格式不正确"), + ERROR_CANNOT_CONNECT_REMOTE(10, "远程地址不可达"), - ERROR_ACCESS_FORBIDDEN(403,"无效的登陆凭证"), - ERROR_TOKEN_EXPIRED(404,"登录凭证已失效"), + ERROR_ACCESS_FORBIDDEN(403, "无效的登陆凭证"), + ERROR_TOKEN_EXPIRED(404, "登录凭证已失效"), ; private int code; diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/DbConnectionService.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/DbConnectionService.java index ab5f881f..7841a4d9 100644 --- a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/DbConnectionService.java +++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/DbConnectionService.java @@ -23,6 +23,7 @@ import com.gitee.dbswitch.admin.model.response.DatabaseTypeDetailResponse; import com.gitee.dbswitch.admin.model.response.DbConnectionDetailResponse; import com.gitee.dbswitch.admin.model.response.DbConnectionNameResponse; import com.gitee.dbswitch.admin.type.SupportDbTypeEnum; +import com.gitee.dbswitch.admin.util.JDBCURL; import com.gitee.dbswitch.admin.util.PageUtil; import com.gitee.dbswitch.common.constant.DatabaseTypeEnum; import com.gitee.dbswitch.core.service.IMetaDataService; @@ -31,8 +32,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.function.Supplier; +import java.util.regex.Matcher; import java.util.stream.Collectors; import javax.annotation.Resource; +import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Service; @Service @@ -48,7 +51,7 @@ public class DbConnectionService { detail.setId(type.getId()); detail.setType(type.getName().toUpperCase()); detail.setDriver(type.getDriver()); - detail.setTemplate(type.getTemplate()); + detail.setTemplate(StringUtils.join(type.getUrl(), ",")); lists.add(detail); } @@ -80,11 +83,38 @@ public class DbConnectionService { return Result.failed(ResultCode.ERROR_RESOURCE_NOT_EXISTS, "id=" + id); } - DatabaseTypeEnum dbType = DatabaseTypeEnum.valueOf(dbConn.getType().getName().toUpperCase()); - IMetaDataService metaDataService = new MigrationMetaDataServiceImpl(); - metaDataService.setDatabaseConnection(dbType); - metaDataService.testQuerySQL(dbConn.getUrl(), dbConn.getUsername(), dbConn.getPassword(), - dbConn.getType().getSql()); + SupportDbTypeEnum supportDbType = SupportDbTypeEnum + .valueOf(dbConn.getType().getName().toUpperCase()); + for (String pattern : supportDbType.getUrl()) { + final Matcher matcher = JDBCURL.getPattern(pattern).matcher(dbConn.getUrl()); + if (!matcher.matches()) { + if (1 == supportDbType.getUrl().length) { + return Result.failed(ResultCode.ERROR_INVALID_JDBC_URL, dbConn.getUrl()); + } else { + continue; + } + } + + String host = matcher.group("host"); + String port = matcher.group("port"); + if (null == port) { + port = String.valueOf(supportDbType.getPort()); + } + + if (!JDBCURL.reachable(host, port)) { + return Result.failed(ResultCode.ERROR_CANNOT_CONNECT_REMOTE, dbConn.getUrl()); + } + + DatabaseTypeEnum prd = DatabaseTypeEnum.valueOf(dbConn.getType().getName().toUpperCase()); + IMetaDataService metaDataService = new MigrationMetaDataServiceImpl(); + metaDataService.setDatabaseConnection(prd); + metaDataService.testQuerySQL( + dbConn.getUrl(), + dbConn.getUsername(), + dbConn.getPassword(), + dbConn.getType().getSql() + ); + } return Result.success(); } diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/type/SupportDbTypeEnum.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/type/SupportDbTypeEnum.java index 8b54bf61..bc105562 100644 --- a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/type/SupportDbTypeEnum.java +++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/type/SupportDbTypeEnum.java @@ -17,28 +17,31 @@ import org.springframework.util.StringUtils; @AllArgsConstructor public enum SupportDbTypeEnum { - MYSQL(1, "mysql", "com.mysql.jdbc.Driver", "/* ping */ SELECT 1", - "jdbc:mysql://{host}:{port>/{name}?useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true&tinyInt1isBit=false"), - MARIADB(2, "mariadb", "org.mariadb.jdbc.Driver", "SELECT 1", - "jdbc:mariadb://{host}:{port}/{name}?useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true&tinyInt1isBit=false"), - ORACLE(3, "oracle", "oracle.jdbc.driver.OracleDriver", "SELECT 'Hello' from DUAL", - "jdbc:oracle:thin:@{host}:{port}:{name}"), - SQLSERVER(4, "sqlserver", "com.microsoft.sqlserver.jdbc.SQLServerDriver", "SELECT 1+2 as a", - "jdbc:sqlserver://{host}:{port};DatabaseName={name}"), - POSTGRESQL(5, "postgresql", "org.postgresql.Driver", "SELECT 1", - "jdbc:postgresql://{host}:{port}/{name}"), - DB2(6, "db2", "com.ibm.db2.jcc.DB2Driver", "SELECT 1 FROM SYSIBM.SYSDUMMY1", - "jdbc:db2://{host}:{port}/{name}:driverType=4;fullyMaterializeLobData=true;fullyMaterializeInputStreams=true;progressiveStreaming=2;progresssiveLocators=2;"), - DM(7, "dm", "dm.jdbc.driver.DmDriver", "SELECT 'Hello' from DUAL", "jdbc:dm://{host}:{port}"), - KINGBASE(8, "kingbase", "com.kingbase8.Driver", "SELECT 1", - "jdbc:kingbase8://{host}:{port}/{name}"), + MYSQL(1, "mysql", "com.mysql.jdbc.Driver", 3306, "/* ping */ SELECT 1", + new String[]{"jdbc:mysql://{host}[:{port}]/[{database}][\\?{params}]"}), + MARIADB(2, "mariadb", "org.mariadb.jdbc.Driver", 3306, "SELECT 1", + new String[]{"jdbc:mariadb://{host}[:{port}]/[{database}][\\?{params}]"}), + ORACLE(3, "oracle", "oracle.jdbc.driver.OracleDriver", 1521, "SELECT 'Hello' from DUAL", + new String[]{"jdbc:oracle:thin:@{host}:{port}:{name}", + "jdbc:oracle:thin:@//{host}[:{port}]/{name}"}), + SQLSERVER(4, "sqlserver", "com.microsoft.sqlserver.jdbc.SQLServerDriver", 1433, "SELECT 1+2 as a", + new String[]{"jdbc:sqlserver://{host}[:{port}][;databaseName={database}][;{params}]"}), + POSTGRESQL(5, "postgresql", "org.postgresql.Driver", 5432, "SELECT 1", + new String[]{"jdbc:postgresql://{host}[:{port}]/[{database}][\\?{params}]"}), + DB2(6, "db2", "com.ibm.db2.jcc.DB2Driver", 50000, "SELECT 1 FROM SYSIBM.SYSDUMMY1", + new String[]{"jdbc:db2://{host}:{port}/{database}[:{params}]"}), + DM(7, "dm", "dm.jdbc.driver.DmDriver", 5236, "SELECT 'Hello' from DUAL", + new String[]{"jdbc:dm://{host}:{port}[/{database}][\\?{params}]"}), + KINGBASE(8, "kingbase", "com.kingbase8.Driver", 54321, "SELECT 1", + new String[]{"jdbc:kingbase8://{host}[:{port}]/[{database}][\\?{params}]"}), ; private int id; private String name; private String driver; + private int port; private String sql; - private String template; + private String[] url; public static SupportDbTypeEnum of(String name) { if (!StringUtils.isEmpty(name)) { diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/util/JDBCURL.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/util/JDBCURL.java new file mode 100644 index 00000000..f8698025 --- /dev/null +++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/util/JDBCURL.java @@ -0,0 +1,297 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// +package com.gitee.dbswitch.admin.util; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * JDBC-URL参数提取工具类 + * + * @author tang + * @date 2021-11-20 22:54:21 + * @since 1.0 + */ +public class JDBCURL { + + public static final String PROP_HOST = "host"; //$NON-NLS-1$ + public static final String PROP_PORT = "port"; //$NON-NLS-1$ + public static final String PROP_DATABASE = "database"; //$NON-NLS-1$ + public static final String PROP_SERVER = "server"; //$NON-NLS-1$ + public static final String PROP_PARAMS = "params"; //$NON-NLS-1$ + public static final String PROP_FOLDER = "folder"; //$NON-NLS-1$ + public static final String PROP_FILE = "file"; //$NON-NLS-1$ + public static final String PROP_USER = "user"; //$NON-NLS-1$ + public static final String PROP_PASSWORD = "password"; //$NON-NLS-1$ + + private static String getPropertyRegex(String property) { + switch (property) { + case PROP_FOLDER: + case PROP_FILE: + case PROP_PARAMS: + return ".+?"; + default: + return "[\\\\w\\\\-_.~]+"; + } + } + + private static String replaceAll(String input, String regex, Function replacer) { + final Matcher matcher = Pattern.compile(regex).matcher(input); + final StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + matcher.appendReplacement(sb, replacer.apply(matcher)); + } + matcher.appendTail(sb); + return sb.toString(); + } + + public static Pattern getPattern(String sampleUrl) { + String pattern = sampleUrl; + pattern = replaceAll(pattern, "\\[(.*?)]", m -> "\\\\E(?:\\\\Q" + m.group(1) + "\\\\E)?\\\\Q"); + pattern = replaceAll(pattern, "\\{(.*?)}", + m -> "\\\\E(\\?<\\\\Q" + m.group(1) + "\\\\E>" + getPropertyRegex(m.group(1)) + ")\\\\Q"); + pattern = "^\\Q" + pattern + "\\E$"; + return Pattern.compile(pattern); + } + + /** + * 根据主机地址与端口号检查可达性 + * + * @param host 主机地址 + * @param port 端口号 + * @return 成功返回true,否则为false + */ + public static boolean reachable(String host, String port) { + try { + InetAddress address = InetAddress.getByName(host); + if (!address.isReachable(1500)) { + return false; + } + + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress(host, Integer.parseInt(port)), 1500); + } + } catch (IOException e) { + return false; + } + + return true; + } + + /** + * 测试代码 + * + * @param args + */ + public static void main(String[] args) { + // 1、teradata数据库 + // jdbc:teradata://localhost/DATABASE=test,DBS_PORT=1234,CLIENT_CHARSET=EUC_CN,TMODE=TERA,CHARSET=ASCII,LOB_SUPPORT=true + final Matcher matcher0 = JDBCURL + .getPattern("jdbc:teradata://{host}/DATABASE={database},DBS_PORT={port}[,{params}]") + .matcher( + "jdbc:teradata://localhost/DATABASE=test,DBS_PORT=1234,CLIENT_CHARSET=EUC_CN,TMODE=TERA,CHARSET=ASCII,LOB_SUPPORT=true"); + if (matcher0.matches()) { + System.out.println("teradata host:" + matcher0.group("host")); + System.out.println("teradata port:" + matcher0.group("port")); + System.out.println("teradata database:" + matcher0.group("database")); + String params = matcher0.group("params"); + if (null != params) { + String[] pairs = params.split(","); + for (String pair : pairs) { + System.out.println("teradata params:" + pair); + } + } + } else { + System.out.println("error for teradata!"); + } + + // 2、PostgreSQL数据库 + // jdbc:postgresql://localhost:5432/dvdrental?currentSchema=test&ssl=true + // https://jdbc.postgresql.org/documentation/head/connect.html + final Matcher matcher1 = JDBCURL + .getPattern("jdbc:postgresql://{host}[:{port}]/[{database}][\\?{params}]") + .matcher("jdbc:postgresql://localhost:5432/dvdrental?currentSchema=test&ssl=true"); + if (matcher1.matches()) { + System.out.println("postgresql host:" + matcher1.group("host")); + System.out.println("postgresql port:" + matcher1.group("port")); + System.out.println("postgresql database:" + matcher1.group("database")); + String params = matcher1.group("params"); + if (null != params) { + String[] pairs = params.split("&"); + for (String pair : pairs) { + System.out.println("postgresql params:" + pair); + } + } + } else { + System.out.println("error for postgresql!"); + } + + // 3、Oracle数据库 + // oracle sid 方式 + final Matcher matcher2 = JDBCURL.getPattern("jdbc:oracle:thin:@{host}[:{port}]:{sid}") + .matcher("jdbc:oracle:thin:@localhost:1521:orcl"); + if (matcher2.matches()) { + System.out.println("oracle sid host:" + matcher2.group("host")); + System.out.println("oracle sid port:" + matcher2.group("port")); + System.out.println("oracle sid name:" + matcher2.group("sid")); + } else { + System.out.println("error for oracle sid!"); + } + + // oracle service name 方式 + final Matcher matcher2_1 = JDBCURL.getPattern("jdbc:oracle:thin:@//{host}[:{port}]/{name}") + .matcher("jdbc:oracle:thin:@//localhost:1521/orcl.city.com"); + if (matcher2_1.matches()) { + System.out.println("oracle ServiceName host:" + matcher2_1.group("host")); + System.out.println("oracle ServiceName port:" + matcher2_1.group("port")); + System.out.println("oracle ServiceName name:" + matcher2_1.group("name")); + } else { + System.out.println("error for oracle ServiceName!"); + } + + // oracle TNSName 方式不支持 + // jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.16.91)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=orcl))) + // .............................. + + // 4、MySQL数据库 + // jdbc:mysql://172.17.2.10:3306/test?useUnicode=true&useSSL=false + final Matcher matcher3 = JDBCURL + .getPattern("jdbc:mysql://{host}[:{port}]/[{database}][\\?{params}]") + .matcher("jdbc:mysql://localhost:3306/test_demo?useUnicode=true&useSSL=false"); + if (matcher3.matches()) { + System.out.println("mysql host:" + matcher3.group("host")); + System.out.println("mysql port:" + matcher3.group("port")); + System.out.println("mysql database:" + matcher3.group("database")); + String params = matcher3.group("params"); + if (null != params) { + String[] pairs = params.split("&"); + for (String pair : pairs) { + System.out.println("mysql params:" + pair); + } + } + } else { + System.out.println("error for mysql!"); + } + + // 5、MariaDB数据库 + // 同Mysql的jdbc-url + final Matcher matcher4 = JDBCURL + .getPattern("jdbc:mariadb://{host}[:{port}]/[{database}][\\?{params}]") + .matcher("jdbc:mariadb://localhost:3306/test_demo"); + if (matcher4.matches()) { + System.out.println("mariadb host:" + matcher4.group("host")); + System.out.println("mariadb port:" + matcher4.group("port")); + System.out.println("mariadb database:" + matcher4.group("database")); + String params = matcher4.group("params"); + if (null != params) { + String[] pairs = params.split("&"); + for (String pair : pairs) { + System.out.println("mysql params:" + pair); + } + } + } else { + System.out.println("error for mariadb!"); + } + + // 6、Microsoft SQLServer数据库 + // jdbc:sqlserver://localhost:1433;databaseName=AdventureWorks;user=MyUserName;password=123456; + final Matcher matcher5 = JDBCURL + .getPattern("jdbc:sqlserver://{host}[:{port}][;databaseName={database}][;{params}]") + .matcher("jdbc:sqlserver://localhost:1433;databaseName=master;user=MyUserName"); + if (matcher5.matches()) { + System.out.println("sqlserver host:" + matcher5.group("host")); + System.out.println("sqlserver port:" + matcher5.group("port")); + System.out.println("sqlserver database:" + matcher5.group("database")); + String params = matcher5.group("params"); + if (null != params) { + String[] pairs = params.split(";"); + for (String pair : pairs) { + System.out.println("sqlserver params:" + pair); + } + } + } else { + System.out.println("error for sqlserver!"); + } + + // 7、人大金仓数据库 + // 同postgresql的jdbc-url + final Matcher matcher6 = JDBCURL + .getPattern("jdbc:kingbase8://{host}[:{port}]/[{database}][\\?{params}]") + .matcher("jdbc:kingbase8://localhost:54321/sample"); + if (matcher6.matches()) { + System.out.println("kingbase8 host:" + matcher6.group("host")); + System.out.println("kingbase8 port:" + matcher6.group("port")); + System.out.println("kingbase8 database:" + matcher6.group("database")); + String params = matcher6.group("params"); + if (null != params) { + String[] pairs = params.split("&"); + for (String pair : pairs) { + System.out.println("mysql params:" + pair); + } + } + } else { + System.out.println("error for kingbase8!"); + } + + // 8、达梦数据库 + // jdbc:dm://localhost:5236/user?param=hello + final Matcher matcher7 = JDBCURL.getPattern("jdbc:dm://{host}:{port}[/{database}][\\?{params}]") + .matcher("jdbc:dm://localhost:5236"); + if (matcher7.matches()) { + System.out.println("dm host:" + matcher7.group("host")); + System.out.println("dm port:" + matcher7.group("port")); + System.out.println("dm database:" + matcher7.group("database")); + String params = matcher7.group("params"); + if (null != params) { + String[] pairs = params.split("&"); + for (String pair : pairs) { + System.out.println("dm params:" + pair); + } + } + } else { + System.out.println("error for dm!"); + } + + // 9、DB2数据库 + // jdbc:db2://localhost:50000/testdb:driverType=4;fullyMaterializeLobData=true;fullyMaterializeInputStreams=true;progressiveStreaming=2;progresssiveLocators=2; + final Matcher matcher8 = JDBCURL.getPattern("jdbc:db2://{host}:{port}/{database}[:{params}]") + .matcher("jdbc:db2://localhost:50000/testdb:driverType=4;fullyMaterializeLobData=true"); + if (matcher8.matches()) { + System.out.println("db2 host:" + matcher8.group("host")); + System.out.println("db2 port:" + matcher8.group("port")); + System.out.println("db2 database:" + matcher8.group("database")); + String params = matcher8.group("params"); + if (null != params) { + String[] pairs = params.split(";"); + for (String pair : pairs) { + System.out.println("mysql params:" + pair); + } + } + } else { + System.out.println("error for db2!"); + } + + // 9、SQLite数据库 + // jdbc:sqlite:/tmp/phone.db + final Matcher matcher9 = JDBCURL.getPattern("jdbc:sqlite:{file}") + .matcher("jdbc:sqlite:D:\\Project\\Test\\phone.db"); + if (matcher9.matches()) { + System.out.println("sqlite file:" + matcher9.group("file")); + } else { + System.out.println("error for sqlite!"); + } + } + +} diff --git a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/ChangeCaculatorService.java b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/ChangeCaculatorService.java index 4e831e8f..c6d3c7f0 100644 --- a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/ChangeCaculatorService.java +++ b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/ChangeCaculatorService.java @@ -9,474 +9,498 @@ ///////////////////////////////////////////////////////////// package com.gitee.dbswitch.dbchange; +import com.gitee.dbswitch.dbchange.pojo.TaskParamBean; +import com.gitee.dbswitch.dbchange.util.JdbcTypesUtils; +import com.gitee.dbswitch.dbcommon.constant.Constants; +import com.gitee.dbswitch.dbcommon.database.DatabaseOperatorFactory; +import com.gitee.dbswitch.dbcommon.database.IDatabaseOperator; +import com.gitee.dbswitch.dbcommon.pojo.StatementResultSet; +import com.gitee.dbswitch.dbcommon.util.JdbcMetaDataUtils; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.springframework.util.SerializationUtils; -import com.gitee.dbswitch.dbcommon.constant.Constants; -import com.gitee.dbswitch.dbcommon.database.DatabaseOperatorFactory; -import com.gitee.dbswitch.dbcommon.database.IDatabaseOperator; -import com.gitee.dbswitch.dbcommon.pojo.StatementResultSet; -import com.gitee.dbswitch.dbchange.pojo.TaskParamBean; -import com.gitee.dbswitch.dbchange.util.JdbcTypesUtils; -import com.gitee.dbswitch.dbcommon.util.JdbcMetaDataUtils; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.SerializationUtils; /** * 数据变化量计算核心类 - * - * @author tang * + * @author tang */ @Slf4j public final class ChangeCaculatorService implements IDatabaseChangeCaculator { - /** 是否记录不变化的记录 */ - private boolean recordIdentical; + /** + * 是否记录不变化的记录 + */ + private boolean recordIdentical; - /** 是否进行jdbc数据type检查 */ - private boolean checkJdbcType; + /** + * 是否进行jdbc数据type检查 + */ + private boolean checkJdbcType; - /** 批量读取数据的行数大小 */ - private int queryFetchSize; + /** + * 批量读取数据的行数大小 + */ + private int queryFetchSize; - public ChangeCaculatorService() { - this(false, true); - } + public ChangeCaculatorService() { + this(false, true); + } - public ChangeCaculatorService(boolean recordIdentical, boolean checkJdbcType) { - this.recordIdentical = recordIdentical; - this.checkJdbcType = checkJdbcType; - this.queryFetchSize = Constants.DEFAULT_FETCH_SIZE; - } + public ChangeCaculatorService(boolean recordIdentical, boolean checkJdbcType) { + this.recordIdentical = recordIdentical; + this.checkJdbcType = checkJdbcType; + this.queryFetchSize = Constants.DEFAULT_FETCH_SIZE; + } - @Override - public boolean isRecordIdentical() { - return this.recordIdentical; - } + @Override + public boolean isRecordIdentical() { + return this.recordIdentical; + } - @Override - public void setRecordIdentical(boolean recordOrNot) { - this.recordIdentical = recordOrNot; - } + @Override + public void setRecordIdentical(boolean recordOrNot) { + this.recordIdentical = recordOrNot; + } - @Override - public boolean isCheckJdbcType() { - return this.checkJdbcType; - } + @Override + public boolean isCheckJdbcType() { + return this.checkJdbcType; + } - @Override - public void setCheckJdbcType(boolean checkOrNot) { - this.checkJdbcType = checkOrNot; - } + @Override + public void setCheckJdbcType(boolean checkOrNot) { + this.checkJdbcType = checkOrNot; + } - @Override - public int getFetchSize() { - return this.queryFetchSize; - } + @Override + public int getFetchSize() { + return this.queryFetchSize; + } - @Override - public void setFetchSize(int size) { - if (size < Constants.MINIMUM_FETCH_SIZE) { - throw new IllegalArgumentException("设置的批量处理行数的大小fetchSize不得小于" + Constants.MINIMUM_FETCH_SIZE); - } + @Override + public void setFetchSize(int size) { + if (size < Constants.MINIMUM_FETCH_SIZE) { + throw new IllegalArgumentException( + "设置的批量处理行数的大小fetchSize不得小于" + Constants.MINIMUM_FETCH_SIZE); + } - this.queryFetchSize = size; - } + this.queryFetchSize = size; + } - @Override - public void executeCalculate(@NonNull TaskParamBean task, @NonNull IDatabaseRowHandler handler) { + @Override + public void executeCalculate(@NonNull TaskParamBean task, @NonNull IDatabaseRowHandler handler) { - if (log.isDebugEnabled()) { - log.debug("###### Begin execute calculate table CDC data now"); - } + if (log.isDebugEnabled()) { + log.debug("###### Begin execute calculate table CDC data now"); + } - boolean useOwnFieldsColumns = (task.getFieldColumns() != null && !task.getFieldColumns().isEmpty()); + boolean useOwnFieldsColumns = (task.getFieldColumns() != null && !task.getFieldColumns() + .isEmpty()); - // 检查新旧两张表的主键字段与比较字段 - JdbcMetaDataUtils oldMd = new JdbcMetaDataUtils(task.getOldDataSource()); - JdbcMetaDataUtils newMd = new JdbcMetaDataUtils(task.getNewDataSource()); - List fieldsPrimaryKeyOld = oldMd.queryTablePrimaryKeys(task.getOldSchemaName(), task.getOldTableName()); - List fieldsAllColumnOld = oldMd.queryTableColumnName(task.getOldSchemaName(), task.getOldTableName()); - List fieldsPrimaryKeyNew = newMd.queryTablePrimaryKeys(task.getNewSchemaName(), task.getNewTableName()); - List fieldsAllColumnNew = newMd.queryTableColumnName(task.getNewSchemaName(), task.getNewTableName()); + // 检查新旧两张表的主键字段与比较字段 + JdbcMetaDataUtils oldMd = new JdbcMetaDataUtils(task.getOldDataSource()); + JdbcMetaDataUtils newMd = new JdbcMetaDataUtils(task.getNewDataSource()); + List fieldsPrimaryKeyOld = oldMd + .queryTablePrimaryKeys(task.getOldSchemaName(), task.getOldTableName()); + List fieldsAllColumnOld = oldMd + .queryTableColumnName(task.getOldSchemaName(), task.getOldTableName()); + List fieldsPrimaryKeyNew = newMd + .queryTablePrimaryKeys(task.getNewSchemaName(), task.getNewTableName()); + List fieldsAllColumnNew = newMd + .queryTableColumnName(task.getNewSchemaName(), task.getNewTableName()); - if (fieldsPrimaryKeyOld.isEmpty() || fieldsPrimaryKeyNew.isEmpty()) { - throw new RuntimeException("计算变化量的表中存在无主键的表"); - } + if (fieldsPrimaryKeyOld.isEmpty() || fieldsPrimaryKeyNew.isEmpty()) { + throw new RuntimeException("计算变化量的表中存在无主键的表"); + } - boolean same = (fieldsPrimaryKeyOld.containsAll(fieldsPrimaryKeyNew) - && fieldsPrimaryKeyNew.containsAll(fieldsPrimaryKeyOld)); - if (!same) { - throw new RuntimeException("两个表的主键不相同"); - } + boolean same = (fieldsPrimaryKeyOld.containsAll(fieldsPrimaryKeyNew) + && fieldsPrimaryKeyNew.containsAll(fieldsPrimaryKeyOld)); + if (!same) { + throw new RuntimeException("两个表的主键不相同"); + } - if (useOwnFieldsColumns) { - if (!fieldsAllColumnOld.containsAll(task.getFieldColumns()) - || !fieldsAllColumnNew.containsAll(task.getFieldColumns())) { - throw new RuntimeException("指定的字段列不完全在两个表中存在"); - } - } else { - same = (fieldsAllColumnOld.containsAll(fieldsPrimaryKeyNew) - && fieldsAllColumnNew.containsAll(fieldsAllColumnOld)); - if (!same) { - throw new RuntimeException("两个表的字段不相同"); - } - } + if (useOwnFieldsColumns) { + if (!fieldsAllColumnOld.containsAll(task.getFieldColumns()) + || !fieldsAllColumnNew.containsAll(task.getFieldColumns())) { + throw new RuntimeException("指定的字段列不完全在两个表中存在"); + } + } else { + same = (fieldsAllColumnOld.containsAll(fieldsPrimaryKeyNew) + && fieldsAllColumnNew.containsAll(fieldsAllColumnOld)); + if (!same) { + throw new RuntimeException("两个表的字段不相同"); + } + } - if (useOwnFieldsColumns) { - // 如果自己配置了字段列表 - same = (task.getFieldColumns().containsAll(fieldsPrimaryKeyNew) - && task.getFieldColumns().containsAll(fieldsPrimaryKeyOld)); - if (!same) { - throw new RuntimeException("提供的比较字段中未包含主键"); - } + if (useOwnFieldsColumns) { + // 如果自己配置了字段列表 + same = (task.getFieldColumns().containsAll(fieldsPrimaryKeyNew) + && task.getFieldColumns().containsAll(fieldsPrimaryKeyOld)); + if (!same) { + throw new RuntimeException("提供的比较字段中未包含主键"); + } - same = (fieldsAllColumnOld.containsAll(task.getFieldColumns()) - && fieldsAllColumnNew.containsAll(task.getFieldColumns())); - if (!same) { - throw new RuntimeException("提供的比较字段中存在表中不存在的字段"); - } - } + same = (fieldsAllColumnOld.containsAll(task.getFieldColumns()) + && fieldsAllColumnNew.containsAll(task.getFieldColumns())); + if (!same) { + throw new RuntimeException("提供的比较字段中存在表中不存在的字段"); + } + } - // 计算除主键外的比较字段 - List fieldsOfCompareValue = new ArrayList<>(); - if (useOwnFieldsColumns) { - fieldsOfCompareValue.addAll(task.getFieldColumns()); - } else { - fieldsOfCompareValue.addAll(fieldsAllColumnOld); - } - fieldsOfCompareValue.removeAll(fieldsPrimaryKeyOld); + // 计算除主键外的比较字段 + List fieldsOfCompareValue = new ArrayList<>(); + if (useOwnFieldsColumns) { + fieldsOfCompareValue.addAll(task.getFieldColumns()); + } else { + fieldsOfCompareValue.addAll(fieldsAllColumnOld); + } + fieldsOfCompareValue.removeAll(fieldsPrimaryKeyOld); - // 构造查询列字段 - List queryFieldColumn; - if (useOwnFieldsColumns) { - queryFieldColumn = task.getFieldColumns(); - } else { - queryFieldColumn = fieldsAllColumnOld; - } + // 构造查询列字段 + List queryFieldColumn; + if (useOwnFieldsColumns) { + queryFieldColumn = task.getFieldColumns(); + } else { + queryFieldColumn = fieldsAllColumnOld; + } - StatementResultSet rsold = null; - StatementResultSet rsnew = null; + StatementResultSet rsold = null; + StatementResultSet rsnew = null; - try { - // 提取新旧两表数据的结果集(按主键排序后的) - IDatabaseOperator oldQuery = DatabaseOperatorFactory.createDatabaseOperator(task.getOldDataSource()); - oldQuery.setFetchSize(this.queryFetchSize); - IDatabaseOperator newQuery = DatabaseOperatorFactory.createDatabaseOperator(task.getNewDataSource()); - newQuery.setFetchSize(this.queryFetchSize); + try { + // 提取新旧两表数据的结果集(按主键排序后的) + IDatabaseOperator oldQuery = DatabaseOperatorFactory + .createDatabaseOperator(task.getOldDataSource()); + oldQuery.setFetchSize(this.queryFetchSize); + IDatabaseOperator newQuery = DatabaseOperatorFactory + .createDatabaseOperator(task.getNewDataSource()); + newQuery.setFetchSize(this.queryFetchSize); - if (log.isDebugEnabled()) { - log.debug("###### Query data from two table now"); - } + if (log.isDebugEnabled()) { + log.debug("###### Query data from two table now"); + } - rsold = oldQuery.queryTableData(task.getOldSchemaName(), task.getOldTableName(), queryFieldColumn, - fieldsPrimaryKeyOld); - rsnew = newQuery.queryTableData(task.getNewSchemaName(), task.getNewTableName(), queryFieldColumn, - fieldsPrimaryKeyNew); - ResultSetMetaData metaData = rsold.getResultset().getMetaData(); + rsold = oldQuery + .queryTableData(task.getOldSchemaName(), task.getOldTableName(), queryFieldColumn, + fieldsPrimaryKeyOld); + rsnew = newQuery + .queryTableData(task.getNewSchemaName(), task.getNewTableName(), queryFieldColumn, + fieldsPrimaryKeyNew); + ResultSetMetaData metaData = rsold.getResultset().getMetaData(); - if (log.isDebugEnabled()) { - log.debug("###### Check data validate now"); - } + if (log.isDebugEnabled()) { + log.debug("###### Check data validate now"); + } - // 检查结果集源信息是否一直 - int oldcnt = rsold.getResultset().getMetaData().getColumnCount(); - int newcnt = rsnew.getResultset().getMetaData().getColumnCount(); - if (oldcnt != newcnt) { - throw new RuntimeException(String.format("两个表的字段总个数不相等,即:%d!=%d", oldcnt, newcnt)); - } else { - for (int k = 1; k < metaData.getColumnCount(); ++k) { - String key1 = rsold.getResultset().getMetaData().getColumnLabel(k); - if (null == key1) { - key1 = rsold.getResultset().getMetaData().getColumnName(k); - } + // 检查结果集源信息是否一直 + int oldcnt = rsold.getResultset().getMetaData().getColumnCount(); + int newcnt = rsnew.getResultset().getMetaData().getColumnCount(); + if (oldcnt != newcnt) { + throw new RuntimeException(String.format("两个表的字段总个数不相等,即:%d!=%d", oldcnt, newcnt)); + } else { + for (int k = 1; k < metaData.getColumnCount(); ++k) { + String key1 = rsold.getResultset().getMetaData().getColumnLabel(k); + if (null == key1) { + key1 = rsold.getResultset().getMetaData().getColumnName(k); + } - String key2 = rsnew.getResultset().getMetaData().getColumnLabel(k); - if (null == key2) { - key2 = rsnew.getResultset().getMetaData().getColumnName(k); - } + String key2 = rsnew.getResultset().getMetaData().getColumnLabel(k); + if (null == key2) { + key2 = rsnew.getResultset().getMetaData().getColumnName(k); + } - if (!key1.equals(key2)) { - throw new RuntimeException(String.format("字段名称 [Index=%d] 不同,因 %s!=%s !", k, key1, key2)); - } + if (!key1.equals(key2)) { + throw new RuntimeException( + String.format("字段名称 [Index=%d] 不同,因 %s!=%s !", k, key1, key2)); + } - if (checkJdbcType) { - int type1 = rsold.getResultset().getMetaData().getColumnType(k); - int type2 = rsnew.getResultset().getMetaData().getColumnType(k); - if (type1 != type2) { - throw new RuntimeException(String.format("字段 [name=%s] 的数据类型不同,因 %s!=%s !", key1, - JdbcTypesUtils.resolveTypeName(type1), JdbcTypesUtils.resolveTypeName(type2))); - } - } + if (checkJdbcType) { + int type1 = rsold.getResultset().getMetaData().getColumnType(k); + int type2 = rsnew.getResultset().getMetaData().getColumnType(k); + if (type1 != type2) { + throw new RuntimeException(String.format("字段 [name=%s] 的数据类型不同,因 %s!=%s !", key1, + JdbcTypesUtils.resolveTypeName(type1), JdbcTypesUtils.resolveTypeName(type2))); + } + } - } - } + } + } - // 计算主键字段序列在结果集中的索引号 - int[] keyNumbers = new int[fieldsPrimaryKeyOld.size()]; - for (int i = 0; i < keyNumbers.length; ++i) { - String fn = fieldsPrimaryKeyOld.get(i); - keyNumbers[i] = getIndexOfField(fn, metaData); - } + // 计算主键字段序列在结果集中的索引号 + int[] keyNumbers = new int[fieldsPrimaryKeyOld.size()]; + for (int i = 0; i < keyNumbers.length; ++i) { + String fn = fieldsPrimaryKeyOld.get(i); + keyNumbers[i] = getIndexOfField(fn, metaData); + } - // 计算比较(非主键)字段序列在结果集中的索引号 - int[] valNumbers = new int[fieldsOfCompareValue.size()]; - for (int i = 0; i < valNumbers.length; ++i) { - String fn = fieldsOfCompareValue.get(i); - valNumbers[i] = getIndexOfField(fn, metaData); - } + // 计算比较(非主键)字段序列在结果集中的索引号 + int[] valNumbers = new int[fieldsOfCompareValue.size()]; + for (int i = 0; i < valNumbers.length; ++i) { + String fn = fieldsOfCompareValue.get(i); + valNumbers[i] = getIndexOfField(fn, metaData); + } - // 初始化计算结果数据字段列信息 - List targetColumns = new ArrayList<>(); - for (int k = 1; k <= metaData.getColumnCount(); ++k) { - String key = metaData.getColumnLabel(k); - if (null == key) { - key = metaData.getColumnName(k); - } - targetColumns.add(key); - } + // 初始化计算结果数据字段列信息 + List targetColumns = new ArrayList<>(); + for (int k = 1; k <= metaData.getColumnCount(); ++k) { + String key = metaData.getColumnLabel(k); + if (null == key) { + key = metaData.getColumnName(k); + } + targetColumns.add(key); + } - if (log.isDebugEnabled()) { - log.debug("###### Enter CDC calculate now"); - } + if (log.isDebugEnabled()) { + log.debug("###### Enter CDC calculate now"); + } - // 进入核心比较计算算法区域 - RecordChangeTypeEnum flagField = null; - Object[] outputRow; - Object[] one = getRowData(rsold.getResultset()); - Object[] two = getRowData(rsnew.getResultset()); - while (true) { - if (one == null && two == null) { - break; - } else if (one == null && two != null) { - flagField = RecordChangeTypeEnum.VALUE_INSERT; - outputRow = two; - two = getRowData(rsnew.getResultset()); - } else if (one != null && two == null) { - flagField = RecordChangeTypeEnum.VALUE_DELETED; - outputRow = one; - one = getRowData(rsold.getResultset()); - } else { - int compare = this.compare(one, two, keyNumbers, metaData); - if (0 == compare) { - int compareValues = this.compare(one, two, valNumbers, metaData); - if (compareValues == 0) { - flagField = RecordChangeTypeEnum.VALUE_IDENTICAL; - outputRow = one; - } else { - flagField = RecordChangeTypeEnum.VALUE_CHANGED; - outputRow = two; - } + // 进入核心比较计算算法区域 + RecordChangeTypeEnum flagField = null; + Object[] outputRow; + Object[] one = getRowData(rsold.getResultset()); + Object[] two = getRowData(rsnew.getResultset()); + while (true) { + if (one == null && two == null) { + break; + } else if (one == null && two != null) { + flagField = RecordChangeTypeEnum.VALUE_INSERT; + outputRow = two; + two = getRowData(rsnew.getResultset()); + } else if (one != null && two == null) { + flagField = RecordChangeTypeEnum.VALUE_DELETED; + outputRow = one; + one = getRowData(rsold.getResultset()); + } else { + int compare = this.compare(one, two, keyNumbers, metaData); + if (0 == compare) { + int compareValues = this.compare(one, two, valNumbers, metaData); + if (compareValues == 0) { + flagField = RecordChangeTypeEnum.VALUE_IDENTICAL; + outputRow = one; + } else { + flagField = RecordChangeTypeEnum.VALUE_CHANGED; + outputRow = two; + } - one = getRowData(rsold.getResultset()); - two = getRowData(rsnew.getResultset()); - } else { - if (compare < 0) { - flagField = RecordChangeTypeEnum.VALUE_DELETED; - outputRow = one; - one = getRowData(rsold.getResultset()); - } else { - flagField = RecordChangeTypeEnum.VALUE_INSERT; - outputRow = two; - two = getRowData(rsnew.getResultset()); - } - } - } + one = getRowData(rsold.getResultset()); + two = getRowData(rsnew.getResultset()); + } else { + if (compare < 0) { + flagField = RecordChangeTypeEnum.VALUE_DELETED; + outputRow = one; + one = getRowData(rsold.getResultset()); + } else { + flagField = RecordChangeTypeEnum.VALUE_INSERT; + outputRow = two; + two = getRowData(rsnew.getResultset()); + } + } + } - if (!this.recordIdentical && RecordChangeTypeEnum.VALUE_IDENTICAL == flagField) { - continue; - } + if (!this.recordIdentical && RecordChangeTypeEnum.VALUE_IDENTICAL == flagField) { + continue; + } - // 这里对计算的单条记录结果进行处理 - handler.handle(Collections.unmodifiableList(targetColumns), outputRow, flagField); - } + // 这里对计算的单条记录结果进行处理 + handler.handle(Collections.unmodifiableList(targetColumns), outputRow, flagField); + } - if (log.isDebugEnabled()) { - log.debug("###### Calculate CDC Over now"); - } + if (log.isDebugEnabled()) { + log.debug("###### Calculate CDC Over now"); + } - // 结束返回前的回调 - handler.destroy(Collections.unmodifiableList(targetColumns)); + // 结束返回前的回调 + handler.destroy(Collections.unmodifiableList(targetColumns)); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - if (null != rsold) { - rsold.close(); - } - if (null != rsnew) { - rsnew.close(); - } - } + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + if (null != rsold) { + rsold.close(); + } + if (null != rsnew) { + rsnew.close(); + } + } - } + } - /** - * 获取字段的索引号 - * - * @param key 字段名 - * @param metaData 结果集的元信息 - * @return 字段的索引号 - * @throws SQLException - */ - private int getIndexOfField(String key, ResultSetMetaData metaData) throws SQLException { - for (int k = 1; k <= metaData.getColumnCount(); ++k) { - String fieldName = metaData.getColumnLabel(k); - if (null == fieldName) { - fieldName = metaData.getColumnName(k); - } + /** + * 获取字段的索引号 + * + * @param key 字段名 + * @param metaData 结果集的元信息 + * @return 字段的索引号 + * @throws SQLException + */ + private int getIndexOfField(String key, ResultSetMetaData metaData) throws SQLException { + for (int k = 1; k <= metaData.getColumnCount(); ++k) { + String fieldName = metaData.getColumnLabel(k); + if (null == fieldName) { + fieldName = metaData.getColumnName(k); + } - if (fieldName.equals(key)) { - return k - 1; - } - } + if (fieldName.equals(key)) { + return k - 1; + } + } - return -1; - } + return -1; + } - /** - * 记录比较 - * - * @param obj1 记录1 - * @param obj2 记录2 - * @param fieldnrs 待比较的字段索引号 - * @param metaData 记录集的元信息 - * @return 比较的结果:0,-1,1 - * @throws SQLException - */ - private int compare(Object[] obj1, Object[] obj2, int[] fieldnrs, ResultSetMetaData metaData) throws SQLException { - if (obj1.length != obj2.length) { - throw new RuntimeException("Invalid compare object list !"); - } + /** + * 记录比较 + * + * @param obj1 记录1 + * @param obj2 记录2 + * @param fieldnrs 待比较的字段索引号 + * @param metaData 记录集的元信息 + * @return 比较的结果:0,-1,1 + * @throws SQLException + */ + private int compare(Object[] obj1, Object[] obj2, int[] fieldnrs, ResultSetMetaData metaData) + throws SQLException { + if (obj1.length != obj2.length) { + throw new RuntimeException("Invalid compare object list !"); + } - for (int fieldnr : fieldnrs) { - int jdbcType = metaData.getColumnType(fieldnr + 1); - Object o1 = obj1[fieldnr]; - Object o2 = obj2[fieldnr]; + for (int fieldnr : fieldnrs) { + int jdbcType = metaData.getColumnType(fieldnr + 1); + Object o1 = obj1[fieldnr]; + Object o2 = obj2[fieldnr]; - int cmp = typeCompare(jdbcType, o1, o2); - if (cmp != 0) { - return cmp; - } - } + int cmp = typeCompare(jdbcType, o1, o2); + if (cmp != 0) { + return cmp; + } + } - return 0; - } + return 0; + } - /** - * 字段值对象比较,将对象转换为字节数组来比较实现 - * - * @param type 字段的JDBC数据类型 - * @param o1 对象1 - * @param o2 对象2 - * @return 0为相等,-1为小于,1为大于 - */ - private int typeCompare(int type, Object o1, Object o2) { - boolean n1 = (o1 == null); - boolean n2 = (o2 == null); - if (n1 && !n2) { - return -1; - } - if (!n1 && n2) { - return 1; - } - if (n1 && n2) { - return 0; - } + /** + * 字段值对象比较,将对象转换为字节数组来比较实现 + * + * @param type 字段的JDBC数据类型 + * @param o1 对象1 + * @param o2 对象2 + * @return 0为相等,-1为小于,1为大于 + */ + private int typeCompare(int type, Object o1, Object o2) { + boolean n1 = (o1 == null); + boolean n2 = (o2 == null); + if (n1 && !n2) { + return -1; + } + if (!n1 && n2) { + return 1; + } + if (n1 && n2) { + return 0; + } - /** - *

- * 这里要比较的两个对象o1与o2可能类型不同,但值相同,例如: Integer o1=12,Long o2=12; - *

- *

- * 但是这种不属于同一类的比较情况不应出现: String o1="12", Integer o2=12; - *

- */ - if (JdbcTypesUtils.isString(type)) { - String s1 = String.valueOf(o1); - String s2 = String.valueOf(o2); - return s1.compareTo(s2); - } else if (JdbcTypesUtils.isNumeric(type) && o1 instanceof java.lang.Number && o2 instanceof java.lang.Number) { - java.lang.Number s1 = (java.lang.Number) o1; - java.lang.Number s2 = (java.lang.Number) o2; - return Double.compare(s1.doubleValue(), s2.doubleValue()); - } else if (JdbcTypesUtils.isInteger(type) && o1 instanceof java.lang.Number && o2 instanceof java.lang.Number) { - java.lang.Number s1 = (java.lang.Number) o1; - java.lang.Number s2 = (java.lang.Number) o2; - return Long.compare(s1.longValue(), s2.longValue()); - } else if (JdbcTypesUtils.isDateTime(type)) { - if (o1 instanceof java.sql.Time && o2 instanceof java.sql.Time) { - java.sql.Time t1 = (java.sql.Time) o1; - java.sql.Time t2 = (java.sql.Time) o2; - return t1.compareTo(t2); - } else if (o1 instanceof java.sql.Timestamp && o2 instanceof java.sql.Timestamp) { - java.sql.Timestamp t1 = (java.sql.Timestamp) o1; - java.sql.Timestamp t2 = (java.sql.Timestamp) o2; - return t1.compareTo(t2); - } else if (o1 instanceof java.sql.Date && o2 instanceof java.sql.Date) { - java.sql.Date t1 = (java.sql.Date) o1; - java.sql.Date t2 = (java.sql.Date) o2; - return t1.compareTo(t2); - } else { - String s1 = String.valueOf(o1); - String s2 = String.valueOf(o2); - return s1.compareTo(s2); - } - } else { - return compareTo(SerializationUtils.serialize(o1), SerializationUtils.serialize(o2)); - } - } + /** + *

+ * 这里要比较的两个对象o1与o2可能类型不同,但值相同,例如: Integer o1=12,Long o2=12; + *

+ *

+ * 但是这种不属于同一类的比较情况不应出现: String o1="12", Integer o2=12; + *

+ */ + if (JdbcTypesUtils.isString(type)) { + String s1 = String.valueOf(o1); + String s2 = String.valueOf(o2); + return s1.compareTo(s2); + } else if (JdbcTypesUtils.isNumeric(type) && o1 instanceof java.lang.Number + && o2 instanceof java.lang.Number) { + java.lang.Number s1 = (java.lang.Number) o1; + java.lang.Number s2 = (java.lang.Number) o2; + return Double.compare(s1.doubleValue(), s2.doubleValue()); + } else if (JdbcTypesUtils.isInteger(type) && o1 instanceof java.lang.Number + && o2 instanceof java.lang.Number) { + java.lang.Number s1 = (java.lang.Number) o1; + java.lang.Number s2 = (java.lang.Number) o2; + return Long.compare(s1.longValue(), s2.longValue()); + } else if (JdbcTypesUtils.isDateTime(type)) { + if (o1 instanceof java.sql.Time && o2 instanceof java.sql.Time) { + java.sql.Time t1 = (java.sql.Time) o1; + java.sql.Time t2 = (java.sql.Time) o2; + return t1.compareTo(t2); + } else if (o1 instanceof java.sql.Timestamp && o2 instanceof java.sql.Timestamp) { + java.sql.Timestamp t1 = (java.sql.Timestamp) o1; + java.sql.Timestamp t2 = (java.sql.Timestamp) o2; + return t1.compareTo(t2); + } else if (o1 instanceof java.sql.Date && o2 instanceof java.sql.Date) { + java.sql.Date t1 = (java.sql.Date) o1; + java.sql.Date t2 = (java.sql.Date) o2; + return t1.compareTo(t2); + } else { + String s1 = String.valueOf(o1); + String s2 = String.valueOf(o2); + return s1.compareTo(s2); + } + } else { + try { + return compareTo(SerializationUtils.serialize(o1), SerializationUtils.serialize(o2)); + } catch (Exception e) { + log.warn("CDC compare field value failed, return 0 instead,{}", e.getMessage()); + return 0; + } + } + } - /** - * 字节数组的比较 - * - * @param s1 字节数组1 - * @param s2 字节数组2 - * @return 0为相等,-1为小于,1为大于 - */ - public int compareTo(byte[] s1, byte[] s2) { - int len1 = s1.length; - int len2 = s2.length; - int lim = Math.min(len1, len2); - byte[] v1 = s1; - byte[] v2 = s2; + /** + * 字节数组的比较 + * + * @param s1 字节数组1 + * @param s2 字节数组2 + * @return 0为相等,-1为小于,1为大于 + */ + public int compareTo(byte[] s1, byte[] s2) { + int len1 = s1.length; + int len2 = s2.length; + int lim = Math.min(len1, len2); + byte[] v1 = s1; + byte[] v2 = s2; - int k = 0; - while (k < lim) { - byte c1 = v1[k]; - byte c2 = v2[k]; - if (c1 != c2) { - return c1 - c2; - } - k++; - } - return len1 - len2; - } + int k = 0; + while (k < lim) { + byte c1 = v1[k]; + byte c2 = v2[k]; + if (c1 != c2) { + return c1 - c2; + } + k++; + } + return len1 - len2; + } - /** - * 从结果集中取出一条记录 - * - * @param rs 记录集 - * @return 一条记录,到记录结尾时返回null - * @throws SQLException - */ - private Object[] getRowData(ResultSet rs) throws SQLException { - ResultSetMetaData metaData = rs.getMetaData(); - Object[] rowData = null; + /** + * 从结果集中取出一条记录 + * + * @param rs 记录集 + * @return 一条记录,到记录结尾时返回null + * @throws SQLException + */ + private Object[] getRowData(ResultSet rs) throws SQLException { + ResultSetMetaData metaData = rs.getMetaData(); + Object[] rowData = null; - if (rs.next()) { - rowData = new Object[metaData.getColumnCount()]; - for (int j = 1; j <= metaData.getColumnCount(); ++j) { - rowData[j - 1] = rs.getObject(j); - } - } + if (rs.next()) { + rowData = new Object[metaData.getColumnCount()]; + for (int j = 1; j <= metaData.getColumnCount(); ++j) { + rowData[j - 1] = rs.getObject(j); + } + } - return rowData; - } + return rowData; + } } diff --git a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/IDatabaseChangeCaculator.java b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/IDatabaseChangeCaculator.java index 523cf496..9a2e6c19 100644 --- a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/IDatabaseChangeCaculator.java +++ b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/IDatabaseChangeCaculator.java @@ -13,59 +13,58 @@ import com.gitee.dbswitch.dbchange.pojo.TaskParamBean; /** * 变化量计算器接口定义 - * - * @author tang * + * @author tang */ public interface IDatabaseChangeCaculator { - /** - * 是否记录无变化的数据 - * - * @return 是否记录无变化的数据 - */ - public boolean isRecordIdentical(); + /** + * 是否记录无变化的数据 + * + * @return 是否记录无变化的数据 + */ + boolean isRecordIdentical(); - /** - * 设置是否记录无变化的数据 - * - * @param recordOrNot 是否记录无变化的数据 - */ - public void setRecordIdentical(boolean recordOrNot); - - /** - * 是否进行Jdbc的数据类型检查 - * - * @return 是否进行检查 - */ - public boolean isCheckJdbcType(); + /** + * 设置是否记录无变化的数据 + * + * @param recordOrNot 是否记录无变化的数据 + */ + void setRecordIdentical(boolean recordOrNot); - /** - * 设置是否进行Jdbc的数据类型检查 - * - * @param checkOrNot 是否进行检查 - */ - public void setCheckJdbcType(boolean checkOrNot); + /** + * 是否进行Jdbc的数据类型检查 + * + * @return 是否进行检查 + */ + boolean isCheckJdbcType(); - /** - * 获取JDBC驱动批量读取数据的行数大小 - * - * @return 批量行数大小 - */ - public int getFetchSize(); + /** + * 设置是否进行Jdbc的数据类型检查 + * + * @param checkOrNot 是否进行检查 + */ + void setCheckJdbcType(boolean checkOrNot); - /** - * 设置JDBC驱动批量读取数据的行数大小 - * - * @param size 批量行数大小 - */ - public void setFetchSize(int size); + /** + * 获取JDBC驱动批量读取数据的行数大小 + * + * @return 批量行数大小 + */ + int getFetchSize(); - /** - * 执行变化量计算任务 - * - * @param task 任务描述实体对象 - * @param handler 计算结果回调处理器 - */ - public void executeCalculate(TaskParamBean task, IDatabaseRowHandler handler); + /** + * 设置JDBC驱动批量读取数据的行数大小 + * + * @param size 批量行数大小 + */ + void setFetchSize(int size); + + /** + * 执行变化量计算任务 + * + * @param task 任务描述实体对象 + * @param handler 计算结果回调处理器 + */ + void executeCalculate(TaskParamBean task, IDatabaseRowHandler handler); } diff --git a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/IDatabaseRowHandler.java b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/IDatabaseRowHandler.java index eb14bdb1..2738db8c 100644 --- a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/IDatabaseRowHandler.java +++ b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/IDatabaseRowHandler.java @@ -13,25 +13,24 @@ import java.util.List; /** * 计算结果行记录处理器 - * - * @author tang * + * @author tang */ public interface IDatabaseRowHandler { - /** - * 行数据处理 - * - * @param fields 字段名称列表,该列表只读 - * @param record 一条数据记实录 - * @param flag 数据变化状态 - */ - public void handle(List fields, Object[] record, RecordChangeTypeEnum flag); + /** + * 行数据处理 + * + * @param fields 字段名称列表,该列表只读 + * @param record 一条数据记实录 + * @param flag 数据变化状态 + */ + void handle(List fields, Object[] record, RecordChangeTypeEnum flag); - /** - * 计算结束通知 - * - * @param fields 字段名称列表,该列表只读 - */ - public void destroy(List fields); + /** + * 计算结束通知 + * + * @param fields 字段名称列表,该列表只读 + */ + void destroy(List fields); } diff --git a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/RecordChangeTypeEnum.java b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/RecordChangeTypeEnum.java index e4ca1452..012ea164 100644 --- a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/RecordChangeTypeEnum.java +++ b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/RecordChangeTypeEnum.java @@ -11,49 +11,51 @@ package com.gitee.dbswitch.dbchange; /** * 记录变化状态枚举类 - * - * @author tang * + * @author tang */ public enum RecordChangeTypeEnum { - /** - * 未变标识 - */ - VALUE_IDENTICAL(0, "identical"), - - /** - * 更新标识 - */ - VALUE_CHANGED(1, "update"), - - /** - * 插入标识 - */ - VALUE_INSERT(2, "insert"), - - /** - * 删除标识 - */ - VALUE_DELETED(3, "delete") - ; + /** + * 未变标识 + */ + VALUE_IDENTICAL(0, "identical"), - /** index */ - private Integer index; - - /** 状态标记 */ - private String status; + /** + * 更新标识 + */ + VALUE_CHANGED(1, "update"), - RecordChangeTypeEnum(int idx, String flag) { - this.index = idx; - this.status = flag; - } + /** + * 插入标识 + */ + VALUE_INSERT(2, "insert"), - public int getIndex() { - return index; - } + /** + * 删除标识 + */ + VALUE_DELETED(3, "delete"); - public String getStatus() { - return this.status; - } + /** + * index + */ + private Integer index; + + /** + * 状态标记 + */ + private String status; + + RecordChangeTypeEnum(int idx, String flag) { + this.index = idx; + this.status = flag; + } + + public int getIndex() { + return index; + } + + public String getStatus() { + return this.status; + } } diff --git a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/pojo/TaskParamBean.java b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/pojo/TaskParamBean.java index 82f45535..8cb801fe 100644 --- a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/pojo/TaskParamBean.java +++ b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/pojo/TaskParamBean.java @@ -19,53 +19,53 @@ import lombok.AllArgsConstructor; /** * 任务参数实体类定义 - * - * @author tang * + * @author tang */ @Data @Builder @AllArgsConstructor public class TaskParamBean { - /** - * 老表的数据源 - */ - @NonNull - DataSource oldDataSource; - /** - * 老表的schema名 - */ - @NonNull - private String oldSchemaName; + /** + * 老表的数据源 + */ + @NonNull + DataSource oldDataSource; - /** - * 老表的table名 - */ - @NonNull - private String oldTableName; + /** + * 老表的schema名 + */ + @NonNull + private String oldSchemaName; - /** - * 新表的数据源 - */ - @NonNull - DataSource newDataSource; + /** + * 老表的table名 + */ + @NonNull + private String oldTableName; - /** - * 新表的schema名 - */ - @NonNull - private String newSchemaName; + /** + * 新表的数据源 + */ + @NonNull + DataSource newDataSource; - /** - * 新表的table名 - */ - @NonNull - private String newTableName; - - /** - * 字段列 - */ - @Nullable - private List fieldColumns; + /** + * 新表的schema名 + */ + @NonNull + private String newSchemaName; + + /** + * 新表的table名 + */ + @NonNull + private String newTableName; + + /** + * 字段列 + */ + @Nullable + private List fieldColumns; } diff --git a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/util/JdbcTypesUtils.java b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/util/JdbcTypesUtils.java index 8fd1d6e5..ecbaa9d3 100644 --- a/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/util/JdbcTypesUtils.java +++ b/dbswitch-dbchange/src/main/java/com/gitee/dbswitch/dbchange/util/JdbcTypesUtils.java @@ -16,126 +16,127 @@ import java.util.Map; /** * JDBC的数据类型相关工具类 - * - * @author tang * + * @author tang */ public final class JdbcTypesUtils { - private static final Map TYPE_NAMES = new HashMap<>(); + private static final Map TYPE_NAMES = new HashMap<>(); - static { - try { - for (Field field : Types.class.getFields()) { - TYPE_NAMES.put((Integer) field.get(null), field.getName()); - } - } catch (Exception ex) { - throw new IllegalStateException("Failed to resolve JDBC Types constants", ex); - } - } + static { + try { + for (Field field : Types.class.getFields()) { + TYPE_NAMES.put((Integer) field.get(null), field.getName()); + } + } catch (Exception ex) { + throw new IllegalStateException("Failed to resolve JDBC Types constants", ex); + } + } - /** - * 将JDBC的整型类型转换成文本类型 - * - * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } - * @return JDBC的文本类型 - */ - public static String resolveTypeName(int sqlType) { - return TYPE_NAMES.get(sqlType); - } + /** + * 将JDBC的整型类型转换成文本类型 + * + * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } + * @return JDBC的文本类型 + */ + public static String resolveTypeName(int sqlType) { + return TYPE_NAMES.get(sqlType); + } - /** - * 判断是否为JDCB的浮点数类型 - * - * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } - * @return true为是,否则为false - */ - public static boolean isNumeric(int sqlType) { - // 5 - return (Types.DECIMAL == sqlType || Types.DOUBLE == sqlType || Types.FLOAT == sqlType - || Types.NUMERIC == sqlType || Types.REAL == sqlType); - } + /** + * 判断是否为JDCB的浮点数类型 + * + * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } + * @return true为是,否则为false + */ + public static boolean isNumeric(int sqlType) { + // 5 + return (Types.DECIMAL == sqlType || Types.DOUBLE == sqlType || Types.FLOAT == sqlType + || Types.NUMERIC == sqlType || Types.REAL == sqlType); + } - /** - * 判断是否为JDCB的整型类型 - * - * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } - * @return true为是,否则为false - */ - public static boolean isInteger(int sqlType) { - // 5 - return (Types.BIT == sqlType || Types.BIGINT == sqlType || Types.INTEGER == sqlType || Types.SMALLINT == sqlType - || Types.TINYINT == sqlType); - } - - /** - * 判断是否为JDCB的字符文本类型 - * - * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } - * @return true为是,否则为false - */ - public static boolean isString(int sqlType) { - // 10 - return (Types.CHAR == sqlType || Types.NCHAR == sqlType || Types.VARCHAR == sqlType - || Types.LONGVARCHAR == sqlType || Types.NVARCHAR == sqlType || Types.LONGNVARCHAR == sqlType - || Types.CLOB == sqlType || Types.NCLOB == sqlType || Types.SQLXML == sqlType - || Types.ROWID == sqlType); - } + /** + * 判断是否为JDCB的整型类型 + * + * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } + * @return true为是,否则为false + */ + public static boolean isInteger(int sqlType) { + // 5 + return (Types.BIT == sqlType || Types.BIGINT == sqlType || Types.INTEGER == sqlType + || Types.SMALLINT == sqlType + || Types.TINYINT == sqlType); + } - /** - * 判断是否为JDCB的时间类型 - * - * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } - * @return true为是,否则为false - */ - public static boolean isDateTime(int sqlType) { - // 5 - return (Types.DATE == sqlType || Types.TIME == sqlType || Types.TIMESTAMP == sqlType - || Types.TIME_WITH_TIMEZONE == sqlType || Types.TIMESTAMP_WITH_TIMEZONE == sqlType); - } + /** + * 判断是否为JDCB的字符文本类型 + * + * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } + * @return true为是,否则为false + */ + public static boolean isString(int sqlType) { + // 10 + return (Types.CHAR == sqlType || Types.NCHAR == sqlType || Types.VARCHAR == sqlType + || Types.LONGVARCHAR == sqlType || Types.NVARCHAR == sqlType + || Types.LONGNVARCHAR == sqlType + || Types.CLOB == sqlType || Types.NCLOB == sqlType || Types.SQLXML == sqlType + || Types.ROWID == sqlType); + } - /** - * 判断是否为JDCB的布尔类型 - * - * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } - * @return true为是,否则为false - */ - public static boolean isBoolean(int sqlType) { - // 1 - return (Types.BOOLEAN == sqlType); - } + /** + * 判断是否为JDCB的时间类型 + * + * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } + * @return true为是,否则为false + */ + public static boolean isDateTime(int sqlType) { + // 5 + return (Types.DATE == sqlType || Types.TIME == sqlType || Types.TIMESTAMP == sqlType + || Types.TIME_WITH_TIMEZONE == sqlType || Types.TIMESTAMP_WITH_TIMEZONE == sqlType); + } - /** - * 判断是否为JDCB的二进制类型 - * - * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } - * @return true为是,否则为false - */ - public static boolean isBinary(int sqlType) { - // 4 - return (Types.BINARY == sqlType || Types.VARBINARY == sqlType || Types.BLOB == sqlType - || Types.LONGVARBINARY == sqlType); - } - - public static boolean isTextable(int sqlType) { - return isNumeric(sqlType) || isString(sqlType) || isDateTime(sqlType) || isBoolean(sqlType) ; - } + /** + * 判断是否为JDCB的布尔类型 + * + * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } + * @return true为是,否则为false + */ + public static boolean isBoolean(int sqlType) { + // 1 + return (Types.BOOLEAN == sqlType); + } - // 其他类型如下:9个 - // JAVA_OBJECT - // OTHER - // NULL - // DISTINCT - // STRUCT - // ARRAY - // REF - // DATALINK - // REF_CURSOR - - /** - * 构造函数私有化 - */ - private JdbcTypesUtils() { - } + /** + * 判断是否为JDCB的二进制类型 + * + * @param sqlType jdbc的整型类型,详见:{@codejava.sql.Types } + * @return true为是,否则为false + */ + public static boolean isBinary(int sqlType) { + // 4 + return (Types.BINARY == sqlType || Types.VARBINARY == sqlType || Types.BLOB == sqlType + || Types.LONGVARBINARY == sqlType); + } + + public static boolean isTextable(int sqlType) { + return isNumeric(sqlType) || isString(sqlType) || isDateTime(sqlType) || isBoolean(sqlType); + } + + // 其他类型如下:9个 + // JAVA_OBJECT + // OTHER + // NULL + // DISTINCT + // STRUCT + // ARRAY + // REF + // DATALINK + // REF_CURSOR + + /** + * 构造函数私有化 + */ + private JdbcTypesUtils() { + } }