我们为什么需要消息总线?
在消息总线上线前,马蜂窝大部分业务中的异步需求是通过 Redis 队列来实现。随着消息量增加,经常会出现消息积压、不同消息之间互相影响的问题。为解决这些问题,电商研发团队开始规划和设计消息总线。
为什么会有消息总线,而不是让业务系统直接用 PHP 或者其他语言对接 RabbitMQ,Kafka 这样的消息系统?
「消息总线和直接使用消息系列有什么实际的区别?」,这是很多研发同学一开始不太理解的地方。假如只是为了用一个性能更好的消息系统代替 Redis,确实并不需要消息总线的这个角色。
但当我们从实际业务角度出发,对公司整体技术架构和开发场景的梳理时,发现如果直接让业务系统对接消息系统,并不是一个很好的方式,并且至少会面临以下问题:
系统分散。各开发团队需要维护各自的消息服务,彼此之间相对隔离。
增加开发难度。用户需要关注具体消息所在消息服务的配置,关注不同业务的消息可能要对接不同种类的消息系统。
维护成本高。用户需要管理自己消费服务的稳定性,处理各种服务异常,保证消费的可靠性。特别对于 PHP 来说,这个成本还是比较高的。
管理难度大。没法对业务消息的创建和订阅关系进行统一管理,也不方便对业务消息中的敏感数据进行权限管理。
不易扩展。无法统一消息系统扩展功能(路由、延时、重试、消费确认等)的使用。
总体来说,直接使用消息系统可以被看成是一个面向技术的接入方式;而消息总线则期望通过隐藏部署、分组和通信等细节,实现一个面向业务的接入方式。
架构设计和技术实现
1. 架构设计
消息总线隐藏了消息发送、路由、分组、存储、消费负载、通信、高可用等一些列问题。对使用者来说,只需要在发送端调用一个 SDK 消息发送方法,在消费端提供一个 PHP 消费方法即可。
图1 马蜂窝消息总线架构设计
马蜂窝消息总线当前使用 RabbitMQ 作为消息引擎,在发送端提供了 SDK,作为消息总线的 Broker 角色,包含了消息路由分组的功能,负责消息的 Publish。
消息的订阅关系,目前是持久化在 MySQL 中,在消息发送时会根据订阅关系把消息投递到对应的业务消费者。
而在消费端,并没有直接用 PHP 去接入 RabbitMQ,而是使用 Deliver 服务集群 (Golang 服务) 来负责把 AMQP 协议转为 HTTP 协议,然后通过 PHPService 进行消费 PHP 代码的执行。
这个方案在设计时,同时考虑到了未来系统规模扩展后的消息分组,以及关键环节的可替代性。
- SDK 充当了消息服务 Broker 的角色,可以控制消息的路由、分组。未来在微服务体系中可以保持整体架构不变,只采用其他方案实现 Broker。
- 可以根据业务场景对接不同的消息引擎,比如对业务一致性要求高的业务使用 RabbitMQ,而对并发要求较高的可以使用 Kafka。对业务来说是无感知的。
- Deliver 和 Application Service 之间可扩展更多的通信协议,支持应用更灵活的消费方式,包括支持未来在微服务中的消费服务。
2. 技术实现
1). 减少流转复杂度
为了保证消息在消息总线内各环节流转时减少复杂度,能够被统一处理,消息体被设计为统一的结构。主要分为以下 3 个部分:
图2 消息体的定义
- Parameter——参数部分包含消息 ID,来源,时间等参数。
- Conetent——消息内容,在 PHP 中使用者可以把消费方法的输入参数放入 Content 中。
- Receiver——标注了消息的接收者 (PHP 中为消费者的方法)。
2). 在线服务异步
点对点模式是业务中常用的一种异步模式,
图3 点对点消息模式
业务应用把不需要在同步请求中执行的逻辑放到异步去执行。发送消息的业务需要明确处理消息的接收者 (消费的 PHP 方法)。消息在发送时需要明确指定唯一的一个 Receiver。
当前通过消息总线 SDK 提供的 invoke 方法可以指定消费的应用方法。
3). 解耦
消息的发布订阅模式,是一个标准的业务解耦模式。
图4 发布订阅(广播)
App 1 的应用只负责发出消息,至于什么业务需要关注,下游业务应用自己订阅该消息就可以。很大程度上减少了上游业务和下游业务的耦合程度和开发调试成本。
消息总线使用 DB 来进行消息订阅关系的存储,上游业务的消息经过消息总线 Broker 时会根据订阅关系,裂变为 Receiver 是订阅应用的多条消息。这样的消息裂变方式使消息后续在消息总线流转时目标明确,在进行消费负载,消费确认,失败重试等场景时可以按照 Receiver 进行隔离。
同样调用方可以使用 SDK 提供的 pub 方法进行消息的发送,订阅方通过消息管理系统进行消息订阅的申请。
4). 防消息干扰
很多使用消息总线的同学比较关心不同消息之间是否会相互干扰,比如由于某个消息短时间内大量涌入是否会造成其他消息被阻塞。
通过前面架构的介绍,可以看到所有的消息经过 Broker 时可以进行路由、分组。消息总线未来会根据业务和消息量来做一些物理隔离,保障业务之间不会相互影响。
而在一个分组内,消息总线也有一些机制保障分组内的不同消息不会相互影响。
图5 防消息干扰机制
消息经过 Broker 默认会进入一个 Online Queue 的队列中,Deliver 集群中会有多个 Deliver 监听 Online Queue。在 Deliver 服务内,通过 Dispatcher 来控制消息总并发消费量,以及同类型消息的并发消费量。当某种类型的并发消息数量超过阈值时,就会被转发到 Offline Queue,避免消费 Worker 都被同一个类型的消息占用。而 Offline Queue 会被独立的 Deliver 服务监听进行消费,不影响 Online Queue 的消费。
5). 消费服务高可用
为了保证消费时的高可用,Deliever 群在负责进行消费协议转换之外,也做了一些策略来保证消费端的高可用。
- 熔断
在消息一段时间内失败数量超过阈值时,停止对队列的消费,避免由于服务抖动和线上故障引起的大面积消息。
- 消费失败
熔断后,Deliver 服务会对后端应用服务健康度进行监控,在服务恢复后可自动恢复消费。
- 系统失败重试
消息总线服务发生故障时,可对期间的失败消息采用重试策略进行重试,避免由于基础服务问题造成的消费失败。
- 业务失败重试
在业务应用消费时产生业务异常,可在订阅消息时指定是否进行重试。消息总线会对需要失败的消息按照一定的时间周期进行多次重试。
- Graceful 重启
Deliver 实现了 Graceful 重启和退出,保障当前正在消费的消息都处理完成后才会进程退出。
未来规划
图7 未来演进方向
1. 产品化
当前消息总线在功能上经过近一年的迭代,已经基本稳定。但在消息管理,监控,统计等环节对开发者来说还不够友好,接下来一段时间会着重优化系统的易用性。
目前已经在规划以下方向的优化改进:
开发者可以通过消息管理系统进行新增消息,订阅消息 (加入权限的审核) 等操作,代替当前手工提 issue 的方式。
开发者可以通过系统关注到自己消息的消费情况,并及时接收到消息处理异常的报警。
完善监控体系,提供更精细维度的系统监控数据。
2. 微服务
关于在微服务架构内提供消息总线服务,也已经在计划当中。包括在微服务内进行消息发送和使用某个微服务进行消息的消费。未来整个消息总线计划会往下图的架构进行演进,增加对多语言和不同架构服务的支持。适应更多的业务开发场景,提供更稳定,友好的消息总线服务。
另外对消息引擎的技术选型,未来也会考虑接入 Kafka,RocketMQ 等其他消息队列服务。根据不同业务场景的消息特性,在发布时选择进入不同的消息队列服务。比如对可靠性,数据安全性要求高的消息会进入 RabbitMQ,而对高吞吐量的消息可以进入 Kafka。但对消息的发送方和订阅方来说都可以不用关心这些细节,仍然按照统一的方式进行接入。
马蜂窝消息总线服务当前也在不断迭代中,在很多地方还有不少没有考虑到的问题。欢迎大家多提宝贵意见,您可以扫描下方二维码订阅「马蜂窝技术」更多内容。
本文作者:梁亮,马蜂窝电商研发团队技术专家。2004 年毕业于西安邮电大学,曾在新浪,开心网,阿里巴巴等公司工作。先后从事搜索,社交,视频,电商等多个方向的研发工作。于 2017 年加入马蜂窝,现负责马蜂窝电商平台服务系统开发。
【本文是51CTO专栏作者马蜂窝技术的原创文章,作者微信公众号马蜂窝技术(ID:mfwtech)】
戳这里,看该作者更多好文