调整sr写入逻辑

This commit is contained in:
inrgihc
2025-06-08 21:52:38 +08:00
parent 50ecc06f70
commit 10e98b91aa
6 changed files with 208 additions and 246 deletions

View File

@@ -1,13 +1,22 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package org.dromara.dbswitch.product.sr; package org.dromara.dbswitch.product.sr;
import lombok.Data; import lombok.Data;
@Data @Data
public class FrontendEntity { public class FrontendEntity {
String ip;
String httpport; private String ip;
Boolean alive; private String httpport;
Boolean join; private Boolean alive;
String role; private Boolean join;
private String role;
} }

View File

@@ -16,7 +16,18 @@ import cn.hutool.db.Entity;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.db.Db;
import cn.hutool.db.Entity;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.dromara.dbswitch.common.entity.CloseableDataSource;
@Slf4j @Slf4j
public final class StarRocksUtils { public final class StarRocksUtils {
private String dbName;
private String tbName;
private String host;
private String username;
private String password;
private CloseableDataSource dataSource;
private String httpPort;
private String indexName; public void init(String schemaName, String tableName, DataSource dataSource) {
private volatile String dbName; this.getHttpPort(dataSource);
private volatile String tbName; this.dataSource = (CloseableDataSource) dataSource;
private volatile String host; this.host = ReUtil.extractMulti("jdbc:mysql://(.*):[0-9]{2,8}/", this.dataSource.getJdbcUrl(), "$1");
private volatile String username; this.username = this.dataSource.getUserName();
private volatile String password; this.password = this.dataSource.getPassword();
private volatile CloseableDataSource dataSource; this.tbName = tableName;
private volatile String httpPort; this.dbName = schemaName;
}
public void getHttpPort(DataSource dataSource) {
public void init(String schemaName, String tableName, DataSource dataSource) { Db use = Db.use(dataSource);
this.getHttpPort(dataSource); try {
this.dataSource = (CloseableDataSource) dataSource; List<Entity> frontends = use.query("SHOW FRONTENDS");
this.indexName = tableName; List<FrontendEntity> frontendEntities = BeanUtil.copyToList(frontends, FrontendEntity.class);
this.host = ReUtil.extractMulti("jdbc:mysql://(.*):[0-9]{2,8}/", this.dataSource.getJdbcUrl(), "$1"); List<FrontendEntity> leader = frontendEntities.stream().filter(i -> i.getRole().equals("LEADER"))
this.username = this.dataSource.getUserName(); .collect(Collectors.toList());
this.password = this.dataSource.getPassword(); FrontendEntity frontendEntity = leader.get(0);
this.tbName = tableName; this.httpPort = frontendEntity.getHttpport();
this.dbName = schemaName; } catch (Exception e) {
throw new RuntimeException(e);
} }
}
public void getHttpPort(DataSource dataSource) { public long addOrUpdateData(List<String> fieldNames, List<Object[]> recordValues) {
Db use = Db.use(dataSource); if (CollectionUtils.isEmpty(fieldNames) || CollectionUtils.isEmpty(recordValues)) {
try { return 0L;
List<Entity> frontends = use.query("SHOW FRONTENDS"); }
List<FrontendEntity> frontendEntities = BeanUtil.copyToList(frontends, FrontendEntity.class); List<Object> objectList = asObjectList(fieldNames, recordValues);
List<FrontendEntity> leader = frontendEntities.stream().filter(i -> i.getRole().equals("LEADER")).collect(Collectors.toList()); JSONArray array = JSONUtil.parseArray(objectList);
FrontendEntity frontendEntity = leader.get(0); JSONObject jsonObject = JSONUtil.createObj().set("data", array);
this.httpPort = frontendEntity.getHttpport(); try {
} catch (Exception e) { sendData(jsonObject.toString());
log.error(e.getMessage()); return recordValues.size();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void sendData(String content) throws Exception {
final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
this.host,
this.httpPort,
this.dbName,
this.tbName);
final HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
try (CloseableHttpClient client = httpClientBuilder.build()) {
HttpPut put = new HttpPut(loadUrl);
StringEntity entity = new StringEntity(content, "UTF-8");
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(this.username, this.password));
put.setHeader("strip_outer_array", "true");
put.setHeader("format", "JSON");
put.setHeader("json_root", "$.data");
put.setHeader("ignore_json_size", "true");
put.setHeader("Content-Type", "application/json");
put.setEntity(entity);
try (CloseableHttpResponse response = client.execute(put)) {
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
} }
} final int statusCode = response.getStatusLine().getStatusCode();
// statusCode 200 just indicates that starrocks be service is ok, not stream load
public long addOrUpdateData(List<String> fieldNames, List<Object[]> recordValues) { // you should see the output content to find whether stream load is success
List<Object> objectList = asObjectList(fieldNames, recordValues); if (statusCode != 200) {
JSONArray array = JSONUtil.parseArray(objectList); throw new IOException(
JSONObject jsonObject = JSONUtil.createObj() String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
.set("data", array);
try {
sendData(jsonObject.toString());
return recordValues.size();
} catch (Exception e) {
throw new RuntimeException(e);
} }
}
} }
}
private void sendData(String content) throws Exception { private String basicAuthHeader(String username, String password) {
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.getEncoder().encode(tobeEncode.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encoded);
}
final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", private List<Object> asObjectList(List<String> fieldNames, List<Object[]> recordValues) {
this.host, int fieldCount = Math.min(fieldNames.size(), recordValues.get(0).length);
this.httpPort, List<Object> rows = new ArrayList<>(recordValues.size());
this.dbName, for (Object[] row : recordValues) {
this.tbName); Map<String, Object> columns = new LinkedHashMap<>(fieldCount);
for (int i = 0; i < fieldCount; ++i) {
final HttpClientBuilder httpClientBuilder = HttpClients Object rowValue = row[i];
.custom() if (row[i] instanceof Timestamp) {
.setRedirectStrategy(new DefaultRedirectStrategy() { rowValue = String.valueOf(rowValue);
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
try (CloseableHttpClient client = httpClientBuilder.build()) {
HttpPut put = new HttpPut(loadUrl);
StringEntity entity = new StringEntity(content, "UTF-8");
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(this.username, this.password));
put.setHeader("strip_outer_array", "true");
put.setHeader("format", "JSON");
put.setHeader("json_root", "$.data");
put.setHeader("ignore_json_size", "true");
put.setHeader("Content-Type", "application/json");
put.setEntity(entity);
try (CloseableHttpResponse response = client.execute(put)) {
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
}
final int statusCode = response.getStatusLine().getStatusCode();
// statusCode 200 just indicates that starrocks be service is ok, not stream load
// you should see the output content to find whether stream load is success
if (statusCode != 200) {
throw new IOException(
String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
}
}
} }
columns.put(fieldNames.get(i), rowValue);
}
rows.add(columns);
} }
return rows;
private String basicAuthHeader(String username, String password) { }
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.getEncoder().encode(tobeEncode.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encoded);
}
private List<Object> asObjectList(List<String> fieldNames, List<Object[]> recordValues) {
int fieldCount = Math.min(fieldNames.size(), recordValues.get(0).length);
List<Object> rows = new ArrayList<>(recordValues.size());
for (Object[] row : recordValues) {
Map<String, Object> columns = new LinkedHashMap<>(fieldCount);
for (int i = 0; i < fieldCount; ++i) {
Object rowValue = row[i];
if (row[i] instanceof Timestamp) {
rowValue = String.valueOf(rowValue);
}
columns.put(fieldNames.get(i), rowValue);
}
rows.add(columns);
}
return rows;
}
} }

View File

@@ -9,16 +9,14 @@
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
package org.dromara.dbswitch.product.sr; package org.dromara.dbswitch.product.sr;
import org.dromara.dbswitch.core.annotation.Product; import javax.sql.DataSource;
import org.dromara.dbswitch.common.type.ProductTypeEnum; import org.dromara.dbswitch.common.type.ProductTypeEnum;
import org.dromara.dbswitch.core.annotation.Product;
import org.dromara.dbswitch.core.features.ProductFeatures; import org.dromara.dbswitch.core.features.ProductFeatures;
import org.dromara.dbswitch.core.provider.AbstractFactoryProvider; import org.dromara.dbswitch.core.provider.AbstractFactoryProvider;
import org.dromara.dbswitch.core.provider.meta.MetadataProvider; import org.dromara.dbswitch.core.provider.meta.MetadataProvider;
import org.dromara.dbswitch.core.provider.sync.AutoCastTableDataSynchronizeProvider;
import org.dromara.dbswitch.core.provider.sync.TableDataSynchronizeProvider; import org.dromara.dbswitch.core.provider.sync.TableDataSynchronizeProvider;
import org.dromara.dbswitch.core.provider.write.AutoCastTableDataWriteProvider;
import org.dromara.dbswitch.core.provider.write.TableDataWriteProvider; import org.dromara.dbswitch.core.provider.write.TableDataWriteProvider;
import javax.sql.DataSource;
@Product(ProductTypeEnum.STARROCKS) @Product(ProductTypeEnum.STARROCKS)
public class StarrocksFactoryProvider extends AbstractFactoryProvider { public class StarrocksFactoryProvider extends AbstractFactoryProvider {
@@ -47,5 +45,4 @@ public class StarrocksFactoryProvider extends AbstractFactoryProvider {
return new StarrocksTableDataSynchronizer(this); return new StarrocksTableDataSynchronizer(this);
} }
} }

View File

@@ -48,6 +48,11 @@ public class StarrocksMetadataQueryProvider extends AbstractMetadataProvider {
private static final String QUERY_TABLE_METADATA_SQL = private static final String QUERY_TABLE_METADATA_SQL =
"SELECT `TABLE_COMMENT`,`TABLE_TYPE` FROM `information_schema`.`TABLES` " "SELECT `TABLE_COMMENT`,`TABLE_TYPE` FROM `information_schema`.`TABLES` "
+ "WHERE `TABLE_SCHEMA` = ? AND `TABLE_NAME` = ?"; + "WHERE `TABLE_SCHEMA` = ? AND `TABLE_NAME` = ?";
// Parameterized lookup of a table's primary-key column names from
// information_schema; bind order is (1) schema name, (2) table name.
// COLUMN_KEY = 'PRI' marks key columns; TABLE_CATALOG IS NULL restricts
// the match to the internal catalog.
private static final String QUERY_TABLE_PRIMARY_KEY_SQL = " SELECT COLUMN_NAME \n" +
" from information_schema.columns\n" +
" where TABLE_SCHEMA=? and TABLE_NAME=?\n" +
" and TABLE_CATALOG is null\n" +
" and COLUMN_KEY = 'PRI'";
public StarrocksMetadataQueryProvider(ProductFactoryProvider factoryProvider) { public StarrocksMetadataQueryProvider(ProductFactoryProvider factoryProvider) {
super(factoryProvider); super(factoryProvider);
@@ -166,21 +171,16 @@ public class StarrocksMetadataQueryProvider extends AbstractMetadataProvider {
@Override @Override
public List<String> queryTablePrimaryKeys(Connection connection, String schemaName, String tableName) { public List<String> queryTablePrimaryKeys(Connection connection, String schemaName, String tableName) {
List<String> ret = new ArrayList<>(); try (PreparedStatement statement = connection.prepareStatement(QUERY_TABLE_PRIMARY_KEY_SQL)) {
try { statement.setString(1, schemaName);
Statement statement = connection.createStatement(); statement.setString(2, tableName);
String sql = String.format(" SELECT * \n" + try (ResultSet primaryKeys = statement.executeQuery()) {
" from information_schema.columns\n" + List<String> ret = new ArrayList<>();
" where TABLE_SCHEMA=\"%s\" and TABLE_NAME=\"%s\"\n" + while (primaryKeys.next()) {
" and TABLE_CATALOG is null\n" + ret.add(primaryKeys.getString(1));
" and COLUMN_KEY = 'PRI';", }
schemaName, tableName return ret.stream().distinct().collect(Collectors.toList());
);
ResultSet primaryKeys = statement.executeQuery(sql);
while (primaryKeys.next()) {
ret.add(primaryKeys.getString("COLUMN_NAME"));
} }
return ret.stream().distinct().collect(Collectors.toList());
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@@ -323,7 +323,10 @@ public class StarrocksMetadataQueryProvider extends AbstractMetadataProvider {
break; break;
case ColumnMetaData.TYPE_STRING: case ColumnMetaData.TYPE_STRING:
//see: https://docs.starrocks.io/zh/docs/category/string/ //see: https://docs.starrocks.io/zh/docs/category/string/
if (length <= 65533) { long newLength = length * 3;
if (newLength < 255) {
retval += "VARCHAR(" + newLength + ")";
} else if (newLength <= 65533) {
retval += "STRING"; retval += "STRING";
} else if (newLength <= 1048576) { } else if (newLength <= 1048576) {
retval += "VARCHAR(" + newLength + ")"; retval += "VARCHAR(" + newLength + ")";

View File

@@ -9,100 +9,54 @@
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
package org.dromara.dbswitch.product.sr;

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dbswitch.common.entity.CloseableDataSource;
import org.dromara.dbswitch.core.provider.ProductFactoryProvider;
import org.dromara.dbswitch.core.provider.sync.AutoCastTableDataSynchronizeProvider;

/**
 * StarRocks synchronizer that prefers the HTTP stream-load path
 * ({@link StarRocksUtils}) for inserts and updates, and falls back to the
 * superclass' JDBC implementation whenever stream load is unavailable or
 * fails.
 */
@Slf4j
public class StarrocksTableDataSynchronizer extends AutoCastTableDataSynchronizeProvider {

  // Field order captured in prepare(); reused for every batch.
  private List<String> fieldNames;
  private final CloseableDataSource dataSource;
  private final StarRocksUtils starRocksUtils = new StarRocksUtils();

  public StarrocksTableDataSynchronizer(ProductFactoryProvider factoryProvider) {
    super(factoryProvider);
    dataSource = (CloseableDataSource) factoryProvider.getDataSource();
  }

  /**
   * Runs the superclass' JDBC preparation, then best-effort initializes the
   * stream-load helper; init failure is only logged so later batches can still
   * use the JDBC fallback.
   */
  @Override
  public void prepare(String schemaName, String tableName, List<String> fieldNames, List<String> pks) {
    this.fieldNames = fieldNames;
    super.prepare(schemaName, tableName, fieldNames, pks);
    try {
      starRocksUtils.init(schemaName, tableName, dataSource);
    } catch (Exception e) {
      log.warn("Failed to init by StarRocksUtils#init(),information:: {}", e.getMessage());
    }
  }

  /** Inserts via stream load; falls back to JDBC insert on any failure. */
  @Override
  public long executeInsert(List<Object[]> recordValues) {
    try {
      return starRocksUtils.addOrUpdateData(fieldNames, recordValues);
    } catch (Exception e) {
      log.warn("Failed to addOrUpdateData by StarRocksUtils#addOrUpdateData(),information:: {}", e.getMessage());
      return super.executeInsert(recordValues);
    }
  }

  /**
   * Updates via stream load (upsert semantics on the StarRocks side); falls
   * back to JDBC update on any failure.
   */
  @Override
  public long executeUpdate(List<Object[]> recordValues) {
    try {
      return starRocksUtils.addOrUpdateData(fieldNames, recordValues);
    } catch (Exception e) {
      log.warn("Failed to addOrUpdateData by StarRocksUtils#addOrUpdateData(),information:: {}", e.getMessage());
      return super.executeUpdate(recordValues);
    }
  }
}

View File

@@ -9,42 +9,41 @@
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
package org.dromara.dbswitch.product.sr;

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dbswitch.common.entity.CloseableDataSource;
import org.dromara.dbswitch.core.provider.ProductFactoryProvider;
import org.dromara.dbswitch.core.provider.write.AutoCastTableDataWriteProvider;

/**
 * StarRocks write provider that prefers the HTTP stream-load path
 * ({@link StarRocksUtils}) and falls back to the superclass' JDBC batch write
 * whenever stream load is unavailable or fails.
 */
@Slf4j
public class StarrocksTableDataWriteProvider extends AutoCastTableDataWriteProvider {

  private final CloseableDataSource dataSource;
  private final StarRocksUtils starRocksUtils = new StarRocksUtils();

  public StarrocksTableDataWriteProvider(ProductFactoryProvider factoryProvider) {
    super(factoryProvider);
    dataSource = (CloseableDataSource) factoryProvider.getDataSource();
  }

  /**
   * Runs the superclass' JDBC preparation, then best-effort initializes the
   * stream-load helper; init failure is only logged so write() can still use
   * the JDBC fallback.
   */
  @Override
  public void prepareWrite(String schemaName, String tableName, List<String> fieldNames) {
    super.prepareWrite(schemaName, tableName, fieldNames);
    try {
      starRocksUtils.init(schemaName, tableName, dataSource);
    } catch (Exception e) {
      log.warn("Failed to init by StarRocksUtils#init(),information: {}", e.getMessage());
    }
  }

  /** Writes via stream load; falls back to the JDBC write on any failure. */
  @Override
  public long write(List<String> fieldNames, List<Object[]> recordValues) {
    try {
      return starRocksUtils.addOrUpdateData(fieldNames, recordValues);
    } catch (Exception e) {
      log.warn("Failed to insertOrUpdate data by StarRocksUtils#addOrUpdateData(),information: {}", e.getMessage());
      return super.write(fieldNames, recordValues);
    }
  }
}