From 04ec3f02be8733399eebf1f345cd6732002c19c7 Mon Sep 17 00:00:00 2001 From: inrgihc Date: Tue, 25 Jul 2023 22:46:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E8=BF=9B=E6=89=8B=E5=8A=A8=E8=A7=A6?= =?UTF-8?q?=E5=8F=91=E6=89=A7=E8=A1=8C=E4=BD=9C=E4=B8=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execution/ExecuteJobTaskRunnable.java | 163 ++++++++++++++++++ .../admin/service/JobExecutorService.java | 132 +------------- .../admin/service/ScheduleService.java | 85 ++++++--- .../{ListenEvent.java => ListenedEvent.java} | 6 +- .../{EventHub.java => TaskEventHub.java} | 62 +++---- ...ntListener.java => TaskEventListener.java} | 4 +- .../dbswitch/common/util/JdbcTypesUtils.java | 3 +- .../query/DefaultTableDataQueryProvider.java | 2 +- 8 files changed, 270 insertions(+), 187 deletions(-) create mode 100644 dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/execution/ExecuteJobTaskRunnable.java rename dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/{ListenEvent.java => ListenedEvent.java} (88%) rename dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/{EventHub.java => TaskEventHub.java} (70%) rename dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/{EventListener.java => TaskEventListener.java} (80%) diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/execution/ExecuteJobTaskRunnable.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/execution/ExecuteJobTaskRunnable.java new file mode 100644 index 00000000..acff3a35 --- /dev/null +++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/execution/ExecuteJobTaskRunnable.java @@ -0,0 +1,163 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// +package com.gitee.dbswitch.admin.execution; + +import cn.hutool.core.exceptions.ExceptionUtil; +import cn.hutool.extra.spring.SpringUtil; +import com.gitee.dbswitch.admin.config.ExecutorConfig; +import com.gitee.dbswitch.admin.dao.AssignmentConfigDAO; +import com.gitee.dbswitch.admin.dao.AssignmentJobDAO; +import com.gitee.dbswitch.admin.dao.AssignmentTaskDAO; +import com.gitee.dbswitch.admin.entity.AssignmentConfigEntity; +import com.gitee.dbswitch.admin.entity.AssignmentJobEntity; +import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity; +import com.gitee.dbswitch.admin.logback.LogbackAppenderRegister; +import com.gitee.dbswitch.admin.type.JobStatusEnum; +import com.gitee.dbswitch.common.entity.MdcKeyValue; +import com.gitee.dbswitch.data.config.DbswichProperties; +import com.gitee.dbswitch.data.service.MigrationService; +import com.gitee.dbswitch.data.util.JsonUtils; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.sql.Timestamp; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.task.AsyncTaskExecutor; + +@Slf4j +public class ExecuteJobTaskRunnable implements Runnable { + + private final static String MDC_KEY = LogbackAppenderRegister.LOG_MDC_KEY_NAME; + + // 相同taskId的任务限制并发执行的粒度锁缓存对象 + private static Cache mutexes = CacheBuilder.newBuilder() + .expireAfterWrite(24 * 60L, TimeUnit.MINUTES) + .build(); + + private volatile boolean interrupted = false; + + private MigrationService migrationService; + + private AssignmentTaskDAO assignmentTaskDAO; + + private AssignmentConfigDAO assignmentConfigDAO; + + private AssignmentJobDAO assignmentJobDAO; + + private AsyncTaskExecutor migrationTaskExecutor; + + private Long taskId; + + private Integer schedule; + + private String keyName; + + public ExecuteJobTaskRunnable(Long taskId, Integer schedule, String keyName) { + this.assignmentTaskDAO = SpringUtil.getBean(AssignmentTaskDAO.class); + this.assignmentConfigDAO = SpringUtil.getBean(AssignmentConfigDAO.class); + this.assignmentJobDAO = SpringUtil.getBean(AssignmentJobDAO.class); + this.migrationTaskExecutor = SpringUtil.getBean( + ExecutorConfig.TASK_EXECUTOR_BEAN_NAME, AsyncTaskExecutor.class); + this.taskId = taskId; + this.schedule = schedule; + this.keyName = keyName; + } + + public void interrupt() { + this.interrupted = true; + if (null != this.migrationService) { + this.migrationService.interrupt(); + } + } + + @Override + public void run() { + AssignmentJobEntity assignmentJobEntity = assignmentJobDAO + .newAssignmentJob(taskId, schedule, keyName); + MdcKeyValue mdcKeyValue = new MdcKeyValue(MDC_KEY, assignmentJobEntity.getId().toString()); + + try { + ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new); + while (!lock.tryLock(1, TimeUnit.SECONDS)) { + if (interrupted) { + log.info("Quartz task id:{} interrupted when get lock", taskId); + return; + } + TimeUnit.SECONDS.sleep(1); + } + + try { + log.info("Execute Job, and task id is : {} , job id is: {}", + taskId, assignmentJobEntity.getId()); + + AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId); + AssignmentConfigEntity assignmentConfigEntity = assignmentConfigDAO + .getByAssignmentTaskId(task.getId()); + + log.info("Execute Assignment [taskId={}],Task Name: {} ,configuration properties:{}", + task.getId(), + task.getName(), + task.getContent()); + + try { + DbswichProperties properties = JsonUtils.toBeanObject( + task.getContent(), DbswichProperties.class); + if (!assignmentConfigEntity.getFirstFlag()) { + if (!assignmentConfigEntity.getTargetOnlyCreate()) { + properties.getTarget().setTargetDrop(false); + properties.getTarget().setOnlyCreate(false); + properties.getTarget().setChangeDataSync(true); + } + } + if (assignmentConfigEntity.getTargetOnlyCreate()) { + properties.getTarget().setTargetDrop(true); + } + + migrationService = new MigrationService(properties, migrationTaskExecutor); + if (interrupted) { + log.info("Quartz task id:{} interrupted when prepare stage", taskId); + return; + } + + // 实际执行JOB + migrationService.setMdcKeyValue(mdcKeyValue); + migrationService.run(); + + if (assignmentConfigEntity.getFirstFlag()) { + AssignmentConfigEntity config = new AssignmentConfigEntity(); + config.setId(assignmentConfigEntity.getId()); + config.setTargetDropTable(assignmentConfigEntity.getTargetOnlyCreate()); + config.setFirstFlag(Boolean.FALSE); + assignmentConfigDAO.updateSelective(config); + } + + assignmentJobEntity.setStatus(JobStatusEnum.PASS.getValue()); + log.info("Execute Assignment Success [taskId={},jobId={}],Task Name: {}", + task.getId(), assignmentJobEntity.getId(), task.getName()); + } catch (Throwable e) { + assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue()); + assignmentJobEntity.setErrorLog(ExceptionUtil.stacktraceToString(e)); + log.info("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}", + task.getId(), assignmentJobEntity.getId(), task.getName(), e); + } finally { + assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis())); + assignmentJobDAO.updateSelective(assignmentJobEntity); + } + } finally { + lock.unlock(); + } + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/JobExecutorService.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/JobExecutorService.java index b1e2587a..942f1bba 100644 --- a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/JobExecutorService.java +++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/JobExecutorService.java @@ -9,27 +9,8 @@ ///////////////////////////////////////////////////////////// package com.gitee.dbswitch.admin.service; -import cn.hutool.core.exceptions.ExceptionUtil; -import com.gitee.dbswitch.admin.dao.AssignmentConfigDAO; -import com.gitee.dbswitch.admin.dao.AssignmentJobDAO; -import com.gitee.dbswitch.admin.dao.AssignmentTaskDAO; -import com.gitee.dbswitch.admin.entity.AssignmentConfigEntity; -import com.gitee.dbswitch.admin.entity.AssignmentJobEntity; -import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity; -import com.gitee.dbswitch.admin.logback.LogbackAppenderRegister; -import com.gitee.dbswitch.common.entity.MdcKeyValue; -import com.gitee.dbswitch.admin.type.JobStatusEnum; -import com.gitee.dbswitch.data.util.JsonUtils; -import com.gitee.dbswitch.data.config.DbswichProperties; -import com.gitee.dbswitch.data.service.MigrationService; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import java.sql.Timestamp; +import com.gitee.dbswitch.admin.execution.ExecuteJobTaskRunnable; import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.quartz.DisallowConcurrentExecution; import org.quartz.InterruptableJob; @@ -39,7 +20,6 @@ import org.quartz.JobExecutionException; import org.quartz.JobKey; import org.quartz.PersistJobDataAfterExecution; import org.quartz.UnableToInterruptJobException; -import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.scheduling.quartz.QuartzJobBean; /** @@ -58,13 +38,6 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJo public final static String TASK_ID = "taskId"; public final static String SCHEDULE = "schedule"; - private final static String MDC_KEY = LogbackAppenderRegister.LOG_MDC_KEY_NAME; - - // 相同taskId的任务限制并发执行的粒度锁缓存对象 - private static Cache mutexes = CacheBuilder.newBuilder() - .expireAfterWrite(24 * 60L, TimeUnit.MINUTES) - .build(); - /** * 作为一个是否被中断的标识 */ @@ -81,24 +54,9 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJo private String taskId; /** - * 迁移服务类 + * JOB实体 */ - private MigrationService migrationService; - - /** - * 这里可以使用Spring容器中的bean进行注入 - */ - @Resource - private AssignmentTaskDAO assignmentTaskDAO; - - @Resource - private AssignmentConfigDAO assignmentConfigDAO; - - @Resource - private AssignmentJobDAO assignmentJobDAO; - - @Resource - private AsyncTaskExecutor migrationTaskExecutor; + private ExecuteJobTaskRunnable taskRunnable; /** * 实现setter方法,Quartz会给成员变量taskId注入值 @@ -113,8 +71,8 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJo public void interrupt() throws UnableToInterruptJobException { log.info("Quartz Schedule Task job is interrupting : taskId={} ", taskId); interrupted = true; - if (Objects.nonNull(migrationService)) { - migrationService.interrupt(); + if (Objects.nonNull(taskRunnable)) { + taskRunnable.interrupt(); } currentThread.interrupt(); } @@ -131,84 +89,8 @@ public class JobExecutorService extends QuartzJobBean implements InterruptableJo JobKey key = context.getJobDetail().getKey(); Long taskId = jobDataMap.getLongValue(TASK_ID); Integer schedule = jobDataMap.getIntValue(SCHEDULE); - AssignmentJobEntity assignmentJobEntity = assignmentJobDAO - .newAssignmentJob(taskId, schedule, key.getName()); - MdcKeyValue mdcKeyValue = new MdcKeyValue(MDC_KEY, assignmentJobEntity.getId().toString()); - - try { - ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new); - while (!lock.tryLock(1, TimeUnit.SECONDS)) { - if (interrupted) { - log.info("Quartz task id:{} interrupted when get lock", jobDataMap.getLong(TASK_ID)); - return; - } - TimeUnit.SECONDS.sleep(1); - } - - try { - log.info("Execute Quartz Job, and task id is : {} , job id is: {}", taskId, - assignmentJobEntity.getId()); - - AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId); - AssignmentConfigEntity assignmentConfigEntity = assignmentConfigDAO - .getByAssignmentTaskId(task.getId()); - - log.info("Execute Assignment [taskId={}],Task Name: {} ,configuration properties:{}", - task.getId(), - task.getName(), - task.getContent()); - - try { - DbswichProperties properties = JsonUtils.toBeanObject( - task.getContent(), DbswichProperties.class); - if (!assignmentConfigEntity.getFirstFlag()) { - if (!assignmentConfigEntity.getTargetOnlyCreate()) { - properties.getTarget().setTargetDrop(false); - properties.getTarget().setOnlyCreate(false); - properties.getTarget().setChangeDataSync(true); - } - } - if (assignmentConfigEntity.getTargetOnlyCreate()) { - properties.getTarget().setTargetDrop(true); - } - - migrationService = new MigrationService(properties, migrationTaskExecutor); - if (interrupted) { - log.info("Quartz task id:{} interrupted when prepare stage", jobDataMap.getLong(TASK_ID)); - return; - } - - // 实际执行JOB - migrationService.setMdcKeyValue(mdcKeyValue); - migrationService.run(); - - if (assignmentConfigEntity.getFirstFlag()) { - AssignmentConfigEntity config = new AssignmentConfigEntity(); - config.setId(assignmentConfigEntity.getId()); - config.setTargetDropTable(assignmentConfigEntity.getTargetOnlyCreate()); - config.setFirstFlag(Boolean.FALSE); - assignmentConfigDAO.updateSelective(config); - } - - assignmentJobEntity.setStatus(JobStatusEnum.PASS.getValue()); - log.info("Execute Assignment Success [taskId={},jobId={}],Task Name: {}", - task.getId(), assignmentJobEntity.getId(), task.getName()); - } catch (Throwable e) { - assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue()); - assignmentJobEntity.setErrorLog(ExceptionUtil.stacktraceToString(e)); - log.info("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}", - task.getId(), assignmentJobEntity.getId(), task.getName(), e); - } finally { - assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis())); - assignmentJobDAO.updateSelective(assignmentJobEntity); - } - } finally { - lock.unlock(); - } - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); - } - + taskRunnable = new ExecuteJobTaskRunnable(taskId, schedule, key.getName()); + taskRunnable.run(); } } diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/ScheduleService.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/ScheduleService.java index 0aeadf8d..d6405106 100644 --- a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/ScheduleService.java +++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/ScheduleService.java @@ -13,12 +13,15 @@ import com.gitee.dbswitch.admin.dao.AssignmentJobDAO; import com.gitee.dbswitch.admin.dao.AssignmentTaskDAO; import com.gitee.dbswitch.admin.entity.AssignmentJobEntity; import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity; +import com.gitee.dbswitch.admin.execution.ExecuteJobTaskRunnable; import com.gitee.dbswitch.admin.type.JobStatusEnum; import com.gitee.dbswitch.admin.type.ScheduleModeEnum; +import com.gitee.dbswitch.common.event.TaskEventHub; import com.gitee.dbswitch.common.util.UuidUtils; import java.sql.Timestamp; -import java.util.Date; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -28,11 +31,12 @@ import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; -import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.stereotype.Service; @@ -40,6 +44,12 @@ import org.springframework.stereotype.Service; @Service public class ScheduleService { + private static final String evenName = "start"; + + static { + TaskEventHub.init(5); + } + /** * @Bean是一个方法级别上的注解,Bean的ID为方法名字。 * @Resource默认按照ByName自动注入 @@ -54,6 +64,32 @@ public class ScheduleService { @Resource private AssignmentJobDAO assignmentJobDAO; + private TaskEventHub manualRunEvenHub = new TaskEventHub("manualRun"); + + private Map taskRunnableMap = new ConcurrentHashMap<>(); + + @EventListener(ApplicationReadyEvent.class) + public void registerEventListener() { + manualRunEvenHub.listen( + evenName, + (event) -> { + event.checkArgs(Long.class, String.class); + Long taskId = (Long) event.getArgs()[0]; + Integer schedule = ScheduleModeEnum.MANUAL.getValue(); + String jobKey = (String) event.getArgs()[1]; + ExecuteJobTaskRunnable taskRunnable + = new ExecuteJobTaskRunnable(taskId, schedule, jobKey); + taskRunnableMap.put(jobKey, taskRunnable); + try { + taskRunnable.run(); + } finally { + taskRunnableMap.remove(jobKey); + } + return null; + } + ); + } + public void scheduleTask(Long taskId, ScheduleModeEnum scheduleMode) { /** 准备JobDetail */ String jobKeyName = UuidUtils.generateUuid() + "@" + taskId.toString(); @@ -70,11 +106,12 @@ public class ScheduleService { String triggerGroup = JobExecutorService.GROUP; TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroup); - log.info("Create schedule task, taskId: {}", taskId); + log.info("Create schedule task, taskId: {}, jobKey: {}", taskId, jobKeyName); AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId); if (ScheduleModeEnum.MANUAL == scheduleMode) { - scheduleOnce(jobBuilder.storeDurably(false).build(), triggerKey); + manualRunEvenHub.notify(evenName, taskId, jobKeyName); + //scheduleOnce(jobBuilder.storeDurably(false).build(), triggerKey); } else { scheduleCron(jobBuilder.storeDurably(true).build(), triggerKey, task.getCronExpression()); } @@ -88,6 +125,12 @@ public class ScheduleService { return; } + ExecuteJobTaskRunnable runnable = taskRunnableMap.get(jobKeyName); + if (null != runnable) { + runnable.interrupt(); + return; + } + String jobGroup = JobExecutorService.GROUP; JobKey jobKey = JobKey.jobKey(jobKeyName, jobGroup); @@ -122,23 +165,23 @@ public class ScheduleService { } } - private void scheduleOnce(JobDetail jobDetail, TriggerKey triggerKey) { - Scheduler scheduler = schedulerFactoryBean.getScheduler(); - Trigger simpleTrigger = TriggerBuilder.newTrigger() - .startAt(new Date()) - .withIdentity(triggerKey) - .withSchedule(SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0)) - .build(); - - try { - scheduler.scheduleJob(jobDetail, simpleTrigger); - } catch (SchedulerException e) { - log.error("Quartz schedule task by manual failed, taskId: {}.", - jobDetail.getJobDataMap().get(JobExecutorService.TASK_ID), e); - throw new RuntimeException(e); - } - - } +// private void scheduleOnce(JobDetail jobDetail, TriggerKey triggerKey) { +// Scheduler scheduler = schedulerFactoryBean.getScheduler(); +// Trigger simpleTrigger = TriggerBuilder.newTrigger() +// .startAt(new Date()) +// .withIdentity(triggerKey) +// .withSchedule(SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0)) +// .build(); +// +// try { +// scheduler.scheduleJob(jobDetail, simpleTrigger); +// } catch (SchedulerException e) { +// log.error("Quartz schedule task by manual failed, taskId: {}.", +// jobDetail.getJobDataMap().get(JobExecutorService.TASK_ID), e); +// throw new RuntimeException(e); +// } +// +// } private void scheduleCron(JobDetail jobDetail, TriggerKey triggerKey, String cronExpression) { Scheduler scheduler = schedulerFactoryBean.getScheduler(); diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/ListenEvent.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/ListenedEvent.java similarity index 88% rename from dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/ListenEvent.java rename to dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/ListenedEvent.java index 19323a12..8ba9b11b 100644 --- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/ListenEvent.java +++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/ListenedEvent.java @@ -15,16 +15,16 @@ import java.util.Collections; import lombok.Getter; @Getter -public class ListenEvent extends java.util.EventObject { +public class ListenedEvent extends java.util.EventObject { private String name; private Object[] args; - public ListenEvent(Object source, String event) { + public ListenedEvent(Object source, String event) { this(source, event, Collections.emptyList().toArray()); } - public ListenEvent(Object source, String event, Object... args) { + public ListenedEvent(Object source, String event, Object... args) { super(source); this.name = event; this.args = args; diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/EventHub.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventHub.java similarity index 70% rename from dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/EventHub.java rename to dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventHub.java index 0601738f..4f3ad478 100644 --- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/EventHub.java +++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventHub.java @@ -24,42 +24,36 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.concurrent.BasicThreadFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class EventHub { - - private static final Logger LOG = LoggerFactory.getLogger(EventHub.class); +@Slf4j +public class TaskEventHub { public static final String EVENT_WORKER = "event-worker-%d"; public static final String ANY_EVENT = "*"; - private static final List EMPTY = ImmutableList.of(); + private static final List EMPTY = ImmutableList.of(); // Event executor private static ExecutorService executor = null; private String name; - private Map> listeners; + private Map> listeners; - public EventHub() { - this("hub"); - } - - public EventHub(String name) { - LOG.debug("Create new EventHub: {}", name); + public TaskEventHub(String name) { + log.info("Create new EventHub: {}", name); this.name = name; this.listeners = new ConcurrentHashMap<>(); - EventHub.init(1); + TaskEventHub.init(1); } public static synchronized void init(int poolSize) { if (executor != null) { return; } - LOG.debug("Init pool(size {}) for EventHub", poolSize); + log.debug("Init pool(size {}) for EventHub", poolSize); executor = new ThreadPoolExecutor( poolSize, poolSize, @@ -74,7 +68,7 @@ public class EventHub { public static synchronized boolean destroy(long timeout) throws InterruptedException { - LOG.debug("Destroy pool for EventHub"); + log.debug("Destroy pool for EventHub"); executor.shutdown(); return executor.awaitTermination(timeout, TimeUnit.SECONDS); } @@ -90,34 +84,34 @@ public class EventHub { } public boolean containsListener(String event) { - List ls = this.listeners.get(event); + List ls = this.listeners.get(event); return ls != null && ls.size() > 0; } - public List listeners(String event) { - List ls = this.listeners.get(event); + public List listeners(String event) { + List ls = this.listeners.get(event); return ls == null ? EMPTY : Collections.unmodifiableList(ls); } - public void listen(String event, EventListener listener) { + public void listen(String event, TaskEventListener listener) { Preconditions.checkNotNull(event, "event"); Preconditions.checkNotNull(listener, "event listener"); if (!this.listeners.containsKey(event)) { this.listeners.putIfAbsent(event, new CopyOnWriteArrayList<>()); } - List ls = this.listeners.get(event); + List ls = this.listeners.get(event); assert ls != null : this.listeners; ls.add(listener); } - public List unlisten(String event) { - List ls = this.listeners.remove(event); + public List unlisten(String event) { + List ls = this.listeners.remove(event); return ls == null ? EMPTY : Collections.unmodifiableList(ls); } - public int unlisten(String event, EventListener listener) { - List ls = this.listeners.get(event); + public int unlisten(String event, TaskEventListener listener) { + List ls = this.listeners.get(event); if (ls == null) { return 0; } @@ -130,13 +124,13 @@ public class EventHub { } public Future notify(String event, @Nullable Object... args) { - List all = Collections.synchronizedList(new ArrayList<>()); + List all = Collections.synchronizedList(new ArrayList<>()); - List ls = this.listeners.get(event); + List ls = this.listeners.get(event); if (ls != null && !ls.isEmpty()) { all.addAll(ls); } - List lsAny = this.listeners.get(ANY_EVENT); + List lsAny = this.listeners.get(ANY_EVENT); if (lsAny != null && !lsAny.isEmpty()) { all.addAll(lsAny); } @@ -145,18 +139,18 @@ public class EventHub { return CompletableFuture.completedFuture(0); } - ListenEvent ev = new ListenEvent(this, event, args); + ListenedEvent ev = new ListenedEvent(this, event, args); // The submit will catch params: `all`(Listeners) and `ev`(Event) return executor().submit(() -> { int count = 0; // Notify all listeners, and ignore the results - for (EventListener listener : all) { + for (TaskEventListener listener : all) { try { listener.event(ev); count++; } catch (Throwable e) { - LOG.warn("Failed to handle event: {}", ev, e); + log.warn("Failed to handle event: {}", ev, e); } } return count; @@ -164,14 +158,14 @@ public class EventHub { } public Object call(String event, @Nullable Object... args) { - List ls = this.listeners.get(event); + List ls = this.listeners.get(event); if (ls == null) { throw new RuntimeException("Not found listener for: " + event); } else if (ls.size() != 1) { throw new RuntimeException("Too many listeners for: " + event); } - EventListener listener = ls.get(0); - return listener.event(new ListenEvent(this, event, args)); + TaskEventListener listener = ls.get(0); + return listener.event(new ListenedEvent(this, event, args)); } } \ No newline at end of file diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/EventListener.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventListener.java similarity index 80% rename from dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/EventListener.java rename to dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventListener.java index d0351622..b2049ba9 100644 --- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/EventListener.java +++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventListener.java @@ -9,7 +9,7 @@ ///////////////////////////////////////////////////////////// package com.gitee.dbswitch.common.event; -public interface EventListener extends java.util.EventListener { +public interface TaskEventListener extends java.util.EventListener { /** * The event callback @@ -17,5 +17,5 @@ public interface EventListener extends java.util.EventListener { * @param event object * @return event result */ - Object event(ListenEvent event); + Object event(ListenedEvent event); } \ No newline at end of file diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/JdbcTypesUtils.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/JdbcTypesUtils.java index 0daa45d6..a496fa25 100644 --- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/JdbcTypesUtils.java +++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/util/JdbcTypesUtils.java @@ -142,7 +142,7 @@ public final class JdbcTypesUtils { } if (isBinary(jdbcType)) { - byte[] bytes = Convert.toPrimitiveByteArray(value); + byte[] bytes = TypeConvertUtils.castToByteArray(value); return null == bytes ? 0 : bytes.length; } else if (isBoolean(jdbcType)) { return 1; @@ -151,4 +151,5 @@ public final class JdbcTypesUtils { return null == strValue ? 0 : strValue.length(); } } + } diff --git a/dbswitch-core/src/main/java/com/gitee/dbswitch/provider/query/DefaultTableDataQueryProvider.java b/dbswitch-core/src/main/java/com/gitee/dbswitch/provider/query/DefaultTableDataQueryProvider.java index 20cc1c60..39457c78 100644 --- a/dbswitch-core/src/main/java/com/gitee/dbswitch/provider/query/DefaultTableDataQueryProvider.java +++ b/dbswitch-core/src/main/java/com/gitee/dbswitch/provider/query/DefaultTableDataQueryProvider.java @@ -66,7 +66,7 @@ public class DefaultTableDataQueryProvider sb.append(productType.quoteSchemaTableName(schemaName, tableName)); if (CollectionUtils.isNotEmpty(orders)) { sb.append(" ORDER BY "); - sb.append(StringUtils.join(orders, productType.quoteName(","))); + sb.append(productType.quoteName(StringUtils.join(orders, productType.quoteName(",")))); } return this.selectTableData(sb.toString(), getProductFeatures().convertFetchSize(this.fetchSize)); }