中断与异常优化

This commit is contained in:
inrgihc
2023-12-04 21:15:36 +08:00
parent a0ecbf8494
commit a346a734b9
52 changed files with 306 additions and 207 deletions

View File

@@ -2,7 +2,7 @@
set -e
DBSWITCH_VERSION=1.9.0
DBSWITCH_VERSION=1.9.1
BUILD_DOCKER_DIR="$( cd "$( dirname "$0" )" && pwd )"
PROJECT_ROOT_DIR=$( dirname "$BUILD_DOCKER_DIR")
DOCKER_DBSWITCH_DIR=$BUILD_DOCKER_DIR/dbswitch

View File

@@ -19,7 +19,7 @@ services:
start_period: 30s
dbswitch:
container_name: dbswitch_webui
image: inrgihc/dbswitch:1.9.0
image: inrgihc/dbswitch:1.9.1
environment:
MYSQLDB_HOST: dbswitch_mysqldb
MYSQLDB_PORT: 3306

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-parent</artifactId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<artifactId>dbswitch-admin</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-parent</artifactId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<artifactId>dbswitch-common</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-parent</artifactId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<artifactId>dbswitch-core</artifactId>

View File

@@ -53,6 +53,11 @@ public final class DefaultChangeCalculatorService implements RecordRowChangeCalc
*/
private int queryFetchSize;
/**
* 中断检查函数
*/
private Runnable checkInterrupt;
public DefaultChangeCalculatorService() {
this(false, true);
}
@@ -96,6 +101,11 @@ public final class DefaultChangeCalculatorService implements RecordRowChangeCalc
this.queryFetchSize = size;
}
@Override
public void setInterruptCheck(Runnable checkInterrupt) {
this.checkInterrupt = checkInterrupt;
}
/**
* 变化量计算函数
* <p>
@@ -210,13 +220,21 @@ public final class DefaultChangeCalculatorService implements RecordRowChangeCalc
if (log.isDebugEnabled()) {
log.debug("###### Query data from two table now");
}
if (null != checkInterrupt) {
checkInterrupt.run();
}
rsold = oldQuery
.queryTableData(task.getOldSchemaName(), task.getOldTableName(),
mappedQueryFieldColumn, fieldsMappedPrimaryKeyNew);
if (null != checkInterrupt) {
checkInterrupt.run();
}
rsnew = newQuery
.queryTableData(task.getNewSchemaName(), task.getNewTableName(),
queryFieldColumn, fieldsPrimaryKeyNew);
if (null != checkInterrupt) {
checkInterrupt.run();
}
ResultSetMetaData metaData = rsnew.getResultSet().getMetaData();
if (log.isDebugEnabled()) {
@@ -283,6 +301,10 @@ public final class DefaultChangeCalculatorService implements RecordRowChangeCalc
log.debug("###### Enter CDC calculate now");
}
if (null != checkInterrupt) {
checkInterrupt.run();
}
RecordTransformProvider transformer = task.getTransformer();
// 进入核心比较计算算法区域
@@ -292,6 +314,9 @@ public final class DefaultChangeCalculatorService implements RecordRowChangeCalc
Object[] two = transformer.doTransform(task.getNewSchemaName(), task.getNewTableName(),
queryFieldColumn, getRowData(rsnew.getResultSet()));
while (true) {
if (null != checkInterrupt) {
checkInterrupt.run();
}
if (one == null && two == null) {
break;
} else if (one == null && two != null) {

View File

@@ -58,6 +58,13 @@ public interface RecordRowChangeCalculator {
*/
void setFetchSize(int size);
/**
* 设置中断检查函数
*
* @param r 函数引用
*/
void setInterruptCheck(Runnable r);
/**
* 执行变化量计算任务
*

View File

@@ -20,10 +20,10 @@ public abstract class AbstractBatchExchanger {
private AsyncTaskExecutor readThreadExecutor;
private AsyncTaskExecutor writeThreadExecutor;
public AbstractBatchExchanger(AsyncTaskExecutor readExecutor, AsyncTaskExecutor writeExecutor) {
public AbstractBatchExchanger(AsyncTaskExecutor readExecutor, AsyncTaskExecutor writeExecutor, int channelMaxSize) {
ExamineUtils.checkNotNull(readExecutor, "readExecutor");
ExamineUtils.checkNotNull(writeExecutor, "writeExecutor");
this.memChannel = MemChannel.createNewChannel();
this.memChannel = MemChannel.createNewChannel(channelMaxSize);
this.readThreadExecutor = readExecutor;
this.writeThreadExecutor = writeExecutor;
}
@@ -46,8 +46,8 @@ public abstract class AbstractBatchExchanger {
writer.init(writeThreadExecutor);
// 启动reader和writer的并行工作
writer.work();
reader.work();
writer.startWork();
reader.startWork();
// writer会等待reader执行完
writer.waitForFinish();

View File

@@ -16,17 +16,32 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 队列中的批元素结构
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BatchElement {
/**
* 表名映射字符串,形如 A --> B 的形式
*/
private String tableNameMapString;
/**
* 数据写入回调函数
*/
private BiFunction<List<String>, List<Object[]>, Long> handler;
/**
* 写入回调函数的第1个参数
*/
private List<String> arg1;
/**
* 写入回调函数的第2个参数
*/
private List<Object[]> arg2;
}

View File

@@ -9,22 +9,42 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.core.exchange;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
public class MemChannel extends ConcurrentLinkedQueue<BatchElement> {
public class MemChannel extends ArrayBlockingQueue<BatchElement> {
public static MemChannel createNewChannel() {
return new MemChannel();
private static final int DEFAULT_QUEUE_MIN_SIZE = 10;
public static MemChannel createNewChannel(int capacity) {
if (capacity < DEFAULT_QUEUE_MIN_SIZE) {
capacity = DEFAULT_QUEUE_MIN_SIZE;
}
return new MemChannel(capacity);
}
public MemChannel(int capacity) {
super(capacity, true);
}
@Override
public boolean add(BatchElement elem) {
return super.add(elem);
try {
super.put(elem);
return true;
} catch (InterruptedException e) {
throw new CancellationException("task is interrupted");
}
}
@Override
public BatchElement poll() {
return super.poll();
try {
return super.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return null;
}
}
}

View File

@@ -15,7 +15,6 @@ import java.util.Optional;
public abstract class AbstractRobot<R extends TaskResult> implements Robot {
private volatile boolean interrupted = false;
private MemChannel channel;
public void setChannel(MemChannel channel) {
@@ -23,17 +22,11 @@ public abstract class AbstractRobot<R extends TaskResult> implements Robot {
}
public MemChannel getChannel() {
return channel;
return this.channel;
}
public void interrupt() {
interrupted = true;
}
protected void checkInterrupt() {
if (interrupted) {
throw new RuntimeException("task is interrupted");
}
public void clearChannel() {
this.channel.clear();
}
public abstract Optional<R> getWorkResult();

View File

@@ -15,7 +15,7 @@ public interface Robot {
void init(AsyncTaskExecutor threadExecutor);
void work();
void startWork();
void interrupt();
}

View File

@@ -15,8 +15,7 @@ public abstract class RobotReader<R extends TaskResult> extends AbstractRobot<R>
public abstract void startRead();
public void work() {
checkInterrupt();
public void startWork() {
startRead();
}

View File

@@ -15,8 +15,7 @@ public abstract class RobotWriter<R extends TaskResult> extends AbstractRobot<R>
public abstract void startWrite();
public void work() {
checkInterrupt();
public void startWork() {
startWrite();
}

View File

@@ -9,25 +9,17 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.core.task;
import java.util.concurrent.CancellationException;
import java.util.function.Supplier;
public abstract class TaskProcessor<R extends TaskResult> implements Supplier<R> {
private volatile boolean interrupted = false;
/**
* 中断任务
*/
public void interrupt() {
this.interrupted = true;
}
/**
* 任务执行期间用于检查是否接收到任务终端信号
* 任务执行期间用于检查是否接收到任务中断信号
*/
protected void checkInterrupt() {
if (interrupted) {
throw new RuntimeException("task is interrupted");
if (Thread.currentThread().isInterrupted()) {
throw new CancellationException("task is interrupted");
}
}

View File

@@ -11,4 +11,5 @@ package com.gitee.dbswitch.core.task;
public interface TaskResult {
void padding();
}

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-parent</artifactId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<artifactId>dbswitch-data</artifactId>

View File

@@ -33,7 +33,7 @@ public class ComparePerfStat extends PrintablePerfStat {
public String getPrintableString() {
StringBuilder sb = new StringBuilder();
if (readMap.size() > 0) {
sb.append("\nTable Detail Information Follows:\n");
sb.append("Table Detail Information Follows:\n");
for (Map.Entry<String, Long> entry : readMap.entrySet()) {
String tableMapName = entry.getKey();
Long tableReadTotal = entry.getValue();

View File

@@ -0,0 +1,44 @@
package com.gitee.dbswitch.data.domain;
import cn.hutool.core.exceptions.ExceptionUtil;
import com.gitee.dbswitch.common.entity.PrintablePerfStat;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ExceptPerfStat extends PrintablePerfStat {
private Map<String, Throwable> readMap;
private Map<String, Throwable> writeMap;
@Override
public String getPrintableString() {
StringBuilder sb = new StringBuilder();
Set<String> tableNamesSet = Sets.union(readMap.keySet(), writeMap.keySet());
if (tableNamesSet.size() > 0) {
sb.append("Exception Detail Information Follows:\n");
int i = 1;
for (String tableMapName : tableNamesSet) {
Throwable readException = readMap.getOrDefault(tableMapName, null);
Throwable writeException = writeMap.getOrDefault(tableMapName, null);
if (null != readException) {
sb.append("[" + i + "]" + tableMapName + " Read Stack Information :\n");
sb.append(ExceptionUtil.stacktraceToString(readException));
}
if (null != writeException) {
sb.append("[" + i + "]" + tableMapName + " Write Stack Information :\n");
sb.append(ExceptionUtil.stacktraceToString(writeException));
}
i++;
}
}
return sb.toString();
}
}

View File

@@ -9,7 +9,6 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.data.domain;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.io.unit.DataSizeUtil;
import com.gitee.dbswitch.common.entity.PrintablePerfStat;
import lombok.AllArgsConstructor;
@@ -29,17 +28,13 @@ public class ReaderPerfStat extends PrintablePerfStat {
private long total;
private long failure;
private long bytes;
private Throwable throwable;
@Override
public String getPrintableString() {
StringBuilder sb = new StringBuilder();
sb.append("Total Tables Count: \t" + total + "\n" +
"Failure Tables count: \t" + failure + "\n" +
"Total Transfer Size: \t" + DataSizeUtil.format(bytes) + "\n");
if (null != throwable) {
sb.append("Stack:" + ExceptionUtil.stacktraceToString(throwable));
}
sb.append("Total Read Tables Count: \t" + total + "\n");
sb.append("Total Failure Tables Count: \t" + failure + "\n");
sb.append("Total Read Record Size: \t" + DataSizeUtil.format(bytes) + "\n");
return sb.toString();
}
}

View File

@@ -31,6 +31,9 @@ public class ReaderTaskResult implements TaskResult {
@Builder.Default
private Map<String, Long> perf = new HashMap<>();
@Builder.Default
private Map<String, Throwable> except = new HashMap<>();
private String tableNameMapString;
private long successCount;
@@ -43,10 +46,13 @@ public class ReaderTaskResult implements TaskResult {
private Throwable throwable;
public ReaderTaskResult paddingPerf() {
@Override
public void padding() {
if (successCount > 0 && null != tableNameMapString) {
perf.put(tableNameMapString, recordCount);
}
return this;
if (null != throwable && null != tableNameMapString) {
except.putIfAbsent(tableNameMapString, throwable);
}
}
}

View File

@@ -9,31 +9,27 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.data.domain;
import cn.hutool.core.exceptions.ExceptionUtil;
import com.gitee.dbswitch.common.entity.PrintablePerfStat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 并发写入统计格式化信息
*
* @author tang
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WriterPerfStat extends PrintablePerfStat {
private long duration;
private Throwable throwable;
public WriterPerfStat(long duration, Throwable throwable) {
this.duration = duration;
this.throwable = throwable;
}
@Override
public String getPrintableString() {
StringBuilder sb = new StringBuilder();
sb.append("Total Writer Duration: \t" + (duration / 1000.0) + " s \n");
if (null != throwable) {
sb.append("Stack:" + ExceptionUtil.stacktraceToString(throwable));
}
return sb.toString();
}
}

View File

@@ -31,7 +31,17 @@ public class WriterTaskResult implements TaskResult {
@Builder.Default
private Map<String, Long> perf = new HashMap<>();
@Builder.Default
private Map<String, Throwable> except = new HashMap<>();
private boolean success;
private long duration;
private Throwable throwable;
@Override
public void padding() {
if (!except.isEmpty() && null == throwable) {
throwable = except.values().stream().findAny().get();
}
}
}

View File

@@ -19,5 +19,18 @@ import lombok.Data;
@Data
public class GlobalParamConfigProperties {
private int writeThreadNum = 4;
private int channelQueueSize;
private int writeThreadNum;
public GlobalParamConfigProperties() {
this.channelQueueSize = 100;
this.writeThreadNum = getDefaultWriteThreadNum();
}
private int getDefaultWriteThreadNum() {
int availableProcessorCount = Runtime.getRuntime().availableProcessors();
return Math.min(Math.max(4, availableProcessorCount), 8);
}
}

View File

@@ -99,6 +99,10 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
// 日志输出字符串使用
private String tableNameMapString;
// 统计信息
AtomicLong totalBytes = new AtomicLong(0);
AtomicLong totalCount = new AtomicLong(0);
private CountDownLatch robotCountDownLatch;
public ReaderTaskThread(ReaderTaskParam taskParam) {
@@ -207,7 +211,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
));
}
}
log.info("Mapping relation : \ntable mapper :\n\t{} \ncolumn mapper :\n\t{} ",
log.info("Mapping relation : \ntable mapper : {} \ncolumn mapper :\n\t{} ",
tableNameMapString, String.join("\n\t", columnMapperPairs));
Set<String> valueSet = new HashSet<>(mapChecker.values());
if (valueSet.size() <= 0) {
@@ -288,13 +292,13 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
&& properties.getTarget().getOnlyCreate()) {
return ReaderTaskResult.builder()
.tableNameMapString(tableNameMapString)
.successCount(1).build().paddingPerf();
.successCount(1).build();
}
if (targetProductType.isLikeHive()) {
return ReaderTaskResult.builder()
.tableNameMapString(tableNameMapString)
.successCount(1).build().paddingPerf();
.successCount(1).build();
}
checkInterrupt();
@@ -305,7 +309,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
&& properties.getTarget().getOnlyCreate()) {
return ReaderTaskResult.builder()
.tableNameMapString(tableNameMapString)
.successCount(1).build().paddingPerf();
.successCount(1).build();
}
checkInterrupt();
@@ -400,8 +404,6 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
List<Object[]> cache = new LinkedList<>();
long cacheBytes = 0;
long totalCount = 0;
long totalBytes = 0;
try (ResultSet rs = srs.getResultSet()) {
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) {
@@ -422,7 +424,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
cache.add(transformer.doTransform(sourceSchemaName, sourceTableName, sourceFields, record));
cacheBytes += bytes;
++totalCount;
totalCount.incrementAndGet();
if (cache.size() >= BATCH_SIZE || cacheBytes >= MAX_CACHE_BYTES_SIZE) {
final long finalCacheBytes = cacheBytes;
@@ -431,7 +433,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2) -> {
long ret = tableWriter.write(arg1, arg2);
log.info("[FullCoverSync] handle table [{}] data count: {}, the batch bytes sie: {}",
log.info("[FullCoverSync] handle write table [{}] batch record count: {}, the bytes size: {}",
tableNameMapString, ret, DataSizeUtil.format(finalCacheBytes));
return ret;
})
@@ -440,7 +442,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.build()
);
cache.clear();
totalBytes += cacheBytes;
totalBytes.addAndGet(cacheBytes);
cacheBytes = 0;
}
}
@@ -452,7 +454,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2) -> {
long ret = tableWriter.write(arg1, arg2);
log.info("[FullCoverSync] handle table [{}] data count: {}, the batch bytes sie: {}",
log.info("[FullCoverSync] handle write table [{}] batch record count: {}, the bytes size: {}",
tableNameMapString, ret, DataSizeUtil.format(finalCacheBytes));
return ret;
})
@@ -461,17 +463,19 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.build()
);
cache.clear();
totalBytes += cacheBytes;
totalBytes.addAndGet(cacheBytes);
}
log.info("[FullCoverSync] handle table [{}] total data count:{}, total bytes={}",
tableNameMapString, totalCount, DataSizeUtil.format(totalBytes));
} catch (Exception e) {
log.info("[FullCoverSync] handle read table [{}] total record count: {}, total bytes = {}",
tableNameMapString, totalCount.get(), DataSizeUtil.format(totalBytes.get()));
} catch (Throwable e) {
log.warn("[FullCoverSync] handle read table [{}] error: {}", e.getMessage());
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RuntimeException(e);
} finally {
// 如果正在读取大表数据的话这里的close()会很慢
srs.close();
}
@@ -480,10 +484,9 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.successCount(1)
.failureCount(0)
.totalBytes(totalBytes)
.recordCount(totalCount)
.build()
.paddingPerf();
.totalBytes(totalBytes.get())
.recordCount(totalCount.get())
.build();
}
/**
@@ -527,13 +530,12 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
RecordRowChangeCalculator calculator = new DefaultChangeCalculatorService();
calculator.setFetchSize(fetchSize);
calculator.setInterruptCheck(this::checkInterrupt);
calculator.setRecordIdentical(false);
calculator.setCheckJdbcType(false);
AtomicLong totalBytes = new AtomicLong(0);
AtomicLong totalCount = new AtomicLong(0);
// 执行实际的变化同步过程
log.info("[IncreaseSync] Handle table by compare [{}] data now ... ", tableNameMapString);
calculator.executeCalculate(param, new RecordRowHandler() {
private long countInsert = 0;
@@ -591,8 +593,6 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
doUpdate(fields);
}
log.info("[IncreaseSync] Handle table [{}] data one batch size: {}",
tableNameMapString, DataSizeUtil.format(cacheBytes));
cacheBytes = 0;
}
}
@@ -611,7 +611,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
doUpdate(fields);
}
log.info("[IncreaseSync] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
log.info("[IncreaseSync] Handle table by compare [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
tableNameMapString, countTotal, countInsert, countUpdate, countDelete);
}
@@ -621,7 +621,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2) -> {
long ret = synchronizer.executeInsert(arg2);
log.info("[IncreaseSync] Handle table [{}] data Insert count: {}", tableNameMapString, ret);
log.info("[IncreaseSync] Handle write table [{}] record Insert count: {}", tableNameMapString, ret);
return ret;
})
.arg1(fields)
@@ -637,7 +637,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2) -> {
long ret = synchronizer.executeUpdate(arg2);
log.info("[IncreaseSync] Handle table [{}] data Update count: {}", tableNameMapString, ret);
log.info("[IncreaseSync] Handle write table [{}] record Update count: {}", tableNameMapString, ret);
return ret;
})
.arg1(fields)
@@ -653,7 +653,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.tableNameMapString(tableNameMapString)
.handler((arg1, arg2) -> {
long ret = synchronizer.executeDelete(arg2);
log.info("[IncreaseSync] Handle table [{}] data Delete count: {}", tableNameMapString, ret);
log.info("[IncreaseSync] Handle write table [{}] record Delete count: {}", tableNameMapString, ret);
return ret;
})
.arg1(fields)
@@ -672,8 +672,7 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
.failureCount(0)
.recordCount(totalCount.get())
.totalBytes(totalBytes.get())
.build()
.paddingPerf();
.build();
}
/**
@@ -721,15 +720,15 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
@Override
protected ReaderTaskResult exceptProcess(Throwable t) {
log.error("Error migration for table: {}.{}, error message: {}",
tableDescription.getSchemaName(), tableDescription.getTableName(), t.getMessage());
tableDescription.getSchemaName(), tableDescription.getTableName(), t);
return ReaderTaskResult.builder()
.tableNameMapString(tableNameMapString)
.successCount(0)
.failureCount(1)
.recordCount(0)
.totalBytes(0)
.recordCount(totalCount.get())
.totalBytes(totalBytes.get())
.throwable(t)
.build()
.paddingPerf();
.build();
}
@Override

View File

@@ -44,7 +44,9 @@ public class WriterTaskThread extends TaskProcessor<WriterTaskResult> {
try {
BatchElement elem;
while ((elem = memChannel.poll()) != null || robotReader.getRemainingCount() > 0) {
checkInterrupt();
if (Thread.currentThread().isInterrupted()) {
break;
}
if (null != elem) {
try {
Long ret = Long.valueOf(elem.getArg2().size());
@@ -53,14 +55,11 @@ public class WriterTaskThread extends TaskProcessor<WriterTaskResult> {
Long total = ret + Optional.ofNullable(count).orElse(0L);
taskResult.getPerf().put(elem.getTableNameMapString(), total);
} catch (Throwable t) {
log.error("Failed to write table {}", elem.getTableNameMapString(), t);
throw t;
taskResult.setSuccess(false);
taskResult.getExcept().putIfAbsent(elem.getTableNameMapString(), t);
}
}
}
} catch (Throwable t) {
taskResult.setSuccess(false);
taskResult.setThrowable(t);
} finally {
stopWatch.stop();
taskResult.setDuration(stopWatch.getTotalTimeMillis());
@@ -69,7 +68,6 @@ public class WriterTaskThread extends TaskProcessor<WriterTaskResult> {
}
public WriterTaskResult exceptProcess(Throwable t) {
// 代码不会执行到这里
return WriterTaskResult.builder()
.success(false)
.duration(0)

View File

@@ -14,10 +14,12 @@ import com.gitee.dbswitch.core.exchange.AbstractBatchExchanger;
import com.gitee.dbswitch.core.robot.RobotReader;
import com.gitee.dbswitch.core.robot.RobotWriter;
import com.gitee.dbswitch.data.domain.ComparePerfStat;
import com.gitee.dbswitch.data.domain.ExceptPerfStat;
import com.gitee.dbswitch.data.domain.ReaderPerfStat;
import com.gitee.dbswitch.data.domain.ReaderTaskResult;
import com.gitee.dbswitch.data.domain.WriterPerfStat;
import com.gitee.dbswitch.data.domain.WriterTaskResult;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Optional;
import org.springframework.core.task.AsyncTaskExecutor;
@@ -31,9 +33,12 @@ public class DefaultBatchExchanger extends AbstractBatchExchanger {
private List<PrintablePerfStat> perfStats;
public DefaultBatchExchanger(AsyncTaskExecutor readExecutor,
AsyncTaskExecutor writeExecutor, List<PrintablePerfStat> perfStats) {
super(readExecutor, writeExecutor);
public DefaultBatchExchanger(
AsyncTaskExecutor readExecutor,
AsyncTaskExecutor writeExecutor,
int channelMaxSize,
List<PrintablePerfStat> perfStats) {
super(readExecutor, writeExecutor, channelMaxSize);
this.perfStats = perfStats;
}
@@ -42,25 +47,28 @@ public class DefaultBatchExchanger extends AbstractBatchExchanger {
Throwable throwable = null;
Optional<ReaderTaskResult> readResult = reader.getWorkResult();
Optional<WriterTaskResult> writeResult = writer.getWorkResult();
if (readResult.isPresent()) {
ReaderTaskResult r = readResult.get();
long total = r.getSuccessCount() + r.getFailureCount();
perfStats.add(new ReaderPerfStat(total, r.getFailureCount(), r.getTotalBytes(), r.getThrowable()));
if (null == throwable && null != r.getThrowable()) {
throwable = r.getThrowable();
}
}
if (writeResult.isPresent()) {
WriterTaskResult w = writeResult.get();
perfStats.add(new WriterPerfStat(w.getDuration(), w.getThrowable()));
if (null == throwable && null != w.getThrowable()) {
throwable = w.getThrowable();
}
}
if (readResult.isPresent() && writeResult.isPresent()) {
ReaderTaskResult r = readResult.get();
WriterTaskResult w = writeResult.get();
long total = r.getSuccessCount() + r.getFailureCount();
long failure = Sets.union(r.getExcept().keySet(), w.getExcept().keySet()).size();
perfStats.add(new ReaderPerfStat(total, failure, r.getTotalBytes()));
perfStats.add(new WriterPerfStat(w.getDuration()));
perfStats.add(new ComparePerfStat(r.getPerf(), w.getPerf()));
perfStats.add(new ExceptPerfStat(r.getExcept(), w.getExcept()));
throwable = (null != w.getThrowable()) ? w.getThrowable() : r.getThrowable();
} else {
if (readResult.isPresent()) {
ReaderTaskResult r = readResult.get();
long total = r.getSuccessCount() + r.getFailureCount();
perfStats.add(new ReaderPerfStat(total, r.getFailureCount(), r.getTotalBytes()));
throwable = r.getThrowable();
}
if (writeResult.isPresent()) {
WriterTaskResult w = writeResult.get();
perfStats.add(new WriterPerfStat(w.getDuration()));
throwable = w.getThrowable();
}
}
return throwable;
}

View File

@@ -38,7 +38,6 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.core.task.AsyncTaskExecutor;
/**
@@ -70,22 +69,8 @@ public class DefaultReaderRobot extends RobotReader<ReaderTaskResult> {
@Override
public void interrupt() {
super.interrupt();
if (CollectionUtils.isNotEmpty(readTaskThreads)) {
for (Supplier supplier : readTaskThreads) {
if (supplier instanceof ReaderTaskThread) {
ReaderTaskThread thread = (ReaderTaskThread) supplier;
thread.interrupt();
} else if (supplier instanceof LoggingSupplier) {
LoggingSupplier loggingSupplier = (LoggingSupplier) supplier;
Supplier realSupplier = loggingSupplier.getCommand();
if (realSupplier instanceof ReaderTaskThread) {
ReaderTaskThread thread = (ReaderTaskThread) realSupplier;
thread.interrupt();
}
}
}
}
Optional.ofNullable(futures).orElseGet(ArrayList::new).forEach(f -> f.cancel(true));
this.clearChannel();
}
@Override
@@ -146,7 +131,6 @@ public class DefaultReaderRobot extends RobotReader<ReaderTaskResult> {
log.info("Source schema names is :{}", JsonUtils.toJsonString(schemas));
for (String schema : schemas) {
checkInterrupt();
List<TableDescription> tableList = sourceMetaDataService.queryTableList(schema);
if (tableList.isEmpty()) {
log.warn("### Find source database table list empty for schema name is : {}", schema);
@@ -184,11 +168,8 @@ public class DefaultReaderRobot extends RobotReader<ReaderTaskResult> {
public void startRead() {
futures = new ArrayList<>(readTaskThreads.size());
readTaskThreads.forEach(
task -> {
checkInterrupt();
futures.add(CompletableFuture.supplyAsync(task, threadExecutor)
);
}
task ->
futures.add(CompletableFuture.supplyAsync(task, threadExecutor))
);
}
@@ -202,14 +183,20 @@ public class DefaultReaderRobot extends RobotReader<ReaderTaskResult> {
return futures.stream().map(CompletableFuture::join)
.filter(Objects::nonNull)
.map(f -> (ReaderTaskResult) f)
.peek(f -> f.padding())
.reduce(
(r1, r2) -> {
Map<String, Long> perf = Maps.newHashMap(r1.getPerf());
if (r2.getSuccessCount() > 0) {
perf.put(r2.getTableNameMapString(), r2.getRecordCount());
}
Map<String, Throwable> except = Maps.newHashMap(r1.getExcept());
if (r2.getExcept().size() > 0) {
except.putAll(r2.getExcept());
}
return ReaderTaskResult.builder()
.perf(perf)
.except(except)
.successCount(r1.getSuccessCount() + r2.getSuccessCount())
.failureCount(r1.getFailureCount() + r2.getFailureCount())
.recordCount(r1.getRecordCount() + r2.getRecordCount())

View File

@@ -26,7 +26,6 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.core.task.AsyncTaskExecutor;
/**
@@ -39,13 +38,13 @@ public class DefaultWriterRobot extends RobotWriter<WriterTaskResult> {
private final MdcKeyValue mdcKeyValue;
private final RobotReader robotReader;
private final int writeThreadNum;
private final int writeThreadNum; // 4 <= writeThreadNum <= 8
private AsyncTaskExecutor threadExecutor;
private List<Supplier> writeTaskThreads;
private List<CompletableFuture> futures;
public DefaultWriterRobot(MdcKeyValue mdcKeyValue, RobotReader robotReader, int writeThreadNum) {
ExamineUtils.checkState(writeThreadNum > 0, "writeThreadNum(%d) must more than zero", writeThreadNum);
ExamineUtils.checkArgument(writeThreadNum > 0, "writeThreadNum(%s) must >0 ", writeThreadNum);
this.mdcKeyValue = mdcKeyValue;
this.robotReader = robotReader;
this.writeThreadNum = writeThreadNum;
@@ -53,22 +52,7 @@ public class DefaultWriterRobot extends RobotWriter<WriterTaskResult> {
@Override
public void interrupt() {
super.interrupt();
if (CollectionUtils.isNotEmpty(writeTaskThreads)) {
for (Supplier supplier : writeTaskThreads) {
if (supplier instanceof WriterTaskThread) {
WriterTaskThread thread = (WriterTaskThread) supplier;
thread.interrupt();
} else if (supplier instanceof LoggingSupplier) {
LoggingSupplier loggingSupplier = (LoggingSupplier) supplier;
Supplier realSupplier = loggingSupplier.getCommand();
if (realSupplier instanceof WriterTaskThread) {
WriterTaskThread thread = (WriterTaskThread) realSupplier;
thread.interrupt();
}
}
}
}
Optional.ofNullable(futures).orElseGet(ArrayList::new).forEach(f -> f.cancel(true));
}
@Override
@@ -82,7 +66,6 @@ public class DefaultWriterRobot extends RobotWriter<WriterTaskResult> {
.memChannel(robotReader.getChannel())
.build();
for (int i = 0; i < writeThreadNum; ++i) {
checkInterrupt();
if (Objects.nonNull(mdcKeyValue)) {
writeTaskThreads.add(new LoggingSupplier(new WriterTaskThread(param), mdcKeyValue));
} else {
@@ -95,18 +78,13 @@ public class DefaultWriterRobot extends RobotWriter<WriterTaskResult> {
public void startWrite() {
futures = new ArrayList<>(writeTaskThreads.size());
writeTaskThreads.forEach(
task -> {
checkInterrupt();
futures.add(
CompletableFuture.supplyAsync(task, threadExecutor)
);
}
task ->
futures.add(CompletableFuture.supplyAsync(task, threadExecutor))
);
}
@Override
public void waitForFinish() {
checkInterrupt();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
@@ -115,6 +93,7 @@ public class DefaultWriterRobot extends RobotWriter<WriterTaskResult> {
return futures.stream().map(CompletableFuture::join)
.filter(Objects::nonNull)
.map(f -> (WriterTaskResult) f)
.peek(f -> f.padding())
.reduce(
(r1, r2) -> {
Map<String, Long> perf = Maps.newHashMap(r1.getPerf());
@@ -124,8 +103,13 @@ public class DefaultWriterRobot extends RobotWriter<WriterTaskResult> {
perf.put(k, count);
});
}
Map<String, Throwable> except = Maps.newHashMap(r1.getExcept());
if (r2.getExcept().size() > 0) {
except.putAll(r2.getExcept());
}
return WriterTaskResult.builder()
.perf(perf)
.except(except)
.success(r1.isSuccess() && r2.isSuccess())
.duration(Math.max(r1.getDuration(), r2.getDuration()))
.throwable(Objects.nonNull(r1.getThrowable()) ? r1.getThrowable() : r2.getThrowable())

View File

@@ -46,8 +46,8 @@ public class MigrationService {
* 配置参数
*/
private final DbswichPropertiesConfiguration configuration;
private final AsyncTaskExecutor tableReadExecutor;
private final AsyncTaskExecutor tableWriteExecutor;
private final AsyncTaskExecutor readExecutor;
private final AsyncTaskExecutor writeExecutor;
private RobotReader robotReader;
private RobotWriter robotWriter;
@@ -66,8 +66,8 @@ public class MigrationService {
AsyncTaskExecutor tableReadExecutor,
AsyncTaskExecutor tableWriteExecutor) {
this.configuration = Objects.requireNonNull(properties, "properties is null");
this.tableReadExecutor = Objects.requireNonNull(tableReadExecutor, "tableReadExecutor is null");
this.tableWriteExecutor = Objects.requireNonNull(tableWriteExecutor, "tableWriteExecutor is null");
this.readExecutor = Objects.requireNonNull(tableReadExecutor, "tableReadExecutor is null");
this.writeExecutor = Objects.requireNonNull(tableWriteExecutor, "tableWriteExecutor is null");
}
public void setMdcKeyValue(MdcKeyValue mdcKeyValue) {
@@ -109,12 +109,13 @@ public class MigrationService {
log.info(MachineInfoUtils.getOSInfo());
//log.info("input configuration \n{}", JsonUtils.toJsonString(configuration));
GlobalParamConfigProperties config = configuration.getConfig();
AbstractBatchExchanger exchanger = new DefaultBatchExchanger(tableReadExecutor, tableWriteExecutor, perfStats);
GlobalParamConfigProperties globalParam = configuration.getConfig();
int maxQueueSize = globalParam.getChannelQueueSize();
AbstractBatchExchanger exchanger = new DefaultBatchExchanger(readExecutor, writeExecutor, maxQueueSize, perfStats);
try (CloseableDataSource targetDataSource = DataSourceUtils.createTargetDataSource(configuration.getTarget())) {
try (CloseableDataSource sourceDataSource = DataSourceUtils.createSourceDataSource(configuration.getSource())) {
robotReader = new DefaultReaderRobot(mdcKeyValue, configuration, sourceDataSource, targetDataSource);
robotWriter = new DefaultWriterRobot(mdcKeyValue, robotReader, config.getWriteThreadNum());
robotWriter = new DefaultWriterRobot(mdcKeyValue, robotReader, globalParam.getWriteThreadNum());
exchanger.exchange(robotReader, robotWriter);
}
} catch (Throwable t) {

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-parent</artifactId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<artifactId>dbswitch-dist</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dbswitch-product-dm</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dbswitch-product-mongodb</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dbswitch-product-mysql</artifactId>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -10,12 +10,12 @@
package com.gitee.dbswitch.product.postgresql;
import com.gitee.dbswitch.common.consts.Constants;
import com.gitee.dbswitch.common.util.DDLFormatterUtils;
import com.gitee.dbswitch.provider.ProductFactoryProvider;
import com.gitee.dbswitch.provider.meta.AbstractMetadataProvider;
import com.gitee.dbswitch.schema.ColumnDescription;
import com.gitee.dbswitch.schema.ColumnMetaData;
import com.gitee.dbswitch.schema.TableDescription;
import com.gitee.dbswitch.common.util.DDLFormatterUtils;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@Slf4j
public class PostgresMetadataQueryProvider extends AbstractMetadataProvider {
@@ -182,6 +183,9 @@ public class PostgresMetadataQueryProvider extends AbstractMetadataProvider {
} else {
retval += "DOUBLE PRECISION";
}
if (v.isHaveDefault()) {
retval += " DEFAULT " + NumberUtils.toDouble(v.getDefaultValue());
}
} else {
if (length > 9) {
retval += "BIGINT";
@@ -192,13 +196,16 @@ public class PostgresMetadataQueryProvider extends AbstractMetadataProvider {
retval += "INTEGER";
}
}
if (v.isHaveDefault()) {
retval += " DEFAULT " + NumberUtils.toInt(v.getDefaultValue());
}
}
} else {
retval += "DOUBLE PRECISION";
}
if (v.isHaveDefault()) {
retval += " DEFAULT " + Integer.valueOf(v.getDefaultValue());
if (v.isHaveDefault()) {
retval += " DEFAULT " + NumberUtils.toDouble(v.getDefaultValue());
}
}
}
break;

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-product</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dbswitch-parent</artifactId>
<groupId>com.gitee.dbswitch</groupId>
<version>1.9.0</version>
<version>1.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dbswitch-product</artifactId>

View File

@@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.gitee.dbswitch</groupId>
<artifactId>dbswitch-parent</artifactId>
<version>1.9.0</version>
<version>1.9.1</version>
<packaging>pom</packaging>
<name>dbswitch</name>
<description>database switch project</description>

View File

@@ -1,6 +1,6 @@
@echo off
set APP_VERSION=1.9.0
set APP_VERSION=1.9.1
echo "Clean Project ..."
call mvn clean -f pom.xml