最近在看 RocketMQ 5.x 的消费模型时,一个问题特别典型:
消费者到底是定时去拉消息,还是一直监听?
如果只看表面现象,你会觉得很像”监听”:
- 消息一到,消费者很快就能拿到
- 延时消息到点后也会很快触发处理
但如果从实现原理看,它又不是很多人理解的那种”服务端直接主动推送到业务代码”。
这篇就把这件事讲清楚。
先说结论
RocketMQ 5.x 的 Go SDK,从消费模型上看:
本质上是拉。 效果上像监听。
原因是它不是那种”每隔几秒扫一次”的短轮询,而是:
长轮询。
也就是:
- 客户端先发起一次接收请求
- 如果暂时没有消息,服务端先不急着返回
- 请求会在服务端挂住等待
- 有消息时立刻返回
- 没消息时等到超时再返回
- 然后客户端再发下一次请求
所以它看起来像”持续监听”,但底层其实还是”客户端主动拉取”。
先认识一下这些工具
RocketMQ 是什么
RocketMQ 是消息队列,用来解耦系统之间的调用关系。
典型场景包括:
- 订单异步通知
- 任务延后执行
- 重试补偿
- 削峰填谷
- 事件驱动
gRPC 是什么
gRPC 是一种高性能 RPC 通信框架。
可以简单理解成:
- 比普通 REST 更偏内部服务调用
- 通常基于 Protobuf 编码
- 底层跑在 HTTP/2 上
RocketMQ 5.x 的 Go SDK,官方文档也明确把它归到 gRPC protocol Go SDK 这条链路里,所以它不是传统的 HTTP JSON 风格。(RocketMQ)
怎么安装
如果你是 Go 项目,安装 RocketMQ Go SDK:
go get github.com/apache/rocketmq-clients/golang/v5
生产者一次发一条,还是多条
如果你用的是 Go SDK 常规 producer 写法,那么:
一次 Send,发的是一条消息。
也就是说,业务代码通常是:
msg := &Message{
Topic: "your_topic",
Body: []byte("hello"),
}
producer.Send(ctx, msg)
如果你有 10 条消息,一般就是:
- 循环 10 次
- 每次构造一条消息
- 每次调用一次发送
所以从业务理解上说,producer 侧最稳的认知是:
一次发送,就是一条业务消息。
消费者一次拿一条,还是多条
这里要区分”单次调用”和”单条消息”。
消费者侧常见问题不是”只能一条一条收”,而是:
一次 Receive 调用,可以最多拿 N 条。
也就是说:
- 一次调用不是只能返回 1 条
- 但也不是必须凑满 N 条才返回
- 它表示”这一趟最多给你多少条”
所以更准确的说法是:
单次 Receive 可以批量返回,业务代码再逐条处理。
这里的”一次”到底是多久
这个问题非常关键。
很多人听到”单次 Receive 最多拿 N 条”,就会继续问:
这个”一次”是 1 秒?5 秒?10 秒?固定周期吗?
答案是:
不是固定调度周期。
这里的”一次”,指的是一次 Receive(...) 方法调用。
这次调用能持续多久,取决于长轮询等待时间。
你可以把它理解成:
- 调用一次
Receive - 如果现在就有消息,立刻返回
- 如果现在没消息,请求先挂住
- 最长等一段时间
- 有消息就提前返回
- 没消息就超时返回
所以它不是:
- 每 5 秒扫一次
- 每 10 秒执行一次定时任务
而是:
一次长轮询请求。
为什么看起来像一直监听
因为客户端通常不是只调一次 Receive 就结束,而是会不断循环:
- 发起一次接收请求
- 等消息 / 等超时
- 拿到消息后处理
- 再继续下一次接收
如果把这个过程连起来,效果就很像”这个消费者一直挂在那儿监听消息”。
但底层不是服务端无限制主动往你进程里塞数据,而是:
客户端一直在发起下一次长轮询请求。
不是定时扫,而是长轮询
这里要和短轮询对比一下。
短轮询
短轮询更像这样:
消费者:有消息吗?
Broker:没有。
过 1 秒
消费者:有消息吗?
Broker:没有。
过 1 秒
消费者:有消息吗?
Broker:有。
问题在于:
- 空请求太多
- 延迟不稳定
- 请求浪费严重
长轮询
长轮询更像这样:
消费者:有消息吗?我先等着。
Broker:现在没有,你这个请求先挂住等一会儿。
……
生产者发来一条消息
Broker:有了,返回给你。
消费者:收到。
这就是为什么它既保持了”客户端拉取”的模型,又能呈现出”接近实时监听”的效果。
长轮询的底层到底是什么技术
很多人一听”轮询”,就以为是那种普通 HTTP 接口不断请求。
RocketMQ 5.x Go SDK 不是这种模式。
它底层更接近:
gRPC + Protobuf + HTTP/2
拆开理解就是:
1)你的业务代码调用 SDK
比如:
consumer.Receive(ctx, maxMessageNum, invisibleDuration)
2)SDK 发起一个 gRPC 请求
这个请求不是普通浏览器那种 REST JSON 调用,而是 RPC 调用。
3)gRPC 跑在 HTTP/2 长连接上
这里很多人容易误会:
“HTTP 不就是短连接吗?”
这其实是个老印象。
更准确地说:
- HTTP 是应用层协议
- 长连接 / 短连接说的是底下的 TCP 连接怎么复用
到了 HTTP/2 时代,连接本来就很适合长期复用。
所以 RocketMQ 5.x 的 Go SDK,并不是每次都新建一个 TCP 连接再断开,而是在已经建立好的连接上,复用 gRPC 调用。
4)没有消息时,服务端先把这次调用挂起
这就是长轮询的核心。
- 有消息就立即返回
- 没消息就先等着
- 等到超时再返回空结果
所以本质上:
不是服务端直接推送到你业务函数。 也不是客户端每秒固定扫一次。 而是一个被服务端短暂挂起的接收调用。
延时消息为什么到点后会很快被消费
这个问题也特别常见。
很多人会想:
“如果是延时消息,那是不是到了时间后,还得等下一次消费者轮询才能拿到?”
答案是:
通常不会傻等很久。
原因在于:
- 延时消息发上去后,Broker 会先持有
- 在到达投递时间之前,这条消息对消费者不可见
- 一旦到期,消息变成可投递状态
- 如果此时消费者正好有一个长轮询请求挂着,Broker 很快就会把消息返回给它
所以时序大概像这样:
10:00 producer 发送一条"10:05 才投递"的消息
10:00~10:04 消费者拿不到,因为还没到时间
10:05 消息变为可投递
10:05+ 长轮询中的消费者收到返回
10:05++ 开始处理业务
因此业务上看起来就像:
消息一到时间,消费者几乎立刻就接到了。
消费者拿到多条消息后怎么处理
虽然一次 Receive 可能拿到多条,但实际业务里一般还是:
- 逐条处理
- 逐条确认
- 不建议一把全算成功
为什么?
因为批量拿到不等于批量业务成功。
比如一次拿到 10 条:
- 第 1 条成功
- 第 2 条成功
- 第 3 条失败
这时如果你做得太粗暴,就可能把失败消息也一起当成成功处理掉。
所以工程上更稳的思路仍然是:
批量接收,逐条处理。
这个模型适合什么场景
RocketMQ 这种消费模型,特别适合下面这些场景:
- 延时任务触发
- 订单超时关闭
- 异步状态通知
- 重试任务补偿
- 低延迟消费,但又不想服务端硬推
它兼顾了两点:
- 客户端仍然掌握消费节奏
- 消息到达后又足够接近实时
这也是为什么它在工程上很常见。
最后总结
把这件事压缩成几句话,就是:
- RocketMQ 消费底层本质是拉,不是裸推
- 但它不是短轮询定时扫,而是长轮询
- 长轮询的底层通信不是普通 REST,而是 gRPC over HTTP/2
- 没消息时,请求会先挂住等待
- 有消息时,Broker 会尽快返回给消费者
- 延时消息在到期前不可见,到期后会很快进入消费流程
所以如果你以前一直在纠结:
- 到底是监听还是轮询
- 为什么看起来像实时
- HTTP 为什么又能长连接
现在可以统一成一个理解:
它是运行在 HTTP/2 长连接之上的 gRPC 长轮询消费模型。
评论 / 0
共 0 条你可能是第一个留下评论的人