Redis分布式锁深入分析

Redis分布式锁深入分析

有关分布式锁的制作我在之前的文章已经提过,感兴趣的可以看一下

针对RedisTemplate实现WatchDog – Karos (wzl1.top)

但是这个方法仍然有问题存在,下面我们来聊一聊

注意,这篇文章本质上只是讨论分布式锁的问题,如果要看源代码分析,可以看看这位大佬的代码,我在这里引入代码只是想要说明一些解决方案:分布式锁 | Joseph's Blog (gitee.io)

最原始的Redis分布式锁

最开始大家刚学分布式锁的时候,用的是这个指令,

setnx key value

然后使用expire给他设置过期时间

看似没有问题

image-20230616021643706

难道真的没问题吗?

试想一下,在高并发下,redis出现了雪崩,那么你设置了setnx,但是在设置expire之前崩了,呃呃呃~

没错,要解决这个问题,得实现原子性,原子性,我们在MySQL里面通过学习了事务来解决

image-20230616022013037

那么redis,能不能类似实现事务呢?其实redis本身是有事务的,但是这种简单的语句,用Lua也行(没错,就是你打游戏开脚本哪个)

但是在这里我们不讲Lua,主要说一下思想,其实就是通过lua将两个原子语句封装在一起,再发送给redis服务器进行执行

lua-redis快速入门直接看最后

这个分布式锁实现过于简单,就不在这里说了,hhh~

Redis官方针对SETNX的改动

其实Redis官方在后面也看出了SETNX的缺点,所以他在2.6.12版本开始,加入了一个新的指令

set key value EX|PX nx|ex

EX|PX是expire和pexpire,nx是不存在则执行,ex是存在则执行

简单说下,然后RedisTemplate.setValue().setIfAbsent()方法也进行了重写

这样就保证了原子性,这个方法在我之前的文章里面也用过。

并且我参考Redisson的思想制作了分布式锁看门狗机制

当时其实是在想续期问题如何解决,解决之后就感觉自己

image-20230616022948257

直到最近,群佬看博客,指出了一个问题,这个方法是否可重入呢?

image-20230616023100917

说到这里,你可能对可重入有点迷惑,那么现在,我们来介绍一下可重入锁

可重入锁

什么是可重入锁?

来看看介绍吧。

可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。

简单的例子,这里我用伪代码来解释

syn(this){
    sout("加锁成功A")
    syn(this){
        sout("加锁成功B")
    }
}

那么这里,我们会发生什么呢?

按照没接触可重入锁的情况或者没有这样试过的情况来说,执行完 sout("加锁成功A")后便会产生死锁问题

而可重入锁,就是说,在此时,你依然可以进入并执行sout("加锁成功B")

那么应用场景?

最容易想到的是递归调用,但是还有其他的业务方面可以说一说,

比如你要调用业务方法A,业务A中有操纵了要上锁业务B,同时业务A又需要全局上锁,那么这个地方就需要可重入了

基于Redis-Hash的可重入锁实现

在Redisson中,采用的是hash进行锁的存储,然后对hash设置一个过期时间

大概的数据结构是这样的

image-20230616025322273

hashname为key,hashkey为thread1,value是锁的重入次数

但是这里我要提一点,这里的thread1,可不仅仅是threadId,使用分布式锁通常是在分布式、微服务i项目下,不同的服务中也有可能出现线程ID相同的问题,所以这里加一个服务名,其实生成个UUID就可以了

大概的格式就是这样:

mylock:HEX(uuid+theadId):num

但是还有个设置过期时间的问题,如何设置?

我这里跟着我之前的帖子来讲,在那里我是使用的RedisTemplate来实现分布式锁+看门狗机制

但是没有考虑可重入的问题,那么我这次就加上

我们要加过期时间,同时又要确保原子性,那么就用Lua

加锁

对于加锁的Lua如下

    -- 如果Lock存在
    if redis.call("exists",KEYS[1]) ~= 0 then
        -- 如果不是自己的锁
        if redis.call("exists",KEYS[1],ARGS[1]) == 0 then
            -- 不是自己的锁,返回剩余时间
            return redis.call("pttl",KEYS[1]);
        end
        -- 如果是自己的锁就记录次数
        redis.call("hincrby",KEYS[1],ARGS[1],1);
        -- 延期
        redis.call("pexpire",KEYS[1],ARGS[2]);
    else
        redis.call("hset",KEYS[1],ARGS[1],1);
        -- 设置默认延期
        redis.call("pexpire",KEYS[1],ARGS[2]);
    end
    -- 如果Lock不存在,那么就直接加上就可以了,hhh
    return nil;

这里解释一下KEY和ARG,key是hash名,args是指命令携带参数

key1:索命

args1:服务线程唯一ID

args2:过期时间

然后在代码里面的实现

image-20230616025942863

解锁

解锁也差不多

--    解锁的逻辑和加锁相似
    -- 如果Lock存在
    if redis.call("exists",KEYS[1]) ~= 0 then
        -- 如果是自己的锁
        if redis.call("hexists",KEYS[1],ARGS[1]) ~= 0 then
            -- 如果是最后一层 直接delete
            if redis.call("hget",KEYS[1],ARGS[1]) == 0 then
                redis.call("del",KEYs[1]);
                a=0
            else
            -- 如果不是,那么久锁层数减一
                a=redis.call("hincrby",KEYS[1],ARGS[1],-1);
            end
        end
        return a;
    end
    -- 如果Lock不存在,那么就return,hhh
    return nil;

image-20230616030009479

续期的话本来就是一条语句,不变就可以了

然后我和之前的代码相比,自旋锁改了一下,hhh

image-20230616030117535

看门狗机制实现

之前其实已经实现过,这里就再来看看吧,这里我为了方便一点,用的Hutool来演示,但是实际用的时候还是用Netty等框架比较好,毕竟Redission也是用的Netty

image-20230616030317267

目前还存在的问题+Reddisson源码分析 —— 自旋锁

没错,别以为这样就完了,细心的话会发现我上面的代码里面,写的是最暴力的自旋锁(图一个方便,hhh)

如果说一直循环下去,那么无疑是非常浪费CPU的

站着茅坑不拉屎是我的错,但是看着别人蹲上了,我心里又特别难过表情包图片gif动图 - 求表情网,斗图从此不求人!

那么如何解决?

解决方案

细心的同学已经发现了,在我加锁失败的时候,会返回一个ttl,也就是当前key还有多久失效

那么我们是不是可以在while里面是指一个阻塞,然后等过了这么久再唤醒线程就可以了?

没错,Reddisson底层也是这样实现的,基于Redis发布订阅,但是这里我给大家简单引个路子

你可以理解为把阻塞的线程ID放进一个阻塞队列里面,而我们的服务器就去订阅这个队列,其实这个队列在Redis里面叫做Channel,感兴趣的可以去看看。

那么是如何订阅的呢?

其实在源代码中,Redisson是放了一个“消息检测器”来进行监听

下面来看看Redisson加锁的代码

阻塞加锁源码 lock()

//阻塞加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;     //这里拿到锁了
        }
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);    //对当前线程进行消息订阅
        pubSub.timeout(future);     //设置订阅超时
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }

        try {
            while (true) {
             // 循环重试获取锁,直至重新获取锁成功才跳出循环
            // 此种做法阻塞进程,一直处于等待锁手动释放或者超时才继续线程 
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

非阻塞加锁

//不阻塞加锁,waitTime是最大容忍时间,这个概念不做过多解释,就是等待你自选的时间
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        // 计算第一次尝试获取锁后剩余的时间
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);    //获取失败
            return false;
        }

        current = System.currentTimeMillis();
        //消息订阅
        CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        try {
            subscribeFuture.get(time, TimeUnit.MILLISECONDS);   //设置一个最多订阅时间
        } catch (TimeoutException e) {
            if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
                    "Unable to acquire subscription lock after " + time + "ms. " +
                            "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
                subscribeFuture.whenComplete((res, ex) -> {
                    if (ex == null) {
                        unsubscribe(res, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        } catch (ExecutionException e) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);//阻塞,等待消息
                } else {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);//阻塞,等待消息
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

消息订阅

其上述类容中的订阅,都通过下面的方法进行回调,在解锁的时候会发布消息

package org.redisson.pubsub;

import org.redisson.RedissonLockEntry;

import java.util.concurrent.CompletableFuture;

/**
 * LockPubSub类是一个用于锁的发布-订阅实现。
 * 它继承自PublishSubscribe类,用于处理锁的订阅和消息发布。
 * 锁的订阅者是RedissonLockEntry对象。
 * 当接收到特定的消息时,会执行相应的操作。
 */
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    // 解锁消息
    public static final Long UNLOCK_MESSAGE = 0L;
    // 读锁解锁消息
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }

    @Override
    protected RedissonLockEntry createEntry(CompletableFuture<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(UNLOCK_MESSAGE)) {
            // 获取等待执行的Runnable对象,并执行
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            // 释放锁计数器
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            // 循环执行等待执行的Runnable对象,并执行
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }

            // 释放锁计数器,释放所有等待的读锁
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }

}

总结

其实这里收获最大的,应该是自旋锁,虽然说在最后使用了发布订阅来完成异步唤醒,但还是有一些缺点,比如这个锁是否公平,如果说,这里要让你实现公平锁,读者你又打算如何解决?

想到这里,我又想到了Reactor模型,其实我们可以做一个BossGroup来存放一下阻塞线程ID,其实就是一个阻塞队列

再用一个WorkerGroup来对每个ThreadID进行处理,当然这里提供的是一个思路,如果要完成的话,相当于是写一个小型中间件,也挺有意思,后面打算试一试,hhh

Redis-Lua快速学习

当编写 Lua 脚本与 Redis 进行交互时,以下是一些常用的 Lua 脚本指南和技巧:

  1. 命令调用:使用 redis.call 函数来调用 Redis 命令。例如,redis.call('GET', 'mykey') 将调用 Redis 的 GET 命令并返回键为 'mykey' 的值。
  2. 参数访问:可以使用 KEYS 表来访问传递给 Lua 脚本的键列表,使用 ARGV 表来访问传递给 Lua 脚本的额外参数。例如,KEYS[1] 表示第一个键,ARGV[1] 表示第一个额外参数。
  3. 返回结果:Lua 脚本可以通过使用 return 语句来返回结果。例如,return redis.call('GET', 'mykey') 将返回键为 'mykey' 的值。
  4. 循环和条件:Lua 提供了一些基本的循环和条件语句,例如 forwhileif 等,可以在 Lua 脚本中使用。
  5. 容错处理:在编写 Lua 脚本时,可以考虑添加容错处理,例如使用 pcall 函数来捕获 Redis 命令的错误并进行处理。
  6. 事务支持:Redis 的 Lua 脚本支持事务,可以使用 redis.call('MULTI') 开始事务,然后使用 redis.call('EXEC') 执行事务。在事务中,可以执行多个 Redis 命令,并将其作为一个原子操作进行提交或回滚。
  7. 脚本缓存:Redis 可以缓存 Lua 脚本,以提高执行效率。您可以使用 EVALSHA 命令来执行缓存的脚本。在 Java RedisTemplate 中,您可以使用 execute 方法的 execute(script, keys, args) 形式来执行缓存的脚本。

这些指南和技巧可帮助您编写更复杂和灵活的 Lua 脚本与 Redis 进行交互。在编写 Lua 脚本时,请参考 Redis 官方文档以及 Lua 官方文档,以了解更多 Lua 编程语言和 Redis 命令的细节和用法。

当编写 Lua 脚本时,可以使用循环和条件语句来实现逻辑控制。以下是一些示例:

  1. 使用 for 循环:
for i = 1, 10 do
  -- 执行操作,例如打印循环变量
  print(i)
end
  1. 使用 while 循环:
local i = 1
while i <= 10 do
  -- 执行操作,例如打印循环变量
  print(i)
  i = i + 1
end
  1. 使用 if-else 条件:
local num = 5
if num < 0 then
  print("Number is negative")
elseif num == 0 then
  print("Number is zero")
else
  print("Number is positive")
end
  1. 使用 repeat-until 循环:
local i = 1
repeat
  -- 执行操作,例如打印循环变量
  print(i)
  i = i + 1
until i > 10

这些示例展示了在 Lua 脚本中使用循环和条件语句的基本用法。您可以根据自己的需求和逻辑在 Lua 脚本中编写更复杂的循环和条件控制结构。请注意,在 Lua 中,条件语句使用 if-elseif-else 结构,而不是像其他编程语言中的 if-else 结构。此外,Lua 的索引从 1 开始,而不是从 0 开始,这与一些其他编程语言有所不同。

请确保根据您的实际需求和逻辑编写正确的循环和条件控制结构,并根据 Redis 脚本的要求将其集成到您的 Lua 脚本中。

  • 微信或QQ扫一扫

1 thought on “Redis分布式锁深入分析

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

目录