RabbitMQ消息队列在SpringBoot下的实践

RabbitMQ是一个基于AMQP协议(Advanced Message Queuing Protocol,高级消息队列协议)的开源消息中间件,用于程序之间的消息、数据传递服务。常见的场景是将程序中原有的耗时较长的调用拆分为前-后两部分,即,生产者和消费者。生产者负责向消息队列发送待处理数据,而消费者则从消息队列接收数据,完成原有程序中的长耗时的处理。异步处理的方式,一方面提高了生产者的即时响应能力和整体的处理效率,另一方面也可以实现程序之间的解耦。本文将介绍如何SpringBoot下实践RabbitMQ消息队列

abstarct

配置RabbitMQ

1. 添加RabbitMQ依赖

在pom.xml中添加RabbitMQ依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 添加RabbitMQ配置

在application.properties配置文件中添加RabbitMQ服务的相关配置

1
2
3
4
5
6
7
8
9
# RabbitMQ 配置
# RabbitMQ 服务器地址
spring.rabbitmq.host=127.0.0.1
# RabbitMQ 服务器端口
spring.rabbitmq.port=5672
# RabbitMQ 用户名
spring.rabbitmq.username=zgh
# RabbitMQ 密码
spring.rabbitmq.password=zgh123456

实践

Topic Exchange

1. 写RabbitMQ Config类

在RabbitMQ Config类中声明主题交换机、消息队列,并通过BindingKey实现主题交换机和消息队列的绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Configuration
public class TopicExchangeRabbitMQConfig {

public static final String topicExchangeName = "topicExchange1";

private static final String queue1BindingKey1 = "black.big.*";
private static final String queue1BindingKey2 = "black.*.cat";
private static final String queue2BindingKey = "*.small.*";
private static final String queue3BindingKey = "#";

// 声明主题交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(topicExchangeName);
}

// 声明消息队列
@Bean
public Queue messageQueue1() {
return new Queue("queue1");
}

@Bean
public Queue messageQueue2() {
return new Queue("queue2");
}

@Bean
public Queue messageQueue3() {
return new Queue("queue3");
}

// 向主题交换机上绑定队列
@Bean
Binding bindingQueue1Exchange1(Queue messageQueue1, TopicExchange topicExchange) {
return BindingBuilder.bind( messageQueue1 )
.to( topicExchange )
.with( queue1BindingKey1 );
}

@Bean
Binding bindingQueue1Exchange2(Queue messageQueue1, TopicExchange topicExchange) {
return BindingBuilder.bind( messageQueue1 )
.to( topicExchange )
.with( queue1BindingKey2 );
}

@Bean
Binding bindingQueue2Exchange(Queue messageQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind( messageQueue2 )
.to( topicExchange )
.with( queue2BindingKey );
}

@Bean
Binding bindingQueue3Exchange(Queue messageQueue3, TopicExchange topicExchange) {
return BindingBuilder.bind( messageQueue3 )
.to( topicExchange )
.with( queue3BindingKey );
}
}

2. 写生产者、消费者

在Spring AMQP 中提供了RabbitTemplate实例用于操作RabbitMQ,故在生产者方法中直接注入使用即可

1
2
3
4
5
6
7
8
9
@Component
public class TopicExchangeProducer {
@Autowired
private AmqpTemplate rabbitMQTemplate;

public void send(String msg, String routingKey) {
rabbitMQTemplate.convertAndSend(TopicExchangeRabbitMQConfig.topicExchangeName, routingKey, msg);
}
}

消费者类的方法上通过@RabbitListener注解用于监听指定名称的消息队列,而标注在方法上的@RabbitHandler则用于指示从指定消息队列收到消息后,使用该方法进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import cn.hutool.json.JSONUtil;
...
@Component
public class TopicExchangeConsumer {

@RabbitListener(queues = "queue1")
@RabbitHandler
public void getQueue1Message(String msg) {
Animal animal = JSONUtil.toBean( msg, Animal.class );
System.out.println("Queue 1 Receive [black.big.*, black.*.cat] , Msg: " + animal);
}

@RabbitListener(queues = "queue2")
@RabbitHandler
public void getQueue2Message(String msg) {
Animal animal = JSONUtil.toBean( msg, Animal.class );
System.out.println("Queue 2 Receive [*.small.*] , Msg: " + animal);
}

@RabbitListener(queues = "queue3")
@RabbitHandler
public void getQueue3Message(String msg) {
Animal animal = JSONUtil.toBean( msg, Animal.class );
System.out.println("Queue 3 Receive [#] , Msg: " + animal);
}
}

[Note]: 生产者、消费者对对象数据传递采用JSON序列化及反序列

3. 测试用例

这里使用HTTP请求来触发测试 Topic Exchange 的用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
...
@RestController
@RequestMapping("testRabbitMQ")
public class RabbitMQTestController {

@Autowired
private TopicExchangeProducer topicExchangeProducer;

@GetMapping("testTopicExchange")
public String testTopicExchange() {
// Test 1
Animal animal1 = new Animal("black","big","cat");
String routingKey1 = animal1.getColor() + "." + animal1.getSize() + "." + animal1.getKind();
JSONObject jsonObject1 = JSONUtil.parseObj(animal1);
topicExchangeProducer.send(jsonObject1.toString(), routingKey1);

// Test 2
Animal animal2 = new Animal("red","small","pig");
String routingKey2 = animal2.getColor() + "." + animal2.getSize() + "." + animal2.getKind();
JSONObject jsonObject2 = JSONUtil.parseObj(animal2);
topicExchangeProducer.send(jsonObject2.toString(), routingKey2);

// Test 3
Animal animal3 = new Animal("blue","medium","dog");
String routingKey3 = null;
JSONObject jsonObject3 = JSONUtil.parseObj(animal3);
topicExchangeProducer.send(jsonObject3.toString(), routingKey3);

return "test TopicExchange success";
}
}
  • Test1 发送的消息为: animal1 对象,其routingKey:”black.big.cat”。根据Topic Exchange的路由规则,满足条件的只有Queue1 和 Queue3,测试结果符合预期,如下图黄框所示。值得一提的是,虽然绑定在交换机上的Queue1队列的两个BindingKey均满足路由规则,但是交换机对于一个消息也只会向同一个消息队列仅转发一次
  • Test2 发送的消息为: animal2 对象,其routingKey:”red.small.pig”。根据Topic Exchange的路由规则,满足条件的只有Queue2 和 Queue3,测试结果符合预期,如下图红框所示
  • Test3 发送的消息为: animal3 对象,其routingKey为空。根据Topic Exchange的路由规则,满足条件的只有Queue3,测试结果符合预期,如下图白框所示

figure 1.jpeg

Direct Exchange

1. 写RabbitMQ Config类

在RabbitMQ Config类中声明直连交换机、消息队列,并通过BindingKey实现直连交换机和消息队列的绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Configuration
public class DirectExchangeRabbitMQConfig {

public static final String directExchangeName = "directExchange1";

private static final String queue4BindingKey1 = "big";
private static final String queue4BindingKey2 = "small";
private static final String queue5BindingKey = "cat";

// 声明直连交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange(directExchangeName);
}

// 声明消息队列
@Bean
public Queue messageQueue4() {
return new Queue("queue4");
}

@Bean
public Queue messageQueue5() {
return new Queue("queue5");
}

// 向直连交换机上绑定队列
@Bean
Binding bindingQueue4Exchange1(Queue messageQueue4, DirectExchange directExchange) {
return BindingBuilder.bind( messageQueue4 )
.to( directExchange )
.with( queue4BindingKey1 );
}

@Bean
Binding bindingQueue4Exchange2(Queue messageQueue4, DirectExchange directExchange) {
return BindingBuilder.bind( messageQueue4 )
.to( directExchange )
.with( queue4BindingKey2 );
}

@Bean
Binding bindingQueue5Exchange(Queue messageQueue5, DirectExchange directExchange) {
return BindingBuilder.bind( messageQueue5 )
.to( directExchange )
.with( queue5BindingKey );
}
}

2. 写生产者、消费者

在Spring AMQP 中提供了RabbitTemplate实例用于操作RabbitMQ,故在生产者方法中直接注入使用即可

1
2
3
4
5
6
7
8
9
10
@Component
public class DirectExchangeProducer {

@Autowired
private AmqpTemplate rabbitMQTemplate;

public void send(String msg, String routingKey) {
rabbitMQTemplate.convertAndSend(DirectExchangeRabbitMQConfig.directExchangeName, routingKey, msg);
}
}

消费者类的方法上通过@RabbitListener注解用于监听指定名称的消息队列,而标注在方法上的@RabbitHandler则用于指示从指定消息队列收到消息后,使用该方法进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import cn.hutool.json.JSONUtil;
...
@Component
public class DirectExchangeConsumer {

@RabbitListener(queues = "queue4")
@RabbitHandler
public void getQueue1Message(String msg) {
Animal animal = JSONUtil.toBean( msg, Animal.class );
System.out.println("Queue 4 Receive [big, small] , Msg: " + animal);
}

@RabbitListener(queues = "queue5")
@RabbitHandler
public void getQueue2Message(String msg) {
Animal animal = JSONUtil.toBean( msg, Animal.class );
System.out.println("Queue 5 Receive [cat] , Msg: " + animal);
}
}

[Note]: 生产者、消费者对对象数据传递采用JSON序列化及反序列

3. 测试用例

这里使用HTTP请求来触发测试 Direct Exchange 的用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
...
@RestController
@RequestMapping("testRabbitMQ")
public class RabbitMQTestController {

@Autowired
private DirectExchangeProducer directExchangeProducer;

@GetMapping("testDirectExchange")
public String testDirectExchange() {
// Test 1
Animal animal1 = new Animal("black","big","dog");
String routingKey1 = "big";
JSONObject jsonObject1 = JSONUtil.parseObj(animal1);
directExchangeProducer.send(jsonObject1.toString(), routingKey1);

// Test 2
Animal animal2 = new Animal("red","small","dog");
String routingKey2 = "small";
JSONObject jsonObject2 = JSONUtil.parseObj(animal2);
directExchangeProducer.send(jsonObject2.toString(), routingKey2);

// Test 3
Animal animal3 = new Animal("blue","medium","cat");
String routingKey3 = "cat";
JSONObject jsonObject3 = JSONUtil.parseObj(animal3);
directExchangeProducer.send(jsonObject3.toString(), routingKey3);

return "test DirectExchange success";
}
}
  • Test1 发送的消息为: animal1 对象,其routingKey:”big”。根据Direct Exchange的路由规则,满足条件的只有Queue4,测试结果符合预期,如下图红框所示
  • Test2 发送的消息为: animal2 对象,其routingKey:”small”。根据Direct Exchange的路由规则,满足条件的只有Queue4,测试结果符合预期,如下图黄框所示
  • Test3 发送的消息为: animal3 对象,其routingKey:”cat”。根据Direct Exchange的路由规则,满足条件的只有Queue5,测试结果符合预期,如下图白框所示

figure 2.jpeg

Fanout Exchange

在RabbitMQ Config类中声明扇形交换机、消息队列,并将扇形交换机和消息队列的绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Configuration
public class FanoutExchangeRabbitMQConfig {

public static final String fanoutExchangeName = "fanoutExchange1";

// 声明扇形交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(fanoutExchangeName);
}

// 声明消息队列
@Bean
public Queue messageQueue6() {
return new Queue("queue6");
}

@Bean
public Queue messageQueue7() {
return new Queue("queue7");
}

// 向扇形交换机上绑定队列
@Bean
Binding bindingQueue6Exchange1(Queue messageQueue6, FanoutExchange fanoutExchange) {
return BindingBuilder.bind( messageQueue6 )
.to( fanoutExchange );
}

@Bean
Binding bindingQueue7Exchange(Queue messageQueue7, FanoutExchange fanoutExchange) {
return BindingBuilder.bind( messageQueue7 )
.to( fanoutExchange );
}
}

2. 写生产者、消费者

在Spring AMQP 中提供了RabbitTemplate实例用于操作RabbitMQ,故在生产者方法中直接注入使用即可

1
2
3
4
5
6
7
8
9
10
@Component
public class FanoutExchangeProducer {

@Autowired
private AmqpTemplate rabbitMQTemplate;

public void send(String msg, String routingKey) {
rabbitMQTemplate.convertAndSend(FanoutExchangeRabbitMQConfig.fanoutExchangeName, routingKey, msg);
}
}

消费者类的方法上通过@RabbitListener注解用于监听指定名称的消息队列,而标注在方法上的@RabbitHandler则用于指示从指定消息队列收到消息后,使用该方法进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import cn.hutool.json.JSONUtil;
...
@Component
public class FanoutExchangeConsumer {

@RabbitListener(queues = "queue6")
@RabbitHandler
public void getQueue1Message(String msg) {
Animal animal = JSONUtil.toBean( msg, Animal.class );
System.out.println("Queue 6 Receive , Msg: " + animal);
}

@RabbitListener(queues = "queue7")
@RabbitHandler
public void getQueue2Message(String msg) {
Animal animal = JSONUtil.toBean( msg, Animal.class );
System.out.println("Queue 7 Receive , Msg: " + animal);
}
}

[Note]: 生产者、消费者对对象数据传递采用JSON序列化及反序列

3. 测试用例

这里使用HTTP请求来触发测试 Fanout Exchange 的用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
...
@RestController
@RequestMapping("testRabbitMQ")
public class RabbitMQTestController {

@Autowired
private FanoutExchangeProducer fanoutExchangeProducer;

@GetMapping("testFanoutExchange")
public String testFanoutExchange() {
// Test 1
Animal animal1 = new Animal("black","big","dog");
String routingKey1 = "big";
JSONObject jsonObject1 = JSONUtil.parseObj(animal1);
fanoutExchangeProducer.send(jsonObject1.toString(), routingKey1);

// Test 2
Animal animal2 = new Animal("red","small","dog");
String routingKey2 = null;
JSONObject jsonObject2 = JSONUtil.parseObj(animal2);
fanoutExchangeProducer.send(jsonObject2.toString(), routingKey2);

return "test FanoutExchange success";
}
}
  • Test1 发送的消息为: animal1 对象,其routingKey:”big”。根据Fanout Exchange的路由规则,满足条件的有Queue6、Queue7,测试结果符合预期,如下图红框所示
  • Test2 发送的消息为: animal2 对象,其routingKey为空。根据Fanout Exchange的路由规则,满足条件的有Queue6、Queue7,测试结果符合预期,如下图白框所示

figure 3.jpeg

0%