消息队列(MQ)

消息队列(MQ)

消息队列需要用到网络,它的非网络实现是EventBus(消息总线),常用于:

  • 各种中间件
  • 微服务之间通信
  • 统一进程内不同线程通信
  • 具有循环依赖关系的对象之间的通信

(一)基本概念

  1. 生产者:将数据放入消息队列的角色
  2. 消费者:从消息队列拿出数据的角色

为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息Kafka不会删除)。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息


(二)消息推拉模型:push/pull

1.push:慢消费 -> 消息堆积

  • push模式的最大缺点就是慢消费造成的消息堆积
  • 消费者消费message的速率小于生产者生产message的速度,则势必造成消息在broker中堆积

2.pull:消息延迟 & 忙等

  • pull模式的最大缺点就是消息延迟忙等

  • pull模式中,消息拉取的主动权在消费方手中,但是消费方无法知道生产者何时生产消息,所以如果pull没有拉取到消息,需要等待一段事件后再重新拉取;

  • 业内成熟的方案是:pull间隔从一个较短的时间开始,每次pull没有拉取到消息,则间隔以指数增长,直到pull到消息后间隔回退到较短的时间

    听起来是不是很耳熟?没错,其设计思想和TCP中的拥塞控制机制:慢启动 & 快速恢复是一毛一样的(o゚v゚)ノ

  • 但是遗憾的是,即使采用了上述方案,也不能保证消息就一定会被准时消费,一旦生产者在pull间隔发送消息,那么该消息就势必会延迟

  • RocketMQ中采用了长轮询的机制 —— 消费者pull拉取消息失败后不是return,而是挂起连接wait,生产者有消息后再notify该线程。但是有一点要注意,海量长连接开销非常大,需要合理评估wait的时间

    是不是又感觉有点熟悉?这种思想在JVM中的自旋锁HTTP2NIO都有体现。

3.push-pull

  • 为了解决push模式和pull模式各自的缺点,在大型消息中间件(比如RocketMQKafka)中,都采用的是push-pull模式 —— 生产者push通知消费者消息到达,然后消费者主动pull消息

(三)MQ的优点

  1. 解耦:生产者只需要将数据下发到MQ,消费者如果感兴趣就进行订阅,生产者不关心哪些消费者需要消费该数据,生产者和消费者就解耦开了,相互之间的修改不会影响到对方。

    为了避免MQ宕机导致消息丢失,一般生产者还是会存储消息,直到消息被消费者消费后才删除本机的消息。如果MQ宕机,生产者会选择MQ集群中的其他MQ再次发送消息。

  2. 异步:生产者将数据下发到MQ,消费者异步消费数据,生产者不需要阻塞等待,能够大大提升响应速度

    常见的应用场景:
    不使用MQ时,用户的请求数据直接写入数据库,高并发情况下数据库压力剧增,响应速度变慢。加入MQ后,用户的请求写入MQ就返回,再由消费者异步地从MQ拿到数据后写入数据库,由于MQ处理速度远大于数据库,所以响应速度大大提升。
    但是要注意,由于数据写入MQ就返回了,而数据在后续的校验/写入数据库都可能失败,所以要修改后续流程进行配合,比如提交订单,不能在数据写入MQ就返回用户提交订单成功,而是应该在MQ的消费者真正处理完数据后,再通过电子邮件/短信通知用户提交订单成功,防止交易纠纷。

  3. 削峰/限流:消费者可以处理多大的流量就去MQ拿多少的数据,不会出现所有请求一次发给消费者造成系统崩溃的情况

    常见应用场景:
    秒杀系统流量削峰。


(四)MQ的缺点

  1. 可用性降低:引入MQ后,就要考虑消息丢失和MQ宕机的情况。

  2. 系统复杂性提高:引入MQ后,要保证消息不会被重复消费、处理消息丢失、保证消息传递的顺序性。

    如何保证消息严格顺序消费?

  3. 一种是像RocketMQ那样端对端单点消费(见第六节),生产者将排序的一组消息发送到单一的queue,通过加锁,保证只有单个消费者消费这个queue上的消息,这种情况下的消息一定是严格顺序消费的;如果有多组排序消息,可以通过hash的方式,将不同消息组发送到不同的队列

  4. 另一种则是将顺序消息发送到单一的消费者上,通过内存队列进行排序,然后再由该消费者发送给其他worker消费者进行处理

  5. 一致性问题:MQ异步处理数据,如果消费者没有消费数据,就会导致数据不一致。

如何保证消息不被重复消费?Redis分布式锁

消息被消费者消费后,一般会给MQ发送一个确认消息(比如RabbitMQ的ACK/RocketMQ的CONSUME_SUCCESS/Kafka的offset),但是由于网络延迟等原因,MQ没有收到确认信息,就会将信息再发给其他消费者,造成重复消费

如何解决呢?这要根据应用场景分析:

  1. 如果是INSERT数据库的操作,给消息一个唯一索引,如果重复消费就会主键冲突,可以防止脏数据;

  2. 如果是RedisSET操作,那就不用处理,Redis本身就具备幂等性;

    如果上述情况都不适用,那么就请你想一下,如何保证在一个分布式系统中,某一个请求或某一份数据只使用一次?
    答案当然是分布式锁ZooKeeper分布式锁效率不高,而且如果是非KafkaMQ集群,还需要新引入ZK,所以我们应该选择Redis分布式锁

  3. 给消息分配一个全局唯一序列作为Redis的key,然后SETNX key加锁,每次消费开始前,都去获取对应的锁

RocketMQ为了追求高性能,就没有实现消息的“每个消息只被处理一次”,而是要求业务通过去重保证消息的幂等性

如何防范消息丢失?ACK & MQ持久化 & 手动确认消息

1.防范生产者消息丢失

  1. 事务模式(transaction):生产者开启事务发送消息,如果发送失败事务就会回滚。

    事务模式吞吐量低,一般不常用。
    但是如果生产者和消费者之间的数据需要同步,有严格的一致性要求,则必须用到此模式保证在同一个事务中执行,比如订单不能付了款但看不到商品。

  2. 确认模式(confirm) :给所有消息分配一个唯一的ID,消息被投递到正确的MQ后会给生产者发送一个ACK消息,如果没有成功则发送一个NACK,要求生产者重新投递

1
2
3
4
5
6
7
8
9
10
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws Exception {
//失败措施
}

public void handleAck(long deliveryTag, boolean multiple) throws Exception {
//成功措施
}
});

2.防范MQ消息丢失

MQ持久化,这样即使MQ挂了,重启也能恢复数据。

3.防范消费者消息丢失

发生这种情况的原因是消费者开启了自动确认消息模式,消费者会自动确认收到的消息,然后MQ会删除消息。
改为手动即可。


(五)MQ的两种规范

1.JMS(Java Message Service)

Java消息服务,允许应用组件基于J2EE平台创建、发送、接收和读取消息,它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

五种JMS消息格式

  1. StreamMessage:Java原始值数据流
  2. MapMessage:K/V对
  3. TextMessage:字符串
  4. ObjectMessage:序列化Java对象
  5. BytesMessage:一个字节的数据流

两种JMS消息模型

(1) 点到点(P2P)模型:Queue


使用队列作为消息通信载体,一条消息只能被一个消费者消费,未被消费的数据在队列中被保留到过期/被消费。

(2) 发布/订阅(Pub/Sub)模型:Topic


使用主题作为消息通信载体,类似于广播模式,生产者发布一条消息,该消息通过主题传递给所有订阅的消费者。

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class JMSDemo{
ConnectionFactory connectionFactory; //连接工厂
Connection connection; //连接
Session session; //会话
Destination destination; //目的地(即消息发往的队列/主题)
MessageProducer producer; //生产者
MessageConsumer consumer; //消费者
Message message; //消息
boolean useTransactiono = false; //是否使用事务
try{
//1.获取连接工厂
Context ctx = new InitContext();
connectionFactory = (ConnectionFactory)ctx.lookup("ConnectionFactoryName");
//如,使用ActiveMQ时:
//connectionFactory = new ActiveMQConnectionFactory(user,passwd,
//getOptimizeBrokerUrl(broker) );

//2.使用连接工厂创建连接
connection = connectionFactory.createConnection();

//3.启动连接
connection.start();

//4.从连接创建会话(是否使用事务)
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);

//5.获取目的地
destination = session.createQueue("FXXKSN.QUEUE");

//6.创建Producer & Message
producer = session.createProducer(destination);
message = session.createTextMessage("Fxxk SN");

//7.创建Consumer
consumer = session.createConsumer(destination);

//8.发送/接收Message
//8-1.同步接收
message = (TextMessage) consumer.receive(1000);
//8-2.异步接收:设置监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message != null) {
doMessageEvent(message);
}
}
});
}catch(JMSException e) {
//...
}finally{
//9.关闭资源
producer.close();
session.close();
connection.close();
}
}

2.AMQP(Advanced Message Queue Protocol)

  • 高级消息队列协议,是二进制应用层协议,兼容JMS。

    AMQP没有明确指明类似JMS中的P2PPub/Sub模型,但是通过ExchangeQueue之间的路由规则可以很容易模拟出这些模型,所以说AMQP兼容JMS

  • 基于此协议的客户端与消息中间件可传递消息,不受客户端/中间件产品、不同开发语言等条件的限制

AMQP中消息中间件领域的通用概念

  1. Broker接收、分发消息的服务器,提供“存储-转发”功能

  2. ConnectionProducer/ConsumerBroker之间的TCP连接

    如果每一次访问Broker都建立一次Connection,那么消息量大时建立TCP连接的开销非常大,效率也很低,所以AMQP提出了通道Channel的概念。

  3. ChannelChannel内部建立的逻辑连接,如果应用程序支持多线程,通常每个线程创建单独的Channel进行通信,每个Channel之间完全隔离

  4. MessageAMQP中的消息在结构上也包含HeaderBody,其中Header是由生产者添加的持久化标志、接收队列、优先级等各种属性的集合,而Body则是真正需要传输的领域数据

AMQP消息通信处理链

  1. 交换器Exchange接收应用程序的消息,并根据一定的规则(即绑定,Binding)路由到消息队列
  2. 消息队列Queue存储消息,直到消息被消费
  3. 绑定Binding定义了交换器和消息队列之间的关联,即路由规则

参考RabbitMQ架构


两种规范的比较

JMS不跨语言、不跨平台,只支持Java语言;
AMQP跨语言、跨平台。
JMS支持5种不同的消息类型;
AMQP仅仅支持二进制。
JMS支持P2P和Pub/Sub两种消息模型;
AMQP支持direct exchange/ fanout exchange/ topic exchange/ headers exchange/ system exchange五种消息模型(后4种与JMS的Pub/Sub没有本质区别,只是在路由机制上做了更详细的划分)。


(六)常见MQ 及其高可用性(HA)

1.ActiveMQ

开发语言:Java
单机吞吐量:万级
时效性:ms
可用性:高(主从架构)
消息丢失:可能性低
消息推拉模式:Push/Pull
特性:文档多、支持各种协议


2.RabbitMQ(数据量不大时选择)

《RabbitMQ》

规范:AMQP
开发语言:Erlang
单机吞吐量:万级
时效性:us
可用性:高(主从架构)
消息丢失:可能性低
消息推拉模式:Push/Pull
特性:并发能力极强、延时非常低


3.RocketMQ(阿里开源,数据量大时选择)

《RocketMQ》

规范:JMS
开发语言:Java
单机吞吐量:十万级
时效性:ms
可用性:非常高(分布式架构),一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息丢失:理论上不会
消息推拉模式:Push/Pull
特性:功能完备、扩展性佳,可以保证严格的消息顺序,以及亿级的消息堆积能力,消息可靠性比Kafka更好

RocketMQ高可用性集群

  1. ProducerName Server(就相当于Kafka集群中的ZooKeeper,主要用于服务治理)中的任意一个建立长连接,获取所有Topic的路由信息,并定时向Name Server发送心跳
  2. Producer只能将消息发送到Broker Master
  3. Consumer可以从Broker Master/Broker Slave订阅消息

消息的严格顺序性

参考https://www.jianshu.com/p/57be402365ee

首先必须明确,并不是所有消息都要保证严格的顺序消费;只有那些在业务上有严格的顺序性的消息才需要顺序消费;而且顺序消息的吞吐量对比普通消息必然不高
那么RocketMQ是如何实现消息的严格顺序性的呢?

  1. 发送时的顺序性:RocketMQ内部通过FIFO队列发送消息,生产者将消息排序后单线程发送到FIFO队列,如果是顺序消息,必须发送到特定的同一个queue,无论是否有负载均衡策略
  2. 存储时的顺序性:FIFO队列发送过来的顺序消息,在Broker中存储时肯定也保持了顺序性;
  3. 消费时的顺序性:如果是顺序消息只允许单个消费者消费,消费者在消费前要首先获取queue的互斥锁,只有持有锁的消费者才能消费该queue上的顺序消息
  4. 顺序消息不设置超时时间:如果是顺序消息,不会由于超时未被消费而返还给Broker,一旦到达消费端就会不断尝试消费,直到超过最大重试次数

    要注意,即使采用了上述的措施,也无法保证100%消息顺序消费
    试想:顺序消息m1 m2 m3在将m1发送到queue1后,queue1宕机,此时Broker选择queue2发送剩余的m2 m3m2 m3被消费后,queue1重新上线,导致m1慢于m2 m3消费。

顺序消息存在的问题
  1. 具有顺序的消息会发送到单一的一个queue上,不使用负载均衡策略,可能导致单个queue消息量很大,其他队列空闲
  2. 顺序消息只能单个queue、单个消费者、单线程处理,可能导致消息堆积
  3. 顺序消息不会超时,所以也不会重发,如果在消费端尝试消费失败,达到最大重试次数后,会直接加入Broker中的死信队列
  4. 并不能保证100%顺序消费

4.Kafka(主要用于日志收集)

《Kafka》

开发语言:Scala
单机吞吐量:十万级
时效性:ms以内
可用性:非常高(分布式架构),一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
特性:只支持主要的MQ功能,不支持消息查询和消息回溯,但是提供超高的吞吐量,在大数据领域的实时计算和日志收集应用广;唯一的劣势是可能消息重复消费,对数据准确性造成轻微影响。
消息丢失:理论上不会
消息推拉模式:Push-Pull

生产者将消息push到broker(Kafka集群中一个实例称为一个broker),消费者主动pull

主要应用领域:日志收集

Kafka高可用性集群

Kafka-ha

Kafka通过ZooKeeper管理集群,选举Leader如果Leader宕机,与Leader数据最相近的Follower会立即晋升为Leader,其实不存在选举),以及在Consumer Group发生变化时进行rebalance

-------------本文结束感谢您的阅读-------------

本文标题:消息队列(MQ)

文章作者:DragonBaby308

发布时间:2019年07月26日 - 21:01

最后更新:2020年04月17日 - 22:31

原始链接:http://www.dragonbaby308.com/mq/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

急事可以使用右下角的DaoVoice,我绑定了微信会立即回复,否则还是推荐Valine留言喔( ఠൠఠ )ノ
0%