代码提交

This commit is contained in:
inrgihc
2020-08-31 00:11:24 +08:00
commit 1f2565d40e
215 changed files with 20016 additions and 0 deletions

24
dbswitch-dbsynch/pom.xml Normal file
View File

@@ -0,0 +1,24 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.weishao</groupId>
<artifactId>dbswitch</artifactId>
<version>1.4.0</version>
</parent>
<artifactId>dbswitch-dbsynch</artifactId>
<dependencies>
<dependency>
<groupId>com.weishao</groupId>
<artifactId>dbswitch-dbcommon</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,309 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbsynch;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import lombok.extern.slf4j.Slf4j;
/**
* 数据同步抽象基类
*
* @author tang
*
*/
@Slf4j
public abstract class AbstractDatabaseSynchronize implements IDatabaseSynchronize {
private DefaultTransactionDefinition defination;
private JdbcTemplate jdbcTemplate;
private PlatformTransactionManager transactionManager;
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~//
private Map<String, Integer> columnType;
protected List<String> fieldOrders;
protected List<String> pksOrders;
protected String insertStatementSql;
protected String updateStatementSql;
protected String deleteStatementSql;
protected int[] insertArgsType;
protected int[] updateArgsType;
protected int[] deleteArgsType;
public AbstractDatabaseSynchronize(DataSource ds) {
this.defination= new DefaultTransactionDefinition();
this.defination.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
this.defination.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
this.jdbcTemplate=new JdbcTemplate(ds);
this.transactionManager = new DataSourceTransactionManager(ds);
this.columnType=new HashMap<>();
}
@Override
public DataSource getDataSource() {
return this.jdbcTemplate.getDataSource();
}
/**
* 获取查询列元信息的SQL语句
*
* @param schemaName 模式名称
* @param tableName 表名称
* @return SQL语句
*/
public abstract String getColumMetaDataSql(String schemaName, String tableName);
/**
* 生成Insert操作的SQL语句
*
* @param schemaName 模式名称
* @param tableName 表名称
* @param fieldNames 字段列表
* @return Insert操作的SQL语句
*/
public abstract String getInsertPrepareStatementSql(String schemaName, String tableName, List<String> fieldNames);
/**
* 生成Update操作的SQL语句
*
* @param schemaName 模式名称
* @param tableName 表名称
* @param fieldNames 字段列表
* @param pks 主键列表
* @return Update操作的SQL语句
*/
public abstract String getUpdatePrepareStatementSql(String schemaName, String tableName, List<String> fieldNames, List<String> pks);
/**
* 生成Delete操作的SQL语句
*
* @param schemaName 模式名称
* @param tableName 表名称
* @param pks 主键列表
* @return Delete操作的SQL语句
*/
public abstract String getDeletePrepareStatementSql(String schemaName, String tableName, List<String> pks);
@Override
public void prepare(String schemaName, String tableName, List<String> fieldNames, List<String> pks) {
if (fieldNames.isEmpty() || pks.isEmpty() || fieldNames.size() < pks.size()) {
throw new IllegalArgumentException("字段列表和主键列表不能为空,或者字段总个数应不小于主键总个数");
}
if (!fieldNames.containsAll(pks)) {
throw new IllegalArgumentException("字段列表必须包含主键列表");
}
String sql = this.getColumMetaDataSql(schemaName, tableName);
columnType.clear();
this.jdbcTemplate.execute(new ConnectionCallback<Boolean>() {
@Override
public Boolean doInConnection(Connection conn) throws SQLException, DataAccessException {
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
ResultSetMetaData rsMetaData = rs.getMetaData();
for (int i = 0, len = rsMetaData.getColumnCount(); i < len; i++) {
columnType.put(rsMetaData.getColumnName(i + 1), rsMetaData.getColumnType(i + 1));
}
return true;
} catch (Exception e) {
throw new RuntimeException(
String.format("获取表:%s.%s 的字段的元信息时失败. 请联系 DBA 核查该库、表信息.", schemaName, tableName), e);
} finally {
JdbcUtils.closeResultSet(rs);
JdbcUtils.closeStatement(stmt);
}
}
});
this.fieldOrders = new ArrayList<String>(fieldNames);
this.pksOrders = new ArrayList<String>(pks);
this.insertStatementSql = this.getInsertPrepareStatementSql(schemaName, tableName, fieldNames);
this.updateStatementSql = this.getUpdatePrepareStatementSql(schemaName, tableName, fieldNames, pks);
this.deleteStatementSql = this.getDeletePrepareStatementSql(schemaName, tableName, pks);
insertArgsType = new int[fieldNames.size()];
for (int k = 0; k < fieldNames.size(); ++k) {
String field = fieldNames.get(k);
insertArgsType[k] = this.columnType.get(field);
}
updateArgsType = new int[fieldNames.size()];
int idx = 0;
for (int i = 0; i < fieldNames.size(); ++i) {
String field = fieldNames.get(i);
if (!pks.contains(field)) {
updateArgsType[idx++] = this.columnType.get(field);
}
}
for (String pk : pks) {
updateArgsType[idx++] = this.columnType.get(pk);
}
deleteArgsType = new int[pks.size()];
for (int j = 0; j < pks.size(); ++j) {
String pk = pks.get(j);
deleteArgsType[j] = this.columnType.get(pk);
}
}
@Override
public long executeInsert(List<Object[]> records) {
TransactionStatus status = transactionManager.getTransaction(defination);
if (log.isDebugEnabled()) {
log.debug("Execute Insert SQL : {}", this.insertStatementSql);
}
try {
int[] affects = jdbcTemplate.batchUpdate(this.insertStatementSql, records, this.insertArgsType);
int affectCount = 0;
for (int i : affects) {
affectCount += i;
}
transactionManager.commit(status);
return affectCount;
} catch (TransactionException e) {
transactionManager.rollback(status);
throw e;
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
@Override
public long executeUpdate(List<Object[]> records) {
List<Object[]> datas = new LinkedList<Object[]>();
for (Object[] r : records) {
Object[] nr = new Object[this.fieldOrders.size()];
int idx=0;
for (int i = 0; i < this.fieldOrders.size(); ++i) {
String field = this.fieldOrders.get(i);
if(!this.pksOrders.contains(field)) {
int index = this.fieldOrders.indexOf(field);
nr[idx++] = r[index];
}
}
for(int j=0;j<this.pksOrders.size();++j) {
String pk = this.pksOrders.get(j);
int index = this.fieldOrders.indexOf(pk);
nr[idx++] = r[index];
}
datas.add(nr);
}
TransactionStatus status = transactionManager.getTransaction(defination);
if (log.isDebugEnabled()) {
log.debug("Execute Update SQL : {}", this.updateStatementSql);
}
try {
int[] affects = jdbcTemplate.batchUpdate(this.updateStatementSql, datas, this.updateArgsType);
int affectCount = 0;
for (int i : affects) {
affectCount += i;
}
transactionManager.commit(status);
return affectCount;
} catch (TransactionException e) {
transactionManager.rollback(status);
throw e;
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
@Override
public long executeDelete(List<Object[]> records) {
List<Object[]> datas = new LinkedList<Object[]>();
for (Object[] r : records) {
Object[] nr = new Object[this.pksOrders.size()];
for (int i = 0; i < this.pksOrders.size(); ++i) {
String pk = this.pksOrders.get(i);
int index = this.fieldOrders.indexOf(pk);
nr[i] = r[index];
}
datas.add(nr);
}
TransactionStatus status = transactionManager.getTransaction(defination);
if (log.isDebugEnabled()) {
log.debug("Execute Delete SQL : {}", this.deleteStatementSql);
}
try {
int[] affects = jdbcTemplate.batchUpdate(this.deleteStatementSql, datas, this.deleteArgsType);
int affectCount = 0;
for (int i : affects) {
affectCount += i;
}
transactionManager.commit(status);
return affectCount;
} catch (TransactionException e) {
transactionManager.rollback(status);
throw e;
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
} finally {
datas.clear();
}
}
}

View File

@@ -0,0 +1,69 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbsynch;
import java.util.Map;
import java.util.HashMap;
import javax.sql.DataSource;
import com.weishao.dbswitch.dbcommon.util.DatabaseAwareUtils;
import com.weishao.dbswitch.dbsynch.mssql.SqlServerDatabaseSynchImpl;
import com.weishao.dbswitch.dbsynch.mysql.MySqlDatabaseSynchImpl;
import com.weishao.dbswitch.dbsynch.oracle.OracleDatabaseSynchImpl;
import com.weishao.dbswitch.dbsynch.pgsql.GreenplumDatabaseSynchImpl;
import com.weishao.dbswitch.dbsynch.pgsql.PostgresqlDatabaseSynchImpl;
import java.lang.reflect.Constructor;
/**
* 数据库同步器构造工厂类
*
* @author tang
*
*/
public final class DatabaseSynchronizeFactory {
private static final Map<String, String> DATABASE_SYNCH_MAPPER = new HashMap<String, String>() {
private static final long serialVersionUID = -2359773637275934408L;
{
put("MYSQL", MySqlDatabaseSynchImpl.class.getName());
put("ORACLE", OracleDatabaseSynchImpl.class.getName());
put("SQLSERVER", SqlServerDatabaseSynchImpl.class.getName());
put("POSTGRESQL", PostgresqlDatabaseSynchImpl.class.getName());
put("GREENPLUM", GreenplumDatabaseSynchImpl.class.getName());
}
};
/**
* 获取指定数据源的同步器
*
* @param dataSource 数据源
* @return 同步器对象
*/
public static AbstractDatabaseSynchronize createDatabaseWriter(DataSource dataSource) {
String type = DatabaseAwareUtils.getDatabaseNameByDataSource(dataSource).toUpperCase();
if (DATABASE_SYNCH_MAPPER.containsKey(type)) {
String className = DATABASE_SYNCH_MAPPER.get(type);
try {
Class<?>[] paramTypes = { DataSource.class };
Object[] paramValues = { dataSource };
Class<?> clas = Class.forName(className);
Constructor<?> cons = clas.getConstructor(paramTypes);
return (AbstractDatabaseSynchronize) cons.newInstance(paramValues);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
throw new RuntimeException(String.format("[dbsynch] Unkown Supported database type (%s)", type));
}
}

View File

@@ -0,0 +1,63 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbsynch;
import java.util.List;
import javax.sql.DataSource;
/**
* 数据同步接口定义
*
* @author tang
*
*/
public interface IDatabaseSynchronize {
/**
* 获取数据源对象
*
* @return DataSource数据源对象
*/
DataSource getDataSource();
/**
* 批量Insert/Update/Delete预处理
*
* @param schemaName schema名称
* @param tableName table名称
* @param fieldNames 字段列表
* @param pks 主键字段列表
*/
void prepare(String schemaName, String tableName, List<String> fieldNames, List<String> pks);
/**
* 批量数据Insert
*
* @param records 数据记录
* @return 返回实际影响的记录条数
*/
long executeInsert(List<Object[]> records);
/**
* 批量数据Update
*
* @param records 数据记录
* @return 返回实际影响的记录条数
*/
long executeUpdate(List<Object[]> records);
/**
* 批量数据Delete
*
* @param records 数据记录
* @return 返回实际影响的记录条数
*/
long executeDelete(List<Object[]> records);
}

View File

@@ -0,0 +1,76 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbsynch.mssql;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import com.weishao.dbswitch.dbsynch.AbstractDatabaseSynchronize;
import com.weishao.dbswitch.dbsynch.IDatabaseSynchronize;
/**
* SQLServer数据库实现类
*
* @author tang
*
*/
public class SqlServerDatabaseSynchImpl extends AbstractDatabaseSynchronize implements IDatabaseSynchronize {
public SqlServerDatabaseSynchImpl(DataSource ds) {
super(ds);
}
@Override
public String getColumMetaDataSql(String schemaName, String tableName) {
return String.format("SELECT * FROM [%s].[%s] WHERE 1=2", schemaName, tableName);
}
@Override
public String getInsertPrepareStatementSql(String schemaName, String tableName, List<String> fieldNames) {
List<String> placeHolders = new ArrayList<String>();
for (int i = 0; i < fieldNames.size(); ++i) {
placeHolders.add("?");
}
return String.format("INSERT INTO [%s].[%s] ( [%s] ) VALUES ( %s )", schemaName, tableName,
StringUtils.join(fieldNames, "],["), StringUtils.join(placeHolders, ","));
}
@Override
public String getUpdatePrepareStatementSql(String schemaName, String tableName, List<String> fieldNames,
List<String> pks) {
List<String> uf = new ArrayList<String>();
for (String field : fieldNames) {
if (!pks.contains(field)) {
uf.add(String.format("[%s]=?", field));
}
}
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("[%s]=?", pk));
}
return String.format("UPDATE [%s].[%s] SET %s WHERE %s", schemaName, tableName, StringUtils.join(uf, " , "),
StringUtils.join(uw, " , "));
}
@Override
public String getDeletePrepareStatementSql(String schemaName, String tableName, List<String> pks) {
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("[%s]=?", pk));
}
return String.format("DELETE FROM [%s].[%s] WHERE %s ", schemaName, tableName, StringUtils.join(uw, " AND "));
}
}

View File

@@ -0,0 +1,76 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbsynch.mysql;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import com.weishao.dbswitch.dbsynch.AbstractDatabaseSynchronize;
import com.weishao.dbswitch.dbsynch.IDatabaseSynchronize;
/**
* MySQL数据库实现类
*
* @author tang
*
*/
public class MySqlDatabaseSynchImpl extends AbstractDatabaseSynchronize implements IDatabaseSynchronize {
public MySqlDatabaseSynchImpl(DataSource ds) {
super(ds);
}
@Override
public String getColumMetaDataSql(String schemaName, String tableName) {
return String.format("SELECT * FROM `%s`.`%s` WHERE 1=2", schemaName, tableName);
}
@Override
public String getInsertPrepareStatementSql(String schemaName, String tableName, List<String> fieldNames) {
List<String> placeHolders = new ArrayList<String>();
for (int i = 0; i < fieldNames.size(); ++i) {
placeHolders.add("?");
}
return String.format("INSERT INTO `%s`.`%s` ( `%s` ) VALUES ( %s )", schemaName, tableName,
StringUtils.join(fieldNames, "`,`"), StringUtils.join(placeHolders, ","));
}
@Override
public String getUpdatePrepareStatementSql(String schemaName, String tableName, List<String> fieldNames,
List<String> pks) {
List<String> uf = new ArrayList<String>();
for (String field : fieldNames) {
if (!pks.contains(field)) {
uf.add(String.format("`%s`=?", field));
}
}
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("`%s`=?", pk));
}
return String.format("UPDATE `%s`.`%s` SET %s WHERE %s", schemaName, tableName, StringUtils.join(uf, " , "),
StringUtils.join(uw, " , "));
}
@Override
public String getDeletePrepareStatementSql(String schemaName, String tableName, List<String> pks) {
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("`%s`=?", pk));
}
return String.format("DELETE FROM `%s`.`%s` WHERE %s ", schemaName, tableName, StringUtils.join(uw, " AND "));
}
}

View File

@@ -0,0 +1,76 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbsynch.oracle;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import com.weishao.dbswitch.dbsynch.AbstractDatabaseSynchronize;
import com.weishao.dbswitch.dbsynch.IDatabaseSynchronize;
/**
* Oracle数据库实现类
*
* @author tang
*
*/
public class OracleDatabaseSynchImpl extends AbstractDatabaseSynchronize implements IDatabaseSynchronize {
public OracleDatabaseSynchImpl(DataSource ds) {
super(ds);
}
@Override
public String getColumMetaDataSql(String schemaName, String tableName) {
return String.format("SELECT * FROM \"%s\".\"%s\" WHERE 1=2", schemaName, tableName);
}
@Override
public String getInsertPrepareStatementSql(String schemaName, String tableName, List<String> fieldNames) {
List<String> placeHolders = new ArrayList<String>();
for (int i = 0; i < fieldNames.size(); ++i) {
placeHolders.add("?");
}
return String.format("INSERT INTO \"%s\".\"%s\" ( \"%s\" ) VALUES ( %s )", schemaName, tableName,
StringUtils.join(fieldNames, "\",\""), StringUtils.join(placeHolders, ","));
}
@Override
public String getUpdatePrepareStatementSql(String schemaName, String tableName, List<String> fieldNames,
List<String> pks) {
List<String> uf = new ArrayList<String>();
for (String field : fieldNames) {
if (!pks.contains(field)) {
uf.add(String.format("\"%s\"=?", field));
}
}
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("\"%s\"=?", pk));
}
return String.format("UPDATE \"%s\".\"%s\" SET %s WHERE %s", schemaName, tableName, StringUtils.join(uf, " , "),
StringUtils.join(uw, " , "));
}
@Override
public String getDeletePrepareStatementSql(String schemaName, String tableName, List<String> pks) {
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("\"%s\"=?", pk));
}
return String.format("DELETE FROM \"%s\".\"%s\" WHERE %s ", schemaName, tableName, StringUtils.join(uw, " AND "));
}
}

View File

@@ -0,0 +1,27 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbsynch.pgsql;
import javax.sql.DataSource;
import com.weishao.dbswitch.dbsynch.IDatabaseSynchronize;
/**
* Greenplum数据库实现类
*
* @author tang
*
*/
public class GreenplumDatabaseSynchImpl extends PostgresqlDatabaseSynchImpl implements IDatabaseSynchronize {
public GreenplumDatabaseSynchImpl(DataSource ds) {
super(ds);
}
}

View File

@@ -0,0 +1,76 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Data : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.weishao.dbswitch.dbsynch.pgsql;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import com.weishao.dbswitch.dbsynch.AbstractDatabaseSynchronize;
import com.weishao.dbswitch.dbsynch.IDatabaseSynchronize;
/**
* PostgreSQL数据库实现类
*
* @author tang
*
*/
public class PostgresqlDatabaseSynchImpl extends AbstractDatabaseSynchronize implements IDatabaseSynchronize {
public PostgresqlDatabaseSynchImpl(DataSource ds) {
super(ds);
}
@Override
public String getColumMetaDataSql(String schemaName, String tableName) {
return String.format("SELECT * FROM \"%s\".\"%s\" WHERE 1=2", schemaName, tableName);
}
@Override
public String getInsertPrepareStatementSql(String schemaName, String tableName, List<String> fieldNames) {
List<String> placeHolders = new ArrayList<String>();
for (int i = 0; i < fieldNames.size(); ++i) {
placeHolders.add("?");
}
return String.format("INSERT INTO \"%s\".\"%s\" ( \"%s\" ) VALUES ( %s )", schemaName, tableName,
StringUtils.join(fieldNames, "\",\""), StringUtils.join(placeHolders, ","));
}
@Override
public String getUpdatePrepareStatementSql(String schemaName, String tableName, List<String> fieldNames,
List<String> pks) {
List<String> uf = new ArrayList<String>();
for (String field : fieldNames) {
if (!pks.contains(field)) {
uf.add(String.format("\"%s\"=?", field));
}
}
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("\"%s\"=?", pk));
}
return String.format("UPDATE \"%s\".\"%s\" SET %s WHERE %s", schemaName, tableName, StringUtils.join(uf, " , "),
StringUtils.join(uw, " , "));
}
@Override
public String getDeletePrepareStatementSql(String schemaName, String tableName, List<String> pks) {
List<String> uw = new ArrayList<String>();
for (String pk : pks) {
uw.add(String.format("\"%s\"=?", pk));
}
return String.format("DELETE FROM \"%s\".\"%s\" WHERE %s ", schemaName, tableName, StringUtils.join(uw, " AND "));
}
}