|
|
|
@@ -4,7 +4,7 @@
|
|
|
|
|
// Use of this source code is governed by a BSD-style license
|
|
|
|
|
//
|
|
|
|
|
// Author: tang (inrgihc@126.com)
|
|
|
|
|
// Data : 2020/1/2
|
|
|
|
|
// Date : 2020/1/2
|
|
|
|
|
// Location: beijing , china
|
|
|
|
|
/////////////////////////////////////////////////////////////
|
|
|
|
|
package com.gitee.dbswitch.data.service;
|
|
|
|
@@ -17,8 +17,8 @@ import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
|
|
|
|
import javax.sql.DataSource;
|
|
|
|
|
import com.carrotsearch.sizeof.RamUsageEstimator;
|
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
|
|
@@ -58,6 +58,8 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
@Service("MainService")
|
|
|
|
|
public class MainService {
|
|
|
|
|
|
|
|
|
|
private final long MAX_CACHE_BYTES_SIZE = 512 * 1024 * 1024;
|
|
|
|
|
|
|
|
|
|
private ObjectMapper jackson = new ObjectMapper();
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
@@ -264,7 +266,8 @@ public class MainService {
|
|
|
|
|
StatementResultSet srs = sourceOperator.queryTableData(tableDescription.getSchemaName(),
|
|
|
|
|
tableDescription.getTableName(), fields);
|
|
|
|
|
|
|
|
|
|
List<Object[]> cache = new LinkedList<Object[]>();
|
|
|
|
|
List<Object[]> cache = new LinkedList<>();
|
|
|
|
|
long cacheBytes=0;
|
|
|
|
|
long totalCount = 0;
|
|
|
|
|
try {
|
|
|
|
|
ResultSet rs = srs.getResultset();
|
|
|
|
@@ -280,12 +283,14 @@ public class MainService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cache.add(record);
|
|
|
|
|
cacheBytes += RamUsageEstimator.shallowSizeOf(record);
|
|
|
|
|
++totalCount;
|
|
|
|
|
|
|
|
|
|
if (cache.size() >= BATCH_SIZE) {
|
|
|
|
|
if (cache.size() >= BATCH_SIZE || cacheBytes >= MAX_CACHE_BYTES_SIZE) {
|
|
|
|
|
long ret = writer.write(fields, cache);
|
|
|
|
|
log.info("[FullCoverSynch] handle table [{}] data count: {}", fullTableName, ret);
|
|
|
|
|
cache.clear();
|
|
|
|
|
cacheBytes = 0;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -348,6 +353,7 @@ public class MainService {
|
|
|
|
|
private long countUpdate = 0;
|
|
|
|
|
private long countDelete = 0;
|
|
|
|
|
private long count = 0;
|
|
|
|
|
private long cacheBytes=0;
|
|
|
|
|
private List<Object[]> cacheInsert = new LinkedList<Object[]>();
|
|
|
|
|
private List<Object[]> cacheUpdate = new LinkedList<Object[]>();
|
|
|
|
|
private List<Object[]> cacheDelete = new LinkedList<Object[]>();
|
|
|
|
@@ -365,6 +371,7 @@ public class MainService {
|
|
|
|
|
countDelete++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cacheBytes+=RamUsageEstimator.shallowSizeOf(record);
|
|
|
|
|
count++;
|
|
|
|
|
checkFull(fields);
|
|
|
|
|
}
|
|
|
|
@@ -376,7 +383,7 @@ public class MainService {
|
|
|
|
|
*/
|
|
|
|
|
private void checkFull(List<String> fields) {
|
|
|
|
|
if (cacheInsert.size() >= BATCH_SIZE || cacheUpdate.size() >= BATCH_SIZE
|
|
|
|
|
|| cacheDelete.size() >= BATCH_SIZE) {
|
|
|
|
|
|| cacheDelete.size() >= BATCH_SIZE || cacheBytes >= MAX_CACHE_BYTES_SIZE) {
|
|
|
|
|
if (cacheDelete.size() > 0) {
|
|
|
|
|
doDelete(fields);
|
|
|
|
|
}
|
|
|
|
@@ -388,6 +395,8 @@ public class MainService {
|
|
|
|
|
if (cacheUpdate.size() > 0) {
|
|
|
|
|
doUpdate(fields);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cacheBytes = 0;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -433,7 +442,7 @@ public class MainService {
|
|
|
|
|
/**
|
|
|
|
|
* 创建于指定数据库连接描述符的连接池
|
|
|
|
|
*
|
|
|
|
|
* @param dbdesc 数据库连接描述符
|
|
|
|
|
* @param description 数据库连接描述符
|
|
|
|
|
* @return HikariDataSource连接池
|
|
|
|
|
*/
|
|
|
|
|
private HikariDataSource createSourceDataSource(DbswichProperties.SourceDataSourceProperties description) {
|
|
|
|
@@ -459,7 +468,7 @@ public class MainService {
|
|
|
|
|
/**
|
|
|
|
|
* 创建于指定数据库连接描述符的连接池
|
|
|
|
|
*
|
|
|
|
|
* @param dbdesc 数据库连接描述符
|
|
|
|
|
* @param description 数据库连接描述符
|
|
|
|
|
* @return HikariDataSource连接池
|
|
|
|
|
*/
|
|
|
|
|
private HikariDataSource createTargetDataSource(DbswichProperties.TargetDataSourceProperties description) {
|
|
|
|
@@ -499,15 +508,16 @@ public class MainService {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 检查MySQL数据库表的存储引擎是否为Innodb
|
|
|
|
|
*
|
|
|
|
|
*
|
|
|
|
|
* @param shemaName schema名
|
|
|
|
|
* @param tableName table名
|
|
|
|
|
* @param dataSource 数据源
|
|
|
|
|
* @param task 任务实体
|
|
|
|
|
* @return 为Innodb存储引擎时返回True,否在为false
|
|
|
|
|
* @return 为Innodb存储引擎时返回True, 否在为false
|
|
|
|
|
*/
|
|
|
|
|
private boolean isMysqlInodbStorageEngine(String shemaName, String tableName, DataSource dataSource) {
|
|
|
|
|
String sql = "SELECT count(*) as total FROM information_schema.tables WHERE table_schema=? AND table_name=? AND ENGINE='InnoDB'";
|
|
|
|
|
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
|
|
|
|
|
return jdbcTemplate.queryForObject(sql, new Object[] { shemaName, tableName }, Integer.class) > 0;
|
|
|
|
|
return jdbcTemplate.queryForObject(sql, new Object[]{shemaName, tableName}, Integer.class) > 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|