0%

RabbitMQ消息投递确认

RabbitMQ消息投递确认

背景

​ 在项目中引入了RabbitMQ用于服务间一些解耦操作,在运行了一段时间之后,突然在某个环境中出现了一些异常情况。

问题

​ 从投递者日志来看,确实投递者有将对应的数据投递出去;从消费者日志来看,一直未监听到对应的消息,并且从RabbitMQ后台来看,对应队列上是存在监听,并且通过RabbitMQ控制台通过Publish message发出的消息,在消费者服务的日志中可以看到对应的消费信息。

​ 经过反复的查看,确定投递者服务投递的消息对应的交换机和路由器应该是没有问题的。排查到这里实际上我已经没有任何排查思路了,我猜测既然消费者的日志中并没有出现消费的相关日志,那么大概率问题应该是在投递端。

解决

​ 在RabbitMQ中是有消息投递确认的回调处理,当投递失败时,可以通过回调接口在投递者服务中打印出对应的信息,这样至少排查问题时可以知道,是投递出现了问题。

这里先简单解释两个概念:

消息是由投递者投递到交换机,再由交换机根据指定路由键投递到对应的队列

  • publisher-confirm:发送者确认,消息成功投递到交换机返回ack,消息未投递到交换机返回nack注意,这里的维度是交换机,也就是交换机接收了,就认为你投递成功。

  • publisher-return:发送者回执,消息发送队列失败会回调这个方法。注意,这里的维度是队列,就是交换机有没有将消息投递到具体的队列上,若投递不到具体的队列,则回调这个方法。

1.在spring配置项中开启投递确认

1
2
3
4
5
6
# simple 同步等待confirm结果,直到超时
spring.rabbitmq.publisher-confirm-type=correlated
# 开始publish-return功能,同样基于callback机制,不过是定义ReturnCallback
spring.rabbitmq.publisher-returns=true
# 定义消息路由失败时的策略,true则调用ReturnCallback; false则丢弃
spring.rabbitmq.template.mandatory=true

2.设置投递确认和投递回执监听

我这里为了快捷测试,就通过@PostConstruct设置对应的回调信息,实际项目中,应通过@Configuration或者@Bean抽取到对应的配置类中,因为setReturnsCallback等方法对应每一个rabbitTemplate实例只能设置一次,第二次赋值,将会抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息发送失败,应答码{},原因{},交换机{},路由key{},消息{}",
returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(),
returnedMessage.getRoutingKey(), returnedMessage.getMessage());
throw new RuntimeException("消息投递异常");
}
});
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println(correlationData);
}
});
}

@PostMapping("/testMq")
@ApiOperation("测试发送消息")
public RestResponse<Boolean> testMq(@RequestParam("exchange") String exchange, @RequestParam("route") String route) {
rabbitTemplate.convertAndSend(exchange, route, "{}");
return RestResponse.<Boolean>builder().data(null).build();
}

测试结论:

1.若投递一个不存在的交换机,setConfirmCallback()中返回false不会进入returnCallback监听方法。这里还是要老生常谈一下,一定要注意,若交换机投递失败,returnCallback是不会接受到监听回调。

2.若投递一个存在的交换机且存在的路由键,setConfirmCallback()中返回true不会进入returnCallback监听方法。因为只有投递队列失败才会进入到这个方法中。

3.若投递一个存在的交换机且不存在的路由键,setConfirmCallback()中返回true会进入returnCallback监听方法。

-------------本文结束感谢您的阅读-------------