diff --git a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/ScheduleService.java b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/ScheduleService.java index 650a5baf..f3a68f87 100644 --- a/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/ScheduleService.java +++ b/dbswitch-admin/src/main/java/com/gitee/dbswitch/admin/service/ScheduleService.java @@ -17,6 +17,7 @@ import com.gitee.dbswitch.admin.execution.ExecuteJobTaskRunnable; import com.gitee.dbswitch.admin.type.JobStatusEnum; import com.gitee.dbswitch.admin.type.ScheduleModeEnum; import com.gitee.dbswitch.common.event.EventSubscriber; +import com.gitee.dbswitch.common.event.ExceptionHandler; import com.gitee.dbswitch.common.event.ListenedEvent; import com.gitee.dbswitch.common.event.TaskEventHub; import com.gitee.dbswitch.common.util.UuidUtils; @@ -44,7 +45,7 @@ import org.springframework.stereotype.Service; @Slf4j @Service -public class ScheduleService implements InitializingBean { +public class ScheduleService implements InitializingBean, ExceptionHandler { /** * @Bean是一个方法级别上的注解,Bean的ID为方法名字。 @@ -60,14 +61,20 @@ public class ScheduleService implements InitializingBean { @Resource private AssignmentJobDAO assignmentJobDAO; - private TaskEventHub taskEventBus = new TaskEventHub("manualRun", 5); + private TaskEventHub taskEventBus = new TaskEventHub("manual-run", 5, this); private Map taskRunnableMap = new ConcurrentHashMap<>(); + @Override public void afterPropertiesSet() throws Exception { taskEventBus.registerSubscriber(new EventSubscriber(this::manualRunTask)); } + @Override + public void handleException(ListenedEvent event, Throwable throwable) { + log.warn("Failed to handle event: {}", event, throwable); + } + private void manualRunTask(ListenedEvent event) { event.checkArgs(Long.class, String.class); Long taskId = (Long) event.getArgs()[0]; diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/ExceptionHandler.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/ExceptionHandler.java new file mode 100644 index 00000000..ed0ff1c7 --- /dev/null +++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/ExceptionHandler.java @@ -0,0 +1,15 @@ +// Copyright tang. All rights reserved. +// https://gitee.com/inrgihc/dbswitch +// +// Use of this source code is governed by a BSD-style license +// +// Author: tang (inrgihc@126.com) +// Date : 2020/1/2 +// Location: beijing , china +///////////////////////////////////////////////////////////// +package com.gitee.dbswitch.common.event; + +public interface ExceptionHandler { + + void handleException(ListenedEvent event, Throwable throwable); +} diff --git a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventHub.java b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventHub.java index ab209362..b62e8317 100644 --- a/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventHub.java +++ b/dbswitch-common/src/main/java/com/gitee/dbswitch/common/event/TaskEventHub.java @@ -9,7 +9,11 @@ ///////////////////////////////////////////////////////////// package com.gitee.dbswitch.common.event; -import com.google.common.eventbus.AsyncEventBus; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -18,29 +22,41 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory; public class TaskEventHub { - private AsyncEventBus asyncEventBus; + private String identifier; + private ExecutorService executor; + private List eventSubscribers; + private ExceptionHandler exceptionHandler; - public TaskEventHub(String identifier, int poolSize) { - this.asyncEventBus = new AsyncEventBus(identifier, - new ThreadPoolExecutor( - poolSize, - poolSize, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), - new BasicThreadFactory.Builder() - .namingPattern(identifier + "-%d") - .build() - ) + public TaskEventHub(String identifier, int poolSize, ExceptionHandler handler) { + this.identifier = Objects.requireNonNull(identifier, "identifier must not be null"); + this.executor = new ThreadPoolExecutor( + poolSize, + poolSize, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new BasicThreadFactory.Builder() + .namingPattern(this.identifier + "-%d") + .build() ); + this.eventSubscribers = new CopyOnWriteArrayList<>(); + this.exceptionHandler = handler; } public void registerSubscriber(EventSubscriber subscriber) { - asyncEventBus.register(subscriber); + this.eventSubscribers.add(subscriber); } public void notifyEvent(@Nullable Object... args) { - String identifier = asyncEventBus.identifier(); - asyncEventBus.post(new ListenedEvent(this, identifier, args)); + ListenedEvent event = new ListenedEvent(this, identifier, args); + for (EventSubscriber subscriber : Lists.newArrayList(eventSubscribers)) { + this.executor.submit(() -> { + try { + subscriber.handleEvent(event); + } catch (Throwable e) { + exceptionHandler.handleException(event, e); + } + }); + } } }