version for 1.6.11 : support SQLite

This commit is contained in:
inrgihc
2022-07-02 17:46:13 +08:00
parent 432a470eba
commit fd312588c7
73 changed files with 716 additions and 165 deletions

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-parent</artifactId>
<version>1.6.10</version>
<version>1.6.11</version>
</parent>
<artifactId>dbswitch-dbsynch</artifactId>

View File

@@ -37,7 +37,6 @@ import org.springframework.transaction.support.DefaultTransactionDefinition;
@Slf4j
public abstract class AbstractDatabaseSynchronize implements IDatabaseSynchronize {
private final DefaultTransactionDefinition defination;
private JdbcTemplate jdbcTemplate;
private PlatformTransactionManager transactionManager;
private Map<String, Integer> columnType;
@@ -51,10 +50,6 @@ public abstract class AbstractDatabaseSynchronize implements IDatabaseSynchroniz
protected int[] deleteArgsType;
public AbstractDatabaseSynchronize(DataSource ds) {
this.defination = new DefaultTransactionDefinition();
this.defination.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
this.defination.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
this.jdbcTemplate = new JdbcTemplate(ds);
this.transactionManager = new DataSourceTransactionManager(ds);
this.columnType = new HashMap<>();
@@ -65,6 +60,13 @@ public abstract class AbstractDatabaseSynchronize implements IDatabaseSynchroniz
return this.jdbcTemplate.getDataSource();
}
protected TransactionDefinition getTransactionDefinition() {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
return definition;
}
/**
* 获取查询列元信息的SQL语句
*
@@ -178,7 +180,7 @@ public abstract class AbstractDatabaseSynchronize implements IDatabaseSynchroniz
@Override
public long executeInsert(List<Object[]> records) {
TransactionStatus status = transactionManager.getTransaction(defination);
TransactionStatus status = transactionManager.getTransaction(getTransactionDefinition());
if (log.isDebugEnabled()) {
log.debug("Execute Insert SQL : {}", this.insertStatementSql);
}
@@ -227,7 +229,7 @@ public abstract class AbstractDatabaseSynchronize implements IDatabaseSynchroniz
datas.add(nr);
}
TransactionStatus status = transactionManager.getTransaction(defination);
TransactionStatus status = transactionManager.getTransaction(getTransactionDefinition());
if (log.isDebugEnabled()) {
log.debug("Execute Update SQL : {}", this.updateStatementSql);
}
@@ -264,7 +266,7 @@ public abstract class AbstractDatabaseSynchronize implements IDatabaseSynchroniz
datas.add(nr);
}
TransactionStatus status = transactionManager.getTransaction(defination);
TransactionStatus status = transactionManager.getTransaction(getTransactionDefinition());
if (log.isDebugEnabled()) {
log.debug("Execute Delete SQL : {}", this.deleteStatementSql);
}

View File

@@ -9,6 +9,7 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.dbsynch;
import com.gitee.dbswitch.common.type.DatabaseTypeEnum;
import com.gitee.dbswitch.common.util.DatabaseAwareUtils;
import com.gitee.dbswitch.dbsynch.db2.DB2DatabaseSyncImpl;
import com.gitee.dbswitch.dbsynch.dm.DmDatabaseSyncImpl;
@@ -18,6 +19,7 @@ import com.gitee.dbswitch.dbsynch.mysql.MySqlDatabaseSyncImpl;
import com.gitee.dbswitch.dbsynch.oracle.OracleDatabaseSyncImpl;
import com.gitee.dbswitch.dbsynch.pgsql.GreenplumDatabaseSyncImpl;
import com.gitee.dbswitch.dbsynch.pgsql.PostgresqlDatabaseSyncImpl;
import com.gitee.dbswitch.dbsynch.sqlite.Sqlite3DatabaseSyncImpl;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@@ -30,20 +32,22 @@ import javax.sql.DataSource;
*/
public final class DatabaseSynchronizeFactory {
private static final Map<String, Function<DataSource, IDatabaseSynchronize>> DATABASE_SYNC_MAPPER = new HashMap<String, Function<DataSource, IDatabaseSynchronize>>() {
private static final Map<DatabaseTypeEnum, Function<DataSource, IDatabaseSynchronize>> DATABASE_SYNC_MAPPER
= new HashMap<DatabaseTypeEnum, Function<DataSource, IDatabaseSynchronize>>() {
private static final long serialVersionUID = -2359773637275934408L;
{
put("MYSQL", MySqlDatabaseSyncImpl::new);
put("ORACLE", OracleDatabaseSyncImpl::new);
put("SQLSERVER", SqlServerDatabaseSyncImpl::new);
put("SQLSERVER2000", SqlServerDatabaseSyncImpl::new);
put("POSTGRESQL", PostgresqlDatabaseSyncImpl::new);
put("GREENPLUM", GreenplumDatabaseSyncImpl::new);
put("DB2", DB2DatabaseSyncImpl::new);
put("DM", DmDatabaseSyncImpl::new);
put("KINGBASE", KingbaseDatabaseSyncImpl::new);
put(DatabaseTypeEnum.MYSQL, MySqlDatabaseSyncImpl::new);
put(DatabaseTypeEnum.ORACLE, OracleDatabaseSyncImpl::new);
put(DatabaseTypeEnum.SQLSERVER, SqlServerDatabaseSyncImpl::new);
put(DatabaseTypeEnum.SQLSERVER2000, SqlServerDatabaseSyncImpl::new);
put(DatabaseTypeEnum.POSTGRESQL, PostgresqlDatabaseSyncImpl::new);
put(DatabaseTypeEnum.GREENPLUM, GreenplumDatabaseSyncImpl::new);
put(DatabaseTypeEnum.DB2, DB2DatabaseSyncImpl::new);
put(DatabaseTypeEnum.DM, DmDatabaseSyncImpl::new);
put(DatabaseTypeEnum.KINGBASE, KingbaseDatabaseSyncImpl::new);
put(DatabaseTypeEnum.SQLITE3, Sqlite3DatabaseSyncImpl::new);
}
};
@@ -54,7 +58,7 @@ public final class DatabaseSynchronizeFactory {
* @return 同步器对象
*/
public static IDatabaseSynchronize createDatabaseWriter(DataSource dataSource) {
String type = DatabaseAwareUtils.getDatabaseTypeByDataSource(dataSource).name();
DatabaseTypeEnum type = DatabaseAwareUtils.getDatabaseTypeByDataSource(dataSource);
if (!DATABASE_SYNC_MAPPER.containsKey(type)) {
throw new RuntimeException(
String.format("[dbsynch] Unsupported database type (%s)", type));

View File

@@ -0,0 +1,115 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.dbsynch.sqlite;
import com.gitee.dbswitch.common.util.TypeConvertUtils;
import com.gitee.dbswitch.dbsynch.AbstractDatabaseSynchronize;
import com.gitee.dbswitch.dbsynch.IDatabaseSynchronize;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionDefinition;
/**
* SQLite数据库DML同步实现类
*
* @author tang
*/
public class Sqlite3DatabaseSyncImpl extends AbstractDatabaseSynchronize implements
IDatabaseSynchronize {
public Sqlite3DatabaseSyncImpl(DataSource ds) {
super(ds);
}
@Override
protected TransactionDefinition getTransactionDefinition() {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
return definition;
}
@Override
public String getColumnMetaDataSql(String schemaName, String tableName) {
return String.format("SELECT * FROM \"%s\".\"%s\" WHERE 1=2", schemaName, tableName);
}
@Override
public String getInsertPrepareStatementSql(String schemaName, String tableName,
List<String> fieldNames) {
List<String> placeHolders = Collections.nCopies(fieldNames.size(), "?");
return String.format("INSERT INTO \"%s\".\"%s\" ( \"%s\" ) VALUES ( %s )",
schemaName, tableName,
StringUtils.join(fieldNames, "\",\""),
StringUtils.join(placeHolders, ","));
}
@Override
public String getUpdatePrepareStatementSql(String schemaName, String tableName,
List<String> fieldNames, List<String> pks) {
List<String> uf = fieldNames.stream()
.filter(field -> !pks.contains(field))
.map(field -> String.format("\"%s\"=?", field))
.collect(Collectors.toList());
List<String> uw = pks.stream()
.map(pk -> String.format("\"%s\"=?", pk))
.collect(Collectors.toList());
return String.format("UPDATE \"%s\".\"%s\" SET %s WHERE %s",
schemaName, tableName, StringUtils.join(uf, " , "),
StringUtils.join(uw, " AND "));
}
@Override
public String getDeletePrepareStatementSql(String schemaName, String tableName,
List<String> pks) {
List<String> uw = pks.stream()
.map(pk -> String.format("\"%s\"=?", pk))
.collect(Collectors.toList());
return String.format("DELETE FROM \"%s\".\"%s\" WHERE %s ",
schemaName, tableName, StringUtils.join(uw, " AND "));
}
@Override
public long executeInsert(List<Object[]> records) {
records.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
row[i] = TypeConvertUtils.castByDetermine(row[i]);
} catch (Exception e) {
row[i] = null;
}
}
});
return super.executeInsert(records);
}
@Override
public long executeUpdate(List<Object[]> records) {
records.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
row[i] = TypeConvertUtils.castByDetermine(row[i]);
} catch (Exception e) {
row[i] = null;
}
}
});
return super.executeUpdate(records);
}
}