mirror of
https://gitee.com/dromara/dbswitch.git
synced 2025-10-14 22:00:23 +00:00
修复issue反馈的问题
This commit is contained in:
@@ -17,6 +17,7 @@ import com.gitee.dbswitch.admin.execution.ExecuteJobTaskRunnable;
|
|||||||
import com.gitee.dbswitch.admin.type.JobStatusEnum;
|
import com.gitee.dbswitch.admin.type.JobStatusEnum;
|
||||||
import com.gitee.dbswitch.admin.type.ScheduleModeEnum;
|
import com.gitee.dbswitch.admin.type.ScheduleModeEnum;
|
||||||
import com.gitee.dbswitch.common.event.EventSubscriber;
|
import com.gitee.dbswitch.common.event.EventSubscriber;
|
||||||
|
import com.gitee.dbswitch.common.event.ExceptionHandler;
|
||||||
import com.gitee.dbswitch.common.event.ListenedEvent;
|
import com.gitee.dbswitch.common.event.ListenedEvent;
|
||||||
import com.gitee.dbswitch.common.event.TaskEventHub;
|
import com.gitee.dbswitch.common.event.TaskEventHub;
|
||||||
import com.gitee.dbswitch.common.util.UuidUtils;
|
import com.gitee.dbswitch.common.util.UuidUtils;
|
||||||
@@ -44,7 +45,7 @@ import org.springframework.stereotype.Service;
|
|||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
public class ScheduleService implements InitializingBean {
|
public class ScheduleService implements InitializingBean, ExceptionHandler {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @Bean是一个方法级别上的注解,Bean的ID为方法名字。
|
* @Bean是一个方法级别上的注解,Bean的ID为方法名字。
|
||||||
@@ -60,14 +61,20 @@ public class ScheduleService implements InitializingBean {
|
|||||||
@Resource
|
@Resource
|
||||||
private AssignmentJobDAO assignmentJobDAO;
|
private AssignmentJobDAO assignmentJobDAO;
|
||||||
|
|
||||||
private TaskEventHub taskEventBus = new TaskEventHub("manualRun", 5);
|
private TaskEventHub taskEventBus = new TaskEventHub("manual-run", 5, this);
|
||||||
|
|
||||||
private Map<String, ExecuteJobTaskRunnable> taskRunnableMap = new ConcurrentHashMap<>();
|
private Map<String, ExecuteJobTaskRunnable> taskRunnableMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
public void afterPropertiesSet() throws Exception {
|
public void afterPropertiesSet() throws Exception {
|
||||||
taskEventBus.registerSubscriber(new EventSubscriber(this::manualRunTask));
|
taskEventBus.registerSubscriber(new EventSubscriber(this::manualRunTask));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleException(ListenedEvent event, Throwable throwable) {
|
||||||
|
log.warn("Failed to handle event: {}", event, throwable);
|
||||||
|
}
|
||||||
|
|
||||||
private void manualRunTask(ListenedEvent event) {
|
private void manualRunTask(ListenedEvent event) {
|
||||||
event.checkArgs(Long.class, String.class);
|
event.checkArgs(Long.class, String.class);
|
||||||
Long taskId = (Long) event.getArgs()[0];
|
Long taskId = (Long) event.getArgs()[0];
|
||||||
|
@@ -0,0 +1,15 @@
|
|||||||
|
// Copyright tang. All rights reserved.
|
||||||
|
// https://gitee.com/inrgihc/dbswitch
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license
|
||||||
|
//
|
||||||
|
// Author: tang (inrgihc@126.com)
|
||||||
|
// Date : 2020/1/2
|
||||||
|
// Location: beijing , china
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
package com.gitee.dbswitch.common.event;
|
||||||
|
|
||||||
|
public interface ExceptionHandler {
|
||||||
|
|
||||||
|
void handleException(ListenedEvent event, Throwable throwable);
|
||||||
|
}
|
@@ -9,7 +9,11 @@
|
|||||||
/////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////
|
||||||
package com.gitee.dbswitch.common.event;
|
package com.gitee.dbswitch.common.event;
|
||||||
|
|
||||||
import com.google.common.eventbus.AsyncEventBus;
|
import com.google.common.collect.Lists;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@@ -18,29 +22,41 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
|||||||
|
|
||||||
public class TaskEventHub {
|
public class TaskEventHub {
|
||||||
|
|
||||||
private AsyncEventBus asyncEventBus;
|
private String identifier;
|
||||||
|
private ExecutorService executor;
|
||||||
|
private List<EventSubscriber> eventSubscribers;
|
||||||
|
private ExceptionHandler exceptionHandler;
|
||||||
|
|
||||||
public TaskEventHub(String identifier, int poolSize) {
|
public TaskEventHub(String identifier, int poolSize, ExceptionHandler handler) {
|
||||||
this.asyncEventBus = new AsyncEventBus(identifier,
|
this.identifier = Objects.requireNonNull(identifier, "identifier must not be null");
|
||||||
new ThreadPoolExecutor(
|
this.executor = new ThreadPoolExecutor(
|
||||||
poolSize,
|
poolSize,
|
||||||
poolSize,
|
poolSize,
|
||||||
0L,
|
0L,
|
||||||
TimeUnit.MILLISECONDS,
|
TimeUnit.MILLISECONDS,
|
||||||
new LinkedBlockingQueue<>(),
|
new LinkedBlockingQueue<>(),
|
||||||
new BasicThreadFactory.Builder()
|
new BasicThreadFactory.Builder()
|
||||||
.namingPattern(identifier + "-%d")
|
.namingPattern(this.identifier + "-%d")
|
||||||
.build()
|
.build()
|
||||||
)
|
|
||||||
);
|
);
|
||||||
|
this.eventSubscribers = new CopyOnWriteArrayList<>();
|
||||||
|
this.exceptionHandler = handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void registerSubscriber(EventSubscriber subscriber) {
|
public void registerSubscriber(EventSubscriber subscriber) {
|
||||||
asyncEventBus.register(subscriber);
|
this.eventSubscribers.add(subscriber);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void notifyEvent(@Nullable Object... args) {
|
public void notifyEvent(@Nullable Object... args) {
|
||||||
String identifier = asyncEventBus.identifier();
|
ListenedEvent event = new ListenedEvent(this, identifier, args);
|
||||||
asyncEventBus.post(new ListenedEvent(this, identifier, args));
|
for (EventSubscriber subscriber : Lists.newArrayList(eventSubscribers)) {
|
||||||
|
this.executor.submit(() -> {
|
||||||
|
try {
|
||||||
|
subscriber.handleEvent(event);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
exceptionHandler.handleException(event, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user