feat 订单取消/修复/取消/同步等操作添加分布式锁, 防止出现重复操作

This commit is contained in:
nws
2024-01-06 18:35:20 +08:00
parent 9858c3d305
commit 04538cc141
11 changed files with 161 additions and 118 deletions

View File

@@ -56,11 +56,6 @@
<artifactId>common-super-query</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
<!-- 支付核心包-->
<dependency>
<groupId>cn.bootx.platform</groupId>
@@ -104,5 +99,10 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>lock4j-redis-template-spring-boot-starter</artifactId>
<version>${lock4j.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -1,5 +1,6 @@
package cn.bootx.platform.daxpay.service.core.payment.close.service;
import cn.bootx.platform.common.core.exception.RepetitiveOperationException;
import cn.bootx.platform.daxpay.code.PayStatusEnum;
import cn.bootx.platform.daxpay.exception.pay.PayFailureException;
import cn.bootx.platform.daxpay.exception.pay.PayUnsupportedMethodException;
@@ -13,6 +14,8 @@ import cn.bootx.platform.daxpay.service.core.record.pay.entity.PayOrder;
import cn.bootx.platform.daxpay.service.core.record.pay.service.PayOrderService;
import cn.bootx.platform.daxpay.service.func.AbsPayCloseStrategy;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -34,6 +37,8 @@ public class PayCloseService {
private final PayOrderService payOrderService;
private final PayCloseRecordService payCloseRecordService;
private final LockTemplate lockTemplate;
/**
* 关闭支付
*/
@@ -48,7 +53,15 @@ public class PayCloseService {
payOrder = payOrderService.findByBusinessNo(param.getBusinessNo())
.orElseThrow(() -> new PayFailureException("未查询到支付订单"));
}
this.close(payOrder);
LockInfo lock = lockTemplate.lock("payment:close:" + payOrder.getId());
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("支付订单已在关闭中,请勿重复发起");
}
try {
this.close(payOrder);
} finally {
lockTemplate.releaseLock(lock);
}
}
/**

View File

@@ -1,21 +1,24 @@
package cn.bootx.platform.daxpay.service.core.payment.pay.service;
import cn.bootx.platform.common.core.exception.RepetitiveOperationException;
import cn.bootx.platform.daxpay.code.PayStatusEnum;
import cn.bootx.platform.daxpay.service.core.payment.pay.factory.PayStrategyFactory;
import cn.bootx.platform.daxpay.service.core.record.pay.builder.PaymentBuilder;
import cn.bootx.platform.daxpay.service.core.record.pay.entity.PayOrder;
import cn.bootx.platform.daxpay.service.core.record.pay.service.PayOrderService;
import cn.bootx.platform.daxpay.exception.pay.PayUnsupportedMethodException;
import cn.bootx.platform.daxpay.service.func.AbsPayStrategy;
import cn.bootx.platform.daxpay.service.func.PayStrategyConsumer;
import cn.bootx.platform.daxpay.param.pay.PayParam;
import cn.bootx.platform.daxpay.param.pay.PayWayParam;
import cn.bootx.platform.daxpay.param.pay.SimplePayParam;
import cn.bootx.platform.daxpay.result.pay.PayResult;
import cn.bootx.platform.daxpay.service.core.payment.pay.factory.PayStrategyFactory;
import cn.bootx.platform.daxpay.service.core.record.pay.builder.PaymentBuilder;
import cn.bootx.platform.daxpay.service.core.record.pay.entity.PayOrder;
import cn.bootx.platform.daxpay.service.core.record.pay.service.PayOrderService;
import cn.bootx.platform.daxpay.service.func.AbsPayStrategy;
import cn.bootx.platform.daxpay.service.func.PayStrategyConsumer;
import cn.bootx.platform.daxpay.util.PayUtil;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -41,6 +44,8 @@ public class PayService {
private final PayAssistService payAssistService;
private final LockTemplate lockTemplate;
/**
* 支付下单接口(同步/异步/组合支付)
* 1. 同步支付:都只会在第一次执行中就完成支付,例如钱包、储值卡都是调用完就进行了扣减,完成了支付记录
@@ -55,18 +60,28 @@ public class PayService {
// 异步支付方式检查
PayUtil.validationAsyncPay(payParam);
// 获取并校验支付订单状态, 如果超时, 触发支付单同步和修复动作
PayOrder payOrder = payAssistService.getOrderAndCheck(payParam.getBusinessNo());
String businessNo = payParam.getBusinessNo();
// 加锁
LockInfo lock = lockTemplate.lock("payment:pay:" + businessNo);
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("正在支付中,请勿重复支付");
}
try {
// 获取并校验支付订单状态, 如果超时, 触发支付单同步和修复动作
PayOrder payOrder = payAssistService.getOrderAndCheck(payParam.getBusinessNo());
// 初始化上下文
payAssistService.initPayContext(payOrder, payParam);
// 初始化上下文
payAssistService.initPayContext(payOrder, payParam);
// 异步支付且非第一次支付
if (Objects.nonNull(payOrder) && payOrder.isAsyncPay()) {
return this.paySyncNotFirst(payParam, payOrder);
} else {
// 第一次发起支付或同步支付
return this.firstPay(payParam, payOrder);
// 异步支付且非第一次支付
if (Objects.nonNull(payOrder) && payOrder.isAsyncPay()) {
return this.paySyncNotFirst(payParam, payOrder);
} else {
// 第一次发起支付或同步支付
return this.firstPay(payParam, payOrder);
}
} finally {
lockTemplate.releaseLock(lock);
}
}

View File

@@ -1,5 +1,6 @@
package cn.bootx.platform.daxpay.service.core.payment.refund.service;
import cn.bootx.platform.common.core.exception.RepetitiveOperationException;
import cn.bootx.platform.common.core.util.ValidationUtil;
import cn.bootx.platform.daxpay.code.PayStatusEnum;
import cn.bootx.platform.daxpay.service.core.payment.refund.factory.PayRefundStrategyFactory;
@@ -13,6 +14,8 @@ import cn.bootx.platform.daxpay.param.pay.SimpleRefundParam;
import cn.bootx.platform.daxpay.result.pay.RefundResult;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -20,6 +23,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -35,6 +39,9 @@ public class PayRefundService {
private final PayRefundAssistService payRefundAssistService;;
private final PayOrderService payOrderService;
private final LockTemplate lockTemplate;
/**
* 支付退款
*/
@@ -69,19 +76,30 @@ public class PayRefundService {
PayOrder payOrder = payRefundAssistService.getPayOrderAndCheckByRefundParam(param, simple);
// 参数校验
ValidationUtil.validateParam(param);
// 退款上下文初始化
payRefundAssistService.initRefundContext(param);
// 是否全部退款
if (param.isRefundAll()){
// 全部退款根据支付订单的退款信息构造退款参数
List<RefundChannelParam> channelParams = payOrder.getRefundableInfos()
.stream()
.map(o -> new RefundChannelParam().setChannel(o.getChannel())
.setAmount(o.getAmount()))
.collect(Collectors.toList());
param.setRefundChannels(channelParams);
// 加锁
LockInfo lock = lockTemplate.lock("payment:refund:" + payOrder.getId());
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("退款处理中,请勿重复操作");
}
try {
// 退款上下文初始化
payRefundAssistService.initRefundContext(param);
// 是否全部退款
if (param.isRefundAll()){
// 全部退款根据支付订单的退款信息构造退款参数
List<RefundChannelParam> channelParams = payOrder.getRefundableInfos()
.stream()
.map(o -> new RefundChannelParam().setChannel(o.getChannel())
.setAmount(o.getAmount()))
.collect(Collectors.toList());
param.setRefundChannels(channelParams);
}
return this.refundByChannel(param,payOrder);
} finally {
lockTemplate.releaseLock(lock);
}
return this.refundByChannel(param,payOrder);
}
/**

View File

@@ -1,6 +1,7 @@
package cn.bootx.platform.daxpay.service.core.payment.sync.service;
import cn.bootx.platform.common.core.exception.BizException;
import cn.bootx.platform.common.core.exception.RepetitiveOperationException;
import cn.bootx.platform.common.core.util.LocalDateTimeUtil;
import cn.bootx.platform.daxpay.code.PayStatusEnum;
import cn.bootx.platform.daxpay.code.PaySyncStatusEnum;
@@ -19,6 +20,8 @@ import cn.bootx.platform.daxpay.service.core.record.pay.service.PayOrderService;
import cn.bootx.platform.daxpay.service.core.record.sync.entity.PaySyncRecord;
import cn.bootx.platform.daxpay.service.core.record.sync.service.PaySyncRecordService;
import cn.bootx.platform.daxpay.service.func.AbsPaySyncStrategy;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -48,6 +51,8 @@ public class PaySyncService {
private final PayRepairService repairService;
private final LockTemplate lockTemplate;
/**
* 支付同步, 开启一个新的事务, 不受外部抛出异常的影响
*/
@@ -75,46 +80,56 @@ public class PaySyncService {
* 2. 如果状态不一致, 调用修复逻辑进行修复
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
public PaySyncResult syncPayOrder(PayOrder order) {
// 获取同步策略类
AbsPaySyncStrategy syncPayStrategy = PaySyncStrategyFactory.create(order.getAsyncChannel());
syncPayStrategy.initPayParam(order);
// 记录支付同步前后的状态
String oldStatus = order.getStatus();
String repairStatus = null;
// 执行同步操作, 获取支付网关同步的结果
GatewaySyncResult syncResult = syncPayStrategy.doSyncStatus();
// 判断是否同步成功
if (Objects.equals(syncResult.getSyncStatus(), PaySyncStatusEnum.FAIL)){
// 同步失败, 返回失败响应, 同时记录失败的日志
this.saveRecord(order, syncResult, true, oldStatus, null, syncResult.getErrorMsg());
return new PaySyncResult().setErrorMsg(syncResult.getErrorMsg());
public PaySyncResult syncPayOrder(PayOrder payOrder) {
// 加锁
LockInfo lock = lockTemplate.lock("payment:refund:" + payOrder.getId());
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("支付同步处理中,请勿重复操作");
}
// 判断网关状态是否和支付单一致, 同时更新网关同步状态
boolean statusSync = this.checkAndAdjustSyncStatus(syncResult,order);
try {
// 状态不一致,执行支付单修复逻辑
if (!statusSync){
this.resultHandler(syncResult, order);
repairStatus = order.getStatus();
}
} catch (PayFailureException e) {
// 同步失败, 返回失败响应, 同时记录失败的日志
syncResult.setSyncStatus(PaySyncStatusEnum.FAIL);
this.saveRecord(order, syncResult, false, oldStatus, null, e.getMessage());
return new PaySyncResult().setErrorMsg(e.getMessage());
}
// 获取同步策略类
AbsPaySyncStrategy syncPayStrategy = PaySyncStrategyFactory.create(payOrder.getAsyncChannel());
syncPayStrategy.initPayParam(payOrder);
// 记录支付单同步前后的状态
String oldStatus = payOrder.getStatus();
String repairStatus = null;
// 同步成功记录日志
this.saveRecord( order, syncResult, !statusSync, oldStatus, repairStatus, null);
return new PaySyncResult()
.setGatewayStatus(syncResult.getSyncStatus().getCode())
.setSuccess(true)
.setRepair(!statusSync)
.setOldStatus(oldStatus)
.setRepairStatus(repairStatus);
// 执行同步操作, 获取支付网关同步的结果
GatewaySyncResult syncResult = syncPayStrategy.doSyncStatus();
// 判断是否同步成功
if (Objects.equals(syncResult.getSyncStatus(), PaySyncStatusEnum.FAIL)){
// 同步失败, 返回失败响应, 同时记录失败的日志
this.saveRecord(payOrder, syncResult, true, oldStatus, null, syncResult.getErrorMsg());
return new PaySyncResult().setErrorMsg(syncResult.getErrorMsg());
}
// 判断网关状态是否和支付单一致, 同时更新网关同步状态
boolean statusSync = this.checkAndAdjustSyncStatus(syncResult,payOrder);
try {
// 状态不一致,执行支付单修复逻辑
if (!statusSync){
this.resultHandler(syncResult, payOrder);
repairStatus = payOrder.getStatus();
}
} catch (PayFailureException e) {
// 同步失败, 返回失败响应, 同时记录失败的日志
syncResult.setSyncStatus(PaySyncStatusEnum.FAIL);
this.saveRecord(payOrder, syncResult, false, oldStatus, null, e.getMessage());
return new PaySyncResult().setErrorMsg(e.getMessage());
}
// 同步成功记录日志
this.saveRecord( payOrder, syncResult, !statusSync, oldStatus, repairStatus, null);
return new PaySyncResult()
.setGatewayStatus(syncResult.getSyncStatus().getCode())
.setSuccess(true)
.setRepair(!statusSync)
.setOldStatus(oldStatus)
.setRepairStatus(repairStatus);
} finally {
lockTemplate.releaseLock(lock);
}
}
/**

View File

@@ -1,39 +0,0 @@
package cn.bootx.platform.daxpay.service.core.payment.sync.task;
import cn.bootx.platform.daxpay.service.core.payment.sync.service.PaySyncService;
import cn.bootx.platform.daxpay.param.pay.PaySyncParam;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 对未过期的支付中订单进行状态同步
* @author xxm
* @since 2024/1/1
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class PayOrderSyncTaskService {
private final cn.bootx.platform.daxpay.service.core.timeout.dao.PayExpiredTimeRepository PayExpiredTimeRepository;
private final PaySyncService paySyncService;
/**
* 同步支付订单任务
*/
public void syncTask() {
log.info("开始同步支付订单");
// 1. 从超时订单列表中获取到未超时的订单号
for (String s : PayExpiredTimeRepository.getNormalKeysBy30Day()) {
try {
Long paymentId = Long.parseLong(s);
PaySyncParam paySyncParam = new PaySyncParam();
paySyncParam.setPaymentId(paymentId);
paySyncService.sync(paySyncParam);
} catch (Exception e) {
log.error("同步支付订单异常", e);
}
}
}
}

View File

@@ -1,13 +1,18 @@
package cn.bootx.platform.daxpay.service.core.timeout.task;
import cn.bootx.platform.common.core.exception.RepetitiveOperationException;
import cn.bootx.platform.daxpay.param.pay.PaySyncParam;
import cn.bootx.platform.daxpay.service.core.payment.sync.service.PaySyncService;
import cn.bootx.platform.daxpay.service.core.timeout.dao.PayExpiredTimeRepository;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Set;
/**
@@ -20,15 +25,20 @@ import java.util.Set;
@RequiredArgsConstructor
public class PayExpiredTimeTask {
private final PayExpiredTimeRepository repository;
private final PaySyncService paySyncService;
private final LockTemplate lockTemplate;
// @Scheduled(cron = "*/5 * * * * ?")
@Scheduled(cron = "*/5 * * * * ?")
public void task(){
log.info("执行超时取消任务....");
Set<String> expiredKeys = repository.getExpiredKeys(LocalDateTime.now());
for (String expiredKey : expiredKeys) {
log.info("key:{}", expiredKey);
LockInfo lock = lockTemplate.lock("payment:expired:" + expiredKey,10000,0);
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("支付同步处理中,请勿重复操作");
}
try {
// 执行同步操作, 网关同步时会对支付的进行状态的处理
Long paymentId = Long.parseLong(expiredKey);
@@ -37,6 +47,8 @@ public class PayExpiredTimeTask {
paySyncService.sync(paySyncParam);
} catch (Exception e) {
log.error("超时取消任务 异常", e);
} finally {
lockTemplate.releaseLock(lock);
}
}

View File

@@ -1,12 +1,16 @@
package cn.bootx.platform.daxpay.service.core.timeout.task;
import cn.bootx.platform.common.core.exception.RepetitiveOperationException;
import cn.bootx.platform.daxpay.param.pay.PaySyncParam;
import cn.bootx.platform.daxpay.service.core.payment.sync.service.PaySyncService;
import cn.bootx.platform.daxpay.service.core.timeout.dao.PayExpiredTimeRepository;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Objects;
import java.util.Set;
/**
@@ -22,12 +26,17 @@ public class PayWaitOrderSyncTask {
private final PaySyncService paySyncService;
private final LockTemplate lockTemplate;
public void task(){
log.info("开始同步支付订单");
// 从超时订单列表中获取到未超时的订单号
Set<String> keys = repository.getNormalKeysBy30Day();
for (String key : keys) {
log.info("key:{}", key);
LockInfo lock = lockTemplate.lock("payment:sync:" + key,10000,0);
if (Objects.isNull(lock)){
throw new RepetitiveOperationException("支付同步处理中,请勿重复操作");
}
try {
Long paymentId = Long.parseLong(key);
PaySyncParam paySyncParam = new PaySyncParam();
@@ -36,6 +45,8 @@ public class PayWaitOrderSyncTask {
paySyncService.sync(paySyncParam);
} catch (Exception e) {
log.error("同步支付订单异常", e);
} finally {
lockTemplate.releaseLock(lock);
}
}