支持db2和MariaDB数据库

This commit is contained in:
inrgihc
2020-11-27 18:37:24 +08:00
parent a1a9ac0a2e
commit 54aa539bd3
15 changed files with 463 additions and 13 deletions

View File

@@ -49,7 +49,17 @@ public enum DatabaseTypeEnum {
/**
* Greenplum数据库类型
*/
GREENPLUM(6);
GREENPLUM(6),
/**
* MariaDB数据库类型
*/
MARIADB(7),
/**
* DB2数据库类型
*/
DB2(8);
private int index;

View File

@@ -44,8 +44,6 @@
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>sqljdbc4</artifactId>
<version>4.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/sqljdbc4-4.0.jar</systemPath>
</dependency>
<dependency>
@@ -80,6 +78,18 @@
<systemPath>${project.basedir}/lib/greenplum-jdbc-5.1.4.jar</systemPath>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
<version>db2jcc4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

View File

@@ -12,7 +12,9 @@ package com.weishao.dbswitch.core.database;
import java.util.HashMap;
import java.util.Map;
import com.weishao.dbswitch.common.constant.DatabaseTypeEnum;
import com.weishao.dbswitch.core.database.impl.DatabaseDB2Impl;
import com.weishao.dbswitch.core.database.impl.DatabaseGreenplumImpl;
import com.weishao.dbswitch.core.database.impl.DatabaseMariaDBImpl;
import com.weishao.dbswitch.core.database.impl.DatabaseMysqlImpl;
import com.weishao.dbswitch.core.database.impl.DatabaseOracleImpl;
import com.weishao.dbswitch.core.database.impl.DatabasePostgresImpl;
@@ -37,6 +39,8 @@ public final class DatabaseFactory {
put(DatabaseTypeEnum.SQLSERVER,DatabaseSqlserverImpl.class.getName());
put(DatabaseTypeEnum.POSTGRESQL,DatabasePostgresImpl.class.getName());
put(DatabaseTypeEnum.GREENPLUM,DatabaseGreenplumImpl.class.getName());
put(DatabaseTypeEnum.MARIADB,DatabaseMariaDBImpl.class.getName());
put(DatabaseTypeEnum.DB2,DatabaseDB2Impl.class.getName());
}};
public static AbstractDatabase getDatabaseInstance(DatabaseTypeEnum type) {

View File

@@ -0,0 +1,146 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.core.database.impl;
import java.util.List;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.util.JdbcConstants;
import com.weishao.dbswitch.common.constant.DatabaseTypeEnum;
import com.weishao.dbswitch.core.constant.Const;
import com.weishao.dbswitch.core.database.AbstractDatabase;
import com.weishao.dbswitch.core.database.IDatabaseInterface;
import com.weishao.dbswitch.core.model.ColumnDescription;
import com.weishao.dbswitch.core.model.ColumnMetaData;
/**
* 支持DB2数据库的元信息实现
*
* @author tang
*
*/
public class DatabaseDB2Impl extends AbstractDatabase implements IDatabaseInterface {
public DatabaseDB2Impl() {
super("com.ibm.db2.jcc.DB2Driver");
}
@Override
public List<ColumnDescription> querySelectSqlColumnMeta(String sql) {
String querySQL = String.format(" %s LIMIT 0 ", sql.replace(";", ""));
return this.getSelectSqlColumnMeta(querySQL, DatabaseTypeEnum.DB2);
}
@Override
protected String getTableFieldsQuerySQL(String schemaName, String tableName) {
return String.format("SELECT * FROM \"%s\".\"%s\" ", schemaName, tableName);
}
@Override
protected String getTestQuerySQL(String sql) {
return String.format("explain %s", sql.replace(";", ""));
}
@Override
public String formatSQL(String sql) {
return SQLUtils.format(sql, JdbcConstants.DB2);
}
@Override
public String getFieldDefinition(ColumnMetaData v, List<String> pks, boolean useAutoInc, boolean addCr) {
String fieldname = v.getName();
int length = v.getLength();
int precision = v.getPrecision();
int type = v.getType();
String retval = " \"" + fieldname + "\" ";
switch (type) {
case ColumnMetaData.TYPE_TIMESTAMP:
retval += "TIMESTAMP";
break;
case ColumnMetaData.TYPE_TIME:
retval += "TIME";
break;
case ColumnMetaData.TYPE_DATE:
retval += "DATE";
break;
case ColumnMetaData.TYPE_BOOLEAN:
retval += "CHARACTER(32)";
break;
case ColumnMetaData.TYPE_NUMBER:
case ColumnMetaData.TYPE_BIGNUMBER:
if (null != pks && pks.contains(fieldname)) {
if (useAutoInc) {
retval += "BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 0, INCREMENT BY 1, NOCACHE)";
} else {
retval += "BIGINT";
}
} else {
if (length > 0) {
retval += "DECIMAL(" + length;
if (precision > 0) {
retval += ", " + precision;
}
retval += ")";
} else {
retval += "FLOAT";
}
}
break;
case ColumnMetaData.TYPE_INTEGER:
if (null != pks && pks.contains(fieldname)) {
retval += "INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 0, INCREMENT BY 1, NOCACHE)";
} else {
retval += "INTEGER";
}
break;
case ColumnMetaData.TYPE_STRING:
if (length > 32672 || length >= AbstractDatabase.CLOB_LENGTH) {
retval += "CLOB";
} else {
retval += "VARCHAR";
if (length > 0) {
retval += "(" + length;
} else {
retval += "(";
}
retval += ")";
}
if (null != pks && pks.contains(fieldname)) {
retval += " NOT NULL";
}
break;
case ColumnMetaData.TYPE_BINARY:
if (length > 32672 || length >= AbstractDatabase.CLOB_LENGTH) {
retval += "BLOB(" + length + ")";
} else {
if (length > 0) {
retval += "CHAR(" + length + ") FOR BIT DATA";
} else {
retval += "BLOB";
}
}
break;
default:
retval += "CLOB";
break;
}
if (addCr) {
retval += Const.CR;
}
return retval;
}
}

View File

@@ -0,0 +1,33 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.core.database.impl;
import java.util.List;
import com.weishao.dbswitch.common.constant.DatabaseTypeEnum;
import com.weishao.dbswitch.core.model.ColumnDescription;
/**
* 支持MariaDB数据库的元信息实现
*
* @author tang
*
*/
public class DatabaseMariaDBImpl extends DatabaseMysqlImpl {
public DatabaseMariaDBImpl() {
super("org.mariadb.jdbc.Driver");
}
@Override
public List<ColumnDescription> querySelectSqlColumnMeta(String sql) {
String querySQL = String.format(" %s LIMIT 0 ", sql.replace(";", ""));
return this.getSelectSqlColumnMeta(querySQL, DatabaseTypeEnum.MARIADB);
}
}

View File

@@ -39,6 +39,10 @@ public class DatabaseMysqlImpl extends AbstractDatabase implements IDatabaseInte
super("com.mysql.cj.jdbc.Driver");
}
public DatabaseMysqlImpl(String driverClassName) {
super(driverClassName);
}
@Override
public List<String> querySchemaList() {
String mysqlJdbcUrl=null;

View File

@@ -139,6 +139,16 @@ public class JdbcUrlUtils {
case GREENPLUM:
return String.format("jdbc:pivotal:greenplum://%s:%d;DatabaseName=%s", db.getHost(), db.getPort(),
db.getDbname());
case MARIADB:
String charsets = db.getCharset();
if (Objects.isNull(charsets) || charsets.isEmpty()) {
charsets = "utf-8";
}
return String.format(
"jdbc:mariadb://%s:%d/%s?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=%s&nullCatalogMeansCurrent=true&connectTimeout=%d",
db.getHost(), db.getPort(), db.getDbname(), charsets, connectTimeout * 1000);
case DB2:
return String.format("jdbc:db2://%s:%d/%s", db.getHost(), db.getPort(), db.getDbname());
default:
throw new RuntimeException(String.format("Unkown database type (%s)", db.getType().name()));
}

View File

@@ -45,6 +45,8 @@ public final class JdbcTemplateUtils {
String driverClassName = dataSource.getDriverClassName();
if (driverClassName.contains("mysql")) {
return DatabaseTypeEnum.MYSQL;
} else if (driverClassName.contains("mariadb")) {
return DatabaseTypeEnum.MYSQL;
} else if (driverClassName.contains("oracle")) {
return DatabaseTypeEnum.ORACLE;
} else if (driverClassName.contains("postgresql")) {
@@ -53,6 +55,8 @@ public final class JdbcTemplateUtils {
return DatabaseTypeEnum.GREENPLUM;
} else if (driverClassName.contains("sqlserver")) {
return DatabaseTypeEnum.SQLSERVER;
} else if (driverClassName.contains("db2")) {
return DatabaseTypeEnum.DB2;
} else {
throw new RuntimeException(
String.format("Unsupport database type by driver class name [%s]", driverClassName));

View File

@@ -1,22 +1,22 @@
# source database connection information
## support MySQL/Oracle/SQLServer/PostgreSQL/Greenplum
source.datasource.url= jdbc:mysql://172.17.207.63:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&zeroDateTimeBehavior=convertToNull&nullCatalogMeansCurrent=true
source.datasource.url= jdbc:mysql://172.17.207.210:3306/open_data?useUnicode=true&characterEncoding=utf-8&useSSL=true&zeroDateTimeBehavior=convertToNull&nullCatalogMeansCurrent=true
source.datasource.driver-class-name= com.mysql.cj.jdbc.Driver
source.datasource.username= tang
source.datasource.password= 123321
source.datasource.username= tangyibo
source.datasource.password= tangyibo
# target database connection information
## support MySQL/Oracle/SQLServer/PostgreSQL/Greenplum
target.datasource.url= jdbc:postgresql://172.17.207.44:5432/test
target.datasource.driver-class-name= org.postgresql.Driver
target.datasource.username= tang
target.datasource.password= 123321
target.datasource.url= jdbc:db2://172.17.203.91:50000/testdb
target.datasource.driver-class-name= com.ibm.db2.jcc.DB2Driver
target.datasource.username= db2inst1
target.datasource.password= hangge-1234
# source database configuration parameters
## fetch size for query source database
source.datasource-fetch.size=10000
## schema name for query source database
source.datasource-source.schema=test
source.datasource-source.schema=open_data
## table name include from table lists
source.datasource-source.includes=
## table name exclude from table lists
@@ -24,7 +24,7 @@ source.datasource-source.excludes=
# target database configuration parameters
## schema name for create/insert table data
target.datasource-target.schema=public
target.datasource-target.schema=NULLID
## whether drop-create table when target table exist
target.datasource-target.drop=true
## whether create table support auto increment for primary key field

View File

@@ -16,6 +16,7 @@ import javax.sql.DataSource;
import org.springframework.boot.jdbc.DatabaseDriver;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.jdbc.support.MetaDataAccessException;
import com.weishao.dbswitch.dbcommon.database.impl.DB2DatabaseOperator;
import com.weishao.dbswitch.dbcommon.database.impl.GreenplumDatabaseOperator;
import com.weishao.dbswitch.dbcommon.database.impl.MysqlDatabaseOperator;
import com.weishao.dbswitch.dbcommon.database.impl.OracleDatabaseOperator;
@@ -40,6 +41,7 @@ public final class DatabaseOperatorFactory {
put("SQLSERVER", SqlServerDatabaseOperator.class.getName());
put("POSTGRESQL", PostgreSqlDatabaseOperator.class.getName());
put("GREENPLUM", GreenplumDatabaseOperator.class.getName());
put("DB2", DB2DatabaseOperator.class.getName());
}
};
@@ -71,7 +73,7 @@ public final class DatabaseOperatorFactory {
* 根据DataSource获取数据库的类型
*
* @param dataSource 数据库源
* @return 数据库的类型mysql/oracle/postgresql/sqlserver/greenplum
* @return 数据库的类型mysql/oracle/postgresql/sqlserver/greenplum/db2
*/
public static String getDatabaseNameByDataSource(DataSource dataSource) {
try {

View File

@@ -0,0 +1,64 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbcommon.database.impl;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import com.weishao.dbswitch.dbcommon.database.AbstractDatabaseOperator;
import com.weishao.dbswitch.dbcommon.database.IDatabaseOperator;
import com.weishao.dbswitch.dbcommon.pojo.StatementResultSet;
/**
* DB2数据库实现类
*
* @author tang
*
*/
public class DB2DatabaseOperator extends AbstractDatabaseOperator implements IDatabaseOperator {
public DB2DatabaseOperator(DataSource dataSource) {
super(dataSource);
}
@Override
public String getSelectTableSql(String schemaName, String tableName, List<String> fields) {
return String.format("select \"%s\" from \"%s\".\"%s\" ", StringUtils.join(fields, "\",\""), schemaName,
tableName);
}
@Override
public StatementResultSet queryTableData(String schemaName, String tableName, List<String> fields,
List<String> orders) {
String sql = String.format("select \"%s\" from \"%s\".\"%s\" order by \"%s\" asc ",
StringUtils.join(fields, "\",\""), schemaName, tableName, StringUtils.join(orders, "\",\""));
return this.selectTableData(sql, this.fetchSize);
}
@Override
public StatementResultSet queryTableData(String schemaName, String tableName, List<String> fields) {
String sql = String.format("select \"%s\" from \"%s\".\"%s\" ", StringUtils.join(fields, "\",\""), schemaName,
tableName);
return this.selectTableData(sql, this.fetchSize);
}
@Override
public void truncateTableData(String schemaName, String tableName) {
String sql = String.format("TRUNCATE TABLE \"%s\".\"%s\" immediate ", schemaName, tableName);
this.executeSql(sql);
}
@Override
public void dropTable(String schemaName, String tableName) {
String sql = String.format("DROP TABLE \"%s\".\"%s\" ", schemaName, tableName);
this.executeSql(sql);
}
}

View File

@@ -13,6 +13,7 @@ import java.util.Map;
import java.util.HashMap;
import javax.sql.DataSource;
import com.weishao.dbswitch.dbcommon.util.DatabaseAwareUtils;
import com.weishao.dbswitch.dbsynch.db2.DB2DatabaseSynchImpl;
import com.weishao.dbswitch.dbsynch.mssql.SqlServerDatabaseSynchImpl;
import com.weishao.dbswitch.dbsynch.mysql.MySqlDatabaseSynchImpl;
import com.weishao.dbswitch.dbsynch.oracle.OracleDatabaseSynchImpl;
@@ -39,6 +40,7 @@ public final class DatabaseSynchronizeFactory {
put("SQLSERVER", SqlServerDatabaseSynchImpl.class.getName());
put("POSTGRESQL", PostgresqlDatabaseSynchImpl.class.getName());
put("GREENPLUM", GreenplumDatabaseSynchImpl.class.getName());
put("DB2",DB2DatabaseSynchImpl.class.getName());
}
};

View File

@@ -0,0 +1,76 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbsynch.db2;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import com.weishao.dbswitch.dbsynch.AbstractDatabaseSynchronize;
import com.weishao.dbswitch.dbsynch.IDatabaseSynchronize;
/**
* DB2数据库实现类
*
* @author tang
*
*/
public class DB2DatabaseSynchImpl extends AbstractDatabaseSynchronize implements IDatabaseSynchronize {
public DB2DatabaseSynchImpl(DataSource ds) {
super(ds);
}
@Override
public String getColumMetaDataSql(String schemaName, String tableName) {
return String.format("SELECT * FROM \"%s\".\"%s\" WHERE 1=2", schemaName, tableName);
}
@Override
public String getInsertPrepareStatementSql(String schemaName, String tableName, List<String> fieldNames) {
List<String> placeHolders = new ArrayList<String>();
for (int i = 0; i < fieldNames.size(); ++i) {
placeHolders.add("?");
}
return String.format("INSERT INTO \"%s\".\"%s\" ( \"%s\" ) VALUES ( %s )", schemaName, tableName,
StringUtils.join(fieldNames, "\",\""), StringUtils.join(placeHolders, ","));
}
@Override
public String getUpdatePrepareStatementSql(String schemaName, String tableName, List<String> fieldNames,
List<String> pks) {
List<String> uf = new ArrayList<String>();
for (String field : fieldNames) {
if (!pks.contains(field)) {
uf.add(String.format("\"%s\"=?", field));
}
}
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("\"%s\"=?", pk));
}
return String.format("UPDATE \"%s\".\"%s\" SET %s WHERE %s", schemaName, tableName, StringUtils.join(uf, " , "),
StringUtils.join(uw, " AND "));
}
@Override
public String getDeletePrepareStatementSql(String schemaName, String tableName, List<String> pks) {
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("\"%s\"=?", pk));
}
return String.format("DELETE FROM \"%s\".\"%s\" WHERE %s ", schemaName, tableName, StringUtils.join(uw, " AND "));
}
}

View File

@@ -35,6 +35,7 @@ public class DatabaseWriterFactory {
put("SQLSERVER", "com.weishao.dbswitch.dbwriter.mssql.SqlServerWriterImpl");
put("POSTGRESQL", "com.weishao.dbswitch.dbwriter.gpdb.GreenplumCopyWriterImpl");
put("GREENPLUM", "com.weishao.dbswitch.dbwriter.gpdb.GreenplumCopyWriterImpl");
put("DB2", "com.weishao.dbswitch.dbwriter.db2.DB2WriterImpl");
}
};

View File

@@ -0,0 +1,84 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbwriter.db2;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import com.weishao.dbswitch.dbwriter.AbstractDatabaseWriter;
import com.weishao.dbswitch.dbwriter.IDatabaseWriter;
import lombok.extern.slf4j.Slf4j;
/**
* DB2数据库写入实现类
*
* @author tang
*
*/
@Slf4j
public class DB2WriterImpl extends AbstractDatabaseWriter implements IDatabaseWriter {
public DB2WriterImpl(DataSource dataSource) {
super(dataSource);
}
@Override
public long write(List<String> fieldNames, List<Object[]> recordValues) {
List<String> placeHolders = new ArrayList<String>();
for (int i = 0; i < fieldNames.size(); ++i) {
placeHolders.add("?");
}
String schemaName = Objects.requireNonNull(this.schemaName, "schema-name名称为空不合法!");
String tableName = Objects.requireNonNull(this.tableName, "table-name名称为空不合法!");
String sqlInsert = String.format("INSERT INTO \"%s\".\"%s\" ( \"%s\" ) VALUES ( %s )", schemaName, tableName,
StringUtils.join(fieldNames, "\",\""), StringUtils.join(placeHolders, ","));
int[] argTypes = new int[fieldNames.size()];
for (int i = 0; i < fieldNames.size(); ++i) {
String col = fieldNames.get(i);
argTypes[i] = this.columnType.get(col);
}
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(this.dataSource);
TransactionStatus status = transactionManager.getTransaction(definition);
try {
int affectCount = 0;
jdbcTemplate.batchUpdate(sqlInsert, recordValues, argTypes);
affectCount = recordValues.size();
recordValues.clear();
transactionManager.commit(status);
if (log.isDebugEnabled()) {
log.debug("DB2 insert write data affect count:{}", affectCount);
}
return affectCount;
} catch (TransactionException e) {
transactionManager.rollback(status);
throw e;
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
}