分布式锁入门
1 什么是分布式锁
java里的synchorized关键字和juc提供的各种同步工具类只能适用与单进程的环境下,如果出现多个进程或多个服务器上部署的应用对同一个资源(数据库表)进行争夺的方式,此时synchorized和juc同步工具类就无法控制,此时就需要引入分布式锁。
分布式锁是一种在分布式系统环境下,通过多个节点对共享资源进行访问控制的一种同步机制。它的主要目的是防止多个节点同时操作同一份数据,从而避免数据的不一致性。
2 分布式锁的特点
互斥性
在任何时刻,只有一个节点可以持有锁。
公平性
如果多个节点同时申请锁,系统应该保证每个节点都有获取锁的机会。
可重入性
同一个节点可以多次获取同一个锁,而不会被阻塞。
高可用
锁服务应该是高可用的,不能因为锁服务的故障而影响整个系统的运行。
3 分布式锁的简单实现
3.1 基于数据库的方式
SELECT语句后加FOR UPDATE的排他锁
sql的where条件中一定要有索引字段作为条件,否则可能会造成表锁
sql要包含在事务当中,事务提交后行锁自动释放
InnoDB才支持行锁
示例:
伪代码
@RestController class LockController { @Autowired JdbcTemplate jdbcTemplate; @Autowired PlatformTransactionManager transactionManager; @GetMapping("/data/lock") public Object dataLock() { // 开启事务 DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); definition.setIsolationLevel(TransactionDefinition.ISOLATION_DEFAULT); definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); TransactionStatus transaction = transactionManager.getTransaction(definition); try { Map<String, Object> map = jdbcTemplate.queryForMap("SELECT `number` FROM goods_storage WHERE `name` = 'book' FOR UPDATE"); // 模拟业务处理延时 TimeUnit.MILLISECONDS.sleep(50); Integer number = (Integer) map.get("number"); if (number > 0) { jdbcTemplate.update("UPDATE goods_storage SET `number` = `number` - 1 WHERE `name` = 'book'"); } else { throw new RuntimeException("库存不足"); } // 提交事务 System.out.println("commit============="); transactionManager.commit(transaction); } catch (Exception e) { // 回滚事务 System.out.println("rollback============="); transactionManager.rollback(transaction); throw new RuntimeException(e); } return "SUCCESS"; } }
表结构
DROP TABLE IF EXISTS `goods_storage`; CREATE TABLE `goods_storage` ( `name` varchar(255) NOT NULL, `number` int NULL DEFAULT NULL, PRIMARY KEY (`name`) USING BTREE ) ENGINE = InnoDB ; INSERT INTO `goods_storage` VALUES ('book', 10); SET FOREIGN_KEY_CHECKS = 1;
一个服务采用不同端口启动两次,采用nginx映射到不同的端口
IDEA启动相同服务的方法
修改启动类参数
idea1 不同版本的位置可能不一致,找一下Allow parallel run 这个配置
idea2 修改配置文件
idea3 点击重启
idea4 看到以不同的端口启动了两个实例
idea5
nginx 映射两个端口
upstream testmyapp { server localhost:8081; server localhost:8082; } server { listen 10000; location / { proxy_pass http://testmyapp; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } }
jmeter并发测试
lock5 lock6 QPS 100/s 查看数据库结果,没有负数的情况
可以将FOR UDPATE 关键字去掉,会发现number字段会减少到负数
数据库乐观锁,增加版本字段
优点
:简单易实现
缺点
:增加数据库的压力
3.2 基于redis的方式
简单实现
@RestController class RedisLockController { @Autowired RedisTemplate redisTemplate; @Autowired JdbcTemplate jdbcTemplate; @GetMapping("/redis/lock") public Object redisLock() { String lockKey = "redis.lock"; String lockValue = UUID.randomUUID().toString(); try { lock(lockKey, lockValue); // 获取到锁 Map<String, Object> map = jdbcTemplate.queryForMap("SELECT `number` FROM goods_storage WHERE `name` = 'book' FOR UPDATE"); try { // 此处模拟业务逻辑处理时间 TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } Integer number = (Integer) map.get("number"); if (number > 0) { jdbcTemplate.update("UPDATE goods_storage SET `number` = `number` - 1 WHERE `name` = 'book'"); } else { throw new RuntimeException("库存不足"); } } finally { unLock(lockKey, lockValue); } return "SUCCESS"; } /** * 在获取分布式锁的基础上,首先在本地线程加锁,减少网络连接到来的延时和IO */ private synchronized void lock(String key, String lockValue) { ValueOperations ops = redisTemplate.opsForValue(); // 需要自己判断这个业务等待锁的时间,或者做更复杂的逻辑,比如引入队列等更复杂的操作,这里简单的死循环,一直等待获取到锁为止。 while (true) { // ops.setIfAbsent 是redisTemplate对setNX lua语句的封装 // setIfAbsent(Object key,Object value,long timeout,TimeUnit unit) 同时支持设置值和过期时间,也保证了操作的原子性 if (ops.setIfAbsent(key, lockValue)) { break; } // 此处做锁的重入操作,那么就需要另外维护一个进入的次数 } } private synchronized void unLock(String key, String lockValue) { ValueOperations ops = redisTemplate.opsForValue(); // 释放锁操作 // 查询锁的值,确保是自己持有的锁。 Object obj = ops.get(key); if (lockValue.equals(obj)) { // 此处还需要判断重入锁是否已经出完了,这里不做演示 redisTemplate.expire(key, 0, TimeUnit.MILLISECONDS); } } }
启动多节点(重复数据库实现的操作)
jemter压测(重复数据库实现的操作)
查看结果(重复数据库实现的操作)
4 实现分布式锁的问题
前面我们采用的mysql数据库和redis的方式简单实现了分布式锁,但还存在其他某些问题。
死锁问题
持有锁的节点出现异常或者宕机了没有释放锁,给锁设置过期时间。
锁续命问题
redis锁如果设置了key过期时间,但实际业务没有处理完。需要开启另外的线程来维护过期时间。
锁释放问题
出现错误解锁的问题,将其他锁给解了。持有锁时加上标识,只能释放自己的锁
锁的公平性问题
出现某个服务一直获取不到锁的情况。可以引入队列
锁的可重入
锁需要记录持有锁的服务信息,再次获取锁时判断是否已经持有锁
5 Redisson
引入依赖,maven坐标
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
redisson配置类
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
String host;
@Value("${spring.redis.port}")
Integer port;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
// redis 单服务,没有采用集群
config.useSingleServer().setAddress(String.format("redis://%s:%d", host, port));
return Redisson.create(config);
}
}
创建测试表
DROP TABLE IF EXISTS `redisson_lock_num`;
CREATE TABLE `redisson_lock_num` (
`num` int NULL DEFAULT NULL
);
INSERT INTO `redisson_lock_num` VALUES (10);
SET FOREIGN_KEY_CHECKS = 1;
web层代码
@RestController
class RedissonLockController {
@Autowired
RedissonClient redissonClient;
@Autowired
JdbcTemplate jdbcTemplate;
@GetMapping("/redisson")
public Object redissonLock() {
RLock lock = redissonClient.getLock("redisson.lock");
try {
lock.lock();
Map<String, Object> map = jdbcTemplate.queryForMap("SELECT `num` FROM redisson_lock_num");
Integer num = (Integer) map.get("num");
if (num > 0) {
jdbcTemplate.update("UPDATE redisson_lock_num SET `num` = `num` - 1");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return "SUCCESS";
}
}
启动两个服务
lock7 jemter测试,还是qps100
查看数据库结果,没有出现负数
lock8.png
优点
:效率高
缺点
:Redisson又需要另起一个线程给锁续命