mirror of
https://gitee.com/dromara/dbswitch.git
synced 2025-10-14 13:50:24 +00:00
故障问题修复
This commit is contained in:
@@ -17,13 +17,15 @@ public class IncrementPoint {
|
|||||||
|
|
||||||
private String columnName;
|
private String columnName;
|
||||||
private Object maxValue;
|
private Object maxValue;
|
||||||
|
private int jdbcType;
|
||||||
|
|
||||||
private IncrementPoint() {
|
private IncrementPoint() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public IncrementPoint(String columnName, Object maxValue) {
|
public IncrementPoint(String columnName, Object maxValue, int jdbcType) {
|
||||||
this.columnName = columnName;
|
this.columnName = columnName;
|
||||||
this.maxValue = maxValue;
|
this.maxValue = maxValue;
|
||||||
|
this.jdbcType = jdbcType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isWorkable() {
|
public boolean isWorkable() {
|
||||||
@@ -45,4 +47,12 @@ public class IncrementPoint {
|
|||||||
public void setMaxValue(Object maxValue) {
|
public void setMaxValue(Object maxValue) {
|
||||||
this.maxValue = maxValue;
|
this.maxValue = maxValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getJdbcType() {
|
||||||
|
return jdbcType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJdbcType(int jdbcType) {
|
||||||
|
this.jdbcType = jdbcType;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||||||
import org.dromara.dbswitch.common.consts.Constants;
|
import org.dromara.dbswitch.common.consts.Constants;
|
||||||
import org.dromara.dbswitch.common.entity.IncrementPoint;
|
import org.dromara.dbswitch.common.entity.IncrementPoint;
|
||||||
import org.dromara.dbswitch.common.entity.ResultSetWrapper;
|
import org.dromara.dbswitch.common.entity.ResultSetWrapper;
|
||||||
|
import org.dromara.dbswitch.common.util.JdbcTypesUtils;
|
||||||
import org.dromara.dbswitch.common.util.ObjectCastUtils;
|
import org.dromara.dbswitch.common.util.ObjectCastUtils;
|
||||||
import org.dromara.dbswitch.core.features.ProductFeatures;
|
import org.dromara.dbswitch.core.features.ProductFeatures;
|
||||||
import org.dromara.dbswitch.core.provider.AbstractCommonProvider;
|
import org.dromara.dbswitch.core.provider.AbstractCommonProvider;
|
||||||
@@ -66,10 +67,7 @@ public class DefaultTableDataQueryProvider
|
|||||||
sb.append(" FROM ");
|
sb.append(" FROM ");
|
||||||
sb.append(quoteSchemaTableName(schemaName, tableName));
|
sb.append(quoteSchemaTableName(schemaName, tableName));
|
||||||
if (IncrementPoint.EMPTY != point && point.isWorkable()) {
|
if (IncrementPoint.EMPTY != point && point.isWorkable()) {
|
||||||
sb.append(" WHERE ");
|
sb.append(" WHERE ").append(toGreaterThanCondition(point));
|
||||||
sb.append(quoteName(point.getColumnName()));
|
|
||||||
sb.append(" > ");
|
|
||||||
sb.append(point.getMaxValue());
|
|
||||||
}
|
}
|
||||||
if (CollectionUtils.isNotEmpty(orders)) {
|
if (CollectionUtils.isNotEmpty(orders)) {
|
||||||
sb.append(" ORDER BY ");
|
sb.append(" ORDER BY ");
|
||||||
@@ -79,6 +77,18 @@ public class DefaultTableDataQueryProvider
|
|||||||
return this.selectTableData(sb.toString(), features.convertFetchSize(this.fetchSize));
|
return this.selectTableData(sb.toString(), features.convertFetchSize(this.fetchSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected String toGreaterThanCondition(IncrementPoint point) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append(quoteName(point.getColumnName()));
|
||||||
|
sb.append(" > ");
|
||||||
|
if (JdbcTypesUtils.isInteger(point.getJdbcType())) {
|
||||||
|
sb.append(point.getMaxValue());
|
||||||
|
} else {
|
||||||
|
sb.append("'").append(point.getMaxValue()).append("'");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
protected ResultSetWrapper selectTableData(String sql, int fetchSize) {
|
protected ResultSetWrapper selectTableData(String sql, int fetchSize) {
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Query table data sql :{}", sql);
|
log.debug("Query table data sql :{}", sql);
|
||||||
|
@@ -349,17 +349,20 @@ public class ReaderTaskThread extends TaskProcessor<ReaderTaskResult> {
|
|||||||
String incrSourceColumnName = this.incrTableColumns.get(sourceTableName);
|
String incrSourceColumnName = this.incrTableColumns.get(sourceTableName);
|
||||||
String incrTargetColumnName = mapChecker.get(incrSourceColumnName);
|
String incrTargetColumnName = mapChecker.get(incrSourceColumnName);
|
||||||
if (org.apache.commons.lang3.StringUtils.isBlank(incrTargetColumnName)) {
|
if (org.apache.commons.lang3.StringUtils.isBlank(incrTargetColumnName)) {
|
||||||
throw new RuntimeException("增量字段在目标端表中不存在");
|
throw new RuntimeException(
|
||||||
|
String.format("增量字段[%s]在目标端表[%s]中不存在",
|
||||||
|
incrTargetColumnName, targetTableName)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
MetadataService service = new DefaultMetadataService(targetDataSource, targetProductType);
|
MetadataService service = new DefaultMetadataService(targetDataSource, targetProductType);
|
||||||
ColumnValue columnValue = service.queryIncrementPoint(targetSchemaName, targetTableName, incrTargetColumnName);
|
ColumnValue columnValue = service.queryIncrementPoint(targetSchemaName, targetTableName, incrTargetColumnName);
|
||||||
IncrementPoint incrPoint = IncrementPoint.EMPTY;
|
IncrementPoint incrPoint = IncrementPoint.EMPTY;
|
||||||
if (null != columnValue) {
|
if (null != columnValue) {
|
||||||
if (!JdbcTypesUtils.isInteger(columnValue.getJdbcType())
|
if (!JdbcTypesUtils.isIncrement(columnValue.getJdbcType())) {
|
||||||
&& !JdbcTypesUtils.isDateTime(columnValue.getJdbcType())) {
|
throw new RuntimeException(String.format("增量字段[%s]必须为整型或时间戳类型,而实际为:%s",
|
||||||
throw new RuntimeException("增量字段必须为整型或时间类型");
|
incrSourceColumnName, JdbcTypesUtils.resolveTypeName(columnValue.getJdbcType())));
|
||||||
}
|
}
|
||||||
incrPoint = new IncrementPoint(incrSourceColumnName, columnValue.getValue());
|
incrPoint = new IncrementPoint(incrSourceColumnName, columnValue.getValue(), columnValue.getJdbcType());
|
||||||
}
|
}
|
||||||
log.info("Table: {}.{} has increment column: {}", sourceSchemaName, sourceTableName, incrSourceColumnName);
|
log.info("Table: {}.{} has increment column: {}", sourceSchemaName, sourceTableName, incrSourceColumnName);
|
||||||
return doFullCoverSynchronize(targetWriter, targetTableManager, sourceQuerier, transformProvider, incrPoint);
|
return doFullCoverSynchronize(targetWriter, targetTableManager, sourceQuerier, transformProvider, incrPoint);
|
||||||
|
@@ -16,6 +16,7 @@ import org.dromara.dbswitch.core.features.ProductFeatures;
|
|||||||
import org.dromara.dbswitch.core.provider.AbstractFactoryProvider;
|
import org.dromara.dbswitch.core.provider.AbstractFactoryProvider;
|
||||||
import org.dromara.dbswitch.core.provider.meta.MetadataProvider;
|
import org.dromara.dbswitch.core.provider.meta.MetadataProvider;
|
||||||
import org.dromara.dbswitch.core.provider.manage.TableManageProvider;
|
import org.dromara.dbswitch.core.provider.manage.TableManageProvider;
|
||||||
|
import org.dromara.dbswitch.core.provider.query.TableDataQueryProvider;
|
||||||
import org.dromara.dbswitch.core.provider.sync.TableDataSynchronizeProvider;
|
import org.dromara.dbswitch.core.provider.sync.TableDataSynchronizeProvider;
|
||||||
import org.dromara.dbswitch.core.provider.write.TableDataWriteProvider;
|
import org.dromara.dbswitch.core.provider.write.TableDataWriteProvider;
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
@@ -37,6 +38,11 @@ public class OracleFactoryProvider extends AbstractFactoryProvider {
|
|||||||
return new OracleMetadataQueryProvider(this);
|
return new OracleMetadataQueryProvider(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableDataQueryProvider createTableDataQueryProvider() {
|
||||||
|
return new OracleTableDataQueryProvider(this);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TableDataWriteProvider createTableDataWriteProvider(boolean useInsert) {
|
public TableDataWriteProvider createTableDataWriteProvider(boolean useInsert) {
|
||||||
return new OracleTableDataWriteProvider(this);
|
return new OracleTableDataWriteProvider(this);
|
||||||
|
@@ -0,0 +1,43 @@
|
|||||||
|
// Copyright tang. All rights reserved.
|
||||||
|
// https://gitee.com/inrgihc/dbswitch
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license
|
||||||
|
//
|
||||||
|
// Author: tang (inrgihc@126.com)
|
||||||
|
// Date : 2020/1/2
|
||||||
|
// Location: beijing , china
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
package org.dromara.dbswitch.product.oracle;
|
||||||
|
|
||||||
|
import java.sql.Types;
|
||||||
|
import org.dromara.dbswitch.common.entity.IncrementPoint;
|
||||||
|
import org.dromara.dbswitch.common.util.JdbcTypesUtils;
|
||||||
|
import org.dromara.dbswitch.core.provider.ProductFactoryProvider;
|
||||||
|
import org.dromara.dbswitch.core.provider.query.DefaultTableDataQueryProvider;
|
||||||
|
|
||||||
|
public class OracleTableDataQueryProvider extends DefaultTableDataQueryProvider {
|
||||||
|
|
||||||
|
private static final String TIMESTAMP_PATTERN = "yyyy-mm-dd hh24:mi:ss.ff";
|
||||||
|
private static final String DATE_PATTERN = "yyyy-mm-dd hh24:mi:ss";
|
||||||
|
|
||||||
|
public OracleTableDataQueryProvider(ProductFactoryProvider factoryProvider) {
|
||||||
|
super(factoryProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String toGreaterThanCondition(IncrementPoint point) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append(quoteName(point.getColumnName()));
|
||||||
|
sb.append(" > ");
|
||||||
|
if (JdbcTypesUtils.isInteger(point.getJdbcType())) {
|
||||||
|
sb.append(point.getMaxValue());
|
||||||
|
} else if (JdbcTypesUtils.isDateTime(point.getJdbcType())) {
|
||||||
|
if (Types.TIMESTAMP == point.getJdbcType() || Types.TIMESTAMP_WITH_TIMEZONE == point.getJdbcType()) {
|
||||||
|
sb.append(String.format("TO_TIMESTAMP('%s', '%s')", point.getMaxValue(), TIMESTAMP_PATTERN));
|
||||||
|
} else {
|
||||||
|
sb.append(String.format("TO_DATE('%s', '%s')", point.getMaxValue(), DATE_PATTERN));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user