RabbitMQ-如何保证消息不丢失
参考文献
RabbitMQ如何保证消息不丢失?
确保消息到MQ
生产者配置(结合Spring)
-
在 Spring 中,通过 RabbitTemplate 来发送消息到 RabbitMQ,可以通过配置 RabbitTemplate 的 ConfirmCallback 和 ReturnCallback 来实现消息的确认机制。
-
ConfirmCallback 是消息发送确认的回调函数接口,该接口方法如下:
1
2
3
4
5
6
7
8
9
10public interface ConfirmCallback {
/**
* 消息确认回调方法
* @param correlationData 相关数据
* @param ack 消息是否成功到达交换机
* @param cause 失败原因
*/
void confirm(CorrelationData correlationData, boolean ack, String cause);
} -
ReturnCallback 是消息发送失败后的回调函数接口,该接口方法如下:
1
2
3
4
5
6
7
8
9
10
11
12public interface ReturnCallback {
/**
* 消息未投递到队列时回调方法
* @param message 消息
* @param replyCode 回复代码
* @param replyText 回复文本
* @param exchange 交换器名称
* @param routingKey 路由键
*/
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
} -
可以在 RabbitTemplate 中配置 ConfirmCallback 和 ReturnCallback,示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class RabbitmqConfig {
private RabbitmqProperties rabbitmqProperties;
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启失败通知
rabbitTemplate.setMandatory(true);
// 设置消息确认回调函数
rabbitTemplate.setConfirmCallback(new RabbitmqConfirmCallback());
// 设置消息失败返回回调函数
rabbitTemplate.setReturnCallback(new RabbitmqReturnCallback());
return rabbitTemplate;
}
/**
* 消息确认回调函数实现
*/
private class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 消息发送到 Exchange 成功
} else {
// 消息发送到 Exchange 失败
}
}
}
/**
* 消息失败返回回调函数实现
*/
private class RabbitmqReturnCallback implements RabbitTemplate.ReturnCallback {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 消息发送到队列失败
}
}
} -
通过设置 RabbitTemplate 的 ConfirmCallback 和 ReturnCallback,可以在生产者发送消息后得到消息的确认结果,进而保证消息到达 RabbitMQ。
确保消息在队列正确的存储
-
对交换器,队列,消息都进行持久化
-
对交换器进行持久化配置
1
2
3
4
5
6
7
public Exchange directExchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME)
// 设置为持久化
.durable(true)
.build();
} -
对队列进行持久化配置
1
2
3
4
5
6
public Queue queue() {
// 设置为持久化
return QueueBuilder.durable(QUEUE_NAME)
.build();
} -
对消息进行持久化配置
1
2
3
4
5MessageProperties messageProperties = new MessageProperties();
// 将deliveryMode 属性设置为 PERSISTENT,以使消息进行持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message(messageBody.getBytes(), messageProperties);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
确保消息从队列中正确的投递至消费者
消费者配置
1 |
|
1 |
|
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 HoleLin's Blog!