代码整理

This commit is contained in:
inrgihc
2022-08-14 00:27:46 +08:00
parent 31dd8e38d1
commit 898106d409
6 changed files with 318 additions and 326 deletions

View File

@@ -3,15 +3,6 @@ FROM openjdk:8-jre-alpine
ENV TZ=Asia/Shanghai ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ADD dbswitch-release.tar.gz / ADD dbswitch-release.tar.gz /
EXPOSE 9088 EXPOSE 9088

0
build-docker/dbswitch/dbswitch-release/bin/startup.sh Executable file → Normal file
View File

View File

@@ -41,8 +41,11 @@ import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.quartz.CronExpression;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.quartz.CronExpression;
@Service @Service
public class AssignmentService { public class AssignmentService {

View File

@@ -22,6 +22,9 @@ import org.springframework.stereotype.Service;
@Service @Service
public class PatternMapperService { public class PatternMapperService {
private final String STRING_EMPTY = "<!空>";
private final String STRING_DELETE = "<!删除>";
@Resource @Resource
private DbConnectionService connectionService; private DbConnectionService connectionService;
@@ -34,18 +37,22 @@ public class PatternMapperService {
List<PreviewNameMapperResponse> result = new ArrayList<>(); List<PreviewNameMapperResponse> result = new ArrayList<>();
if (CollectionUtils.isEmpty(request.getTableNames())) { if (CollectionUtils.isEmpty(request.getTableNames())) {
for (TableDescription td : getAllTableNames(request)) { for (TableDescription td : getAllTableNames(request)) {
String targetName = PatterNameUtils.getFinalName(
td.getTableName(), request.getNameMapper());
result.add(PreviewNameMapperResponse.builder() result.add(PreviewNameMapperResponse.builder()
.originalName(td.getTableName()) .originalName(td.getTableName())
.targetName(PatterNameUtils.getFinalName(td.getTableName(), request.getNameMapper())) .targetName(StringUtils.isNotBlank(targetName) ? targetName : STRING_EMPTY)
.build()); .build());
} }
} else { } else {
if (include) { if (include) {
for (String name : request.getTableNames()) { for (String name : request.getTableNames()) {
if (StringUtils.isNotBlank(name)) { if (StringUtils.isNotBlank(name)) {
String targetName = PatterNameUtils.getFinalName(
name, request.getNameMapper());
result.add(PreviewNameMapperResponse.builder() result.add(PreviewNameMapperResponse.builder()
.originalName(name) .originalName(name)
.targetName(PatterNameUtils.getFinalName(name, request.getNameMapper())) .targetName(StringUtils.isNotBlank(targetName) ? targetName : STRING_EMPTY)
.build()); .build());
} }
} }
@@ -92,7 +99,7 @@ public class PatternMapperService {
} else { } else {
result.add(PreviewNameMapperResponse.builder() result.add(PreviewNameMapperResponse.builder()
.originalName(cd.getFieldName()) .originalName(cd.getFieldName())
.targetName("<!字段被删除>") .targetName(STRING_DELETE)
.build()); .build());
} }
} }
@@ -110,14 +117,10 @@ public class PatternMapperService {
if (null == dbConn) { if (null == dbConn) {
throw new DbswitchException(ResultCode.ERROR_RESOURCE_NOT_EXISTS, "id=" + request.getId()); throw new DbswitchException(ResultCode.ERROR_RESOURCE_NOT_EXISTS, "id=" + request.getId());
} }
IMetaDataByJdbcService service = connectionService.getMetaDataCoreService(dbConn); IMetaDataByJdbcService service = connectionService.getMetaDataCoreService(dbConn);
return service.queryTableList( return service.queryTableList(dbConn.getUrl(), dbConn.getUsername(), dbConn.getPassword(),
dbConn.getUrl(), request.getSchemaName()).stream().filter(td -> !td.isViewTable())
dbConn.getUsername(),
dbConn.getPassword(),
request.getSchemaName()
).stream()
.filter(td -> !td.isViewTable())
.collect(Collectors.toList()); .collect(Collectors.toList());
} }

View File

@@ -9,312 +9,310 @@
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
package com.gitee.dbswitch.core.database; package com.gitee.dbswitch.core.database;
import java.util.List; import com.gitee.dbswitch.common.type.DatabaseTypeEnum;
import java.util.Objects; import com.gitee.dbswitch.common.util.DbswitchStrUtils;
import java.util.Properties; import com.gitee.dbswitch.common.util.HivePrepareUtils;
import java.util.Set; import com.gitee.dbswitch.common.util.TypeConvertUtils;
import java.util.ArrayList; import com.gitee.dbswitch.core.model.ColumnDescription;
import java.util.HashSet; import com.gitee.dbswitch.core.model.ColumnMetaData;
import com.gitee.dbswitch.core.model.SchemaTableData;
import com.gitee.dbswitch.core.model.TableDescription;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import com.gitee.dbswitch.common.constant.DatabaseTypeEnum;
import com.gitee.dbswitch.core.model.ColumnDescription;
import com.gitee.dbswitch.core.model.ColumnMetaData;
import com.gitee.dbswitch.core.model.TableDescription;
import com.gitee.dbswitch.core.util.JdbcOperatorUtils;
/** /**
* 数据库元信息抽象基类 * 数据库元信息抽象基类
* *
* @author tang * @author tang
*
*/ */
public abstract class AbstractDatabase implements IDatabaseInterface { public abstract class AbstractDatabase implements IDatabaseInterface {
public static final int CLOB_LENGTH = 9999999; public static final int CLOB_LENGTH = 9999999;
protected Connection connection = null; protected String driverClassName;
protected DatabaseMetaData metaData = null; protected String catalogName = null;
protected String catalogName = null;
public AbstractDatabase(String driverClassName) { public AbstractDatabase(String driverClassName) {
this.catalogName = null; try {
this.driverClassName = driverClassName;
Class.forName(driverClassName);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
try { @Override
Class.forName(driverClassName); public String getDriverClassName() {
} catch (ClassNotFoundException e) { return this.driverClassName;
throw new RuntimeException(e); }
}
}
@Override @Override
public void connect(String jdbcUrl, String username, String password) { public List<String> querySchemaList(Connection connection) {
/* Set<String> ret = new HashSet<>();
* 超时时间设置问题: https://blog.csdn.net/lsunwing/article/details/79461217 try (ResultSet schemas = connection.getMetaData().getSchemas()) {
* https://blog.csdn.net/weixin_34405332/article/details/91664781 while (schemas.next()) {
*/ ret.add(schemas.getString("TABLE_SCHEM"));
try { }
/** return new ArrayList<>(ret);
* Oracle在通过jdbc连接的时候需要添加一个参数来设置是否获取注释 } catch (SQLException e) {
*/ throw new RuntimeException(e);
Properties props = new Properties(); }
props.put("user", username); }
props.put("password", password);
props.put("remarksReporting", "true");
// 设置最大时间 @Override
DriverManager.setLoginTimeout(15); public List<TableDescription> queryTableList(Connection connection, String schemaName) {
List<TableDescription> ret = new ArrayList<>();
Set<String> uniqueSet = new HashSet<>();
String[] types = new String[]{"TABLE", "VIEW"};
try (ResultSet tables = connection.getMetaData()
.getTables(this.catalogName, schemaName, "%", types)) {
while (tables.next()) {
String tableName = tables.getString("TABLE_NAME");
if (uniqueSet.contains(tableName)) {
continue;
} else {
uniqueSet.add(tableName);
}
this.connection = DriverManager.getConnection(jdbcUrl, props); TableDescription td = new TableDescription();
if (Objects.isNull(this.connection)) { td.setSchemaName(schemaName);
throw new RuntimeException("数据库连接失败,连接参数为:" + jdbcUrl); td.setTableName(tableName);
} td.setRemarks(tables.getString("REMARKS"));
td.setTableType(tables.getString("TABLE_TYPE").toUpperCase());
ret.add(td);
}
return ret;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
this.metaData = Objects.requireNonNull(this.connection.getMetaData()); @Override
} catch (SQLException e) { public TableDescription queryTableMeta(Connection connection, String schemaName,
throw new RuntimeException(e); String tableName) {
} return queryTableList(connection, schemaName).stream()
.filter(one -> tableName.equals(one.getTableName()))
.findAny().orElse(null);
}
} @Override
public List<String> queryTableColumnName(Connection connection, String schemaName,
String tableName) {
Set<String> columns = new HashSet<>();
try (ResultSet rs = connection.getMetaData()
.getColumns(this.catalogName, schemaName, tableName, null)) {
while (rs.next()) {
columns.add(rs.getString("COLUMN_NAME"));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return new ArrayList<>(columns);
}
@Override @Override
public void close() { public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName,
if (null != connection) { String tableName) {
try { String sql = this.getTableFieldsQuerySQL(schemaName, tableName);
connection.close(); List<ColumnDescription> ret = this.querySelectSqlColumnMeta(connection, sql);
} catch (SQLException e) {
}
connection = null;
}
}
@Override // 补充一下注释信息
public List<String> querySchemaList() { try (ResultSet columns = connection.getMetaData()
Set<String> ret = new HashSet<>(); .getColumns(this.catalogName, schemaName, tableName, null)) {
ResultSet schemas = null; while (columns.next()) {
try { String columnName = columns.getString("COLUMN_NAME");
schemas = this.metaData.getSchemas(); String remarks = columns.getString("REMARKS");
while (schemas.next()) { for (ColumnDescription cd : ret) {
ret.add(schemas.getString("TABLE_SCHEM")); if (columnName.equals(cd.getFieldName())) {
} cd.setRemarks(remarks);
return new ArrayList<>(ret); }
} catch (SQLException e) { }
throw new RuntimeException(e); }
} finally { } catch (SQLException e) {
try { throw new RuntimeException(e);
if (null != schemas) { }
schemas.close(); return ret;
schemas = null; }
}
} catch (SQLException e) {
}
}
} @Override
public List<String> queryTablePrimaryKeys(Connection connection, String schemaName,
String tableName) {
Set<String> ret = new HashSet<>();
try (ResultSet primaryKeys = connection.getMetaData()
.getPrimaryKeys(this.catalogName, schemaName, tableName)) {
while (primaryKeys.next()) {
String name = primaryKeys.getString("COLUMN_NAME");
if (!ret.contains(name)) {
ret.add(name);
}
}
return new ArrayList<>(ret);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override @Override
public List<TableDescription> queryTableList(String schemaName) { public SchemaTableData queryTableData(Connection connection, String schemaName, String tableName,
List<TableDescription> ret = new ArrayList<>(); int rowCount) {
Set<String> uniqueSet = new HashSet<>(); String fullTableName = getQuotedSchemaTableCombination(schemaName, tableName);
ResultSet tables = null; String querySQL = String.format("SELECT * FROM %s ", fullTableName);
try { SchemaTableData data = new SchemaTableData();
tables = this.metaData.getTables(this.catalogName, schemaName, "%", new String[] { "TABLE", "VIEW" }); data.setSchemaName(schemaName);
while (tables.next()) { data.setTableName(tableName);
String tableName = tables.getString("TABLE_NAME"); data.setColumns(new ArrayList<>());
if (uniqueSet.contains(tableName)) { data.setRows(new ArrayList<>());
continue; try (Statement st = connection.createStatement()) {
} else { if (getDatabaseType() == DatabaseTypeEnum.HIVE) {
uniqueSet.add(tableName); HivePrepareUtils.prepare(connection, schemaName, tableName);
} }
TableDescription td = new TableDescription(); try (ResultSet rs = st.executeQuery(querySQL)) {
td.setSchemaName(schemaName); ResultSetMetaData m = rs.getMetaData();
td.setTableName(tableName); int count = m.getColumnCount();
td.setRemarks(tables.getString("REMARKS")); for (int i = 1; i <= count; i++) {
td.setTableType(tables.getString("TABLE_TYPE").toUpperCase()); data.getColumns().add(m.getColumnLabel(i));
ret.add(td); }
}
return ret;
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
try {
if (null != tables) {
tables.close();
tables = null;
}
} catch (SQLException e) {
}
}
}
@Override int counter = 0;
public List<ColumnDescription> queryTableColumnMeta(String schemaName, String tableName) { while (rs.next() && counter++ < rowCount) {
String sql = this.getTableFieldsQuerySQL(schemaName, tableName); List<Object> row = new ArrayList<>(count);
List<ColumnDescription> ret = this.querySelectSqlColumnMeta(sql); for (int i = 1; i <= count; i++) {
ResultSet columns = null; Object value = rs.getObject(i);
try { if (value != null && value instanceof byte[]) {
columns = this.metaData.getColumns(this.catalogName, schemaName, tableName, null); row.add(DbswitchStrUtils.toHexString((byte[]) value));
while (columns.next()) { } else if (value != null && value instanceof java.sql.Clob) {
String columnName = columns.getString("COLUMN_NAME"); row.add(TypeConvertUtils.castToString(value));
String remarks = columns.getString("REMARKS"); } else if (value != null && value instanceof java.sql.Blob) {
for (ColumnDescription cd : ret) { byte[] bytes = TypeConvertUtils.castToByteArray(value);
if (columnName.equalsIgnoreCase(cd.getFieldName())) { row.add(DbswitchStrUtils.toHexString(bytes));
cd.setRemarks(remarks); } else {
} row.add(null == value ? null : value.toString());
} }
} }
} catch (SQLException e) { data.getRows().add(row);
throw new RuntimeException(e); }
} finally {
try {
if (null != columns) {
columns.close();
columns = null;
}
} catch (SQLException e) {
}
}
return ret; return data;
} }
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override @Override
public List<String> queryTablePrimaryKeys(String schemaName, String tableName) { public void testQuerySQL(Connection connection, String sql) {
Set<String> ret = new HashSet<>(); String wrapperSql = this.getTestQuerySQL(sql);
ResultSet primarykeys = null; try (Statement statement = connection.createStatement();) {
try { statement.execute(wrapperSql);
primarykeys = this.metaData.getPrimaryKeys(this.catalogName, schemaName, tableName); } catch (SQLException e) {
while (primarykeys.next()) { throw new RuntimeException(e);
String name = primarykeys.getString("COLUMN_NAME"); }
if (!ret.contains(name)) { }
ret.add(name);
}
}
return new ArrayList<>(ret);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
try {
if (null != primarykeys) {
primarykeys.close();
primarykeys = null;
}
} catch (SQLException e) {
}
}
}
@Override @Override
public abstract List<ColumnDescription> querySelectSqlColumnMeta(String sql); public String getQuotedSchemaTableCombination(String schemaName, String tableName) {
return String.format(" \"%s\".\"%s\" ", schemaName, tableName);
}
@Override @Override
public void testQuerySQL(String sql) { public String getFieldDefinition(ColumnMetaData v, List<String> pks, boolean useAutoInc,
String wrapperSql = this.getTestQuerySQL(sql); boolean addCr, boolean withRemarks) {
try (Statement statement = this.connection.createStatement();) { throw new RuntimeException("AbstractDatabase Unimplemented!");
statement.execute(wrapperSql); }
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override @Override
public String getQuotedSchemaTableCombination(String schemaName, String tableName) { public String getPrimaryKeyAsString(List<String> pks) {
return String.format(" \"%s\".\"%s\" ", schemaName, tableName); if (!pks.isEmpty()) {
} StringBuilder sb = new StringBuilder();
sb.append("\"");
sb.append(StringUtils.join(pks, "\" , \""));
sb.append("\"");
return sb.toString();
}
@Override return "";
public String getFieldDefinition(ColumnMetaData v, List<String> pks, boolean useAutoInc, boolean addCr) { }
throw new RuntimeException("AbstractDatabase Unempliment!");
}
@Override @Override
public String getPrimaryKeyAsString(List<String> pks) { public List<String> getTableColumnCommentDefinition(TableDescription td,
if (!pks.isEmpty()) { List<ColumnDescription> cds) {
StringBuilder sb = new StringBuilder(); throw new RuntimeException("AbstractDatabase Unimplemented!");
sb.append("\""); }
sb.append(StringUtils.join(pks, "\" , \""));
sb.append("\"");
return sb.toString();
}
return ""; /**************************************
} * internal function
**************************************/
/************************************** protected abstract String getTableFieldsQuerySQL(String schemaName, String tableName);
* internal function
**************************************/
protected abstract String getTableFieldsQuerySQL(String schemaName, String tableName); protected abstract String getTestQuerySQL(String sql);
protected abstract String getTestQuerySQL(String sql); protected List<ColumnDescription> getSelectSqlColumnMeta(Connection connection, String querySQL) {
List<ColumnDescription> ret = new ArrayList<>();
try (Statement st = connection.createStatement()) {
if (getDatabaseType() == DatabaseTypeEnum.HIVE) {
HivePrepareUtils.setResultSetColumnNameNotUnique(connection);
}
protected List<ColumnDescription> getSelectSqlColumnMeta(String querySQL, DatabaseTypeEnum dbtype) { try (ResultSet rs = st.executeQuery(querySQL)) {
List<ColumnDescription> ret = new ArrayList<ColumnDescription>(); ResultSetMetaData m = rs.getMetaData();
PreparedStatement pstmt = null; int columns = m.getColumnCount();
ResultSet rs = null; for (int i = 1; i <= columns; i++) {
String name = m.getColumnLabel(i);
if (null == name) {
name = m.getColumnName(i);
}
try { ColumnDescription cd = new ColumnDescription();
pstmt = this.connection.prepareStatement(querySQL); cd.setFieldName(name);
rs = pstmt.executeQuery(); cd.setLabelName(name);
cd.setFieldType(m.getColumnType(i));
if (0 != cd.getFieldType()) {
cd.setFieldTypeName(m.getColumnTypeName(i));
cd.setFiledTypeClassName(m.getColumnClassName(i));
cd.setDisplaySize(m.getColumnDisplaySize(i));
cd.setPrecisionSize(m.getPrecision(i));
cd.setScaleSize(m.getScale(i));
cd.setAutoIncrement(m.isAutoIncrement(i));
cd.setNullable(m.isNullable(i) != ResultSetMetaData.columnNoNulls);
} else {
// 处理视图中NULL as fieldName的情况
cd.setFieldTypeName("CHAR");
cd.setFiledTypeClassName(String.class.getName());
cd.setDisplaySize(1);
cd.setPrecisionSize(1);
cd.setScaleSize(0);
cd.setAutoIncrement(false);
cd.setNullable(true);
}
ResultSetMetaData m = rs.getMetaData(); boolean signed = false;
int columns = m.getColumnCount(); try {
for (int i = 1; i <= columns; i++) { signed = m.isSigned(i);
String name = m.getColumnLabel(i); } catch (Exception ignored) {
if (null == name) { // This JDBC Driver doesn't support the isSigned method
name = m.getColumnName(i); // nothing more we can do here by catch the exception.
} }
cd.setSigned(signed);
cd.setDbType(getDatabaseType());
ColumnDescription cd = new ColumnDescription(); ret.add(cd);
cd.setFieldName(name); }
cd.setLabelName(name);
cd.setFieldType(m.getColumnType(i));
if (0 != cd.getFieldType()) {
cd.setFieldTypeName(m.getColumnTypeName(i));
cd.setFiledTypeClassName(m.getColumnClassName(i));
cd.setDisplaySize(m.getColumnDisplaySize(i));
cd.setPrecisionSize(m.getPrecision(i));
cd.setScaleSize(m.getScale(i));
cd.setAutoIncrement(m.isAutoIncrement(i));
cd.setNullable(m.isNullable(i) != ResultSetMetaData.columnNoNulls);
} else {
// 处理视图中NULL as fieldName的情况
cd.setFieldTypeName("CHAR");
cd.setFiledTypeClassName(String.class.getName());
cd.setDisplaySize(1);
cd.setPrecisionSize(1);
cd.setScaleSize(0);
cd.setAutoIncrement(false);
cd.setNullable(true);
}
boolean signed = false; return ret;
try { }
signed = m.isSigned(i); } catch (SQLException e) {
} catch (Exception ignored) { throw new RuntimeException(e);
// This JDBC Driver doesn't support the isSigned method }
// nothing more we can do here by catch the exception. }
}
cd.setSigned(signed);
cd.setDbType(dbtype);
ret.add(cd);
}
return ret;
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
JdbcOperatorUtils.closeResultSet(rs);
JdbcOperatorUtils.closeStatement(pstmt);
}
}
} }

View File

@@ -34,7 +34,6 @@ import com.gitee.dbswitch.dbwriter.IDatabaseWriter;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
@@ -114,6 +113,10 @@ public class MigrationHandler implements Supplier<Long> {
this.targetTableName = PatterNameUtils.getFinalName(td.getTableName(), this.targetTableName = PatterNameUtils.getFinalName(td.getTableName(),
sourceProperties.getRegexTableMapper()); sourceProperties.getRegexTableMapper());
if (StringUtils.isEmpty(this.targetTableName)) {
throw new RuntimeException("表名的映射规则配置有误,不能将[" + this.sourceTableName + "]映射为空");
}
this.tableNameMapString = String.format("%s.%s --> %s.%s", this.tableNameMapString = String.format("%s.%s --> %s.%s",
td.getSchemaName(), td.getTableName(), td.getSchemaName(), td.getTableName(),
targetSchemaName, targetTableName); targetSchemaName, targetTableName);
@@ -160,16 +163,26 @@ public class MigrationHandler implements Supplier<Long> {
String targetColumnName = targetColumnDescriptions.get(i).getFieldName(); String targetColumnName = targetColumnDescriptions.get(i).getFieldName();
if (StringUtils.hasLength(targetColumnName)) { if (StringUtils.hasLength(targetColumnName)) {
columnMapperPairs.add(String.format("%s --> %s", sourceColumnName, targetColumnName)); columnMapperPairs.add(String.format("%s --> %s", sourceColumnName, targetColumnName));
mapChecker.put(sourceColumnName, targetColumnName);
} else { } else {
columnMapperPairs.add(String.format("%s --> %s", sourceColumnName, "<!Field is Deleted>")); columnMapperPairs.add(String.format(
"%s --> %s",
sourceColumnName,
String.format("<!Field(%s) is Deleted>", (i + 1))
));
} }
mapChecker.put(sourceColumnName, targetColumnName);
} }
log.info("Mapping relation : \ntable mapper :\n\t{} \ncolumn mapper :\n\t{} ", log.info("Mapping relation : \ntable mapper :\n\t{} \ncolumn mapper :\n\t{} ",
tableNameMapString, columnMapperPairs.stream().collect(Collectors.joining("\n\t"))); tableNameMapString, String.join("\n\t", columnMapperPairs));
Set<String> valueSet = new HashSet<>(mapChecker.values()); Set<String> valueSet = new HashSet<>(mapChecker.values());
if (valueSet.size() <= 0) {
throw new RuntimeException("字段映射配置有误,禁止通过映射将表所有的字段都删除!");
}
if (!valueSet.containsAll(this.targetPrimaryKeys)) {
throw new RuntimeException("字段映射配置有误,禁止通过映射将表的主键字段删除!");
}
if (mapChecker.keySet().size() != valueSet.size()) { if (mapChecker.keySet().size() != valueSet.size()) {
throw new RuntimeException("字段映射配置有误,多个字段映射到一个同名字段!"); throw new RuntimeException("字段映射配置有误,禁止将多个字段映射到一个同名字段!");
} }
IDatabaseWriter writer = DatabaseWriterFactory.createDatabaseWriter( IDatabaseWriter writer = DatabaseWriterFactory.createDatabaseWriter(
@@ -258,24 +271,16 @@ public class MigrationHandler implements Supplier<Long> {
private Long doFullCoverSynchronize(IDatabaseWriter writer) { private Long doFullCoverSynchronize(IDatabaseWriter writer) {
final int BATCH_SIZE = fetchSize; final int BATCH_SIZE = fetchSize;
List<String> sourceFields = sourceColumnDescriptions.stream() List<String> sourceFields = new ArrayList<>();
.map(ColumnDescription::getFieldName) List<String> targetFields = new ArrayList<>();
.collect(Collectors.toList()); for (int i = 0; i < targetColumnDescriptions.size(); ++i) {
List<String> targetFields = targetColumnDescriptions.stream() ColumnDescription scd = sourceColumnDescriptions.get(i);
.map(ColumnDescription::getFieldName) ColumnDescription tcd = targetColumnDescriptions.get(i);
.collect(Collectors.toList()); if (!StringUtils.isEmpty(tcd.getFieldName())) {
List<Integer> deletedFieldIndexes = new ArrayList<>(); sourceFields.add(scd.getFieldName());
for (int i = 0; i < targetFields.size(); ++i) { targetFields.add(tcd.getFieldName());
if (StringUtils.isEmpty(targetFields.get(i))) {
deletedFieldIndexes.add(i);
} }
} }
Collections.reverse(deletedFieldIndexes);
deletedFieldIndexes.forEach(i -> {
sourceFields.remove(sourceFields.get(i));
targetFields.remove(targetFields.get(i));
});
// 准备目的端的数据写入操作 // 准备目的端的数据写入操作
writer.prepareWrite(targetSchemaName, targetTableName, targetFields); writer.prepareWrite(targetSchemaName, targetTableName, targetFields);
@@ -351,26 +356,18 @@ public class MigrationHandler implements Supplier<Long> {
*/ */
private Long doIncreaseSynchronize(IDatabaseWriter writer) { private Long doIncreaseSynchronize(IDatabaseWriter writer) {
final int BATCH_SIZE = fetchSize; final int BATCH_SIZE = fetchSize;
List<String> sourceFields = sourceColumnDescriptions.stream()
.map(ColumnDescription::getFieldName) List<String> sourceFields = new ArrayList<>();
.collect(Collectors.toList()); List<String> targetFields = new ArrayList<>();
List<String> targetFields = targetColumnDescriptions.stream()
.map(ColumnDescription::getFieldName)
.collect(Collectors.toList());
List<Integer> deletedFieldIndexes = new ArrayList<>();
for (int i = 0; i < targetFields.size(); ++i) {
if (StringUtils.isEmpty(targetFields.get(i))) {
deletedFieldIndexes.add(i);
}
}
Collections.reverse(deletedFieldIndexes);
deletedFieldIndexes.forEach(i -> {
sourceFields.remove(sourceFields.get(i));
targetFields.remove(targetFields.get(i));
});
Map<String, String> columnNameMaps = new HashMap<>(); Map<String, String> columnNameMaps = new HashMap<>();
for (int i = 0; i < sourceFields.size(); ++i) { for (int i = 0; i < targetColumnDescriptions.size(); ++i) {
columnNameMaps.put(sourceFields.get(i), targetFields.get(i)); ColumnDescription scd = sourceColumnDescriptions.get(i);
ColumnDescription tcd = targetColumnDescriptions.get(i);
if (!StringUtils.isEmpty(tcd.getFieldName())) {
sourceFields.add(scd.getFieldName());
targetFields.add(tcd.getFieldName());
columnNameMaps.put(scd.getFieldName(), tcd.getFieldName());
}
} }
TaskParamEntity.TaskParamEntityBuilder taskBuilder = TaskParamEntity.builder(); TaskParamEntity.TaskParamEntityBuilder taskBuilder = TaskParamEntity.builder();