Restructure CREATE TABLE SQL generation

inrgihc
2024-07-13 23:13:06 +08:00
parent 226e014494
commit 6b3a148fdf
12 changed files with 271 additions and 149 deletions

View File

@@ -240,6 +240,15 @@ public enum ProductTypeEnum {
return String.format("%s%s%s.%s%s%s", quote, schema, quote, quote, table, quote);
}
/**
* Whether the primary key columns must be placed first in the CREATE TABLE statement
*
* @return boolean
*/
public boolean isPrimaryKeyShouldAtFirst() {
return this == STARROCKS;
}
/**
* Database types similar to the PostgreSQL family
*
@@ -249,15 +258,6 @@ public enum ProductTypeEnum {
return this == POSTGRESQL || this == KINGBASE || this == OPENGAUSS;
}
/**
* Database types similar to the StarRocks family
*
* @return boolean
*/
public boolean isLikeStarRocks() {
return this == STARROCKS;
}
/**
* Database types similar to the MySQL family
*
@@ -267,15 +267,6 @@ public enum ProductTypeEnum {
return this == MYSQL || this == MARIADB;
}
/**
* Database types similar to the GBase 8a family
*
* @return boolean
*/
public boolean isLikeGbase8a() {
return this == GBASE8A;
}
/**
* Database types similar to the Oracle family
*
@@ -285,15 +276,6 @@ public enum ProductTypeEnum {
return this == ORACLE || this == DM;
}
/**
* Database types similar to the SQL Server family
*
* @return boolean
*/
public boolean isLikeSqlServer() {
return this == SQLSERVER || this == SYBASE;
}
/**
* Database types similar to the Hive family
*

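The capability method added above gates a column reordering in GenerateSqlUtils (later in this commit). A standalone sketch of that reordering, with illustrative names (the real loop works on ColumnDescription objects):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: move primary key columns to the front while keeping their
// relative order, mirroring the loop in GenerateSqlUtils below.
public class PrimaryKeyFirstSketch {

  static List<String> reorder(List<String> columns, List<String> primaryKeys) {
    List<String> result = new ArrayList<>();
    int pkIndex = 0;
    for (String column : columns) {
      if (primaryKeys.contains(column)) {
        result.add(pkIndex++, column); // insert right after the previous PK column
      } else {
        result.add(column);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // [name, id, ts] with primary key [id] -> [id, name, ts]
    System.out.println(reorder(Arrays.asList("name", "id", "ts"), Arrays.asList("id")));
  }
}
```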
View File

@@ -33,11 +33,9 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.StreamUtils;
/**
* Abstract base class for database metadata providers
@@ -104,7 +102,7 @@ public abstract class AbstractMetadataProvider
public TableDescription queryTableMeta(Connection connection, String schemaName,
String tableName) {
try (ResultSet tables = connection.getMetaData()
.getTables(catalogName, schemaName, tableName, new String[]{"TABLE","VIEW"})) {
.getTables(catalogName, schemaName, tableName, new String[]{"TABLE", "VIEW"})) {
if (tables.next()) {
TableDescription td = new TableDescription();
td.setSchemaName(schemaName);
@@ -233,6 +231,26 @@ public abstract class AbstractMetadataProvider
throw new RuntimeException("AbstractDatabase Unimplemented!");
}
@Override
public void preAppendCreateTableSql(StringBuilder builder) {
// No-op by default; override in a subclass when needed.
}
@Override
public void appendPrimaryKeyForCreateTableSql(StringBuilder builder, List<String> primaryKeys) {
// Database types without primary key support (e.g. Hive) must override this method
if (CollectionUtils.isNotEmpty(primaryKeys)) {
String primaryKeyAsString = getPrimaryKeyAsString(primaryKeys);
builder.append(", PRIMARY KEY (").append(primaryKeyAsString).append(")");
}
}
@Override
public void postAppendCreateTableSql(StringBuilder builder, String tblComment, List<String> primaryKeys,
Map<String, String> tblProperties) {
// No-op by default; override in a subclass when needed.
}
@Override
public String getPrimaryKeyAsString(List<String> pks) {
if (!pks.isEmpty()) {

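A minimal sketch of the default appendPrimaryKeyForCreateTableSql behavior added above; the real getPrimaryKeyAsString body is cut off by the hunk, so a plain comma join is assumed here:

```java
import java.util.Arrays;
import java.util.List;

// Assumed behavior of the default hook; quoting of the column names depends on
// the provider's getPrimaryKeyAsString, which this diff truncates.
public class DefaultPrimaryKeyHookSketch {

  static void appendPrimaryKey(StringBuilder builder, List<String> primaryKeys) {
    if (!primaryKeys.isEmpty()) {
      builder.append(", PRIMARY KEY (").append(String.join(",", primaryKeys)).append(")");
    }
  }

  public static void main(String[] args) {
    StringBuilder sb = new StringBuilder("CREATE TABLE t(id BIGINT, name VARCHAR(64)");
    appendPrimaryKey(sb, Arrays.asList("id"));
    sb.append(")");
    System.out.println(sb); // CREATE TABLE t(id BIGINT, name VARCHAR(64), PRIMARY KEY (id))
  }
}
```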
View File

@@ -16,6 +16,7 @@ import com.gitee.dbswitch.schema.IndexDescription;
import com.gitee.dbswitch.schema.TableDescription;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
/**
* Metadata query interface
@@ -157,6 +158,33 @@ public interface MetadataProvider {
String getFieldDefinition(ColumnMetaData v, List<String> pks, boolean useAutoInc, boolean addCr,
boolean withRemarks);
/**
* Prepend content to the CREATE TABLE SQL (e.g. an IF NOT EXISTS clause)
*
* @param builder string builder for the CREATE TABLE SQL
*/
void preAppendCreateTableSql(StringBuilder builder);
/**
* Append the primary key clause to the CREATE TABLE SQL
*
* @param builder string builder for the CREATE TABLE SQL
* @param primaryKeys list of primary key columns
*/
void appendPrimaryKeyForCreateTableSql(StringBuilder builder, List<String> primaryKeys);
/**
* Append trailing content to the CREATE TABLE SQL (engine clause, comment, properties)
*
* @param builder string builder for the CREATE TABLE SQL
* @param tblComment table comment
* @param primaryKeys list of primary keys
* @param tblProperties table properties
*/
void postAppendCreateTableSql(StringBuilder builder, String tblComment, List<String> primaryKeys,
Map<String, String> tblProperties);
/**
* Convert the primary key columns to a comma-separated string
*
@@ -173,4 +201,20 @@ public interface MetadataProvider {
* @return list of definition strings
*/
List<String> getTableColumnCommentDefinition(TableDescription td, List<ColumnDescription> cds);
/**
* Get the federated CREATE TABLE and data-load SQL list (customized for Hive)
*
* @param fieldNames column structure information
* @param primaryKeys primary key columns
* @param schemaName schema name
* @param tableName table name
* @param tableRemarks table remarks
* @param autoIncr whether the primary key may be auto-increment
* @param tblProperties table properties
* @return list of CREATE TABLE and data-load SQL statements
*/
default List<String> getCreateTableSqlList(List<ColumnDescription> fieldNames, List<String> primaryKeys,
String schemaName, String tableName, String tableRemarks, boolean autoIncr, Map<String, String> tblProperties) {
throw new UnsupportedOperationException("Unsupported function!");
}
}

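Taken together, the three hooks turn DDL assembly into a fixed sequence that GenerateSqlUtils (next file) drives; dialects customize only the hook bodies. A condensed sketch of that sequence, assumed from the hunks below:

```java
// Condensed assembly flow inside getDDLCreateTableSQL (see next file):
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE ");                                   // Constants.CREATE_TABLE
provider.preAppendCreateTableSql(sb);                         // e.g. IF NOT EXISTS, if the dialect wants it
sb.append(provider.getQuotedSchemaTableCombination(schemaName, tableName));
sb.append("(");
// ... per-column definitions via provider.getFieldDefinition(...) ...
provider.appendPrimaryKeyForCreateTableSql(sb, primaryKeys);  // ", PRIMARY KEY (...)" or nothing
sb.append(")");
provider.postAppendCreateTableSql(sb, tableRemarks, primaryKeys, tblProperties);
// engine clause / table comment / TBLPROPERTIES, per dialect
```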
View File

@@ -13,7 +13,6 @@ import com.gitee.dbswitch.common.consts.Constants;
import com.gitee.dbswitch.common.type.ProductTableEnum;
import com.gitee.dbswitch.common.type.ProductTypeEnum;
import com.gitee.dbswitch.common.util.DDLFormatterUtils;
import com.gitee.dbswitch.common.util.UuidUtils;
import com.gitee.dbswitch.provider.meta.MetadataProvider;
import com.gitee.dbswitch.schema.ColumnDescription;
import com.gitee.dbswitch.schema.ColumnMetaData;
@@ -28,7 +27,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
/**
* Utility class for assembling SQL
@@ -38,8 +36,6 @@ import org.apache.commons.lang3.StringUtils;
@UtilityClass
public final class GenerateSqlUtils {
private static final boolean HIVE_USE_CTAS = false;
public static String getDDLCreateTableSQL(
MetadataProvider provider,
List<ColumnDescription> fieldNames,
@@ -79,21 +75,18 @@ public final class GenerateSqlUtils {
.collect(Collectors.toList());
sb.append(Constants.CREATE_TABLE);
// if(ifNotExist && !type.isLikeOracle()) {
// sb.append( Const.IF_NOT_EXISTS );
// }
provider.preAppendCreateTableSql(sb);
sb.append(provider.getQuotedSchemaTableCombination(schemaName, tableName));
sb.append("(");
// In starrocks, when columns form the primary key, they must be placed first and keep a consistent order.
if (type.isLikeStarRocks()) {
// In StarRocks, when columns form the primary key, they must be placed first and keep a consistent order.
if (type.isPrimaryKeyShouldAtFirst()) {
List<ColumnDescription> copyFieldNames = new ArrayList<>();
int fieldIndex = 0;
for (int i = 0; i < fieldNames.size(); i++) {
ColumnDescription cd = fieldNames.get(i);
if (primaryKeys.contains(cd.getFieldName())) {
copyFieldNames.add(fieldIndex, cd);
fieldIndex = fieldIndex + 1;
copyFieldNames.add(fieldIndex++, cd);
} else {
copyFieldNames.add(cd);
}
@@ -112,51 +105,9 @@ public final class GenerateSqlUtils {
sb.append(provider.getFieldDefinition(v, pks, autoIncr, false, withRemarks));
}
if (!pks.isEmpty() && !type.isLikeHive() && !type.isLikeStarRocks()) {
String pk = provider.getPrimaryKeyAsString(pks);
sb.append(", PRIMARY KEY (").append(pk).append(")");
}
provider.appendPrimaryKeyForCreateTableSql(sb, pks);
sb.append(")");
if (type.isLikeGbase8a()) {
sb.append("ENGINE=EXPRESS DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
if (withRemarks && StringUtils.isNotBlank(tableRemarks)) {
sb.append(String.format(" COMMENT='%s' ", tableRemarks.replace("'", "\\'")));
}
} else if (type.isLikeMysql()) {
sb.append("ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
if (withRemarks && StringUtils.isNotBlank(tableRemarks)) {
sb.append(String.format(" COMMENT='%s' ", tableRemarks.replace("'", "\\'")));
}
} else if (type.isLikeHive()) {
if (null != tblProperties && !tblProperties.isEmpty()) {
List<String> kvProperties = new ArrayList<>();
tblProperties.forEach((k, v) -> kvProperties.add(String.format("\t\t'%s' = '%s'", k, v)));
sb.append(Constants.CR);
sb.append("STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'");
sb.append(Constants.CR);
sb.append("TBLPROPERTIES (");
sb.append(kvProperties.stream().collect(Collectors.joining(",\n")));
sb.append(")");
} else {
sb.append(Constants.CR);
sb.append("STORED AS ORC");
}
} else if (type.isClickHouse()) {
sb.append("ENGINE=MergeTree");
if (CollectionUtils.isEmpty(pks)) {
sb.append(Constants.CR);
sb.append("ORDER BY tuple()");
}
if (withRemarks && StringUtils.isNotBlank(tableRemarks)) {
sb.append(Constants.CR);
sb.append(String.format("COMMENT '%s' ", tableRemarks.replace("'", "\\'")));
}
} else if (type.isLikeStarRocks()) {
String pk = provider.getPrimaryKeyAsString(pks);
sb.append("PRIMARY KEY (").append(pk).append(")");
sb.append("\n DISTRIBUTED BY HASH(").append(pk).append(")");
}
provider.postAppendCreateTableSql(sb, tableRemarks, pks, tblProperties);
return DDLFormatterUtils.format(sb.toString());
}
@@ -172,29 +123,8 @@ public final class GenerateSqlUtils {
Map<String, String> tblProperties) {
ProductTypeEnum productType = provider.getProductType();
if (productType.isLikeHive()) {
List<String> sqlLists = new ArrayList<>();
String tmpTableName = "tmp_" + UuidUtils.generateUuid();
String createTableSql = getDDLCreateTableSQL(provider, fieldNames, primaryKeys, schemaName,
tmpTableName, true, tableRemarks, autoIncr, tblProperties);
sqlLists.add(createTableSql);
if (HIVE_USE_CTAS) {
String createAsTableSql = String.format("CREATE TABLE `%s`.`%s` STORED AS ORC AS (SELECT * FROM `%s`.`%s`)",
schemaName, tableName, schemaName, tmpTableName);
sqlLists.add(createAsTableSql);
} else {
String createAsTableSql = getDDLCreateTableSQL(provider, fieldNames, primaryKeys, schemaName,
tableName, true, tableRemarks, autoIncr, null);
sqlLists.add(createAsTableSql);
String selectColumns = fieldNames.stream()
.map(s -> String.format("`%s`", s.getFieldName()))
.collect(Collectors.joining(","));
String insertIntoSql = String.format("INSERT INTO `%s`.`%s` SELECT %s FROM `%s`.`%s`",
schemaName, tableName, selectColumns, schemaName, tmpTableName);
sqlLists.add(insertIntoSql);
}
String dropTmpTableSql = String.format("DROP TABLE IF EXISTS `%s`.`%s`", schemaName, tmpTableName);
sqlLists.add(dropTmpTableSql);
return sqlLists;
return provider.getCreateTableSqlList(
fieldNames, primaryKeys, schemaName, tableName, tableRemarks, autoIncr, tblProperties);
} else if (productType.noCommentStatement()) {
String createTableSql = getDDLCreateTableSQL(provider, fieldNames, primaryKeys, schemaName,
tableName, true, tableRemarks, autoIncr, tblProperties);

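With the if/else chain removed, each dialect's tail now lives in its provider (following files). A hypothetical call, mirroring the invocations visible in this commit; the boolean flag's name is outside the shown hunks:

```java
// Hypothetical call shape; argument order follows the calls above.
String ddl = GenerateSqlUtils.getDDLCreateTableSQL(
    provider,       // e.g. a MysqlMetadataQueryProvider
    fieldNames,     // List<ColumnDescription>
    primaryKeys,    // e.g. Arrays.asList("id")
    "db", "user",   // schemaName, tableName
    true,           // boolean flag as passed at the call sites above
    "user table",   // tableRemarks
    false,          // autoIncr
    null);          // tblProperties (used by the Hive path)
// For MySQL the result ends with:
//   ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='user table'
```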
View File

@@ -21,7 +21,6 @@ import com.gitee.dbswitch.common.entity.ResultSetWrapper;
import com.gitee.dbswitch.common.type.CaseConvertEnum;
import com.gitee.dbswitch.common.type.ProductTypeEnum;
import com.gitee.dbswitch.common.util.DatabaseAwareUtils;
import com.gitee.dbswitch.common.util.ExamineUtils;
import com.gitee.dbswitch.common.util.JdbcTypesUtils;
import com.gitee.dbswitch.common.util.PatterNameUtils;
import com.gitee.dbswitch.core.exchange.BatchElement;
@@ -32,6 +31,7 @@ import com.gitee.dbswitch.data.domain.ReaderTaskParam;
import com.gitee.dbswitch.data.domain.ReaderTaskResult;
import com.gitee.dbswitch.data.entity.SourceDataSourceProperties;
import com.gitee.dbswitch.data.entity.TargetDataSourceProperties;
import com.gitee.dbswitch.data.util.HiveTblUtils;
import com.gitee.dbswitch.provider.ProductFactoryProvider;
import com.gitee.dbswitch.provider.ProductProviderFactory;
import com.gitee.dbswitch.provider.manage.TableManageProvider;
@@ -48,7 +48,6 @@ import com.google.common.collect.Lists;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -693,46 +692,12 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.build();
}
/**
* https://cwiki.apache.org/confluence/display/Hive/JDBC+Storage+Handler
*
* @return Map<String, String>
*/
public Map<String, String> getTblProperties() {
Map<String, String> ret = new HashMap<>();
if (targetProductType.isLikeHive()) {
// hive.sql.database.type: MYSQL, POSTGRES, ORACLE, DERBY, DB2
final List<ProductTypeEnum> supportedProductTypes =
Arrays.asList(ProductTypeEnum.MYSQL, ProductTypeEnum.ORACLE,
ProductTypeEnum.DB2, ProductTypeEnum.POSTGRESQL);
ExamineUtils.check(supportedProductTypes.contains(sourceProductType),
"Unsupported data from %s to Hive", sourceProductType.name());
String fullTableName = sourceProductType.quoteSchemaTableName(sourceSchemaName, sourceTableName);
List<String> columnNames = sourceColumnDescriptions.stream().map(ColumnDescription::getFieldName)
.collect(Collectors.toList());
String querySql = String.format("SELECT %s FROM %s",
columnNames.stream()
.map(s -> sourceProductType.quoteName(s))
.collect(Collectors.joining(",")),
fullTableName);
String databaseType = sourceProductType.name().toUpperCase();
if (ProductTypeEnum.POSTGRESQL == sourceProductType) {
databaseType = "POSTGRES";
} else if (ProductTypeEnum.SQLSERVER == sourceProductType) {
databaseType = "MSSQL";
}
ret.put("hive.sql.database.type", databaseType);
ret.put("hive.sql.jdbc.driver", sourceDataSource.getDriverClass());
ret.put("hive.sql.jdbc.url", sourceDataSource.getJdbcUrl());
ret.put("hive.sql.dbcp.username", sourceDataSource.getUserName());
ret.put("hive.sql.dbcp.password", sourceDataSource.getPassword());
ret.put("hive.sql.query", querySql);
ret.put("hive.sql.jdbc.read-write", "read");
ret.put("hive.sql.jdbc.fetch.size", "2000");
ret.put("hive.sql.dbcp.maxActive", "1");
return HiveTblUtils.getTblProperties(sourceProductType, sourceDataSource,
sourceSchemaName, sourceTableName, sourceColumnDescriptions);
}
return ret;
return new HashMap<>();
}
@Override

View File

@@ -0,0 +1,62 @@
package com.gitee.dbswitch.data.util;
import com.gitee.dbswitch.common.entity.CloseableDataSource;
import com.gitee.dbswitch.common.type.ProductTypeEnum;
import com.gitee.dbswitch.common.util.ExamineUtils;
import com.gitee.dbswitch.schema.ColumnDescription;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
@UtilityClass
public class HiveTblUtils {
// hive.sql.database.type: MYSQL, POSTGRES, ORACLE, DERBY, DB2
private static final List<ProductTypeEnum> supportedProductTypes =
Arrays.asList(ProductTypeEnum.MYSQL, ProductTypeEnum.ORACLE,
ProductTypeEnum.DB2, ProductTypeEnum.POSTGRESQL);
/**
* Build the Hive JDBC Storage Handler TBLPROPERTIES for a source table.
* See https://cwiki.apache.org/confluence/display/Hive/JDBC+Storage+Handler
*
* @return Map<String, String> of TBLPROPERTIES key/value pairs
*/
public static Map<String, String> getTblProperties(ProductTypeEnum sourceProductType,
CloseableDataSource sourceDataSource, String sourceSchemaName,
String sourceTableName, List<ColumnDescription> sourceColumnDescriptions) {
ExamineUtils.check(supportedProductTypes.contains(sourceProductType),
"Unsupported data from %s to Hive", sourceProductType.name());
Map<String, String> ret = new HashMap<>();
String querySql = String.format("SELECT %s FROM %s",
sourceColumnDescriptions.stream()
.map(ColumnDescription::getFieldName)
.map(s -> sourceProductType.quoteName(s))
.collect(Collectors.joining(",")),
sourceProductType.quoteSchemaTableName(sourceSchemaName, sourceTableName));
String databaseType;
if (ProductTypeEnum.POSTGRESQL == sourceProductType) {
databaseType = "POSTGRES";
} else if (ProductTypeEnum.SQLSERVER == sourceProductType) {
databaseType = "MSSQL";
} else {
databaseType = sourceProductType.name().toUpperCase();
}
ret.put("hive.sql.database.type", databaseType);
ret.put("hive.sql.jdbc.driver", sourceDataSource.getDriverClass());
ret.put("hive.sql.jdbc.url", sourceDataSource.getJdbcUrl());
ret.put("hive.sql.dbcp.username", sourceDataSource.getUserName());
ret.put("hive.sql.dbcp.password", sourceDataSource.getPassword());
ret.put("hive.sql.query", querySql);
ret.put("hive.sql.jdbc.read-write", "read");
ret.put("hive.sql.jdbc.fetch.size", "2000");
ret.put("hive.sql.dbcp.maxActive", "1");
return ret;
}
}

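A usage sketch of the extracted helper, matching the call now made from ReaderTaskThread; the data source and column list are placeholders:

```java
// Placeholder inputs; only the call shape comes from this commit.
Map<String, String> props = HiveTblUtils.getTblProperties(
    ProductTypeEnum.MYSQL, sourceDataSource, "test", "user", sourceColumns);
// props now carries the JDBC Storage Handler settings, e.g.:
//   hive.sql.database.type   = MYSQL
//   hive.sql.query           = SELECT `id`,`name` FROM `test`.`user`
//   hive.sql.jdbc.fetch.size = 2000
```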
View File

@@ -23,8 +23,10 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@Slf4j
@@ -299,4 +301,17 @@ public class ClickhouseMetadataQueryProvider extends AbstractMetadataProvider {
return Collections.emptyList();
}
@Override
public void postAppendCreateTableSql(StringBuilder builder, String tblComment, List<String> primaryKeys,
Map<String, String> tblProperties) {
builder.append("ENGINE=MergeTree");
if (CollectionUtils.isEmpty(primaryKeys)) {
builder.append(Constants.CR);
builder.append("ORDER BY tuple()");
}
if (StringUtils.isNotBlank(tblComment)) {
builder.append(Constants.CR);
builder.append(String.format("COMMENT '%s' ", tblComment.replace("'", "\\'")));
}
}
}

View File

@@ -12,7 +12,6 @@ package com.gitee.dbswitch.product.gbase;
import com.gitee.dbswitch.annotation.Product;
import com.gitee.dbswitch.common.type.ProductTypeEnum;
import com.gitee.dbswitch.features.ProductFeatures;
import com.gitee.dbswitch.product.mysql.MysqlMetadataQueryProvider;
import com.gitee.dbswitch.provider.AbstractFactoryProvider;
import com.gitee.dbswitch.provider.meta.MetadataProvider;
import com.gitee.dbswitch.provider.sync.AutoCastTableDataSynchronizeProvider;
@@ -34,7 +33,7 @@ public class GbaseFactoryProvider extends AbstractFactoryProvider {
@Override
public MetadataProvider createMetadataQueryProvider() {
return new MysqlMetadataQueryProvider(this);
return new GbaseMetadataQueryProvider(this);
}
@Override

View File

@@ -0,0 +1,23 @@
package com.gitee.dbswitch.product.gbase;
import com.gitee.dbswitch.product.mysql.MysqlMetadataQueryProvider;
import com.gitee.dbswitch.provider.ProductFactoryProvider;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
public class GbaseMetadataQueryProvider extends MysqlMetadataQueryProvider {
public GbaseMetadataQueryProvider(ProductFactoryProvider factoryProvider) {
super(factoryProvider);
}
@Override
public void postAppendCreateTableSql(StringBuilder builder, String tblComment, List<String> primaryKeys,
Map<String, String> tblProperties) {
builder.append("ENGINE=EXPRESS DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
if (StringUtils.isNotBlank(tblComment)) {
builder.append(String.format(" COMMENT='%s' ", tblComment.replace("'", "\\'")));
}
}
}

View File

@@ -11,12 +11,14 @@ package com.gitee.dbswitch.product.hive;
import com.gitee.dbswitch.common.consts.Constants;
import com.gitee.dbswitch.common.type.ProductTypeEnum;
import com.gitee.dbswitch.common.util.UuidUtils;
import com.gitee.dbswitch.provider.ProductFactoryProvider;
import com.gitee.dbswitch.provider.meta.AbstractMetadataProvider;
import com.gitee.dbswitch.schema.ColumnDescription;
import com.gitee.dbswitch.schema.ColumnMetaData;
import com.gitee.dbswitch.schema.IndexDescription;
import com.gitee.dbswitch.schema.TableDescription;
import com.gitee.dbswitch.util.GenerateSqlUtils;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -25,13 +27,17 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
@Slf4j
public class HiveMetadataQueryProvider extends AbstractMetadataProvider {
private static final boolean HIVE_USE_CTAS = false;
private static final String SHOW_CREATE_TABLE_SQL = "SHOW CREATE TABLE `%s`.`%s` ";
public HiveMetadataQueryProvider(ProductFactoryProvider factoryProvider) {
@@ -170,4 +176,56 @@ public class HiveMetadataQueryProvider extends AbstractMetadataProvider {
return Collections.emptyList();
}
@Override
public void appendPrimaryKeyForCreateTableSql(StringBuilder builder, List<String> primaryKeys) {
// Hive tables have no primary keys
}
@Override
public void postAppendCreateTableSql(StringBuilder builder, String tblComment, List<String> primaryKeys,
Map<String, String> tblProperties) {
if (MapUtils.isNotEmpty(tblProperties)) {
builder.append(Constants.CR);
builder.append("STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'");
builder.append(Constants.CR);
builder.append("TBLPROPERTIES (");
builder.append(
tblProperties.entrySet().stream()
.map(entry -> String.format("\t\t'%s' = '%s'", entry.getKey(), entry.getValue()))
.collect(Collectors.joining(",\n")));
builder.append(")");
} else {
builder.append(Constants.CR);
builder.append("STORED AS ORC");
}
}
@Override
public List<String> getCreateTableSqlList(List<ColumnDescription> fieldNames, List<String> primaryKeys,
String schemaName, String tableName, String tableRemarks, boolean autoIncr, Map<String, String> tblProperties) {
List<String> sqlLists = new ArrayList<>();
String tmpTableName = "tmp_" + UuidUtils.generateUuid();
String createTableSql = GenerateSqlUtils.getDDLCreateTableSQL(this, fieldNames, primaryKeys, schemaName,
tmpTableName, true, tableRemarks, autoIncr, tblProperties);
sqlLists.add(createTableSql);
if (HIVE_USE_CTAS) {
String createAsTableSql = String.format("CREATE TABLE `%s`.`%s` STORED AS ORC AS (SELECT * FROM `%s`.`%s`)",
schemaName, tableName, schemaName, tmpTableName);
sqlLists.add(createAsTableSql);
} else {
String createAsTableSql = GenerateSqlUtils.getDDLCreateTableSQL(this, fieldNames, primaryKeys, schemaName,
tableName, true, tableRemarks, autoIncr, null);
sqlLists.add(createAsTableSql);
String selectColumns = fieldNames.stream()
.map(s -> String.format("`%s`", s.getFieldName()))
.collect(Collectors.joining(","));
String insertIntoSql = String.format("INSERT INTO `%s`.`%s` SELECT %s FROM `%s`.`%s`",
schemaName, tableName, selectColumns, schemaName, tmpTableName);
sqlLists.add(insertIntoSql);
}
String dropTmpTableSql = String.format("DROP TABLE IF EXISTS `%s`.`%s`", schemaName, tmpTableName);
sqlLists.add(dropTmpTableSql);
return sqlLists;
}
}

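For the non-CTAS path (HIVE_USE_CTAS is false), getCreateTableSqlList emits four statements; schematically, with a made-up temp table suffix:

```java
// Schematic output of getCreateTableSqlList (non-CTAS path):
// 1. CREATE TABLE `db`.`tmp_<uuid>` (...)          -- federated source view:
//      STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
//      TBLPROPERTIES ('hive.sql.query' = '...', ...)
// 2. CREATE TABLE `db`.`user` (...) STORED AS ORC  -- real target table
// 3. INSERT INTO `db`.`user` SELECT `id`,`name` FROM `db`.`tmp_<uuid>`
// 4. DROP TABLE IF EXISTS `db`.`tmp_<uuid>`
```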
View File

@@ -24,6 +24,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -353,4 +354,17 @@ public class MysqlMetadataQueryProvider extends AbstractMetadataProvider {
return Collections.emptyList();
}
@Override
public void preAppendCreateTableSql(StringBuilder builder) {
// builder.append( Const.IF_NOT_EXISTS );
}
@Override
public void postAppendCreateTableSql(StringBuilder builder, String tblComment, List<String> primaryKeys,
Map<String, String> tblProperties) {
builder.append("ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
if (StringUtils.isNotBlank(tblComment)) {
builder.append(String.format(" COMMENT='%s' ", tblComment.replace("'", "\\'")));
}
}
}

View File

@@ -25,6 +25,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -359,5 +360,16 @@ public class StarrocksMetadataQueryProvider extends AbstractMetadataProvider {
return Collections.emptyList();
}
@Override
public void appendPrimaryKeyForCreateTableSql(StringBuilder builder, List<String> primaryKeys) {
// For StarRocks the primary key is assembled in postAppendCreateTableSql
}
@Override
public void postAppendCreateTableSql(StringBuilder builder, String tblComment, List<String> primaryKeys,
Map<String, String> tblProperties) {
String pk = getPrimaryKeyAsString(primaryKeys);
builder.append("PRIMARY KEY (").append(pk).append(")");
builder.append("\n DISTRIBUTED BY HASH(").append(pk).append(")");
}
}
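The empty appendPrimaryKeyForCreateTableSql above is deliberate: in StarRocks the PRIMARY KEY clause must follow the closing parenthesis of the column list, together with the distribution clause. A runnable sketch of the tail logic, assuming a comma join for getPrimaryKeyAsString (its body is not shown in this diff):

```java
// Mirrors postAppendCreateTableSql above; DDLFormatterUtils.format is assumed
// to normalize the spacing of the final statement.
public class StarrocksTailSketch {
  public static void main(String[] args) {
    StringBuilder sb = new StringBuilder("CREATE TABLE `db`.`user`(`id` BIGINT, `name` VARCHAR(64))");
    String pk = "id"; // getPrimaryKeyAsString(primaryKeys)
    sb.append("PRIMARY KEY (").append(pk).append(")");
    sb.append("\n DISTRIBUTED BY HASH(").append(pk).append(")");
    System.out.println(sb);
  }
}
```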