分布式锁比较常见的实现方式有以下几种:
1.Memcached 实现的分布式锁:使用 add 命令,添加成功的情况下,表示创建分布式锁成功。
2.ZooKeeper 实现的分布式锁:使用 ZooKeeper 顺序临时节点来实现分布式锁。
3.Reids的分布式锁,很多大公司会基于Reidis做扩展开发。setnx(set if not exists),Redisson,watch dog。
4.基于数据库,比如 MySQL。主键或唯一索引的唯一性。
1.Redisson
3.1.介绍
Redisson是一个Redis的java 客户端。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
3.2.官网
3.3.教程
https://github.com/redisson/redisson/wiki
3.4.基本用法
3.4.1.导入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.1</version>
</dependency>
3.4.2.application.properties
redisson.address=redis://redis:6379
redisson.password=qweQWE123!@###
3.4.3.RedissonConfig.java
@Configuration
public class RedissonConfig {
@Value("${redisson.address}")
private String addressUrl;
@Value("${redisson.password}")
private String password;
@Bean
public RedissonClient getRedisson() throws Exception{
Config config = new Config();
config.useSingleServer()
.setAddress(addressUrl)
.setPassword(password);
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
3.4.4.重构代码
@Resource
private RedissonClient redissonClient;
@GetMapping("/seckill/{skuId}")
public ResponseEntity<Void> seckill(@PathVariable("skuId") String skuId) throws InterruptedException {
//Redisson锁解决以上所有问题
String lockKey = "buying_" + skuId;
RLock lock = redissonClient.getLock(lockKey);
if (lock.tryLock(30L,TimeUnit.SECONDS)) {
try {
Integer stock = Integer.valueOf((String) stringRedisTemplate.opsForHash().get(STOCK_KEY, skuId));
//1.判断库存是否足够 > 0
if (stock <= 0) {
//1.1.不够->返回400
return ResponseEntity.badRequest().build();
}
//1.2.够->减库存,返回200
log.debug("剩余库存:{}", stock);
// Thread.sleep(3000);
stringRedisTemplate.opsForHash().increment(STOCK_KEY, skuId, -1);
new AService().some();
} catch (Exception e) {
log.error("秒杀异常", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
} finally {
//解锁
lock.unlock();
}
}
return ResponseEntity.ok().build();
}
测试结果如下:
没有超卖
吞吐量提升2倍
3.5.原理
3.6.源码
3.6.1.加锁逻辑
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//当前线程还未加锁成功
"if (redis.call('exists', KEYS[1]) == 0) then " +
//hset("buying_2","ef2f1dad-1a99-4720-bf3a-069f46cf18d2:211",1)
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//设置过期时间(默认30s)
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//前线程已经加锁成功
//hexists("buying_2","ef2f1dad-1a99-4720-bf3a-069f46cf18d2:211")
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//累加加锁次数
//hincrby("buying_2","ef2f1dad-1a99-4720-bf3a-069f46cf18d2:211")
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//设置过期时间(默认30s)
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//加锁失败,返回当前锁过期时间,方便后续重试
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
KEYS[1]:keys中第一个元素(redis key)
ARGV[1]:params中第一个元素 (过期时间)
ARGV[2]:params中第二个元素 (唯一标识)
以上主要逻辑是通过lua实现的。
什么是lua
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
lua优点
支持原子性操作:Redis会将整个脚本作为一个整体执行,中间不会被其他请求插入。因此在脚本运行过程中无需担心会出现竞态条件,无需使用事务
降低网络开销:将多个请求通过脚本的形式一次发送到服务器,减少了网络的时延
脚本复用:客户端发送的脚本可支持永久存在redis中,这样其他客户端可以复用这一脚本,而不需要使用代码完成相同的逻辑。
为什么redis采用lua实现?
redis的每个命令是具有原子性的,在某些特殊的场景下,我们需要多个命令的原子性。保证多个命名执行过程中不能被干扰。
3.6.2.解锁逻辑
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//锁不存在,直接返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//加锁次数减1,返回加锁次数
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//大于0,说明还不能删除,刷新过期时间即可
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//否则,删除索引,并且发布解锁消息
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
3.6.3.看门狗刷新过期时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//判断是否存在
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//存在,刷新过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
以上任务在一个子线程中执行。
2.原生redis封装RedisLock接口
解决:阻塞锁,误删锁,可重入锁,提前释放锁,锁失效等问题
结合以上分析,解决方案如下:
自定义lock接口
public interface Lock {
/**
* 获取锁
* @param lockKey 锁的唯一key
* @param sec 尝试获取多久后放弃,单位秒
* @return true:加锁成功 false:加锁失败
*/
Boolean lock(String lockKey,int sec);
/**
* 解锁
*/
void unlock();
}
实现RedisLock
@Slf4j
@Component
public class RedisLock implements Lock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static final ThreadLocal<Map<String, Object>> threadLocal = new ThreadLocal<>();
private final static String LOCK_KEY = "lockKey";
private final static String THREAD_ID = "threadId";
private final static String THREAD = "thread";
@Override
public Boolean lock(String lockKey,int sec) {
Boolean lock = false;
//1.先判断当前线程是否已经有锁
Map<String, Object> threadMap = threadLocal.get();
String threadId = null;
HashOperations<String, Object, Object> hashOperations = stringRedisTemplate.opsForHash();
if (threadMap!=null) {
//1.1.有锁->次数累加
threadId = threadMap.get(THREAD_ID).toString();
hashOperations.increment(lockKey, threadId, 1);
lock = true;
} else {
//1.2.没有锁->setnx加锁,
threadId = UUID.randomUUID().toString();
long begin = System.currentTimeMillis();
while (!lock) {
//加锁,默认加锁次数为1
lock = stringRedisTemplate.opsForHash().putIfAbsent(lockKey, lockKey, "1");
if (lock) {
//绑定线程id
stringRedisTemplate.opsForHash().put(lockKey, threadId, "1");
//设置过期时间
stringRedisTemplate.expire(lockKey, 100, TimeUnit.SECONDS);
}else{
//判断是否超时
long end = System.currentTimeMillis();
if (end - begin > sec * 1000) {
log.debug("获取锁超时,取消...");
return false;
}
}
}
//开启异步续约线程,执行续约
Thread thread = new Thread(() -> {
//过5秒,就将lockkey过期时间延长10秒
try {
while (true) {
Thread.sleep(5000);
log.debug("自动续约,延长10秒...");
stringRedisTemplate.expire(lockKey, 100, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
log.debug("异步续命线程中断...");
}
});
log.debug("开启异步续命线程...");
thread.start();
//将异步线程存入当前线程中,方便后续中止
Map<String, Object> map = new HashMap<>(5);
map.put("thread", thread);
map.put("lockKey", lockKey);
map.put("threadId", threadId);
threadLocal.set(map);
}
return lock;
}
@Override
public void unlock() {
Map<String, Object> threadMap = threadLocal.get();
if (threadMap != null) {
String lockKey = threadMap.get(LOCK_KEY).toString();
String threadId = threadMap.get(THREAD_ID).toString();
Thread thread = (Thread) threadMap.get(THREAD);
HashOperations<String, Object, Object> hashOperations = stringRedisTemplate.opsForHash();
Long count = hashOperations.increment(lockKey, threadId, -1);
//加锁次数-1<=0,就可以解锁
if (count <= 0) {
stringRedisTemplate.delete(lockKey);
//同时清除异步续约任务
log.debug("中断任务:{}", threadId);
thread.interrupt();
log.debug("删除当前线程锁信息:{}", threadMap);
threadLocal.remove();
}
}
}
}
//性能优化
@Autowired
private Lock redisLock;
@GetMapping("/seckill/{skuId}")
public ResponseEntity<Boolean> seckill(@PathVariable("skuId") String skuId) {
//先获取锁(setnx buying_skuid 是否能成功)
String lockKey = "buying_"+skuId;
//双检索机制,先不加锁查库存
Long stock = Long.valueOf(redisTemplate.opsForHash().get(STOCK_KEY, skuId).toString());
if (stock <= 0) {
log.debug("秒杀异常");
return ResponseEntity.status(400).body(false);
}
Boolean lock = redisLock.lock(lockKey,5);
if (lock) {
try {
//1.加锁后,再查库存一次。判断库存是否足够
stock = Long.valueOf(redisTemplate.opsForHash().get(STOCK_KEY, skuId).toString());
//1.1.够->减库存,并返回true
if (stock > 0) {
aService.test(lockKey);
log.debug("开始减库存:{}",stock);
redisTemplate.opsForHash().increment(STOCK_KEY, skuId,-1);
return ResponseEntity.ok(true);
}
//1.2.不够->返回false
} catch (Exception e) {
log.debug("秒杀异常");
return ResponseEntity.status(400).body(false);
}finally {
redisLock.unlock();
}
}
return ResponseEntity.status(400).body(false);
}
测试发现一切ok。