feat(service): 添加定时任务和事件监听服务

This commit is contained in:
DaxPay
2024-10-30 18:51:55 +08:00
parent f50e02046a
commit dcae277c73
4 changed files with 363 additions and 0 deletions

View File

@@ -0,0 +1,81 @@
package org.dromara.daxpay.service.event;
import cn.bootx.platform.starter.redis.delay.annotation.DelayEventListener;
import cn.bootx.platform.starter.redis.delay.annotation.DelayJobEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.daxpay.core.enums.MerchantNotifyTypeEnum;
import org.dromara.daxpay.service.code.DaxPayCode;
import org.dromara.daxpay.service.common.context.MchAppLocal;
import org.dromara.daxpay.service.common.local.PaymentContextLocal;
import org.dromara.daxpay.service.dao.notice.callback.MerchantCallbackTaskManager;
import org.dromara.daxpay.service.dao.notice.notify.MerchantNotifyTaskManager;
import org.dromara.daxpay.service.service.assist.PaymentAssistService;
import org.dromara.daxpay.service.service.config.MerchantNotifyConfigService;
import org.dromara.daxpay.service.service.notice.callback.MerchantCallbackSendService;
import org.dromara.daxpay.service.service.notice.notify.MerchantNotifySendService;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* 商户通知事件服务类
* @author xxm
* @since 2024/8/18
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MerchantNoticeEventService {
private final MerchantNotifySendService merchantNotifySendService;
private final MerchantCallbackSendService merchantCallbackSendService;
private final MerchantNotifyTaskManager merchantNotifyTaskManager;
private final MerchantCallbackTaskManager merchantCallbackTaskManager;
private final PaymentAssistService paymentAssistService;
private final MerchantNotifyConfigService merchantNotifyConfigService;
/**
* 接受商户通知发送任务的延时消息
*/
@DelayEventListener(DaxPayCode.Event.MERCHANT_NOTIFY_SENDER)
public void NotifyTaskReceiveJob(DelayJobEvent<Long> event){
// 获取任务
Long taskId = event.getMessage();
var taskOpt = merchantNotifyTaskManager.findById(taskId);
if (taskOpt.isPresent()){
var task = taskOpt.get();
paymentAssistService.initMchApp(task.getAppId());
MchAppLocal mchAppInfo = PaymentContextLocal.get().getMchAppInfo();
// 判断通知方式是否为http并且订阅了该类型的通知
boolean subscribe = merchantNotifyConfigService.getSubscribeByAppIdAndType(mchAppInfo.getAppId(), task.getNotifyType());
if (Objects.equals(mchAppInfo.getNotifyType(), MerchantNotifyTypeEnum.HTTP.getCode()) && subscribe){
merchantNotifySendService.sendData(task, mchAppInfo.getNotifyUrl(), LocalDateTime.now(), true);
} else {
log.info("商户消息通知未开启任务ID{}",taskId);
}
} else {
log.error("商户消息通知发送任务不存在任务ID{}",taskId);
}
}
/**
* 接受商户回调消息发送任务的延时消息
*/
@DelayEventListener(DaxPayCode.Event.MERCHANT_CALLBACK_SENDER)
public void callbackReceiveJob(DelayJobEvent<Long> event){
// 获取任务
Long taskId = event.getMessage();
var taskOpt = merchantCallbackTaskManager.findById(taskId);
if (taskOpt.isPresent()){
var task = taskOpt.get();
paymentAssistService.initMchApp(task.getAppId());
merchantCallbackSendService.sendData(task,true);
} else {
log.error("商户回调发送任务不存在任务ID{}",taskId);
}
}
}

View File

@@ -0,0 +1,92 @@
package org.dromara.daxpay.service.event;
import cn.bootx.platform.starter.redis.delay.annotation.DelayEventListener;
import cn.bootx.platform.starter.redis.delay.annotation.DelayJobEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.daxpay.core.enums.PayStatusEnum;
import org.dromara.daxpay.core.enums.RefundStatusEnum;
import org.dromara.daxpay.core.enums.TransferStatusEnum;
import org.dromara.daxpay.service.code.DaxPayCode;
import org.dromara.daxpay.service.dao.order.pay.PayOrderManager;
import org.dromara.daxpay.service.dao.order.refund.RefundOrderManager;
import org.dromara.daxpay.service.dao.order.transfer.TransferOrderManager;
import org.dromara.daxpay.service.entity.order.pay.PayOrder;
import org.dromara.daxpay.service.service.assist.PaymentAssistService;
import org.dromara.daxpay.service.service.trade.pay.PaySyncService;
import org.dromara.daxpay.service.service.trade.refund.RefundSyncService;
import org.dromara.daxpay.service.service.trade.transfer.TransferSyncService;
import org.springframework.stereotype.Service;
import java.util.Optional;
/**
* 订单交易相关的延时事件
* @author xxm
* @since 2024/8/16
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class TradeOrderEventService {
private final PaymentAssistService paymentAssistService;
private final PayOrderManager payOrderManager;
private final PaySyncService paySyncService;
private final RefundSyncService refundSyncService;
private final RefundOrderManager refundOrderManager;
private final TransferOrderManager transferOrderManager;
private final TransferSyncService transferSyncService;
/**
* 接收订单超时事件, 发起同步
*/
@DelayEventListener(DaxPayCode.Event.MERCHANT_PAY_TIMEOUT)
public void payExpired(DelayJobEvent<Long> event) {
Optional<PayOrder> orderOpt = payOrderManager.findById(event.getMessage());
if (orderOpt.isPresent()) {
PayOrder payOrder = orderOpt.get();
// 不是支付中不需要进行同步
if (payOrder.getStatus().equals(PayStatusEnum.PROGRESS.getCode())|| payOrder.getStatus().equals(PayStatusEnum.TIMEOUT.getCode())) {
paymentAssistService.initMchApp(payOrder.getAppId());
paySyncService.syncPayOrder(payOrder);
}
}
}
/**
* 接收退款订单同步事件
*/
@DelayEventListener(DaxPayCode.Event.MERCHANT_REFUND_SYNC)
public void refundDelaySync(DelayJobEvent<Long> event) {
var orderOpt = refundOrderManager.findById(event.getMessage());
if (orderOpt.isPresent()) {
var order = orderOpt.get();
// 不是退款中不需要进行同步
if (order.getStatus().equals(RefundStatusEnum.PROGRESS.getCode())) {
paymentAssistService.initMchApp(order.getAppId());
refundSyncService.syncRefundOrder(order);
}
}
}
/**
* 接收转账订单超时事件
*/
@DelayEventListener(DaxPayCode.Event.MERCHANT_TRANSFER_SYNC)
public void TransferDelaySync(DelayJobEvent<Long> event) {
var orderOpt = transferOrderManager.findById(event.getMessage());
if (orderOpt.isPresent()) {
var order = orderOpt.get();
// 不是退款中不需要进行同步
if (order.getStatus().equals(TransferStatusEnum.PROGRESS.getCode())) {
paymentAssistService.initMchApp(order.getAppId());
transferSyncService.syncTransferOrder(order);
}
}
}
}

View File

@@ -0,0 +1,95 @@
package org.dromara.daxpay.service.task;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.daxpay.service.dao.order.pay.PayOrderManager;
import org.dromara.daxpay.service.dao.order.refund.RefundOrderManager;
import org.dromara.daxpay.service.dao.order.transfer.TransferOrderManager;
import org.dromara.daxpay.service.entity.order.pay.PayOrder;
import org.dromara.daxpay.service.entity.order.refund.RefundOrder;
import org.dromara.daxpay.service.entity.order.transfer.TransferOrder;
import org.dromara.daxpay.service.service.assist.PaymentAssistService;
import org.dromara.daxpay.service.service.trade.pay.PaySyncService;
import org.dromara.daxpay.service.service.trade.refund.RefundSyncService;
import org.dromara.daxpay.service.service.trade.transfer.TransferSyncService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 交易订单同步定时任务
* @author xxm
* @since 2024/8/29
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSyncTaskService {
private final PayOrderManager payOrderManager;
private final PaySyncService paySyncService;
private final RefundOrderManager refundOrderManager;
private final RefundSyncService refundSyncService;
private final TransferOrderManager transferOrderManager;
private final TransferSyncService transferSyncService;
private final PaymentAssistService paymentAssistService;
/**
* 支付单超时检测 一分钟一次, 查询支付
*/
@Scheduled(cron = "0 */1 * * * ?")
public void queryExpiredTask(){
// 从数据库查询获取超时的任务对象
List<PayOrder> payOrders = payOrderManager.queryExpiredOrderNotTenant();
for (PayOrder order : payOrders) {
try {
// 设置补偿来源为定时任务
paymentAssistService.initMchApp(order.getAppId());
paySyncService.syncPayOrder(order);
} catch (Exception e) {
log.error("超时取消任务异常, ID: {}, 订单号: {}",order.getId(), order.getOrderNo(), e);
}
}
}
/**
* 退款定时同步任务 一分钟一次, 查询一分钟之前退款中的订单进行同步
* 10分钟内一分钟一次
* 一天内一小时一次
* 超过一天一天一次
*
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refundSyncTask(){
// 查询退款中的退款订单
List<RefundOrder> list = refundOrderManager.findAllByProgress();
for (RefundOrder refundOrder : list) {
try {
// 调用同步方法
paymentAssistService.initMchApp(refundOrder.getAppId());
refundSyncService.syncRefundOrder(refundOrder);
} catch (Exception e) {
log.warn("退款执行同步失败, ID: {}, 退款号: {}",refundOrder.getId(), refundOrder.getRefundNo(), e);
}
}
}
/**
* 转账订单同步, 一分钟一次, 获取一分钟之前转账中的订单
*/
@Scheduled(cron = "0 */1 * * * ?")
public void transferSyncTask(){
List<TransferOrder> list = transferOrderManager.findAllByProgress();
for (var transferOrder : list) {
try {
// 调用同步方法
paymentAssistService.initMchApp(transferOrder.getAppId());
transferSyncService.syncTransferOrder(transferOrder);
} catch (Exception e) {
log.warn("转账执行同步失败, ID: {}, 转账号: {}",transferOrder.getId(),transferOrder.getTransferNo(), e);
}
}
}
}

View File

@@ -0,0 +1,95 @@
package org.dromara.daxpay.service.task;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.daxpay.service.dao.config.ChannelConfigManager;
import org.dromara.daxpay.service.dao.constant.ChannelConstManager;
import org.dromara.daxpay.service.dao.merchant.MchAppManager;
import org.dromara.daxpay.service.entity.config.ChannelConfig;
import org.dromara.daxpay.service.entity.constant.ChannelConst;
import org.dromara.daxpay.service.entity.merchant.MchApp;
import org.dromara.daxpay.service.entity.reconcile.ReconcileStatement;
import org.dromara.daxpay.service.enums.MchAppStatusEnum;
import org.dromara.daxpay.service.param.reconcile.ReconcileCreatParam;
import org.dromara.daxpay.service.service.assist.PaymentAssistService;
import org.dromara.daxpay.service.service.reconcile.ReconcileStatementService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 每日对账定时任务, 一天一次, 上午10.30执行
* @author xxm
* @since 2024/1/20
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ReconcileTask {
private final ReconcileStatementService reconcileService;
private final MchAppManager mchAppManager;
private final ChannelConfigManager channelConfigManager;
private final ChannelConstManager channelConstManager;
private final PaymentAssistService paymentAssistService;
/**
* 任务实现, 上午10.30执行
*/
@Scheduled(cron = "0 30 10 * * ?")
public void reconcileTask() {
Map<String, String> channelMap = channelConstManager.findAll()
.stream()
.collect(Collectors.toMap(ChannelConst::getCode, ChannelConst::getName));
// 遍历所有启用的应用
List<MchApp> mchApps = mchAppManager.findAllByStatus(MchAppStatusEnum.ENABLE);
// 遍历应用下启用的通道
for (MchApp mchApp : mchApps) {
// 设置上下文
paymentAssistService.initMchApp(mchApp.getAppId());
List<ChannelConfig> configs = channelConfigManager.findEnableByAppId(mchApp.getAppId());
for (ChannelConfig config : configs) {
try {
log.info("应用: {} 通道: {} 执行对账任务任务 }", mchApp.getAppId(), config.getChannel());
this.reconcileTaskRun(mchApp, config, channelMap);
} catch (Exception e) {
log.error("应用: {} 通道: {} 执行对账任务失败 }", mchApp.getAppId(), config.getChannel(), e);
}
}
}
}
/**
* 执行任务
*/
public void reconcileTaskRun(MchApp mchApp, ChannelConfig config, Map<String, String> channelMap){
// 1. 创建订单
// 标题 【日期】 - 应用 - 通道
String title = StrUtil.format("【{}】{}-{}",
DateUtil.format(DateUtil.yesterday(), DatePattern.NORM_DATE_PATTERN), mchApp.getAppId(), channelMap.get(config.getChannel()));
ReconcileCreatParam param = new ReconcileCreatParam()
.setAppId(mchApp.getAppId())
.setChannel(config.getChannel())
.setDate(LocalDate.now().plusDays(-1))
.setTitle(title);
ReconcileStatement statement = reconcileService.create(param);
// 2. 执行对账任务, 下载对账单并解析和存储
reconcileService.downAndSave(statement);
// 3. 执行账单明细比对, 生成差异单
reconcileService.compare(statement);
}
}