feat 退款订单定时同步任务

This commit is contained in:
xxm1995
2024-03-12 17:57:15 +08:00
committed by 喵呀
parent ce048d3747
commit d69fdea2dd
14 changed files with 360 additions and 103 deletions

View File

@@ -37,4 +37,12 @@ public class PayOrderManager extends BaseManager<PayOrderMapper, PayOrder> {
QueryWrapper<PayOrder> generator = QueryGenerator.generator(query);
return page(mpPage, generator);
}
/**
* 强制更新 (忽略版本号)
*/
public void updateForceById(PayOrder payOrder) {
payOrder.setVersion(null);
this.updateById(payOrder);
}
}

View File

@@ -56,4 +56,17 @@ public class PayOrderService {
}
payOrderManager.updateById(payOrder);
}
/**
* 使用强制更新
*/
public void updateForceById(PayOrder payOrder){
// 如果是异步支付且支付订单完成, 需要删除订单超时任务记录
if (payOrder.isAsyncPay() && ORDER_FINISH.contains(payOrder.getStatus())){
expiredTimeService.cancelExpiredTime(payOrder.getId());
}
payOrder.setVersion(null);
payOrderManager.updateForceById(payOrder);
}
}

View File

@@ -4,6 +4,7 @@ import cn.bootx.platform.common.core.rest.param.PageParam;
import cn.bootx.platform.common.mybatisplus.impl.BaseManager;
import cn.bootx.platform.common.mybatisplus.util.MpUtil;
import cn.bootx.platform.common.query.generator.QueryGenerator;
import cn.bootx.platform.daxpay.code.RefundStatusEnum;
import cn.bootx.platform.daxpay.service.core.order.refund.entity.RefundOrder;
import cn.bootx.platform.daxpay.service.param.order.RefundOrderQuery;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -12,6 +13,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
/**
@@ -46,4 +49,15 @@ public class RefundOrderManager extends BaseManager<RefundOrderMapper, RefundOrd
public boolean existsByRefundNo(String refundNo){
return this.existedByField(RefundOrder::getRefundNo,refundNo);
}
/**
* 查询退款中的支付订单
*/
public List<RefundOrder> findAllByProgress() {
LocalDateTime now = LocalDateTime.now();
return lambdaQuery()
.le(RefundOrder::getRefundTime,now)
.eq(RefundOrder::getStatus, RefundStatusEnum.PROGRESS)
.list();
}
}

View File

@@ -42,7 +42,7 @@ public class PayCallbackService {
public void payCallback() {
CallbackLocal callbackInfo = PaymentContextLocal.get().getCallbackInfo();
// 加锁
LockInfo lock = lockTemplate.lock("callback:payment:" + callbackInfo.getOrderId());
LockInfo lock = lockTemplate.lock("callback:payment:" + callbackInfo.getOrderId(),10000, 200);
if (Objects.isNull(lock)){
callbackInfo.setCallbackStatus(PayCallbackStatusEnum.IGNORE).setMsg("回调正在处理中,忽略本次回调请求");
log.warn("订单号: {} 回调正在处理中,忽略本次回调请求", callbackInfo.getOrderId());

View File

@@ -39,7 +39,7 @@ public class RefundCallbackService {
CallbackLocal callbackInfo = PaymentContextLocal.get().getCallbackInfo();
// 加锁
LockInfo lock = lockTemplate.lock("callback:refund:" + callbackInfo.getOrderId());
LockInfo lock = lockTemplate.lock("callback:refund:" + callbackInfo.getOrderId(),10000, 200);
if (Objects.isNull(lock)){
callbackInfo.setCallbackStatus(PayCallbackStatusEnum.IGNORE).setMsg("回调正在处理中,忽略本次回调请求");
log.warn("订单号: {} 回调正在处理中,忽略本次回调请求", callbackInfo.getOrderId());

View File

@@ -63,7 +63,7 @@ public class PayCloseService {
payOrder = payOrderQueryService.findByBusinessNo(param.getBusinessNo())
.orElseThrow(() -> new PayFailureException("未查询到支付订单"));
}
LockInfo lock = lockTemplate.lock("payment:close:" + payOrder.getId());
LockInfo lock = lockTemplate.lock("payment:close:" + payOrder.getId(),10000, 50);
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("支付订单已在关闭中,请勿重复发起");
}

View File

@@ -189,7 +189,7 @@ public class ClientNoticeService {
private void run(Long taskId){
LocalDateTime now = LocalDateTime.now();
// 开启分布式锁
LockInfo lock = lockTemplate.lock(KEY + ":" + taskId,2000, 50);
LockInfo lock = lockTemplate.lock(KEY + ":" + taskId,10000, 200);
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("支付同步处理中,请勿重复操作");
}

View File

@@ -1,54 +0,0 @@
package cn.bootx.platform.daxpay.service.core.payment.pay.task;
import cn.bootx.platform.common.core.exception.RepetitiveOperationException;
import cn.bootx.platform.daxpay.param.pay.PaySyncParam;
import cn.bootx.platform.daxpay.service.core.payment.pay.dao.PayExpiredTimeRepository;
import cn.bootx.platform.daxpay.service.core.payment.sync.service.PaySyncService;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Objects;
import java.util.Set;
/**
* 待支付订单的状态同步, 先不进行启用
* @author xxm
* @since 2024/1/5
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class PayWaitOrderSyncTask {
private final PayExpiredTimeRepository repository;
private final PaySyncService paySyncService;
private final LockTemplate lockTemplate;
public void task(){
log.debug("开始同步支付订单状态");
// 从超时订单列表中获取到未超时的订单号
Set<String> keys = repository.getNormalKeysBy30Day();
for (String key : keys) {
LockInfo lock = lockTemplate.lock("payment:sync:" + key,10000,200);
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("支付同步处理中,请勿重复操作");
}
try {
Long paymentId = Long.parseLong(key);
PaySyncParam paySyncParam = new PaySyncParam();
paySyncParam.setPaymentId(paymentId);
// 执行网关同步, 网关同步时会对支付的进行状态的处理
paySyncService.sync(paySyncParam);
} catch (Exception e) {
log.error("同步支付订单异常", e);
} finally {
lockTemplate.releaseLock(lock);
}
}
}
}

View File

@@ -17,16 +17,15 @@ import cn.bootx.platform.daxpay.service.core.record.repair.entity.PayRepairRecor
import cn.bootx.platform.daxpay.service.core.record.repair.service.PayRepairRecordService;
import cn.bootx.platform.daxpay.service.func.AbsPayRepairStrategy;
import cn.hutool.core.util.IdUtil;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -48,11 +47,20 @@ public class PayRepairService {
private final PayRepairRecordService recordService;
private final LockTemplate lockTemplate;
/**
* 修复支付单
*/
@Transactional(rollbackFor = Exception.class)
public PayRepairResult repair(PayOrder order, PayRepairWayEnum repairType){
// 添加分布式锁
LockInfo lock = lockTemplate.lock("repair:pay:" + order.getId(), 10000, 200);
if (Objects.isNull(lock)){
log.warn("当前支付定单正在修复中: {}", order.getId());
return new PayRepairResult();
}
// 1. 获取支付单管理的通道支付订单
Map<String, PayChannelOrder> channelOrderMap = channelOrderManager.findAllByPaymentId(order.getId())
.stream()

View File

@@ -23,6 +23,8 @@ import cn.bootx.platform.daxpay.service.core.record.repair.entity.PayRepairRecor
import cn.bootx.platform.daxpay.service.core.record.repair.service.PayRepairRecordService;
import cn.bootx.platform.daxpay.service.func.AbsRefundRepairStrategy;
import cn.hutool.core.util.IdUtil;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -56,54 +58,67 @@ public class RefundRepairService {
private final PayRepairRecordService recordService;
private final LockTemplate lockTemplate;
/**
* 修复退款单
*/
@Transactional(rollbackFor = Exception.class)
public RefundRepairResult repair(RefundOrder refundOrder, RefundRepairWayEnum repairType){
// 获取关联支付单
PayOrder payOrder = payOrderQueryService.findById(refundOrder.getPaymentId())
.orElseThrow(() -> new RuntimeException("支付单不存在"));
// 关联支付通道支付单
Map<String, PayChannelOrder> payChannelOrderMap = payChannelOrderManager.findAllByPaymentId(refundOrder.getPaymentId())
.stream()
.collect(Collectors.toMap(PayChannelOrder::getChannel, Function.identity(), CollectorsFunction::retainLatest));
// 异步通道退款单
Map<String, RefundChannelOrder> refundChannelOrderMap = refundChannelOrderManager.findAllByRefundId(refundOrder.getId())
.stream()
.collect(Collectors.toMap(RefundChannelOrder::getChannel, Function.identity(), CollectorsFunction::retainLatest));
// 2 初始化修复参数
List<String> channels = new ArrayList<>(payChannelOrderMap.keySet());
List<AbsRefundRepairStrategy> repairStrategies = RefundRepairStrategyFactory.createAsyncLast(channels);
for (AbsRefundRepairStrategy repairStrategy : repairStrategies) {
PayChannelOrder payChannelOrder = payChannelOrderMap.get(repairStrategy.getChannel().getCode());
RefundChannelOrder refundChannelOrder = refundChannelOrderMap.get(repairStrategy.getChannel().getCode());
repairStrategy.initRepairParam(refundOrder, refundChannelOrder, payOrder, payChannelOrder);
// 添加分布式锁
LockInfo lock = lockTemplate.lock("repair:refund:" + refundOrder.getId(), 10000, 200);
if (Objects.isNull(lock)){
log.warn("当前退款单正在修复中: {}", refundOrder.getId());
return new RefundRepairResult();
}
try {
// 获取关联支付单
PayOrder payOrder = payOrderQueryService.findById(refundOrder.getPaymentId())
.orElseThrow(() -> new RuntimeException("支付单不存在"));
// 关联支付通道支付单
Map<String, PayChannelOrder> payChannelOrderMap = payChannelOrderManager.findAllByPaymentId(refundOrder.getPaymentId())
.stream()
.collect(Collectors.toMap(PayChannelOrder::getChannel, Function.identity(), CollectorsFunction::retainLatest));
// 异步通道退款单
Map<String, RefundChannelOrder> refundChannelOrderMap = refundChannelOrderManager.findAllByRefundId(refundOrder.getId())
.stream()
.collect(Collectors.toMap(RefundChannelOrder::getChannel, Function.identity(), CollectorsFunction::retainLatest));
// 根据不同的类型执行对应的修复逻辑
RefundRepairResult repairResult = new RefundRepairResult();
if (Objects.requireNonNull(repairType) == RefundRepairWayEnum.REFUND_SUCCESS) {
repairResult = this.success(refundOrder,payOrder,repairStrategies);
} else if (repairType == RefundRepairWayEnum.REFUND_FAIL) {
repairResult = this.close(refundOrder,payOrder,repairStrategies);
} else {
log.error("走到了理论上讲不会走到的分支");
// 2 初始化修复参数
List<String> channels = new ArrayList<>(payChannelOrderMap.keySet());
List<AbsRefundRepairStrategy> repairStrategies = RefundRepairStrategyFactory.createAsyncLast(channels);
for (AbsRefundRepairStrategy repairStrategy : repairStrategies) {
PayChannelOrder payChannelOrder = payChannelOrderMap.get(repairStrategy.getChannel()
.getCode());
RefundChannelOrder refundChannelOrder = refundChannelOrderMap.get(repairStrategy.getChannel()
.getCode());
repairStrategy.initRepairParam(refundOrder, refundChannelOrder, payOrder, payChannelOrder);
}
// 根据不同的类型执行对应的修复逻辑
RefundRepairResult repairResult = new RefundRepairResult();
if (Objects.requireNonNull(repairType) == RefundRepairWayEnum.REFUND_SUCCESS) {
repairResult = this.success(refundOrder, payOrder, repairStrategies);
} else if (repairType == RefundRepairWayEnum.REFUND_FAIL) {
repairResult = this.close(refundOrder, payOrder, repairStrategies);
} else {
log.error("走到了理论上讲不会走到的分支");
}
// 设置修复ID并保存修复记录
repairResult.setRepairNo(IdUtil.getSnowflakeNextIdStr());
// 支付修复记录
PayRepairRecord payRepairRecord = this.payRepairRecord(payOrder, repairType, repairResult);
// 退款修复记录
PayRepairRecord refundRepairRecord = this.refundRepairRecord(refundOrder, repairType, repairResult);
// 发送通知
clientNoticeService.registerRefundNotice(refundOrder, null, new ArrayList<>(refundChannelOrderMap.values()));
recordService.saveAllRecord(Arrays.asList(payRepairRecord, refundRepairRecord));
return repairResult;
} finally {
lockTemplate.releaseLock(lock);
}
// 设置修复ID并保存修复记录
repairResult.setRepairNo(IdUtil.getSnowflakeNextIdStr());
// 支付修复记录
PayRepairRecord payRepairRecord = this.payRepairRecord(payOrder, repairType, repairResult);
// 退款修复记录
PayRepairRecord refundRepairRecord = this.refundRepairRecord(refundOrder, repairType, repairResult);
// 发送通知
clientNoticeService.registerRefundNotice(refundOrder, null, new ArrayList<>(refundChannelOrderMap.values()));
recordService.saveAllRecord(Arrays.asList(payRepairRecord, refundRepairRecord));
return repairResult;
}
/**

View File

@@ -88,7 +88,7 @@ public class PaySyncService {
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public SyncResult syncPayOrder(PayOrder payOrder) {
// 加锁
LockInfo lock = lockTemplate.lock("sync:payment" + payOrder.getId(),10000,200);
LockInfo lock = lockTemplate.lock("sync:pay" + payOrder.getId(),10000,200);
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("支付同步处理中,请勿重复操作");
}

View File

@@ -0,0 +1,27 @@
package cn.bootx.platform.daxpay.service.task;
import cn.bootx.platform.daxpay.service.task.service.RefundSyncTaskService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Component;
/**
* 退款定时同步任务 一分钟一次, 查询退款中的订单进行同步
* @author xxm
* @since 2024/3/12
*/
@Slf4j
@Component
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
@RequiredArgsConstructor
public class RefundSyncTask implements Job {
private final RefundSyncTaskService refundSyncTaskService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
refundSyncTaskService.syncTask();
}
}

View File

@@ -0,0 +1,41 @@
package cn.bootx.platform.daxpay.service.task.service;
import cn.bootx.platform.daxpay.service.core.order.refund.dao.RefundOrderManager;
import cn.bootx.platform.daxpay.service.core.order.refund.entity.RefundOrder;
import cn.bootx.platform.daxpay.service.core.payment.sync.service.RefundSyncService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 定时
* @author xxm
* @since 2024/3/12
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class RefundSyncTaskService {
private final RefundSyncService refundSyncService;
private final RefundOrderManager refundOrderManager;
/**
* 同步任务
*/
public void syncTask(){
// 查询退款中的退款订单
List<RefundOrder> list = refundOrderManager.findAllByProgress();
for (RefundOrder refundOrder : list) {
try {
// 调用同步方法
refundSyncService.syncRefundOrder(refundOrder);
} catch (Exception e) {
log.warn("退款执行同步失败, ID: {}",refundOrder.getId());
}
}
}
}