分布式锁比较常见的实现方式有以下几种:

1.Memcached 实现的分布式锁:使用 add 命令,添加成功的情况下,表示创建分布式锁成功。

2.ZooKeeper 实现的分布式锁:使用 ZooKeeper 顺序临时节点来实现分布式锁。

3.Reids的分布式锁,很多大公司会基于Reidis做扩展开发。setnx(set if not exists),Redisson,watch dog。

4.基于数据库,比如 MySQL。主键或唯一索引的唯一性。

1.Redisson

3.1.介绍

Redisson是一个Redis的java 客户端。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

3.2.官网

https://redisson.org/

3.3.教程

https://github.com/redisson/redisson/wiki

3.4.基本用法

3.4.1.导入依赖

         <dependency>
            <groupId>org.redisson</groupId>

            <artifactId>redisson</artifactId>

            <version>3.11.1</version>

        </dependency>

3.4.2.application.properties

redisson.address=redis://redis:6379
redisson.password=qweQWE123!@###

3.4.3.RedissonConfig.java

@Configuration
public class RedissonConfig {

    @Value("${redisson.address}")
    private String addressUrl;
    @Value("${redisson.password}")
    private String password;
    @Bean
    public RedissonClient getRedisson() throws Exception{
        Config config = new Config();
        config.useSingleServer()
                .setAddress(addressUrl)
                .setPassword(password);
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

3.4.4.重构代码

    @Resource
    private RedissonClient redissonClient;
    
    @GetMapping("/seckill/{skuId}")
    public ResponseEntity<Void> seckill(@PathVariable("skuId") String skuId) throws InterruptedException {
        //Redisson锁解决以上所有问题
        String lockKey = "buying_" + skuId;
        RLock lock = redissonClient.getLock(lockKey);
        if (lock.tryLock(30L,TimeUnit.SECONDS)) {
            try {

                Integer stock = Integer.valueOf((String) stringRedisTemplate.opsForHash().get(STOCK_KEY, skuId));
                //1.判断库存是否足够 > 0
                if (stock <= 0) {
                    //1.1.不够->返回400
                    return ResponseEntity.badRequest().build();
                }

                //1.2.够->减库存,返回200
                log.debug("剩余库存:{}", stock);
//                Thread.sleep(3000);
                stringRedisTemplate.opsForHash().increment(STOCK_KEY, skuId, -1);
                new AService().some();
            } catch (Exception e) {
                log.error("秒杀异常", e);
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
            } finally {
                //解锁
               lock.unlock();
            }

        }
        return ResponseEntity.ok().build();
    }

测试结果如下:

没有超卖

吞吐量提升2倍

3.5.原理

3.6.源码

3.6.1.加锁逻辑

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //当前线程还未加锁成功
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      //hset("buying_2","ef2f1dad-1a99-4720-bf3a-069f46cf18d2:211",1)
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      //设置过期时间(默认30s)                        
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //前线程已经加锁成功
                  //hexists("buying_2","ef2f1dad-1a99-4720-bf3a-069f46cf18d2:211")       
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      //累加加锁次数                        
                      //hincrby("buying_2","ef2f1dad-1a99-4720-bf3a-069f46cf18d2:211") 
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      //设置过期时间(默认30s)                         
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //加锁失败,返回当前锁过期时间,方便后续重试                            
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

 KEYS[1]:keys中第一个元素(redis key)
 ARGV[1]:params中第一个元素 (过期时间)
 ARGV[2]:params中第二个元素 (唯一标识)

以上主要逻辑是通过lua实现的。

什么是lua

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

lua优点

支持原子性操作:Redis会将整个脚本作为一个整体执行,中间不会被其他请求插入。因此在脚本运行过程中无需担心会出现竞态条件,无需使用事务
降低网络开销:将多个请求通过脚本的形式一次发送到服务器,减少了网络的时延
脚本复用:客户端发送的脚本可支持永久存在redis中,这样其他客户端可以复用这一脚本,而不需要使用代码完成相同的逻辑。

为什么redis采用lua实现?

redis的每个命令是具有原子性的,在某些特殊的场景下,我们需要多个命令的原子性。保证多个命名执行过程中不能被干扰。

3.6.2.解锁逻辑

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                //锁不存在,直接返回                              
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                //加锁次数减1,返回加锁次数                              
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                //大于0,说明还不能删除,刷新过期时间即可                              
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                //否则,删除索引,并且发布解锁消息                              
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

3.6.3.看门狗刷新过期时间

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                //判断是否存在                               
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    //存在,刷新过期时间                          
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }

以上任务在一个子线程中执行。

2.原生redis封装RedisLock接口

解决:阻塞锁,误删锁,可重入锁,提前释放锁,锁失效等问题

结合以上分析,解决方案如下:

自定义lock接口

public interface Lock {

    /**
     * 获取锁
     * @param lockKey 锁的唯一key
     * @param sec 尝试获取多久后放弃,单位秒
     * @return true:加锁成功  false:加锁失败
     */
    Boolean lock(String lockKey,int sec);

    /**
     * 解锁
     */
    void unlock();

}

实现RedisLock

@Slf4j
@Component
public class RedisLock implements Lock {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private static final ThreadLocal<Map<String, Object>> threadLocal = new ThreadLocal<>();

    private final static String LOCK_KEY = "lockKey";
    private final static String THREAD_ID = "threadId";
    private final static String THREAD = "thread";

    @Override
    public Boolean lock(String lockKey,int sec) {
        Boolean lock = false;
        //1.先判断当前线程是否已经有锁
        Map<String, Object> threadMap = threadLocal.get();
        String threadId = null;

        HashOperations<String, Object, Object> hashOperations = stringRedisTemplate.opsForHash();

        if (threadMap!=null) {
            //1.1.有锁->次数累加
            threadId = threadMap.get(THREAD_ID).toString();
            hashOperations.increment(lockKey, threadId, 1);
            lock = true;
        } else {
            //1.2.没有锁->setnx加锁,
            threadId = UUID.randomUUID().toString();
            long begin = System.currentTimeMillis();
            while (!lock) {
                //加锁,默认加锁次数为1
                lock = stringRedisTemplate.opsForHash().putIfAbsent(lockKey, lockKey, "1");
                if (lock) {
                    //绑定线程id
                    stringRedisTemplate.opsForHash().put(lockKey, threadId, "1");
                    //设置过期时间
                    stringRedisTemplate.expire(lockKey, 100, TimeUnit.SECONDS);
                }else{
                    //判断是否超时
                    long end = System.currentTimeMillis();
                    if (end - begin > sec * 1000) {
                        log.debug("获取锁超时,取消...");
                        return false;
                    }
                }
            }

            //开启异步续约线程,执行续约
            Thread thread = new Thread(() -> {
                //过5秒,就将lockkey过期时间延长10秒
                try {
                    while (true) {
                        Thread.sleep(5000);
                        log.debug("自动续约,延长10秒...");
                        stringRedisTemplate.expire(lockKey, 100, TimeUnit.SECONDS);
                    }
                } catch (InterruptedException e) {
                    log.debug("异步续命线程中断...");
                }
            });
            log.debug("开启异步续命线程...");
            thread.start();

            //将异步线程存入当前线程中,方便后续中止
            Map<String, Object> map = new HashMap<>(5);
            map.put("thread", thread);
            map.put("lockKey", lockKey);
            map.put("threadId", threadId);

            threadLocal.set(map);
        }

        return lock;
    }

    @Override
    public void unlock() {
        Map<String, Object> threadMap = threadLocal.get();
        if (threadMap != null) {
            String lockKey = threadMap.get(LOCK_KEY).toString();
            String threadId = threadMap.get(THREAD_ID).toString();
            Thread thread = (Thread) threadMap.get(THREAD);

            HashOperations<String, Object, Object> hashOperations = stringRedisTemplate.opsForHash();
            Long count = hashOperations.increment(lockKey, threadId, -1);
            //加锁次数-1<=0,就可以解锁
            if (count <= 0) {
                stringRedisTemplate.delete(lockKey);
                //同时清除异步续约任务
                log.debug("中断任务:{}", threadId);
                thread.interrupt();
                log.debug("删除当前线程锁信息:{}", threadMap);
                threadLocal.remove();
            }
        }
    }

}

    //性能优化
    @Autowired
    private Lock redisLock;

    @GetMapping("/seckill/{skuId}")
    public  ResponseEntity<Boolean> seckill(@PathVariable("skuId") String skuId) {
        //先获取锁(setnx buying_skuid  是否能成功)
        String lockKey = "buying_"+skuId;
        //双检索机制,先不加锁查库存
        Long stock = Long.valueOf(redisTemplate.opsForHash().get(STOCK_KEY, skuId).toString());
        if (stock <= 0) {
            log.debug("秒杀异常");
            return ResponseEntity.status(400).body(false);
        }
        Boolean lock = redisLock.lock(lockKey,5);
        if (lock) {
            try {
                //1.加锁后,再查库存一次。判断库存是否足够
                stock = Long.valueOf(redisTemplate.opsForHash().get(STOCK_KEY, skuId).toString());
                //1.1.够->减库存,并返回true
                if (stock > 0) {
                    aService.test(lockKey);
                    log.debug("开始减库存:{}",stock);
                    redisTemplate.opsForHash().increment(STOCK_KEY, skuId,-1);
                    return ResponseEntity.ok(true);
                }
                //1.2.不够->返回false
            } catch (Exception e) {
                log.debug("秒杀异常");
                return ResponseEntity.status(400).body(false);
            }finally {
                redisLock.unlock();
            }
        }
        return ResponseEntity.status(400).body(false);
    }

测试发现一切ok。