mirror of
https://gitee.com/dromara/dbswitch.git
synced 2025-09-22 03:54:50 +00:00
支持es写入
This commit is contained in:
@@ -80,6 +80,13 @@ public class AssignmentService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Long sourceConnectionId = assignmentConfigEntity.getSourceConnectionId();
|
||||||
|
DatabaseConnectionEntity sourceEntity = databaseConnectionDAO.getById(sourceConnectionId);
|
||||||
|
if (ProductTypeEnum.ELASTICSEARCH == sourceEntity.getType()) {
|
||||||
|
throw new DbswitchException(ResultCode.ERROR_INVALID_ASSIGNMENT_CONFIG,
|
||||||
|
"不支持源端数据源为ElasticSearch类型");
|
||||||
|
}
|
||||||
|
|
||||||
return ConverterFactory.getConverter(AssignmentInfoConverter.class)
|
return ConverterFactory.getConverter(AssignmentInfoConverter.class)
|
||||||
.convert(assignmentTaskDAO.getById(assignment.getId()));
|
.convert(assignmentTaskDAO.getById(assignment.getId()));
|
||||||
}
|
}
|
||||||
|
@@ -168,6 +168,15 @@ public enum ProductTypeEnum {
|
|||||||
"jdbc:mongodb://",
|
"jdbc:mongodb://",
|
||||||
new String[]{"jdbc:mongodb://{host}[:{port}]/[{database}][\\?{params}]"},
|
new String[]{"jdbc:mongodb://{host}[:{port}]/[{database}][\\?{params}]"},
|
||||||
"jdbc:mongodb://172.17.2.12:27017/admin?authSource=admin&authMechanism=SCRAM-SHA-1"),
|
"jdbc:mongodb://172.17.2.12:27017/admin?authSource=admin&authMechanism=SCRAM-SHA-1"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ElasticSearch数据库类型
|
||||||
|
*/
|
||||||
|
ELASTICSEARCH(17, "\"", "ElasticSearch", "com.gitee.jdbc.elasticsearch.JdbcDriver", 9200,
|
||||||
|
"",
|
||||||
|
"jdbc:jest://",
|
||||||
|
new String[]{"jdbc:jest://{host}[:{port}][\\?{params}]"},
|
||||||
|
"jdbc:jest://172.17.2.12:9200?useHttps=false"),
|
||||||
;
|
;
|
||||||
|
|
||||||
private int id;
|
private int id;
|
||||||
@@ -181,7 +190,7 @@ public enum ProductTypeEnum {
|
|||||||
private String sample;
|
private String sample;
|
||||||
|
|
||||||
public boolean hasDatabaseName() {
|
public boolean hasDatabaseName() {
|
||||||
return !Arrays.asList(DM, SQLITE3, MYSQL, MARIADB, GBASE8A).contains(this);
|
return !Arrays.asList(DM, SQLITE3, MYSQL, MARIADB, GBASE8A, ELASTICSEARCH).contains(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasFilePath() {
|
public boolean hasFilePath() {
|
||||||
@@ -265,6 +274,15 @@ public enum ProductTypeEnum {
|
|||||||
return this == MONGODB;
|
return this == MONGODB;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否为ElasticSearch数据库类型
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
public boolean isElasticSearch() {
|
||||||
|
return this == ELASTICSEARCH;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 是否为ClickHouse数据库类型
|
* 是否为ClickHouse数据库类型
|
||||||
*
|
*
|
||||||
|
@@ -95,6 +95,9 @@ public final class DatabaseAwareUtils {
|
|||||||
if (null != url && url.contains("mongodb://")) {
|
if (null != url && url.contains("mongodb://")) {
|
||||||
return ProductTypeEnum.MONGODB;
|
return ProductTypeEnum.MONGODB;
|
||||||
}
|
}
|
||||||
|
if (null != url && url.contains("jest://")) {
|
||||||
|
return ProductTypeEnum.ELASTICSEARCH;
|
||||||
|
}
|
||||||
throw new IllegalStateException("Unable to detect database type from data source instance");
|
throw new IllegalStateException("Unable to detect database type from data source instance");
|
||||||
} catch (SQLException se) {
|
} catch (SQLException se) {
|
||||||
throw new RuntimeException(se);
|
throw new RuntimeException(se);
|
||||||
|
@@ -242,7 +242,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
|
|||||||
properties.getTarget().setTargetDrop(true);
|
properties.getTarget().setTargetDrop(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (targetProductType.isMongodb()) {
|
if (targetProductType.isMongodb() || targetProductType.isElasticSearch()) {
|
||||||
try {
|
try {
|
||||||
targetFactoryProvider.createTableManageProvider()
|
targetFactoryProvider.createTableManageProvider()
|
||||||
.dropTable(targetSchemaName, targetTableName);
|
.dropTable(targetSchemaName, targetTableName);
|
||||||
|
@@ -65,7 +65,7 @@ public final class DataSourceUtils {
|
|||||||
ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
|
ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
|
||||||
} else if (properties.getDriverClassName().contains("mongodb")) {
|
} else if (properties.getDriverClassName().contains("mongodb")) {
|
||||||
ds.setConnectionTestQuery("use admin;");
|
ds.setConnectionTestQuery("use admin;");
|
||||||
} else {
|
} else if (!ds.getJdbcUrl().contains("jdbc:jest://")) {
|
||||||
ds.setConnectionTestQuery("SELECT 1");
|
ds.setConnectionTestQuery("SELECT 1");
|
||||||
}
|
}
|
||||||
ds.setMaximumPoolSize(MAX_THREAD_COUNT);
|
ds.setMaximumPoolSize(MAX_THREAD_COUNT);
|
||||||
@@ -107,7 +107,7 @@ public final class DataSourceUtils {
|
|||||||
ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
|
ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
|
||||||
} else if (properties.getDriverClassName().contains("mongodb")) {
|
} else if (properties.getDriverClassName().contains("mongodb")) {
|
||||||
ds.setConnectionTestQuery("use admin;");
|
ds.setConnectionTestQuery("use admin;");
|
||||||
} else {
|
} else if (!ds.getJdbcUrl().contains("jdbc:jest://")) {
|
||||||
ds.setConnectionTestQuery("SELECT 1");
|
ds.setConnectionTestQuery("SELECT 1");
|
||||||
}
|
}
|
||||||
if (properties.getDriverClassName().contains("sqlite")) {
|
if (properties.getDriverClassName().contains("sqlite")) {
|
||||||
|
26
dbswitch-product/dbswitch-product-elasticsearch/pom.xml
Normal file
26
dbswitch-product/dbswitch-product-elasticsearch/pom.xml
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>dbswitch-product</artifactId>
|
||||||
|
<groupId>com.gitee.dbswitch</groupId>
|
||||||
|
<version>1.9.2</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>dbswitch-product-elasticsearch</artifactId>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.gitee.dbswitch</groupId>
|
||||||
|
<artifactId>dbswitch-common</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.gitee.dbswitch</groupId>
|
||||||
|
<artifactId>dbswitch-core</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
@@ -0,0 +1,59 @@
|
|||||||
|
// Copyright tang. All rights reserved.
|
||||||
|
// https://gitee.com/inrgihc/dbswitch
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license
|
||||||
|
//
|
||||||
|
// Author: tang (inrgihc@126.com)
|
||||||
|
// Date : 2020/1/2
|
||||||
|
// Location: beijing , china
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
package com.gitee.dbswitch.product.elasticsearch;
|
||||||
|
|
||||||
|
import com.gitee.dbswitch.annotation.Product;
|
||||||
|
import com.gitee.dbswitch.common.type.ProductTypeEnum;
|
||||||
|
import com.gitee.dbswitch.features.ProductFeatures;
|
||||||
|
import com.gitee.dbswitch.provider.AbstractFactoryProvider;
|
||||||
|
import com.gitee.dbswitch.provider.manage.TableManageProvider;
|
||||||
|
import com.gitee.dbswitch.provider.meta.MetadataProvider;
|
||||||
|
import com.gitee.dbswitch.provider.query.TableDataQueryProvider;
|
||||||
|
import com.gitee.dbswitch.provider.sync.TableDataSynchronizeProvider;
|
||||||
|
import com.gitee.dbswitch.provider.write.TableDataWriteProvider;
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
|
||||||
|
@Product(ProductTypeEnum.ELASTICSEARCH)
|
||||||
|
public class ElasticsearchFactoryProvider extends AbstractFactoryProvider {
|
||||||
|
|
||||||
|
public ElasticsearchFactoryProvider(DataSource dataSource) {
|
||||||
|
super(dataSource);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProductFeatures getProductFeatures() {
|
||||||
|
return new ElasticsearchFeatures();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MetadataProvider createMetadataQueryProvider() {
|
||||||
|
return new ElasticsearchMetadataQueryProvider(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableDataQueryProvider createTableDataQueryProvider() {
|
||||||
|
return new ElasticsearchTableDataQueryProvider(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableDataWriteProvider createTableDataWriteProvider(boolean useInsert) {
|
||||||
|
return new ElasticsearchTableDataWriteProvider(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableManageProvider createTableManageProvider() {
|
||||||
|
return new ElasticsearchTableManageProvider(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableDataSynchronizeProvider createTableDataSynchronizeProvider() {
|
||||||
|
return new ElasticsearchTableDataSynchronizer(this);
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,16 @@
|
|||||||
|
// Copyright tang. All rights reserved.
|
||||||
|
// https://gitee.com/inrgihc/dbswitch
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license
|
||||||
|
//
|
||||||
|
// Author: tang (inrgihc@126.com)
|
||||||
|
// Date : 2020/1/2
|
||||||
|
// Location: beijing , china
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
package com.gitee.dbswitch.product.elasticsearch;
|
||||||
|
|
||||||
|
import com.gitee.dbswitch.features.ProductFeatures;
|
||||||
|
|
||||||
|
public class ElasticsearchFeatures implements ProductFeatures {
|
||||||
|
|
||||||
|
}
|
@@ -0,0 +1,105 @@
|
|||||||
|
// Copyright tang. All rights reserved.
|
||||||
|
// https://gitee.com/inrgihc/dbswitch
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license
|
||||||
|
//
|
||||||
|
// Author: tang (inrgihc@126.com)
|
||||||
|
// Date : 2020/1/2
|
||||||
|
// Location: beijing , china
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
package com.gitee.dbswitch.product.elasticsearch;
|
||||||
|
|
||||||
|
import com.gitee.dbswitch.provider.ProductFactoryProvider;
|
||||||
|
import com.gitee.dbswitch.provider.meta.AbstractMetadataProvider;
|
||||||
|
import com.gitee.dbswitch.schema.ColumnDescription;
|
||||||
|
import com.gitee.dbswitch.schema.ColumnMetaData;
|
||||||
|
import com.gitee.dbswitch.schema.IndexDescription;
|
||||||
|
import com.gitee.dbswitch.schema.TableDescription;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ElasticsearchMetadataQueryProvider extends AbstractMetadataProvider {
|
||||||
|
|
||||||
|
protected ElasticsearchMetadataQueryProvider(ProductFactoryProvider factoryProvider) {
|
||||||
|
super(factoryProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTableDDL(Connection connection, String schemaName, String tableName) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getViewDDL(Connection connection, String schemaName, String tableName) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ColumnDescription> querySelectSqlColumnMeta(Connection connection, String sql) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName,
|
||||||
|
String tableName) {
|
||||||
|
List<ColumnDescription> ret = new ArrayList<>();
|
||||||
|
try (ResultSet rs = connection.getMetaData().getColumns(null, schemaName, tableName, null);) {
|
||||||
|
while (rs.next()) {
|
||||||
|
ColumnDescription cd = new ColumnDescription();
|
||||||
|
cd.setFieldName(rs.getString("COLUMN_NAME"));
|
||||||
|
cd.setLabelName(rs.getString("COLUMN_NAME"));
|
||||||
|
cd.setFieldType(Integer.parseInt(rs.getString("DATA_TYPE")));
|
||||||
|
cd.setFieldTypeName(rs.getString("TYPE_NAME"));
|
||||||
|
cd.setFiledTypeClassName(rs.getString("TYPE_NAME"));
|
||||||
|
cd.setDisplaySize(Integer.parseInt(rs.getString("COLUMN_SIZE")));
|
||||||
|
cd.setPrecisionSize(0);
|
||||||
|
cd.setScaleSize(0);
|
||||||
|
cd.setAutoIncrement(false);
|
||||||
|
cd.setNullable(true);
|
||||||
|
cd.setProductType(getProductType());
|
||||||
|
ret.add(cd);
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void testQuerySQL(Connection connection, String sql) {
|
||||||
|
try {
|
||||||
|
List<String> schemas = querySchemaList(connection);
|
||||||
|
connection.getMetaData().getTables(null, schemas.get(0), null, null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> queryTablePrimaryKeys(Connection connection, String schemaName, String tableName) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<IndexDescription> queryTableIndexes(Connection connection, String schemaName, String tableName) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getQuotedSchemaTableCombination(String schemaName, String tableName) {
|
||||||
|
return String.format("%s.%s", schemaName, tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFieldDefinition(ColumnMetaData v, List<String> pks, boolean useAutoInc, boolean addCr,
|
||||||
|
boolean withRemarks) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPrimaryKeyAsString(List<String> pks) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getTableColumnCommentDefinition(TableDescription td, List<ColumnDescription> cds) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,103 @@
|
|||||||
|
// Copyright tang. All rights reserved.
|
||||||
|
// https://gitee.com/inrgihc/dbswitch
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license
|
||||||
|
//
|
||||||
|
// Author: tang (inrgihc@126.com)
|
||||||
|
// Date : 2020/1/2
|
||||||
|
// Location: beijing , china
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
package com.gitee.dbswitch.product.elasticsearch;
|
||||||
|
|
||||||
|
import com.gitee.dbswitch.common.consts.Constants;
|
||||||
|
import com.gitee.dbswitch.common.entity.ResultSetWrapper;
|
||||||
|
import com.gitee.dbswitch.common.type.ProductTypeEnum;
|
||||||
|
import com.gitee.dbswitch.provider.ProductFactoryProvider;
|
||||||
|
import com.gitee.dbswitch.provider.query.TableDataQueryProvider;
|
||||||
|
import com.gitee.dbswitch.schema.SchemaTableData;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.ResultSetMetaData;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
|
||||||
|
public class ElasticsearchTableDataQueryProvider implements TableDataQueryProvider {
|
||||||
|
|
||||||
|
private ProductFactoryProvider factoryProvider;
|
||||||
|
private DataSource dataSource;
|
||||||
|
|
||||||
|
public ElasticsearchTableDataQueryProvider(ProductFactoryProvider factoryProvider) {
|
||||||
|
this.factoryProvider = factoryProvider;
|
||||||
|
this.dataSource = factoryProvider.getDataSource();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProductTypeEnum getProductType() {
|
||||||
|
return this.factoryProvider.getProductType();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getQueryFetchSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setQueryFetchSize(int size) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResultSetWrapper queryTableData(String schemaName, String tableName, List<String> fields,
|
||||||
|
List<String> orders) {
|
||||||
|
String sql = tableName;
|
||||||
|
try {
|
||||||
|
Connection connection = this.dataSource.getConnection();
|
||||||
|
Statement statement = connection.createStatement();
|
||||||
|
statement.setQueryTimeout(Constants.DEFAULT_QUERY_TIMEOUT_SECONDS);
|
||||||
|
return ResultSetWrapper.builder()
|
||||||
|
.connection(connection)
|
||||||
|
.statement(statement)
|
||||||
|
.resultSet(statement.executeQuery(sql))
|
||||||
|
.build();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
throw new RuntimeException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SchemaTableData queryTableData(Connection connection, String schemaName, String tableName, int rowCount) {
|
||||||
|
String querySQL = tableName;
|
||||||
|
SchemaTableData data = new SchemaTableData();
|
||||||
|
data.setSchemaName(schemaName);
|
||||||
|
data.setTableName(tableName);
|
||||||
|
data.setColumns(new ArrayList<>());
|
||||||
|
data.setRows(new ArrayList<>());
|
||||||
|
try (Statement st = connection.createStatement()) {
|
||||||
|
try (ResultSet rs = st.executeQuery(querySQL)) {
|
||||||
|
ResultSetMetaData m = rs.getMetaData();
|
||||||
|
int count = m.getColumnCount();
|
||||||
|
for (int i = 1; i <= count; i++) {
|
||||||
|
data.getColumns().add(m.getColumnLabel(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
while (rs.next()) {
|
||||||
|
List<Object> row = new ArrayList<>(count);
|
||||||
|
for (int i = 1; i <= count; i++) {
|
||||||
|
Object value = rs.getObject(i);
|
||||||
|
row.add(value);
|
||||||
|
}
|
||||||
|
if (data.getRows().size() > rowCount) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
data.getRows().add(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,41 @@
|
|||||||
|
// Copyright tang. All rights reserved.
|
||||||
|
// https://gitee.com/inrgihc/dbswitch
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license
|
||||||
|
//
|
||||||
|
// Author: tang (inrgihc@126.com)
|
||||||
|
// Date : 2020/1/2
|
||||||
|
// Location: beijing , china
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
package com.gitee.dbswitch.product.elasticsearch;
|
||||||
|
|
||||||
|
import com.gitee.dbswitch.provider.ProductFactoryProvider;
|
||||||
|
import com.gitee.dbswitch.provider.sync.DefaultTableDataSynchronizeProvider;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ElasticsearchTableDataSynchronizer extends DefaultTableDataSynchronizeProvider {
|
||||||
|
|
||||||
|
public ElasticsearchTableDataSynchronizer(ProductFactoryProvider factoryProvider) {
|
||||||
|
super(factoryProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepare(String schemaName, String tableName, List<String> fieldNames, List<String> pks) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long executeInsert(List<Object[]> records) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long executeUpdate(List<Object[]> records) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long executeDelete(List<Object[]> records) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@@ -0,0 +1,72 @@
|
|||||||
|
// Copyright tang. All rights reserved.
|
||||||
|
// https://gitee.com/inrgihc/dbswitch
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license
|
||||||
|
//
|
||||||
|
// Author: tang (inrgihc@126.com)
|
||||||
|
// Date : 2020/1/2
|
||||||
|
// Location: beijing , china
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
package com.gitee.dbswitch.product.elasticsearch;
|
||||||
|
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import com.gitee.dbswitch.provider.ProductFactoryProvider;
|
||||||
|
import com.gitee.dbswitch.provider.write.DefaultTableDataWriteProvider;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class ElasticsearchTableDataWriteProvider extends DefaultTableDataWriteProvider {
|
||||||
|
|
||||||
|
private String indexName;
|
||||||
|
|
||||||
|
public ElasticsearchTableDataWriteProvider(ProductFactoryProvider factoryProvider) {
|
||||||
|
super(factoryProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepareWrite(String schemaName, String tableName, List<String> fieldNames) {
|
||||||
|
this.indexName = tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long write(List<String> fieldNames, List<Object[]> recordValues) {
|
||||||
|
if (CollectionUtils.isEmpty(fieldNames) || CollectionUtils.isEmpty(recordValues)) {
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
Map<String, Object> bulkDocuments = new HashMap<>();
|
||||||
|
bulkDocuments.put("indexName", indexName);
|
||||||
|
try (Connection connection = getDataSource().getConnection()) {
|
||||||
|
Statement statement = connection.createStatement();
|
||||||
|
for (List<Object[]> partRecordValues : Lists.partition(recordValues, 500)) {
|
||||||
|
bulkDocuments.put("sources", asString(fieldNames, partRecordValues));
|
||||||
|
String sql = JSONUtil.toJsonStr(bulkDocuments);
|
||||||
|
statement.executeUpdate(sql);
|
||||||
|
}
|
||||||
|
return recordValues.size();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> asString(List<String> fieldNames, List<Object[]> recordValues) {
|
||||||
|
int fieldCount = Math.min(fieldNames.size(), recordValues.get(0).length);
|
||||||
|
List<String> rows = new ArrayList<>(recordValues.size());
|
||||||
|
for (Object[] row : recordValues) {
|
||||||
|
Map<String, Object> columns = new LinkedHashMap<>(fieldCount);
|
||||||
|
for (int i = 0; i < fieldCount; ++i) {
|
||||||
|
columns.put(fieldNames.get(i), row[i]);
|
||||||
|
}
|
||||||
|
rows.add(JSONUtil.toJsonStr(columns));
|
||||||
|
}
|
||||||
|
return rows;
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,34 @@
|
|||||||
|
// Copyright tang. All rights reserved.
|
||||||
|
// https://gitee.com/inrgihc/dbswitch
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license
|
||||||
|
//
|
||||||
|
// Author: tang (inrgihc@126.com)
|
||||||
|
// Date : 2020/1/2
|
||||||
|
// Location: beijing , china
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
package com.gitee.dbswitch.product.elasticsearch;
|
||||||
|
|
||||||
|
import com.gitee.dbswitch.provider.ProductFactoryProvider;
|
||||||
|
import com.gitee.dbswitch.provider.manage.DefaultTableManageProvider;
|
||||||
|
|
||||||
|
public class ElasticsearchTableManageProvider extends DefaultTableManageProvider {
|
||||||
|
|
||||||
|
public ElasticsearchTableManageProvider(ProductFactoryProvider factoryProvider) {
|
||||||
|
super(factoryProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void truncateTableData(String schemaName, String tableName) {
|
||||||
|
cleanup(schemaName, tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dropTable(String schemaName, String tableName) {
|
||||||
|
cleanup(schemaName, tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanup(String schemaName, String tableName) {
|
||||||
|
//this.executeSql(tableName);
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1 @@
|
|||||||
|
com.gitee.dbswitch.product.elasticsearch.ElasticsearchFactoryProvider
|
@@ -93,6 +93,11 @@
|
|||||||
<artifactId>dbswitch-product-mongodb</artifactId>
|
<artifactId>dbswitch-product-mongodb</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.gitee.dbswitch</groupId>
|
||||||
|
<artifactId>dbswitch-product-elasticsearch</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
@@ -29,6 +29,7 @@
|
|||||||
<module>dbswitch-product-mongodb</module>
|
<module>dbswitch-product-mongodb</module>
|
||||||
<module>dbswitch-register-product</module>
|
<module>dbswitch-register-product</module>
|
||||||
<module>dbswitch-product-clickhouse</module>
|
<module>dbswitch-product-clickhouse</module>
|
||||||
|
<module>dbswitch-product-elasticsearch</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
</project>
|
</project>
|
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user