参考文献

消息队列使用场景

  • 异步处理
    • 优点:
      • 可以更快地返回结果;
      • 减少等待,自然实现了步骤之间的并发,提升系统总体的性能.
  • 流程控制
    • 优点:
      • 能根据下游的处理能力自动调节流量,达到“削峰填谷”的作用
    • 缺点:
      • 增加了系统调用链环节,导致总体的响应时延变长.
      • 上下游系统都要将同步调用改为异步消息,增加了系统的复杂度.
  • 服务解耦
  • 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式;
  • 连接流计算任务和数据;
  • 用于将消息广播给大量接收者.

消息中间件的选择

RabbitMQ

优点
  • RabbitMQ 的架构简单,易于上手,开发人员可以快速使用.
  • RabbitMQ 提供了丰富的消息模式,例如发布-订阅模式、工作队列模式、RPC 模式等.
  • RabbitMQ 支持各种消息传递协议,包括 AMQP、MQTT、STOMP 等.
  • RabbitMQ 的可靠性较高,支持持久化消息,可以确保消息不会丢失.
缺点
  • RabbitMQ 的性能较低,不适合处理高吞吐量的消息.
  • RabbitMQ 的可扩展性有限,节点的数量和规模受限于 Erlang VM 的性能.
  • RabbitMQ 的集群模式复杂,需要了解一些分布式系统的知识.
  • RabbitMQ 的消息大小受到 Erlang VM 的内存限制,不适合处理大型消息.

RocketMQ

优点
  • RocketMQ 的性能较高,能够处理大规模的消息流量.
  • RocketMQ 的可靠性较高,支持消息持久化和消息备份机制,可以确保消息不会丢失.
  • RocketMQ 支持丰富的消息传递模式,例如发布-订阅模式、点对点模式等.
  • RocketMQ 的集群模式简单,易于部署和维护.
  • RocketMQ 具备很好的可扩展性,可以方便地添加新节点以扩展集群规模.
缺点
  • RocketMQ 对网络环境的要求较高,需要高速稳定的网络环境.
  • RocketMQ 不支持消息转发,不能处理跨集群的消息传递.
  • RocketMQ 的消息模型较为简单,没有 Kafka 的复杂度和灵活性.

Kafka

优点
  • Kafka 的性能非常高,可以处理高吞吐量的消息.
  • Kafka 的消息模型非常灵活,支持多种消息传递模式,例如发布-订阅模式、消息队列模式等.
  • Kafka 具备很好的可扩展性,可以方便地添加新节点以扩展集群规模.
  • Kafka 支持跨集群的消息传递,非常适合分布式系统的应用场景.
  • Kafka 具有较好的容错性,可以通过副本机制来避免数据丢失.
缺点
  • Kafka 的架构较为复杂,需要对其内部原理和机制有一定的了解才能使用.
  • Kafka 对存储和网络资源的要求较高,需要足够的存储空间和带宽.

三者适用场景和不适用场景

  • 吞吐量:Kafka是最强大的分布式消息队列之一,可处理每秒数百万消息.根据Kafka的性能测试,单个Kafka实例可以处理每秒10万个消息,而具有100个分区和3个副本的集群可以处理每秒数百万消息.RabbitMQ和RocketMQ通常也可以处理每秒数万到数十万个消息,但它们的性能受到各种因素的影响,例如网络延迟、IO操作等.
  • 消息可靠性:Kafka是一种可靠性非常高的消息队列,其主要是通过副本机制来保证消息的可靠性.RabbitMQ和RocketMQ也可以通过持久化消息、消息确认等方式来提高消息的可靠性,但它们的实现方式不同,对于不同的业务需求,可能需要选择不同的方案.
  • 稳定性:RabbitMQ和RocketMQ都可以提供较高的稳定性和可用性,它们都有一些内置的机制来避免消息的丢失或重复消费.而Kafka虽然吞吐量很高,但在处理高负载情况下,可能会发生一些问题,例如消息积压、网络拥塞等.
  • 部署成本:Kafka可以在各种环境中部署,但它需要相对较高的维护成本,例如安装、升级、监控等.RabbitMQ和RocketMQ的安装和使用相对简单,对于小规模的应用程序来说,它们可能更加适合.
适用场景和不适用场景
  • 适用场景

    • Kafka:大规模的分布式系统,高吞吐量、高可靠性、实时数据处理、流处理等场景.

    • RabbitMQ:适用于中小型企业的消息传递、工作队列、发布/订阅等场景.

    • RocketMQ:适用于消息顺序、长时间堆积、流式处理、分布式事务等场景.

  • 不适用场景

    • Kafka:小规模的应用程序,对可靠性和消息顺序要求不高的场景.

    • RabbitMQ:对于高吞吐量、实时数据处理等场景,RabbitMQ可能无法提供足够的性能.

    • RocketMQ:对于小规模的应用程序,RocketMQ可能会导致一些额外的开销.同时,RocketMQ的社区支持不如Kafka和RabbitMQ那么强大.

消息模型

队列模型

  • 最初的消息队列,就是一个严格意义上的队列.在计算机领域,“队列(Queue)”是一种数据结构,有完整而严格的定义.
  • 第一个是先进先出,这里面隐含着的一个要求是,在消息入队出队过程中,需要保证这些消息严格有序,按照什么顺序写进队列,必须按照同样的顺序从队列中读出来.

发布-订阅模型(Publish-Subscribe Pattern)

  • 在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic).发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”.“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息.
  • 在消息领域的历史上很长的一段时间,队列模式和发布-订阅模式是并存的,有些消息队列同时支持这两种消息模型,比如 ActiveMQ.仔细对比一下这两种模型,生产者就是发布者,消费者就是订阅者,队列就是主题,并没有本质的区别.它们最大的区别其实就是,一份消息数据能不能被消费多次的问题.

RabbitMQ 的消息模型

  • 它是少数依然坚持使用队列模型的产品之一.
  • 在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中.
  • 同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务.这也可以变相地实现新发布 - 订阅模型中,“一份消息数据可以被多个订阅者来多次消费”这样的功能.

RocketMQ 的消息模型

  • RocketMQ 使用的消息模型是标准的发布 - 订阅模型.

消息中间件可能遇到的问题

  • 消息的分布式事务

  • 消息丢失问题

  • 重复消息以及重复消费问题

  • 消息积压问题

RabbitMQ 如何保证消息不丢失?

  • 持久化消息:RabbitMQ 支持将消息标记为持久化,以确保消息不会因为 RabbitMQ 重启或崩溃而丢失.要持久化消息,需要将消息的 delivery mode 属性设置为 2.
  • 消息确认:在发布消息时,可以请求 RabbitMQ 发送一个确认信号来确保消息已经被正确地传递到一个队列中.这个确认信号可以在确认消息被 RabbitMQ 接收时发送,也可以在消息被消费者处理时发送.
  • 事务:RabbitMQ 支持 AMQP 事务,通过事务机制,可以将多个操作绑定在一个原子性的操作序列中,从而确保消息的完整性和一致性.
  • 备份队列:RabbitMQ 还支持备份队列,可以在消息无法被路由到目标队列时将消息发送到备份队列中,从而避免消息丢失.

RocketMQ如何保证消息不丢失?

  • 集群模式:RocketMQ支持将消息存储在多个Broker节点中,以此来实现消息冗余和备份.当某个Broker节点出现故障时,其他节点会自动接管该节点的工作,保证消息的可靠性.
  • 主从同步:在Master-Slave模式下,Master节点负责写入消息,Slave节点负责备份消息.在写入消息之后,Master节点会将消息同步到Slave节点中,确保消息能够在节点故障时得到恢复.
  • 刷盘机制:RocketMQ使用异步刷盘的方式来保证消息不丢失.在消息写入内存后,Broker节点会异步将消息刷盘到磁盘中.在刷盘完成之前,Broker节点不会将该消息标记为已经发送,以此来保证消息不会丢失.
  • 重试机制:当消息发送失败时,RocketMQ会自动进行重试.RocketMQ默认会在发送失败的第一次尝试之后,将消息保存在本地磁盘中.在之后的定时任务中,RocketMQ会重新发送这些消息,直到发送成功为止.
  • 消息确认机制:RocketMQ支持两种消息确认方式:同步确认和异步确认.同步确认是指当消息发送成功后,Producer会立即收到Broker节点的确认消息,从而保证消息不会丢失.异步确认是指当消息发送成功后,Broker节点会异步发送确认消息,Producer在之后通过回调函数的方式接收确认消息

Kafka如何保证消息不丢失?

  • 写入数据到多个副本:在 Kafka 中,消息被写入到一个 topic 中,而一个 topic 可以分为多个 partition.每个 partition 可以有多个副本(replica),其中一个是 leader 副本,其余是 follower 副本.producer 发送消息时,可以指定消息要被写入的 partition,Kafka 会将消息写入 partition 的 leader 副本,并将消息同步到其他 follower 副本.只有当 leader 副本和所有 follower 副本都已成功写入消息时,producer 才会收到确认,才会认为消息已经成功写入 Kafka.如果 leader 副本出现故障,Kafka 会从 follower 副本中选举一个新的 leader,保证消息的可用性.
  • 异步写入消息:Kafka producer 可以使用异步写入模式,这意味着 producer 发送消息时不会等待 Kafka 的确认,而是在后台进行写入.这种方式可以显著提高写入性能.当 Kafka 确认消息已经被写入时,producer 会得到一个回调,以便处理发送失败或者其他异常情况.
  • 持久化存储:Kafka 的数据存储是基于磁盘的,即使服务器崩溃,磁盘上的数据也可以被保留下来.这意味着即使出现硬件故障,Kafka 也可以保证消息不会丢失.
  • 复制和备份:Kafka 还可以使用复制和备份来保证数据的安全性.Kafka 允许将数据从一个 Kafka 集群复制到另一个集群中,以便在出现故障时进行恢复.此外,Kafka 还支持定期备份数据到远程存储,以便在发生灾难性故障时进行数据恢复.

MQ产生消息重复的原因

  • 消息重复发送:当生产者在发送消息的时候,由于网络等原因未能及时收到确认信息,导致重复发送相同的消息。
  • 消息消费失败:当消息在被消费者处理时出现异常等问题,可能导致消费失败,此时生产者可能会尝试重新发送相同的消息。
  • 消息重复消费:当消费者消费消息时,在消息处理成功之前宕机或者断开连接,可能导致消息重新被消费,从而出现消息重复消费的情况。
  • 网络分区问题:在分布式系统中,网络分区是一种常见的问题,如果消息队列的多个节点之间发生网络分区,可能会导致消息重复发送或者重复消费。
  • 高并发处理问题:在高并发的情况下,可能会出现多个消费者同时处理相同的消息,从而导致消息的重复处理。

如何处理重复消息?

  • 对于重复发送的消息,可以使用消息去重机制,避免重复发送相同的消息。
  • 对于消费失败的消息,可以采用消息重试机制,在一定的时间内多次尝试重新消费。
  • 对于消息重复消费的情况,可以使用幂等性保证机制,确保重复消费不会产生错误的结果。
  • 在设计系统时,应该避免出现网络分区的问题,或者采取一些措施来解决网络分区问题,例如使用多个数据中心、采用冗余备份等。
  • 在高并发的情况下,应该采用一些限流措施,避免出现多个消费者同时处理相同的消息。

RabbitMQ如何处理消息积压?

  • 增加消费者:可以增加消费者以加快消费消息的速度.可以使用 RabbitMQ 的消息负载均衡功能,在多个消费者之间平均分配消息.
  • 增加节点:可以通过添加更多的节点来扩展 RabbitMQ 集群的容量和吞吐量.在添加节点之前,请确保 RabbitMQ 已经按照最佳实践进行配置,以确保群集正常工作.
  • 增加队列容量:可以增加队列的容量,以处理更多的消息.这可以通过增加队列的内存限制或磁盘限制来实现.
  • 减少消息大小:可以减少消息的大小,以减少队列的负载.可以考虑使用二进制协议来减少消息的大小.
  • 设置消息过期时间:可以设置消息的过期时间,让 RabbitMQ 自动删除已经过期的消息.
  • 队列镜像:可以通过镜像队列将队列复制到多个节点中,从而提高队列的可靠性和性能.
  • 消息持久化:可以将消息持久化到磁盘上,以确保在 RabbitMQ 重启或崩溃后不会丢失消息.

RocketMQ如何处理消息积压?

  • 增加消费者:可以增加消费者以加快消费消息的速度.可以使用 RocketMQ 的负载均衡功能,在多个消费者之间平均分配消息.
  • 增加 Broker:可以通过添加更多的 Broker 节点来扩展 RocketMQ 集群的容量和吞吐量.在添加 Broker 节点之前,请确保 RocketMQ 已经按照最佳实践进行配置,以确保集群正常工作.
  • 调整消息存储策略:可以将消息存储策略从异步刷盘改为同步刷盘,从而减少消息积压的风险.
  • 设置消息过期时间:可以设置消息的过期时间,让 RocketMQ 自动删除已经过期的消息.
  • 队列分区:可以将队列分区,将一个队列分为多个子队列,从而提高消费消息的并发度.
  • 延迟消息:可以将延迟消息发送到延迟队列中,从而避免消息积压的风险.
  • 消息压缩:可以对消息进行压缩,以减少消息传输的大小

Kafka如何处理消息积压?

  • 增加消费者:可以增加消费者以加快消费消息的速度.可以使用 Kafka 的消费者群组功能,在多个消费者之间平均分配消息.
  • 增加 Broker:可以通过添加更多的 Broker 节点来扩展 Kafka 集群的容量和吞吐量.在添加 Broker 节点之前,请确保 Kafka 已经按照最佳实践进行配置,以确保集群正常工作.
  • 调整分区:可以通过增加分区来提高 Kafka 的吞吐量和并发度.可以使用 Kafka 的 rebalance 功能来重新分配分区.
  • 提高副本数量:可以增加 Kafka Topic 中分区的副本数量,从而提高消息的可靠性和可用性.
  • 设置消息过期时间:可以设置消息的过期时间,让 Kafka 自动删除已经过期的消息.
  • 调整日志清理策略:可以调整 Kafka 的日志清理策略,从而保留更多的消息.默认情况下,Kafka 使用基于时间的日志清理策略,可以考虑使用基于日志大小的日志清理策略.
  • 压缩消息:可以对消息进行压缩,以减少消息传输的大小.