mongo数据同步

This commit is contained in:
inrgihc
2023-08-24 22:49:45 +08:00
parent dd7e249ac5
commit ec61d0d428
7 changed files with 45 additions and 32 deletions

View File

@@ -79,12 +79,6 @@ public class AssignmentService {
"不支持目的端数据源为远程服务器上的SQLite或内存方式下的SQLite"); "不支持目的端数据源为远程服务器上的SQLite或内存方式下的SQLite");
} }
} }
Long sourceConnectionId = assignmentConfigEntity.getSourceConnectionId();
DatabaseConnectionEntity sourceEntity = databaseConnectionDAO.getById(sourceConnectionId);
if (ProductTypeEnum.MONGODB == sourceEntity.getType()) {
throw new DbswitchException(ResultCode.ERROR_INVALID_ASSIGNMENT_CONFIG,
"不支持源端数据源为MongoDB数据库");
}
return ConverterFactory.getConverter(AssignmentInfoConverter.class) return ConverterFactory.getConverter(AssignmentInfoConverter.class)
.convert(assignmentTaskDAO.getById(assignment.getId())); .convert(assignmentTaskDAO.getById(assignment.getId()));
@@ -120,12 +114,6 @@ public class AssignmentService {
"不支持目的端数据源为远程服务器上的SQLite或内存方式下的SQLite"); "不支持目的端数据源为远程服务器上的SQLite或内存方式下的SQLite");
} }
} }
Long sourceConnectionId = assignmentConfigEntity.getSourceConnectionId();
DatabaseConnectionEntity sourceEntity = databaseConnectionDAO.getById(sourceConnectionId);
if (ProductTypeEnum.MONGODB == sourceEntity.getType()) {
throw new DbswitchException(ResultCode.ERROR_INVALID_ASSIGNMENT_CONFIG,
"不支持源端数据源为MongoDB数据库");
}
} }
public PageResult<AssignmentInfoResponse> listAll(String searchText, Integer page, Integer size) { public PageResult<AssignmentInfoResponse> listAll(String searchText, Integer page, Integer size) {

View File

@@ -1,5 +1,6 @@
package com.gitee.dbswitch.common.util; package com.gitee.dbswitch.common.util;
import cn.hutool.json.JSONUtil;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.lang.reflect.Method; import java.lang.reflect.Method;
@@ -13,6 +14,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
import lombok.experimental.UtilityClass; import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -905,9 +907,11 @@ public final class ObjectCastUtils {
return in.toString(); return in.toString();
} else if (in instanceof byte[]) { } else if (in instanceof byte[]) {
return new String((byte[]) in); return new String((byte[]) in);
} else if (in instanceof Map) {
return JSONUtil.toJsonStr(in);
} }
return null; return null != in ? in.toString() : null;
} }
public static String objectToString(final Object in) { public static String objectToString(final Object in) {

View File

@@ -235,7 +235,7 @@ public class MigrationHandler implements Supplier<Long> {
TableOperateProvider targetOperator = targetFactoryProvider.createTableOperateProvider(); TableOperateProvider targetOperator = targetFactoryProvider.createTableOperateProvider();
TableDataSynchronizer targetSynchronizer = targetFactoryProvider.createTableDataSynchronizer(); TableDataSynchronizer targetSynchronizer = targetFactoryProvider.createTableDataSynchronizer();
if (targetProductType.isMongodb()) { if (sourceProductType.isMongodb() || targetProductType.isMongodb()) {
try { try {
targetFactoryProvider.createTableOperateProvider() targetFactoryProvider.createTableOperateProvider()
.dropTable(targetSchemaName, targetTableName); .dropTable(targetSchemaName, targetTableName);

View File

@@ -9,6 +9,7 @@
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
package com.gitee.dbswitch.product.mongodb; package com.gitee.dbswitch.product.mongodb;
import com.gitee.dbswitch.common.consts.Constants;
import com.gitee.dbswitch.provider.ProductFactoryProvider; import com.gitee.dbswitch.provider.ProductFactoryProvider;
import com.gitee.dbswitch.provider.meta.AbstractMetadataProvider; import com.gitee.dbswitch.provider.meta.AbstractMetadataProvider;
import com.gitee.dbswitch.schema.ColumnDescription; import com.gitee.dbswitch.schema.ColumnDescription;
@@ -20,12 +21,13 @@ import java.sql.Connection;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
@Slf4j @Slf4j
public class MongodbMetadataQueryProvider extends AbstractMetadataProvider { public class MongodbMetadataQueryProvider extends AbstractMetadataProvider {
@@ -112,22 +114,33 @@ public class MongodbMetadataQueryProvider extends AbstractMetadataProvider {
public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName, public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName,
String tableName) { String tableName) {
List<ColumnDescription> ret = new ArrayList<>(); List<ColumnDescription> ret = new ArrayList<>();
try (ResultSet rs = connection.getMetaData().getColumns(schemaName, null, tableName, null)) { String sql = String.format("%s.%s.find().limit(1);", schemaName, tableName);
ResultSetMetaData m = rs.getMetaData(); try (Statement stmt = connection.createStatement()) {
while (rs.next()) { try (ResultSet rs = stmt.executeQuery(sql)) {
ColumnDescription cd = new ColumnDescription(); ResultSetMetaData metaData = rs.getMetaData();
cd.setFieldName(rs.getString("COLUMN_NAME")); for (int i = 1; i <= metaData.getColumnCount(); i++) {
cd.setLabelName(rs.getString("COLUMN_NAME")); String name = metaData.getColumnName(i);
cd.setFieldType(NumberUtils.toInt(rs.getString("DATA_TYPE"))); int jdbcType = metaData.getColumnType(i);
cd.setFieldTypeName(rs.getString("TYPE_NAME")); int displaySize = ("_id".equals(name)) ? 128 : 0;
cd.setFiledTypeClassName(rs.getString("DATA_TYPE")); if (Types.JAVA_OBJECT == jdbcType) {
cd.setDisplaySize(0); jdbcType = Types.LONGVARCHAR;
cd.setPrecisionSize(0); displaySize = Constants.CLOB_LENGTH;
cd.setScaleSize(0); }
cd.setAutoIncrement(false);
cd.setNullable(!"_id".equals(cd.getFieldName())); ColumnDescription cd = new ColumnDescription();
cd.setProductType(getProductType()); cd.setFieldName(name);
ret.add(cd); cd.setLabelName(name);
cd.setFieldType(jdbcType);
cd.setFieldTypeName(metaData.getColumnTypeName(i));
cd.setFiledTypeClassName(metaData.getColumnTypeName(i));
cd.setDisplaySize(displaySize);
cd.setPrecisionSize(0);
cd.setScaleSize(0);
cd.setAutoIncrement(false);
cd.setNullable(!"_id".equals(cd.getFieldName()));
cd.setProductType(getProductType());
ret.add(cd);
}
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@@ -51,7 +51,7 @@ public class MongodbTableDataQueryProvider implements TableDataQueryProvider {
try { try {
Connection connection = this.dataSource.getConnection(); Connection connection = this.dataSource.getConnection();
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
statement.setQueryTimeout(Constants.DEFAULT_QUERY_TIMEOUT_SECONDS); //statement.setQueryTimeout(Constants.DEFAULT_QUERY_TIMEOUT_SECONDS);
return ResultSetWrapper.builder() return ResultSetWrapper.builder()
.connection(connection) .connection(connection)
.statement(statement) .statement(statement)

View File

@@ -36,6 +36,14 @@ public class MongodbTableDataWriteProvider extends DefaultTableDataWriteProvider
this.columnType = Collections.emptyMap(); this.columnType = Collections.emptyMap();
this.schemaName = schemaName; this.schemaName = schemaName;
this.tableName = tableName; this.tableName = tableName;
try (Connection connection = getDataSource().getConnection()) {
try (Statement stmt = connection.createStatement()) {
stmt.executeUpdate(String.format("%s.%s.drop();", schemaName, tableName));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
} }
@Override @Override