mirror of https://gitee.com/dromara/dbswitch.git (synced 2026-01-14 07:06:02 +08:00)
Improve manually triggered job execution
@@ -0,0 +1,163 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.admin.execution;

import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.gitee.dbswitch.admin.config.ExecutorConfig;
import com.gitee.dbswitch.admin.dao.AssignmentConfigDAO;
import com.gitee.dbswitch.admin.dao.AssignmentJobDAO;
import com.gitee.dbswitch.admin.dao.AssignmentTaskDAO;
import com.gitee.dbswitch.admin.entity.AssignmentConfigEntity;
import com.gitee.dbswitch.admin.entity.AssignmentJobEntity;
import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity;
import com.gitee.dbswitch.admin.logback.LogbackAppenderRegister;
import com.gitee.dbswitch.admin.type.JobStatusEnum;
import com.gitee.dbswitch.common.entity.MdcKeyValue;
import com.gitee.dbswitch.data.config.DbswichProperties;
import com.gitee.dbswitch.data.service.MigrationService;
import com.gitee.dbswitch.data.util.JsonUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.sql.Timestamp;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.AsyncTaskExecutor;

@Slf4j
public class ExecuteJobTaskRunnable implements Runnable {

  private final static String MDC_KEY = LogbackAppenderRegister.LOG_MDC_KEY_NAME;

  // Cache of per-taskId locks used to keep jobs of the same taskId from running concurrently
  private static Cache<String, ReentrantLock> mutexes = CacheBuilder.newBuilder()
      .expireAfterWrite(24 * 60L, TimeUnit.MINUTES)
      .build();

  private volatile boolean interrupted = false;

  private MigrationService migrationService;

  private AssignmentTaskDAO assignmentTaskDAO;

  private AssignmentConfigDAO assignmentConfigDAO;

  private AssignmentJobDAO assignmentJobDAO;

  private AsyncTaskExecutor migrationTaskExecutor;

  private Long taskId;

  private Integer schedule;

  private String keyName;

  public ExecuteJobTaskRunnable(Long taskId, Integer schedule, String keyName) {
    this.assignmentTaskDAO = SpringUtil.getBean(AssignmentTaskDAO.class);
    this.assignmentConfigDAO = SpringUtil.getBean(AssignmentConfigDAO.class);
    this.assignmentJobDAO = SpringUtil.getBean(AssignmentJobDAO.class);
    this.migrationTaskExecutor = SpringUtil.getBean(
        ExecutorConfig.TASK_EXECUTOR_BEAN_NAME, AsyncTaskExecutor.class);
    this.taskId = taskId;
    this.schedule = schedule;
    this.keyName = keyName;
  }

  public void interrupt() {
    this.interrupted = true;
    if (null != this.migrationService) {
      this.migrationService.interrupt();
    }
  }

  @Override
  public void run() {
    AssignmentJobEntity assignmentJobEntity = assignmentJobDAO
        .newAssignmentJob(taskId, schedule, keyName);
    MdcKeyValue mdcKeyValue = new MdcKeyValue(MDC_KEY, assignmentJobEntity.getId().toString());

    try {
      ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new);
      while (!lock.tryLock(1, TimeUnit.SECONDS)) {
        if (interrupted) {
          log.info("Quartz task id:{} interrupted when get lock", taskId);
          return;
        }
        TimeUnit.SECONDS.sleep(1);
      }

      try {
        log.info("Execute Job, and task id is : {} , job id is: {}",
            taskId, assignmentJobEntity.getId());

        AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId);
        AssignmentConfigEntity assignmentConfigEntity = assignmentConfigDAO
            .getByAssignmentTaskId(task.getId());

        log.info("Execute Assignment [taskId={}],Task Name: {} ,configuration properties:{}",
            task.getId(),
            task.getName(),
            task.getContent());

        try {
          DbswichProperties properties = JsonUtils.toBeanObject(
              task.getContent(), DbswichProperties.class);
          if (!assignmentConfigEntity.getFirstFlag()) {
            if (!assignmentConfigEntity.getTargetOnlyCreate()) {
              properties.getTarget().setTargetDrop(false);
              properties.getTarget().setOnlyCreate(false);
              properties.getTarget().setChangeDataSync(true);
            }
          }
          if (assignmentConfigEntity.getTargetOnlyCreate()) {
            properties.getTarget().setTargetDrop(true);
          }

          migrationService = new MigrationService(properties, migrationTaskExecutor);
          if (interrupted) {
            log.info("Quartz task id:{} interrupted when prepare stage", taskId);
            return;
          }

          // Actually execute the job
          migrationService.setMdcKeyValue(mdcKeyValue);
          migrationService.run();

          if (assignmentConfigEntity.getFirstFlag()) {
            AssignmentConfigEntity config = new AssignmentConfigEntity();
            config.setId(assignmentConfigEntity.getId());
            config.setTargetDropTable(assignmentConfigEntity.getTargetOnlyCreate());
            config.setFirstFlag(Boolean.FALSE);
            assignmentConfigDAO.updateSelective(config);
          }

          assignmentJobEntity.setStatus(JobStatusEnum.PASS.getValue());
          log.info("Execute Assignment Success [taskId={},jobId={}],Task Name: {}",
              task.getId(), assignmentJobEntity.getId(), task.getName());
        } catch (Throwable e) {
          assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue());
          assignmentJobEntity.setErrorLog(ExceptionUtil.stacktraceToString(e));
          log.info("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}",
              task.getId(), assignmentJobEntity.getId(), task.getName(), e);
        } finally {
          assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis()));
          assignmentJobDAO.updateSelective(assignmentJobEntity);
        }
      } finally {
        lock.unlock();
      }
    } catch (ExecutionException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

}
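The class above serializes jobs that share a taskId through a Guava cache of per-task ReentrantLocks. A minimal standalone sketch of just that locking pattern, under the same Guava API; the class and method names below are illustrative and not part of dbswitch:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: one ReentrantLock per taskId, so two jobs of the same task never
// run concurrently while jobs of different tasks proceed in parallel.
public class PerTaskLockSketch {

  private static final Cache<String, ReentrantLock> MUTEXES = CacheBuilder.newBuilder()
      .expireAfterWrite(24 * 60L, TimeUnit.MINUTES)
      .build();

  public void runExclusively(Long taskId, Runnable job) throws Exception {
    // get(key, loader) creates the lock on first use for this taskId.
    ReentrantLock lock = MUTEXES.get(taskId.toString(), ReentrantLock::new);
    // Poll for the lock so a caller can bail out (e.g. on interrupt)
    // instead of blocking forever.
    while (!lock.tryLock(1, TimeUnit.SECONDS)) {
      TimeUnit.SECONDS.sleep(1);
    }
    try {
      job.run();
    } finally {
      lock.unlock();
    }
  }
}

The expireAfterWrite(24 * 60 minutes) setting means an idle task's lock drops out of the cache roughly a day after it was created.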
@@ -9,27 +9,8 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.admin.service;

import cn.hutool.core.exceptions.ExceptionUtil;
import com.gitee.dbswitch.admin.dao.AssignmentConfigDAO;
import com.gitee.dbswitch.admin.dao.AssignmentJobDAO;
import com.gitee.dbswitch.admin.dao.AssignmentTaskDAO;
import com.gitee.dbswitch.admin.entity.AssignmentConfigEntity;
import com.gitee.dbswitch.admin.entity.AssignmentJobEntity;
import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity;
import com.gitee.dbswitch.admin.logback.LogbackAppenderRegister;
import com.gitee.dbswitch.common.entity.MdcKeyValue;
import com.gitee.dbswitch.admin.type.JobStatusEnum;
import com.gitee.dbswitch.data.util.JsonUtils;
import com.gitee.dbswitch.data.config.DbswichProperties;
import com.gitee.dbswitch.data.service.MigrationService;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.sql.Timestamp;
import com.gitee.dbswitch.admin.execution.ExecuteJobTaskRunnable;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
@@ -39,7 +20,6 @@ import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.PersistJobDataAfterExecution;
import org.quartz.UnableToInterruptJobException;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.quartz.QuartzJobBean;

/**
@@ -58,13 +38,6 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJob
  public final static String TASK_ID = "taskId";
  public final static String SCHEDULE = "schedule";

  private final static String MDC_KEY = LogbackAppenderRegister.LOG_MDC_KEY_NAME;

  // Cache of per-taskId locks used to keep jobs of the same taskId from running concurrently
  private static Cache<String, ReentrantLock> mutexes = CacheBuilder.newBuilder()
      .expireAfterWrite(24 * 60L, TimeUnit.MINUTES)
      .build();

  /**
   * Flag marking whether this job has been interrupted
   */
@@ -81,24 +54,9 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJob
  private String taskId;

  /**
   * Migration service class
   * JOB entity
   */
  private MigrationService migrationService;

  /**
   * Beans from the Spring container can be injected here
   */
  @Resource
  private AssignmentTaskDAO assignmentTaskDAO;

  @Resource
  private AssignmentConfigDAO assignmentConfigDAO;

  @Resource
  private AssignmentJobDAO assignmentJobDAO;

  @Resource
  private AsyncTaskExecutor migrationTaskExecutor;
  private ExecuteJobTaskRunnable taskRunnable;

  /**
   * Setter method; Quartz injects the value of the taskId member through it
@@ -113,8 +71,8 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJob
  public void interrupt() throws UnableToInterruptJobException {
    log.info("Quartz Schedule Task job is interrupting : taskId={} ", taskId);
    interrupted = true;
    if (Objects.nonNull(migrationService)) {
      migrationService.interrupt();
    if (Objects.nonNull(taskRunnable)) {
      taskRunnable.interrupt();
    }
    currentThread.interrupt();
  }
@@ -131,84 +89,8 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJob
    JobKey key = context.getJobDetail().getKey();
    Long taskId = jobDataMap.getLongValue(TASK_ID);
    Integer schedule = jobDataMap.getIntValue(SCHEDULE);
    AssignmentJobEntity assignmentJobEntity = assignmentJobDAO
        .newAssignmentJob(taskId, schedule, key.getName());
    MdcKeyValue mdcKeyValue = new MdcKeyValue(MDC_KEY, assignmentJobEntity.getId().toString());

    try {
      ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new);
      while (!lock.tryLock(1, TimeUnit.SECONDS)) {
        if (interrupted) {
          log.info("Quartz task id:{} interrupted when get lock", jobDataMap.getLong(TASK_ID));
          return;
        }
        TimeUnit.SECONDS.sleep(1);
      }

      try {
        log.info("Execute Quartz Job, and task id is : {} , job id is: {}", taskId,
            assignmentJobEntity.getId());

        AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId);
        AssignmentConfigEntity assignmentConfigEntity = assignmentConfigDAO
            .getByAssignmentTaskId(task.getId());

        log.info("Execute Assignment [taskId={}],Task Name: {} ,configuration properties:{}",
            task.getId(),
            task.getName(),
            task.getContent());

        try {
          DbswichProperties properties = JsonUtils.toBeanObject(
              task.getContent(), DbswichProperties.class);
          if (!assignmentConfigEntity.getFirstFlag()) {
            if (!assignmentConfigEntity.getTargetOnlyCreate()) {
              properties.getTarget().setTargetDrop(false);
              properties.getTarget().setOnlyCreate(false);
              properties.getTarget().setChangeDataSync(true);
            }
          }
          if (assignmentConfigEntity.getTargetOnlyCreate()) {
            properties.getTarget().setTargetDrop(true);
          }

          migrationService = new MigrationService(properties, migrationTaskExecutor);
          if (interrupted) {
            log.info("Quartz task id:{} interrupted when prepare stage", jobDataMap.getLong(TASK_ID));
            return;
          }

          // Actually execute the job
          migrationService.setMdcKeyValue(mdcKeyValue);
          migrationService.run();

          if (assignmentConfigEntity.getFirstFlag()) {
            AssignmentConfigEntity config = new AssignmentConfigEntity();
            config.setId(assignmentConfigEntity.getId());
            config.setTargetDropTable(assignmentConfigEntity.getTargetOnlyCreate());
            config.setFirstFlag(Boolean.FALSE);
            assignmentConfigDAO.updateSelective(config);
          }

          assignmentJobEntity.setStatus(JobStatusEnum.PASS.getValue());
          log.info("Execute Assignment Success [taskId={},jobId={}],Task Name: {}",
              task.getId(), assignmentJobEntity.getId(), task.getName());
        } catch (Throwable e) {
          assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue());
          assignmentJobEntity.setErrorLog(ExceptionUtil.stacktraceToString(e));
          log.info("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}",
              task.getId(), assignmentJobEntity.getId(), task.getName(), e);
        } finally {
          assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis()));
          assignmentJobDAO.updateSelective(assignmentJobEntity);
        }
      } finally {
        lock.unlock();
      }
    } catch (ExecutionException | InterruptedException e) {
      throw new RuntimeException(e);
    }

    taskRunnable = new ExecuteJobTaskRunnable(taskId, schedule, key.getName());
    taskRunnable.run();
  }

}

@@ -13,12 +13,15 @@ import com.gitee.dbswitch.admin.dao.AssignmentJobDAO;
import com.gitee.dbswitch.admin.dao.AssignmentTaskDAO;
import com.gitee.dbswitch.admin.entity.AssignmentJobEntity;
import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity;
import com.gitee.dbswitch.admin.execution.ExecuteJobTaskRunnable;
import com.gitee.dbswitch.admin.type.JobStatusEnum;
import com.gitee.dbswitch.admin.type.ScheduleModeEnum;
import com.gitee.dbswitch.common.event.TaskEventHub;
import com.gitee.dbswitch.common.util.UuidUtils;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -28,11 +31,12 @@ import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;

@@ -40,6 +44,12 @@ import org.springframework.stereotype.Service;
@Service
public class ScheduleService {

  private static final String evenName = "start";

  static {
    TaskEventHub.init(5);
  }

  /**
   * @Bean is a method-level annotation; the bean ID is the method name.
   * @Resource injects by name (ByName) by default
@@ -54,6 +64,32 @@ public class ScheduleService {
  @Resource
  private AssignmentJobDAO assignmentJobDAO;

  private TaskEventHub manualRunEvenHub = new TaskEventHub("manualRun");

  private Map<String, ExecuteJobTaskRunnable> taskRunnableMap = new ConcurrentHashMap<>();

  @EventListener(ApplicationReadyEvent.class)
  public void registerEventListener() {
    manualRunEvenHub.listen(
        evenName,
        (event) -> {
          event.checkArgs(Long.class, String.class);
          Long taskId = (Long) event.getArgs()[0];
          Integer schedule = ScheduleModeEnum.MANUAL.getValue();
          String jobKey = (String) event.getArgs()[1];
          ExecuteJobTaskRunnable taskRunnable
              = new ExecuteJobTaskRunnable(taskId, schedule, jobKey);
          taskRunnableMap.put(jobKey, taskRunnable);
          try {
            taskRunnable.run();
          } finally {
            taskRunnableMap.remove(jobKey);
          }
          return null;
        }
    );
  }

  public void scheduleTask(Long taskId, ScheduleModeEnum scheduleMode) {
    /** Prepare the JobDetail */
    String jobKeyName = UuidUtils.generateUuid() + "@" + taskId.toString();
@@ -70,11 +106,12 @@ public class ScheduleService {
    String triggerGroup = JobExecutorService.GROUP;
    TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup);

    log.info("Create schedule task, taskId: {}", taskId);
    log.info("Create schedule task, taskId: {}, jobKey: {}", taskId, jobKeyName);

    AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId);
    if (ScheduleModeEnum.MANUAL == scheduleMode) {
      scheduleOnce(jobBuilder.storeDurably(false).build(), triggerKey);
      manualRunEvenHub.notify(evenName, taskId, jobKeyName);
      //scheduleOnce(jobBuilder.storeDurably(false).build(), triggerKey);
    } else {
      scheduleCron(jobBuilder.storeDurably(true).build(), triggerKey, task.getCronExpression());
    }
@@ -88,6 +125,12 @@ public class ScheduleService {
      return;
    }

    ExecuteJobTaskRunnable runnable = taskRunnableMap.get(jobKeyName);
    if (null != runnable) {
      runnable.interrupt();
      return;
    }

    String jobGroup = JobExecutorService.GROUP;
    JobKey jobKey = JobKey.jobKey(jobKeyName, jobGroup);

@@ -122,23 +165,23 @@
    }
  }

  private void scheduleOnce(JobDetail jobDetail, TriggerKey triggerKey) {
    Scheduler scheduler = schedulerFactoryBean.getScheduler();
    Trigger simpleTrigger = TriggerBuilder.newTrigger()
        .startAt(new Date())
        .withIdentity(triggerKey)
        .withSchedule(SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0))
        .build();

    try {
      scheduler.scheduleJob(jobDetail, simpleTrigger);
    } catch (SchedulerException e) {
      log.error("Quartz schedule task by manual failed, taskId: {}.",
          jobDetail.getJobDataMap().get(JobExecutorService.TASK_ID), e);
      throw new RuntimeException(e);
    }

  }
  // private void scheduleOnce(JobDetail jobDetail, TriggerKey triggerKey) {
  //   Scheduler scheduler = schedulerFactoryBean.getScheduler();
  //   Trigger simpleTrigger = TriggerBuilder.newTrigger()
  //       .startAt(new Date())
  //       .withIdentity(triggerKey)
  //       .withSchedule(SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0))
  //       .build();
  //
  //   try {
  //     scheduler.scheduleJob(jobDetail, simpleTrigger);
  //   } catch (SchedulerException e) {
  //     log.error("Quartz schedule task by manual failed, taskId: {}.",
  //         jobDetail.getJobDataMap().get(JobExecutorService.TASK_ID), e);
  //     throw new RuntimeException(e);
  //   }
  //
  // }

  private void scheduleCron(JobDetail jobDetail, TriggerKey triggerKey, String cronExpression) {
    Scheduler scheduler = schedulerFactoryBean.getScheduler();

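With this change a manual run no longer goes through a one-shot Quartz trigger: scheduleTask publishes the "start" event on the manualRun TaskEventHub, and the listener registered in registerEventListener() runs an ExecuteJobTaskRunnable in-process, tracked in taskRunnableMap so it can be interrupted. A hedged caller-side sketch follows; the wrapping component, the injection style, and the ScheduleService package in the import are assumptions, while scheduleTask and ScheduleModeEnum.MANUAL come from the diff above:

import com.gitee.dbswitch.admin.service.ScheduleService;  // package assumed from context, not shown in this diff
import com.gitee.dbswitch.admin.type.ScheduleModeEnum;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;

// Hypothetical caller (not part of this commit), e.g. behind a "run now" endpoint.
@Component
public class ManualRunExample {

  @Resource
  private ScheduleService scheduleService;

  public void runTaskNow(Long taskId) {
    // MANUAL mode is dispatched through the event hub's worker pool;
    // cron mode still builds a Quartz trigger via scheduleCron(...).
    scheduleService.scheduleTask(taskId, ScheduleModeEnum.MANUAL);
  }
}

Running the manual job on the hub's worker threads instead of a throwaway Quartz job keeps it interruptible through taskRunnableMap, while cron-scheduled jobs keep their Quartz lifecycle.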
@@ -15,16 +15,16 @@ import java.util.Collections;
import lombok.Getter;

@Getter
public class ListenEvent extends java.util.EventObject {
public class ListenedEvent extends java.util.EventObject {

  private String name;
  private Object[] args;

  public ListenEvent(Object source, String event) {
  public ListenedEvent(Object source, String event) {
    this(source, event, Collections.emptyList().toArray());
  }

  public ListenEvent(Object source, String event, Object... args) {
  public ListenedEvent(Object source, String event, Object... args) {
    super(source);
    this.name = event;
    this.args = args;
@@ -24,42 +24,36 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventHub {

  private static final Logger LOG = LoggerFactory.getLogger(EventHub.class);
@Slf4j
public class TaskEventHub {

  public static final String EVENT_WORKER = "event-worker-%d";
  public static final String ANY_EVENT = "*";

  private static final List<EventListener> EMPTY = ImmutableList.of();
  private static final List<TaskEventListener> EMPTY = ImmutableList.of();

  // Event executor
  private static ExecutorService executor = null;

  private String name;
  private Map<String, List<EventListener>> listeners;
  private Map<String, List<TaskEventListener>> listeners;

  public EventHub() {
    this("hub");
  }

  public EventHub(String name) {
    LOG.debug("Create new EventHub: {}", name);
  public TaskEventHub(String name) {
    log.info("Create new EventHub: {}", name);

    this.name = name;
    this.listeners = new ConcurrentHashMap<>();
    EventHub.init(1);
    TaskEventHub.init(1);
  }

  public static synchronized void init(int poolSize) {
    if (executor != null) {
      return;
    }
    LOG.debug("Init pool(size {}) for EventHub", poolSize);
    log.debug("Init pool(size {}) for EventHub", poolSize);
    executor = new ThreadPoolExecutor(
        poolSize,
        poolSize,
@@ -74,7 +68,7 @@ public class EventHub {

  public static synchronized boolean destroy(long timeout)
      throws InterruptedException {
    LOG.debug("Destroy pool for EventHub");
    log.debug("Destroy pool for EventHub");
    executor.shutdown();
    return executor.awaitTermination(timeout, TimeUnit.SECONDS);
  }
@@ -90,34 +84,34 @@ public class EventHub {
  }

  public boolean containsListener(String event) {
    List<EventListener> ls = this.listeners.get(event);
    List<TaskEventListener> ls = this.listeners.get(event);
    return ls != null && ls.size() > 0;
  }

  public List<EventListener> listeners(String event) {
    List<EventListener> ls = this.listeners.get(event);
  public List<TaskEventListener> listeners(String event) {
    List<TaskEventListener> ls = this.listeners.get(event);
    return ls == null ? EMPTY : Collections.unmodifiableList(ls);
  }

  public void listen(String event, EventListener listener) {
  public void listen(String event, TaskEventListener listener) {
    Preconditions.checkNotNull(event, "event");
    Preconditions.checkNotNull(listener, "event listener");

    if (!this.listeners.containsKey(event)) {
      this.listeners.putIfAbsent(event, new CopyOnWriteArrayList<>());
    }
    List<EventListener> ls = this.listeners.get(event);
    List<TaskEventListener> ls = this.listeners.get(event);
    assert ls != null : this.listeners;
    ls.add(listener);
  }

  public List<EventListener> unlisten(String event) {
    List<EventListener> ls = this.listeners.remove(event);
  public List<TaskEventListener> unlisten(String event) {
    List<TaskEventListener> ls = this.listeners.remove(event);
    return ls == null ? EMPTY : Collections.unmodifiableList(ls);
  }

  public int unlisten(String event, EventListener listener) {
    List<EventListener> ls = this.listeners.get(event);
  public int unlisten(String event, TaskEventListener listener) {
    List<TaskEventListener> ls = this.listeners.get(event);
    if (ls == null) {
      return 0;
    }
@@ -130,13 +124,13 @@ public class EventHub {
  }

  public Future<Integer> notify(String event, @Nullable Object... args) {
    List<EventListener> all = Collections.synchronizedList(new ArrayList<>());
    List<TaskEventListener> all = Collections.synchronizedList(new ArrayList<>());

    List<EventListener> ls = this.listeners.get(event);
    List<TaskEventListener> ls = this.listeners.get(event);
    if (ls != null && !ls.isEmpty()) {
      all.addAll(ls);
    }
    List<EventListener> lsAny = this.listeners.get(ANY_EVENT);
    List<TaskEventListener> lsAny = this.listeners.get(ANY_EVENT);
    if (lsAny != null && !lsAny.isEmpty()) {
      all.addAll(lsAny);
    }
@@ -145,18 +139,18 @@ public class EventHub {
      return CompletableFuture.completedFuture(0);
    }

    ListenEvent ev = new ListenEvent(this, event, args);
    ListenedEvent ev = new ListenedEvent(this, event, args);

    // The submit will catch params: `all`(Listeners) and `ev`(Event)
    return executor().submit(() -> {
      int count = 0;
      // Notify all listeners, and ignore the results
      for (EventListener listener : all) {
      for (TaskEventListener listener : all) {
        try {
          listener.event(ev);
          count++;
        } catch (Throwable e) {
          LOG.warn("Failed to handle event: {}", ev, e);
          log.warn("Failed to handle event: {}", ev, e);
        }
      }
      return count;
@@ -164,14 +158,14 @@ public class EventHub {
  }

  public Object call(String event, @Nullable Object... args) {
    List<EventListener> ls = this.listeners.get(event);
    List<TaskEventListener> ls = this.listeners.get(event);
    if (ls == null) {
      throw new RuntimeException("Not found listener for: " + event);
    } else if (ls.size() != 1) {
      throw new RuntimeException("Too many listeners for: " + event);
    }
    EventListener listener = ls.get(0);
    return listener.event(new ListenEvent(this, event, args));
    TaskEventListener listener = ls.get(0);
    return listener.event(new ListenedEvent(this, event, args));
  }

}
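The renamed hub keeps the listen/notify contract of the old EventHub. A small, hedged sketch of the round trip that ScheduleService relies on; the ListenedEvent import location is assumed to match TaskEventListener's package, and the event name and argument shapes mirror the manualRun listener shown earlier:

import com.gitee.dbswitch.common.event.ListenedEvent;  // package assumed, same as TaskEventListener
import com.gitee.dbswitch.common.event.TaskEventHub;

// Register a listener for the "start" event, then notify(...) hands the
// arguments to it asynchronously on the hub's worker pool.
public class TaskEventHubSketch {

  public static void main(String[] args) throws Exception {
    TaskEventHub hub = new TaskEventHub("manualRun");

    hub.listen("start", (ListenedEvent event) -> {
      Long taskId = (Long) event.getArgs()[0];
      String jobKey = (String) event.getArgs()[1];
      System.out.println("run task " + taskId + " with job key " + jobKey);
      return null;
    });

    // get() only blocks here to observe how many listeners were notified.
    Integer notified = hub.notify("start", 1L, "uuid@1").get();
    System.out.println("listeners notified: " + notified);

    // Shut the worker pool down so this standalone sketch can exit.
    TaskEventHub.destroy(5);
  }
}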
@@ -9,7 +9,7 @@
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.common.event;

public interface EventListener extends java.util.EventListener {
public interface TaskEventListener extends java.util.EventListener {

  /**
   * The event callback
@@ -17,5 +17,5 @@ public interface EventListener extends java.util.EventListener {
   * @param event object
   * @return event result
   */
  Object event(ListenEvent event);
  Object event(ListenedEvent event);
}
@@ -142,7 +142,7 @@ public final class JdbcTypesUtils {
    }

    if (isBinary(jdbcType)) {
      byte[] bytes = Convert.toPrimitiveByteArray(value);
      byte[] bytes = TypeConvertUtils.castToByteArray(value);
      return null == bytes ? 0 : bytes.length;
    } else if (isBoolean(jdbcType)) {
      return 1;
@@ -151,4 +151,5 @@ public final class JdbcTypesUtils {
      return null == strValue ? 0 : strValue.length();
    }
  }

}

@@ -66,7 +66,7 @@ public class DefaultTableDataQueryProvider
    sb.append(productType.quoteSchemaTableName(schemaName, tableName));
    if (CollectionUtils.isNotEmpty(orders)) {
      sb.append(" ORDER BY ");
      sb.append(StringUtils.join(orders, productType.quoteName(",")));
      sb.append(productType.quoteName(StringUtils.join(orders, productType.quoteName(","))));
    }
    return this.selectTableData(sb.toString(), getProductFeatures().convertFetchSize(this.fetchSize));
  }
