当前位置 : 主页 > 编程语言 > 其它开发 >

redisson之分布式锁实现原理(三)

来源:互联网 收集:自由互联 发布时间:2022-06-17
官网:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95 一、什么是分布式锁1.1、什么是分布式锁 分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的

官网:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

一、什么是分布式锁 1.1、什么是分布式锁

分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程。

1.2、分布式锁应该具备哪些条件
  • 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
  • 高可用的获取锁与释放锁
  • 高性能的获取锁与释放锁
  • 具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)
  • 具备锁失效机制,即自动解锁,防止死锁
  • 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
1.3、分布式锁的实现方式
  • 基于数据库实现分布式锁
  • 基于Zookeeper实现分布式锁
  • 基于reids实现分布式锁
二、基于数据库的分布式锁

基于数据库的锁实现也有两种方式,一是基于数据库表的增删,另一种是基于数据库排他锁。

2.1、基于数据库表的增删

基于数据库表增删是最简单的方式,首先创建一张锁的表主要包含下列字段:类的全路径名+方法名,时间戳等字段。

具体的使用方式:当需要锁住某个方法时,往该表中插入一条相关的记录。类的全路径名+方法名是有唯一性约束的,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。执行完毕之后,需要delete该记录。

(这里只是简单介绍一下,对于上述方案可以进行优化,如:应用主从数据库,数据之间双向同步;一旦挂掉快速切换到备库上;做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍;使用while循环,直到insert成功再返回成功;记录当前获得锁的机器的主机信息和线程信息,下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了,实现可重入锁)

2.2、基于数据库排他锁

基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作

public void lock(){
    connection.setAutoCommit(false)
    int count = 0;
    while(count < 4){
        try{
            select * from lock where lock_name=xxx for update;
            if(结果不为空){
                //代表获取到锁
                return;
            }
        }catch(Exception e){
 
        }
        //为空或者抛异常的话都表示没有获取到锁
        sleep(1000);
        count++;
    }
    throw new LockException();
}

在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。获得排它锁的线程即可获得分布式锁,当获得锁之后,可以执行方法的业务逻辑,执行完方法之后,释放锁connection.commit()。当某条记录被加上排他锁之后,其他线程无法获取排他锁并被阻塞。

2.3、基于数据库锁的优缺点

上面两种方式都是依赖数据库表,一种是通过表中的记录判断当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。

  • 优点是直接借助数据库,简单容易理解。
  • 缺点是操作数据库需要一定的开销,性能问题需要考虑。
三、基于Zookeeper的分布式锁

       基于zookeeper临时有序节点可以实现的分布式锁。每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。 (第三方库有 Curator,Curator提供的InterProcessMutex是分布式锁的实现)

Zookeeper实现的分布式锁存在两个个缺点:

(1)性能上可能并没有缓存服务那么高,因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同步到所有的Follower机器上。

(2)zookeeper的并发安全问题:因为可能存在网络抖动,客户端和ZK集群的session连接断了,zk集群以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。

 四、基于redis的分布式锁

redis命令说明:

(1)setnx命令:set if not exists,当且仅当 key 不存在时,将 key 的值设为 value。若给定的 key 已经存在,则 SETNX 不做任何动作。

返回1,说明该进程获得锁,将 key 的值设为 value
返回0,说明其他进程已经获得了锁,进程不能进入临界区。
命令格式:setnx lock.key lock.value

(2)get命令:获取key的值,如果存在,则返回;如果不存在,则返回nil

命令格式:get lock.key

(3)getset命令:该方法是原子的,对key设置newValue这个值,并且返回key原来的旧值。

命令格式:getset lock.key newValue

(4)del命令:删除redis中指定的key

命令格式:del lock.key

方案一:基于set命令的分布式锁

1、加锁:使用setnx进行加锁,当该指令返回1时,说明成功获得锁

2、解锁:当得到锁的线程执行完任务之后,使用del命令释放锁,以便其他线程可以继续执行setnx命令来获得锁

1)存在的问题:假设线程获取了锁之后,在执行任务的过程中挂掉,来不及显示地执行del命令释放锁,那么竞争该锁的线程都会执行不了,产生死锁的情况。

(2)解决方案:设置锁超时时间

3、设置锁超时时间:setnx 的 key 必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。可以使用expire命令设置锁超时时间

1)存在问题:

setnx 和 expire 不是原子性的操作,假设某个线程执行setnx 命令,成功获得了锁,但是还没来得及执行expire 命令,服务器就挂掉了,这样一来
,这把锁就没有设置过期时间了,变成了死锁,别的线程再也没有办法获得锁了。 (
2)解决方案:redis的set命令支持在获取锁的同时设置key的过期时间

4、使用set命令加锁并设置锁过期时间:

命令格式:set <lock.key> <lock.value> nx ex <expireTime>

详情参考redis使用文档:http://doc.redisfans.com/string/set.html

1)存在问题:

① 假如线程A成功得到了锁,并且设置的超时时间是 30 秒。如果某些原因导致线程 A 执行的很慢,过了 30 秒都没执行完,这时候锁过期自动释放,
线程 B 得到了锁。 ② 随后,线程A执行完任务,接着执行del指令来释放锁。但这时候线程 B 还没执行完,线程A实际上删除的是线程B加的锁。 (
2)解决方案: 可以在 del 释放锁之前做一个判断,验证当前的锁是不是自己加的锁。在加锁的时候把当前的线程 ID 当做value,并在删除之前验证 key 对应的
value 是不是自己线程的 ID。但是,这样做其实隐含了一个新的问题,get操作、判断和释放锁是两个独立操作,不是原子性。对于非原子性的问题,
我们可以使用Lua脚本来确保操作的原子性

5、锁续期:(这种机制类似于redisson的看门狗机制,文章后面会详细说明)

虽然步骤4避免了线程A误删掉key的情况,但是同一时间有 A,B 两个线程在访问代码块,仍然是不完美的。怎么办呢?我们可以让获得锁的线程开启一个守护线程,用来给快要过期的锁“续期”。

① 假设线程A执行了29 秒后还没执行完,这时候守护线程会执行 expire 指令,为这把锁续期 20 秒。守护线程从第 29 秒开始执行,每 20 秒执行
一次。 ② 情况一:当线程A执行完任务,会显式关掉守护线程。 ③ 情况二:如果服务器忽然断电,由于线程 A 和守护线程在同一个进程,守护线程也会停下。这把锁到了超时的时候,没人给它续命,也就自动释放了。
 五、Redisson 是什么

       Redisson 是架设在 Redis 基础上的一个 Java驻内存数据网格框架, 充分利用 Redis 键值数据库提供的一系列优势, 基于 Java 使用工具包中常用接口, 为使用者提供了 一系列具有分布式特性的常用工具类;使得原本作为协调单机多线程并发程序的工具包 获得了协调分布式多机多线程并发系统的能力, 大大降低了设计和研发大规模分布式系统的难度,同时结合各富特色的分布式服务, 更进一步 简化了分布式环境中程序相互之间的协作。

5.1、Redisson 重入锁

由于 Redisson 太过于复杂, 设计的 API 调用大多用 Netty 相关, 所以这里只对 如何加锁、如何实现重入锁进行分析以及如何锁续时进行分析

5.2、创建锁

下面这个简单的程序, 就是使用 Redisson 创建了一个非公平的可重入锁,lock() 方法加锁成功 默认过期时间 30 秒, 并且支持 "看门狗" 续时功能。

导入pom

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.16.0</version>
        </dependency>

把上节课的代码改下

@RestController
@RequestMapping("/redisson")
public class RedissonController {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    RedissonClient redissonClient;

    @GetMapping("/save")
    public String save(){
        stringRedisTemplate.opsForValue().set("key","redisson");
        return "save ok";
    }

    @GetMapping("/get")
    public String get(){

        RLock lock=redissonClient.getLock ( "myLock" );
        String str=null;
            if (lock.tryLock ()){
                System.out.println ("拿到了锁");
                str=stringRedisTemplate.opsForValue().get("key");

            }else{
                System.out.println ("没有拿到锁");
            }
            lock.unlock ();

        return str;
    }

}

然后自己用JMeter工具自己测试下就知道结果了

六、Redisson分布式锁的实现原理

       通过redisson,非常简单就可以实现我们所需要的功能,当然这只是redisson的冰山一角,redisson最强大的地方就是提供了分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的并发程序的工具包获得了协调分布式多级多线程并发系统的能力,降低了程序员在分布式环境下解决分布式问题的难度,下面分析一下RedissonLock的实现原理;

 

 

 

    public boolean tryLock() {
        return (Boolean)this.get(this.tryLockAsync());
    }
    @Override
    public RFuture<Boolean> tryLockAsync(long threadId) {
        return tryAcquireOnceAsync(-1, -1, null, threadId);
    }
RedissonLock.tryAcquireOnceAsync

一直点跟到最后发现进入了下面这个方法,RedissonLock不同的加锁方法,流程会有所差别:
tryLock()不带参数最终调用的是下面代码,从过上面tryLockAsync方法可知,我们默认不传递参数时,程序会默认配置参数,其中传过来的参数leaseTime为-1,unint是null,internalLockLeaseTime默认设置是30S

    private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture ttlRemainingFuture;
//leaseTime就是租约时间,就是redis key的过期时间。
if (leaseTime != -1L) {//如果设置过期时间 ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } else {
//否则,在获取锁之后,需要加上定时任务,给锁设置一个内部过期时间,并不断刷新这个时间直到释放锁 ttlRemainingFuture
= this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); }

  //当tryLockInnerAsync执行结束后,触发下面回调
  ttlRemainingFuture.onComplete((ttlRemaining, e) -


        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
//lock acquired
//获取锁成功后,调用定时任务延迟锁时间
if (ttlRemaining) {
//表示设置过期时间,更新internalLockLeaseTime
if (leaseTime != -1L) { this.internalLockLeaseTime = unit.toMillis(leaseTime); } else {
// 设置一个定时任务
this.scheduleExpirationRenewal(threadId); } } } }); return ttlRemainingFuture; }
tryLockInnerAsync加锁实现

此段脚本为一段lua脚本:
KEY[1]: 为你加锁的lock值
ARGV[2]: 为线程id
ARGV[1]: 为设置的过期时间

第一个if:
判断是否存在设置lock的key是否存在,不存在则利用redis的hash结构设置一个hash,值为1,并设置过期时间,后续返回锁。
第二个if:
判断是否存在设置lock的key是否存在,存在此线程的hash,则为这个锁的重入次数加1(将hash值+1),并重新设置过期时间,后续返回锁。
最后返回:
这个最后返回不是说最后结果返回,是代表以上两个if都没有进入,则代表处于竞争锁的情况,后续返回竞争锁的过期时间

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,
// 这里面的 KEYS[1] 就是我们在最开始获取的Redis的那把锁,看那个属性或者说是锁是否存在
"if (redis.call('exists', KEYS[1]) == 0) " +
"then " +
// 如果不存在,走下面逻辑,给当前线程设置值加1,也就是1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 设置过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 这块的意思是锁存在,但是获取锁的是当前线程,支持可重入锁
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) " +
"then " +
// 给当前线程的数值加1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 设置过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 没有获取到锁,查看一下锁的有效期并返回
"return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}

通过这种eval表达式,lua脚本保证原子性
如果不存在锁:
等价于:

命令 备注 EXISTS lockkey 判断锁是否存在 HSET lockkey uuid:threadId 1 设置hash field和value值 PEXPIRE lockkey internalLockLeaseTime 设置lockkey的过期时间

 

 

 

 

 

如果存在锁:
等价于:

命令 备注 HEXISTS lockkey uuid:threadId 判断当前线程是否已经获取到锁 HSET lockkey uuid:threadId 1 设置hash field和value值 HINCRBY lockkey uuid:threadId 1 给对应的field的值加1(相当于可重入) PEXPIRE lockkey internalLockLeaseTime 重置过期时间

 

 

 

 

 

 

普通的不带参加锁逻辑就结束了。

tryLock带参数就相对复杂一些,加入了线程自旋相关的逻辑处理:

tryLock具有返回值,true或者false,表示是否成功获取锁。

   @Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//走tryAcquireAsync的逻辑
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 获取锁失败后,中途tryLock会一直判断中间操作耗时是否已经消耗锁的过期时间,如果消耗完则返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
//如果已经超时,则直接失败
acquireFailed(waitTime, unit, threadId);
return false;
}

current = System.currentTimeMillis();
// 订阅锁释放事件
// 如果当前线程通过 Redis 的 channel 订阅锁的释放事件获取得知已经被释放,则会发消息通知待等待的线程进行竞争.

RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 将订阅阻塞,阻塞时间设置为我们调用tryLock设置的最大等待时间,超过时间则返回false
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
//收到订阅的消息后走的逻辑
try {
time -= System.currentTimeMillis() - current;
//判断时间是否超时
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 循环获取锁,但由于上面有最大等待时间限制,基本会在上面返回false
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

                 // 通过信号量(共享锁)阻塞,等待解锁消息. (减少申请锁调用的频率)
                 // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
                  // 否则就在wait time 时间范围内等待可以通过信号量

                // waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}

到这里带参加锁逻辑就结束了。接下来聊下释放锁过程

unlock() 释放锁

可重入锁 RedissonLock 释放锁的源码还是比较简单的,我们可以分为两步:第一步是执行释放锁的lua脚本,第二步就是停止 watchdog 的运行。

RedissonBaseLock.unlockAsync

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        // 停止 watchdog 的定时续过期时间,其实就是将对应的 ExpirationEntry 从 EXPIRATION_RENEWAL_MAP 中移除,当 watchdog 执行时发现当前客户端当前线程没有 ExpirationEntry 了,那么就会停止执行了。
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }
        
        // 如果 lua脚本返回的是null,证明当前线程之前并没有成功获取锁,执行tryFailure方法
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        // 成功释放锁,执行 trySuccess 方法
        result.trySuccess(null);
    });

    return result;
}
释放锁 lua 脚本
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

Redis提供了一组命令可以让开发者实现“发布/订阅”模式(publish/subscribe) . 该模式同样可以实现进程间的消息传递,它的实现原理是:

发布/订阅模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或多个频道,而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到该消息

发布者发布消息的命令是PUBLISH, 用法是

PUBLISH channel message

比如向channel.1发一条消息:hello

PUBLISH channel.1 “hello”

这样就实现了消息的发送,该命令的返回值表示接收到这条消息的订阅者数量。因为在执行这条命令的时候还没有订阅者订阅该频道,所以返回为0. 另外值得注意的是消息发送出去不会持久化,如果发送之前没有订阅者,那么后续再有订阅者订阅该频道,之前的消息就收不到了

订阅者订阅消息的命令是:

SUBSCRIBE channel [channel …]

该命令同时可以订阅多个频道,比如订阅channel.1的频道:SUBSCRIBE channel.1,执行SUBSCRIBE命令后客户端会进入订阅状态。

到这里释放锁的过程也说完了,但还有一个问题,那就是锁过期了怎么办;

如何解决锁过期问题

       一般来说,我们去获得分布式锁时,为了避免死锁的情况,我们会对锁设置一个超时时间,但是有一种情况是,如果在指定时间内当前线程没有执行完,由于锁超时导致锁被释放,那么其他线程就会拿到这把锁,从而导致一些故障。

为了避免这种情况,Redisson引入了一个Watch Dog机制,这个机制是针对分布式锁来实现锁的自动续约,简单来说,如果当前获得锁的线程没有执行完,那么Redisson会自动给Redis中目标key延长超时时间。默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。

@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return tryLock(waitTime, -1, unit);  //leaseTime=-1
}

实际上,当我们通过tryLock方法没有传递超时时间时,默认会设置一个30s的超时时间,避免出现死锁的问题。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) { 
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else { //当leaseTime为-1时,leaseTime=internalLockLeaseTime,默认是30s,表示当前锁的过期时间。
        
        //this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) { //说明出现异常,直接返回
            return;
        }
        // lock acquired
        if (ttlRemaining == null) { //表示第一次设置锁键
            if (leaseTime != -1) { //表示设置过超时时间,更新internalLockLeaseTime,并返回
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else { //leaseTime=-1,启动Watch Dog
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

由于默认设置了一个30s的过期时间,为了防止过期之后当前线程还未执行完,所以通过定时任务对过期时间进行续约。

  • 首先,会先判断在expirationRenewalMap中是否存在了entryName,这是个map结构,主要还是判断在这个服务实例中的加锁客户端的锁key是否存在,
  • 如果已经存在了,就直接返回;主要是考虑到RedissonLock是可重入锁。
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {// 第一次加锁的时候会调用,内部会启动WatchDog
        entry.addThreadId(threadId);
        renewExpiration();
    
    }
}

定义一个定时任务,该任务中调用renewExpirationAsync方法进行续约。

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //用到了时间轮机制
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // renewExpirationAsync续约租期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
 
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次间隔租期的1/3时间执行
 
    ee.setTimeout(task);
}

执行Lua脚本,对指定的key进行续约。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getRawName()),
                          internalLockLeaseTime, getLockName(threadId));
}

到这里所有流程就全部讲完了

 

这短短的一生我们最终都会失去,不妨大胆一点,爱一个人,攀一座山,追一个梦 【本文来自:台湾服务器 http://www.558idc.com/tw.html 复制请保留原URL】
上一篇:链表——数据结构
下一篇:没有了
网友评论