feat 消息通知功能开发

This commit is contained in:
xxm1995
2024-02-21 16:51:23 +08:00
parent 34de3574ba
commit 3ef655aa3e
9 changed files with 222 additions and 55 deletions

View File

@@ -77,8 +77,8 @@ public class PayBuilder {
.setNotNotify(payParam.isNotNotify())
.setNotifyUrl(noticeInfo.getNotifyUrl())
.setReturnUrl(noticeInfo.getReturnUrl())
.setSign(payParam.getSign())
.setSignType(platform.getSignType())
.setReqSign(payParam.getSign())
.setReqSignType(platform.getSignType())
.setAttach(payParam.getAttach())
.setApiVersion(payParam.getVersion())
.setReqTime(payParam.getReqTime());

View File

@@ -50,13 +50,19 @@ public class PayOrderExtra extends MpBaseEntity implements EntityBaseFunction<Pa
@TableField(updateStrategy = FieldStrategy.ALWAYS)
private String returnUrl;
/** 签名类型 */
@DbColumn(comment = "签名类型")
private String signType;
@DbColumn(comment = "是否需要通知客户系统")
private boolean notice;
/** 签名,以最后一次为准 */
@DbColumn(comment = "回调通知时是否需要进行签名")
private boolean noticeSign;
/** 请求签名类型 */
@DbColumn(comment = "签名类型")
private String reqSignType;
/** 请求签名值,以最后一次为准 */
@DbColumn(comment = "签名")
private String sign;
private String reqSign;
/** 商户扩展参数,回调时会原样返回 */
@DbColumn(comment = "商户扩展参数")

View File

@@ -1,6 +1,7 @@
package cn.bootx.platform.daxpay.service.core.payment.notice.entity;
import cn.bootx.platform.common.mybatisplus.base.MpCreateEntity;
import cn.bootx.platform.daxpay.service.code.ClientNoticeTypeEnum;
import cn.bootx.table.modify.annotation.DbColumn;
import cn.bootx.table.modify.annotation.DbTable;
import com.baomidou.mybatisplus.annotation.TableName;
@@ -26,7 +27,7 @@ public class ClientNoticeTask extends MpCreateEntity {
/**
* 回调类型
* @see
* @see ClientNoticeTypeEnum
*/
@DbColumn(comment = "回调类型")
private String type;
@@ -41,7 +42,7 @@ public class ClientNoticeTask extends MpCreateEntity {
/** 发送次数 */
@DbColumn(comment = "发送次数")
private Integer sendTimes;
private Integer sendCount;
/** 发送地址 */
@DbColumn(comment = "发送地址")

View File

@@ -1,11 +1,23 @@
package cn.bootx.platform.daxpay.service.core.payment.notice.service;
import cn.bootx.platform.common.core.exception.RepetitiveOperationException;
import cn.bootx.platform.common.core.util.CollUtil;
import cn.bootx.platform.common.core.util.LocalDateTimeUtil;
import cn.bootx.platform.common.redis.RedisClient;
import cn.bootx.platform.common.spring.exception.RetryableException;
import cn.bootx.platform.daxpay.code.PaySignTypeEnum;
import cn.bootx.platform.daxpay.service.code.ClientNoticeTypeEnum;
import cn.bootx.platform.daxpay.service.core.notice.result.PayChannelResult;
import cn.bootx.platform.daxpay.service.core.notice.result.PayNoticeResult;
import cn.bootx.platform.daxpay.service.core.order.pay.dao.PayChannelOrderManager;
import cn.bootx.platform.daxpay.service.core.order.pay.dao.PayOrderExtraManager;
import cn.bootx.platform.daxpay.service.core.order.pay.entity.PayChannelOrder;
import cn.bootx.platform.daxpay.service.core.order.pay.entity.PayOrder;
import cn.bootx.platform.daxpay.service.core.order.pay.entity.PayOrderExtra;
import cn.bootx.platform.daxpay.service.core.payment.notice.dao.ClientNoticeTaskManager;
import cn.bootx.platform.daxpay.service.core.payment.notice.entity.ClientNoticeTask;
import cn.hutool.extra.spring.SpringUtil;
import cn.bootx.platform.daxpay.service.core.system.config.entity.PlatformConfig;
import cn.bootx.platform.daxpay.service.core.system.config.service.PlatformConfigService;
import cn.bootx.platform.daxpay.util.PaySignUtil;
import cn.hutool.http.ContentType;
import cn.hutool.http.HttpException;
import cn.hutool.http.HttpResponse;
@@ -14,13 +26,12 @@ import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
/**
* 消息通知任务服务
@@ -33,36 +44,129 @@ import java.util.Set;
@RequiredArgsConstructor
public class ClientNoticeService {
private final PayOrderExtraManager orderExtraManager;
private final PayChannelOrderManager channelOrderManager;
private final PlatformConfigService configService;
private final ClientNoticeTaskManager taskManager;
private final RedisClient redisClient;
private final LockTemplate lockTemplate;
private final String KEY = "client:notice:task";
private static final String KEY = "client:notice:task";
private static final Map<Integer,Integer> DELAY_TIME = new HashMap<>();
/*
* 初始化延迟时间表, 总共会发起16次通知吗, 总计 24h4m
* 通知频率为0s/15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h
*/
static {
DELAY_TIME.put(1, 0);
DELAY_TIME.put(2, 15 * 1000);
DELAY_TIME.put(3, 15 * 1000);
DELAY_TIME.put(4, 30 * 1000);
DELAY_TIME.put(5, 3 * 60 * 1000);
DELAY_TIME.put(6, 10 * 60 * 1000);
DELAY_TIME.put(7, 20 * 60 * 1000);
DELAY_TIME.put(8, 30 * 60 * 1000);
DELAY_TIME.put(9, 30 * 60 * 1000);
DELAY_TIME.put(10, 30 * 60 * 1000);
DELAY_TIME.put(11, 60 * 60 * 1000);
DELAY_TIME.put(12, 3 * 60 * 60 * 1000);
DELAY_TIME.put(13, 3 * 60 * 60 * 1000);
DELAY_TIME.put(14, 3 * 60 * 60 * 1000);
DELAY_TIME.put(15, 6 * 60 * 60 * 1000);
DELAY_TIME.put(16, 6 * 60 * 60 * 1000);
}
/**
* 注册消息通知任务, 失败重试3次, 间隔一秒
* 注册支付消息通知任务, 失败重试3次, 间隔一秒
* @param order 支付订单
* @param orderExtra 支付订单扩展信息
* @param channelOrders 支付通道订单
*/
@Async("bigExecutor")
@Retryable(value = RetryableException.class)
public void register(){
public void registerPayNotice(PayOrder order, PayOrderExtra orderExtra, List<PayChannelOrder> channelOrders) {
// 支付订单扩展信息为空则进行查询
if (Objects.isNull(orderExtra)){
Optional<PayOrderExtra> extraOpt = orderExtraManager.findById(order.getId());
if (!extraOpt.isPresent()){
log.error("未找到支付扩展信息数据错误订单ID{}",order.getId());
return;
}
orderExtra = extraOpt.get();
}
// 通道支付订单为空则进行查询
if (CollUtil.isEmpty(channelOrders)){
channelOrders = channelOrderManager.findAllByPaymentId(order.getId());
}
// 将任务写入到任务表中
ClientNoticeTask task = new ClientNoticeTask()
.setType(ClientNoticeTypeEnum.PAY.getType())
.setUrl(orderExtra.getNotifyUrl())
.setSendCount(0);
try {
taskManager.save(task);
} catch (Exception e) {
log.error("注册支付消息通知任务失败数据错误订单ID{}",order.getId());
throw new RuntimeException(e);
}
// 同时触发一次通知, 如果成功记录结束
try {
// 同时触发一次通知, 如果成功发送, 任务结束
this.sendData(task, LocalDateTime.now());
} catch (HttpException e) {
this.failHandler(task, LocalDateTime.now());
}
}
// 不成功更新任务表, 并注册到Redis对应的表中
/**
* 构建出任务对象
*/
private void buildTask(PayOrder order, PayOrderExtra orderExtra, List<PayChannelOrder> channelOrders){
// 组装内容
List<PayChannelResult> channels = channelOrders.stream()
.map(o->new PayChannelResult().setChannel(o.getChannel()).setWay(o.getPayWay()).setAmount(o.getAmount()))
.collect(Collectors.toList());
PayNoticeResult payNoticeResult = new PayNoticeResult()
.setPaymentId(order.getId())
.setAsyncPay(order.isAsyncPay())
.setBusinessNo(order.getBusinessNo())
.setAmount(order.getAmount())
.setPayTime(order.getPayTime())
.setCreateTime(order.getCreateTime())
.setStatus(order.getStatus())
.setAttach(orderExtra.getAttach())
.setPayChannels(channels);
PlatformConfig config = configService.getConfig();
// 是否需要签名
if (orderExtra.isNoticeSign()){
// 签名
if (Objects.equals(config.getSignType(), PaySignTypeEnum.MD5.getCode())){
payNoticeResult.setSign(PaySignUtil.md5Sign(payNoticeResult,config.getSignSecret()));
} else {
payNoticeResult.setSign(PaySignUtil.hmacSha256Sign(payNoticeResult,config.getSignSecret()));
}
}
}
/**
* 从redis中执行任务, 通过定时任务触发
*/
public void run(long start, long end){
@Async("asyncExecutor")
public void taskRun(long start, long end){
// 查询Redis任务表,获取任务
Set<String> taskIds = redisClient.zrangeByScore(KEY, start, end);
// 发起一个异步任务,
for (String taskId : taskIds) {
SpringUtil.getBean(this.getClass()).run(Long.valueOf(taskId));
this.run(Long.valueOf(taskId));
}
// 删除Redis中任务
redisClient.zremRangeByScore(KEY, start, end);
@@ -71,8 +175,7 @@ public class ClientNoticeService {
/**
* 执行任务
*/
@Async("asyncExecutor")
public void run(Long taskId){
private void run(Long taskId){
LocalDateTime now = LocalDateTime.now();
// 开启分布式锁
LockInfo lock = lockTemplate.lock(KEY + ":" + taskId,2000, 50);
@@ -82,20 +185,13 @@ public class ClientNoticeService {
// 查询任务, 进行发送
ClientNoticeTask task = taskManager.findById(taskId).orElse(null);
try {
if (Objects.nonNull(task)){
HttpResponse execute = HttpUtil.createPost(task.getUrl())
.body(task.getContent(), ContentType.JSON.getValue())
.timeout(5000)
.execute();
String body = execute.body();
// 如果响应值等于SUCCESS, 说明发送成功
if (Objects.equals(body, "SUCCESS")){
this.successHandler(task);
} else {
this.failHandler(task,now);
}
// 不存在任务直接跳过
if (Objects.isNull(task)) {
return;
}
this.sendData(task, now);
} catch (HttpException e) {
//noinspection DataFlowIssue
this.failHandler(task, now);
} finally {
lockTemplate.releaseLock(lock);
@@ -103,20 +199,56 @@ public class ClientNoticeService {
}
/**
* 成功处理
* 处理任务, 出现异常直接抛出
*/
public void successHandler(ClientNoticeTask task){
// 记录成功并保存
task.setSuccess(true);
private void sendData(ClientNoticeTask task, LocalDateTime now){
HttpResponse execute = HttpUtil.createPost(task.getUrl())
.body(task.getContent(), ContentType.JSON.getValue())
.timeout(5000)
.execute();
String body = execute.body();
// 如果响应值等于SUCCESS, 说明发送成功, 进行成功处理
if (Objects.equals(body, "SUCCESS")){
this.successHandler(task);
} else {
this.failHandler(task,now);
}
}
/**
* 失败处理
* 成功处理
*/
public void failHandler(ClientNoticeTask task, LocalDateTime now){
// 次数+1
// 注册任务到redis中
private void successHandler(ClientNoticeTask task){
// 记录成功并保存
task.setSuccess(true);
taskManager.updateById(task);
}
/**
* 失败处理, 首先发送次数+1, 然后
*/
private void failHandler(ClientNoticeTask task, LocalDateTime now){
// 次数+1
task.setSendCount(task.getSendCount() + 1);
// 注册任务到redis中
this.pushRedis(task, now);
// 更新任务
taskManager.updateById(task);
}
/**
* 注册到Redis任务列表中
*/
private void pushRedis(ClientNoticeTask task, LocalDateTime now){
// 判断发送次数是否超过16次
if (task.getSendCount() > 16){
return;
}
// 根据当前次数和时间计算出毫秒值
Integer delay = DELAY_TIME.get(task.getSendCount());
long taskTime = LocalDateTimeUtil.timestamp(now) + delay;
redisClient.zadd(KEY, String.valueOf(task.getId()), taskTime);
}
}

View File

@@ -1,4 +1,4 @@
package cn.bootx.platform.daxpay.service.core.notice.service;
package cn.bootx.platform.daxpay.service.core.payment.notice.service;
import cn.bootx.platform.daxpay.service.configuration.DaxPayProperties;
import cn.bootx.platform.daxpay.service.core.order.pay.dao.PayOrderExtraManager;

View File

@@ -156,7 +156,7 @@ public class PayAssistService {
String notifyUrl = PaymentContextLocal.get().getNoticeInfo().getNotifyUrl();
String returnUrl = PaymentContextLocal.get().getNoticeInfo().getReturnUrl();
payOrderExtra.setReqTime(payParam.getReqTime())
.setSign(payParam.getSign())
.setReqSign(payParam.getSign())
.setNotNotify(payParam.isNotNotify())
.setNotifyUrl(notifyUrl)
.setReturnUrl(returnUrl)

View File

@@ -13,6 +13,7 @@ import cn.bootx.platform.daxpay.service.core.order.pay.dao.PayChannelOrderManage
import cn.bootx.platform.daxpay.service.core.order.pay.entity.PayChannelOrder;
import cn.bootx.platform.daxpay.service.core.order.pay.entity.PayOrder;
import cn.bootx.platform.daxpay.service.core.order.pay.service.PayOrderService;
import cn.bootx.platform.daxpay.service.core.payment.notice.service.ClientNoticeService;
import cn.bootx.platform.daxpay.service.core.payment.pay.factory.PayStrategyFactory;
import cn.bootx.platform.daxpay.service.func.AbsPayStrategy;
import cn.bootx.platform.daxpay.util.PayUtil;
@@ -51,6 +52,8 @@ public class PayService {
private final PayAssistService payAssistService;
private final ClientNoticeService clientNoticeService;
private final PayChannelOrderManager payChannelOrderManager;
private final LockTemplate lockTemplate;
@@ -135,7 +138,7 @@ public class PayService {
}
/**
* 执行第一次支付的方法
* 执行第一次发起支付的方法
*/
private void firstPayHandler(PayParam payParam, PayOrder payOrder) {
@@ -173,7 +176,7 @@ public class PayService {
.setPayTime(LocalDateTime.now());
payOrderService.updateById(payOrder);
}
// 5.2 如果异步支付完成, 进行订单完成处理
// 5.2 如果异步支付完成, 进行订单完成处理, 同时发送回调消息
AsyncPayLocal asyncPayInfo = PaymentContextLocal.get().getAsyncPayInfo();
if (asyncPayInfo.isPayComplete()) {
payOrder.setGatewayOrderNo(asyncPayInfo.getGatewayOrderNo())
@@ -182,10 +185,15 @@ public class PayService {
payOrderService.updateById(payOrder);
}
// 如果支付完成 发送通知
if (Objects.equals(payOrder.getStatus(), SUCCESS.getCode())){
clientNoticeService.registerPayNotice(payOrder, null, channelOrders);
}
}
/**
* 异步支付执行(非第一次请求), 只执行异步支付策略, 报错不影响继续发起支付
* 异步支付执行(非第一次请求), 只执行异步支付策略, 因为同步支付已经支付成功. 报错不影响继续发起支付
*/
private PayResult paySyncNotFirst(PayParam payParam, PayOrder payOrder) {
@@ -196,7 +204,7 @@ public class PayService {
return PayBuilder.buildPayResultByPayOrder(payOrder);
}
// 2.获取 异步支付通道,通过工厂生成对应的策略组(只包含异步支付的策略, 同步支付相关逻辑不再进行执行)
// 2.获取 异步支付通道,通过工厂生成对应的策略组(只包含异步支付的策略, 同步支付已经成功不再继续执行)
PayChannelParam payChannelParam = payAssistService.getAsyncPayParam(payParam, payOrder);
List<AbsPayStrategy> payStrategyList = PayStrategyFactory.createAsyncLast(Collections.singletonList(payChannelParam));
@@ -212,7 +220,16 @@ public class PayService {
// 5.2支付发起成功处理
payStrategyList.forEach(AbsPayStrategy::doSuccessHandler);
// 6. 更新支付订单和扩展参数
// 6.1 如果异步支付完成, 进行订单完成处理, 并触发通知
AsyncPayLocal asyncPayInfo = PaymentContextLocal.get().getAsyncPayInfo();
if (asyncPayInfo.isPayComplete()) {
payOrder.setGatewayOrderNo(asyncPayInfo.getGatewayOrderNo())
.setStatus(SUCCESS.getCode())
.setPayTime(LocalDateTime.now());
clientNoticeService.registerPayNotice(payOrder,null,null);
}
// 6.2 更新支付订单和扩展参数
payOrderService.updateById(payOrder);
payAssistService.updatePayOrderExtra(payParam,payOrder.getId());

View File

@@ -3,13 +3,15 @@ package cn.bootx.platform.daxpay.service.core.payment.repair.service;
import cn.bootx.platform.common.core.function.CollectorsFunction;
import cn.bootx.platform.daxpay.code.PayStatusEnum;
import cn.bootx.platform.daxpay.exception.pay.PayFailureException;
import cn.bootx.platform.daxpay.service.code.PaymentTypeEnum;
import cn.bootx.platform.daxpay.service.code.PayRepairWayEnum;
import cn.bootx.platform.daxpay.service.code.PaymentTypeEnum;
import cn.bootx.platform.daxpay.service.common.local.PaymentContextLocal;
import cn.bootx.platform.daxpay.service.core.order.pay.dao.PayChannelOrderManager;
import cn.bootx.platform.daxpay.service.core.order.pay.dao.PayOrderExtraManager;
import cn.bootx.platform.daxpay.service.core.order.pay.entity.PayChannelOrder;
import cn.bootx.platform.daxpay.service.core.order.pay.entity.PayOrder;
import cn.bootx.platform.daxpay.service.core.order.pay.service.PayOrderService;
import cn.bootx.platform.daxpay.service.core.payment.notice.service.ClientNoticeService;
import cn.bootx.platform.daxpay.service.core.payment.repair.factory.PayRepairStrategyFactory;
import cn.bootx.platform.daxpay.service.core.payment.repair.result.PayRepairResult;
import cn.bootx.platform.daxpay.service.core.record.repair.entity.PayRepairRecord;
@@ -41,10 +43,14 @@ public class PayRepairService {
private final PayOrderService payOrderService;
private final ClientNoticeService clientNoticeService;
private final PayChannelOrderManager channelOrderManager;
private final PayRepairRecordService recordService;
private final PayOrderExtraManager payOrderExtraManager;
/**
* 修复支付单
*/
@@ -122,6 +128,11 @@ public class PayRepairService {
// 读取支付网关中的时间
order.setPayTime(payTime);
payOrderService.updateById(order);
List<PayChannelOrder> channelOrders = strategies.stream()
.map(AbsPayRepairStrategy::getChannelOrder)
.collect(Collectors.toList());
// 发送通知
clientNoticeService.registerPayNotice(order, null, channelOrders);
}
/**