前言

本文概括
对RabbitMQ的核心知识点进行详细的介绍

一、RabbitMQ 基础概念

RabbitMQ 是基于 AMQP 协议的消息中间件,核心组件包括:

  • 生产者(Producer):发送消息的应用
  • 消费者(Consumer):接收并处理消息的应用
  • 队列(Queue):存储消息的容器,消息在队列中是先进先出
  • 交换机(Exchange):接收生产者消息,根据路由规则将消息路由到一个或多个队列
  • 路由键(Routing Key):交换机路由消息的依据
  • 绑定(Binding):交换机与队列之间的关联规则

二、同步调用 vs 异步调用

  • 维度 同步调用 异步调用(RabbitMQ)
  • 执行方式 请求 - 阻塞 - 等待响应 发送消息后立即返回,不阻塞
  • 耦合性 强耦合(调用方需知道被调用方地址) 弱耦合(通过 MQ 解耦)
  • 可靠性 依赖被调用方可用性 消息持久化保障可靠性
  • 场景 实时性要求高、逻辑强依赖 异步任务、流量削峰、系统解耦
    Java 示例:同步 vs 异步调用对比
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 同步调用示例(伪代码)
public class SyncService {
public Result callService() {
// 直接调用其他服务,阻塞等待响应
return otherService.doSomething();
}
}

// 异步调用示例(RabbitMQ)
@Component
public class AsyncProducer {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendAsyncMessage() {
// 发送消息后立即返回,不阻塞
rabbitTemplate.convertAndSend("async.exchange", "async.key", "异步消息内容");
}
}

三、RabbitMQ 技术选型考量

选择 RabbitMQ 需考虑以下因素:

  • 业务场景:异步任务、流量削峰、系统解耦、最终一致性
  • 性能要求:RabbitMQ 单机可支持万级 TPS,可通过集群水平扩展
  • 可靠性要求:提供持久化、Confirm、Return、死信等机制保障消息不丢失
  • 生态与社区:Spring 生态深度集成,文档和解决方案丰富
  • 运维成本:部署简单,支持 Docker 容器化,监控工具完善

四、Work 模型(工作队列)

Work 模型用于多个消费者竞争消费同一个队列的消息,实现任务的并行处理。
核心特点

  • 队列中消息被多个消费者瓜分,每个消息仅被一个消费者处理
  • 可通过 prefetch 控制消费者每次预取的消息数,避免消息堆积
    Java 实战代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    // 配置类
    @Configuration
    public class WorkQueueConfig {
    // 声明工作队列
    @Bean
    public Queue workQueue() {
    return new Queue("work.queue", true); // 持久化队列
    }
    }

    // 生产者
    @Component
    public class WorkProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendWorkMessage(int count) {
    for (int i = 0; i < count; i++) {
    rabbitTemplate.convertAndSend("work.queue", "Work 模型消息 " + i);
    }
    }
    }

    // 消费者1
    @Component
    public class WorkConsumer1 {
    @RabbitListener(queues = "work.queue")
    public void consume(String message) throws InterruptedException {
    System.out.println("消费者1 处理消息:" + message);
    Thread.sleep(1000); // 模拟处理耗时
    }
    }

    // 消费者2
    @Component
    public class WorkConsumer2 {
    @RabbitListener(queues = "work.queue")
    public void consume(String message) throws InterruptedException {
    System.out.println("消费者2 处理消息:" + message);
    Thread.sleep(2000); // 模拟处理耗时
    }
    }

五、交换机(Exchange)类型

RabbitMQ 有四种交换机类型,每种类型路由规则不同。

  1. Direct 交换机(精确匹配)
    路由规则:消息的 routingKey 与队列的 bindingKey 完全匹配时,消息路由到该队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    // 配置类
    @Configuration
    public class DirectExchangeConfig {
    // 声明 Direct 交换机
    @Bean
    public DirectExchange directExchange() {
    return new DirectExchange("direct.exchange", true, false);
    }

    // 声明队列
    @Bean
    public Queue directQueue1() {
    return new Queue("direct.queue1", true);
    }

    @Bean
    public Queue directQueue2() {
    return new Queue("direct.queue2", true);
    }

    // 绑定交换机与队列
    @Bean
    public Binding directBinding1() {
    return BindingBuilder.bind(directQueue1()).to(directExchange()).with("direct.key1");
    }

    @Bean
    public Binding directBinding2() {
    return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct.key2");
    }
    }

    // 生产者
    @Component
    public class DirectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDirectMessage() {
    rabbitTemplate.convertAndSend("direct.exchange", "direct.key1", "Direct 消息,路由到 queue1");
    rabbitTemplate.convertAndSend("direct.exchange", "direct.key2", "Direct 消息,路由到 queue2");
    }
    }
  2. Fanout 交换机(广播)
    路由规则:忽略 routingKey,将消息路由到所有绑定的队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    // 配置类
    @Configuration
    public class FanoutExchangeConfig {
    @Bean
    public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout.exchange", true, false);
    }

    @Bean
    public Queue fanoutQueue1() {
    return new Queue("fanout.queue1", true);
    }

    @Bean
    public Queue fanoutQueue2() {
    return new Queue("fanout.queue2", true);
    }

    @Bean
    public Binding fanoutBinding1() {
    return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBinding2() {
    return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
    }

    // 生产者
    @Component
    public class FanoutProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendFanoutMessage() {
    // routingKey 可忽略,消息会路由到所有绑定的队列
    rabbitTemplate.convertAndSend("fanout.exchange", "", "Fanout 广播消息");
    }
    }
  3. Topic 交换机(模糊匹配)
    路由规则:routingKey 和 bindingKey 支持通配符(* 匹配一个单词,# 匹配零个或多个单词)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    // 配置类
    @Configuration
    public class TopicExchangeConfig {
    @Bean
    public TopicExchange topicExchange() {
    return new TopicExchange("topic.exchange", true, false);
    }

    @Bean
    public Queue topicQueue1() {
    return new Queue("topic.queue1", true);
    }

    @Bean
    public Queue topicQueue2() {
    return new Queue("topic.queue2", true);
    }

    // 匹配 user.* 格式的 routingKey
    @Bean
    public Binding topicBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.*");
    }

    // 匹配 user.# 格式的 routingKey
    @Bean
    public Binding topicBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
    }
    }

    // 生产者
    @Component
    public class TopicProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendTopicMessage() {
    rabbitTemplate.convertAndSend("topic.exchange", "user.login", "用户登录消息,路由到 queue1 和 queue2");
    rabbitTemplate.convertAndSend("topic.exchange", "user.order.create", "用户创建订单消息,仅路由到 queue2");
    }
    }
  4. Headers 交换机(头部匹配)
    路由规则:根据消息的 headers 属性匹配,性能较差,实际使用较少

六、声明队列和交换机

在 Java 中,声明队列和交换机有两种方式:显式声明(通过代码)和隐式声明(通过 Spring 注解自动声明)。
显式声明(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 配置类
@Configuration
public class DeclareConfig {
// 声明交换机
@Bean
public DirectExchange customExchange() {
// 参数:名称、是否持久化、是否自动删除、其他参数
return new DirectExchange("custom.exchange", true, false, null);
}

// 声明队列
@Bean
public Queue customQueue() {
// 参数:名称、是否持久化、是否排他、是否自动删除、其他参数
return QueueBuilder.durable("custom.queue")
.exclusive(false)
.autoDelete(false)
.build();
}

// 绑定交换机与队列
@Bean
public Binding customBinding() {
return BindingBuilder.bind(customQueue()).to(customExchange()).with("custom.key");
}
}
隐式声明(通过消费者注解)
java
运行
@Component
public class ImplicitDeclareConsumer {
// 监听时自动声明队列、交换机并绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "implicit.queue", durable = "true"),
exchange = @Exchange(name = "implicit.exchange", type = ExchangeTypes.DIRECT),
key = "implicit.key"
))
public void consume(String message) {
System.out.println("隐式声明的消费者:" + message);
}
}

七、消息转换器

消息转换器用于 Java 对象与字节数组(MQ 传输格式)之间的转换,推荐使用 Jackson2JsonMessageConverter。
配置消息转换器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 配置类
@Configuration
public class MessageConverterConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
// 支持 Java 8 时间类型
objectMapper.registerModule(new JavaTimeModule());
// 忽略空值字段
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
return new Jackson2JsonMessageConverter(objectMapper);
}

// 配置 RabbitTemplate 使用自定义消息转换器
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
}
实体类示例
java
运行
@Data
public class Order {
private Long orderId;
private String userId;
private BigDecimal amount;
private LocalDateTime createTime;
}
生产者使用
java
运行
@Component
public class ConverterProducer {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendOrder(Order order) {
rabbitTemplate.convertAndSend("converter.exchange", "converter.key", order);
}
}
消费者使用
java
运行
@Component
public class ConverterConsumer {
@RabbitListener(queues = "converter.queue")
public void consume(Order order) {
System.out.println("收到订单:" + order);
}
}

八、生产者可靠性保障

  1. Confirm 机制(消息到达交换机确认)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    // 配置类
    @Configuration
    public class ProducerReliabilityConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(messageConverter);

    // 开启 Confirm 机制
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
    System.out.println("消息已成功到达交换机,correlationId:" + correlationData.getId());
    } else {
    System.out.println("消息到达交换机失败,原因:" + cause);
    // 实现重试逻辑
    }
    });

    return rabbitTemplate;
    }
    }

    // 生产者发送带 correlationId 的消息
    @Component
    public class ConfirmProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendConfirmMessage() {
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("confirm.exchange", "confirm.key", "Confirm 测试消息", correlationData);
    }
    }
    2. Return 机制(消息路由到队列失败返回)
    java
    运行
    @Configuration
    public class ProducerReliabilityConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(messageConverter);

    // 开启 Return 机制
    rabbitTemplate.setReturnsCallback(returnedMessage -> {
    System.out.println("消息路由队列失败,交换机:" + returnedMessage.getExchange());
    System.out.println("路由键:" + returnedMessage.getRoutingKey());
    System.out.println("回复码:" + returnedMessage.getReplyCode());
    System.out.println("回复文本:" + returnedMessage.getReplyText());
    System.out.println("消息体:" + new String(returnedMessage.getMessage().getBody()));
    // 实现路由失败的处理逻辑
    });

    return rabbitTemplate;
    }
    }

九、消费者可靠性保障

  1. 手动 ACK 与重试
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    // 配置类
    @Configuration
    public class ConsumerReliabilityConfig {
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动 ACK
    factory.setDefaultRequeueRejected(false); // 消费失败不重新入队(避免死循环)
    factory.setRetryTemplate(buildRetryTemplate()); // 配置重试
    return factory;
    }

    private RetryTemplate buildRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    // 重试策略:最多重试 3 次,每次间隔 1 秒
    SimpleRetryPolicy policy = new SimpleRetryPolicy(3, Collections.singletonMap(Exception.class, true));
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000);
    retryTemplate.setRetryPolicy(policy);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return retryTemplate;
    }
    }

    // 消费者示例
    @Component
    public class ManualAckConsumer {
    @RabbitListener(queues = "manual.ack.queue", containerFactory = "rabbitListenerContainerFactory")
    public void consume(String message, Channel channel, Message msg) throws IOException {
    try {
    System.out.println("处理消息:" + message);
    // 业务逻辑处理...
    channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); // 手动确认
    } catch (Exception e) {
    System.out.println("处理消息异常,准备重试或拒绝:" + e.getMessage());
    // 若开启了重试,此处无需手动拒绝,由重试机制处理
    // 若未开启重试,可手动拒绝并路由到死信队列
    // channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
    }
    }
    }

2. 死信交换机(DLX)

死信交换机用于接收被拒绝、过期、队列满了的消息,实现消息的最终兜底处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 配置类
@Configuration
public class DeadLetterConfig {
// 死信交换机
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange", true, false);
}

// 死信队列
@Bean
public Queue dlxQueue() {
return new Queue("dlx.queue", true);
}

// 死信队列与死信交换机绑定
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.key");
}

// 业务队列(配置死信参数)
@Bean
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange"); // 绑定死信交换机
args.put("x-dead-letter-routing-key", "dlx.key"); // 死信路由键
args.put("x-message-ttl", 60000); // 消息过期时间(可选)
return QueueBuilder.durable("business.queue")
.withArguments(args)
.build();
}

// 业务交换机
@Bean
public DirectExchange businessExchange() {
return new DirectExchange("business.exchange", true, false);
}

// 业务队列与业务交换机绑定
@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue()).to(businessExchange()).with("business.key");
}
}

// 死信消费者
@Component
public class DlxConsumer {
@RabbitListener(queues = "dlx.queue")
public void consume(String message) {
System.out.println("收到死信消息:" + message);
// 处理死信逻辑,如记录日志、人工干预
}
}

十、延迟消息

延迟消息指消息发送后,在指定时间后才会被消费者消费。RabbitMQ 实现延迟消息有两种方式:
方式一:死信交换机 + TTL(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 配置类
@Configuration
public class DelayMessageConfig {
// 死信交换机(用于延迟消息的最终路由)
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("delay.dlx.exchange", true, false);
}

// 死信队列
@Bean
public Queue dlxQueue() {
return new Queue("delay.dlx.queue", true);
}

// 死信队列与死信交换机绑定
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("delay.dlx.key");
}

// 延迟交换机(用于接收延迟消息)
@Bean
public DirectExchange delayExchange() {
return new DirectExchange("delay.exchange", true, false);
}

// 延迟队列(配置 TTL 和死信参数)
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "delay.dlx.exchange"); // 绑定死信交换机
args.put("x-dead-letter-routing-key", "delay.dlx.key"); // 死信路由键
return QueueBuilder.durable("delay.queue")
.withArguments(args)
.build();
}

// 延迟队列与延迟交换机绑定
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.key");
}
}

// 延迟消息生产者
@Component
public class DelayProducer {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendDelayMessage(String message, int delaySeconds) {
MessageProperties properties = new MessageProperties();
// 设置消息过期时间(毫秒)
properties.setExpiration(String.valueOf(delaySeconds * 1000));
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.send("delay.exchange", "delay.key", msg);
}
}

// 延迟消息消费者(死信消费者)
@Component
public class DelayConsumer {
@RabbitListener(queues = "delay.dlx.queue")
public void consume(String message) {
System.out.println("收到延迟消息(" + new Date() + "):" + message);
// 处理延迟消息逻辑
}
}

方式二:延迟插件(rabbitmq_delayed_message_exchange)
需先安装延迟插件,然后使用 x-delayed-type 类型的交换机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 配置类
@Configuration
public class DelayedMessagePluginConfig {
// 声明延迟交换机(类型为 x-delayed-message)
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 底层路由类型
return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}

// 声明延迟队列
@Bean
public Queue delayedQueue() {
return new Queue("delayed.queue", true);
}

// 绑定交换机与队列
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed.key").noargs();
}
}

// 延迟消息生产者
@Component
public class DelayedPluginProducer {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendDelayedMessage(String message, int delaySeconds) {
MessageProperties properties = new MessageProperties();
// 设置延迟时间(毫秒)
properties.setDelay(delaySeconds * 1000);
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.send("delayed.exchange", "delayed.key", msg);
}
}

// 延迟消息消费者
@Component
public class DelayedPluginConsumer {
@RabbitListener(queues = "delayed.queue")
public void consume(String message) {
System.out.println("收到延迟消息(插件方式):" + message);
}
}

十一、延迟消息取消订单实战

模拟电商场景:用户下单后 30 分钟未支付,自动取消订单;若用户在 30 分钟内支付,取消延迟的取消订单操作。

  1. 实体类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 订单实体类
    @Data
    public class Order {
    private Long orderId;
    private String userId;
    private BigDecimal amount;
    private Integer status; // 0-待支付,1-已支付,2-已取消
    private LocalDateTime createTime;
    }

    // 取消订单消息
    @Data
    public class CancelOrderMessage {
    private Long orderId;
    private String correlationId; // 唯一标识,用于取消延迟消息
    }
  2. 配置类(死信 + TTL 方式)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    // 取消订单配置类
    @Configuration
    public class CancelOrderConfig {
    // 死信交换机
    @Bean
    public DirectExchange cancelDlxExchange() {
    return new DirectExchange("cancel.order.dlx.exchange", true, false);
    }

    // 死信队列
    @Bean
    public Queue cancelDlxQueue() {
    return new Queue("cancel.order.dlx.queue", true);
    }

    // 死信绑定
    @Bean
    public Binding cancelDlxBinding() {
    return BindingBuilder.bind(cancelDlxQueue()).to(cancelDlxExchange()).with("cancel.order.dlx.key");
    }

    // 延迟交换机
    @Bean
    public DirectExchange cancelDelayExchange() {
    return new DirectExchange("cancel.order.delay.exchange", true, false);
    }

    // 延迟队列(配置 TTL 和死信)
    @Bean
    public Queue cancelDelayQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "cancel.order.dlx.exchange");
    args.put("x-dead-letter-routing-key", "cancel.order.dlx.key");
    args.put("x-message-ttl", 30 * 60 * 1000); // 30 分钟过期
    return QueueBuilder.durable("cancel.order.delay.queue")
    .withArguments(args)
    .build();
    }

    // 延迟绑定
    @Bean
    public Binding cancelDelayBinding() {
    return BindingBuilder.bind(cancelDelayQueue()).to(cancelDelayExchange()).with("cancel.order.delay.key");
    }
    }
  3. 生产者(创建订单时发送延迟取消消息)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    // 订单生产者
    @Component
    public class OrderProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
    * 创建订单并发送延迟取消消息
    */
    public String createOrder(Order order) {
    // 生成唯一 correlationId,用于后续取消延迟消息
    String correlationId = UUID.randomUUID().toString();

    // 保存订单到数据库(状态:待支付)
    order.setStatus(0);
    order.setCreateTime(LocalDateTime.now());
    orderMapper.insert(order); // 假设存在 orderMapper

    // 发送延迟取消订单消息
    CancelOrderMessage cancelMsg = new CancelOrderMessage();
    cancelMsg.setOrderId(order.getOrderId());
    cancelMsg.setCorrelationId(correlationId);

    CorrelationData correlationData = new CorrelationData(correlationId);
    rabbitTemplate.convertAndSend(
    "cancel.order.delay.exchange",
    "cancel.order.delay.key",
    cancelMsg,
    correlationData
    );

    return correlationId;
    }

    /**
    * 用户支付后,取消延迟的取消订单消息
    */
    public void payOrder(Long orderId, String correlationId) {
    // 更新订单状态为已支付
    orderMapper.updateStatus(orderId, 1);

    // 取消延迟的取消订单消息
    rabbitTemplate.cancel(correlationId);
    }
    }
  4. 消费者(处理延迟取消订单)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // 取消订单消费者
    @Component
    public class CancelOrderConsumer {
    @Autowired
    private OrderMapper orderMapper;

    @RabbitListener(queues = "cancel.order.dlx.queue")
    public void consume(CancelOrderMessage message) {
    // 查询订单
    Order order = orderMapper.selectById(message.getOrderId());
    if (order == null) {
    return;
    }

    // 只有待支付状态才取消订单
    if (order.getStatus() == 0) {
    orderMapper.updateStatus(order.getOrderId(), 2);
    System.out.println("订单 " + order.getOrderId() + " 因超时未支付已取消");
    }
    }
    }
  5. 监听延迟消息(可选,用于监控)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 延迟消息监听
    @Component
    public class DelayMessageListener {
    @RabbitListener(queues = "cancel.order.delay.queue")
    public void listen(CancelOrderMessage message, Channel channel, Message msg) throws IOException {
    // 此方法实际不会被调用,因为消息会直接过期路由到死信队列
    // 若需监控延迟消息,可在此处记录日志
    System.out.println("监控到延迟消息:" + message);
    channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    }
    }