深圳幻海软件技术有限公司 欢迎您!

消息中间件深度系列|异构消息队列的海量数据流转Connect架构解析

2023-02-28

一、背景   5G时代,万物互联,越来越多的企业期望搭建数据分析业务中台,利用大数据技术、通过全局规划来治理企业的数据资产。而在业务系统,或者大数据系统中异构数据源之间的数据同步是十分有必要的,传统的点对点的数据同步工具,应对越来越多的异构数据源同步会产生N*N的问题,

一、背景     

5G时代,万物互联,越来越多的企业期望搭建数据分析业务中台,利用大数据技术、通过全局规划来治理企业的数据资产。而在业务系统,或者大数据系统中异构数据源之间的数据同步是十分有必要的,传统的点对点的数据同步工具,应对越来越多的异构数据源同步会产生N*N的问题,付出的开发成本和维护成本都是非常高的。因此,移动云消息队列MQTT团队积极打通数据孤岛,基于开源RocketMQ Connect组件推出全新的MQTT-RocketMQ Connect架构,助力海量物联网消息自由流转,为万物互联保驾护航。

二、MQTT-RocketMQ Connect介绍   

首先,先简单介绍一下MQTT-RocketMQ Connect架构的基石—RocketMQ Connect,它是RocketMQ数据集成的重要组件,可将各种系统中的数据通过高效、可靠、流的方式,流入流出到RocketMQ,可以实现各种异构数据系统的连接,构建数据管道、ETL、CDC、数据湖等能力。

从架构上看,RocketMQ Connect就是借助RocketMQ从其他异构系统获取数据且以消息的方式发送到RocketMQ作为中转,然后从RocketMQ消费消息并写入到其他系统。

图1 RocketMQ Connect 总览

MQTT-RocketMQ Connect在开源的Apache RocketMQ Connect组件基础之上,根据移动云消息队列MQTT的数据模型、业务场景和流转规则等特点,做了深度的架构优化与设计,实现了移动云消息队列RocketMQ与MQTT之间的消息流转与规则管理。它主要由Connector、Runtime、Worker和Task组成。

Connector

包含 Source Connector和 Sink Connector两类,其中,

1.Source Connector:负责从源数据中获取数据并将其发送到 RocketMQ。

2.Sink Connector:负责使用来自 RocketMQ的消息并将数据写入目标存储。

Runtime

Runtime是Source、Sink Connector的运行时环境,负责加载Connector,提供RESTful接口,启动Connector任务,集群节点之间服务发现、配置同步、消费进度保存、故障转移、负载均衡等能力。

Worker

一个Worker进程代表一个Runtime 运行时环境进程,多个Worker进程组成了一个集群,支持更多的Connector 和 Task的并行运行工作。

Task

Task是执行具体的数据解析和转储的任务,其中,

1.SourceTask:从源数据系统中,执行完成数据解析工作,通过poll()接口暴露给Runtime。

2.SinkTask:Runtime从内存获取数据并通过put()接口方法解析至目标数据源系统中。

3.DirectTask:同时包含SourceTask和SinkTask,两者直接交互,不再经过Runtime。

 三、MQTT-RocketMQ Connect架构设计  

消息队列MQTT以RocketMQ作为消息的存储层,消息数据会在RocketMQ中保存一份。因此,可以将消息队列MQTT的存储层RocketMQ作为源数据端。采用标准的Connect架构要实现异构数据源的数据流转,Source Task 和Sink Task必须一一对应,两者通过中间的RocketMQ关联。按照现在的架构两端都是RocketMQ,使用一个特殊的Direct Task,让消息不再经过中间的RocketMQ,而是直接流入到目标RocketMQ中,反之亦然。通过优化架构可以有效降低时延,提升速率。

图2 移动云消息队列MQTT消息存储架构

在Runtime进程组成的集群中,将源消息队列的海量数据,通过端到端Connector和Task以数据解析和转储的方式异步复制至目标集群,完成异构消息队列的数据流转。其中Runtime集群中每个Worker节点启动Connector相关的配置信息,也会像集群信息一样在集群中每个节点全量同步,同时会持久化到每个节点。集群中如果有某个Worker节点挂掉,集群信息会发生变化,当每个节点检查到集群信息发生了变化就会触发负载均衡,对集群中运行的Connector和Task重新分配,从而保证故障节点的任务分配到其它节点处理,保证高可用。

图3  MQTT-RocketMQ Connect架构图

了解了MQTT-RocketMQ Connect的架构,下面看一下如何自己实现一个简单的MQTT和RocketMQ之间的消息流转。

通过前面的介绍,应该清楚,需要实现两个Connector和Task,一个是从作为MQTT存储层的RocketMQ到目标RocketMQ的Connector和Task,第二个是从RocketMQ读数据写入到目标MQTT的Connector和Task。

图4  MQTT消息流转到RocketMQ流程图

以消息从MQTT流转到RocketMQ为例,主要由三组接口组成:SourceConnector、SourceTask和SinkTask。

图5  Connector和Task接口概览

1.SourceConnector负责connector生命周期的管理、创建对应的Task并将接收到的Connector配置信息拆分出每个task的配置信息。

2.SourceTask负责拉取消息,并对消费者的生命周期进行管理。用户还可以根据实际需要添加消息封装、转存等方法。

3.SinkTask负责接收SourceTask推送的消息,并对生产者的生命周期进行管理。同样的,用户还可以根据实际需要添加消息解析,过滤等方法。

一个connector的生命周期主要分为三个阶段:启动、运行、停止

创建并启动connector

创建并启动Connector过程大致可以分为以下几个阶段:

  • 控制台创建规则阶段
  • 初始化配置阶段
  • 负载均衡阶段

图6  Connector启动阶段流程图

运行task任务

  • 在Connector 实例被启动后,Connector可以根据配置信息,对解析任务进行拆分,分配出task。这么做的目的是为了提高并行度,提升处理效率。

停止并删除connector

停止并删除Connector过程大致可以分为以下几个阶段:

  • 控制台停止规则阶段
  • 更新配置阶段
  • 负载均衡阶段

图7  Connector停止阶段流程图

四、MQTT-RocketMQ Connect高可用部署  

MQTT-RocketMQ Connect Worker支持两种运行模式,集群单机模式。

4.1/集群模式

集群模式,顾名思义,由多个Worker节点组成高可用集群。集群间的config、offset和status信息通过指定RocketMQ Topic存储,新增Worker节点也会获取到集群中的这些config、offset和status信息,并且触发负载均衡,重新分配集群中的任务,使集群达到均衡的状态。减少Woker节点或者Worker宕机也会触发负载均衡,从而保障集群中所有的任务都可以均衡的在集群中存活的节点中正常运行。

图8  MQTT-RocketMQ Connect集群模式示意图

4.2 /单机模式

单机模式,Connector任务运行在单机上,Worker本身没有高可用,任务offset信息持久化在本地。适合一些对高可用要求不高或者不需要Worker保障高可用的场景,例如部署在k8s集群中,由k8s集群保障高可用。

五、MQTT-RocketMQ Connect优秀特性

为了保证MQTT和RocketMQ之间有高速稳定的消息流转通道,MQTT-RocketMQ Connect具有许多优秀的特性:


六、总结与展望     

本文介绍了异构消息队列海量数据流转的设计与实践,基于RocketMQ Connect和移动云消息队列MQTT本身的架构特点,做了深度的架构优化与设计,实现了移动云消息队列RocketMQ与MQTT之间的消息流转与规则管理。随着万物互联的持续深入,未来消息队列MQTT团队还将基于现在的架构继续优化和创新,例如:

   1       ✦  ✦  ✦  ✦  ✦

增加对其他异构数据源(Redis、MySQL、Kafka)等组件的消息流转支持

   2       ✦  ✦  ✦  ✦  ✦

增加对集群Worker、Connector、Task状态的管理

   3       ✦  ✦  ✦  ✦  ✦

优化不支持poll方式获取消息的服务