参考文献

RabbitMQ如何保证消息不丢失?

确保消息到MQ

生产者配置(结合Spring)
  • 在 Spring 中,通过 RabbitTemplate 来发送消息到 RabbitMQ,可以通过配置 RabbitTemplate 的 ConfirmCallback 和 ReturnCallback 来实现消息的确认机制。

  • ConfirmCallback 是消息发送确认的回调函数接口,该接口方法如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public interface ConfirmCallback {

    /**
    * 消息确认回调方法
    * @param correlationData 相关数据
    * @param ack 消息是否成功到达交换机
    * @param cause 失败原因
    */
    void confirm(CorrelationData correlationData, boolean ack, String cause);
    }
  • ReturnCallback 是消息发送失败后的回调函数接口,该接口方法如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public interface ReturnCallback {

    /**
    * 消息未投递到队列时回调方法
    * @param message 消息
    * @param replyCode 回复代码
    * @param replyText 回复文本
    * @param exchange 交换器名称
    * @param routingKey 路由键
    */
    void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
    }
  • 可以在 RabbitTemplate 中配置 ConfirmCallback 和 ReturnCallback,示例代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    @Configuration
    public class RabbitmqConfig {

    @Autowired
    private RabbitmqProperties rabbitmqProperties;

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    // 开启失败通知
    rabbitTemplate.setMandatory(true);

    // 设置消息确认回调函数
    rabbitTemplate.setConfirmCallback(new RabbitmqConfirmCallback());

    // 设置消息失败返回回调函数
    rabbitTemplate.setReturnCallback(new RabbitmqReturnCallback());

    return rabbitTemplate;
    }

    /**
    * 消息确认回调函数实现
    */
    private class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (ack) {
    // 消息发送到 Exchange 成功
    } else {
    // 消息发送到 Exchange 失败
    }
    }
    }

    /**
    * 消息失败返回回调函数实现
    */
    private class RabbitmqReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    // 消息发送到队列失败
    }
    }
    }
  • 通过设置 RabbitTemplate 的 ConfirmCallback 和 ReturnCallback,可以在生产者发送消息后得到消息的确认结果,进而保证消息到达 RabbitMQ。

确保消息在队列正确的存储

  • 对交换器,队列,消息都进行持久化

  • 对交换器进行持久化配置

    1
    2
    3
    4
    5
    6
    7
    @Ben
    public Exchange directExchange() {
    return ExchangeBuilder.directExchange(EXCHANGE_NAME)
    // 设置为持久化
    .durable(true)
    .build();
    }
  • 对队列进行持久化配置

    1
    2
    3
    4
    5
    6
    @Bean
    public Queue queue() {
    // 设置为持久化
    return QueueBuilder.durable(QUEUE_NAME)
    .build();
    }
  • 对消息进行持久化配置

    1
    2
    3
    4
    5
    MessageProperties messageProperties = new MessageProperties();
    // 将deliveryMode 属性设置为 PERSISTENT,以使消息进行持久化
    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    Message message = new Message(messageBody.getBytes(), messageProperties);
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);

确保消息从队列中正确的投递至消费者

消费者配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
public class RabbitConfig {

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPort(5672);
return connectionFactory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(10);
// 设置手动确认模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
public class RabbitmqConsumer {

@RabbitListener(queues = "testQueue")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 处理消息
System.out.println("Received message: " + message);
// 手动确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
// 消息处理失败,可以进行一些处理,比如记录日志等
// 然后将消息重新放回队列,让其他消费者来消费
channel.basicNack(tag, false, true);
}
}

}