feat 适配消息队列

This commit is contained in:
xxm1995
2023-07-20 16:14:25 +08:00
parent 28911d91af
commit 30894b1cab
20 changed files with 161 additions and 119 deletions

View File

@@ -1,7 +1,8 @@
1.0.1
- 微信V3支付接口
- x 拆分网关同步相关代码
- x 记录网关同步记录
- 重构支付消息通知结构, 支持多种消息中间件
- x 重构支付消息通知结构, 支持多种消息中间件
- x 保存各通道的支付单
- 钱包支持设置开通时的默认金额
- 储值卡多卡支付和退款演示
@@ -9,3 +10,6 @@
- x 储值卡多卡退款到单卡, 不传卡号自动设置为默认卡
- 储值卡批量导入
- 储值卡支持多卡合一
1.0.2
- 支付超时逻辑重构
-

View File

@@ -121,9 +121,15 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>${rocketmq.version}</version>
<optional>true</optional>
</dependency>
<!-- 消息队列 RabbitMQ-->
<!-- 消息队列 ActiveMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<optional>true</optional>
</dependency>
<!-- 自动建表 -->
<dependency>

View File

@@ -1,9 +1,7 @@
package cn.bootx.platform.daxpay.core.pay.builder;
import cn.bootx.platform.daxpay.core.payment.entity.Payment;
import cn.bootx.platform.daxpay.mq.event.PayCancelEvent;
import cn.bootx.platform.daxpay.mq.event.PayCompleteEvent;
import cn.bootx.platform.daxpay.mq.event.PayRefundEvent;
import cn.bootx.platform.daxpay.mq.event.*;
import lombok.experimental.UtilityClass;
/**
@@ -19,27 +17,32 @@ public class PayEventBuilder {
* 支付完成
*/
public PayCompleteEvent buildPayComplete(Payment payment) {
PayCompleteEvent event = new PayCompleteEvent().setPaymentId(payment.getId())
return new PayCompleteEvent().setPaymentId(payment.getId())
.setBusinessId(payment.getBusinessId());
return event;
}
/**
* 支付撤销/关闭
*/
public PayCancelEvent buildPayCancel(Payment payment) {
PayCancelEvent event = new PayCancelEvent().setPaymentId(payment.getId())
return new PayCancelEvent().setPaymentId(payment.getId())
.setBusinessId(payment.getBusinessId());
}
/**
* 支付超时
*/
public PayExpiredTimeEvent buildPayExpiredTime(Payment payment) {
return new PayExpiredTimeEvent().setPaymentId(payment.getId())
.setBusinessId(payment.getBusinessId());
return event;
}
/**
* 支付退款
*/
public PayRefundEvent buildPayRefund(Payment payment) {
PayRefundEvent event = new PayRefundEvent().setPaymentId(payment.getId())
return new PayRefundEvent().setPaymentId(payment.getId())
.setBusinessId(payment.getBusinessId());
return event;
}
}

View File

@@ -14,7 +14,7 @@ import cn.bootx.platform.daxpay.core.pay.func.PayStrategyConsumer;
import cn.bootx.platform.daxpay.core.pay.result.PayCallbackResult;
import cn.bootx.platform.daxpay.core.payment.entity.Payment;
import cn.bootx.platform.daxpay.core.payment.service.PaymentService;
import cn.bootx.platform.daxpay.mq.PaymentEventSender;
import cn.bootx.platform.daxpay.mq.PayEventSender;
import cn.bootx.platform.daxpay.param.pay.PayParam;
import cn.hutool.core.collection.CollectionUtil;
import lombok.RequiredArgsConstructor;
@@ -40,7 +40,7 @@ public class PayCallbackService {
private final PaymentService paymentService;
private final PaymentEventSender eventSender;
private final PayEventSender eventSender;
/**
* 统一回调处理

View File

@@ -11,7 +11,7 @@ import cn.bootx.platform.daxpay.core.payment.service.PaymentService;
import cn.bootx.platform.daxpay.exception.payment.PayFailureException;
import cn.bootx.platform.daxpay.exception.payment.PayNotExistedException;
import cn.bootx.platform.daxpay.exception.payment.PayUnsupportedMethodException;
import cn.bootx.platform.daxpay.mq.PaymentEventSender;
import cn.bootx.platform.daxpay.mq.PayEventSender;
import cn.bootx.platform.daxpay.param.pay.PayParam;
import cn.hutool.core.collection.CollectionUtil;
import lombok.RequiredArgsConstructor;
@@ -37,7 +37,7 @@ public class PayCancelService {
private final PaymentService paymentService;
private final PaymentEventSender paymentEventSender;
private final PayEventSender payEventSender;
/**
* 根据业务id取消支付记录
@@ -95,7 +95,7 @@ public class PayCancelService {
payment = paymentService.findById(payment.getId()).orElseThrow(PayNotExistedException::new);
// 5. 发布撤销事件
paymentEventSender.sendPayCancel(PayEventBuilder.buildPayCancel(payment));
payEventSender.sendPayCancel(PayEventBuilder.buildPayCancel(payment));
}
/**

View File

@@ -21,7 +21,7 @@ import cn.bootx.platform.daxpay.dto.pay.PayResult;
import cn.bootx.platform.daxpay.exception.payment.PayFailureException;
import cn.bootx.platform.daxpay.exception.payment.PayNotExistedException;
import cn.bootx.platform.daxpay.exception.payment.PayUnsupportedMethodException;
import cn.bootx.platform.daxpay.mq.PaymentEventSender;
import cn.bootx.platform.daxpay.mq.PayEventSender;
import cn.bootx.platform.daxpay.param.pay.PayParam;
import cn.bootx.platform.daxpay.param.pay.PayWayParam;
import cn.bootx.platform.daxpay.util.PayWaylUtil;
@@ -50,7 +50,7 @@ public class PayService {
private final PaymentService paymentService;
private final PaymentEventSender eventSender;
private final PayEventSender eventSender;
private final MchAppManager mchAppManager;

View File

@@ -1,7 +1,6 @@
package cn.bootx.platform.daxpay.core.sync.record.entity;
import cn.bootx.mybatis.table.modify.annotation.DbComment;
import cn.bootx.mybatis.table.modify.annotation.DbTable;
import cn.bootx.mybatis.table.modify.mybatis.mysq.annotation.DbMySqlFieldType;
import cn.bootx.mybatis.table.modify.mybatis.mysq.constants.MySqlFieldTypeEnum;
import cn.bootx.platform.common.core.function.EntityBaseFunction;
@@ -24,7 +23,7 @@ import java.time.LocalDateTime;
*/
@EqualsAndHashCode(callSuper = true)
@Data
@DbTable(comment = "支付同步记录")
//@DbTable(comment = "支付同步记录")
@Accessors(chain = true)
@TableName("pay_sync_record")
public class PaySyncRecord extends MpCreateEntity implements EntityBaseFunction<PaySyncRecordDto> {

View File

@@ -5,7 +5,7 @@ import cn.bootx.platform.daxpay.core.pay.func.AbsPayStrategy;
import cn.bootx.platform.daxpay.core.payment.entity.Payment;
import cn.bootx.platform.daxpay.core.payment.service.PaymentService;
import cn.bootx.platform.daxpay.core.sync.result.PaySyncResult;
import cn.bootx.platform.daxpay.mq.PaymentEventSender;
import cn.bootx.platform.daxpay.mq.PayEventSender;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
@@ -31,7 +31,7 @@ public class PayExpiredTimeService {
private final PaymentService paymentService;
private final PaymentEventSender eventSender;
private final PayEventSender eventSender;
/**
* 支付单超时支付单处理

View File

@@ -14,7 +14,7 @@ import cn.bootx.platform.daxpay.core.sync.record.service.PaySyncRecordService;
import cn.bootx.platform.daxpay.core.sync.result.PaySyncResult;
import cn.bootx.platform.daxpay.exception.payment.PayFailureException;
import cn.bootx.platform.daxpay.exception.payment.PayUnsupportedMethodException;
import cn.bootx.platform.daxpay.mq.PaymentEventSender;
import cn.bootx.platform.daxpay.mq.PayEventSender;
import cn.bootx.platform.daxpay.param.pay.PayParam;
import cn.bootx.platform.daxpay.param.pay.PayWayParam;
import cn.bootx.platform.daxpay.util.PayWaylUtil;
@@ -44,7 +44,7 @@ public class PaySyncService {
private final PaySyncRecordService paySyncRecordService;
private final PaymentEventSender eventSender;
private final PayEventSender eventSender;
/**
* 同步订单的支付状态

View File

@@ -1,13 +1,11 @@
package cn.bootx.platform.daxpay.mq;
import cn.bootx.platform.daxpay.code.PaymentEventCode;
import cn.bootx.platform.daxpay.mq.event.PayCancelEvent;
import cn.bootx.platform.daxpay.mq.event.PayCompleteEvent;
import cn.bootx.platform.daxpay.mq.event.PayExpiredTimeEvent;
import cn.bootx.platform.daxpay.mq.event.PayRefundEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@@ -21,7 +19,7 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentEventSender {
public class PayEventSender {
private final PayMqMsgSender mqMsgSender;
@@ -40,7 +38,7 @@ public class PaymentEventSender {
@Async("bigExecutor")
@Retryable(value = Exception.class)
public void sendPayCancel(PayCancelEvent event) {
rabbitTemplate.convertAndSend(PaymentEventCode.EXCHANGE_PAYMENT, PaymentEventCode.PAY_CANCEL, event);
mqMsgSender.send(event);
}
/**
@@ -49,7 +47,7 @@ public class PaymentEventSender {
@Async("bigExecutor")
@Retryable(value = Exception.class)
public void sendPayRefund(PayRefundEvent event) {
rabbitTemplate.convertAndSend(PaymentEventCode.EXCHANGE_PAYMENT, PaymentEventCode.PAY_REFUND, event);
mqMsgSender.send(event);
}
/**
@@ -57,9 +55,8 @@ public class PaymentEventSender {
*/
@Async("bigExecutor")
@Retryable(value = Exception.class)
public void sendPaymentExpiredTime(Long paymentId) {
rabbitTemplate.convertAndSend(PaymentEventCode.EXCHANGE_PAYMENT, PaymentEventCode.PAYMENT_EXPIRED_TIME,
paymentId);
public void sendPayExpiredTime(PayExpiredTimeEvent event) {
mqMsgSender.send(event);
}
}

View File

@@ -1,75 +0,0 @@
package cn.bootx.platform.daxpay.mq;
import cn.bootx.platform.daxpay.code.PaymentEventCode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
/**
* 消息队列配置
*
* @author xxm
* @since 2021/6/25
*/
//@Configuration
public class PaymentRabbitMqConfiguration {
/** 支付完成队列 */
@Bean
public Queue payComplete() {
return new Queue(PaymentEventCode.PAY_COMPLETE);
}
/** 支付关闭/撤销队列 */
@Bean
public Queue payCancel() {
return new Queue(PaymentEventCode.PAY_CANCEL);
}
/** 支付退款队列 */
@Bean
public Queue payRefund() {
return new Queue(PaymentEventCode.PAY_REFUND);
}
/** 支付超时通知队列 */
@Bean
public Queue paymentExpiredTime() {
return new Queue(PaymentEventCode.PAYMENT_EXPIRED_TIME);
}
/** 交换机 */
@Bean
public DirectExchange paymentExchange() {
return new DirectExchange(PaymentEventCode.EXCHANGE_PAYMENT);
}
/** 绑定支付完成 */
@Bean
public Binding bindPayComplete() {
return BindingBuilder.bind(payComplete()).to(paymentExchange()).with(PaymentEventCode.PAY_COMPLETE);
}
/** 绑定支付关闭/撤销 */
@Bean
public Binding bindPayCancel() {
return BindingBuilder.bind(payCancel()).to(paymentExchange()).with(PaymentEventCode.PAY_CANCEL);
}
/** 绑定支付退款 */
@Bean
public Binding bindPayRefund() {
return BindingBuilder.bind(payRefund()).to(paymentExchange()).with(PaymentEventCode.PAY_REFUND);
}
/** 绑定支付超时通知 */
@Bean
public Binding bindPaymentExpiredTime() {
return BindingBuilder.bind(paymentExpiredTime())
.to(paymentExchange())
.with(PaymentEventCode.PAYMENT_EXPIRED_TIME);
}
}

View File

@@ -1,5 +1,7 @@
package cn.bootx.platform.daxpay.mq.event;
import cn.bootx.platform.common.jackson.util.JacksonUtil;
/**
* Mq事件消息定义
* @author xxm
@@ -10,7 +12,7 @@ public interface PayEvent {
String getQueueName();
/** 要发送的消息体 */
default Object toMessage(){
return this;
};
return JacksonUtil.toJson(this);
}
}

View File

@@ -0,0 +1,29 @@
package cn.bootx.platform.daxpay.mq.event;
import cn.bootx.platform.daxpay.code.PaymentEventCode;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* 支付超时事件
* @author xxm
* @since 2023/7/20
*/
@Data
@Accessors(chain = true)
public class PayExpiredTimeEvent implements PayEvent{
/** 支付单ID */
private Long paymentId;
/** 业务单号 */
private String businessId;
/**
* MQ队列名称
*/
@Override
public String getQueueName() {
return PaymentEventCode.PAYMENT_EXPIRED_TIME;
}
}

View File

@@ -0,0 +1,14 @@
package cn.bootx.platform.daxpay.mq.vender.active.listener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* activeMQ 消息接收
* @author xxm
* @since 2023/7/20
*/
@Component
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "active")
public class PayActiveMqMsgListener {
}

View File

@@ -0,0 +1,47 @@
package cn.bootx.platform.daxpay.mq.vender.active.sender;
import cn.bootx.platform.daxpay.mq.PayMqMsgSender;
import cn.bootx.platform.daxpay.mq.event.PayEvent;
import lombok.RequiredArgsConstructor;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.TextMessage;
/**
* activeMQ 发送器
* @author xxm
* @since 2023/7/20
*/
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "active")
public class PayActiveMqMsgSender implements PayMqMsgSender {
private final JmsTemplate jmsTemplate;
/**
* 实时推送MQ消息
*/
@Override
public void send(PayEvent msg) {
jmsTemplate.convertAndSend(new ActiveMQQueue(msg.getQueueName()),msg.toMessage());
}
/**
* 推送MQ延迟消息
*/
@Override
public void send(PayEvent msg, int delay) {
jmsTemplate.send(new ActiveMQQueue(msg.getQueueName()), session -> {
TextMessage tm = session.createTextMessage(msg.toMessage().toString());
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay * 1000);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1);
return tm;
});
}
}

View File

@@ -5,7 +5,7 @@ import cn.bootx.platform.daxpay.mq.event.PayEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;
import static cn.bootx.platform.daxpay.code.PaymentEventCode.DELAYED_EXCHANGE_PAYMENT;
@@ -14,7 +14,7 @@ import static cn.bootx.platform.daxpay.code.PaymentEventCode.DELAYED_EXCHANGE_PA
* @author xxm
* @since 2023/7/17
*/
@Service
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "rabbit")
public class PayRabbitMqMsgSender implements PayMqMsgSender {

View File

@@ -1,12 +1,26 @@
package cn.bootx.platform.daxpay.mq.vender.rocket.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* rocketMQ 消息监听
* @author xxm
* @since 2023/7/17
*/
@Slf4j
@Component
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "rocket")
public class PayRocketMqMsgListener {
//@RocketMQMessageListener(topic ="MQ_NAME", consumerGroup = "MQ_NAME")
public class PayRocketMqMsgListener implements RocketMQListener<String> {
/**
* 消息处理
*/
@Override
public void onMessage(String message) {
}
}

View File

@@ -5,14 +5,14 @@ import cn.bootx.platform.daxpay.mq.event.PayEvent;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;
/**
* rocketMQ 发送器
* @author xxm
* @since 2023/7/17
*/
@Service
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "rocket")
public class PayRocketMqMsgSender implements PayMqMsgSender {

View File

@@ -5,6 +5,7 @@ import cn.bootx.platform.daxpay.mq.event.PayEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
@@ -12,7 +13,7 @@ import org.springframework.stereotype.Service;
* @author xxm
* @since 2023/7/17
*/
@Service
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "spring", matchIfMissing = true)
public class PaySpringMsgSender implements PayMqMsgSender {

View File

@@ -2,7 +2,8 @@ package cn.bootx.platform.daxpay.task;
import cn.bootx.platform.common.core.util.CollUtil;
import cn.bootx.platform.daxpay.core.payment.dao.PaymentExpiredTimeRepository;
import cn.bootx.platform.daxpay.mq.PaymentEventSender;
import cn.bootx.platform.daxpay.mq.PayEventSender;
import cn.bootx.platform.daxpay.mq.event.PayExpiredTimeEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -24,7 +25,7 @@ public class PayExpiredTimeTaskService {
private final PaymentExpiredTimeRepository expiredTimeRepository;
private final PaymentEventSender paymentEventSender;
private final PayEventSender payEventSender;
/**
* 定时查询, 如果有过时的发送到消息队列
@@ -36,7 +37,7 @@ public class PayExpiredTimeTaskService {
.collect(Collectors.toList());
if (CollUtil.isNotEmpty(paymentIds)) {
expiredTimeRepository.removeKeys(paymentIds.stream().map(String::valueOf).toArray(String[]::new));
paymentIds.forEach(paymentEventSender::sendPaymentExpiredTime);
paymentIds.forEach(id-> payEventSender.sendPayExpiredTime(new PayExpiredTimeEvent().setPaymentId(id)));
}
}