分布式事物锁

分布式锁解决司机抢单

因为上面写的司机抢单并没有考虑并发,类似于电商的超卖问题

**解决方案

  • 第一种 设置数据库事务的隔离级别,设置为Serializable,效率低下

  • 第二种 使用乐观锁解决,通过版本号进行控制

  • 第三种 加锁解决,学习过synchronized 及lock锁,本地锁,目前微服务架构,分布式部署方式。

本地锁的局限性

  • 我们使用锁一般都是,synchronized 及lock锁,这些都是本地锁,只在当前jvm生效,在微服务里面就是只有当个微服务生效
  • 举例演示 TestController
@Tag(name = "测试接口")  
@RestController  
@RequestMapping("/order/test")  
public class TestController {  
  
    @Autowired  
    private TestService testService;  
  
    @GetMapping("testLock")  
    public Result testLock() {  
        testService.testLock();  
        return Result.ok();  
    }  
}

TestServiceImpl

@Service  
public class TestServiceImpl implements TestService{  
  
    @Autowired  
    private StringRedisTemplate redisTemplate;  
  
    @Override  
    public synchronized void testLock() {  
        //从redis里面获取数据  
        String value = redisTemplate.opsForValue().get("num");  
  
        if(StringUtils.isBlank(value)) {  
            return;  
        }  
  
        //把从redis获取数据+1  
        int num = Integer.parseInt(value);  
  
        //数据+1之后放回到redis里面  
        redisTemplate.opsForValue().set("num",String.valueOf(++num));  
    }  
}
  • 测试,模拟200个请求并发过程

  • 在redis添加初始值,num = 0

使用测试工具 jmeter 实现功能测试

![[Pasted image 20250605203137.png]] ![[Pasted image 20250605203153.png]]

加上锁 synchronized 后是没有问题

  • 上面的测试方式是在一个服务内进行的,单机测试,但是如果部署到集群下这个锁不一定会生效

实现分布式锁-redis

设置过期时间,到时间之后自动释放锁

@Override
public void testLock() {
	//从redis里面获取数据
	//1 获取当前锁  setnx
	Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");

	//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面
	if(ifAbsent) {
		//获取锁成功,执行业务代码
		//1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
		String value = redisTemplate.opsForValue().get("num");
		//2.如果值为空则非法直接返回即可
		if (StringUtils.isBlank(value)) {
			return;
		}
		//3.对num值进行自增加一
		int num = Integer.parseInt(value);
		redisTemplate.opsForValue().set("num", String.valueOf(++num));

		//3 释放锁
		redisTemplate.delete("lock");
	} else {
		try {
			Thread.sleep(100);
			this.testLock();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

问题:如果锁的删除时间小于事物的执行时间 例如:

  • 场景:如果业务逻辑执行的时间是7s
  • index1业务逻辑没执行完,3秒后释放
  • index2获取到锁,执行业务逻辑,3秒后锁会被自动释放
  • index3获取到锁,执行业务逻辑
  • index1业务逻辑完成,开始调用del释放锁,这是释放的是index3的锁,导致index3的业务只执行1s就被别人释放

修改后的代码

//uuid防止误删
@Override
public void testLock() {
    //从redis里面获取数据
    String uuid = UUID.randomUUID().toString();
    //1 获取当前锁  setnx  + 设置过期时间
    //        Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
    Boolean ifAbsent =
            redisTemplate.opsForValue()
                    .setIfAbsent("lock", uuid,10, TimeUnit.SECONDS);

    //2 如果获取到锁,从redis获取数据 数据+1 放回redis里面
    if(ifAbsent) {
        //获取锁成功,执行业务代码
        //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
        String value = redisTemplate.opsForValue().get("num");
        //2.如果值为空则非法直接返回即可
        if (StringUtils.isBlank(value)) {
            return;
        }
        //3.对num值进行自增加一
        int num = Integer.parseInt(value);
        redisTemplate.opsForValue().set("num", String.valueOf(++num));
        //出现异常

        //3 释放锁
        String redisUuid = redisTemplate.opsForValue().get("lock");
        if(uuid.equals(redisUuid)) {
            redisTemplate.delete("lock");
        }
       
    } else {
        try {
            Thread.sleep(100);
            this.testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

通过LUA脚本保证原子性

通过uuid防止误删,但是还是有问题,不具备原子性

@Override
public void testLock() {
	//从redis里面获取数据
	String uuid = UUID.randomUUID().toString();
	//1 获取当前锁  setnx  + 设置过期时间
	//        Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
	Boolean ifAbsent =
			redisTemplate.opsForValue()
					.setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);

	//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面
	if(ifAbsent) {
		//获取锁成功,执行业务代码
		//1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
		String value = redisTemplate.opsForValue().get("num");
		//2.如果值为空则非法直接返回即可
		if (StringUtils.isBlank(value)) {
			return;
		}
		//3.对num值进行自增加一
		int num = Integer.parseInt(value);
		redisTemplate.opsForValue().set("num", String.valueOf(++num));
		//出现异常

		//3 释放锁 lua脚本实现
		DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
		// lua脚本
		String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
				"then\n" +
				"    return redis.call(\"del\",KEYS[1])\n" +
				"else\n" +
				"    return 0\n" +
				"end";
		redisScript.setScriptText(script);
		// 设置返回结果
		redisScript.setResultType(Long.class);
		redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
	} else {
		try {
			Thread.sleep(100);
			this.testLock();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

总结

1、加锁

// 1. 从Redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
      .setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);

2、使用lua释放锁

// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);

3、重试

Thread.sleep(500); 
testLock();

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

第一个:互斥性,在任何时刻,只有一个客户端能持有锁。

第二个:不会发生死锁,即使有一个客户端在获取锁操作时候崩溃了,也能保证其他客户端能获取到锁。

第三个:解铃还须系铃人,解锁加锁必须同一个客户端操作。

第四个:加锁和解锁必须具备原子性

实现分布式锁-Redisson

准备工作

引入依赖

在common里service-utils引入依赖

<dependency>  
    <groupId>org.redisson</groupId>  
    <artifactId>redisson</artifactId>  
</dependency>

创建Redisson配置类

@Data  
@Configuration  
@ConfigurationProperties(prefix = "spring.data.redis")  
public class RedissonConfig {  
  
    private String host;  
  
    private String password;  
  
    private String port;  
  
    private int timeout = 3000;  
    private static String ADDRESS_PREFIX = "redis://";  
  
    @Bean  
    RedissonClient redissonSingle() {  
        Config config = new Config();  
  
        if(!StringUtils.hasText(host)){  
            throw new RuntimeException("host is  empty");  
        }  
        SingleServerConfig serverConfig = config.useSingleServer()  
                .setAddress(ADDRESS_PREFIX + this.host + ":" + port)  
                .setTimeout(this.timeout);  
        if(StringUtils.hasText(this.password)) {  
            serverConfig.setPassword(this.password);  
        }  
        return Redisson.create(config);  
    }  
}

在业务方法编写加锁和解锁

// Redisson实现  
@Override  
public void testLock() {  

	// 通过redisson创建锁对象  
	RLock lock = redissonClient.getLock("lock1");  

	// 尝试获取锁  
	// 阻塞一直等待直到获取到,获取锁之后设置锁的超时时间为10秒,没有参数默认是30秒  
	lock.lock(10, TimeUnit.SECONDS);  

	// tryLock,设置等待时间为30秒,超时时间为10秒  
//        try {  
//            boolean b = lock.tryLock(30, 10, TimeUnit.SECONDS);  
//        } catch (InterruptedException e) {  
//            throw new RuntimeException(e);  
//        }  

	//编写业务代码  
	//1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0  
	String value = redisTemplate.opsForValue().get("num");  
	//2.如果值为空则非法直接返回即可  
	if (StringUtils.isBlank(value)) {  
		return;  
	}  
	//3.对num值进行自增加一  
	int num = Integer.parseInt(value);  
	redisTemplate.opsForValue().set("num", String.valueOf(++num));  

	//释放锁  
	lock.unlock();  
}

看门狗原理

只要线程加锁成功,就会启动一个watch dog看门狗,它是一个后台进程,每隔十秒检查一下,如果线程还持有锁,那么就会不断延长锁key的生存时间,因此可以解决锁过期释放,业务还没完成的问题

  • 若使用 tryLock() 方法并指定了租约时间,则不会启动看门狗
  • 如果我们未指定超时时间,就会使用 lockwatchdogTimeout = 30 * 1000
Posted on:
June 7, 2025
Length:
3 minute read, 581 words
Tags:
project
See Also:
CompletableFuture异步编排
MongoDB
快速上手xxl-job