实现I69LX2,支持查看job的执行日志

This commit is contained in:
inrgihc
2023-02-03 22:50:42 +08:00
parent 5c3a8c2003
commit 7aa7086142
47 changed files with 770 additions and 70 deletions

View File

@@ -39,36 +39,13 @@
<el-form label-position="left" <el-form label-position="left"
inline inline
class="demo-table-expand"> class="demo-table-expand">
<el-form-item label="JOB编号:"> <el-form-item label="执行日志:">
<span>{{ props.row.jobId }}</span>
</el-form-item>
<el-form-item label="调度方式:">
<span>{{ props.row.scheduleMode }}</span>
</el-form-item>
<el-form-item label="开始时间:">
<span>{{ props.row.startTime }}</span>
</el-form-item>
<el-form-item label="结束时间:">
<span>{{ props.row.finishTime }}</span>
</el-form-item>
<el-form-item label="执行状态:">
<span>{{ props.row.jobStatus }}</span>
</el-form-item>
<el-form-item label="操作:">
<el-button size="small" <el-button size="small"
type="danger" type="danger"
v-if="props.row.status=='1'" @click="handleShowJobLogs(props.row.jobId)">
@click="handleCancelJob(props.row.jobId)"> 查看
停止
</el-button> </el-button>
</el-form-item> </el-form-item>
<el-form-item label="异常日志:">
<el-input type="textarea"
style="font-size:12px;width: 700px"
:autosize="{ minRows: 2, maxRows: 5}"
v-model="props.row.errorLog">
</el-input>
</el-form-item>
</el-form> </el-form>
</template> </template>
</el-table-column> </el-table-column>
@@ -105,6 +82,61 @@
:total="totalCount"></el-pagination> :total="totalCount"></el-pagination>
</div> </div>
</div> </div>
<el-dialog title="日志详情"
:visible.sync="dialogShowLogVisible"
:showClose="false"
:before-close="handleClose">
<el-alert v-if="status===0"
title="执行状态:未执行"
type="info"
center
show-icon>
</el-alert>
<el-alert v-if="status===1"
title="执行状态:执行中"
type="success"
center
show-icon>
</el-alert>
<el-alert v-if="status===2"
title="执行状态:执行失败"
type="error"
center
show-icon>
</el-alert>
<el-alert v-if="status===3"
title="执行状态:执行成功"
type="success"
center
show-icon>
</el-alert>
<el-alert v-if="status===4"
title="执行状态:手动取消"
type="warning"
center
show-icon>
</el-alert>
<el-input type="textarea"
id="log_textarea_id"
class="log_textarea_style"
:rows="20"
v-model="logContent">
</el-input>
<div slot="footer"
class="dialog-footer">
<el-button size="small"
id="butten_cancel_id"
type="danger"
v-if="status=='1'"
@click="handleCancelJob(jobId)">
</el-button>
<el-button size="small"
type="success"
@click="handleCloseLogDialog"> </el-button>
</div>
</el-dialog>
</div> </div>
</el-card> </el-card>
</div> </div>
@@ -128,6 +160,12 @@ export default {
jobScheduleTime: '', jobScheduleTime: '',
isActive: -1, isActive: -1,
array: [], array: [],
dialogShowLogVisible: false,
logContent: "",
jobId: 0,
baseId: 0,
status: 0,
timer: null,
}; };
}, },
methods: { methods: {
@@ -192,6 +230,10 @@ export default {
"/dbswitch/admin/api/v1/ops/job/cancel?id=" + jobId "/dbswitch/admin/api/v1/ops/job/cancel?id=" + jobId
).then(res => { ).then(res => {
if (0 === res.data.code) { if (0 === res.data.code) {
// 禁用取消按钮
var cancelButton = document.getElementById('butten_cancel_id');
cancelButton.value = "已取消"
cancelButton.disabled = true;
this.$message("停止JOB成功"); this.$message("停止JOB成功");
this.loadJobsData(); this.loadJobsData();
} else { } else {
@@ -200,11 +242,73 @@ export default {
} }
} }
}); });
},
handleShowJobLogs: function (jobId) {
this.dialogShowLogVisible = true
this.jobId = jobId;
this.$http.get(
"/dbswitch/admin/api/v1/ops/job/logs/tail?id=" + jobId + "&size=500"
).then(res => {
//console.log(res.data)
if (0 === res.data.code) {
let lists = res.data.data.logs
this.status = res.data.data.status;
this.baseId = res.data.data.maxId;
this.logContent = lists.join("")
this.scrollMaxheight();
if (1 === res.data.data.status) {
this.timer = setInterval(() => {
this.timerRefreshLogs();
}, 1000);// 每隔1s定时刷新
}
} else {
if (res.data.message) {
alert("加载JOB执行日志失败," + res.data.message);
}
}
});
},
timerRefreshLogs: function () {
//console.log("time run ...");
this.$http.get(
"/dbswitch/admin/api/v1/ops/job/logs/next?id=" + this.jobId + "&baseId=" + this.baseId
).then(res => {
if (0 === res.data.code) {
let lists = res.data.data.logs;
this.logContent = this.logContent + lists.join("");
this.baseId = res.data.data.maxId;
this.status = res.data.data.status;
this.scrollMaxheight();
if (1 !== res.data.data.status) {
// 如果不是运行中,则需要关闭定时器
clearInterval(this.timer);
this.timer = null;
}
}
});
},
scrollMaxheight: function () {
this.$nextTick(() => {
setTimeout(() => {
const textarea = document.getElementById('log_textarea_id');
textarea.scrollTop = textarea.scrollHeight;
}, 13)
})
},
handleCloseLogDialog: function () {
this.dialogShowLogVisible = false;
clearInterval(this.timer);
this.timer = null;
} }
}, },
created () { created () {
this.loadPageTaskAssignments(); this.loadPageTaskAssignments();
} },
beforeDestroy () {
if (this.timer) {
clearInterval(this.timer);
}
},
}; };
</script> </script>
@@ -287,4 +391,10 @@ export default {
width: calc(100% - 250px); width: calc(100% - 250px);
} }
.log_textarea_style .el-textarea__inner {
padding: 5px 15px;
width: 100%;
color: #505664 !important;
background-color: #1d1b1f !important;
}
</style> </style>

View File

@@ -14,6 +14,8 @@ import com.gitee.dbswitch.admin.common.response.PageResult;
import com.gitee.dbswitch.admin.common.response.Result; import com.gitee.dbswitch.admin.common.response.Result;
import com.gitee.dbswitch.admin.config.SwaggerConfig; import com.gitee.dbswitch.admin.config.SwaggerConfig;
import com.gitee.dbswitch.admin.model.response.TaskJobDetailResponse; import com.gitee.dbswitch.admin.model.response.TaskJobDetailResponse;
import com.gitee.dbswitch.admin.model.response.TaskJobLogbackResponse;
import com.gitee.dbswitch.admin.service.JobLogbackService;
import com.gitee.dbswitch.admin.service.JobManagerService; import com.gitee.dbswitch.admin.service.JobManagerService;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
@@ -32,6 +34,8 @@ public class JobManagerController {
@Resource @Resource
private JobManagerService opsManagerService; private JobManagerService opsManagerService;
@Resource
private JobLogbackService jobLogbackService;
@TokenCheck @TokenCheck
@ApiOperation(value = "根据任务ID查询作业执行记录") @ApiOperation(value = "根据任务ID查询作业执行记录")
@@ -56,4 +60,18 @@ public class JobManagerController {
return opsManagerService.cancelJob(id); return opsManagerService.cancelJob(id);
} }
@TokenCheck
@ApiOperation(value = "根据作业的ID查询最后N条日志")
@GetMapping(value = "/job/logs/tail", produces = MediaType.APPLICATION_JSON_VALUE)
public Result<TaskJobLogbackResponse> tailLogs(@RequestParam("id") Long id, @RequestParam("size") Integer size) {
return jobLogbackService.tailLog(id, size);
}
@TokenCheck
@ApiOperation(value = "根据作业的ID查询后续日志")
@GetMapping(value = "/job/logs/next", produces = MediaType.APPLICATION_JSON_VALUE)
public Result<TaskJobLogbackResponse> nextLogs(@RequestParam("id") Long id, @RequestParam("baseId") Long baseId) {
return jobLogbackService.nextLog(id, baseId);
}
} }

View File

@@ -0,0 +1,51 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.admin.dao;
import com.gitee.dbswitch.admin.entity.JobLogbackEntity;
import com.gitee.dbswitch.admin.mapper.JobLogbackMapper;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.springframework.stereotype.Repository;
import tk.mybatis.mapper.entity.Example;
import tk.mybatis.mapper.util.Sqls;
@Repository
public class JobLogbackDAO {
@Resource
private JobLogbackMapper jobLogbackMapper;
public void insert(String uuid, String content) {
jobLogbackMapper.insertSelective(JobLogbackEntity.builder().uuid(uuid).content(content).build());
}
public List<JobLogbackEntity> getTailByUuid(String uuid) {
Example example = Example.builder(JobLogbackEntity.class)
.select("id", "content")
.andWhere(Sqls.custom().andEqualTo("uuid", uuid))
.orderByDesc("id")
.build();
List<JobLogbackEntity> result = jobLogbackMapper.selectByExample(example);
Collections.reverse(result);
return result;
}
public List<JobLogbackEntity> getNextByUuid(String uuid, Long baseId) {
Example example = Example.builder(JobLogbackEntity.class)
.select("id", "content")
.andWhere(Sqls.custom().andGreaterThan("id", baseId).andEqualTo("uuid", uuid))
.build();
return jobLogbackMapper.selectByExample(example);
}
}

View File

@@ -0,0 +1,42 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.admin.entity;
import java.sql.Timestamp;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import tk.mybatis.mapper.annotation.KeySql;
@SuperBuilder
@NoArgsConstructor
@Data
@Entity
@Table(name = "DBSWITCH_JOB_LOGBACK")
public class JobLogbackEntity {
@Id
@KeySql(useGeneratedKeys = true)
@Column(name = "id", insertable = false, updatable = false)
private Long id;
@Column(name = "uuid")
private String uuid;
@Column(name = "content")
private String content;
@Column(name = "create_time", insertable = false, updatable = false)
private Timestamp createTime;
}

View File

@@ -1,6 +1,6 @@
package com.gitee.dbswitch.admin.handler; package com.gitee.dbswitch.admin.handler;
import com.gitee.dbswitch.admin.util.JsonUtils; import com.gitee.dbswitch.data.util.JsonUtils;
import com.gitee.dbswitch.common.entity.PatternMapper; import com.gitee.dbswitch.common.entity.PatternMapper;
import java.sql.CallableStatement; import java.sql.CallableStatement;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;

View File

@@ -9,7 +9,7 @@
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
package com.gitee.dbswitch.admin.handler; package com.gitee.dbswitch.admin.handler;
import com.gitee.dbswitch.admin.util.JsonUtils; import com.gitee.dbswitch.data.util.JsonUtils;
import java.sql.CallableStatement; import java.sql.CallableStatement;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;

View File

@@ -0,0 +1,27 @@
package com.gitee.dbswitch.admin.listener;
import com.gitee.dbswitch.admin.dao.JobLogbackDAO;
import com.gitee.dbswitch.admin.logback.LogbackAppenderRegister;
import com.gitee.dbswitch.admin.logback.LogbackEventContent;
import com.gitee.dbswitch.admin.util.SpringUtils;
import java.util.Arrays;
import org.springframework.boot.context.event.ApplicationContextInitializedEvent;
import org.springframework.context.ApplicationListener;
public class DbswitchAdminStartedEventListener implements ApplicationListener<ApplicationContextInitializedEvent> {
private static final String[] LOGGER_CLASS_NAME = {
"com.gitee.dbswitch.data.service.MigrationService",
"com.gitee.dbswitch.data.handler.MigrationHandler"
};
@Override
public void onApplicationEvent(ApplicationContextInitializedEvent event) {
LogbackAppenderRegister.addDatabaseAppender(Arrays.asList(LOGGER_CLASS_NAME), this::recordLogContent);
}
private void recordLogContent(LogbackEventContent log) {
SpringUtils.getBean(JobLogbackDAO.class).insert(log.getIdentity(), log.getContent());
}
}

View File

@@ -0,0 +1,72 @@
package com.gitee.dbswitch.admin.logback;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.PatternLayout;
import ch.qos.logback.classic.sift.MDCBasedDiscriminator;
import ch.qos.logback.classic.sift.SiftingAppender;
import java.util.List;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
@Slf4j
public final class LogbackAppenderRegister {
public final static String LOG_MDC_KEY_NAME = "LogUuid";
private final static String LOG_MDC_KEY_DEFAULT_VALUE = "00000000000";
private final static String LOG_STORE_APPENDER_NAME = "SIFT-DATABASE";
private final static String LOG_STORE_APPENDER_PATTERN
= "[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5level %logger{35} - %msg%n";
public static void addDatabaseAppender(List<String> loggerClassNames,
Consumer<LogbackEventContent> handler) {
if (CollectionUtils.isEmpty(loggerClassNames)) {
return;
}
try {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
MDCBasedDiscriminator discriminator = new MDCBasedDiscriminator();
discriminator.setKey(LOG_MDC_KEY_NAME);
discriminator.setDefaultValue(LOG_MDC_KEY_DEFAULT_VALUE);
discriminator.start();
SiftingAppender sa = new SiftingAppender();
sa.setName(LOG_STORE_APPENDER_NAME);
sa.setDiscriminator(discriminator);
sa.setContext(loggerContext);
sa.setAppenderFactory(
(context, discriminatingValue) -> {
PatternLayout layout = new PatternLayout();
layout.setPattern(LOG_STORE_APPENDER_PATTERN);
layout.setContext(context);
layout.start();
LogbackDatabaseAppender la = new LogbackDatabaseAppender(handler);
la.setContext(context);
la.setName(discriminatingValue);
la.setLayout(layout);
la.start();
return la;
}
);
sa.start();
loggerClassNames.forEach(
name -> {
Logger jobLogger = loggerContext.getLogger(name);
jobLogger.addAppender(sa);
jobLogger.setLevel(Level.INFO);
log.info("Success add and initialize appender of logback for class{}", name);
}
);
} catch (Exception e) {
log.error("Failed add and initialize appender of logback ,message:", e);
}
}
}

View File

@@ -0,0 +1,70 @@
package com.gitee.dbswitch.admin.logback;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LogbackDatabaseAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
private String name;
private Layout<ILoggingEvent> layout;
private Consumer<LogbackEventContent> handler;
public LogbackDatabaseAppender(Consumer<LogbackEventContent> handler) {
this.handler = handler;
}
@Override
public String getName() {
return name;
}
@Override
public void setName(String name) {
this.name = name;
}
public Layout<ILoggingEvent> getLayout() {
return layout;
}
public void setLayout(Layout<ILoggingEvent> layout) {
this.layout = layout;
}
@Override
public void start() {
if (layout == null) {
addError("LogbackDatabaseAppender layout cannot be null");
}
super.start();
}
@Override
public void stop() {
if (!isStarted()) {
return;
}
super.stop();
}
@Override
public void append(ILoggingEvent event) {
if (event == null || !isStarted()) {
return;
}
try {
handler.accept(LogbackEventContent.builder().identity(name).content(layout.doLayout(event)).build());
} catch (Exception e) {
e.printStackTrace();
log.error("failed to record logback log:{}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,16 @@
package com.gitee.dbswitch.admin.logback;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class LogbackEventContent {
private String identity;
private String content;
}

View File

@@ -0,0 +1,17 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.admin.mapper;
import com.gitee.dbswitch.admin.entity.JobLogbackEntity;
import tk.mybatis.mapper.common.Mapper;
public interface JobLogbackMapper extends Mapper<JobLogbackEntity> {
}

View File

@@ -0,0 +1,25 @@
package com.gitee.dbswitch.admin.model.response;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.Collections;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("JOB执行日志信息")
public class TaskJobLogbackResponse {
@ApiModelProperty("最大的ID")
private Long maxId = 0L;
@ApiModelProperty("JOB状态")
private Integer status = 0;
@ApiModelProperty("日志列表")
private List<String> logs = Collections.emptyList();
}

View File

@@ -28,7 +28,7 @@ import com.gitee.dbswitch.admin.model.response.AssignmentDetailResponse;
import com.gitee.dbswitch.admin.model.response.AssignmentInfoResponse; import com.gitee.dbswitch.admin.model.response.AssignmentInfoResponse;
import com.gitee.dbswitch.admin.type.ScheduleModeEnum; import com.gitee.dbswitch.admin.type.ScheduleModeEnum;
import com.gitee.dbswitch.admin.type.SupportDbTypeEnum; import com.gitee.dbswitch.admin.type.SupportDbTypeEnum;
import com.gitee.dbswitch.admin.util.JsonUtils; import com.gitee.dbswitch.data.util.JsonUtils;
import com.gitee.dbswitch.admin.util.PageUtils; import com.gitee.dbswitch.admin.util.PageUtils;
import com.gitee.dbswitch.data.config.DbswichProperties; import com.gitee.dbswitch.data.config.DbswichProperties;
import com.gitee.dbswitch.data.entity.SourceDataSourceProperties; import com.gitee.dbswitch.data.entity.SourceDataSourceProperties;
@@ -41,11 +41,8 @@ import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.quartz.CronExpression;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.quartz.CronExpression;
@Service @Service
public class AssignmentService { public class AssignmentService {

View File

@@ -16,8 +16,10 @@ import com.gitee.dbswitch.admin.dao.AssignmentTaskDAO;
import com.gitee.dbswitch.admin.entity.AssignmentConfigEntity; import com.gitee.dbswitch.admin.entity.AssignmentConfigEntity;
import com.gitee.dbswitch.admin.entity.AssignmentJobEntity; import com.gitee.dbswitch.admin.entity.AssignmentJobEntity;
import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity; import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity;
import com.gitee.dbswitch.admin.logback.LogbackAppenderRegister;
import com.gitee.dbswitch.common.entity.MdcKeyValue;
import com.gitee.dbswitch.admin.type.JobStatusEnum; import com.gitee.dbswitch.admin.type.JobStatusEnum;
import com.gitee.dbswitch.admin.util.JsonUtils; import com.gitee.dbswitch.data.util.JsonUtils;
import com.gitee.dbswitch.data.config.DbswichProperties; import com.gitee.dbswitch.data.config.DbswichProperties;
import com.gitee.dbswitch.data.service.MigrationService; import com.gitee.dbswitch.data.service.MigrationService;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
@@ -56,6 +58,8 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJo
public final static String TASK_ID = "taskId"; public final static String TASK_ID = "taskId";
public final static String SCHEDULE = "schedule"; public final static String SCHEDULE = "schedule";
private final static String MDC_KEY = LogbackAppenderRegister.LOG_MDC_KEY_NAME;
// 相同taskId的任务限制并发执行的粒度锁缓存对象 // 相同taskId的任务限制并发执行的粒度锁缓存对象
private static Cache<String, ReentrantLock> mutexes = CacheBuilder.newBuilder() private static Cache<String, ReentrantLock> mutexes = CacheBuilder.newBuilder()
.expireAfterWrite(24 * 60L, TimeUnit.MINUTES) .expireAfterWrite(24 * 60L, TimeUnit.MINUTES)
@@ -129,6 +133,7 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJo
Integer schedule = jobDataMap.getIntValue(SCHEDULE); Integer schedule = jobDataMap.getIntValue(SCHEDULE);
AssignmentJobEntity assignmentJobEntity = assignmentJobDAO AssignmentJobEntity assignmentJobEntity = assignmentJobDAO
.newAssignmentJob(taskId, schedule, key.getName()); .newAssignmentJob(taskId, schedule, key.getName());
MdcKeyValue mdcKeyValue = new MdcKeyValue(MDC_KEY, assignmentJobEntity.getId().toString());
try { try {
ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new); ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new);
@@ -174,6 +179,7 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJo
} }
// 实际执行JOB // 实际执行JOB
migrationService.setMdcKeyValue(mdcKeyValue);
migrationService.run(); migrationService.run();
if (assignmentConfigEntity.getFirstFlag()) { if (assignmentConfigEntity.getFirstFlag()) {

View File

@@ -0,0 +1,73 @@
package com.gitee.dbswitch.admin.service;
import com.gitee.dbswitch.admin.common.response.PageResult;
import com.gitee.dbswitch.admin.common.response.Result;
import com.gitee.dbswitch.admin.dao.AssignmentJobDAO;
import com.gitee.dbswitch.admin.dao.JobLogbackDAO;
import com.gitee.dbswitch.admin.entity.AssignmentJobEntity;
import com.gitee.dbswitch.admin.entity.JobLogbackEntity;
import com.gitee.dbswitch.admin.model.response.TaskJobLogbackResponse;
import com.gitee.dbswitch.admin.type.JobStatusEnum;
import com.gitee.dbswitch.admin.util.PageUtils;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@Service
public class JobLogbackService {
@Resource
private AssignmentJobDAO assignmentJobDAO;
@Resource
private JobLogbackDAO jobLogbackDAO;
public Result<TaskJobLogbackResponse> tailLog(Long jobId, Integer size) {
TaskJobLogbackResponse response = new TaskJobLogbackResponse();
AssignmentJobEntity jobEntity = assignmentJobDAO.getById(jobId);
if (Objects.isNull(jobEntity)) {
return Result.success(response);
}
Supplier<List<JobLogbackEntity>> method = () -> jobLogbackDAO.getTailByUuid(jobId.toString());
PageResult<JobLogbackEntity> page = PageUtils.getPage(method, 1, Optional.of(size).orElse(100));
response.setStatus(jobEntity.getStatus());
if (!CollectionUtils.isEmpty(page.getData())) {
response.setMaxId(page.getData().stream().mapToLong(JobLogbackEntity::getId).max().getAsLong());
response.setLogs(page.getData().stream().map(JobLogbackEntity::getContent).collect(Collectors.toList()));
} else {
if (JobStatusEnum.FAIL.getValue() == jobEntity.getStatus()) {
response.setLogs(Arrays.asList(jobEntity.getErrorLog()));
}
}
return Result.success(response);
}
public Result<TaskJobLogbackResponse> nextLog(Long jobId, Long baseId) {
TaskJobLogbackResponse response = new TaskJobLogbackResponse();
AssignmentJobEntity jobEntity = assignmentJobDAO.getById(jobId);
if (Objects.isNull(jobEntity)) {
return Result.success(response);
}
baseId = Optional.ofNullable(baseId).orElse(0L);
List<JobLogbackEntity> page = jobLogbackDAO.getNextByUuid(jobId.toString(), baseId);
response.setStatus(jobEntity.getStatus());
if (!CollectionUtils.isEmpty(page)) {
response.setMaxId(page.stream().mapToLong(JobLogbackEntity::getId).max().getAsLong());
response.setLogs(page.stream().map(JobLogbackEntity::getContent).collect(Collectors.toList()));
}
if (response.getMaxId() <= baseId) {
response.setMaxId(baseId);
}
return Result.success(response);
}
}

View File

@@ -115,7 +115,7 @@ public class ScheduleService {
if (Objects.nonNull(assignmentJobEntity)) { if (Objects.nonNull(assignmentJobEntity)) {
String jobKeyName = assignmentJobEntity.getJobKey(); String jobKeyName = assignmentJobEntity.getJobKey();
cancelByJobKey(jobKeyName); cancelByJobKey(jobKeyName);
assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue()); assignmentJobEntity.setStatus(JobStatusEnum.CANCEL.getValue());
assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis())); assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis()));
assignmentJobEntity.setErrorLog("Job was canceled!!!!"); assignmentJobEntity.setErrorLog("Job was canceled!!!!");
assignmentJobDAO.updateSelective(assignmentJobEntity); assignmentJobDAO.updateSelective(assignmentJobEntity);

View File

@@ -20,6 +20,7 @@ public enum JobStatusEnum {
RUNNING(1, "运行中"), RUNNING(1, "运行中"),
FAIL(2, "失败"), FAIL(2, "失败"),
PASS(3, "成功"), PASS(3, "成功"),
CANCEL(4, "手动终止"),
; ;
private int value; private int value;

View File

@@ -0,0 +1 @@
org.springframework.context.ApplicationListener=com.gitee.dbswitch.admin.listener.DbswitchAdminStartedEventListener

View File

@@ -0,0 +1,8 @@
CREATE TABLE IF NOT EXISTS `DBSWITCH_JOB_LOGBACK` (
`id` bigint not null auto_increment comment '自增id',
`uuid` varchar(128) not null comment 'job id',
`content` longtext comment '日志内容',
`create_time` timestamp not null default current_timestamp comment '创建时间',
PRIMARY KEY (`id`),
KEY `uuid` (`uuid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='JOB执行日志';

View File

@@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS DBSWITCH_JOB_LOGBACK (
"id" bigserial not null,
"uuid" varchar(128) not null default '',
"content" text,
"create_time" timestamp(6) not null default (CURRENT_TIMESTAMP(0))::timestamp(0) without time zone,
PRIMARY KEY ("id")
);
CREATE INDEX DBSWITCH_JOB_LOGBACK_UUID_IDX ON DBSWITCH_JOB_LOGBACK("uuid");
COMMENT ON TABLE DBSWITCH_JOB_LOGBACK IS 'JOB执行日志';
COMMENT ON COLUMN DBSWITCH_JOB_LOGBACK."id" IS '主键';
COMMENT ON COLUMN DBSWITCH_JOB_LOGBACK."uuid" IS 'job id';
COMMENT ON COLUMN DBSWITCH_JOB_LOGBACK."content" IS '日志内容';
COMMENT ON COLUMN DBSWITCH_JOB_LOGBACK."create_time" IS '创建时间';

View File

@@ -1 +1 @@
<!DOCTYPE html><html><head><meta charset=utf-8><meta name=viewport content="width=device-width,initial-scale=1"><title>异构数据迁移工具</title><link href=/static/css/app.8d8b93449d4031c291ff1ea0b9cd7542.css rel=stylesheet></head><body><div id=app></div><script type=text/javascript src=/static/js/manifest.e198fc88417848f1ebc8.js></script><script type=text/javascript src=/static/js/vendor.d6c2f50c2f02bf33c8cf.js></script><script type=text/javascript src=/static/js/app.fe3244a89b25aca1e9a1.js></script></body></html> <!DOCTYPE html><html><head><meta charset=utf-8><meta name=viewport content="width=device-width,initial-scale=1"><title>异构数据迁移工具</title><link href=/static/css/app.162b7be1f36569180b70e3a9276f0564.css rel=stylesheet></head><body><div id=app></div><script type=text/javascript src=/static/js/manifest.886eb50400644a213e62.js></script><script type=text/javascript src=/static/js/vendor.d6c2f50c2f02bf33c8cf.js></script><script type=text/javascript src=/static/js/app.1e8c429204c1c2e30faa.js></script></body></html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,2 +1,2 @@
webpackJsonp([17],{NHnr:function(n,e,t){"use strict";Object.defineProperty(e,"__esModule",{value:!0});var o=t("//Fk"),r=t.n(o),a=t("7+uW"),u={render:function(){var n=this.$createElement,e=this._self._c||n;return e("div",{staticClass:"body-wrapper"},[e("router-view")],1)},staticRenderFns:[]};var c=t("VU/8")({name:"App"},u,!1,function(n){t("Pibb")},"data-v-a97617c2",null).exports,i=t("/ocq");a.default.use(i.a);var l=new i.a({routes:[{path:"/",name:"首页",component:function(){return t.e(4).then(t.bind(null,"4er+"))},redirect:"/dashboard",children:[{path:"/dashboard",name:"概览",icon:"el-icon-menu",component:function(){return Promise.all([t.e(0),t.e(8)]).then(t.bind(null,"ARoL"))}},{path:"/connection",name:"连接管理",icon:"el-icon-s-order",component:function(){return Promise.all([t.e(0),t.e(13)]).then(t.bind(null,"qdtB"))}},{path:"/metadata",name:"数据目录",icon:"el-icon-coin",component:function(){return t.e(1).then(t.bind(null,"PJ2q"))}},{path:"/task",name:"任务管理",icon:"el-icon-s-tools",component:function(){return t.e(6).then(t.bind(null,"4KEO"))},children:[{path:"/task/assignment",name:"任务安排",icon:"el-icon-eleme",component:function(){return Promise.all([t.e(0),t.e(15)]).then(t.bind(null,"D0I9"))}},{path:"/task/schedule",name:"调度记录",icon:"el-icon-pie-chart",component:function(){return t.e(14).then(t.bind(null,"mKp/"))}}]},{path:"/log",name:"审计日志",icon:"el-icon-platform-eleme",component:function(){return t.e(7).then(t.bind(null,"QWih"))},children:[{path:"/log/access",name:"登录日志",icon:"el-icon-eleme",component:function(){return t.e(10).then(t.bind(null,"oQRv"))}},{path:"/log/action",name:"操作日志",icon:"el-icon-s-check",component:function(){return t.e(9).then(t.bind(null,"0eSS"))}}]},{path:"/about",name:"关于系统",icon:"el-icon-s-custom",component:function(){return t.e(2).then(t.bind(null,"m25N"))}},{path:"/user/personal",name:"个人中心",hidden:!0,component:function(){return t.e(3).then(t.bind(null,"uTKz"))}},{path:"/task/create",name:"创建任务",hidden:!0,component:function(){return Promise.all([t.e(0),t.e(12)]).then(t.bind(null,"/rCC"))}},{path:"/task/update",name:"修改任务",hidden:!0,component:function(){return Promise.all([t.e(0),t.e(11)]).then(t.bind(null,"txod"))}}]},{path:"/login",name:"登录",component:function(){return t.e(5).then(t.bind(null,"T+/8"))}}]}),p=t("mtWM"),s=t.n(p).a.create();s.interceptors.request.use(function(n){return n.url=""+n.url,n});var d=s,m=t("zL8q"),h=t.n(m),f=(t("muQq"),t("tvR6"),t("7Vno")),b=t.n(f),v=t("XLwt"),g=t.n(v);a.default.use(d),a.default.use(h.a),a.default.use(b.a),a.default.prototype.$http=d,a.default.config.productionTip=!1,a.default.prototype.$echarts=g.a,d.interceptors.request.use(function(n){var e=sessionStorage.getItem("token");return e&&(n.headers.Authorization="Bearer "+e),n},function(n){return r.a.reject(n)}),d.interceptors.response.use(function(n){return!n.data||401!==n.data.code&&403!==n.data.code&&404!==n.data.code||l.push({path:"/login"}),n},function(n){return console.log(n),r.a.reject(n.response)}),new a.default({el:"#app",router:l,components:{App:c},template:"<App/>"})},Pibb:function(n,e){},muQq:function(n,e){},tvR6:function(n,e){}},["NHnr"]); webpackJsonp([17],{NHnr:function(n,e,t){"use strict";Object.defineProperty(e,"__esModule",{value:!0});var o=t("//Fk"),r=t.n(o),a=t("7+uW"),u={render:function(){var n=this.$createElement,e=this._self._c||n;return e("div",{staticClass:"body-wrapper"},[e("router-view")],1)},staticRenderFns:[]};var c=t("VU/8")({name:"App"},u,!1,function(n){t("Pibb")},"data-v-a97617c2",null).exports,i=t("/ocq");a.default.use(i.a);var l=new i.a({routes:[{path:"/",name:"首页",component:function(){return t.e(4).then(t.bind(null,"4er+"))},redirect:"/dashboard",children:[{path:"/dashboard",name:"概览",icon:"el-icon-menu",component:function(){return Promise.all([t.e(0),t.e(8)]).then(t.bind(null,"ARoL"))}},{path:"/connection",name:"连接管理",icon:"el-icon-s-order",component:function(){return Promise.all([t.e(0),t.e(14)]).then(t.bind(null,"qdtB"))}},{path:"/metadata",name:"数据目录",icon:"el-icon-coin",component:function(){return t.e(1).then(t.bind(null,"PJ2q"))}},{path:"/task",name:"任务管理",icon:"el-icon-s-tools",component:function(){return t.e(6).then(t.bind(null,"4KEO"))},children:[{path:"/task/assignment",name:"任务安排",icon:"el-icon-eleme",component:function(){return Promise.all([t.e(0),t.e(15)]).then(t.bind(null,"D0I9"))}},{path:"/task/schedule",name:"调度记录",icon:"el-icon-pie-chart",component:function(){return t.e(11).then(t.bind(null,"mKp/"))}}]},{path:"/log",name:"审计日志",icon:"el-icon-platform-eleme",component:function(){return t.e(7).then(t.bind(null,"QWih"))},children:[{path:"/log/access",name:"登录日志",icon:"el-icon-eleme",component:function(){return t.e(10).then(t.bind(null,"oQRv"))}},{path:"/log/action",name:"操作日志",icon:"el-icon-s-check",component:function(){return t.e(9).then(t.bind(null,"0eSS"))}}]},{path:"/about",name:"关于系统",icon:"el-icon-s-custom",component:function(){return t.e(2).then(t.bind(null,"m25N"))}},{path:"/user/personal",name:"个人中心",hidden:!0,component:function(){return t.e(3).then(t.bind(null,"uTKz"))}},{path:"/task/create",name:"创建任务",hidden:!0,component:function(){return Promise.all([t.e(0),t.e(13)]).then(t.bind(null,"/rCC"))}},{path:"/task/update",name:"修改任务",hidden:!0,component:function(){return Promise.all([t.e(0),t.e(12)]).then(t.bind(null,"txod"))}}]},{path:"/login",name:"登录",component:function(){return t.e(5).then(t.bind(null,"T+/8"))}}]}),p=t("mtWM"),s=t.n(p).a.create();s.interceptors.request.use(function(n){return n.url=""+n.url,n});var d=s,m=t("zL8q"),h=t.n(m),f=(t("muQq"),t("tvR6"),t("7Vno")),b=t.n(f),v=t("XLwt"),g=t.n(v);a.default.use(d),a.default.use(h.a),a.default.use(b.a),a.default.prototype.$http=d,a.default.config.productionTip=!1,a.default.prototype.$echarts=g.a,d.interceptors.request.use(function(n){var e=sessionStorage.getItem("token");return e&&(n.headers.Authorization="Bearer "+e),n},function(n){return r.a.reject(n)}),d.interceptors.response.use(function(n){return!n.data||401!==n.data.code&&403!==n.data.code&&404!==n.data.code||l.push({path:"/login"}),n},function(n){return console.log(n),r.a.reject(n.response)}),new a.default({el:"#app",router:l,components:{App:c},template:"<App/>"})},Pibb:function(n,e){},muQq:function(n,e){},tvR6:function(n,e){}},["NHnr"]);
//# sourceMappingURL=app.fe3244a89b25aca1e9a1.js.map //# sourceMappingURL=app.1e8c429204c1c2e30faa.js.map

View File

@@ -1,2 +1,2 @@
!function(e){var n=window.webpackJsonp;window.webpackJsonp=function(r,c,a){for(var f,i,u,d=0,s=[];d<r.length;d++)i=r[d],t[i]&&s.push(t[i][0]),t[i]=0;for(f in c)Object.prototype.hasOwnProperty.call(c,f)&&(e[f]=c[f]);for(n&&n(r,c,a);s.length;)s.shift()();if(a)for(d=0;d<a.length;d++)u=o(o.s=a[d]);return u};var r={},t={18:0};function o(n){if(r[n])return r[n].exports;var t=r[n]={i:n,l:!1,exports:{}};return e[n].call(t.exports,t,t.exports,o),t.l=!0,t.exports}o.e=function(e){var n=t[e];if(0===n)return new Promise(function(e){e()});if(n)return n[2];var r=new Promise(function(r,o){n=t[e]=[r,o]});n[2]=r;var c=document.getElementsByTagName("head")[0],a=document.createElement("script");a.type="text/javascript",a.charset="utf-8",a.async=!0,a.timeout=12e4,o.nc&&a.setAttribute("nonce",o.nc),a.src=o.p+"static/js/"+e+"."+{0:"ca67e87d8c000a42e592",1:"bd90a7e98064156140b6",2:"a3a6e495913b4aed9e88",3:"d4c1dc7b68edb49b61d2",4:"c4abd62fbe15189d37a5",5:"837a4a67f1fcf6ee6c6a",6:"7f56c2238fb7e4ee2ecd",7:"d5dc80a855f66a3208ff",8:"2bf951413fb931374230",9:"5a8e8f0e586b6f27113c",10:"c2e1c087ed8b370c038c",11:"532774c3f8151c9e7000",12:"2ed5474933ed0393afb4",13:"193d0e9c6755a2722219",14:"ed586583e97505228b92",15:"fb483208e2855048c8d0"}[e]+".js";var f=setTimeout(i,12e4);function i(){a.onerror=a.onload=null,clearTimeout(f);var n=t[e];0!==n&&(n&&n[1](new Error("Loading chunk "+e+" failed.")),t[e]=void 0)}return a.onerror=a.onload=i,c.appendChild(a),r},o.m=e,o.c=r,o.d=function(e,n,r){o.o(e,n)||Object.defineProperty(e,n,{configurable:!1,enumerable:!0,get:r})},o.n=function(e){var n=e&&e.__esModule?function(){return e.default}:function(){return e};return o.d(n,"a",n),n},o.o=function(e,n){return Object.prototype.hasOwnProperty.call(e,n)},o.p="/",o.oe=function(e){throw console.error(e),e}}([]); !function(e){var n=window.webpackJsonp;window.webpackJsonp=function(r,c,a){for(var f,i,u,d=0,s=[];d<r.length;d++)i=r[d],t[i]&&s.push(t[i][0]),t[i]=0;for(f in c)Object.prototype.hasOwnProperty.call(c,f)&&(e[f]=c[f]);for(n&&n(r,c,a);s.length;)s.shift()();if(a)for(d=0;d<a.length;d++)u=o(o.s=a[d]);return u};var r={},t={18:0};function o(n){if(r[n])return r[n].exports;var t=r[n]={i:n,l:!1,exports:{}};return e[n].call(t.exports,t,t.exports,o),t.l=!0,t.exports}o.e=function(e){var n=t[e];if(0===n)return new Promise(function(e){e()});if(n)return n[2];var r=new Promise(function(r,o){n=t[e]=[r,o]});n[2]=r;var c=document.getElementsByTagName("head")[0],a=document.createElement("script");a.type="text/javascript",a.charset="utf-8",a.async=!0,a.timeout=12e4,o.nc&&a.setAttribute("nonce",o.nc),a.src=o.p+"static/js/"+e+"."+{0:"ca67e87d8c000a42e592",1:"bd90a7e98064156140b6",2:"a3a6e495913b4aed9e88",3:"d4c1dc7b68edb49b61d2",4:"c4abd62fbe15189d37a5",5:"837a4a67f1fcf6ee6c6a",6:"7f56c2238fb7e4ee2ecd",7:"d5dc80a855f66a3208ff",8:"2bf951413fb931374230",9:"5a8e8f0e586b6f27113c",10:"c2e1c087ed8b370c038c",11:"6ef08aec998c43887c9a",12:"3eac123814e95abf9826",13:"5067704e14d5bbf82b19",14:"0f29ed81f9a93fa695c6",15:"fb483208e2855048c8d0"}[e]+".js";var f=setTimeout(i,12e4);function i(){a.onerror=a.onload=null,clearTimeout(f);var n=t[e];0!==n&&(n&&n[1](new Error("Loading chunk "+e+" failed.")),t[e]=void 0)}return a.onerror=a.onload=i,c.appendChild(a),r},o.m=e,o.c=r,o.d=function(e,n,r){o.o(e,n)||Object.defineProperty(e,n,{configurable:!1,enumerable:!0,get:r})},o.n=function(e){var n=e&&e.__esModule?function(){return e.default}:function(){return e};return o.d(n,"a",n),n},o.o=function(e,n){return Object.prototype.hasOwnProperty.call(e,n)},o.p="/",o.oe=function(e){throw console.error(e),e}}([]);
//# sourceMappingURL=manifest.e198fc88417848f1ebc8.js.map //# sourceMappingURL=manifest.886eb50400644a213e62.js.map

View File

@@ -0,0 +1,21 @@
package com.gitee.dbswitch.common.entity;
import java.util.Objects;
import org.slf4j.MDC;
public abstract class AbstractLogging {
private final MdcKeyValue mdc;
public AbstractLogging(MdcKeyValue mdc) {
this.mdc = Objects.requireNonNull(mdc, "mdc is null");
}
protected void setupMdc() {
MDC.put(mdc.getMdcKey(), mdc.getMdcValue());
}
protected void cleanMdc() {
MDC.remove(mdc.getMdcKey());
}
}

View File

@@ -0,0 +1,24 @@
package com.gitee.dbswitch.common.entity;
import java.util.function.Function;
public class LoggingFunction<T, R> extends AbstractLogging implements Function<T, R> {
private final Function<T, R> command;
public LoggingFunction(Function<T, R> command, MdcKeyValue mdc) {
super(mdc);
this.command = command;
}
@Override
public R apply(T t) {
try {
setupMdc();
return command.apply(t);
} finally {
cleanMdc();
}
}
}

View File

@@ -0,0 +1,22 @@
package com.gitee.dbswitch.common.entity;
public class LoggingRunnable extends AbstractLogging implements Runnable {
private final Runnable command;
public LoggingRunnable(Runnable command, MdcKeyValue mdc) {
super(mdc);
this.command = command;
}
@Override
public void run() {
try {
setupMdc();
command.run();
} finally {
cleanMdc();
}
}
}

View File

@@ -0,0 +1,24 @@
package com.gitee.dbswitch.common.entity;
import java.util.function.Supplier;
public class LoggingSupplier<T> extends AbstractLogging implements Supplier<T> {
private final Supplier<T> command;
public LoggingSupplier(Supplier<T> command, MdcKeyValue mdc) {
super(mdc);
this.command = command;
}
@Override
public T get() {
try {
setupMdc();
return command.get();
} finally {
cleanMdc();
}
}
}

View File

@@ -0,0 +1,28 @@
package com.gitee.dbswitch.common.entity;
public class MdcKeyValue {
private String mdcKey;
private String mdcValue;
public MdcKeyValue(String mdcKey, String mdcValue) {
this.mdcKey = mdcKey;
this.mdcValue = mdcValue;
}
public String getMdcKey() {
return mdcKey;
}
public void setMdcKey(String mdcKey) {
this.mdcKey = mdcKey;
}
public String getMdcValue() {
return mdcValue;
}
public void setMdcValue(String mdcValue) {
this.mdcValue = mdcValue;
}
}

View File

@@ -195,6 +195,7 @@ public class MigrationHandler implements Supplier<Long> {
throw new RuntimeException("字段映射配置有误,禁止将多个字段映射到一个同名字段!"); throw new RuntimeException("字段映射配置有误,禁止将多个字段映射到一个同名字段!");
} }
if (interrupted) { if (interrupted) {
log.info("task job is interrupted!");
throw new RuntimeException("task is interrupted"); throw new RuntimeException("task is interrupted");
} }
IDatabaseWriter writer = DatabaseWriterFactory.createDatabaseWriter( IDatabaseWriter writer = DatabaseWriterFactory.createDatabaseWriter(
@@ -241,6 +242,7 @@ public class MigrationHandler implements Supplier<Long> {
} }
if (interrupted) { if (interrupted) {
log.info("task job is interrupted!");
throw new RuntimeException("task is interrupted"); throw new RuntimeException("task is interrupted");
} }
@@ -253,6 +255,7 @@ public class MigrationHandler implements Supplier<Long> {
} }
if (interrupted) { if (interrupted) {
log.info("task job is interrupted!");
throw new RuntimeException("task is interrupted"); throw new RuntimeException("task is interrupted");
} }
@@ -277,6 +280,7 @@ public class MigrationHandler implements Supplier<Long> {
} }
if (interrupted) { if (interrupted) {
log.info("task job is interrupted!");
throw new RuntimeException("task is interrupted"); throw new RuntimeException("task is interrupted");
} }
@@ -352,6 +356,7 @@ public class MigrationHandler implements Supplier<Long> {
try (ResultSet rs = srs.getResultset()) { try (ResultSet rs = srs.getResultset()) {
while (rs.next()) { while (rs.next()) {
if (interrupted) { if (interrupted) {
log.info("task job is interrupted!");
throw new RuntimeException("task is interrupted"); throw new RuntimeException("task is interrupted");
} }
Object[] record = new Object[sourceFields.size()]; Object[] record = new Object[sourceFields.size()];
@@ -482,6 +487,7 @@ public class MigrationHandler implements Supplier<Long> {
*/ */
private void checkFull(List<String> fields) { private void checkFull(List<String> fields) {
if (interrupted) { if (interrupted) {
log.info("task job is interrupted!");
throw new RuntimeException("task is interrupted"); throw new RuntimeException("task is interrupted");
} }
if (cacheInsert.size() >= BATCH_SIZE || cacheUpdate.size() >= BATCH_SIZE if (cacheInsert.size() >= BATCH_SIZE || cacheUpdate.size() >= BATCH_SIZE

View File

@@ -9,7 +9,10 @@
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
package com.gitee.dbswitch.data.service; package com.gitee.dbswitch.data.service;
import com.fasterxml.jackson.databind.ObjectMapper; import com.gitee.dbswitch.common.entity.LoggingFunction;
import com.gitee.dbswitch.common.entity.LoggingRunnable;
import com.gitee.dbswitch.common.entity.LoggingSupplier;
import com.gitee.dbswitch.common.entity.MdcKeyValue;
import com.gitee.dbswitch.common.util.DbswitchStrUtils; import com.gitee.dbswitch.common.util.DbswitchStrUtils;
import com.gitee.dbswitch.core.model.TableDescription; import com.gitee.dbswitch.core.model.TableDescription;
import com.gitee.dbswitch.core.service.IMetaDataByDatasourceService; import com.gitee.dbswitch.core.service.IMetaDataByDatasourceService;
@@ -20,6 +23,7 @@ import com.gitee.dbswitch.data.entity.SourceDataSourceProperties;
import com.gitee.dbswitch.data.handler.MigrationHandler; import com.gitee.dbswitch.data.handler.MigrationHandler;
import com.gitee.dbswitch.data.util.BytesUnitUtils; import com.gitee.dbswitch.data.util.BytesUnitUtils;
import com.gitee.dbswitch.data.util.DataSourceUtils; import com.gitee.dbswitch.data.util.DataSourceUtils;
import com.gitee.dbswitch.data.util.JsonUtils;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@@ -33,6 +37,7 @@ import java.util.function.Supplier;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch; import org.springframework.util.StopWatch;
@@ -46,11 +51,6 @@ import org.springframework.util.StopWatch;
@Service @Service
public class MigrationService { public class MigrationService {
/**
* JSON序列化工具
*/
private final ObjectMapper jackson = new ObjectMapper();
/** /**
* 性能统计记录表 * 性能统计记录表
*/ */
@@ -76,6 +76,11 @@ public class MigrationService {
*/ */
private final AsyncTaskExecutor taskExecutor; private final AsyncTaskExecutor taskExecutor;
/**
* 任务执行实时记录MDC
*/
private MdcKeyValue mdcKeyValue;
/** /**
* 构造函数 * 构造函数
* *
@@ -86,6 +91,10 @@ public class MigrationService {
this.taskExecutor = Objects.requireNonNull(tableMigrationExecutor, "taskExecutor is null"); this.taskExecutor = Objects.requireNonNull(tableMigrationExecutor, "taskExecutor is null");
} }
public void setMdcKeyValue(MdcKeyValue mdcKeyValue) {
this.mdcKeyValue = Objects.requireNonNull(mdcKeyValue, "mdcKeyValue is null");
}
/** /**
* 中断执行中的任务 * 中断执行中的任务
*/ */
@@ -94,10 +103,22 @@ public class MigrationService {
migrationHandlers.forEach(MigrationHandler::interrupt); migrationHandlers.forEach(MigrationHandler::interrupt);
} }
/**
* 执行入口
*/
public void run() {
if (Objects.nonNull(mdcKeyValue)) {
Runnable runnable = new LoggingRunnable(this::doRun, this.mdcKeyValue);
runnable.run();
} else {
doRun();
}
}
/** /**
* 执行主逻辑 * 执行主逻辑
*/ */
public void run() throws Exception { private void doRun() {
StopWatch watch = new StopWatch(); StopWatch watch = new StopWatch();
watch.start(); watch.start();
@@ -113,6 +134,7 @@ public class MigrationService {
List<SourceDataSourceProperties> sourcesProperties = properties.getSource(); List<SourceDataSourceProperties> sourcesProperties = properties.getSource();
for (SourceDataSourceProperties sourceProperties : sourcesProperties) { for (SourceDataSourceProperties sourceProperties : sourcesProperties) {
if (interrupted) { if (interrupted) {
log.info("task job is interrupted!");
throw new RuntimeException("task is interrupted"); throw new RuntimeException("task is interrupted");
} }
try (HikariDataSource sourceDataSource = DataSourceUtils.createSourceDataSource(sourceProperties)) { try (HikariDataSource sourceDataSource = DataSourceUtils.createSourceDataSource(sourceProperties)) {
@@ -121,10 +143,10 @@ public class MigrationService {
// 判断处理的策略:是排除还是包含 // 判断处理的策略:是排除还是包含
List<String> includes = DbswitchStrUtils.stringToList(sourceProperties.getSourceIncludes()); List<String> includes = DbswitchStrUtils.stringToList(sourceProperties.getSourceIncludes());
log.info("Includes tables is :{}", jackson.writeValueAsString(includes)); log.info("Includes tables is :{}", JsonUtils.toJsonString(includes));
List<String> filters = DbswitchStrUtils List<String> filters = DbswitchStrUtils
.stringToList(sourceProperties.getSourceExcludes()); .stringToList(sourceProperties.getSourceExcludes());
log.info("Filter tables is :{}", jackson.writeValueAsString(filters)); log.info("Filter tables is :{}", JsonUtils.toJsonString(filters));
boolean useExcludeTables = includes.isEmpty(); boolean useExcludeTables = includes.isEmpty();
if (useExcludeTables) { if (useExcludeTables) {
@@ -138,13 +160,14 @@ public class MigrationService {
List<CompletableFuture<Void>> futures = new ArrayList<>(); List<CompletableFuture<Void>> futures = new ArrayList<>();
List<String> schemas = DbswitchStrUtils.stringToList(sourceProperties.getSourceSchema()); List<String> schemas = DbswitchStrUtils.stringToList(sourceProperties.getSourceSchema());
log.info("Source schema names is :{}", jackson.writeValueAsString(schemas)); log.info("Source schema names is :{}", JsonUtils.toJsonString(schemas));
AtomicInteger numberOfFailures = new AtomicInteger(0); AtomicInteger numberOfFailures = new AtomicInteger(0);
AtomicLong totalBytesSize = new AtomicLong(0L); AtomicLong totalBytesSize = new AtomicLong(0L);
final int indexInternal = sourcePropertiesIndex; final int indexInternal = sourcePropertiesIndex;
for (String schema : schemas) { for (String schema : schemas) {
if (interrupted) { if (interrupted) {
log.info("task job is interrupted!");
break; break;
} }
List<TableDescription> tableList = sourceMetaDataService.queryTableList(schema); List<TableDescription> tableList = sourceMetaDataService.queryTableList(schema);
@@ -201,6 +224,9 @@ public class MigrationService {
} }
} }
log.info("service run all success, total migrate table count={} ", totalTableCount); log.info("service run all success, total migrate table count={} ", totalTableCount);
} catch (Throwable t) {
log.error("service run failed:{}", t.getMessage(), ExceptionUtils.getRootCause(t));
throw t;
} finally { } finally {
watch.stop(); watch.stop();
log.info("total ellipse = {} s", watch.getTotalTimeSeconds()); log.info("total ellipse = {} s", watch.getTotalTimeSeconds());
@@ -264,7 +290,7 @@ public class MigrationService {
Set<String> exists) { Set<String> exists) {
MigrationHandler instance = MigrationHandler.createInstance(td, properties, indexInternal, sds, tds, exists); MigrationHandler instance = MigrationHandler.createInstance(td, properties, indexInternal, sds, tds, exists);
migrationHandlers.add(instance); migrationHandlers.add(instance);
return instance; return Objects.isNull(mdcKeyValue) ? instance : new LoggingSupplier<>(instance, mdcKeyValue);
} }
/** /**
@@ -277,12 +303,13 @@ public class MigrationService {
private Function<Throwable, Long> getExceptHandler( private Function<Throwable, Long> getExceptHandler(
TableDescription td, TableDescription td,
AtomicInteger numberOfFailures) { AtomicInteger numberOfFailures) {
return (e) -> { Function<Throwable, Long> function = (e) -> {
log.error("Error migration for table: {}.{}, error message: {}", log.error("Error migration for table: {}.{}, error message: {}",
td.getSchemaName(), td.getTableName(), e.getMessage()); td.getSchemaName(), td.getTableName(), e.getMessage());
numberOfFailures.incrementAndGet(); numberOfFailures.incrementAndGet();
throw new RuntimeException(e); throw new RuntimeException(e);
}; };
return Objects.isNull(mdcKeyValue) ? function : new LoggingFunction<>(function, mdcKeyValue);
} }
} }

View File

@@ -7,7 +7,7 @@
// Date : 2020/1/2 // Date : 2020/1/2
// Location: beijing , china // Location: beijing , china
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
package com.gitee.dbswitch.admin.util; package com.gitee.dbswitch.data.util;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JavaType;