AMQP 0.9.1 协议模型
AMQP 协议是什么
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。
消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。
由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以存在于不同的设备上。
AMQP 0.9.1 的工作过程如下图:消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

发布者(publisher)发布消息时可以给消息指定各种消息属性(message meta-data)。有些属性有可能会被消息代理(brokers)使用,然而其他的属性则是完全不透明的,例如消息数据,消息代理完全不关系消息数据是什么。
从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。基于此原因,AMQP模块包含了一个消息确认(message acknowledgements)的概念:当一个消息从队列中投递给消费者后(consumer),消费者会通知一下消息代理(broker),这个可以是自动的也可以由处理消息的应用的开发者执行。当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。
在某些情况下,例如当一个消息无法被成功路由时,消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
队列,交换机和绑定统称为AMQP实体(AMQP entities)。
交换机和交换机类型
交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 0.9.1 的代理提供了四种交换机
| Name(交换机类型) | Default pre-declared names(预声明的默认名称) |
|---|---|
| Direct exchange(直连交换机) | (Empty string) and amq.direct |
| Fanout exchange(扇型交换机) | amq.fanout |
| Topic exchange(主题交换机) | amq.topic |
| Headers exchange(头交换机) | amq.match (and amq.headers in RabbitMQ) |
交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。
默认交换机
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)如果没有指定交换机,将会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
举个栗子:当你声明了一个名为”search-indexing-online”的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为”search-indexing-online”。因此,当携带着名为”search-indexing-online”的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为”search-indexing-online”的队列中。
直连交换机
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:
- 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
- 当一个携带着路由键为
R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。
直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0.9.1中,如果有多个消费者绑定在某个路由键上,消息代理将在多个消费者之间选择一个投递。
直连型交换机图例:
扇型交换机
扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:
- 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
- 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
- 分发系统使用它来广播各种状态和配置更新
- 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)
扇型交换机图例:
主题交换机
主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。这很像直连交换机,唯一的区别是主题交换机的路由键是个模式匹配字符串,可以匹配多个队列。
主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者/多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。
头交换机
有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是”x-match”参数。当”x-match”设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当”x-match”设置为“all”的时候,就需要消息头的所有值都匹配成功。
头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。
队列
队列属性
AMQP中的队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,但是队列也有一些另外的属性。
- Name
- Durable(消息代理重启后,队列依旧存在)
- Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
- Auto-delete(当最后一个消费者退订后即被删除)
- Arguments(一些消息代理用他来完成类似与TTL的某些额外功能)
队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。
持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明。
无论怎样,如果要保证未被消费的消息持久化,必须进行单独设置,只有经过持久化的消息才能被重新恢复。
Rabbit Mq 中需要在发送消息时,设置消息的属性以开启消息持久化。
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
绑定交换机
绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
拥有了交换机这个中间层,很多由发布者直接到队列难以实现的路由方案能够得以实现,并且避免了应用开发者的许多重复劳动,达到了解耦的效果。
如果AMQP的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者,如果应用没能编写处理程序接受退还消息,那么消息将被无情的抛弃。
死信队列
死信队列也是一个普通的队列,也需要绑定在交换机中。
对于一个队列而言,每个队列可以设置一些限制,例如 消息最多重投次数、队列最大长度、消息对大存活时间 等,当消息达到限制时,消息代理可以简单的丢弃他们,或者将他们投递到预先设置的死信队列中。
在 Java 中,可以这样配置队列,并声明一些限制条件以及死信队列:
@Bean(name = "queue")
public Queue getOrderAddQueue() {
Map<String, Object> args = new HashMap<>(5);
// 绑定死信交换机和路由
args.put("x-dead-letter-exchange",DEAD_EXCHANGE);
args.put("x-dead-letter-routing-key",ODEAD_ROUTEING_KEY);
//队列的长度为 100
args.put("x-max-length",100);
//队列中的消息超过 3s 后过期
args.put("x-message-ttl", 3000L);
return new Queue(ORDER_ADD_QUEUE, true, false, false, args);
}
死信队列通常用于实现一些兜底方案,如系统负载过高,队列已满,通知客户未能完成某项工作等;也可用于定时事件,例如订单超时未支付则自动取消,消费者可以根据订单是否以支付来决定是否要拒绝或确认消息(或者干脆不设置消费者),当消息在队列中超时时,他被投递至死信队列中处理。
消费者
消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。在AMQP 0-9-1 模型中,有两种途径可以达到此目的:
- 将消息投递给应用 (“push API”)
- 应用根据需要主动获取消息 (“pull API”)
使用 push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。即消费者要指定一个队列的名字,表示订阅了这个队列。
随后,消息代理会与消费者打开一条 TCP 长连接,用以推送消息,消息代理会不断对消费者探活以确保消费者存活。大多数 AMQP 的实现都是采用 push 模型,但同时消息代理保留了 pull api 供外部调用。
每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。
消息确认
消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP代理在什么时候删除消息才是正确的?AMQP 0.9.1 规范给我们两种建议:
- 当消息代理(broker)将消息发送给应用后立即删除。(使用AMQP方法:basic.deliver或basic.get-ok)
- 待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用AMQP方法:basic.ack)
前者被称作自动确认模式(automatic acknowledgement model),后者被称作显式确认模式(explicit acknowledgement model)。
在显式模式下,由消费者应用来选择什么时候发送确认回执(acknowledgement)。在此期间,消息代理不会重复投递消息,但是如果消息代理检测到与消费者的连接断开,那么消息代理会将消息重新投递给另一个消费者。
拒绝消息
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于“拒绝消息(Rejecting Messages)”的原因处理失败了。
当拒绝某条消息时,应用可以通过拒绝参数告诉消息代理如何处理这条消息——销毁它或者重新放入队列。当此队列只有一个消费者时,如果处理不当,可能会出现无限次投递、确认、重新投递的死循环。
在AMQP中,basic.reject 方法用来执行拒绝消息的操作。
但 basic.reject 有个限制:一次只能拒绝一个消息,无法做到批量拒绝,在 RabbitMq 中,一条消息可能会因为网络问题被多次投递到同一个消费者中,可以使用 basic.nack 方式来批量拒绝。
预取消息
在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。
注意,RabbitMQ只支持通道级的预取计数,而不是连接级的或者基于大小的预取。
通过 basic.qos 告知消息代理自己可以接受多少条消息。
消息属性和有效载荷(消息主体)
AMQP模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以至于AMQP 0-9-1 明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。例如:
- Content type(内容类型)
- Content encoding(内容编码)
- Routing key(路由键)
- Delivery mode (persistent or not)
投递模式(持久化 或 非持久化) - Message priority(消息优先权)
- Message publishing timestamp(消息发布的时间戳)
- Expiration period(消息有效期)
- Publisher application id(发布应用的ID)
有些属性是被AMQP代理所使用的,但是大多数是开放给接收它们的应用解释器用的,有些属性是可选的也被称作消息头(headers),他们跟HTTP协议的X-Headers很相似,消息属性需要在消息被发布的时候定义。
AMQP的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被AMQP代理当作不透明的字节数组来对待。
消息代理不会检查或者修改有效载荷,消息可以只包含属性而不携带有效载荷。
消息能够以持久化的方式发布,AMQP代理会将此消息存储在磁盘上,如果服务器重启,消息代理会保证收到的持久化消息不会丢失。
简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。
连接与通道
有些应用需要与AMQP代理建立多个 TCP 连接,例如主机上存在多个消费者。无论怎样,同时开启多个TCP连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。
AMQP 0.9.1提供了通道(channels)来处理多连接,多个通道可以共享一个 TCP 连接,类似于 HTTP2.0 多路复用原理。
在 RabbitMq Java 客户端中,当收到消息代理的推送时,客户端会从线程池中获取一个线程,调用消费者逻辑,客户端会为每个线程分配不同的通道。
一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
在 RabbitMq 中,总是基于通道去进行操作。
同步发送
AMQP 中并没有规定发送者如何同步发送,这在某些时候很有用处,例如对于一些事务而言,发送者必须要确认消息成功的被投递到消息代理中并被持久化。
Rabbit Mq 中支持两种同步发送模型。
Confirm 确认机制
Confirm 确认机制是一种异步确认模型,一旦发送者将信道设置为 Confirm 模型,任何在此信道上发送的消息都会被指派一个唯一的 ID,一旦消息到达消息代理,如果消息属性包含了 Confirm ID,则消息代理会回送一个确认(包含消息 ID 和 Confirm ID)给发送者。
一旦发送者得到消息代理回送的结果,或是成功、或是失败、或是超时,Rabbitmq 客户端就会异步调用用户编写的 Confirm 处理程序。
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
.........
}
}
通常,如果消息未能发送成功,处理程序会将消息重复发送,以确保消息最终能够持久化。如果达到一定阈值后仍未发送成功,这应该要引起运维人员的注意(监控、告警)。
事务模式
RabbitMq 提供了一种同步模型,被称为事务模式,在发送消息前,可以通过 select 方式在通道上开启事务,此时发送者会将 select 信息发送给消息代理告知此通道开启了事务模式,消息代理同意后方可继续事务。
当发送者提交时,消息代理检查消息是否到达或是否被持久化,如果一切正常,则同意提交,否则将会拒绝提交。
一旦任何异常发生,发送者可以调用回滚方法撤回消息,消息代理在提交前不会消费消息,而一旦消息代理收到回滚消息后,它会丢弃此条消息。
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
相较于 Confirm 而言,事务模式是完全同步的模型,txCommit() 方法会堵塞直至消息成功投递或发生异常,这能确保消息被成功投递了再去执行后面的逻辑;而 Confirm 模式则无法确保,因为其仍然是一种异步模型。但事务模型缺点是性能损失,通常而言,Confirm 模式是一种折衷的选择。
分布式事务
MQ 可以用来处理一些分布式事务问题,核心思想是:开弓没有回头箭。
RocketMQ 实现了一些事务的功能,在 RocketMQ 中,存在两个概念:
- Half Message,半消息
暂时不能被 Consumer消费的消息。Producer已经把消息发送到 Broker端,但是此消息的状态被标记为不能投递,处于这种状态下的消息称为半消息。事实上,该状态下的消息会被放在一个叫做 RMQ_SYS_TRANS_HALF_TOPIC的主题下。
当 Producer端对它二次确认后,也就是 Commit之后,Consumer端才可以消费到;那么如果是Rollback,该消息则会被删除,永远不会被消费到。
- 事务状态回查
我们想,可能会因为网络原因、应用问题等,导致Producer端一直没有对这个半消息进行确认,那么这时候 Broker服务器会定时扫描这些半消息,主动找Producer端查询该消息的状态,这个也是半消息存在的意义。
例如如果上游服务提交了,但是未能来得及发送消息,就会存在不一致的问题。有了半消息之后,就相当于有了一个记录,当然通常上游服务自身也需要维护一个本地事务表,用以回答消息代理自身是否提交或未提交某个事务。
例如,一个经典的创建订单、增加积分分布式事务问题便可以通过消息队列实现,下图是整体的流程:
如果上游服务失败,则会发送回滚半消息到消息代理中,则消息代理会丢弃消息;如果上游服务成功,则发送提交消息,消息代理会投递事务消息到下游服务中;如果上游服务异常崩溃了,消息代理会回查事务状态;如果下游服务异常崩溃,消息队列能保证消息的持久化,并保证消息被消费者处理。
如果存在多个下游服务,则可以使用 topic 方式将事务消息投递到多个队列中,这可能需要消息队列本身的支持。
如果下游服务处理失败呢?例如扣减库存失败(库存为零),无论重试多少次都无法成功,在这个例子中可以让扣减库存成为上游服务,但在某些情况下,MQ 事务可能无能为力,毕竟:开弓没有回头🗡。
这种情况可能需要更强的 2PC 保证,在 2PC 事务中,事务协调器会发送 Prepare 消息给各个处理者,只有各个事务站点答复 OK 事务才能够被提交,例如如果库存为 0 了,这个事务处理者就会答复失败,从而导致整个事务回滚,如果事务站点答复 OK,则其可能需要锁或者预减库存来保证自己有能力扣减库存。
在 RabbitMQ 中没有对分布式事务的支持,但可以通过其本身提供的同步发送机制来保障事务消息发送到消息代理,对于发送者本地事务提交但未发送消息而崩溃的情况,仍然需要一个本地事务表来查询状态,但这可能需要由发送者自身主动检查(例如后台线程)。
分区日志
使用日志进行消息存储
在 AMQP 模型中,消息被赋予一个唯一的 ID,由消息代理投递并保障消息能正确消费,这会导致一些问题:重复消费、顺序消费、消息丢失……也许还有其他的方法!
日志只是磁盘上简单的仅追加记录序列,在 Raft 算法中,我们研究过基于日志来实现主从复制、Leader 选举。
事实上,同样的结构可以⽤于实现消息代理:生产者通过将消息追加到日志末尾来发送消息,⽽消费者通过依次取日志来接收消息。如果消费者读到日志末尾,则会等待新消息追加的通知。
为了扩展到比单个磁盘所能提供的更⾼吞吐量,可以对日志进⾏分区。不同种类或是完全独立的日志可以分成不同的分区,不同的分区可以托管在不同的机器上。
在每个分区内,代理为每个消息分配⼀个单调递增的序列号或偏移量(offset)。这种序列号是有意义的,因为分区是仅追加写⼊的,所以分区内的消息是完全有序的,没有跨不同分区的顺序保证。

Kafak 便是使用了这种分区日志式的消息。为了实现负载均衡,可以将某个分区完全交由一组消费组节点处理,消费组节点内部使用负载均衡处理。
与 AMQP 模型的对比
- AMQP 模型的中心在于消息代理;而分区日志的中心在于消费群组,分区日志模型总是主要是由消费组顺序的读取分区中的日志,如果读到末尾,则会等待新消息的追加通知,优点是吞吐量较高,但可能实时性较差。
- AMQP 模型中,对于同一个队列中的消息,往往需要等待上一条消息确认后再次发送;对于分区日志而言,由于消息在分区内是顺序的,消费组可以一次性读取所有消息,并在本地顺序解析,对于海量数据场景下,吞吐量大幅度提升。
- AMQP 模型中,需要跟踪每条消息的确认信息来判断消息是否被消费;对于分区日志而言,消息代理只需要得知消费组的读取偏移,那么这个偏移之前的日志肯定都被读取了,减少了额外的开销。
- AMQP 模型中消息是否持久化取决于一系列配置,如果消息被消费了,那么消息就会被删除;而在分区日志而言,日志总是被持久化到磁盘,多个消费组可以重复读取,天然支持消息扇出、消息回溯,但缺点是会占用较大的磁盘空间。
- AMQP 模型中,可以开启自动确认来实现完全异步的消息投递;而在分区日志中,消息总是要被顺序处理的。
- AMQP 模型中可以支持一些事务机制来保障消息被顺利投递;而在分区日志中,并没有这种保障,消息代理不会去确认消息是否被投递,给用户的保障往往是一个很弱的 最终一致性。
AMQP 常见问题总结
消息队列优点
解耦。发送者与消费者解耦,升级方便,伸缩性、扩展性都比较强。
可靠性保证。消息代理能够保证消息的持久化。
异步处理。可以立即返回,提高响应速度。
削峰。利用最大队列长度结合死信队列削峰,避免流量过多。
定时调度。利用私信队列结合消息存活时间,实现一个不太精确的定时调度,例如超时订单自动取消。
如何保障顺序消费
如果有三种消息 A、B、C,他们之间需要保障顺序消费,如果将他们绑定在不同的队列上,由于队列分发消息速率、网络延时、消费者处理速率等情况,将无法保证顺序消费。
最好的办法是将三种消息绑定在一个队列中,并开启手动确认,消费者根据消息类型进行处理,在同一个队列中,只有当上一条消息被确认后,下一条消息才能够发送。但这缺点是耦合度变高了,可能会使系统吞吐量下降。
如何解决重复消费问题
重复消费产生原因有两点:
- 消费者多次投递。例如因为网络超时,在 Confirm 回调中重复投递。
- 消息代理多次投递。例如如果开启了手动确认,并且消费者与消息代理的连接断开,则消息代理则会将消息投递到其他消费者。
- 消费者拒绝处理时选择重新入队。这是人为问题,不在我们的考虑之中。
MQ 本身并未提供一种解决办法,通常的解决办法是利用 Redis 中心化存储,在消费前,可以原子地的将消息 ID 存储起来,例如可以通过位图等方式。由于 Redis 是单线程处理的,如果消息存储失败或已存在消息(putIfAbsent),则可以响应 ack 告知消息代理不要再继续投递了。
分布式事务
参考上述分布式事务即可。