代码接口调整 (code interface adjustment)

This commit is contained in:
inrgihc
2024-07-24 21:45:20 +08:00
parent 5ec8f5c48a
commit 815528b817
25 changed files with 179 additions and 160 deletions

View File

@@ -31,7 +31,6 @@ import com.gitee.dbswitch.data.domain.ReaderTaskParam;
import com.gitee.dbswitch.data.domain.ReaderTaskResult;
import com.gitee.dbswitch.data.entity.SourceDataSourceProperties;
import com.gitee.dbswitch.data.entity.TargetDataSourceProperties;
import com.gitee.dbswitch.data.util.HiveTblUtils;
import com.gitee.dbswitch.provider.ProductFactoryProvider;
import com.gitee.dbswitch.provider.ProductProviderFactory;
import com.gitee.dbswitch.provider.manage.TableManageProvider;
@@ -42,6 +41,7 @@ import com.gitee.dbswitch.provider.transform.RecordTransformProvider;
import com.gitee.dbswitch.provider.write.TableDataWriteProvider;
import com.gitee.dbswitch.schema.ColumnDescription;
import com.gitee.dbswitch.schema.TableDescription;
import com.gitee.dbswitch.schema.SourceProperties;
import com.gitee.dbswitch.service.DefaultMetadataService;
import com.gitee.dbswitch.service.MetadataService;
import com.google.common.collect.Lists;
@@ -692,12 +692,20 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.build();
}
public Map<String, String> getTblProperties() {
if (targetProductType.isLikeHive()) {
return HiveTblUtils.getTblProperties(sourceProductType, sourceDataSource,
sourceSchemaName, sourceTableName, sourceColumnDescriptions);
}
return new HashMap<>();
/**
 * Assembles a {@link SourceProperties} descriptor for the current source table:
 * product type, JDBC connection settings (driver, url, credentials), the
 * schema/table being read, and the list of source column names.
 *
 * @return a fully populated {@code SourceProperties} for the source side
 */
public SourceProperties getTblProperties() {
  SourceProperties properties = new SourceProperties();
  properties.setProductType(sourceProductType);
  // Connection settings come straight from the source data source.
  properties.setDriverClass(sourceDataSource.getDriverClass());
  properties.setJdbcUrl(sourceDataSource.getJdbcUrl());
  properties.setUsername(sourceDataSource.getUserName());
  properties.setPassword(sourceDataSource.getPassword());
  properties.setSchemaName(sourceSchemaName);
  properties.setTableName(sourceTableName);
  // Project only the field names out of the column descriptions.
  properties.setColumnNames(
      sourceColumnDescriptions.stream()
          .map(ColumnDescription::getFieldName)
          .collect(Collectors.toList()));
  return properties;
}
@Override

View File

@@ -1,62 +0,0 @@
package com.gitee.dbswitch.data.util;
import com.gitee.dbswitch.common.entity.CloseableDataSource;
import com.gitee.dbswitch.common.type.ProductTypeEnum;
import com.gitee.dbswitch.common.util.ExamineUtils;
import com.gitee.dbswitch.schema.ColumnDescription;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
/**
 * Helper for building the Hive TBLPROPERTIES map used by Hive's JDBC Storage
 * Handler to read a table directly from a source RDBMS.
 */
@UtilityClass
public class HiveTblUtils {

  // Database types accepted for "hive.sql.database.type":
  // MYSQL, POSTGRES, ORACLE, DERBY, DB2, MSSQL.
  // FIX: SQLSERVER was missing here, which made the MSSQL mapping below
  // unreachable — ExamineUtils.check rejected it before the branch could run.
  private static final List<ProductTypeEnum> SUPPORTED_PRODUCT_TYPES =
      Arrays.asList(ProductTypeEnum.MYSQL, ProductTypeEnum.ORACLE,
          ProductTypeEnum.DB2, ProductTypeEnum.POSTGRESQL, ProductTypeEnum.SQLSERVER);

  /**
   * Builds the table properties for a Hive external table backed by the JDBC
   * Storage Handler, including the projection query against the source table.
   *
   * See https://cwiki.apache.org/confluence/display/Hive/JDBC+Storage+Handler
   *
   * @param sourceProductType        source database product; must be one of the
   *                                 supported types, otherwise ExamineUtils.check throws
   * @param sourceDataSource         source connection info (driver, url, credentials)
   * @param sourceSchemaName         schema of the source table
   * @param sourceTableName          name of the source table
   * @param sourceColumnDescriptions columns to project in the generated query
   * @return Map<String, String> of Hive table properties
   */
  public static Map<String, String> getTblProperties(ProductTypeEnum sourceProductType,
      CloseableDataSource sourceDataSource, String sourceSchemaName,
      String sourceTableName, List<ColumnDescription> sourceColumnDescriptions) {
    ExamineUtils.check(SUPPORTED_PRODUCT_TYPES.contains(sourceProductType),
        "Unsupported data from %s to Hive", sourceProductType.name());

    // SELECT <quoted columns> FROM <quoted schema.table> in the source dialect.
    String querySql = String.format("SELECT %s FROM %s",
        sourceColumnDescriptions.stream()
            .map(ColumnDescription::getFieldName)
            .map(s -> sourceProductType.quoteName(s))
            .collect(Collectors.joining(",")),
        sourceProductType.quoteSchemaTableName(sourceSchemaName, sourceTableName));

    // Map the product type onto the identifier Hive expects; most enum names
    // match directly, but POSTGRESQL and SQLSERVER use different spellings.
    String databaseType;
    if (ProductTypeEnum.POSTGRESQL == sourceProductType) {
      databaseType = "POSTGRES";
    } else if (ProductTypeEnum.SQLSERVER == sourceProductType) {
      databaseType = "MSSQL";
    } else {
      databaseType = sourceProductType.name().toUpperCase();
    }

    Map<String, String> ret = new HashMap<>();
    ret.put("hive.sql.database.type", databaseType);
    ret.put("hive.sql.jdbc.driver", sourceDataSource.getDriverClass());
    ret.put("hive.sql.jdbc.url", sourceDataSource.getJdbcUrl());
    ret.put("hive.sql.dbcp.username", sourceDataSource.getUserName());
    ret.put("hive.sql.dbcp.password", sourceDataSource.getPassword());
    ret.put("hive.sql.query", querySql);
    // Read-only access with conservative fetch/pool settings.
    ret.put("hive.sql.jdbc.read-write", "read");
    ret.put("hive.sql.jdbc.fetch.size", "2000");
    ret.put("hive.sql.dbcp.maxActive", "1");
    return ret;
  }
}