feat 抽取消息通知层

This commit is contained in:
xxm1995
2023-07-17 17:32:39 +08:00
parent 9477e8c3c7
commit 28911d91af
18 changed files with 350 additions and 28 deletions

View File

@@ -110,12 +110,20 @@
<artifactId>common-starter-quartz</artifactId>
</dependency>
<!-- 消息队列 amqp -->
<!-- 消息队列 RabbitMQ -->
<dependency>
<groupId>cn.bootx.platform</groupId>
<artifactId>common-rabbitmq</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<optional>true</optional>
</dependency>
<!-- 消息队列 rocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- 消息队列 RabbitMQ-->
<!-- 自动建表 -->
<dependency>

View File

@@ -3,7 +3,10 @@ package cn.bootx.platform.daxpay.code;
public interface PaymentEventCode {
/** 支付中心交换机 */
String EXCHANGE_PAYMENT = "service.payment";
String EXCHANGE_PAYMENT = "service.exchange";
/** 支付中心交换机 */
String DELAYED_EXCHANGE_PAYMENT = "service.delayedExchange";
/** 支付完成 */
String PAY_COMPLETE = "pay.complete";

View File

@@ -0,0 +1,37 @@
package cn.bootx.platform.daxpay.configuration;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* 项目
* @author xxm
* @since 2023/7/17
*/
@Getter
@Setter
@ConfigurationProperties(prefix = "bootx.daxpay")
public class DaxPayProperties {
/**
* 消息队列类型
*/
private MqType mqType = MqType.SPRING;
/**
* 消息队列类型
* @author xxm
* @since 2023/7/17
*/
public enum MqType{
/** Spring 消息 */
SPRING,
/** ActiveMQ */
ACTIVE,
/** RabbitMQ */
RABBIT,
/** RocketMQ */
ROCKET;
}
}

View File

@@ -1,9 +1,9 @@
package cn.bootx.platform.daxpay.core.pay.builder;
import cn.bootx.platform.daxpay.core.payment.entity.Payment;
import cn.bootx.platform.daxpay.event.PayCancelEvent;
import cn.bootx.platform.daxpay.event.PayCompleteEvent;
import cn.bootx.platform.daxpay.event.PayRefundEvent;
import cn.bootx.platform.daxpay.mq.event.PayCancelEvent;
import cn.bootx.platform.daxpay.mq.event.PayCompleteEvent;
import cn.bootx.platform.daxpay.mq.event.PayRefundEvent;
import lombok.experimental.UtilityClass;
/**

View File

@@ -0,0 +1,24 @@
package cn.bootx.platform.daxpay.mq;
import cn.bootx.platform.daxpay.mq.event.PayEvent;
/**
* 支付消息队列发送接口定义
* @author xxm
* @since 2023/7/17
*/
public interface PayMqMsgSender {
/**
* 实时推送MQ消息
* @param msg 消息
*/
void send(PayEvent msg);
/**
* 推送MQ延迟消息
* @param msg 消息
* @param delay 延迟时间, 单位秒
*/
void send(PayEvent msg, int delay);
}

View File

@@ -1,12 +1,13 @@
package cn.bootx.platform.daxpay.mq;
import cn.bootx.platform.daxpay.code.PaymentEventCode;
import cn.bootx.platform.daxpay.event.PayCancelEvent;
import cn.bootx.platform.daxpay.event.PayCompleteEvent;
import cn.bootx.platform.daxpay.event.PayRefundEvent;
import cn.bootx.platform.daxpay.mq.event.PayCancelEvent;
import cn.bootx.platform.daxpay.mq.event.PayCompleteEvent;
import cn.bootx.platform.daxpay.mq.event.PayRefundEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@@ -22,7 +23,7 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class PaymentEventSender {
private final RabbitTemplate rabbitTemplate;
private final PayMqMsgSender mqMsgSender;
/**
* 支付完成 事件发布
@@ -30,7 +31,7 @@ public class PaymentEventSender {
@Async("bigExecutor")
@Retryable(value = Exception.class)
public void sendPayComplete(PayCompleteEvent event) {
rabbitTemplate.convertAndSend(PaymentEventCode.EXCHANGE_PAYMENT, PaymentEventCode.PAY_COMPLETE, event);
mqMsgSender.send(event);
}
/**

View File

@@ -1,4 +1,4 @@
package cn.bootx.platform.daxpay.configuration;
package cn.bootx.platform.daxpay.mq;
import cn.bootx.platform.daxpay.code.PaymentEventCode;
import org.springframework.amqp.core.Binding;
@@ -6,7 +6,6 @@ import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 消息队列配置
@@ -14,7 +13,7 @@ import org.springframework.context.annotation.Configuration;
* @author xxm
* @since 2021/6/25
*/
@Configuration
//@Configuration
public class PaymentRabbitMqConfiguration {
/** 支付完成队列 */

View File

@@ -1,4 +1,4 @@
package cn.bootx.platform.daxpay.event;
package cn.bootx.platform.daxpay.mq.event;
import lombok.Data;
import lombok.experimental.Accessors;
@@ -11,7 +11,7 @@ import lombok.experimental.Accessors;
*/
@Data
@Accessors(chain = true)
public class PayCancelEvent {
public class PayCancelEvent implements PayEvent{
/** 支付单ID */
private Long paymentId;
@@ -19,4 +19,12 @@ public class PayCancelEvent {
/** 业务单号 */
private String businessId;
/**
* MQ队列名称
*/
@Override
public String getQueueName() {
return null;
}
}

View File

@@ -1,4 +1,4 @@
package cn.bootx.platform.daxpay.event;
package cn.bootx.platform.daxpay.mq.event;
import lombok.Data;
import lombok.experimental.Accessors;
@@ -11,7 +11,7 @@ import lombok.experimental.Accessors;
*/
@Data
@Accessors(chain = true)
public class PayCompleteEvent {
public class PayCompleteEvent implements PayEvent{
/** 支付单ID */
private Long paymentId;
@@ -19,4 +19,11 @@ public class PayCompleteEvent {
/** 业务单号 */
private String businessId;
/**
* MQ队列名称
*/
@Override
public String getQueueName() {
return null;
}
}

View File

@@ -0,0 +1,16 @@
package cn.bootx.platform.daxpay.mq.event;
/**
* Mq事件消息定义
* @author xxm
* @since 2023/7/17
*/
public interface PayEvent {
/** MQ队列名称 */
String getQueueName();
/** 要发送的消息体 */
default Object toMessage(){
return this;
};
}

View File

@@ -1,4 +1,4 @@
package cn.bootx.platform.daxpay.event;
package cn.bootx.platform.daxpay.mq.event;
import lombok.Data;
import lombok.experimental.Accessors;
@@ -11,7 +11,7 @@ import lombok.experimental.Accessors;
*/
@Data
@Accessors(chain = true)
public class PayRefundEvent {
public class PayRefundEvent implements PayEvent{
/** 支付单ID */
private Long paymentId;
@@ -19,4 +19,11 @@ public class PayRefundEvent {
/** 业务单号 */
private String businessId;
/**
* MQ队列名称
*/
@Override
public String getQueueName() {
return null;
}
}

View File

@@ -0,0 +1,80 @@
package cn.bootx.platform.daxpay.mq.vender.rabbit.config;
import cn.bootx.platform.daxpay.code.PaymentEventCode;
import cn.bootx.platform.daxpay.mq.event.PayEvent;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.extra.spring.SpringUtil;
import org.springframework.amqp.core.*;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static cn.bootx.platform.daxpay.code.PaymentEventCode.DELAYED_EXCHANGE_PAYMENT;
/**
* 注入Rabbit相关的消息类
* @author xxm
* @since 2023/7/17
*/
@Configuration
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "rabbit")
public class PayRabbitBeanRegistry implements BeanDefinitionRegistryPostProcessor {
/** 自定义交换机: 用于延迟消息 **/
@Bean
CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_PAYMENT, "x-delayed-message", true, false, args);
}
/**
* 注册Bean
* @param beanDefinitionRegistry 应用程序上下文使用的 Bean 定义注册表
*/
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
this.initRabbitBeans(beanDefinitionRegistry);
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
}
/**
* 初始化RabbitMQ相关的消息, 并注册到Bean容器中
*/
public void initRabbitBeans(BeanDefinitionRegistry beanDefinitionRegistry){
// 扫描所有的消息类
Set<Class<?>> set = ClassUtil.scanPackageBySuper(ClassUtil.getPackage(PayEvent.class), PayEvent.class);
// 遍历
for (Class<?> clazz : set) {
// 实例化
PayEvent eventMsg = (PayEvent) ReflectUtil.newInstance(clazz);
// 注册队列(Queue)
RootBeanDefinition queueDefinition = new RootBeanDefinition(Queue.class, () -> new Queue(eventMsg.getQueueName()));
beanDefinitionRegistry.registerBeanDefinition(eventMsg.getQueueName(),queueDefinition);
// 注册与交换机的绑定关系Binding
Queue queue = SpringUtil.getBean(eventMsg.getQueueName(), Queue.class);
RootBeanDefinition bindingDefinition = new RootBeanDefinition(Binding.class,
BindingBuilder
.bind(queue)
.to(delayedExchange())
.with(PaymentEventCode.PAY_REFUND)::noargs);
beanDefinitionRegistry.registerBeanDefinition(eventMsg.getQueueName()+".exchange",bindingDefinition);
}
}
}

View File

@@ -1,14 +1,14 @@
package cn.bootx.platform.daxpay.mq;
package cn.bootx.platform.daxpay.mq.vender.rabbit.listener;
import cn.bootx.platform.common.rabbit.conditional.ConditionalOnRabbit;
import cn.bootx.platform.daxpay.code.PaymentEventCode;
import cn.bootx.platform.daxpay.core.sync.service.PayExpiredTimeService;
import cn.bootx.platform.daxpay.event.PayCancelEvent;
import cn.bootx.platform.daxpay.event.PayCompleteEvent;
import cn.bootx.platform.daxpay.event.PayRefundEvent;
import cn.bootx.platform.daxpay.mq.event.PayCancelEvent;
import cn.bootx.platform.daxpay.mq.event.PayCompleteEvent;
import cn.bootx.platform.daxpay.mq.event.PayRefundEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
@@ -19,9 +19,9 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@ConditionalOnRabbit
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "rabbit")
@RequiredArgsConstructor
public class PaymentMessageListener {
public class PayRabbitMqMsgListener {
private final PayExpiredTimeService payExpiredTimeService;

View File

@@ -0,0 +1,42 @@
package cn.bootx.platform.daxpay.mq.vender.rabbit.sender;
import cn.bootx.platform.daxpay.mq.PayMqMsgSender;
import cn.bootx.platform.daxpay.mq.event.PayEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import static cn.bootx.platform.daxpay.code.PaymentEventCode.DELAYED_EXCHANGE_PAYMENT;
/**
* RabbitMQ消息发送器
* @author xxm
* @since 2023/7/17
*/
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "rabbit")
public class PayRabbitMqMsgSender implements PayMqMsgSender {
private final RabbitTemplate rabbitTemplate;
/**
* 实时推送MQ消息
*/
@Override
public void send(PayEvent msg) {
rabbitTemplate.convertAndSend(msg.getQueueName(), msg.toMessage());
}
/**
* 推送MQ延迟消息
*/
@Override
public void send(PayEvent msg, int delay) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_PAYMENT, msg.getQueueName(), msg.toMessage(),
messagePostProcessor ->{
messagePostProcessor.getMessageProperties().setDelay(Math.toIntExact(delay * 1000L));
return messagePostProcessor;
});
}
}

View File

@@ -0,0 +1,12 @@
package cn.bootx.platform.daxpay.mq.vender.rocket.listener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
/**
* rocketMQ 消息监听
* @author xxm
* @since 2023/7/17
*/
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "rocket")
public class PayRocketMqMsgListener {
}

View File

@@ -0,0 +1,36 @@
package cn.bootx.platform.daxpay.mq.vender.rocket.sender;
import cn.bootx.platform.daxpay.mq.PayMqMsgSender;
import cn.bootx.platform.daxpay.mq.event.PayEvent;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* rocketMQ 发送器
* @author xxm
* @since 2023/7/17
*/
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "rocket")
public class PayRocketMqMsgSender implements PayMqMsgSender {
private final RocketMQTemplate rocketMQTemplate;
/**
* 实时推送MQ消息
*/
@Override
public void send(PayEvent msg) {
rocketMQTemplate.syncSend(msg.getQueueName(),msg.toMessage());
}
/**
* 推送MQ延迟消息
*/
@Override
public void send(PayEvent msg, int delay) {
rocketMQTemplate.syncSend(msg.getQueueName(),msg.toMessage(),delay);
}
}

View File

@@ -0,0 +1,41 @@
package cn.bootx.platform.daxpay.mq.vender.spring.sender;
import cn.bootx.platform.daxpay.mq.PayMqMsgSender;
import cn.bootx.platform.daxpay.mq.event.PayEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
/**
* Spring 事件方式
* @author xxm
* @since 2023/7/17
*/
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(name ="bootx.daxpay.mq-type", havingValue = "spring", matchIfMissing = true)
public class PaySpringMsgSender implements PayMqMsgSender {
private final ApplicationEventPublisher applicationEventPublisher;
/**
* 实时推送MQ消息
*
* @param msg 消息
*/
@Override
public void send(PayEvent msg) {
applicationEventPublisher.publishEvent(msg);
}
/**
* 推送MQ延迟消息
*
* @param msg 消息
* @param delay 延迟时间, 单位秒
*/
@Override
public void send(PayEvent msg, int delay) {
applicationEventPublisher.publishEvent(msg);
}
}

View File

@@ -42,6 +42,7 @@
<lombok-mapstruct.version>0.2.0</lombok-mapstruct.version>
<mybatis-table-modify.version>1.5.3</mybatis-table-modify.version>
<wxjava.version>4.5.2.B</wxjava.version>
<rocketmq.version>2.2.3</rocketmq.version>
<lock4j.version>2.2.4</lock4j.version>
</properties>
<!-- 项目依赖版本管理 -->