前言
本文概括
对RabbitMQ的核心知识点进行详细的介绍
一、RabbitMQ 基础概念
RabbitMQ 是基于 AMQP 协议的消息中间件,核心组件包括:
- 生产者(Producer):发送消息的应用
- 消费者(Consumer):接收并处理消息的应用
- 队列(Queue):存储消息的容器,消息在队列中是先进先出
- 交换机(Exchange):接收生产者消息,根据路由规则将消息路由到一个或多个队列
- 路由键(Routing Key):交换机路由消息的依据
- 绑定(Binding):交换机与队列之间的关联规则
二、同步调用 vs 异步调用
- 维度 同步调用 异步调用(RabbitMQ)
- 执行方式 请求 - 阻塞 - 等待响应 发送消息后立即返回,不阻塞
- 耦合性 强耦合(调用方需知道被调用方地址) 弱耦合(通过 MQ 解耦)
- 可靠性 依赖被调用方可用性 消息持久化保障可靠性
- 场景 实时性要求高、逻辑强依赖 异步任务、流量削峰、系统解耦
Java 示例:同步 vs 异步调用对比
1 | // 同步调用示例(伪代码) |
三、RabbitMQ 技术选型考量
选择 RabbitMQ 需考虑以下因素:
- 业务场景:异步任务、流量削峰、系统解耦、最终一致性
- 性能要求:RabbitMQ 单机可支持万级 TPS,可通过集群水平扩展
- 可靠性要求:提供持久化、Confirm、Return、死信等机制保障消息不丢失
- 生态与社区:Spring 生态深度集成,文档和解决方案丰富
- 运维成本:部署简单,支持 Docker 容器化,监控工具完善
四、Work 模型(工作队列)
Work 模型用于多个消费者竞争消费同一个队列的消息,实现任务的并行处理。
核心特点
- 队列中消息被多个消费者瓜分,每个消息仅被一个消费者处理
- 可通过 prefetch 控制消费者每次预取的消息数,避免消息堆积
Java 实战代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42// 配置类
public class WorkQueueConfig {
// 声明工作队列
public Queue workQueue() {
return new Queue("work.queue", true); // 持久化队列
}
}
// 生产者
public class WorkProducer {
private RabbitTemplate rabbitTemplate;
public void sendWorkMessage(int count) {
for (int i = 0; i < count; i++) {
rabbitTemplate.convertAndSend("work.queue", "Work 模型消息 " + i);
}
}
}
// 消费者1
public class WorkConsumer1 {
public void consume(String message) throws InterruptedException {
System.out.println("消费者1 处理消息:" + message);
Thread.sleep(1000); // 模拟处理耗时
}
}
// 消费者2
public class WorkConsumer2 {
public void consume(String message) throws InterruptedException {
System.out.println("消费者2 处理消息:" + message);
Thread.sleep(2000); // 模拟处理耗时
}
}
五、交换机(Exchange)类型
RabbitMQ 有四种交换机类型,每种类型路由规则不同。
Direct 交换机(精确匹配)
路由规则:消息的 routingKey 与队列的 bindingKey 完全匹配时,消息路由到该队列1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43// 配置类
public class DirectExchangeConfig {
// 声明 Direct 交换机
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange", true, false);
}
// 声明队列
public Queue directQueue1() {
return new Queue("direct.queue1", true);
}
public Queue directQueue2() {
return new Queue("direct.queue2", true);
}
// 绑定交换机与队列
public Binding directBinding1() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("direct.key1");
}
public Binding directBinding2() {
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct.key2");
}
}
// 生产者
public class DirectProducer {
private RabbitTemplate rabbitTemplate;
public void sendDirectMessage() {
rabbitTemplate.convertAndSend("direct.exchange", "direct.key1", "Direct 消息,路由到 queue1");
rabbitTemplate.convertAndSend("direct.exchange", "direct.key2", "Direct 消息,路由到 queue2");
}
}Fanout 交换机(广播)
路由规则:忽略 routingKey,将消息路由到所有绑定的队列1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// 配置类
public class FanoutExchangeConfig {
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange", true, false);
}
public Queue fanoutQueue1() {
return new Queue("fanout.queue1", true);
}
public Queue fanoutQueue2() {
return new Queue("fanout.queue2", true);
}
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
// 生产者
public class FanoutProducer {
private RabbitTemplate rabbitTemplate;
public void sendFanoutMessage() {
// routingKey 可忽略,消息会路由到所有绑定的队列
rabbitTemplate.convertAndSend("fanout.exchange", "", "Fanout 广播消息");
}
}- Topic 交换机(模糊匹配)
路由规则:routingKey 和 bindingKey 支持通配符(* 匹配一个单词,# 匹配零个或多个单词)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42// 配置类
public class TopicExchangeConfig {
public TopicExchange topicExchange() {
return new TopicExchange("topic.exchange", true, false);
}
public Queue topicQueue1() {
return new Queue("topic.queue1", true);
}
public Queue topicQueue2() {
return new Queue("topic.queue2", true);
}
// 匹配 user.* 格式的 routingKey
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.*");
}
// 匹配 user.# 格式的 routingKey
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
}
}
// 生产者
public class TopicProducer {
private RabbitTemplate rabbitTemplate;
public void sendTopicMessage() {
rabbitTemplate.convertAndSend("topic.exchange", "user.login", "用户登录消息,路由到 queue1 和 queue2");
rabbitTemplate.convertAndSend("topic.exchange", "user.order.create", "用户创建订单消息,仅路由到 queue2");
}
} - Headers 交换机(头部匹配)
路由规则:根据消息的 headers 属性匹配,性能较差,实际使用较少
六、声明队列和交换机
在 Java 中,声明队列和交换机有两种方式:显式声明(通过代码)和隐式声明(通过 Spring 注解自动声明)。
显式声明(推荐)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41// 配置类
public class DeclareConfig {
// 声明交换机
public DirectExchange customExchange() {
// 参数:名称、是否持久化、是否自动删除、其他参数
return new DirectExchange("custom.exchange", true, false, null);
}
// 声明队列
public Queue customQueue() {
// 参数:名称、是否持久化、是否排他、是否自动删除、其他参数
return QueueBuilder.durable("custom.queue")
.exclusive(false)
.autoDelete(false)
.build();
}
// 绑定交换机与队列
public Binding customBinding() {
return BindingBuilder.bind(customQueue()).to(customExchange()).with("custom.key");
}
}
隐式声明(通过消费者注解)
java
运行
public class ImplicitDeclareConsumer {
// 监听时自动声明队列、交换机并绑定
public void consume(String message) {
System.out.println("隐式声明的消费者:" + message);
}
}
七、消息转换器
消息转换器用于 Java 对象与字节数组(MQ 传输格式)之间的转换,推荐使用 Jackson2JsonMessageConverter。
配置消息转换器1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53// 配置类
public class MessageConverterConfig {
public Jackson2JsonMessageConverter messageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
// 支持 Java 8 时间类型
objectMapper.registerModule(new JavaTimeModule());
// 忽略空值字段
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
return new Jackson2JsonMessageConverter(objectMapper);
}
// 配置 RabbitTemplate 使用自定义消息转换器
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
}
实体类示例
java
运行
public class Order {
private Long orderId;
private String userId;
private BigDecimal amount;
private LocalDateTime createTime;
}
生产者使用
java
运行
public class ConverterProducer {
private RabbitTemplate rabbitTemplate;
public void sendOrder(Order order) {
rabbitTemplate.convertAndSend("converter.exchange", "converter.key", order);
}
}
消费者使用
java
运行
public class ConverterConsumer {
public void consume(Order order) {
System.out.println("收到订单:" + order);
}
}
八、生产者可靠性保障
- Confirm 机制(消息到达交换机确认)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56// 配置类
public class ProducerReliabilityConfig {
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
// 开启 Confirm 机制
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息已成功到达交换机,correlationId:" + correlationData.getId());
} else {
System.out.println("消息到达交换机失败,原因:" + cause);
// 实现重试逻辑
}
});
return rabbitTemplate;
}
}
// 生产者发送带 correlationId 的消息
public class ConfirmProducer {
private RabbitTemplate rabbitTemplate;
public void sendConfirmMessage() {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("confirm.exchange", "confirm.key", "Confirm 测试消息", correlationData);
}
}
2. Return 机制(消息路由到队列失败返回)
java
运行
public class ProducerReliabilityConfig {
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
// 开启 Return 机制
rabbitTemplate.setReturnsCallback(returnedMessage -> {
System.out.println("消息路由队列失败,交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
System.out.println("回复码:" + returnedMessage.getReplyCode());
System.out.println("回复文本:" + returnedMessage.getReplyText());
System.out.println("消息体:" + new String(returnedMessage.getMessage().getBody()));
// 实现路由失败的处理逻辑
});
return rabbitTemplate;
}
}
九、消费者可靠性保障
- 手动 ACK 与重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42// 配置类
public class ConsumerReliabilityConfig {
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动 ACK
factory.setDefaultRequeueRejected(false); // 消费失败不重新入队(避免死循环)
factory.setRetryTemplate(buildRetryTemplate()); // 配置重试
return factory;
}
private RetryTemplate buildRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 重试策略:最多重试 3 次,每次间隔 1 秒
SimpleRetryPolicy policy = new SimpleRetryPolicy(3, Collections.singletonMap(Exception.class, true));
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
retryTemplate.setRetryPolicy(policy);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
}
// 消费者示例
public class ManualAckConsumer {
public void consume(String message, Channel channel, Message msg) throws IOException {
try {
System.out.println("处理消息:" + message);
// 业务逻辑处理...
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); // 手动确认
} catch (Exception e) {
System.out.println("处理消息异常,准备重试或拒绝:" + e.getMessage());
// 若开启了重试,此处无需手动拒绝,由重试机制处理
// 若未开启重试,可手动拒绝并路由到死信队列
// channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
}
}
}
2. 死信交换机(DLX)
死信交换机用于接收被拒绝、过期、队列满了的消息,实现消息的最终兜底处理。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55// 配置类
public class DeadLetterConfig {
// 死信交换机
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange", true, false);
}
// 死信队列
public Queue dlxQueue() {
return new Queue("dlx.queue", true);
}
// 死信队列与死信交换机绑定
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.key");
}
// 业务队列(配置死信参数)
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange"); // 绑定死信交换机
args.put("x-dead-letter-routing-key", "dlx.key"); // 死信路由键
args.put("x-message-ttl", 60000); // 消息过期时间(可选)
return QueueBuilder.durable("business.queue")
.withArguments(args)
.build();
}
// 业务交换机
public DirectExchange businessExchange() {
return new DirectExchange("business.exchange", true, false);
}
// 业务队列与业务交换机绑定
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue()).to(businessExchange()).with("business.key");
}
}
// 死信消费者
public class DlxConsumer {
public void consume(String message) {
System.out.println("收到死信消息:" + message);
// 处理死信逻辑,如记录日志、人工干预
}
}
十、延迟消息
延迟消息指消息发送后,在指定时间后才会被消费者消费。RabbitMQ 实现延迟消息有两种方式:
方式一:死信交换机 + TTL(推荐)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69// 配置类
public class DelayMessageConfig {
// 死信交换机(用于延迟消息的最终路由)
public DirectExchange dlxExchange() {
return new DirectExchange("delay.dlx.exchange", true, false);
}
// 死信队列
public Queue dlxQueue() {
return new Queue("delay.dlx.queue", true);
}
// 死信队列与死信交换机绑定
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("delay.dlx.key");
}
// 延迟交换机(用于接收延迟消息)
public DirectExchange delayExchange() {
return new DirectExchange("delay.exchange", true, false);
}
// 延迟队列(配置 TTL 和死信参数)
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "delay.dlx.exchange"); // 绑定死信交换机
args.put("x-dead-letter-routing-key", "delay.dlx.key"); // 死信路由键
return QueueBuilder.durable("delay.queue")
.withArguments(args)
.build();
}
// 延迟队列与延迟交换机绑定
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.key");
}
}
// 延迟消息生产者
public class DelayProducer {
private RabbitTemplate rabbitTemplate;
public void sendDelayMessage(String message, int delaySeconds) {
MessageProperties properties = new MessageProperties();
// 设置消息过期时间(毫秒)
properties.setExpiration(String.valueOf(delaySeconds * 1000));
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.send("delay.exchange", "delay.key", msg);
}
}
// 延迟消息消费者(死信消费者)
public class DelayConsumer {
public void consume(String message) {
System.out.println("收到延迟消息(" + new Date() + "):" + message);
// 处理延迟消息逻辑
}
}
方式二:延迟插件(rabbitmq_delayed_message_exchange)
需先安装延迟插件,然后使用 x-delayed-type 类型的交换机。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47// 配置类
public class DelayedMessagePluginConfig {
// 声明延迟交换机(类型为 x-delayed-message)
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 底层路由类型
return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}
// 声明延迟队列
public Queue delayedQueue() {
return new Queue("delayed.queue", true);
}
// 绑定交换机与队列
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed.key").noargs();
}
}
// 延迟消息生产者
public class DelayedPluginProducer {
private RabbitTemplate rabbitTemplate;
public void sendDelayedMessage(String message, int delaySeconds) {
MessageProperties properties = new MessageProperties();
// 设置延迟时间(毫秒)
properties.setDelay(delaySeconds * 1000);
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.send("delayed.exchange", "delayed.key", msg);
}
}
// 延迟消息消费者
public class DelayedPluginConsumer {
public void consume(String message) {
System.out.println("收到延迟消息(插件方式):" + message);
}
}
十一、延迟消息取消订单实战
模拟电商场景:用户下单后 30 分钟未支付,自动取消订单;若用户在 30 分钟内支付,取消延迟的取消订单操作。
实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 订单实体类
public class Order {
private Long orderId;
private String userId;
private BigDecimal amount;
private Integer status; // 0-待支付,1-已支付,2-已取消
private LocalDateTime createTime;
}
// 取消订单消息
public class CancelOrderMessage {
private Long orderId;
private String correlationId; // 唯一标识,用于取消延迟消息
}配置类(死信 + TTL 方式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45// 取消订单配置类
public class CancelOrderConfig {
// 死信交换机
public DirectExchange cancelDlxExchange() {
return new DirectExchange("cancel.order.dlx.exchange", true, false);
}
// 死信队列
public Queue cancelDlxQueue() {
return new Queue("cancel.order.dlx.queue", true);
}
// 死信绑定
public Binding cancelDlxBinding() {
return BindingBuilder.bind(cancelDlxQueue()).to(cancelDlxExchange()).with("cancel.order.dlx.key");
}
// 延迟交换机
public DirectExchange cancelDelayExchange() {
return new DirectExchange("cancel.order.delay.exchange", true, false);
}
// 延迟队列(配置 TTL 和死信)
public Queue cancelDelayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "cancel.order.dlx.exchange");
args.put("x-dead-letter-routing-key", "cancel.order.dlx.key");
args.put("x-message-ttl", 30 * 60 * 1000); // 30 分钟过期
return QueueBuilder.durable("cancel.order.delay.queue")
.withArguments(args)
.build();
}
// 延迟绑定
public Binding cancelDelayBinding() {
return BindingBuilder.bind(cancelDelayQueue()).to(cancelDelayExchange()).with("cancel.order.delay.key");
}
}生产者(创建订单时发送延迟取消消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45// 订单生产者
public class OrderProducer {
private RabbitTemplate rabbitTemplate;
/**
* 创建订单并发送延迟取消消息
*/
public String createOrder(Order order) {
// 生成唯一 correlationId,用于后续取消延迟消息
String correlationId = UUID.randomUUID().toString();
// 保存订单到数据库(状态:待支付)
order.setStatus(0);
order.setCreateTime(LocalDateTime.now());
orderMapper.insert(order); // 假设存在 orderMapper
// 发送延迟取消订单消息
CancelOrderMessage cancelMsg = new CancelOrderMessage();
cancelMsg.setOrderId(order.getOrderId());
cancelMsg.setCorrelationId(correlationId);
CorrelationData correlationData = new CorrelationData(correlationId);
rabbitTemplate.convertAndSend(
"cancel.order.delay.exchange",
"cancel.order.delay.key",
cancelMsg,
correlationData
);
return correlationId;
}
/**
* 用户支付后,取消延迟的取消订单消息
*/
public void payOrder(Long orderId, String correlationId) {
// 更新订单状态为已支付
orderMapper.updateStatus(orderId, 1);
// 取消延迟的取消订单消息
rabbitTemplate.cancel(correlationId);
}
}消费者(处理延迟取消订单)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// 取消订单消费者
public class CancelOrderConsumer {
private OrderMapper orderMapper;
public void consume(CancelOrderMessage message) {
// 查询订单
Order order = orderMapper.selectById(message.getOrderId());
if (order == null) {
return;
}
// 只有待支付状态才取消订单
if (order.getStatus() == 0) {
orderMapper.updateStatus(order.getOrderId(), 2);
System.out.println("订单 " + order.getOrderId() + " 因超时未支付已取消");
}
}
}监听延迟消息(可选,用于监控)
1
2
3
4
5
6
7
8
9
10
11// 延迟消息监听
public class DelayMessageListener {
public void listen(CancelOrderMessage message, Channel channel, Message msg) throws IOException {
// 此方法实际不会被调用,因为消息会直接过期路由到死信队列
// 若需监控延迟消息,可在此处记录日志
System.out.println("监控到延迟消息:" + message);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}
}
