深圳幻海软件技术有限公司 欢迎您!

春节活动 - 高峰值奖励发放技术方案

2023-03-01

作者|张健1.背景2022年春节活动在8款字节系APP上线,包含了红包雨、集年味卡和烟火大会等诸多玩法。红包雨、集卡开奖和烟火大会都存在高峰值突发流量。其中,红包雨活动会在10分钟内给几千万甚至上亿用户发放上亿现金奖励,且大多数请求集中在前3分钟。在项目启动时,红包雨活动作为最大的流量来源,预估的发

作者|张健

1. 背景

2022年春节活动在8款字节系 APP 上线,包含了红包雨、集年味卡和烟火大会等诸多玩法。红包雨、集卡开奖和烟火大会都存在高峰值突发流量。其中,红包雨活动会在10分钟内给几千万甚至上亿用户发放上亿现金奖励,且大多数请求集中在前3分钟。在项目启动时,红包雨活动作为最大的流量来源,预估的发红包峰值流量有180万 QPS 。

为了保证用户体验、活动效果和资金安全,红包雨系统需要保证超高的稳定性。在系统设计上不能强依赖任何外部系统,在极端情况下仅需要红包雨服务可用,用户请求即可正常处理并返回结果。奖励系统作为红包系统的下游服务,负责用户奖励的入账,需要承载最高180万 QPS 的奖励发放请求,并且在出现异常情况时保证用户体验无损,奖励可以最终入账,做到不超发不少发。

2. 技术挑战

2.1 峰值流量高

除夕当天会进行7场红包雨,从12:00起每小时进行一场,集卡开奖和烟火大会于19:30开始。当晚20:00前后,红包雨、集卡开奖和烟火大会的发奖流量将会叠加在一起,届时可能产生超过200万 QPS 的发奖流量。下游资产中台服务仅提供30万 QPS 的现金红包、40万 QPS 的优惠券入账能力。奖励系统需要削峰限流,异步入账奖励,确保下游服务不过载。

2.2 奖励种类多

除现金红包外,在集卡和烟火大会场景会发放10多种优惠券、实物奖励、头像挂件等。不同的优惠券由不同的下游系统发放,且每个系统的吞吐能力不同,甚至部分系统只能提供2000 TPS 的处理能力。奖励系统在进行削峰限流时,不同奖励种类限流的阈值需要根据下游系统吞吐能力进行个性化配置。下游系统能力有限的情况下,需要保证现金优先入账。

2.3 系统高可靠

引入消息队列进行奖励异步发放后,需要尽可能保证奖励事件的可靠投递和可靠消费,任何奖励最终都要入账,还需兼顾消息队列集群的稳定和容灾。

在内部服务出灾的情况下,或奖励事件在消息队列中堆积时,需要做到用户无感知,用户在活动钱包页可见奖励流水,随时可以正常提现。除通过消费奖励事件入账外,还需引入用户提现行为触发强制入账的能力,与此同时还要保证安全可靠,不能被黑产攻击造成资金损失。

3. 技术方案

基于春节活动峰值流量高、稳定性要求高的特点,为了保证高峰值流量下奖励系统稳定可靠,技术方案选型时选择了基于消息队列削峰、异步处理请求的总体方案。奖励发放的大概流程如下:

在奖励事件生产侧,为了尽可能降低上游接入方的开发成本,基于不同接入场景特性,由奖励系统提供奖励 SDK ,并定义简单清晰的发奖接口,供接入方选用。奖励事件的可靠投递由 SDK 内部保证。奖励事件 MQ 使用了公司内 ByteMQ 和 RocketMQ 两种消息队列,防止因单个消息队列集群宕机导致整个系统不可用。

在奖励事件消费侧,针对每一个 Topic 创建一个消费者服务,四个消费者功能完全一致。由消费者服务保证消息可靠消费和消费限速。

除激励金币外,其他奖励类型通过资产中台服务调用各个下游发放。春节活动期间,资产中台暂未支持发奖请求的削峰,需要在奖励系统前置进行。业务上,同一订单号只能发放一种奖励一次,由于资产中台和激励中台系统之间数据隔离,需要奖励系统支持单一订单号跨服务发放幂等。

3.1 奖励SDK设计

SDK 以代码“内嵌”的方式运行在接入方服务内,可以避免 RPC 方式网络传输、请求数据序列化和返回数据反序列化带来的时延和性能消耗。尽管 SDK 的整体时延和性能优于 RPC 方式,对 SDK 本身的稳定性、性能消耗和接口响应时延依然有非常高的要求。以红包雨场景为例,发奖接口需要50ms内返回,若响应时间超过50ms将会增加整个活动玩法接口的处理时间,影响红包雨服务的吞吐量,最终会影响用户参与春节活动的体验。

奖励 SDK 在功能上实现了奖励Token 的生成和存储和奖励事件的可靠投递。 接口设计上面向不同接入场景针对性地提供定制接口,最大限度的降低使用方的理解和接入成本,减少开发周期。

为了保证 SDK 代码结构清晰,并具有较高的拓展性和可维护性,在代码结构层面,SDK 内部使用了分层设计,分为了对外接口层、内部接口层和内部实现层。

3.1.1 对外接口层

对外接口层定义了暴露给使用者的外部接口,除初始化、反初始化等接口和通用的异步发奖接口外,还为红包雨、烟火大会和集卡分别提供差异化定制接口。通用异步发奖接口定义和奖励 RPC 服务的异步发奖接口保持一致,通过调用 RPC 接口和通过 SDK 发奖的接入方可以低成本的双向迁移。

定制接口结合使用场景的特点,固化诸如活动 ID、场景 ID、奖励类型等通用参数,减少接口入参个数,函数名称语义更清晰,可进一步降低接入方的使用成本,提升接入方代码的可读性和可维护性。对于部分场景,还承担了全局幂等 ID的拼接工作。

发奖请求除用户信息(用户 ID、设备 ID 和 AppID )、奖励信息(奖励类型、数值)外,还需携带一个全局唯一 ID 作为订单号,以实现根据订单号幂等的能力。订单号由接入方根据活动信息和用户信息拼接而成。所有的接口都支持调用方写入拓展字段(Map 格式的键值对)保存业务自定义信息。

3.1.2 内部接口层

内部接口层提供了通用的奖励异步发放接口(SendBonus)、Token 生成和存储接口(GenBonusToken)、初始化接口和反初始化接口。外部接口基于内部接口进行差异化封装,提供更细化的功能。内部接口层对上层屏蔽内部实现细节。

以异步发放接口 SendBonus 函数为例,主要集成了参数检查、打点监控、虚拟队列(Queue)选择、奖励消息的构造和发送、奖励 Token 的生成和存储等功能。参数校验通过后,SendBonus 接口即返回奖励 Token,供上层调用者使用(一般是返回给前端和客户端)。

/*
  SendBonus
  @act 活动信息
  @user 用户信息
  @bonus 奖励信息
*/
func SendBonus(ctx context.Context, act Activity, user User, bonus *BonusContent) (string, error) {
   // 参数检查
   if err := CheckParams(act, user); err != nil {
   // 输出错误日志,监控异常请求
      return "", err
   }

   // 检查奖励类型是否合法
   cfg, err := CheckBonus(bonus)
   if err != nil {
      // 输出错误日志,监控异常请求
      return "", err
   }
   
   // 构造奖励消息
   message := &event.BonusEvent{...}
 
   // SendEvent内部根据奖励属性选择队列
   if err = queue.SendEvent(ctx, message); err != nil {
      return GenBonusToken(ctx, act, user, info, true), err
   }
   
   // 构造并返回奖励Token
   return GenBonusToken(ctx, act, user, info, true), nil
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.

3.1.3 内部实现层

内部实现层主要包含奖励 Token 和虚拟队列 Queue 两大模块。Token 模块负责 Token 的生成、存储和查询;Queue 模块负责实现消息的可靠投递。

A. Token 模块

在整个活动系统内部,奖励系统通过消费奖励事件(异步消息)进行真实的奖励发放。在奖励系统内部出灾或奖励实际入账存在压单的情况下,引入 Token 机制来保证用户体验无损、保证用户在活动页面可见奖励流水、保证用户使用奖励时可操作(现金可提现、优惠券可使用等)。Token 作为用户获得奖励的凭据而存在,和奖励事件一一对应。Token 的产生和流转过程如下图所示:

Token 数据结构和加解密

Token 内部数据结构使用 Protobuf 定义,相对于 JSON 方式序列化和反序列化性能均有提升、序列化后的数据大小减小了50%。Token 数据会返回给客户端并保存在本地,为防止黑产解析 Token 构造数据恶意请求服务端接口,需要对Token 数据进行加密。Token 对象使用 Protobuf 进行序列化后的明文使用公司内的 KMS 工具进行加密。加密后的密文使用 Base64 算法进行编码,以便在网络传输和客户端本地存储。解密时先进行 Base64 解码,再使用 KMS 工具进行解密,拿到的明文使用 Brotobuf 进行反序列化后即可得到 Token 对象。

Token 数据内容如下所示:

syntax = "proto3";

message BonusToken {
  string TradeNo      = 1;  // 订单号,全局唯一,用于幂等
  int64  UserID       = 2;  // 发奖当时的APP内的UID
  string Activity     = 3;  // 活动
  string Scene        = 4;  // 场景
  int64  AwardType    = 5;  // 奖励类型
  int32  AwardCount   = 6;  // 奖励数值
  int64  AwardTime    = 7;  // 奖励发放时间戳
  string Desc         = 8;  // 奖励文案
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

Token 存储

Token 存储是典型的写多读少场景,底层存储需要直接承载发奖的峰值流量(预估350万 QPS ,部分场景一次请求会发放多个奖励),用户进入钱包页面才会读取存储(预估40万QPS),读写请求量级相差较多。数据的有效期较短,奖励真正入账后即可删除。写入场景均为插入单个 Token,读取场景均为读 Token 列表。

Token 主要由红包雨、集卡开奖和烟火大会发奖产生,其中红包雨和集卡开奖的奖励数量有明确的数量上限。在烟火大会玩法中,用户最快每30秒即可领取一次奖励,对用户领奖次数没有限制,理论上单个用户在整个烟火大会活动可以产生500个 Token。

基于预估的线上流量、读写模型和活动特点,决定使用 Redis 作为底层存储,数据结构使用 Hash,用户的 ActID 作为 Hash 数据的 Key、Token 的订单号 TradeNo 作为 Hash 的 Field、Token 序列化后的明文作为 Hash 的 Value。

Token 服务

Token 服务提供了查询用户 Token 列表和加密 Token 合法性校验接口。根据Token 密文是否可以正常解密、解密后的 Token 是否存在于 Redis 中,Token 合法性校验接口返回三种结果:

  • 非法 Token:密文无法解密
  • 未知 Token:密文可解密,但存储无记录
  • 合法 Token:密文可解密,且存储有记录

奖励 SDK 在写 Token 的 Redis 时不会进行失败重试,存在极少数 Token 没有保存成功的情况。为了保证资金安全、防止黑产恶意攻击,可解密的未知 Token 不能用作强制入账。

Token 使用

用户参与活动获得奖励后,Token 由活动前端调用客户端 JSB 进行保存。用户查看奖励流水时,活动钱包页前端会通过 JSB 读取本地 Token 列表,在请求资产中台服务时携带。资产中台服务使用 TokenSDK 进行解密,同时会请求 Token 服务读取服务端 Token 列表,并进行合并操作。资产中台还会在合并后的列表中删除已经入账的 Token,在返回给用户的流水里插入暂未入账的流水并修正活动钱包余额,保证用户奖励及时可见。

用户在活动钱包页进行提现时,也会将客户端本地 Token 带给资产中台服务。资产中台服务对未入账的合法 Token 进行强制入账,保证用户可以完成提现操作。

客户端和服务端 Token 的作用

当奖励系统依赖的消息队列出灾导致无法写入或消费时、或由于削峰限流导致奖励真实入账存在延迟时,两种 Token 都可以在一定程度上保证用户体验无损。

客户端 Token 通过用户设备和后台服务之间的网络传递,保存于用户设备存储。服务端 Token 通过内部网络传递,保存于中心化的 Redis 存储。两种 Token 互为备份,在本地 Token 不可取时,可以依赖服务端 Token。服务端 Token 服务出灾时,客户端 Token 仍然可以保证用户体验。

本次活动在字节系8个 APP 同时上线,Token 服务还可以保证用户在不同 APP 上,甚至不同的设备上的体验一致。

B. Queue 模块

Queue 模块负责提供 “可靠” 的消息投递服务。对外暴露的 SendEvent 函数能够根据奖励选用对应的虚拟队列进行消息发送、并提供统一的监控能力。

func SendEvent(ctx context.Context, msg *BonusEvent) error {
    // 根据奖励信息选择专用的虚拟队列
   queue := GetQueue(msg.Activity, msg.Scene, msg.BonusType)
   data, err := proto.Marshal(message)
   if err != nil {
      return err
   }
   return queue.Send(ctx, message.UserID, message.UniqueID, data)
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

虚拟队列(Queue)是对公司内 ByteMQ 和 RocketMQ 的封装,内部通过代码封装屏蔽了两种消息队列 Producer-SDK 的使用细节,并支持使用两种 MQ 进行互备,提升整个系统的容灾能力。虚拟队列的类图如下所示:

虚拟队列的 Send 方法可根据用户 ID 动态的调整主备生产者的使用比例,在单个生产者失败的情况下提供自动容灾能力。

func (q *Queue) Send(ctx context.Context, uid int64, tradeNo string, data []byte) error {
   var err error
   if (uid % 100) < GetQueueRatio(q.Name()) {
      err = q.Master.Send(ctx, tradeNo, data)
      if err != nil {
         err = q.Backup.Send(ctx, tradeNo, data)
      }
   } else {
      err = q.Backup.Send(ctx, tradeNo, data)
      if err != nil {
         err = q.Master.Send(ctx, tradeNo, data)
      }
   }
   return err
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.

使用 RocketMQ 或 ByteMQ 的 SDK 异步批量发送功能时,由 Producer 屏蔽两个 SDK 失败回调的差异,统一使用失败消息通道返回给上层。虚拟队列的 Retry 逻辑负责读取主备 Producer 的失败消息,并采取主备轮转的方式进行发送重试。在服务进程无异常退出的情况下,可保证消息最终发送成功。进程正常退出时,Close 方法会等待所有消息处理完成再返回。

消息队列 Topic可配置

虚拟队列内部使用了 Master 和 Backup 两个消息队列,通过代码抽象和底层消息队列类型做了解耦。在真实线上环境,为了达到灾备的目的,单个虚拟队列的 Master 和 Backup 需要使用不同类型或者不同物理集群的消息队列 Topic。

在春节活动期间,ByteMQ 和 RocketMQ 的研发和运维团队分别提供了一个活动专用集群,并做重点运维保障。奖励系统在 ByteMQ 和 RocketMQ 的活动集群申请各申请了两个 Topic。基于4个 Topic,在上层构建了3个虚拟队列。

Topic 的 Producer 实例可以在不同的 Queue 中复用。上图中,ByteMQ 的生产者 S 在 Special Queue 中作为 Master,在 Express Queue 中作为 Backup;RocketMQ 的生产者 B 同时在 Massive 和 Special Queue 中作为 Backup。

奖励 SDK 内部使用的消息队列 Topic 配置在了动态配置 TCC 中,虚拟队列和 Producer 实例之间的映射关系也可通过 TCC 配置。做到了代码和消息队列集群、Topic 解耦。开发测试、线上运行阶段可以非常方便的更换消息队列Topic。

奖励对应的虚拟队列可配置

奖励类型和虚拟队列的对应关系配置在 TCC 中,不同的奖励类型可以动态的指定发送的虚拟队列,没有配置时默认使用 Massive 虚拟队列。在 SendEvent 方法中,调用 GetQueue 发放选用虚拟队列。春节活动期间,Massive 虚拟队列承载所有场景发放的现金奖励;Special 虚拟队列承载了所有场景发放的优惠券;Express 虚拟队列承载了所有场景下的激励金币奖励。

消息异步批量发送

ByteMQ 和 RocketMQ 的生产者 SDK 均支持同步发送和异步批量发送消息。RocketMQ 同步发送时延 P99为20 ms,而 ByteMQ 同步发送时延 P99为秒级。在发送同等数量级的消息时,RocketMQ 的 CPU 占用明显高于 ByteMQ。在异步发送模式下,消息队列的生产者 SDK 会启动协程定时或当缓冲区内的消息达到阈值时发送。定时的时间间隔和缓冲区阈值可以在初始化时配置。批量发送可以降低生产者对消息队列服务的请求次数,假设每100个消息批量发送一次,最高可以将消息队列服务的 QPS 降低100倍,极大的减轻消息队列集群的负载。

为了降低奖励事件发送接口的响应时延,以及保持消息队列集群负载低水位,在大流量发奖场景均使用异步批量发送模式,并配置 ByteMQ 承载主要的流量。

3.2 消费者设计

消息队列的削峰功能,基于控制消费者的消费速度实现。RocketMQ 消费方式基于长轮训方式实现,兼具了推拉两种模式的优点。ByteMQ 消费方式为拉模式。消费者实例可通过控制拉消息的频率和单次拉取消息的数量来控制消费速度。

在春节活动奖励发放场景,不仅需要动态的调整多个消息队列的总消费速度,保证下游奖励服务、资产中台服务、激励中台服务不过载,且充分利用机器资源;还需要动态的控制不同奖励类型的消费速度,支持现金等重要奖励优先入账。

活动中发放的奖励类型较多,不能为每种奖励单独分配消息队列 Topic。不同奖励类型发放的数量差异显著,发放量级大和入账优先级高的奖励独占 Topic,发放量级小和入账优先级低的奖励共用一个 Topic。不同奖励类型的真实入账服务(资产中台服务的下游服务)入账能力不同,入账能力最小的服务每秒仅能处理2000的发放请求。需要支持奖励类型维度的灵活消费控速能力。

在多维度的控速基础上,还需要提供可靠消费的能力,每个奖励消息至少成功处理一次(At least Once),所有奖励最终成功入账。

基于上述背景,奖励消费者服务消息拉取速度(从 Topic 读取消息)和消息处理速度(通过奖励类型限速,调用奖励系统发放奖励)可能存在差异。当拉取速度小于处理速度时,奖励服务吞吐量下降,消息在 Broker 中堆积时间变长;当拉取速度大于处理速度时,不能通过奖励类型限速的消息会堆积在消费者服务进程内存中,并阻塞消费,差异显著时可能造成消费者服务进程因 OOM 而退出,影响服务稳定性。对于被奖励类型限速的消息,需要立即进行重入 队列,消费者服务继续处理后续消息。由于网络波动等原因,暂时处理失败的消息,也需要重入队列,保证消息可以最终处理成功。

3.2.1 消费控速实现

A. 消费限速

RocketMQ 消费者实例在启动时可配置单实例消费速度和消费 Worker 数量。动态调整消费速度,需要重启消费者实例。ByteMQ 兼容 Kafka 协议,Golang 代码中消费 ByteMQ 队列使用了  sarama-cluster (https://github.com/bsm/sarama-cluster)。sarama-cluster 相比于RocketMQ 的 SDK 更加简单,没有提供单实例消费限速能力。单实例可以订阅多个 Partition,每个 Partition 会启动一个协程从 Broker 读取消息,多个 Partiton 共用一个全局通道(Channel)写入待处理消息。业务代码需要从全局通道中读取消息进行处理。限速逻辑只能在业务逻辑中实现,动态调整消费速度无需重启消费者实例。

基于 sarama-cluster 的特点,使用 Go 原生限速器(golang.org/x/time/rate)实现了 ByteMQ 消费者的单实例限速器。代码实现如下:

type Limiter struct {
   Open    bool
   Fetcher LimitFetcher
   inner   *rate.Limiter
   stop    chan struct{}
}
// Wait 处理消息前调用,返回后进行处理
func (s *Limiter) Wait() {
   if s.Open {
      _ = s.inner.Wait(context.Background())
   }
}
// Loop 用于监听限速变化
func (s *Limiter) Loop() {
   for s.Open && s.Fetcher != nil {
      select {
      case <-time.After(time.Second * 5):
         newLimit := s.Fetcher()
         if newLimit != int(s.inner.Limit()) {
            s.inner.SetLimit(rate.Limit(newLimit))
         }
      case <-s.stop:
         return
      }
   }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.

Go 原生限速器采用令牌桶算法实现限流,内部没有维护 Timer,而是采用了惰加载的思路,在获取 Token 时根据时间差计算更新可用 Token 数量。没有任何外部依赖,非常适合用于单实例限流。

动态调整限流器的速率时,通过限速器 Reserve 和 Wait 接口消耗但未使用的Token 不会被取消。使用 Wait 方法阻塞的时间不会因为速率的调整而变化。速率调整发生后,对下游产生的 QPS 由三部分组成:调整前已经在等待的请求(阻塞在 rate.Limiter::Wait()) 、调整后新增的 Token 带来的请求和 Burst(桶容量)带来的请求。调整后短时间内的对下游产生的 QPS 可能超过预期的速度。对于突发流量场景,Burst 不宜设置过大。

// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
// before SetLimitAt was called.
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
  • 1.
  • 2.
  • 3.
  • 4.
B. 并发消费

RocketMQ 有序消费时,单个 Queue 只能分配一个 Worker 进行消费,只有当前 Queue 上一个消息成功处理后,才会处理下一个消息,消费速度受限于Queue 的数量和单个消息的处理时延;无序消费时,所有 Worker 共用一个缓冲区,随机消费不同 Queue 的消息,Worker 之间并发处理消息,Worker 数量越多消费速度越快。

RocketMQ 进行消息确认(ACK)时,本地处理成功的消息数量超过一定数量时,或者距离上一次提交超过一定时间后,消费者实例会批量提交(BatchCommit)成功消费信息给 Broker。批量提交请求中包含每个消息的 MsgID、QueueID 和 Offset 等。Broker 侧提供了消息确认窗口机制,每次保存对应Queue 的窗口中最小 Offset 到磁盘。若 Broker 发生宕机,窗口中大于磁盘保存 Offset 的消息,将会被再次消费。在消费者视角,会消费到已经成功确认的消息。因此,RocketMQ 不能保证 At Most Once,消息处理逻辑需要保证幂等。

ByteMQ 消息确认机制相对简单,Broker 没有提供消息确认窗口机制,收到消费者实例的 Commit 请求时,直接保存当前 Offset,偏移量小于当前 Offset 的消息将不会再次被消费。在消费者实例中,业务代码调用的 MarkOffset 方法,会基于确认消息的 Offset+1并记录在内存中,由协程定时提交到 Broker。若消费者实例发生宕机,Offset 未提交到 Broker 的消息将会被 Broker 再次下发,ByteMQ 也不能保证 At Most Once,消费者也需要保证处理逻辑需要保证幂等。

消费 ByteMQ 时,从 sarama-cluster 暴露的全局通道中读取消息后,同步处理成功后调用 MarkOffset 方法可以保证顺序消费。但同步处理会严重降低消费速度(单实例同一时刻只能处理一个消息)。启动协程异步处理可以并发处理消息,并可通过增加协程数量来提升消费速度。但在消费者进程异常退出、消费者宕机等情况下会造成消息丢失。例如:Offset 较大的消息处理后并成功确认(Offset 成功提交到 Broker)后,Offset 较小的消息还未处理成功时消费者宕机,Broker 不再下发该消息,导致该消息漏处理,不满足 At Least Once 语义。

// MarkOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
   c.subs.Fetch(c.client.config.TryWrapTopicByEnv(msg.Topic), msg.Partition).MarkOffset(msg.Offset+ 1, metadata)
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

解决上述消息漏处理的问题,需要针对 ByteMQ 的确认机制在业务层进行优化,即在消费者代码中自助实现消息确认窗口机制。在消费者进程中,按照消息顺序将其 Offset 缓存在链表中,同时以 Offset 为 Key 在 HashMap 中存储链表节点指针。消息成功处理时,通过 HashMap 寻址,修改链表节点状态。本地协程定时从链表头部扫描,严格按照顺序向 Broker 提交成功消费的 Offset。并发处理时,保证较大 Offset 的消息不会提前确认给 Broker。

3.2.2 事件处理逻辑

RocketMQ 提供了失败队列,并提供重试能力,但 ByteMQ 没有失败处理机制,为抹平两种消息队列的差异,事件处理方法(HandleMessage)需要尽最大可能保证成功处理,对于处理失败的消息需要进行重入队列(SendEventToBackup)。

RocketMQ 消费者失败消息多次重入队列失败后,会继续利用消息队列 SDK 提供的失败重试能力。由于 ByteMQ 的 SDK 没有失败处理机制, 失败消息多次重入队列失败后,依然会对其 Offset 进行确认,保证不会阻塞后续消息处理。

HandleMessage

// HandleMessage for ByteMQ
func HandleMessage(msg *sarama.ConsumerMessage) error {
   err := DoReward(msg.Context, msg.Value, limiter)
   MarkOffser(msg, err) // 本地确认,由异步协程定时提交
   return nil
}

// HandleMessage for RocketMQ
func (w wrapper) HandleMessage(ctx context.Context, msg *pb.ConsumeMessage) error {
   return handler.DoReward(ctx, msg.Msg.MsgBody, limiter)
}

type Limiter interface {
   Allow(*BonusEvent) bool
}

func DoReward(ctx context.Context, data []byte, rate Limiter) error {
   bonus := &BonusEvent{}
   if err := proto.Unmarshal(data, bonus); err != nil {
      return err
   }
   // 按照奖励类型限流,当rate为nil时不限流,熔断时直接重入队列
   if rate == nil || rate.Allow(bonus) {
      // 同步调用奖励服务进行发奖
      if err := callReward(ctx, bonus); err == nil {
         return nil
      }
   }
   // 处理失败:重新写入队列
   return SendEventToBackup(ctx, bonus.UniqueID, bonus)
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.

SendEventToBackup

func SendEventToBackup(ctx context.Context, tradeNo string, bonus *BonusEvent) error {
   bonus.Retry++  // 增加Retry次数
   data, err := proto.Marshal(bonus)
   if err != nil {
      return err
   }
   // 使用新PartitonKey进行重发
   newPartitionKey := fmt.Sprintf("%s{%d}", bonus.UniqueID, bonus.Retry)
   for _, queue := range instances {
      // 多个备选队列用于重入队列 
      if err = queue.Send(ctx, newPartitionKey, data); err == nil {
         return nil
      }
   }
   // 极端情况下通过日志回捞的方式处理
   logs.CtxError(ctx, "%s", base64.StdEncoding.EncodeToString(data) )
   return err
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.

3.2.3 奖励类型限速

由于不同奖励类型最终由不同的下游系统入账,为保证下游系统都稳定性,减少下游系统返回限流错误和无效调用,针对每一个奖励类型单独配置了单实例限速。

func NewLimiter() *Limiter {
   l := &Limiter{
      m:      sync.Map{},
      ticker: time.NewTicker(5 * time.Second),
   }
   l.loop()
   return l
}

type Limiter struct {
   m      sync.Map
   ticker *time.Ticker
}

type innerLimiter struct {
   *rate.Limiter
   Fuse bool
}

// Allow 返回true时处理消息;返回false时不处理消息,直接重入队列
func (L *Limiter) Allow(event *BonusEvent) bool {
   if event == nil {
      return true
   }
   if v, exist := L.m.Load(GetBonusType(event)); exist {
      if inner, ok := v.(*innerLimiter); ok {
          if inner.Fuse { // 开启了熔断开关
             return false
          }
          return inner.Allow()
       }
   }
   return true
}

func (L *Limiter) loop() {
   go func() {
      defer Recover()
      L.run()
      for range L.ticker.C {
         L.run()
      }
   }()
}

// 监听配置变更,动态调整限速
func (L *Limiter) run() {
   for wt, config := range tcc.GetRateCfg() {
      value, exist := L.m.Load(wt)
      if !exist || value == nil {
          // 创建新增限流器
         L.m.Store(wt, &innerLimiter{
            Limiter: rate.NewLimiter(rate.Limit(config.Rate), config.Burst),
            Fuse:    config.Fuse,
         })
         continue
      }
      
      if inner, ok := value.(*innerLimiter); ok {
         // 更新已有限流器
         inner.Fuse = config.Fuse
         if int(inner.Limiter.Limit()) != config.Rate {
            inner.Limiter.SetLimit(rate.Limit(config.Rate)) 
         }
         continue
      }

      L.m.Delete(wt)
      L.m.Store(wt, &innerLimiter{
         Limiter: rate.NewLimiter(rate.Limit(config.Rate), config.Burst),
         Fuse:    config.Fuse,
      })
   }
}

func (L *Limiter) Close() {
   if L.ticker != nil {
      L.ticker.Stop()
      L.ticker = nil
   }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.

3.2.4 消费和奖励类型限速协调

消费者类似于一个管道,消费限速相当于流入管道的流量限制,奖励类型限速相当于流出管道的流量限制。当消费速度大于所有类型速度之和时,会导致请求重入队列。减少重入队列需要保证两点:

  1. 消费限速和奖励类型限速联动,调整类型限速时消费速度自动调整适配
  2. 上游发放奖励时,不同奖励出现的概率分布和类型限速配置匹配

在春节活动中,奖励发放的概率由算法策略控制。在红包雨、烟火大会、集卡开奖等场景下,概率分布符合预期,没有发生重入队列。

3.3 奖励服务设计

奖励服务负责调用资产中台服务和激励中台服务发放具体的奖励。对上层提供全局幂等的保证、失败托管重试、预算控制等能力。

由于上游存在使用同一个幂等 ID 发放不同奖励的情况,且不同的下游系统之间数据隔离,故需要奖励服务存储所有发奖请求处理状态及结果,用于保证全局幂等。发放请求使用公司自研的 Abase 进行存储,同时利用了 Abase 提供的 CAS 能力,对奖励发放行为进行了并发控制,确保同一个幂等 ID 仅能用于一次发放行为。上游重试请求的奖励类型和数值需要和原始请求保持一致,才能通过校验,进入真正的发放流程。

奖励服务对外提供同步发奖和异步发奖两类接口。对于需要感知奖励发放结果的场景,上游需要使用同步发奖接口。例如奖励事件消费者,需要明确感知发放是否成功,来决策是否需要重试等。同步接口稳定性和响应时延强依赖下游服务。部分奖励下游发放逻辑较重,耗时较长,容易导致上游调用超时,稳定性降低。

对于无需实时感知发放结果,或对接口响应实验非常敏感的场景,上游需要使用异步发奖接口。异步接口在通过预算控制,成功将消息投递到消息队列后返回。异步接口可以提升系统吞吐能力,降低上游等待时间。利用消息队列的削峰和异步能力,奖励服务可以直接承接中等规模(发放 QPS 在10万到50万)的发奖场景接入。对于大规模(发放 QPS 在50万之上)的发奖场景,需要通过奖励 SDK 接入。相对于同步接口,异步接口支持通用的失败重试逻辑和异常处理能力,接入方无需再次开发相关逻辑,可降低研发投入。

3.3.1 同步发奖

同步发奖接口会实时返回下游系统返回的入账结果。对于失败请求由上游服务负责处理,奖励服务不进行托管。奖励同步发放的流程如下图所示:

上述流程图中,写消息队列、添加记录节点可以根据场景要求,可设置为强依赖节点,也可设置为弱依赖节点。当写消息队列和添加记录节点被设置为弱依赖时,奖励服务不能严格保证全局幂等,此时的幂等性需要下游系统保证;在消息队列和 Abase 存储系统出灾时,奖励服务可正常对外提供服务。

3.3.2 异步发奖

上游调用异步发奖接口虽然不会实时返回发放结果,但会在上游请求时同步调用预算控制服务进行扣减预算。异步发奖流程中,发奖请求成功写入消息队列后,立即返回。后续发奖流程由奖励系统的消费者服务通过消费消息触发,并保证最终成功入账。

异步发奖请求处理过程中,收到下游系统返回的不可重试错误时,会将异常请求写入专用的失败队列并落 Hive 表存档,以便后续处理。

3.3.3 预算控制

预算控制是保证资金安全的手段之一。在春节活动中,除活动玩法自身的频控逻辑和预算控制策略外,奖励系统、资产中台和下游账户服务都有自身的预算控制策略。

奖励系统中场景预算通过动态配置 TCC 配置,可支持动态调整。预算消耗情况通过 KV 存储,为防止出现热点 Key,根据接入场景的流量大小做了分 Key,单预算 Key 承载小于500 QPS 的请求。进行预算扣减时,通过对唯一订单号进行哈希求余来决定具体的预算 Key,并在预算 Key 的 Value 中存储若干条最新的订单号,基于存储系统的 CAS 能力提供有限的预算扣减幂等能力。若在单预算 Key 上产生较高的并发请求,存储的订单号被淘汰的情况下发生超时重试,会导致预算超扣。进行预算配置时,做了一定比例的超配,防止因为流量不均和预算超扣导致误拦截。

资产中台系统中,基于 Redis 执行 Lua 脚本的能力,实现了多 Key 事务预算控制方案,提供了相对严格的预算控制能力。在下游的账户服务中,基于关系型数据的事务能力进行了严格的预算控制,保证在活动场景不会发生超发。

4. 总结

春节活动于2022年1月24日正式上线,2022年1月31日(除夕)结束,共持续7天。活动期间通过奖励系统发放各类奖励约70亿笔,仅除夕当天就发放20亿笔。在多场红包雨中,奖励系统从生产端到消费端做到了全部消息的可靠处理,离线对账未检测到任何有效差异,现金奖励全部成功入账。

在春节活动中对相关服务的性能、稳定性和可靠性有着极高的要求。在设计技术方案时,技术选型和常规需求有所不同,需要在可供选择的组件中权衡性能和可靠性。降低系统复杂度,减少外部依赖,并对依赖部分进行充分的深入的了解是保证整个系统稳定可靠的关键。