代码逻辑梳理优化

This commit is contained in:
inrgihc
2023-07-27 23:50:01 +08:00
parent b261976bae
commit 540890ae77
55 changed files with 510 additions and 1280 deletions

View File

@@ -0,0 +1,63 @@
package com.gitee.dbswitch.product.oracle;
import com.gitee.dbswitch.common.util.ObjectCastUtils;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.Objects;
import lombok.experimental.UtilityClass;
import org.springframework.jdbc.core.SqlTypeValue;
@UtilityClass
public class OracleCastUtils {
/**
* 将java.sql.Array 类型转换为java.lang.String
* <p>
* Oracle 没有数组类型,这里以文本类型进行存在
* <p>
* Oracle的CLOB和BLOB类型写入请见
* <p>
* oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical
*/
public static Object castByJdbcType(int jdbcType, Object value, List<InputStream> iss) {
try {
switch (jdbcType) {
case Types.CLOB:
case Types.NCLOB:
return Objects.isNull(value)
? null
: ObjectCastUtils.castToString(value);
case Types.BLOB:
final byte[] bytes = Objects.isNull(value)
? null
: ObjectCastUtils.castToByteArray(value);
return new SqlTypeValue() {
@Override
public void setTypeValue(PreparedStatement ps, int paramIndex, int sqlType,
String typeName) throws SQLException {
if (null != bytes) {
InputStream is = new ByteArrayInputStream(bytes);
ps.setBlob(paramIndex, is);
iss.add(is);
} else {
ps.setNull(paramIndex, sqlType);
}
}
};
case Types.ROWID:
case Types.ARRAY:
case Types.REF:
case Types.SQLXML:
default:
return null;
}
} catch (Exception e) {
return null;
}
}
}

View File

@@ -47,7 +47,7 @@ public class OracleFactoryProvider extends AbstractFactoryProvider {
@Override
public TableDataSynchronizer createTableDataSynchronizer() {
return new OracleTableSynchronizer(this);
return new OracleTableDataSynchronizer(this);
}
}

View File

@@ -0,0 +1,66 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.product.oracle;
import com.gitee.dbswitch.provider.ProductFactoryProvider;
import com.gitee.dbswitch.provider.sync.DefaultTableDataSynchronizer;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
public class OracleTableDataSynchronizer extends DefaultTableDataSynchronizer {
public OracleTableDataSynchronizer(ProductFactoryProvider factoryProvider) {
super(factoryProvider);
}
@Override
public long executeInsert(List<Object[]> records) {
List<InputStream> iss = new ArrayList<>();
records.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
row[i] = OracleCastUtils.castByJdbcType(insertArgsType[i], row[i], iss);
}
});
try {
return super.executeInsert(records);
} finally {
iss.forEach(is -> {
try {
is.close();
} catch (Exception ignore) {
}
});
}
}
@Override
public long executeUpdate(List<Object[]> records) {
List<InputStream> iss = new ArrayList<>();
records.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
row[i] = OracleCastUtils.castByJdbcType(updateArgsType[i], row[i], iss);
}
});
try {
return super.executeUpdate(records);
} finally {
iss.forEach(is -> {
try {
is.close();
} catch (Exception ignore) {
}
});
}
}
}

View File

@@ -9,7 +9,7 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.product.oracle;
import com.gitee.dbswitch.common.util.TypeConvertUtils;
import com.gitee.dbswitch.common.util.ObjectCastUtils;
import com.gitee.dbswitch.provider.ProductFactoryProvider;
import com.gitee.dbswitch.provider.write.DefaultTableDataWriteProvider;
import java.io.ByteArrayInputStream;
@@ -30,57 +30,11 @@ public class OracleTableDataWriteProvider extends DefaultTableDataWriteProvider
@Override
public long write(List<String> fieldNames, List<Object[]> recordValues) {
/**
* 将java.sql.Array 类型转换为java.lang.String
* <p>
* Oracle 没有数组类型,这里以文本类型进行存在
* <p>
* Oracle的CLOB和BLOB类型写入请见
* <p>
* oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical
*/
List<InputStream> iss = new ArrayList<>();
recordValues.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
int dataType = this.columnType.get(fieldNames.get(i));
switch (dataType) {
case Types.CLOB:
case Types.NCLOB:
row[i] = Objects.isNull(row[i])
? null
: TypeConvertUtils.castToString(row[i]);
break;
case Types.BLOB:
final byte[] bytes = Objects.isNull(row[i])
? null
: TypeConvertUtils.castToByteArray(row[i]);
row[i] = new SqlTypeValue() {
@Override
public void setTypeValue(PreparedStatement ps, int paramIndex, int sqlType,
String typeName) throws SQLException {
if (null != bytes) {
InputStream is = new ByteArrayInputStream(bytes);
ps.setBlob(paramIndex, is);
iss.add(is);
} else {
ps.setNull(paramIndex, sqlType);
}
}
};
break;
case Types.ROWID:
case Types.ARRAY:
case Types.REF:
case Types.SQLXML:
row[i] = null;
break;
default:
break;
}
} catch (Exception e) {
row[i] = null;
}
int dataType = this.columnType.get(fieldNames.get(i));
row[i] = OracleCastUtils.castByJdbcType(dataType, row[i], iss);
}
});

View File

@@ -18,12 +18,6 @@ public class OracleTableOperateProvider extends DefaultTableOperateProvider {
super(factoryProvider);
}
@Override
public void truncateTableData(String schemaName, String tableName) {
String sql = String.format("TRUNCATE TABLE \"%s\".\"%s\" ", schemaName, tableName);
this.executeSql(sql);
}
@Override
public void dropTable(String schemaName, String tableName) {
String sql = String.format("DROP TABLE \"%s\".\"%s\" CASCADE CONSTRAINTS", schemaName, tableName);

View File

@@ -1,147 +0,0 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.product.oracle;
import com.gitee.dbswitch.common.util.TypeConvertUtils;
import com.gitee.dbswitch.provider.ProductFactoryProvider;
import com.gitee.dbswitch.provider.sync.DefaultTableDataSynchronizer;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.springframework.jdbc.core.SqlTypeValue;
public class OracleTableSynchronizer extends DefaultTableDataSynchronizer {
public OracleTableSynchronizer(ProductFactoryProvider factoryProvider) {
super(factoryProvider);
}
@Override
public long executeInsert(List<Object[]> records) {
List<InputStream> iss = new ArrayList<>();
records.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
switch (this.insertArgsType[i]) {
case Types.CLOB:
case Types.NCLOB:
row[i] = Objects.isNull(row[i])
? null
: TypeConvertUtils.castToString(row[i]);
break;
case Types.BLOB:
final byte[] bytes = Objects.isNull(row[i])
? null
: TypeConvertUtils.castToByteArray(row[i]);
row[i] = new SqlTypeValue() {
@Override
public void setTypeValue(PreparedStatement ps, int paramIndex, int sqlType,
String typeName) throws SQLException {
if (null != bytes) {
InputStream is = new ByteArrayInputStream(bytes);
ps.setBlob(paramIndex, is);
iss.add(is);
} else {
ps.setNull(paramIndex, sqlType);
}
}
};
break;
case Types.ROWID:
case Types.ARRAY:
case Types.REF:
case Types.SQLXML:
row[i] = null;
break;
default:
break;
}
} catch (Exception e) {
row[i] = null;
}
}
});
try {
return super.executeInsert(records);
} finally {
iss.forEach(is -> {
try {
is.close();
} catch (Exception ignore) {
}
});
}
}
@Override
public long executeUpdate(List<Object[]> records) {
List<InputStream> iss = new ArrayList<>();
records.parallelStream().forEach((Object[] row) -> {
for (int i = 0; i < row.length; ++i) {
try {
switch (this.updateArgsType[i]) {
case Types.CLOB:
case Types.NCLOB:
row[i] = Objects.isNull(row[i])
? null
: TypeConvertUtils.castToString(row[i]);
break;
case Types.BLOB:
final byte[] bytes = Objects.isNull(row[i])
? null
: TypeConvertUtils.castToByteArray(row[i]);
row[i] = new SqlTypeValue() {
@Override
public void setTypeValue(PreparedStatement ps, int paramIndex, int sqlType,
String typeName) throws SQLException {
if (null != bytes) {
InputStream is = new ByteArrayInputStream(bytes);
ps.setBlob(paramIndex, is);
iss.add(is);
} else {
ps.setNull(paramIndex, sqlType);
}
}
};
break;
case Types.ROWID:
case Types.ARRAY:
case Types.REF:
case Types.SQLXML:
row[i] = null;
break;
default:
break;
}
} catch (Exception e) {
row[i] = null;
}
}
});
try {
return super.executeUpdate(records);
} finally {
iss.forEach(is -> {
try {
is.close();
} catch (Exception ignore) {
}
});
}
}
}