!179 添加starrocks插件

* 添加starrocks插件
This commit is contained in:
Kingkazuma
2024-05-20 12:29:04 +00:00
committed by inrgihc
parent 2b8af0acf6
commit c8b0b69ce9
14 changed files with 5728 additions and 16720 deletions

File diff suppressed because it is too large Load Diff

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.8 KiB

View File

@@ -23,6 +23,16 @@ import org.apache.commons.lang3.StringUtils;
@AllArgsConstructor
public enum ProductTypeEnum {
/**
* StarRocks数据库类型
*/
STARROCKS(19, "`", "StarRocks", "com.mysql.cj.jdbc.Driver", 9030,
"/* ping */ SELECT 1",
"jdbc:mysql://",
new String[]{"jdbc:mysql://{host}[:{port}]/[{database}][\\?{params}]"},
"jdbc:mysql://127.0.0.1:9030/test?useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&tinyInt1isBit=false&rewriteBatchedStatements=true&useCompression=true"),
/**
* MySQL数据库类型
*/
@@ -238,6 +248,14 @@ public enum ProductTypeEnum {
return this == POSTGRESQL || this == KINGBASE || this == OPENGAUSS;
}
/**
* 类似于Starrocks系列的数据库类型
*
* @return boolean
*/
public boolean isLikeStarRocks() {
return this == STARROCKS;
}
/**
* 类似于MySQL系列的数据库类型
*

View File

@@ -9,6 +9,7 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.common.util;
import cn.hutool.log.StaticLog;
import com.gitee.dbswitch.common.type.ProductTypeEnum;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -40,6 +41,7 @@ public final class DatabaseAwareUtils {
productNameMap.put("KingbaseES", ProductTypeEnum.KINGBASE);
productNameMap.put("Apache Hive", ProductTypeEnum.HIVE);
productNameMap.put("MySQL", ProductTypeEnum.MYSQL);
// productNameMap.put("StarRocks", ProductTypeEnum.STARROCKS);
productNameMap.put("MariaDB", ProductTypeEnum.MARIADB);
productNameMap.put("Oracle", ProductTypeEnum.ORACLE);
productNameMap.put("PostgreSQL", ProductTypeEnum.POSTGRESQL);
@@ -90,7 +92,16 @@ public final class DatabaseAwareUtils {
}
return productType;
}
boolean haveStarRocks = false;
try{
// 此查询语句是Starrocks查询be节点是否存活可以用来判断是否是Starrocks数据源
haveStarRocks = connection.createStatement().execute("show backends");
}catch (SQLException sqlException){
StaticLog.info("执行show backends失败代表不是MySQL数据源");
}
if (haveStarRocks) {
return ProductTypeEnum.STARROCKS;
}
ProductTypeEnum type = productNameMap.get(productName);
if (null != type) {
return type;

View File

@@ -82,6 +82,23 @@ public final class GenerateSqlUtils {
sb.append(provider.getQuotedSchemaTableCombination(schemaName, tableName));
sb.append("(");
// starrocks 当中,字段主键的情况下,必须将字段放在最前面,并且顺序一致。
if (type.isLikeStarRocks()) {
List<ColumnDescription> copyFieldNames = new ArrayList<>();
Integer fieldIndex = 0;
for (int i = 0; i < fieldNames.size(); i++) {
ColumnDescription cd = fieldNames.get(i);
if (primaryKeys.contains(cd.getFieldName())){
copyFieldNames.add(fieldIndex,cd);
fieldIndex = fieldIndex +1;
}else{
copyFieldNames.add(cd);
}
}
fieldNames = copyFieldNames;
}
for (int i = 0; i < fieldNames.size(); i++) {
if (i > 0) {
sb.append(", ");
@@ -93,7 +110,7 @@ public final class GenerateSqlUtils {
sb.append(provider.getFieldDefinition(v, pks, autoIncr, false, withRemarks));
}
if (!pks.isEmpty() && !type.isLikeHive()) {
if (!pks.isEmpty() && !type.isLikeHive() && !type.isLikeStarRocks()) {
String pk = provider.getPrimaryKeyAsString(pks);
sb.append(", PRIMARY KEY (").append(pk).append(")");
}
@@ -133,6 +150,10 @@ public final class GenerateSqlUtils {
//sb.append(Constants.CR);
//sb.append(String.format("COMMENT='%s' ", tableRemarks.replace("'", "\\'")));
}
}else if (type.isLikeStarRocks()){
String pk = provider.getPrimaryKeyAsString(pks);
sb.append("PRIMARY KEY (").append(pk).append(")");
sb.append("\n DISTRIBUTED BY HASH(").append(pk).append(")");
}
return DDLFormatterUtils.format(sb.toString());

View File

@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dbswitch-product-starrocks</artifactId>
<dependencies>
<dependency>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,33 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.product.sr;
import com.gitee.dbswitch.provider.meta.MetadataProvider;
import com.gitee.dbswitch.schema.ColumnDescription;
import com.gitee.dbswitch.util.GenerateSqlUtils;
import java.sql.Connection;
import java.util.List;
public final class StarRocksUtils {
public static String getTableDDL(MetadataProvider provider, Connection connection, String schema,
String table) {
List<ColumnDescription> columnDescriptions = provider.queryTableColumnMeta(connection, schema, table);
List<String> pks = provider.queryTablePrimaryKeys(connection, schema, table);
return GenerateSqlUtils.getDDLCreateTableSQL(
provider, columnDescriptions, pks, schema, table, false);
}
private StarRocksUtils() {
throw new IllegalStateException();
}
}

View File

@@ -0,0 +1,55 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.product.sr;
import com.gitee.dbswitch.annotation.Product;
import com.gitee.dbswitch.common.type.ProductTypeEnum;
import com.gitee.dbswitch.common.util.DDLFormatterUtils;
import com.gitee.dbswitch.features.ProductFeatures;
import com.gitee.dbswitch.provider.AbstractFactoryProvider;
import com.gitee.dbswitch.provider.meta.MetadataProvider;
import com.gitee.dbswitch.provider.sync.AutoCastTableDataSynchronizeProvider;
import com.gitee.dbswitch.provider.sync.TableDataSynchronizeProvider;
import com.gitee.dbswitch.provider.write.AutoCastTableDataWriteProvider;
import com.gitee.dbswitch.provider.write.TableDataWriteProvider;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@Product(ProductTypeEnum.STARROCKS)
public class StarrocksFactoryProvider extends AbstractFactoryProvider {
public StarrocksFactoryProvider(DataSource dataSource) {
super(dataSource);
}
public ProductFeatures getProductFeatures() {
return new StarrocksFeatures();
}
@Override
public MetadataProvider createMetadataQueryProvider() {
return new StarrocksMetadataQueryProvider(this);
}
@Override
public TableDataWriteProvider createTableDataWriteProvider(boolean useInsert) {
return new AutoCastTableDataWriteProvider(this);
}
@Override
public TableDataSynchronizeProvider createTableDataSynchronizeProvider() {
return new AutoCastTableDataSynchronizeProvider(this);
}
}

View File

@@ -0,0 +1,20 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.product.sr;
import com.gitee.dbswitch.features.ProductFeatures;
public class StarrocksFeatures implements ProductFeatures {
public int convertFetchSize(int fetchSize) {
return Integer.MIN_VALUE;
}
}

View File

@@ -0,0 +1,361 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.product.sr;
import com.gitee.dbswitch.common.consts.Constants;
import com.gitee.dbswitch.common.type.ProductTypeEnum;
import com.gitee.dbswitch.common.util.DDLFormatterUtils;
import com.gitee.dbswitch.provider.ProductFactoryProvider;
import com.gitee.dbswitch.provider.meta.AbstractMetadataProvider;
import com.gitee.dbswitch.schema.ColumnDescription;
import com.gitee.dbswitch.schema.ColumnMetaData;
import com.gitee.dbswitch.schema.IndexDescription;
import com.gitee.dbswitch.schema.TableDescription;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@Slf4j
public class StarrocksMetadataQueryProvider extends AbstractMetadataProvider {
private static final String SHOW_CREATE_TABLE_SQL = "SHOW CREATE TABLE `%s`.`%s` ";
private static final String SHOW_CREATE_VIEW_SQL = "SHOW CREATE VIEW `%s`.`%s` ";
private static final String QUERY_TABLE_LIST_SQL =
"SELECT `TABLE_SCHEMA`,`TABLE_NAME`,`TABLE_TYPE`,`TABLE_COMMENT` "
+ "FROM `information_schema`.`TABLES` WHERE `TABLE_SCHEMA`= ? ";
private static final String QUERY_TABLE_METADATA_SQL =
"SELECT `TABLE_COMMENT`,`TABLE_TYPE` FROM `information_schema`.`TABLES` "
+ "WHERE `TABLE_SCHEMA` = ? AND `TABLE_NAME` = ?";
public StarrocksMetadataQueryProvider(ProductFactoryProvider factoryProvider) {
super(factoryProvider);
}
@Override
public List<String> querySchemaList(Connection connection) {
List<String> result = new ArrayList<>();
try (ResultSet rs = connection.getMetaData().getCatalogs()) {
while (rs.next()) {
Optional.ofNullable(rs.getString(1)).ifPresent(result::add);
}
return result.stream().distinct().collect(Collectors.toList());
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public List<TableDescription> queryTableList(Connection connection, String schemaName) {
List<TableDescription> result = new ArrayList<>();
try (PreparedStatement ps = connection.prepareStatement(QUERY_TABLE_LIST_SQL)) {
ps.setString(1, schemaName);
try (ResultSet rs = ps.executeQuery();) {
while (rs.next()) {
TableDescription td = new TableDescription();
td.setSchemaName(rs.getString("TABLE_SCHEMA"));
td.setTableName(rs.getString("TABLE_NAME"));
td.setRemarks(rs.getString("TABLE_COMMENT"));
String tableType = rs.getString("TABLE_TYPE");
if (tableType.equalsIgnoreCase("VIEW")) {
td.setTableType("VIEW");
} else {
td.setTableType("TABLE");
}
result.add(td);
}
return result;
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public TableDescription queryTableMeta(Connection connection, String schemaName, String tableName) {
try (PreparedStatement ps = connection.prepareStatement(QUERY_TABLE_METADATA_SQL)) {
ps.setString(1, schemaName);
ps.setString(2, tableName);
try (ResultSet rs = ps.executeQuery();) {
while (rs.next()) {
TableDescription td = new TableDescription();
td.setSchemaName(schemaName);
td.setTableName(tableName);
td.setRemarks(rs.getString(1));
String tableType = rs.getString(2);
if (tableType.equalsIgnoreCase("VIEW")) {
td.setTableType("VIEW");
} else {
td.setTableType("TABLE");
}
return td;
}
return null;
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public List<String> queryTableColumnName(Connection connection, String schemaName, String tableName) {
List<String> columns = new ArrayList<>();
try (ResultSet rs = connection.getMetaData()
.getColumns(schemaName, null, tableName, null)) {
while (rs.next()) {
columns.add(rs.getString("COLUMN_NAME"));
}
return columns.stream().distinct().collect(Collectors.toList());
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName,
String tableName) {
String sql = this.getTableFieldsQuerySQL(schemaName, tableName);
List<ColumnDescription> ret = this.querySelectSqlColumnMeta(connection, sql);
// 补充一下注释信息
try (ResultSet columns = connection.getMetaData()
.getColumns(schemaName, null, tableName, null)) {
while (columns.next()) {
String columnName = columns.getString("COLUMN_NAME");
String remarks = columns.getString("REMARKS");
String columnDefault = columns.getString("COLUMN_DEF");
for (ColumnDescription cd : ret) {
if (columnName.equals(cd.getFieldName())) {
cd.setRemarks(remarks);
cd.setProductType(ProductTypeEnum.STARROCKS);
// 补充默认值信息
cd.setDefaultValue(columnDefault);
}
}
}
return ret;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public List<String> queryTablePrimaryKeys(Connection connection, String schemaName, String tableName) {
List<String> ret = new ArrayList<>();
try (ResultSet primaryKeys = connection.getMetaData()
.getPrimaryKeys(schemaName, null, tableName)) {
while (primaryKeys.next()) {
ret.add(primaryKeys.getString("COLUMN_NAME"));
}
return ret.stream().distinct().collect(Collectors.toList());
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public synchronized List<IndexDescription> queryTableIndexes(Connection connection, String schemaName,
String tableName) {
setCatalogName(schemaName);
return super.queryTableIndexes(connection, schemaName, tableName);
}
@Override
public String getTableDDL(Connection connection, String schemaName, String tableName) {
List<String> result = new ArrayList<>();
try (Statement st = connection.createStatement()) {
if (st.execute(String.format(SHOW_CREATE_TABLE_SQL, schemaName, tableName))) {
try (ResultSet rs = st.getResultSet()) {
if (rs != null) {
while (rs.next()) {
String value = rs.getString(2);
Optional.ofNullable(value).ifPresent(result::add);
}
}
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return result.stream().findAny().orElse(null);
}
@Override
public String getViewDDL(Connection connection, String schemaName, String tableName) {
List<String> result = new ArrayList<>();
try (Statement st = connection.createStatement()) {
if (st.execute(String.format(SHOW_CREATE_VIEW_SQL, schemaName, tableName))) {
try (ResultSet rs = st.getResultSet()) {
if (rs != null) {
while (rs.next()) {
String value = rs.getString(2);
Optional.ofNullable(value).ifPresent(result::add);
}
}
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return result.stream().findAny().orElse(null);
}
@Override
public List<ColumnDescription> querySelectSqlColumnMeta(Connection connection, String sql) {
String querySQL = String.format(" %s LIMIT 0,1", sql.replace(";", ""));
return this.getSelectSqlColumnMeta(connection, querySQL);
}
@Override
public void testQuerySQL(Connection connection, String sql) {
String testQuerySql = String.format("explain %s", sql.replace(";", ""));
if (log.isDebugEnabled()) {
log.debug("Execute sql :{}", testQuerySql);
}
try (Statement st = connection.createStatement()) {
st.execute(testQuerySql);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public String getFieldDefinition(ColumnMetaData v, List<String> pks, boolean useAutoInc,
boolean addCr, boolean withRemarks) {
String fieldname = v.getName();
int length = v.getLength();
int precision = v.getPrecision();
int type = v.getType();
String retval = " `" + fieldname + "` ";
switch (type) {
case ColumnMetaData.TYPE_TIMESTAMP:
retval += "DATETIME";
break;
case ColumnMetaData.TYPE_TIME:
retval += "TIME";
break;
case ColumnMetaData.TYPE_DATE:
retval += "DATE";
break;
case ColumnMetaData.TYPE_BOOLEAN:
retval += "TINYINT";
break;
case ColumnMetaData.TYPE_NUMBER:
case ColumnMetaData.TYPE_INTEGER:
case ColumnMetaData.TYPE_BIGNUMBER:
if (null != pks && !pks.isEmpty() && pks.contains(fieldname)) {
if (useAutoInc) {
retval += "BIGINT AUTO_INCREMENT NOT NULL";
} else {
retval += "BIGINT NOT NULL";
}
} else {
// Integer values...
if (precision == 0) {
if (length > 9) {
if (length < 19) {
// can hold signed values between -9223372036854775808 and 9223372036854775807
// 18 significant digits
retval += "BIGINT";
} else {
retval += "DECIMAL(" + length + ")";
}
} else {
retval += "INT";
}
} else {
// Floating point values...
if (length > 15) {
retval += "DECIMAL(" + length;
if (precision > 0) {
retval += ", " + precision;
}
retval += ")";
} else {
// A double-precision floating-point number is accurate to approximately 15
// decimal places.
// http://mysql.mirrors-r-us.net/doc/refman/5.1/en/numeric-type-overview.html
retval += "DOUBLE";
}
}
}
break;
case ColumnMetaData.TYPE_STRING:
if (length > 0) {
if (length == 1) {
retval += "CHAR(1)";
} else if (length < 256) {
retval += "VARCHAR(" + length*2 + ")";
} else if (null != pks && !pks.isEmpty() && pks.contains(fieldname)) {
/*
* MySQL5.6中varchar字段为主键时最大长度为254,例如如下的建表语句在MySQL5.7下能通过但在MySQL5.6下无法通过:
* create table `t_test`(
* `key` varchar(1024) binary,
* `val` varchar(1024) binary,
* primary key(`key`)
* );
*/
retval += "VARCHAR(254) BINARY";
} else if (length < 65536) {
retval += "TEXT";
} else if (length < 16777216) {
retval += "MEDIUMTEXT";
} else {
retval += "LONGTEXT";
}
} else {
retval += "TINYTEXT";
}
break;
case ColumnMetaData.TYPE_BINARY:
retval += "LONGBLOB";
break;
default:
retval += "LONGTEXT";
break;
}
if (withRemarks && StringUtils.isNotBlank(v.getRemarks())) {
retval += String.format(" COMMENT '%s' ", v.getRemarks().replace("'", "\\'"));
}
if (addCr) {
retval += Constants.CR;
}
return retval;
}
@Override
public List<String> getTableColumnCommentDefinition(TableDescription td, List<ColumnDescription> cds) {
return Collections.emptyList();
}
}

View File

@@ -0,0 +1 @@
com.gitee.dbswitch.product.sr.StarrocksFactoryProvider

View File

@@ -12,6 +12,11 @@
<artifactId>dbswitch-register-product</artifactId>
<dependencies>
<dependency>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-product-starrocks</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-product-mysql</artifactId>

View File

@@ -13,6 +13,7 @@
<modules>
<module>dbswitch-product-mysql</module>
<module>dbswitch-product-starrocks</module>
<module>dbswitch-product-oracle</module>
<module>dbswitch-product-postgresql</module>
<module>dbswitch-product-sqlserver</module>