(1)手动方式(需要考虑的问题比较多:锁时序问题、服务超时、服务宕机.....)
总结:1.获取锁:在获取锁的时候加上过期时间,这个过程需要是原子操作
2.释放锁:将获取锁的值并比较再删除锁的操作需要是原子操作(避免因业务机器宕机、业务超时等引起的问题)
解决:用luau脚本解决锁释放的过程中引发的问题
// 分布式锁(实现原理:利用redis的setnx 命令 原子操作:操作复杂,要考虑的多) public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() { //1.抢占分布式锁,去redis占坑 String uuid = UUID.randomUUID().toString(); //可以尝试把过期时间调大点 Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,30, TimeUnit.SECONDS);//加锁, if (lock) { System.out.println("获取分布式锁成功...."); //加锁成功...执行业务 //2.设置过期时间,必须和加锁是同步的、原子的 // stringRedisTemplate.expire("lock",30, TimeUnit.SECONDS); Map<String, List<Catelog2Vo>> dataFromDb=null; try{ dataFromDb = getDataFromDb();//不管是否出现异常,都finally直接删锁 }finally { // stringRedisTemplate.delete("lock");//删除锁;删除锁后所有线程又可以重新抢锁 //获取值对比+对比成功删除锁=原子操作 Lua脚本删除锁 String script="if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; //利用Lua脚本删除锁 Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid); // String lockvalue = stringRedisTemplate.opsForValue().get("lock"); // if (uuid.equals(lockvalue)){ // //删除我自己的锁 // stringRedisTemplate.delete("lock");//删除锁 // } } return dataFromDb; } else { System.out.println("获取分布式锁失败....等待重试"); //加锁失败...重试, 相当于synchronized,自旋的方式,会一直在重试 //如果觉得重试频率太高了,可以在这休眠100ms再重试 try { Thread.sleep(200); }catch (Exception e){ } return getCatalogJsonFromDbWithRedisLock();//自旋的方式一直在这重试 } }(2)使用 redission
相比手动方式,利用redission来实现分布式锁就简单得多,redission的操作都是原子操作,底层是用luau脚本在执行,要么全部执行成功,要么执行失败,而且不会引发锁时序的问题
redission的配置:
@Configuration public class MyRedissionConfig { /** * (redisson、jedis、lettuce都是对redis底层操作的客户端,只是redisson是分布式锁) * 所有对redisson的操作都是通过RedissonClient对象 * @return * @throws IOException */ @Bean(destroyMethod="shutdown") RedissonClient redissonClient() throws IOException { //1.创建配置 Config config = new Config(); //Redis url 应该以 redis:// or rediss:// (安全连接) 开头 config.useSingleServer().setAddress("redis://192.168.56.10:6379"); //2.根据config创建出RedissonClient实例 RedissonClient redissonClient = Redisson.create(config); return redissonClient; } } @Autowired private RedissonClient redissonClient; // 分布式锁(redisson 原子操作:操作简单,有封装) public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedissonLock() { //1.锁的名字,锁的粒度越细运行越快 //锁的粒度,具体缓存的是某个数据,11号商品:product-11-lock RLock lock = redissonClient.getLock("CatalogJson-lock"); lock.lock();//TODO:看门狗:锁的自动续期 System.out.println("获取分布式锁成功...."); Map<String, List<Catelog2Vo>> dataFromDb=null; try{ dataFromDb = getDataFromDb();//不管是否出现异常,都finally直接删锁 }finally { lock.unlock(); } return dataFromDb; }上面示例中getDataFromDb()是从数据库获取数据的一个方法
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock(); //有看门狗机制
看门狗机制总结:
问题:lock.lock(10, TimeUnit.SECONDS),在锁时间到期后,不会自动续期,也就是看门狗机制失效不起作用 1、如果我们设置了锁的超时时间,就发送redis执行脚本,来进行占锁,默认超时就是我们指定的时间。 2、如果我们未指定锁的超时时间,就使用30s【LockWatchTimeout看门狗的默认时间】; 只要占锁成功,就会启动一个定时任务【来重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s就会自动再次续期,续成30s internalLockLeaseTime :看门狗时间/3 ,也就是10s 最佳实战:推荐使用 lock.lock(10, TimeUnit.SECONDS);省掉了整个续期操作,手动解锁。把过期时间调大,大于业务处理时间,防止业务超时导致锁不住其他线程大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
推荐使用这种方式:手动解锁,可把过期时间调大,大于业务处理时间,防止业务超时导致锁不住其他线程
// 加锁以后10秒钟自动解锁 // 无需调用unlock方法手动解锁 lock.lock(10, TimeUnit.SECONDS);//加这种lock方法有时间的锁,看门狗机制会失效 // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { System.out.println("加锁成功.........执行业务"+Thread.currentThread().getId()); Thread.sleep(10000); } finally { //解锁 lock.unlock(); } } /** * 分布式锁测试 * @return */ @ResponseBody @GetMapping({"/hello"}) public String hello(){ //1.获得一把锁,只要锁的名字一样,就是同一把锁 RLock lock = redissonClient.getLock("my-lock"); //2.加锁 lock.lock();//阻塞式等待。默认加的锁都是30s的时间 //TODO:看门狗:锁的自动续期 //1)、锁的自动续期,如果业务执行时间超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删除 //2) 、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁(例如业务宕机),锁默认在30以后自动删除 // lock.lock(10, TimeUnit.SECONDS);//10秒后自动解锁,自动解锁的时间一定要大于业务的执行时间 //问题:lock.lock(10, TimeUnit.SECONDS),在锁时间到期后,不会自动续期 //1、如果我们设置了锁的超时时间,就发送redis执行脚本,来进行占锁,默认超时就是我们指定的时间。 //2、如果我们未指定锁的超时时间,就使用30*1000【LockWatchTimeout看门狗的默认时间】; // 只要占锁成功,就会启动一个定时任务【来重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s就会自动再次续期,续成30s // internalLockLeaseTime :看门狗时间/3 ,也就是10s //最佳实战:推荐使用 //1)、lock.lock(10, TimeUnit.SECONDS);省掉了整个续期操作,手动解锁。把过期时间调大,大于业务处理时间,防止业务超时导致锁不住其他线程 try{ System.out.println("加锁成功.........执行业务"+Thread.currentThread().getId()); Thread.sleep(10000); }catch (Exception e){ }finally { //3.解锁 System.out.println("释放锁....."+Thread.currentThread().getId()); lock.unlock(); //不管业务是否执行出现异常,最后都必须解锁 } return "hello"; }基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
RLock fairLock = redisson.getFairLock("anyLock"); // 最常见的使用方法 fairLock.lock();大家都知道,如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 10秒钟以后自动解锁 // 无需调用unlock方法手动解锁 fairLock.lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); ... fairLock.unlock();Redisson同时还为分布式可重入公平锁提供了异步执行的相关方法:
RLock fairLock = redisson.getFairLock("anyLock"); fairLock.lockAsync(); fairLock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock"); // 最常见的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock();大家都知道,如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 10秒钟以后自动解锁 // 无需调用unlock方法手动解锁 rwlock.readLock().lock(10, TimeUnit.SECONDS); // 或 rwlock.writeLock().lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); // 或 boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();测试读写锁:
/** * 测试读写锁 * */ //保证一定能读到最新的数据,在修改期间,写锁是一个排他锁(互斥锁),读锁是一个共享锁 //只要写锁存在没被释放,读操作就必须等待 @ResponseBody @GetMapping({"/write"}) public String writeValue(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock"); String s=""; RLock wLock = readWriteLock.writeLock(); try{ //1.改数据加写锁,读数据加读锁 wLock.lock(); s = UUID.randomUUID().toString(); Thread.sleep(10000); stringRedisTemplate.opsForValue().set("writevalue",s); } catch (InterruptedException e) { e.printStackTrace(); } finally { wLock.unlock(); } return s; } @ResponseBody @GetMapping({"/read"}) public String readValue(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock"); String s=""; RLock rLock = readWriteLock.readLock(); rLock.lock(); try{ s = stringRedisTemplate.opsForValue().get("writevalue"); } catch (Exception e) { e.printStackTrace(); } finally { rLock.unlock(); } return s; } 保证一定能读到最新的数据,在修改期间,写锁是一个排他锁(互斥锁、独享锁),读锁是一个共享锁 只要写锁存在没被释放,读操作就必须等待 写+读:先写再读,必须等待写锁释放 写+写:阻塞方式 读+写:先读再写,有读锁,写也需要等待 读+读:相当于无锁,并发读,只会在redis中记录好所有当前的读锁,他们都会同时加锁 只要有写锁存在,读写都必须等待基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire(); //或 semaphore.tryAcquireAsync(); semaphore.tryAcquire(23, TimeUnit.SECONDS); //或 semaphore.tryAcquireAsync(23, TimeUnit.SECONDS); semaphore.release(10); semaphore.release(); //或 semaphore.releaseAsync();信号量测试:
/** * 测试信号量 *acquire获取一个则信号量-1 *release释放一个则信号量+1 * * 信号量也可用于分布式限流 */ @ResponseBody @GetMapping({"/park"}) public String park() throws InterruptedException { RSemaphore park = redissonClient.getSemaphore("park"); // park.acquire();//阻塞方式,获取一个信号量,获取一个值 .....加 boolean b = park.tryAcquire(); if (b){ //执行业务 }else { return "error"; } return "ok..."+b; } @ResponseBody @GetMapping({"/go"}) public String go() throws InterruptedException { RSemaphore park = redissonClient.getSemaphore("park"); park.release();//阻塞方式,释放一个信号量,减少一个值 .....减 return "ok"; } acquire获取一个则信号量-1 release释放一个则信号量+1 信号量也可用于分布式限流基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其他线程或其他JVM里 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();闭锁测试:
/** * 放假 锁门 * 1班没人了,2班...3班... * 5个班全走完,我们可以锁大门了 */ @ResponseBody @GetMapping({"/lockDoor"}) public String lockDoor() throws InterruptedException { RCountDownLatch door = redissonClient.getCountDownLatch("door"); door.trySetCount(5); door.await();//等待闭锁都完成 //执行其他业务 return "可以关大门了"; } @ResponseBody @GetMapping({"/gogogo/{id}"}) public String lockDoor(@PathVariable String id) throws InterruptedException { RCountDownLatch door = redissonClient.getCountDownLatch("door"); door.countDown();//计数减一 return id+"班走完了"; }总结:
缓存的问题: 1.空结果进行缓存,解决缓存穿透(redis中不存在,会大并发查数据库) 2.设置过期时间(加随机值),解决缓存雪崩(redis数据大面积失效) 3.加锁,解决缓存击穿 (redis对于热点数据失效)
需要缓存的业务 考虑缓存的两种用法模式 1.读模式,如何读取一个数据,应该遵循先从缓存中读取, 如果缓存中没有,再在数据库读取,如果在数据库查到数据则再放到缓存中,并返回 2.写模式,如何保证缓存中的数据和数据库中的数据是一致的 可以使用双写模式或失效模式 双写模式:如果修改数据。如果缓存中有,则可以改完数据库中的数据后,再改缓存中的数据,把缓存中以前的数据覆盖掉 失效模式:改完数据库数据以后,可以把缓存中的数据直接清除掉,可以保证下一次从缓存中拿到的数据是最新的 如果出现数据不一致,可以通过加锁来保证顺序一致性问题,来达到缓存与数据库的最终一致
上图加的是本地锁,例如synchronize、JUC里面的锁都只能锁住当前进程,而不能锁住其他服务,所以根据上面图片显示,最终还是会有8个请求跑进来去查询数据库
String uuid = UUID.randomUUID().toString(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,30, TimeUnit.SECONDS);
