分布式事物锁
分布式锁解决司机抢单
因为上面写的司机抢单并没有考虑并发,类似于电商的超卖问题
**解决方案
第一种 设置数据库事务的隔离级别,设置为Serializable,效率低下
第二种 使用乐观锁解决,通过版本号进行控制
第三种 加锁解决,学习过synchronized 及lock锁,本地锁,目前微服务架构,分布式部署方式。
本地锁的局限性
- 我们使用锁一般都是,synchronized 及lock锁,这些都是本地锁,只在当前jvm生效,在微服务里面就是只有当个微服务生效
- 举例演示 TestController
@Tag(name = "测试接口")
@RestController
@RequestMapping("/order/test")
public class TestController {
@Autowired
private TestService testService;
@GetMapping("testLock")
public Result testLock() {
testService.testLock();
return Result.ok();
}
}
TestServiceImpl
@Service
public class TestServiceImpl implements TestService{
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public synchronized void testLock() {
//从redis里面获取数据
String value = redisTemplate.opsForValue().get("num");
if(StringUtils.isBlank(value)) {
return;
}
//把从redis获取数据+1
int num = Integer.parseInt(value);
//数据+1之后放回到redis里面
redisTemplate.opsForValue().set("num",String.valueOf(++num));
}
}
测试,模拟200个请求并发过程
在redis添加初始值,num = 0
使用测试工具 jmeter 实现功能测试
![[Pasted image 20250605203137.png]] ![[Pasted image 20250605203153.png]]
加上锁 synchronized 后是没有问题
- 上面的测试方式是在一个服务内进行的,单机测试,但是如果部署到集群下这个锁不一定会生效
实现分布式锁-redis
设置过期时间,到时间之后自动释放锁
@Override
public void testLock() {
//从redis里面获取数据
//1 获取当前锁 setnx
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面
if(ifAbsent) {
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = redisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
//3 释放锁
redisTemplate.delete("lock");
} else {
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
问题:如果锁的删除时间小于事物的执行时间 例如:
- 场景:如果业务逻辑执行的时间是7s
- index1业务逻辑没执行完,3秒后释放
- index2获取到锁,执行业务逻辑,3秒后锁会被自动释放
- index3获取到锁,执行业务逻辑
- index1业务逻辑完成,开始调用del释放锁,这是释放的是index3的锁,导致index3的业务只执行1s就被别人释放
修改后的代码
//uuid防止误删
@Override
public void testLock() {
//从redis里面获取数据
String uuid = UUID.randomUUID().toString();
//1 获取当前锁 setnx + 设置过期时间
// Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
Boolean ifAbsent =
redisTemplate.opsForValue()
.setIfAbsent("lock", uuid,10, TimeUnit.SECONDS);
//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面
if(ifAbsent) {
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = redisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
//出现异常
//3 释放锁
String redisUuid = redisTemplate.opsForValue().get("lock");
if(uuid.equals(redisUuid)) {
redisTemplate.delete("lock");
}
} else {
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
通过LUA脚本保证原子性
通过uuid防止误删,但是还是有问题,不具备原子性
@Override
public void testLock() {
//从redis里面获取数据
String uuid = UUID.randomUUID().toString();
//1 获取当前锁 setnx + 设置过期时间
// Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
Boolean ifAbsent =
redisTemplate.opsForValue()
.setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面
if(ifAbsent) {
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = redisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
//出现异常
//3 释放锁 lua脚本实现
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// lua脚本
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
redisScript.setScriptText(script);
// 设置返回结果
redisScript.setResultType(Long.class);
redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
} else {
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
总结
1、加锁
// 1. 从Redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
.setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);
2、使用lua释放锁
// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);
3、重试
Thread.sleep(500);
testLock();
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
第一个:互斥性,在任何时刻,只有一个客户端能持有锁。
第二个:不会发生死锁,即使有一个客户端在获取锁操作时候崩溃了,也能保证其他客户端能获取到锁。
第三个:解铃还须系铃人,解锁加锁必须同一个客户端操作。
第四个:加锁和解锁必须具备原子性
实现分布式锁-Redisson
准备工作
引入依赖
在common里service-utils引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
</dependency>
创建Redisson配置类
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.data.redis")
public class RedissonConfig {
private String host;
private String password;
private String port;
private int timeout = 3000;
private static String ADDRESS_PREFIX = "redis://";
@Bean
RedissonClient redissonSingle() {
Config config = new Config();
if(!StringUtils.hasText(host)){
throw new RuntimeException("host is empty");
}
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout);
if(StringUtils.hasText(this.password)) {
serverConfig.setPassword(this.password);
}
return Redisson.create(config);
}
}
在业务方法编写加锁和解锁
// Redisson实现
@Override
public void testLock() {
// 通过redisson创建锁对象
RLock lock = redissonClient.getLock("lock1");
// 尝试获取锁
// 阻塞一直等待直到获取到,获取锁之后设置锁的超时时间为10秒,没有参数默认是30秒
lock.lock(10, TimeUnit.SECONDS);
// tryLock,设置等待时间为30秒,超时时间为10秒
// try {
// boolean b = lock.tryLock(30, 10, TimeUnit.SECONDS);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//编写业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = redisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
//释放锁
lock.unlock();
}
看门狗原理
只要线程加锁成功,就会启动一个watch dog看门狗,它是一个后台进程,每隔十秒检查一下,如果线程还持有锁,那么就会不断延长锁key的生存时间,因此可以解决锁过期释放,业务还没完成的问题
- 若使用
tryLock()方法并指定了租约时间,则不会启动看门狗 - 如果我们未指定超时时间,就会使用
lockwatchdogTimeout = 30 * 1000