add 新增 ruoyi-powerjob-server 整合 powerjob 框架

update 重构 ruoyi-job 适配 powerjob
remove 移除 xxljob 建议使用 powerjob
This commit is contained in:
疯狂的狮子Li
2023-06-29 12:16:13 +08:00
parent 68d1742da1
commit 65351030ef
206 changed files with 918 additions and 39278 deletions

View File

@@ -55,11 +55,6 @@
<artifactId>ruoyi-common-mybatis</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-seata</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-job</artifactId>

View File

@@ -0,0 +1,56 @@
package org.dromara.job.processors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.util.List;
/**
* 广播处理器 示例
*
* @author tjq
* @since 2020/4/17
*/
@Slf4j
@Component
public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) {
System.out.println("===== BroadcastProcessorDemo#preProcess ======");
context.getOmsLogger().info("BroadcastProcessorDemo#preProcess, current host: {}", NetUtils.getLocalHost());
if ("rootFailed".equals(context.getJobParams())) {
return new ProcessResult(false, "console need failed");
} else {
return new ProcessResult(true);
}
}
@Override
public ProcessResult process(TaskContext taskContext) throws Exception {
OmsLogger logger = taskContext.getOmsLogger();
System.out.println("===== BroadcastProcessorDemo#process ======");
logger.info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost());
long sleepTime = 1000;
try {
sleepTime = Long.parseLong(taskContext.getJobParams());
} catch (Exception e) {
logger.warn("[BroadcastProcessor] parse sleep time failed!", e);
}
Thread.sleep(Math.max(sleepTime, 1000));
return new ProcessResult(true);
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) {
System.out.println("===== BroadcastProcessorDemo#postProcess ======");
context.getOmsLogger().info("BroadcastProcessorDemo#postProcess, current host: {}, taskResult: {}", NetUtils.getLocalHost(), taskResults);
return new ProcessResult(true, "success");
}
}

View File

@@ -0,0 +1,41 @@
package org.dromara.job.processors;
import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;
import tech.powerjob.official.processors.util.CommonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.util.Date;
import java.util.Optional;
/**
* LogTestProcessor
*
* @author tjq
* @since 2022/9/18
*/
@Component
public class LogTestProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
final OmsLogger omsLogger = context.getOmsLogger();
final String parseParams = CommonUtils.parseParams(context);
final JSONObject config = Optional.ofNullable(JSONObject.parseObject(parseParams)).orElse(new JSONObject());
final long loopTimes = Optional.ofNullable(config.getLong("loopTimes")).orElse(1000L);
for (int i = 0; i < loopTimes; i++) {
omsLogger.debug("[DEBUG] one DEBUG log in {}", new Date());
omsLogger.info("[INFO] one INFO log in {}", new Date());
omsLogger.warn("[WARN] one WARN log in {}", new Date());
omsLogger.error("[ERROR] one ERROR log in {}", new Date());
}
return new ProcessResult(true);
}
}

View File

@@ -0,0 +1,93 @@
package org.dromara.job.processors;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.dromara.common.json.utils.JsonUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.MapProcessor;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* Map处理器 示例
*
* @author tjq
* @since 2020/4/18
*/
@Component
public class MapProcessorDemo implements MapProcessor {
/**
* 每一批发送任务大小
*/
private static final int BATCH_SIZE = 100;
/**
* 发送的批次
*/
private static final int BATCH_NUM = 5;
@Override
public ProcessResult process(TaskContext context) throws Exception {
log.info("============== MapProcessorDemo#process ==============");
log.info("isRootTask:{}", isRootTask());
log.info("taskContext:{}", JsonUtils.toJsonString(context));
if (isRootTask()) {
log.info("==== MAP ====");
List<SubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < BATCH_NUM; j++) {
SubTask subTask = new SubTask();
subTask.siteId = j;
subTask.itemIds = Lists.newLinkedList();
subTasks.add(subTask);
for (int i = 0; i < BATCH_SIZE; i++) {
subTask.itemIds.add(i + j * 100);
}
}
map(subTasks, "MAP_TEST_TASK");
return new ProcessResult(true, "map successfully");
} else {
log.info("==== PROCESS ====");
SubTask subTask = (SubTask) context.getSubTask();
for (Integer itemId : subTask.getItemIds()) {
if (Thread.interrupted()) {
// 任务被中断
log.info("job has been stop! so stop to process subTask: {} => {}", subTask.getSiteId(), itemId);
break;
}
log.info("processing subTask: {} => {}", subTask.getSiteId(), itemId);
int max = Integer.MAX_VALUE >> 7;
for (int i = 0; ; i++) {
// 模拟耗时操作
if (i > max) {
break;
}
}
}
// 测试在 Map 任务中追加上下文
context.getWorkflowContext().appendData2WfContext("Yasuo", "A sword's poor company for a long road.");
boolean b = ThreadLocalRandom.current().nextBoolean();
if (context.getCurrentRetryTimes() >= 1) {
// 重试的话一定会成功
b = true;
}
return new ProcessResult(b, "RESULT:" + b);
}
}
@Getter
@NoArgsConstructor
@AllArgsConstructor
public static class SubTask {
private Integer siteId;
private List<Integer> itemIds;
}
}

View File

@@ -0,0 +1,93 @@
package org.dromara.job.processors;
import cn.hutool.core.lang.Dict;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.json.utils.JsonUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* MapReduce 处理器示例
* 控制台参数:{"batchSize": 100, "batchNum": 2}
*
* @author tjq
* @since 2020/4/17
*/
@Slf4j
@Component
public class MapReduceProcessorDemo implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
log.info("============== TestMapReduceProcessor#process ==============");
log.info("isRootTask:{}", isRootTask());
log.info("taskContext:{}", JsonUtils.toJsonString(context));
// 根据控制台参数获取MR批次及子任务大小
final Dict jobParams = JsonUtils.parseMap(context.getJobParams());
Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100);
Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);
if (isRootTask()) {
log.info("==== MAP ====");
omsLogger.info("[DemoMRProcessor] start root task~");
List<TestSubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < batchNum; j++) {
for (int i = 0; i < batchSize; i++) {
int x = j * batchSize + i;
subTasks.add(new TestSubTask("name" + x, x));
}
map(subTasks, "MAP_TEST_TASK");
subTasks.clear();
}
omsLogger.info("[DemoMRProcessor] map success~");
return new ProcessResult(true, "MAP_SUCCESS");
} else {
log.info("==== NORMAL_PROCESS ====");
omsLogger.info("[DemoMRProcessor] process subTask: {}.", JsonUtils.toJsonString(context.getSubTask()));
log.info("subTask: {}", JsonUtils.toJsonString(context.getSubTask()));
Thread.sleep(1000);
if (context.getCurrentRetryTimes() == 0) {
return new ProcessResult(false, "FIRST_FAILED");
} else {
return new ProcessResult(true, "PROCESS_SUCCESS");
}
}
}
@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
log.info("================ MapReduceProcessorDemo#reduce ================");
log.info("TaskContext: {}", JsonUtils.toJsonString(context));
log.info("List<TaskResult>: {}", JsonUtils.toJsonString(taskResults));
context.getOmsLogger().info("MapReduce job finished, result is {}.", taskResults);
boolean success = ThreadLocalRandom.current().nextBoolean();
return new ProcessResult(success, context + ": " + success);
}
@Getter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public static class TestSubTask {
private String name;
private int age;
}
}

View File

@@ -0,0 +1,35 @@
package org.dromara.job.processors;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.util.Optional;
/**
* @author Echo009
* @since 2022/4/27
*/
@Component
public class SimpleProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger logger = context.getOmsLogger();
String jobParams = Optional.ofNullable(context.getJobParams()).orElse("S");
logger.info("Current context:{}", context.getWorkflowContext());
logger.info("Current job params:{}", jobParams);
// 测试中文问题 #581
if (jobParams.contains("CN")) {
return new ProcessResult(true, "任务成功啦!!!");
}
return jobParams.contains("F") ? new ProcessResult(false) : new ProcessResult(true, "yeah!");
}
}

View File

@@ -0,0 +1,51 @@
package org.dromara.job.processors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.util.Collections;
/**
* 单机处理器 示例
*
* @author tjq
* @since 2020/4/17
*/
@Slf4j
@Component
public class StandaloneProcessorDemo implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("StandaloneProcessorDemo start process,context is {}.", context);
omsLogger.info("Notice! If you want this job process failed, your jobParams need to be 'failed'");
omsLogger.info("Let's test the exception~");
// 测试异常日志
try {
Collections.emptyList().add("277");
} catch (Exception e) {
omsLogger.error("oh~it seems that we have an exception~", e);
}
log.info("================ StandaloneProcessorDemo#process ================");
log.info("jobParam:{}", context.getJobParams());
log.info("instanceParams:{}", context.getInstanceParams());
String param;
// 解析参数,非处于工作流中时,优先取实例参数(允许动态[instanceParams]覆盖静态参数[jobParams]
if (context.getWorkflowContext() == null) {
param = StringUtils.isBlank(context.getInstanceParams()) ? context.getJobParams() : context.getInstanceParams();
} else {
param = context.getJobParams();
}
// 根据参数判断是否成功
boolean success = !"failed".equals(param);
omsLogger.info("StandaloneProcessorDemo finished process,success: {}", success);
omsLogger.info("anyway, we finished the job successfully~Congratulations!");
return new ProcessResult(success, context + ": " + success);
}
}

View File

@@ -0,0 +1,25 @@
package org.dromara.job.processors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
/**
* 测试超时任务(可中断)
*
* @author tjq
* @since 2020/4/20
*/
@Component
@Slf4j
public class TimeoutProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
long sleepTime = Long.parseLong(context.getJobParams());
log.info("TaskInstance({}) will sleep {} ms", context.getInstanceId(), sleepTime);
Thread.sleep(Long.parseLong(context.getJobParams()));
return new ProcessResult(true, "impossible~~~~QAQ~");
}
}

View File

@@ -1,39 +0,0 @@
package org.dromara.job.service;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.system.api.RemoteUserService;
import org.dromara.system.api.domain.bo.RemoteUserBo;
import org.dromara.system.api.model.LoginUser;
import org.springframework.stereotype.Service;
/**
* XxlJob 多服务调用
*
* @author Lion Li
*/
@Slf4j
@Service
public class MultiService {
@DubboReference
private RemoteUserService remoteUserService;
/**
* 多服务调用
*/
@GlobalTransactional(rollbackFor = Exception.class)
@XxlJob("multiServiceHandler")
public void multiServiceHandler() throws Exception {
LoginUser admin = remoteUserService.getUserInfo("admin", "000000");
XxlJobHelper.log("XXL-JOB, multiServiceHandler result: {}", admin.toString());
RemoteUserBo remoteUserBo = new RemoteUserBo();
remoteUserBo.setUserName("test");
remoteUserBo.setNickName("test");
remoteUserService.registerUserInfo(remoteUserBo);
}
}

View File

@@ -1,253 +0,0 @@
package org.dromara.job.service;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* XxlJob开发示例Bean模式
* <p>
* 开发步骤:
* 1、任务开发在Spring Bean实例中开发Job方法
* 2、注解配置为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")"注解value值对应的是调度中心新建任务的JobHandler属性的值。
* 3、执行日志需要通过 "XxlJobHelper.log" 打印执行日志;
* 4、任务结果默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
*
* @author xuxueli 2019-12-11 21:52:51
*/
@Slf4j
@Service
public class SampleService {
/**
* 1、简单任务示例Bean模式
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
}
// default success
}
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 业务逻辑
for (int i = 0; i < shardTotal; i++) {
if (i == shardIndex) {
XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobHelper.log("第 {} 片, 忽略", i);
}
}
}
/**
* 3、命令行任务
*/
@XxlJob("commandJobHandler")
public void commandJobHandler() throws Exception {
String command = XxlJobHelper.getJobParam();
int exitValue = -1;
BufferedReader bufferedReader = null;
try {
// command process
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(command);
processBuilder.redirectErrorStream(true);
Process process = processBuilder.start();
//Process process = Runtime.getRuntime().exec(command);
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
// command log
String line;
while ((line = bufferedReader.readLine()) != null) {
XxlJobHelper.log(line);
}
// command exit
process.waitFor();
exitValue = process.exitValue();
} catch (Exception e) {
XxlJobHelper.log(e);
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}
if (exitValue == 0) {
// default success
} else {
XxlJobHelper.handleFail("command exit value(" + exitValue + ") is failed");
}
}
/**
* 4、跨平台Http任务
* 参数示例:
* "url: http://www.baidu.com\n" +
* "method: get\n" +
* "data: content\n";
*/
@XxlJob("httpJobHandler")
public void httpJobHandler() throws Exception {
// param parse
String param = XxlJobHelper.getJobParam();
if (param == null || param.trim().length() == 0) {
XxlJobHelper.log("param[" + param + "] invalid.");
XxlJobHelper.handleFail();
return;
}
String[] httpParams = param.split("\n");
String url = null;
String method = null;
String data = null;
for (String httpParam : httpParams) {
if (httpParam.startsWith("url:")) {
url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
}
if (httpParam.startsWith("method:")) {
method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
}
if (httpParam.startsWith("data:")) {
data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
}
}
// param valid
if (url == null || url.trim().length() == 0) {
XxlJobHelper.log("url[" + url + "] invalid.");
XxlJobHelper.handleFail();
return;
}
if (method == null || !Arrays.asList("GET", "POST").contains(method)) {
XxlJobHelper.log("method[" + method + "] invalid.");
XxlJobHelper.handleFail();
return;
}
boolean isPostMethod = method.equals("POST");
// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
// connection setting
connection.setRequestMethod(method);
connection.setDoOutput(isPostMethod);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
// do connection
connection.connect();
// data
if (isPostMethod && data != null && data.trim().length() > 0) {
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.write(data.getBytes("UTF-8"));
dataOutputStream.flush();
dataOutputStream.close();
}
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
}
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String responseMsg = result.toString();
XxlJobHelper.log(responseMsg);
return;
} catch (Exception e) {
XxlJobHelper.log(e);
XxlJobHelper.handleFail();
return;
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
XxlJobHelper.log(e2);
}
}
}
/**
* 5、生命周期任务示例任务初始化与销毁时支持自定义相关逻辑
*/
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
public void demoJobHandler2() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
}
public void init() {
log.info("init");
}
public void destroy() {
log.info("destory");
}
}

View File

@@ -0,0 +1,36 @@
package org.dromara.job.workflow;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.util.Map;
/**
* 工作流测试
*
* @author tjq
* @since 2020/6/2
*/
@Component
@Slf4j
public class WorkflowStandaloneProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger logger = context.getOmsLogger();
logger.info("current jobParams: {}", context.getJobParams());
logger.info("current context: {}", context.getWorkflowContext());
log.info("jobParams:{}", context.getJobParams());
log.info("currentContext:{}", JSON.toJSONString(context));
// 尝试获取上游任务
Map<String, String> workflowContext = context.getWorkflowContext().fetchWorkflowContext();
log.info("工作流上下文数据:{}", workflowContext);
return new ProcessResult(true, context.getJobId() + " process successfully.");
}
}