参考文献

死信队列

  • RabbitMQ中的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于接收那些未能被消费者正确处理的消息.当一条消息被认为“死亡”时(如过期、被拒绝、超出队列限制等),RabbitMQ会将其路由到预先设定的死信队列中,以便后续进一步处理.

死信队列的作用

  • 容错处理:当某些消息无法正常处理时,可以将其转发至死信队列,以免影响整个消息队列的正常运行.
  • 调试方便:死信队列可以用于存储消费者处理失败的消息,方便开发人员进行问题排查和调试.
  • 业务逻辑处理:死信队列也可以作为一个处理逻辑的触发点,例如当某些消息无法被正确处理时,可以将其路由到死信队列中触发其他业务逻辑的处理.

死信队列的创建步骤

  • 创建一个普通队列,并设置相关参数(如队列名、交换机、路由键等).

  • 设置队列的死信参数(如死信交换机、死信路由键等),以便将“死亡”的消息路由到指定的死信队列中.这些参数可以在创建队列时进行设置,也可以在之后通过队列属性进行修改.

  • 创建一个死信队列,用于接收那些未能被消费者正确处理的消息.同样需要设置相关参数(如队列名、交换机、路由键等).

  • 将死信队列绑定到死信交换机上,以便死信队列能够接收死信消息.

  • 将普通队列绑定到交换机上,开始向队列中发送消息.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class CreateQueueWithDLQ {
    private final static String QUEUE_NAME = "myqueue";
    private final static String DLX_EXCHANGE_NAME = "dlx_exchange";
    private final static String DLQ_QUEUE_NAME = "dlq_queue";
    private final static String DLX_ROUTING_KEY = "dlx_routing_key";

    public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()) {
    // 创建死信交换机
    channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
    // 创建死信队列
    channel.queueDeclare(DLQ_QUEUE_NAME, true, false, false, null);
    // 将死信队列绑定到死信交换机上
    channel.queueBind(DLQ_QUEUE_NAME, DLX_EXCHANGE_NAME, DLX_ROUTING_KEY);

    // 设置普通队列的死信参数
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
    arguments.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);

    // 创建普通队列,并将死信参数设置到队列中
    channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);

    String message = "Hello, RabbitMQ!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");
    }
    }
    }