重新说说Redis实现分布式公平可重入锁的实现,这次具体说说异步唤醒机制,这次带上QPS检测

重新说说Redis实现分布式公平可重入锁的实现,这次具体说说异步唤醒机制,这次带上QPS检测

在之前的文章中,我们对Redisson的lock进行了刨析

如下:

  • 看门狗续期
  • 使用hash+lua实现可重入锁

但是还有一点,我们之前使用的自旋+线程休眠来达到线程互斥阻塞的效果。

但是这样做会有一点问题,我们每次休眠的时间都是固定的,仍然会有一大部分空窗期,我设置30s的锁过期,那么好,20个线程足足跑了两三分钟,这个效率绝对不行,对吧。

那么我们还有没有其他办法?

Redssion源码中使用了异步消息订阅

呃呃呃,之前的朋友说我的代码run不起来,没错,我故意改了一些地方,毕竟天下没有免费的午餐对吧,我主要还是提供一下思路,和大概的代码,具体的大家自行去运行了,理解会更深(bushi,好吧,我承认之前我刨析的时候自己实现的代码没有跑 ,所以今天来带大家跑一下)

那么我们也来实现一个

我们使用的是RedisTemplate,然后线程阻塞和唤醒的话我们需要一个同步锁,但是感觉有点问题

因为Java的线程阻塞和唤醒,必须要同步锁,所以我们需要一个变量,那么每个线程都要一个变量,啊啊啊,然后又要使用hashmap来存储变量对应的线程???太他妈浪费内存了

其实在JDK6之后给我提供了一个api=>LockSupport

这里面封装了一些本地方法来进行线程操作

LockSupport

简单介绍下api吧

LockSupport.park(); //阻塞当前线程
LockSupport.parkNanos(long nanos); // 让当前线程休眠 nancos ms
LockSupport.unpark(Thread thread); //唤醒指定线程

看到这里,我相信大家应该都有思路了。

我们实现一个FIFO队列来保证锁竞争的公平性

锁竞争公平性的保证

public static ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> threadWaitQueue=new ConcurrentHashMap<>();;

每当锁释放的时候,我们使用消息订阅通知一下,然后unpark不就行了。

但是还有一个问题,如果说某个线程长时间占用,或者消息丢失怎么办?

所以说哦我们使用parkNanos

好了直接上源代码,这次我放能够运行的

Pub/Sub

第一个是消息监听的

package cn.katool.lock;

import cn.hutool.core.util.ObjectUtil;
import cn.katool.util.ScheduledTaskUtil;
import cn.katool.util.db.nosql.RedisUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

@Component
@Slf4j
public class LockMessageWatchDog implements MessageListener {
    public static ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> threadWaitQueue=new ConcurrentHashMap<>();;
    public static final String LOCK_MQ_NAME = "LOCK:RELASE:QUEUE";

    @SneakyThrows
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("从消息通道={}监听到消息",new String(pattern));
        log.info("从消息通道={}监听到消息",new String(message.getChannel()));
        String lockName = new String(message.getBody()).substring(1, message.getBody().length - 1);
        log.info("元消息={}", lockName);
        log.info("threadWaitQueue:{}",threadWaitQueue);;
        ConcurrentLinkedQueue<Thread> threads = threadWaitQueue.get(lockName);
        if (ObjectUtil.isEmpty(threads)||threads.isEmpty()){
            log.info("没有线程需要lock:{}在等待", lockName);
            threadWaitQueue.remove(lockName);
            return ;
        }
        Thread poll = threads.poll();
        log.info("唤醒线程={}",poll.getName());
        LockSupport.unpark(poll);
    }

    //表示监听一个频道

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(this, new ChannelTopic(LOCK_MQ_NAME));
        return container;
    }

}

下面是锁的工具类

LockUtil

/**
 * Title
 *
 * @ClassName: LockUtil
 * @Description:锁工具类,通过内部枚举类实现单例,防止反射攻击
 * @author: Karos
 * @date: 2023/1/4 0:17
 * @Blog: https://www.wzl1.top/
 */

package cn.katool.lock;

import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.katool.Config.LockConfig;
import cn.katool.Exception.ErrorCode;
import cn.katool.Exception.KaToolException;
import cn.katool.util.ScheduledTaskUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.TimeoutUtils;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.DigestUtils;
import org.springframework.util.ObjectUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

import static cn.katool.lock.LockMessageWatchDog.LOCK_MQ_NAME;
import static cn.katool.lock.LockMessageWatchDog.threadWaitQueue;

@Component
@Scope("prototype")
@Slf4j
public class LockUtil {
        @Resource
        RedisTemplate redisTemplate;
        // lockName:uID:numbers
        // key[1] 锁名
        // ARGV[1] UID
        // ARGV[2] 时间
        @Value("if redis.call('exists',KEYS[1]) ~= 0 then\n" +        // 这个锁是否存在
                "        if redis.call('hexists',KEYS[1],ARGV[1] ) == 0 then\n" +       // 存在但不是自己的锁
                "            return redis.call('pttl',KEYS[1]);\n" +                  // 返回剩余时间
                "        end\n" +
//                "        -- 如果是自己的锁就记录次数\n" +
                "        redis.call('hincrby',KEYS[1],ARGV[1],1);\n" +                // 是自己的锁重入
                "else\n" +
                "        redis.call('hset',KEYS[1],ARGV[1],1);\n" +  // 没有的话就加上
                "end\n" +
                "redis.call('pexpire',KEYS[1],ARGV[2]);\n" +      //延期
                "return nil")             //返回
        private String lockScript;
        @Value("    if redis.call('exists',KEYS[1]) ~= 0 then\n" +                            //如果锁存在
                "        if redis.call('hexists',KEYS[1],ARGV[1]) ~= 0 then\n" +                      // 如果是自己的锁
                "                local inc= redis.call('hincrby',KEYS[1],ARGV[1],-1);\n" +
                "               if inc == 0 then" +
                "                    redis.call('hdel',KEYS[1],ARGV[1]);\n" +
                "               end" +
                "         return inc;\n" +
                "        end\n" +
                "    end\n" +
                " return nil\n")
        private String unLockScript;

        @Value("    if redis.call('exists',KEYS[1]) ~= 0 then\n" +
                "       local lessTime = redis.call('pttl',KEYS[1]);\n" +
                "       redis.call('pexpire',KEYS[1],ARGV[1]);\n" +                            //如果锁存在
                "    end\n" +
                "    return redis.call('pttl',KEYS[1]);" )
        private String delayLockScript;

        public String serviceUUid=RandomUtil.randomString(16);

        private volatile static boolean isOpenCorn=false;

        /**
         * 带看门狗机制上锁
         * @param lockObj
         * @return
         */
        public boolean DistributedLock(Object lockObj){
                try {
                        return DistributedLock(lockObj,null,null,true);
                } catch (KaToolException e) {
                        throw new RuntimeException(e);
                }
        }
        @Resource
        LockConfig lockConfig;

        //加锁
        public Long luaToRedisByLock(String lockName,Long expTime,TimeUnit timeUnit,String[] hashkey){
                long id = Thread.currentThread().getId();
                //生成hashkey
                String hashKey=hashkey[0]= DigestUtils.md5DigestAsHex(new String(id+serviceUUid).getBytes());
                log.info("serviceUUid:{},id:{},hashKey:{}",serviceUUid,id,hashKey);
                DefaultRedisScript defaultRedisScript = new DefaultRedisScript();
                ArrayList<String> keys = new ArrayList<>();
                ArrayList<Object> args = new ArrayList();
                keys.add(lockName);
                args.add(hashKey);
                args.add(TimeoutUtils.toMillis(expTime,timeUnit));
                defaultRedisScript.setScriptText(lockScript);
                defaultRedisScript.setResultType(Long.class);
//                log.info("lockLua -> {}",lockScript);
                Long execute = (Long) redisTemplate.execute(defaultRedisScript, keys, args.toArray());
                if (ObjectUtil.isEmpty(execute)) {
                        log.info("katool=> {}成功抢到锁,锁名:{},过期时间:{},单位:{}",hashKey,lockName,expTime,timeUnit);
                }
                return execute;
        }

        //释放锁
        @Transactional
        public Long luaToRedisByUnLock(String lockName,Thread thread){
                long id = thread==null?Thread.currentThread().getId():thread.getId();
                //生成value1
                String hashKey= DigestUtils.md5DigestAsHex(new String(id+serviceUUid).getBytes());
                DefaultRedisScript defaultRedisScript = new DefaultRedisScript();
                ArrayList<String> keys = new ArrayList<>();
                ArrayList<String> args = new ArrayList();
                keys.add(lockName);
                args.add(hashKey);
                defaultRedisScript.setScriptText(unLockScript);
                defaultRedisScript.setResultType(Long.class);
//                log.info("unlockLua -> {}",unLockScript);
                Long remainLocks = (Long) redisTemplate.execute(defaultRedisScript, keys, args.toArray());
                if (ObjectUtil.isEmpty(remainLocks)) {
                        log.error("katool=> {}释放锁失败,请释放自己的锁,锁名:{}",hashKey,lockName);
                }
                else if(remainLocks==0){
                        log.info("katool=> {}成功释放锁,锁名:{}",hashKey,lockName);
                        redisTemplate.convertAndSend(LOCK_MQ_NAME,lockName);
                }
                return remainLocks;
        }

        public Boolean luaToRedisByDelay(String lockName,Long expTimeInc,TimeUnit timeUnit){
                long id = Thread.currentThread().getId();
                DefaultRedisScript defaultRedisScript = new DefaultRedisScript();
                ArrayList<String> keys = new ArrayList<>();
                ArrayList<Object> args = new ArrayList();
                keys.add(lockName);
                args.add(TimeoutUtils.toMillis(expTimeInc,timeUnit)+3000);
                defaultRedisScript.setScriptText(delayLockScript);
                defaultRedisScript.setResultType(Long.class);
                Long expire = redisTemplate.getExpire(lockName);
                Long execute = (Long) redisTemplate.execute(defaultRedisScript, keys, args.toArray());
                return execute>expire;
        }

        /**
         * @param obj
         * @param exptime
         * @param timeUnit
         * @return
         * @throws KaToolException
         */
        public boolean DistributedLock(Object obj,Long exptime,TimeUnit timeUnit,Boolean isDelay) throws KaToolException {
                if (ObjectUtil.isEmpty(obj)){
                        throw new KaToolException(ErrorCode.PARAMS_ERROR," Lock=> 传入obj为空");
                }
                if(ObjectUtil.isEmpty(exptime)){
                        exptime= lockConfig.getInternalLockLeaseTime();;
                }
                if (ObjectUtils.isEmpty(timeUnit)){
                        timeUnit=lockConfig.getTimeUnit();
                }
                String lockName="Lock:"+obj.toString();
                Long aLong = -1L;
                // 进入互斥等待
                Thread currentThread = Thread.currentThread();
                while(true){
                        String[] hashkey = new String[1];
                                aLong=luaToRedisByLock(lockName, exptime, timeUnit, hashkey);
                                if (aLong==null) {
                                        break;
                                }
                        log.debug("katool=> {}未抢到锁,线程等待通知唤醒,最多等待时间:{},锁名:{},过期时间:{},单位:{}",hashkey[0],aLong,lockName,exptime,timeUnit);
//                                Thread.sleep(aLong/3);     // 初步改进:使用线程休眠,采用自旋锁+线程互斥
                        ConcurrentLinkedQueue<Thread> threads = threadWaitQueue.get(lockName);
                        if (ObjectUtil.isEmpty(threads)){
                                threads=new ConcurrentLinkedQueue<>();
                                log.info("新增线程进入等待lock:{}队列",lockName);
                                threadWaitQueue.putIfAbsent(lockName,threads);
                                log.debug("threadWaitQueue:{}",threadWaitQueue);;
                        }
                        if (!threads.contains(currentThread)) {
                                threads.add(currentThread);
                        }
                        log.debug("threadWaitQueue:{}",threadWaitQueue);;
                        LockSupport.parkNanos(aLong);             // 等待通知唤醒 或者 超时自行处理
                        log.debug("katool=> {}未抢到锁,线程被唤醒,重新抢锁,锁名:{},过期时间:{},单位:{}",hashkey[0],lockName,exptime,timeUnit);
                        // 线程被唤醒后删除自身的thread队列,避免死锁
                        if (threads.contains(currentThread)) {
                                threads.remove(currentThread);
                        }
                }
                //实现看门狗
                if (isDelay){
                        Thread thread = currentThread;
                        TimeUnit finalTimeUnit = timeUnit;
                        Long finalExptime = exptime;
                        ScheduledFuture<?> scheduledFuture = ScheduledTaskUtil.submitTask(new Runnable() {
                                @SneakyThrows
                                @Override
                                public void run() {

                                        boolean alive = thread.isAlive()||thread.isInterrupted();
                                        ScheduledFuture future = threadWatchDog.get(thread.getId());
                                        if (alive) {
                                                log.info("Thread ID:{} 线程仍然存活,看门狗续期中...", thread.getId());
                                                delayDistributedLock(obj, finalExptime >= 3 ? (finalExptime) / 3 : finalExptime, finalTimeUnit);
                                                return;
                                        } else {
                                                if (future.isCancelled()||future.isDone()) {
                                                        log.error("Thread ID:{} 线程已经死亡,但是没有对应的scheduleId", thread.getId());
                                                        return;
                                                }
                                                log.info("Thread ID:{} 线程死亡,看门狗自动解锁", thread.getId());
                                              DistributedUnLock(obj,thread);
                                                future.cancel(true);
                                                return;
                                        }
                                }
                        },finalExptime, finalExptime, finalTimeUnit);
                        threadWatchDog.put(thread.getId(),scheduledFuture);
                }
                return BooleanUtil.isTrue(aLong!=null);
        }
        ConcurrentHashMap<Long,ScheduledFuture> threadWatchDog=new ConcurrentHashMap<>();
        //延期
        public boolean delayDistributedLock(Object obj,Long exptime,TimeUnit timeUnit) throws KaToolException {
                if (ObjectUtils.isEmpty(obj)){
                        throw new KaToolException(ErrorCode.PARAMS_ERROR," Lock=> 传入obj为空");
                }
                //本身就是一句话,具备原子性,没有必要使用lua脚本
                Boolean expire = luaToRedisByDelay("Lock:"+obj.toString(),exptime,timeUnit);
                log.info("katool=> LockUntil => delayDistributedLock:{} value:{} extime:{} timeUnit:{}",obj.toString(), "1", exptime, timeUnit);
                return BooleanUtil.isTrue(expire);
        }
        //释放锁
        public Long DistributedUnLock(Object obj) throws KaToolException {
                Long remainLocks = DistributedUnLock(obj, null);
                return remainLocks;
        }

        public Long DistributedUnLock(Object obj,Thread thread) throws KaToolException {
                if (ObjectUtils.isEmpty(obj)){
                        throw new KaToolException(ErrorCode.PARAMS_ERROR," Lock=> 传入obj为空");
                }
//                由于这里有了可重入锁,不应该直接删除Boolean aBoolean = redisTemplate.delete("Lock:" + obj.toString());
                Long remainLocks = luaToRedisByUnLock("Lock:" + obj.toString(),thread);
                threadWatchDog.get(Thread.currentThread().getId()).cancel(true);
                threadWatchDog.remove(Thread.currentThread().getId());
                log.info("katool=> LockUntil => unDistributedLock:{} isdelete:{} watchDog is cancel and drop",obj.toString(),true);
                return remainLocks;
        }
        //利用枚举类实现单例模式,枚举类属性为静态的
        private static enum SingletonFactory{
                Singleton;
                LockUtil lockUtil;
                private SingletonFactory(){
                        lockUtil=new LockUtil();
                }
                public LockUtil getInstance(){
                        return lockUtil;
                }
        }
        @Bean("LockUtil")
        public static LockUtil getInstance(){
                return SingletonFactory.Singleton.lockUtil;
        }

        private LockUtil(){

        }

}

在消息发布那里,我本来打算放在Lua脚本里面运行的,但是凡在Lua脚本里面不知道为什么执行失败,但是单独的lua,使用eval来执行又没问题,所以我放在外面,加了一层事务来运行。

其中有一个定时任务工具,在之前我是使用的HuTool的CronUtil来实现的定时任务,但是对于动态修改执行周期,不太方便,需要生成Corn表达式,所以我改成了使用ScheduledThreadPoolExecutor线程池的方式

package cn.katool.util;

import cn.hutool.cron.CronUtil;

import java.sql.Time;
import java.util.concurrent.*;

public class ScheduledTaskUtil {
    private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(500,
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    public static ScheduledFuture<?> submitTask(Runnable run, long delay, TimeUnit timeUnit){
        ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(run, 0,delay, timeUnit);
        return schedule;
    }
    public static ScheduledFuture<?> submitTask(Callable run, long delay, TimeUnit timeUnit){
        ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(()->{
            try {
                run.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, 0,delay, timeUnit);
        return schedule;
    }

    public static ScheduledFuture<?> submitTask(Runnable run,long initdeay, long delay, TimeUnit timeUnit){
        ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(run, initdeay,delay, timeUnit);
        return schedule;
    }
    public static ScheduledFuture<?> submitTask(Callable run,long initdeay,  long delay, TimeUnit timeUnit){
        ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(()->{
            try {
                run.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, initdeay,delay, timeUnit);
        return schedule;
    }
}

测试代码

下面我们来看看试验吧,我已经将代码上传到了Maven中央仓库,大家引入依赖即可

        <dependency>
            <groupId>cn.katool</groupId>
            <artifactId>katool</artifactId>
            <version>1.9.0</version>
        </dependency>

然后我们在yml里面进行配置

server:
    port: 8081
katool:
    lock:
        internalLockLeaseTime: 30   # 分布式锁默认租约时间
        timeUnit: seconds           # 租约时间单位
    util:
        redis:
            policy: "caffeine"      # 选择内存缓存策略,caffeine
            exptime: {5*60*1000}           # LFU过期时间
            time-unit: milliseconds #  过期时间单位

spring:
    redis:
        database: 1
        host: 127.0.0.1
        port: 6379
        timeout: 5000
        lettuce:
            pool:
                max-active: 100 # 连接池最大连接数(使用负值表示没有限制)
                max-idle: 100 # 连接池中的最大空闲连接
                min-idle: 3 # 连接池中的最小空闲连接
                max-wait: -50000ms # 连接池最大阻塞等待时间(使用负值表示没有限制)
    task:
        execution:
            pool:
                core-size: 300
                max-size: 700
                queue-capacity: 500
                keep-alive: 300s

Controller层

package cn.katool.katooltest;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.katool.util.db.nosql.RedisUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/test")
public class TestController {

    @Resource
    RedisUtils redisUtils;
    @GetMapping
    public String test() throws InterruptedException {
        redisUtils.lock("this");
//synchronized ("this".intern()){
    String value = (String) redisUtils.getValue("1234");
    if (ObjectUtil.isEmpty(value)){
        redisUtils.setValue("1234", RandomUtil.randomString(10),5L, TimeUnit.MINUTES);
        value = (String) redisUtils.getValue("1234");
    }
//}
        redisUtils.unlock("this");

    return value.toString();
    }
}

使用ApiFox进行API测试

100个线程同时并发来测试

image-20230912130231164

第一次测验

image-20230912130722748

第二次测验

image-20230912130649975

第三次测验

image-20230912130846628

第四次测验

image-20230912131011554

第五次测验

image-20230912131349496

总结

出去最高和最低的,QPS的均值在1700左右

用jmetter来测试下

image-20230912163416245

大概是1600

1000个线程同时并发测试

ApiFox,并发量上去了贼卡,还是用Jmeter吧

image-20230912174914359

不过,这有点低啊,为了看是不是我们分布式锁的问题,我们用同步锁来试试

image-20230912175708601

还真是

我再来试试Redisson

package cn.katool.katooltest;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.katool.util.db.nosql.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Resource
    RedisUtils redisUtils;

    public int i=0;

    @Resource
    Redisson redisson;

    Integer q=100;
    @GetMapping
    public String test() throws InterruptedException {
//        redisUtils.lock("this");
        RLock aThis = redisson.getLock("this");
        aThis.lock();
//synchronized ("this".intern()){
    String value = (String) redisUtils.getValue("1234");
        log.info("i={},q={}",i++,q);
    if (ObjectUtil.isEmpty(value)){
        redisUtils.setValue("1234", RandomUtil.randomString(10),5L, TimeUnit.MINUTES);
        value = (String) redisUtils.getValue("1234");
    }
//        redisUtils.unlock("this");
    aThis.unlock();
    return value.toString();
//}

    }
}

image-20230912181637777

人家的QPS将近1000

好吧我是fw

会不会是因为Redisson没有使用公平锁的问题?

我们来试一试公平锁

image-20230912183422296

事实证明,我确实是fw

思考优化

我的思路和Redisson底层思路差不多,但是为什么会出现这么大的差错?

数据结构的选择?

我们的公平锁队列的实现方式还记得吗?

public static ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> threadWaitQueue=new ConcurrentHashMap<>();;

我们为了并发安全使用了ConcurrentHashMap ConcurrentLinkedQueue

使用ConcurrentHashMap是为了避免扩容时rehash,导致出现循环链表

那么我们使用ConcurrentLinkedQueue的目的呢?

我们多个线程加入的时候,是看谁先加入,Queue也是单机的

在删除的时候,如果有多个订阅者同时监听,那么可能会造成任务丢失,但是我们这里,一个服务内只有一个消费者,貌似没必要。

那么我们使用最基础的,现在改改代码

@SneakyThrows
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("从消息通道={}监听到消息",new String(pattern));
        log.info("从消息通道={}监听到消息",new String(message.getChannel()));
        String lockName = new String(message.getBody()).substring(1, message.getBody().length - 1);
        log.info("元消息={}", lockName);
        log.info("threadWaitQueue:{}",threadWaitQueue);;
        LinkedList<Thread> threads = threadWaitQueue.get(lockName);
        if (ObjectUtil.isEmpty(threads)||threads.isEmpty()){
            log.info("没有线程需要lock:{}在等待", lockName);
            threadWaitQueue.remove(lockName);
            return ;
        }
            Thread peek = threads.peek();
            log.info("唤醒线程={}",peek.getName());
            LockSupport.unpark(peek);   // 竞争到后会自行删除

    }
  LinkedList<Thread> threads = threadWaitQueue.get(lockName);

                while(true){
                        String[] hashkey = new String[1];
                                aLong=luaToRedisByLock(lockName, exptime, timeUnit, hashkey);
                                if (aLong==null) {
                                        break;
                                }
                        log.debug("katool=> {}未抢到锁,线程等待通知唤醒,最多等待时间:{},锁名:{},过期时间:{},单位:{}",hashkey[0],aLong,lockName,exptime,timeUnit);
//                                Thread.sleep(aLong/3);     // 初步改进:使用线程休眠,采用自旋锁+线程互斥
                        if (ObjectUtil.isEmpty(threads)){
                                threads=new LinkedList<Thread>();
                                log.info("新增线程进入等待lock:{}队列",lockName);
                                threadWaitQueue.putIfAbsent(lockName,threads);
                                log.debug("threadWaitQueue:{}",threadWaitQueue);;
                        }
                        if (!threads.contains(currentThread)) {
                               threads.push(currentThread);
                        }
                        log.debug("threadWaitQueue:{}",threadWaitQueue);;
                        LockSupport.parkNanos(((aLong<<1)+aLong)>>1);             // 自行争取一次
                        log.debug("katool=> {}未抢到锁,线程被唤醒,重新抢锁,锁名:{},过期时间:{},单位:{}",hashkey[0],lockName,exptime,timeUnit);
                }
                // 线程被唤醒后删除自身的thread队列,避免死锁
                if (threads.contains(currentThread)) {
                        threads.remove(currentThread);
                }

看看优化后的效果?

image-20230912212558725

和原来差不多?难道是log的问题?

我把log改到debug级别

image-20230912213209770

woc???

那我试试用线程安全的queue

image-20230912214429245

好好好,都差不多,还是改回去,免得出现一些bug,hhh

后来查了一下,log.info采用的是异步输出,也就是说会占用线程,从而大大减小了QPS,我的电脑目前单机项目最高的QPS在1k左右,最高能够并发5000个线程,超过就得报错。

3000并发测试

下面这个是3000并发的QPS

image-20230913092704734

另外还有,这篇文章的代码我采用了掠夺式,也就是线程被锁一定时间后还没有被通知唤醒的话,自行回去争取一次拿锁,文章只是实现一些思路,具体的使用可以更具自己的开发环境进行修改,目前KaTool中支持掠夺和非掠夺两种方式。

  • 微信或QQ扫一扫

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

目录