From 50ecc06f707e0bfa3a150add2a18d98e2d36fb46 Mon Sep 17 00:00:00 2001 From: Kingkazuma <1991377542@qq.com> Date: Tue, 3 Jun 2025 13:02:55 +0000 Subject: [PATCH] =?UTF-8?q?!242=20feat:=20=E4=BC=98=E5=8C=96starRocks?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E7=9A=84=E6=89=B9=E9=87=8F=E5=86=99=E5=85=A5?= =?UTF-8?q?=20*=20Merge=20branch=20'master'=20of=20gitee.com:dromara/dbswi?= =?UTF-8?q?tch=20into=20zsj=20*=20feat:=20=E4=BC=98=E5=8C=96starRocks?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=86=99=E5=85=A5=EF=BC=8C=E5=B0=86=E5=86=99?= =?UTF-8?q?=E5=85=A5=E6=9B=B4=E6=96=B0=E6=93=8D=E4=BD=9C=E4=BB=8Esql?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E5=88=87=E6=8D=A2=E6=88=90stream=20load?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=EF=BC=8C=E6=8F=90=E9=AB=98=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E3=80=81=E6=96=B0=E5=A2=9E=E6=95=88=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dbswitch-core/pom.xml | 5 + .../dbswitch/product/sr/FrontendEntity.java | 13 ++ .../dbswitch/product/sr/StarRocksUtils.java | 153 ++++++++++++++++-- .../product/sr/StarrocksFactoryProvider.java | 4 +- .../sr/StarrocksMetadataQueryProvider.java | 5 +- .../sr/StarrocksTableDataSynchronizer.java | 108 +++++++++++++ .../sr/StarrocksTableDataWriteProvider.java | 50 ++++++ pom.xml | 6 + 8 files changed, 323 insertions(+), 21 deletions(-) create mode 100644 dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/FrontendEntity.java create mode 100644 dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksTableDataSynchronizer.java create mode 100644 dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksTableDataWriteProvider.java diff --git a/dbswitch-core/pom.xml b/dbswitch-core/pom.xml index aa353168..41bbf79e 100644 --- a/dbswitch-core/pom.xml +++ b/dbswitch-core/pom.xml @@ -33,6 +33,11 @@ spring-boot-starter-jdbc + + org.apache.httpcomponents + httpclient + + \ No newline at end of file diff --git a/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/FrontendEntity.java b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/FrontendEntity.java new file mode 100644 index 00000000..5b73d9bd --- /dev/null +++ b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/FrontendEntity.java @@ -0,0 +1,13 @@ +package org.dromara.dbswitch.product.sr; + + +import lombok.Data; + +@Data +public class FrontendEntity { + String ip; + String httpport; + Boolean alive; + Boolean join; + String role; +} diff --git a/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarRocksUtils.java b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarRocksUtils.java index 39d74c2f..319e9623 100644 --- a/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarRocksUtils.java +++ b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarRocksUtils.java @@ -9,25 +9,148 @@ ///////////////////////////////////////////////////////////// package org.dromara.dbswitch.product.sr; -import org.dromara.dbswitch.core.provider.meta.MetadataProvider; -import org.dromara.dbswitch.core.schema.ColumnDescription; -import org.dromara.dbswitch.core.util.GenerateSqlUtils; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.ReUtil; +import cn.hutool.db.Db; +import cn.hutool.db.Entity; +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.dromara.dbswitch.common.entity.CloseableDataSource; -import java.sql.Connection; -import java.util.List; +import javax.sql.DataSource; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.util.*; +import java.util.stream.Collectors; +@Slf4j public final class StarRocksUtils { - public static String getTableDDL(MetadataProvider provider, Connection connection, String schema, - String table) { - List columnDescriptions = provider.queryTableColumnMeta(connection, schema, table); - List pks = provider.queryTablePrimaryKeys(connection, schema, table); - return GenerateSqlUtils.getDDLCreateTableSQL( - provider, columnDescriptions, pks, schema, table, false); - } - private StarRocksUtils() { - throw new IllegalStateException(); - } + private String indexName; + private volatile String dbName; + private volatile String tbName; + private volatile String host; + private volatile String username; + private volatile String password; + private volatile CloseableDataSource dataSource; + private volatile String httpPort; + + public void init(String schemaName, String tableName, DataSource dataSource) { + this.getHttpPort(dataSource); + this.dataSource = (CloseableDataSource) dataSource; + this.indexName = tableName; + this.host = ReUtil.extractMulti("jdbc:mysql://(.*):[0-9]{2,8}/", this.dataSource.getJdbcUrl(), "$1"); + this.username = this.dataSource.getUserName(); + this.password = this.dataSource.getPassword(); + this.tbName = tableName; + this.dbName = schemaName; + } + + public void getHttpPort(DataSource dataSource) { + Db use = Db.use(dataSource); + try { + List frontends = use.query("SHOW FRONTENDS"); + List frontendEntities = BeanUtil.copyToList(frontends, FrontendEntity.class); + List leader = frontendEntities.stream().filter(i -> i.getRole().equals("LEADER")).collect(Collectors.toList()); + FrontendEntity frontendEntity = leader.get(0); + this.httpPort = frontendEntity.getHttpport(); + } catch (Exception e) { + log.error(e.getMessage()); + } + } + + public long addOrUpdateData(List fieldNames, List recordValues) { + List objectList = asObjectList(fieldNames, recordValues); + JSONArray array = JSONUtil.parseArray(objectList); + JSONObject jsonObject = JSONUtil.createObj() + .set("data", array); + try { + sendData(jsonObject.toString()); + return recordValues.size(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void sendData(String content) throws Exception { + + final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", + this.host, + this.httpPort, + this.dbName, + this.tbName); + + final HttpClientBuilder httpClientBuilder = HttpClients + .custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + + try (CloseableHttpClient client = httpClientBuilder.build()) { + HttpPut put = new HttpPut(loadUrl); + StringEntity entity = new StringEntity(content, "UTF-8"); + put.setHeader(HttpHeaders.EXPECT, "100-continue"); + put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(this.username, this.password)); + put.setHeader("strip_outer_array", "true"); + put.setHeader("format", "JSON"); + put.setHeader("json_root", "$.data"); + put.setHeader("ignore_json_size", "true"); + put.setHeader("Content-Type", "application/json"); + put.setEntity(entity); + try (CloseableHttpResponse response = client.execute(put)) { + String loadResult = ""; + if (response.getEntity() != null) { + loadResult = EntityUtils.toString(response.getEntity()); + } + final int statusCode = response.getStatusLine().getStatusCode(); + // statusCode 200 just indicates that starrocks be service is ok, not stream load + // you should see the output content to find whether stream load is success + if (statusCode != 200) { + throw new IOException( + String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult)); + } + } + } + } + + private String basicAuthHeader(String username, String password) { + final String tobeEncode = username + ":" + password; + byte[] encoded = Base64.getEncoder().encode(tobeEncode.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encoded); + } + + + private List asObjectList(List fieldNames, List recordValues) { + int fieldCount = Math.min(fieldNames.size(), recordValues.get(0).length); + List rows = new ArrayList<>(recordValues.size()); + for (Object[] row : recordValues) { + Map columns = new LinkedHashMap<>(fieldCount); + for (int i = 0; i < fieldCount; ++i) { + Object rowValue = row[i]; + if (row[i] instanceof Timestamp) { + rowValue = String.valueOf(rowValue); + } + columns.put(fieldNames.get(i), rowValue); + } + rows.add(columns); + } + return rows; + } } diff --git a/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksFactoryProvider.java b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksFactoryProvider.java index 7184b70d..d7db40cb 100644 --- a/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksFactoryProvider.java +++ b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksFactoryProvider.java @@ -39,12 +39,12 @@ public class StarrocksFactoryProvider extends AbstractFactoryProvider { @Override public TableDataWriteProvider createTableDataWriteProvider(boolean useInsert) { - return new AutoCastTableDataWriteProvider(this); + return new StarrocksTableDataWriteProvider(this); } @Override public TableDataSynchronizeProvider createTableDataSynchronizeProvider() { - return new AutoCastTableDataSynchronizeProvider(this); + return new StarrocksTableDataSynchronizer(this); } diff --git a/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksMetadataQueryProvider.java b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksMetadataQueryProvider.java index d35df860..f593dc91 100644 --- a/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksMetadataQueryProvider.java +++ b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksMetadataQueryProvider.java @@ -323,10 +323,7 @@ public class StarrocksMetadataQueryProvider extends AbstractMetadataProvider { break; case ColumnMetaData.TYPE_STRING: //see: https://docs.starrocks.io/zh/docs/category/string/ - long newLength = length * 3; - if (newLength < 255) { - retval += "VARCHAR(" + newLength + ")"; - } else if (newLength <= 65533) { + if (length <= 65533) { retval += "STRING"; } else if (newLength <= 1048576) { retval += "VARCHAR(" + newLength + ")"; diff --git a/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksTableDataSynchronizer.java b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksTableDataSynchronizer.java new file mode 100644 index 00000000..1e49f968 --- /dev/null +++ b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksTableDataSynchronizer.java @@ -0,0 +1,108 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// +package org.dromara.dbswitch.product.sr; + + +import org.apache.commons.collections4.CollectionUtils; +import org.dromara.dbswitch.common.entity.CloseableDataSource; +import org.dromara.dbswitch.core.provider.ProductFactoryProvider; +import org.dromara.dbswitch.core.provider.sync.DefaultTableDataSynchronizeProvider; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class StarrocksTableDataSynchronizer extends DefaultTableDataSynchronizeProvider { + + private volatile List fieldNames; + private final CloseableDataSource dataSource; + + private final StarRocksUtils starRocksUtils = new StarRocksUtils(); + + + public StarrocksTableDataSynchronizer(ProductFactoryProvider factoryProvider) { + super(factoryProvider); + dataSource = (CloseableDataSource) factoryProvider.getDataSource(); + } + + @Override + public void prepare(String schemaName, String tableName, List fieldNames, List pks) { + starRocksUtils.init(schemaName, tableName, dataSource); + + this.fieldNames = fieldNames; + + + if (fieldNames.isEmpty() || pks.isEmpty() || fieldNames.size() < pks.size()) { + throw new IllegalArgumentException("字段列表和主键列表不能为空,或者字段总个数应不小于主键总个数"); + } + if (!fieldNames.containsAll(pks)) { + throw new IllegalArgumentException("字段列表必须包含主键列表"); + } + + Map columnType = getTableColumnMetaData(schemaName, tableName, fieldNames); + this.fieldOrders = new ArrayList<>(fieldNames); + this.pksOrders = new ArrayList<>(pks); + + this.insertStatementSql = getInsertPrepareStatementSql(schemaName, tableName, fieldNames); + this.updateStatementSql = getUpdatePrepareStatementSql(schemaName, tableName, fieldNames, pks); + this.deleteStatementSql = getDeletePrepareStatementSql(schemaName, tableName, pks); + + insertArgsType = new int[fieldNames.size()]; + for (int k = 0; k < fieldNames.size(); ++k) { + String field = fieldNames.get(k); + insertArgsType[k] = columnType.get(field); + } + + updateArgsType = new int[fieldNames.size()]; + int idx = 0; + for (int i = 0; i < fieldNames.size(); ++i) { + String field = fieldNames.get(i); + if (!pks.contains(field)) { + updateArgsType[idx++] = columnType.get(field); + } + } + for (String pk : pks) { + updateArgsType[idx++] = columnType.get(pk); + } + + deleteArgsType = new int[pks.size()]; + for (int j = 0; j < pks.size(); ++j) { + String pk = pks.get(j); + deleteArgsType[j] = columnType.get(pk); + } + } + + + @Override + public long executeInsert(List recordValues) { + if (CollectionUtils.isEmpty(fieldNames) || CollectionUtils.isEmpty(recordValues)) { + return 0L; + } + if (CollectionUtils.isEmpty(fieldNames) || CollectionUtils.isEmpty(recordValues)) { + return 0L; + } + + return starRocksUtils.addOrUpdateData(fieldNames, recordValues); + } + + @Override + public long executeUpdate(List recordValues) { + if (CollectionUtils.isEmpty(fieldNames) || CollectionUtils.isEmpty(recordValues)) { + return 0L; + } + if (CollectionUtils.isEmpty(fieldNames) || CollectionUtils.isEmpty(recordValues)) { + return 0L; + } + + return starRocksUtils.addOrUpdateData(fieldNames, recordValues); + } + + +} diff --git a/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksTableDataWriteProvider.java b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksTableDataWriteProvider.java new file mode 100644 index 00000000..c2d7daa3 --- /dev/null +++ b/dbswitch-product/dbswitch-product-starrocks/src/main/java/org/dromara/dbswitch/product/sr/StarrocksTableDataWriteProvider.java @@ -0,0 +1,50 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// +package org.dromara.dbswitch.product.sr; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.dromara.dbswitch.common.entity.CloseableDataSource; +import org.dromara.dbswitch.core.provider.ProductFactoryProvider; +import org.dromara.dbswitch.core.provider.write.DefaultTableDataWriteProvider; + +import java.util.List; + +@Slf4j +public class StarrocksTableDataWriteProvider extends DefaultTableDataWriteProvider { + + + private final CloseableDataSource dataSource; + private final StarRocksUtils starRocksUtils = new StarRocksUtils(); + ; + + public StarrocksTableDataWriteProvider(ProductFactoryProvider factoryProvider) { + super(factoryProvider); + dataSource = (CloseableDataSource) factoryProvider.getDataSource(); + + } + + @Override + public void prepareWrite(String schemaName, String tableName, List fieldNames) { + starRocksUtils.init(schemaName, tableName, dataSource); + } + + @Override + public long write(List fieldNames, List recordValues) { + if (CollectionUtils.isEmpty(fieldNames) || CollectionUtils.isEmpty(recordValues)) { + return 0L; + } + + return starRocksUtils.addOrUpdateData(fieldNames, recordValues); + + } + + +} diff --git a/pom.xml b/pom.xml index 19798efe..34c516f5 100644 --- a/pom.xml +++ b/pom.xml @@ -79,6 +79,12 @@ 5.7.5 + + org.apache.httpcomponents + httpclient + 4.5.13 + + com.google.guava guava