深圳幻海软件技术有限公司 欢迎您!

怎样实现一个分布式的公平锁?

2023-02-28

在并发的场景下,很多时候,我们的业务开发中会有加锁的操作,以此来保证执行的互斥,保障业务逻辑。比如在Java里就有多种基于AQS的组件,方便使用。创建锁的时候,还可以特别的指定一下,当前这个锁,是否需要公平。复制/***Createsaninstanceof{@codeReentrantLock}w

在并发的场景下,很多时候,我们的业务开发中会有加锁的操作,以此来保证执行的互斥,保障业务逻辑。比如在 Java 里就有多种基于 AQS 的组件,方便使用。创建锁的时候,还可以特别的指定一下,当前这个锁,是否需要公平。

 /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

对于需要公平的场景,和我们真实生活一样,这里的 FairSync 会通过一个 CLH 队列将请求线程排队。

在单实例应用中,本机的 CLH 排队就足够了,我们现在切换到分布式的场景。

在分布式场景下,为了实现锁的功能,就出现了各种分布式锁。相比单实例场景下的锁只能锁定自己的实例,分布式的锁由于统一的外部中间件的介入,将锁的信息提取到独立的外部,所以可以将多个应用实例做到互斥。

那分布式的场景下,怎么样能保证公平呢?

和我们从单实例到分布式加锁的思路一样,要公平,就排队,在统一的第三方处进行排队。

来第三方这里排队,也有一些需要注意的点。比如你在判断当前队列里有没有等待,如果没有就取锁成功,执行,有等待就入队,而判断的这个逻辑,仍然可能是并发操作,也需要做到加锁处理。就好像你看了一眼某个饭店没什么人,开心的去买了杯奶茶,回来一看满了。

对于分布式锁,基于 Redis 的 Redission 用得不少。如果换成Redis 分布式公平锁,那基本就只有 Redission 了。

下面我通过两段代码,以及部分文字,描述下 Redission 的公平锁的实现。

简要概括下:

Redission 的公平锁,是通过「 Redis + Lua 脚本」来实现的。在拿到一个 Redission 的 Redis 连接之后,通过 「eval()」可以执行一段 Lua script,同时传入一些 key 和 args。因为不管有多少 Lua 的逻辑,都是在同一个连接内,所以不会存在买完奶茶发现人满了的情况。这里应用到了 Redis 的 pub/sub 功能,等待的线程,会在轮到自己时收到 Redis 的提醒,前提是需要订阅了相应的通知。

来看加锁的 Lua 逻辑,代码写的比较清楚,我也加了些对应的Redis操作以及参数的注释。通过 list 来存储排队信息,同时每个等待线程都有一个超时时间,超时退出队列。 所以eval 执行这个的时候,返回的是一个 ttl Long 类型。表示过期时间。

[[

   用于 lock 操作。

    KEYS[1] = lockName
    KEYS[2] = waitQueueName
    KEYS[3] = timeoutName
     
    ARGV[1] = waitTime
    ARGV[2] = lockName
    ARGV[3] = leaseTime
    ARGV[4] = currentTime
--]]

while true do
    local firstThreadId2 = redis.call("lindex", KEYS[2], 0)
    if firstThreadId2 == false then
        break
    end
    local timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2))
    if timeout <= tonumber(ARGV[4]) then
        redis.call("zrem", KEYS[3], firstThreadId2)
        redis.call("lpop", KEYS[2])
    else
        break
    end
end
if
    (redis.call("exists", KEYS[1]) == 0) and
        ((redis.call("exists", KEYS[2]) == 0) or (redis.call("lindex", KEYS[2], 0) == ARGV[2]))
 then
    redis.call("lpop", KEYS[2])
    redis.call("zrem", KEYS[3], ARGV[2])   -- 移除有序集合中的一个或多个成员
    redis.call("hset", KEYS[1], ARGV[2], 1)    -- 将哈希表 key 中的字段 field 的值设为 value 。
    redis.call("pexpire", KEYS[1], ARGV[1])
    return nil
end
if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then          --   HEXISTS key field 查看哈希表 key 中,指定的字段是否存在。
    redis.call("hincrby", KEYS[1], ARGV[2], 1)       --   HINCRBY key field increment  为哈希表 key 中的指定字段的整数值加上增量 increment 。
    redis.call("pexpire", KEYS[1], ARGV[1])       -- Redis PEXPIRE 命令和 EXPIRE 命令的作用类似,但是它以毫秒为单位设置 key 的生存时间,而不像 EXPIRE 命令那样,以秒为单位。
    return nil
end
local firstThreadId = redis.call("lindex", KEYS[2], 0)
local ttl
if firstThreadId ~= false and firstThreadId ~= ARGV[2] then
    ttl = tonumber(redis.call("zscore", KEYS[3], firstThreadId)) - tonumber(ARGV[4])
else
    ttl = redis.call("pttl", KEYS[1])   -- Redis Pttl 命令以毫秒为单位返回 key 的剩余过期时间。
end
local timeout = ttl  + tonumber(ARGV[3]) + tonumber(ARGV[4])
if redis.call("zadd", KEYS[3], timeout, ARGV[2]) == 1 then
    redis.call("rpush", KEYS[2], ARGV[2])
end
return ttl
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.

再来看解锁的逻辑。我前面加锁的一些内容对应着看,重点在于 「publish」这里,在轮到某个线程时,nextThreadId 这个 channel 会收到通知。

这里的 threadId 是我们在加锁和解锁的时候都需要传入的。如果你留意过 Java 的线程 Id 就会发现,不同实例之间有很大概率会重复的。为了避免,各个 Client 在传入 ThradId 的时候,除了真实的 id 外,还需要加入各个 client 对应的信息加以区分。


--[[ 
   用于 unlock 操作。
   
    KEYS[1] = lockName
    KEYS[2] = waitQueueName
    KEYS[3] = timeoutName
    KEYS[4] = channelName
    
    ARGV[1] = message
    ARGV[2] = leaseTime
    ARGV[3] = lockName
    ARGV[3] = currentTime
--]]

while true do
    local firstThreadId2 = redis.call("lindex", KEYS[2], 0)
    if firstThreadId2 == false then
        break
    end
    local timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2))
    if timeout <= tonumber(ARGV[4]) then
        redis.call("zrem", KEYS[3], firstThreadId2)
        redis.call("lpop", KEYS[2])
    else
        break
    end
end

if (redis.call("exists", KEYS[1]) == 0) then
    local nextThreadId = redis.call("lindex", KEYS[2], 0)
    if nextThreadId ~= false then
        redis.call("publish", KEYS[4] .. ":" .. nextThreadId, ARGV[1])
    end
    return 1
end
if (redis.call("hexists", KEYS[1], ARGV[3]) == 0) then
    return nil
end
local counter = redis.call("hincrby", KEYS[1], ARGV[3], -1)
if (counter > 0) then
    redis.call("pexpire", KEYS[1], ARGV[2])
    return 0
end

redis.call("del", KEYS[1])
local nextThreadId = redis.call("lindex", KEYS[2], 0)
if nextThreadId ~= false then
    redis.call("publish", KEYS[4] .. ":" .. nextThreadId, ARGV[1])
end
return 1
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.

看过了解锁逻辑后,外面eval 执行加锁的时候,需要对应有 sub ,才会收到这里解锁的 pub 信息,否则就卡住了。

这里需要注意下,sub 这个功能是个阻塞操作,需要单独的线程里执行,通过一个 Future 来实现一定等待时间的 sub 功能,超时再 unsub。这块逻辑 Redission 封装的比较多,感兴趣的可以到源码里点点。