mirror of
https://gitee.com/dromara/RuoYi-Cloud-Plus.git
synced 2025-11-28 01:00:05 +08:00
update 优化 完善 rocketmq 相关演示案例
This commit is contained in:
@@ -2,13 +2,10 @@
|
|||||||
|
|
||||||
## 模块说明
|
## 模块说明
|
||||||
|
|
||||||
1. rabbit: 普通消息、延迟队列
|
1. rabbitmq: 普通消息、延迟队列
|
||||||
2. rocket:普通消息、事务消息
|
2. rocketmq:普通消息、事务消息、延迟消息
|
||||||
3. kafka:普通消息、stream流的使用
|
3. kafka:普通消息、stream流的使用
|
||||||
|
|
||||||
后续可实现的:
|
|
||||||
|
|
||||||
1. rocket 顺序、异步、延时等
|
|
||||||
|
|
||||||
## 使用方式
|
## 使用方式
|
||||||
|
|
||||||
@@ -23,7 +20,7 @@ sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name>
|
|||||||
```
|
```
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
|
bin/mqadmin updatetopic -n localhost:9876 -t test-topic -c DefaultCluster
|
||||||
```
|
```
|
||||||
|
|
||||||
创建事务消息的topic
|
创建事务消息的topic
|
||||||
@@ -33,7 +30,7 @@ sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name>
|
|||||||
```
|
```
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
bin/mqadmin updatetopic -n localhost:9876 -t transaction_topic -c DefaultCluster -a +message.type=TRANSACTION
|
bin/mqadmin updatetopic -n localhost:9876 -t transaction-topic -c DefaultCluster -a +message.type=TRANSACTION
|
||||||
```
|
```
|
||||||
|
|
||||||
kafka:
|
kafka:
|
||||||
@@ -43,5 +40,5 @@ kafka-topics.sh --create --topic <topic_name> --bootstrap-server <broker_list> -
|
|||||||
```
|
```
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
|
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
|
||||||
```
|
```
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package org.dromara.stream.consumer;
|
package org.dromara.stream.consumer;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@@ -11,10 +12,12 @@ import org.springframework.stereotype.Component;
|
|||||||
**/
|
**/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "springboot-mq-consumer-1")
|
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group")
|
||||||
public class NormalRocketConsumer implements RocketMQListener<String> {
|
public class NormalRocketConsumer implements RocketMQListener<MessageExt> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(String message) {
|
public void onMessage(MessageExt ext) {
|
||||||
log.info("【消费者】接收消息:{}" ,message);
|
log.info("【消费者】接收消息:消息体 => {}, tag => {}", new String(ext.getBody()), ext.getTags());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,11 +11,12 @@ import org.springframework.stereotype.Component;
|
|||||||
**/
|
**/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction_topic")
|
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-group")
|
||||||
public class TransactionRocketConsumer implements RocketMQListener<String> {
|
public class TransactionRocketConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(String message) {
|
public void onMessage(String message) {
|
||||||
log.info("【消费者】===>接收事务消息:{}",message);
|
log.info("【消费者】===>接收事务消息:{}",message);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +1,10 @@
|
|||||||
package org.dromara.stream.producer;
|
package org.dromara.stream.producer;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
|
||||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||||
|
import org.apache.rocketmq.spring.support.RocketMQHeaders;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
import org.springframework.messaging.support.MessageBuilder;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@@ -19,7 +20,21 @@ public class NormalRocketProducer {
|
|||||||
private RocketMQTemplate rocketMQTemplate;
|
private RocketMQTemplate rocketMQTemplate;
|
||||||
|
|
||||||
public void sendMessage() {
|
public void sendMessage() {
|
||||||
SendResult sendResult = rocketMQTemplate.syncSend("TestTopic", MessageBuilder.withPayload("hello world test").build());
|
// 发送普通消息
|
||||||
log.info("发送普通同步消息-msg,syncSendMessage===>{}", sendResult);
|
// rocketMQTemplate.convertAndSend("test-topic", "test");
|
||||||
|
|
||||||
|
// 发送带tag的消息
|
||||||
|
Message<String> message = MessageBuilder.withPayload("test").setHeader(RocketMQHeaders.TAGS, "test-tag").build();
|
||||||
|
rocketMQTemplate.send("test-topic", message);
|
||||||
|
|
||||||
|
// 延迟消息
|
||||||
|
// RocketMQ预定义了一些延迟等级,每个等级对应不同的延迟时间范围。这些等级从1到18,分别对应1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h的延迟时间。
|
||||||
|
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message();
|
||||||
|
msg.setDelayTimeLevel(3);
|
||||||
|
try {
|
||||||
|
rocketMQTemplate.getProducer().send(msg);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,10 +36,10 @@ spring:
|
|||||||
|
|
||||||
--- # rocketmq 配置
|
--- # rocketmq 配置
|
||||||
rocketmq:
|
rocketmq:
|
||||||
name-server: localhost:9876
|
name-server: localhost:9876
|
||||||
producer:
|
producer:
|
||||||
# 生产者组
|
# 生产者组
|
||||||
group: dist-test
|
group: dist-test
|
||||||
|
|
||||||
--- # nacos 配置
|
--- # nacos 配置
|
||||||
spring:
|
spring:
|
||||||
|
|||||||
Reference in New Issue
Block a user