version 1.6.7

This commit is contained in:
inrgihc
2022-03-27 00:45:10 +08:00
parent 0312a4b1a4
commit 14b6aee9b1
61 changed files with 759 additions and 90 deletions

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-parent</artifactId>
<version>1.6.6</version>
<version>1.6.7</version>
</parent>
<artifactId>dbswitch-dbwriter</artifactId>

View File

@@ -11,6 +11,8 @@ package com.gitee.dbswitch.dbwriter.db2;
import com.gitee.dbswitch.dbwriter.AbstractDatabaseWriter;
import com.gitee.dbswitch.dbwriter.IDatabaseWriter;
import com.gitee.dbswitch.dbwriter.util.ObjectCastUtils;
import java.util.List;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
@@ -31,4 +33,18 @@ public class DB2WriterImpl extends AbstractDatabaseWriter implements IDatabaseWr
return "DB2";
}
@Override
public long write(List<String> fieldNames, List<Object[]> recordValues) {
recordValues.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
row[i] = ObjectCastUtils.castByDetermine(row[i]);
} catch (Exception e) {
row[i] = null;
}
}
});
return super.write(fieldNames, recordValues);
}
}

View File

@@ -9,8 +9,10 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.dbwriter.dm;
import com.gitee.dbswitch.dbwriter.AbstractDatabaseWriter;
import com.gitee.dbswitch.dbwriter.IDatabaseWriter;
import com.gitee.dbswitch.dbwriter.oracle.OracleWriterImpl;
import com.gitee.dbswitch.dbwriter.util.ObjectCastUtils;
import java.util.List;
import javax.sql.DataSource;
/**
@@ -18,7 +20,7 @@ import javax.sql.DataSource;
*
* @author tang
*/
public class DmWriterImpl extends OracleWriterImpl implements IDatabaseWriter {
public class DmWriterImpl extends AbstractDatabaseWriter implements IDatabaseWriter {
public DmWriterImpl(DataSource dataSource) {
super(dataSource);
@@ -29,4 +31,18 @@ public class DmWriterImpl extends OracleWriterImpl implements IDatabaseWriter {
return "DM";
}
@Override
public long write(List<String> fieldNames, List<Object[]> recordValues) {
recordValues.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
row[i] = ObjectCastUtils.castByDetermine(row[i]);
} catch (Exception e) {
row[i] = null;
}
}
});
return super.write(fieldNames, recordValues);
}
}

View File

@@ -11,6 +11,8 @@ package com.gitee.dbswitch.dbwriter.gpdb;
import com.gitee.dbswitch.dbwriter.AbstractDatabaseWriter;
import com.gitee.dbswitch.dbwriter.IDatabaseWriter;
import com.gitee.dbswitch.dbwriter.util.ObjectCastUtils;
import java.util.List;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
@@ -31,4 +33,18 @@ public class GreenplumInsertWriterImpl extends AbstractDatabaseWriter implements
return "Greenplum";
}
@Override
public long write(List<String> fieldNames, List<Object[]> recordValues) {
recordValues.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
row[i] = ObjectCastUtils.castByDetermine(row[i]);
} catch (Exception e) {
row[i] = null;
}
}
});
return super.write(fieldNames, recordValues);
}
}

View File

@@ -9,14 +9,23 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.dbwriter.mysql;
import com.gitee.dbswitch.common.util.TypeConvertUtils;
import com.gitee.dbswitch.dbwriter.AbstractDatabaseWriter;
import com.gitee.dbswitch.dbwriter.IDatabaseWriter;
import com.gitee.dbswitch.dbwriter.util.ObjectCastUtils;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.core.SqlTypeValue;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@@ -55,6 +64,20 @@ public class MySqlWriterImpl extends AbstractDatabaseWriter implements IDatabase
@Override
public long write(List<String> fieldNames, List<Object[]> recordValues) {
if (recordValues.isEmpty()) {
return 0;
}
recordValues.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
row[i] = ObjectCastUtils.castByDetermine(row[i]);
} catch (Exception e) {
row[i] = null;
}
}
});
List<String> placeHolders = Collections.nCopies(fieldNames.size(), "?");
String sqlInsert = String.format("INSERT INTO `%s`.`%s` ( `%s` ) VALUES ( %s )",
schemaName, tableName,

View File

@@ -9,13 +9,20 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.dbwriter.oracle;
import com.gitee.dbswitch.common.util.TypeConvertUtils;
import com.gitee.dbswitch.dbwriter.AbstractDatabaseWriter;
import com.gitee.dbswitch.dbwriter.IDatabaseWriter;
import com.gitee.dbswitch.dbwriter.util.ObjectCastUtils;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.SqlTypeValue;
/**
* Oracle数据库写入实现类
@@ -40,28 +47,48 @@ public class OracleWriterImpl extends AbstractDatabaseWriter implements IDatabas
* 将java.sql.Array 类型转换为java.lang.String
* <p>
* Oracle 没有数组类型,这里以文本类型进行存在
* <p>
* Oracle的CLOB和BLOB类型写入请见
* <p>
* oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical
*/
List<InputStream> iss = new ArrayList<>();
recordValues.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
int dataType = this.columnType.get(fieldNames.get(i));
switch (dataType) {
// oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical
case Types.CLOB:
case Types.NCLOB:
row[i] = Objects.isNull(row[i])
? null
: TypeConvertUtils.castToString(row[i]);
break;
case Types.BLOB:
// 需要oracle.sql.BLOB类型
log.warn("Unsupported type for convert {} to oracle.sql.BLOB",
row[i].getClass().getName());
row[i] = null;
final byte[] bytes = Objects.isNull(row[i])
? null
: TypeConvertUtils.castToByteArray(row[i]);
row[i] = new SqlTypeValue() {
@Override
public void setTypeValue(PreparedStatement ps, int paramIndex, int sqlType,
String typeName) throws SQLException {
if (null != bytes) {
InputStream is = new ByteArrayInputStream(bytes);
ps.setBlob(paramIndex, is);
iss.add(is);
} else {
ps.setNull(paramIndex, sqlType);
}
}
};
break;
case Types.ROWID:
case Types.ARRAY:
case Types.NCLOB:
case Types.REF:
case Types.SQLXML:
row[i] = null;
break;
default:
row[i] = ObjectCastUtils.castByDetermine(row[i]);
break;
}
} catch (Exception e) {
@@ -70,6 +97,15 @@ public class OracleWriterImpl extends AbstractDatabaseWriter implements IDatabas
}
});
return super.write(fieldNames, recordValues);
try {
return super.write(fieldNames, recordValues);
} finally {
iss.forEach(is -> {
try {
is.close();
} catch (Exception ignore) {
}
});
}
}
}

View File

@@ -868,18 +868,18 @@ public final class ObjectCastUtils {
}
if (in instanceof java.sql.Clob) {
return ObjectCastUtils.clob2Str((java.sql.Clob) in);
return clob2Str((java.sql.Clob) in);
} else if (in instanceof java.sql.Array
|| in instanceof java.sql.SQLXML) {
try {
return ObjectCastUtils.objectToString(in);
return objectToString(in);
} catch (Exception e) {
log.warn("Unsupported type for convert {} to java.lang.String", in.getClass().getName());
return null;
}
} else if (in instanceof java.sql.Blob) {
try {
return ObjectCastUtils.blob2Bytes((java.sql.Blob) in);
return blob2Bytes((java.sql.Blob) in);
} catch (Exception e) {
log.warn("Unsupported type for convert {} to byte[] ", in.getClass().getName());
return null;