Redis安装与使用
那些数据适合使用缓存
- 即时性,数据一致性要求不高的
- 访问量大且更新评率不高的数据(读多,写少)
redis安装
下载镜像
1
docker pull redis
创建实例并启动
1
2
3
4
5
6
7
8# 创建配置文件夹
mkdir -p /mydata/redis/conf
touch /mydata/redis/conf/redis.conf
docker run -p 6379:6379 --name redis -v /mydata/redis/data:/data \
-v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server /etc/redis/redis.conf使用 redis 镜像执行 redis-cli 命令连接
1
docker exec -it redis redis-cli
Redis压测处理异常问题
产生的堆外内存溢出: OutOfDirectMomoryError
- 原因是因为SpringBoot2.0以后版本默认使用 Lettuce 作为操作 Redis 的客户端 . Lettuce 使用 netty 进行网络通信
- Lettuce 的 bug 导致 netty 堆外内存溢出, -Hmx300m: netty如果没有指定堆外内存,会默认使用 -Hmx300m作为默认空间
- 可以通过 -Dio.netty.maxDircetMemory 进行设置
- 解决方法
- 不能使用 -Dio.netty.maxDircetMemory 只是去调大堆外内存
- 1)、升级 lettuce 客户端
- 2)、切换使用 jedis
1 | <!-- 引入redis依赖 --> |
缓存失效问题之缓存穿透
缓存穿透
缓存穿透是指查询一个一定
不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的 null 写入缓存,这将导致这个不存在的数据每次
请求都要到存储层去查询,失去了缓存的意义。
在流量大时,可能 DB 就挂掉了,要是有人利用不存在的 key 频繁攻击我们的应用,这就是
漏洞
解决方案:接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截
即使查询了空结果也进行缓存,并且设置较短的失效时间
缓存失效问题之缓存雪崩
缓存雪崩
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失
效,请求全部转发到 DB,DB 瞬时压力过重雪崩。
解决方案:原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的
重复率就会降低,就很难引发集体失效的事件。
redis高可用,如果缓存数据库是分布式部署,将热点数据均匀分布在不同得缓存数据库中。设置热点数据永远不过期。
缓存失效问题之缓存击穿
缓存击穿
对于一些设置了过期时间的 key,如果这些 key 可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。
这个时候,需要考虑一个问题:如果这个 key 在大量请求同时进来前正好失效,那么所有对这个 key 的数据查询都落到 db,我们称为缓存击穿。
解决方案:设置热点数据永远不过期。
接口限流与熔断,降级。重要的接口一定要做好限流策略,防止用户恶意刷接口,同时要降级准备,当接口中的某些 服务 不可用时候,进行熔断,失败快速返回机制。
布隆过滤器。bloomfilter就类似于一个hash set,用于快速判某个元素是否存在于集合中,其典型的应用场景就是快速判断一个key是否存在于某容器,不存在就直接返回。布隆过滤器的关键就在于hash算法和容器大小
加锁;高并发情况下,给单独需要查询数据的的线程加锁,其他线程等待。
分布式锁
分布式锁与本地锁
本地锁,只能锁住当前进程,在分布式架构下,集群部署中,效果不好,所以需要分布式锁
本地锁:synchronize , JUC(Lock) 均是本地锁在分布式情况下,必须使用分布式锁
分布式锁实现
我们可以同时去一个地方”占坑“,如果占到,就执行逻辑。否则就必须等待,知道释放锁。
”占坑“可以去redis,也可以去数据库,可以去任何大家都能访问的地方。
等待可以自选的方式。
场景解决
使用Redis
获取锁,setnx(“lock”,1111)
设置成功返回ok->获取到锁 ->执行逻辑 ->删除锁 ->结束
设置失败返回null ->没获取到锁 ->等待(自旋) ->重新设置setNX(“lock”,1111) -> 直到获取到锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// Boolean setIfAbsent(K key, V value);
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid);
// 获取到锁
if (lock) {
// 执行逻辑
// 删除锁
redisTemplate.delete("lock");
return result;
}else {
// 没有获取到锁
// 等待100ms重试
try{Thread.sleep(200); }catch (Exception e){ }
return testWithRedissonLock(); //自旋的方式
}问题- setnx占好了位置,业务代码异常或者程序在页面过程中宕机了,没有执行删除锁的逻辑,这就造成了死锁
解决设置锁的自动过期,即使没有删除,会自动删除
1
2
3
4
5Boolean setIfAbsent(K key, V value);
// 代码实现
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid);
问题- 直接删除锁;如果由于业务时间很长,锁自己过期了,此时直接删除锁,有可能把别人正在持有的锁删除了
解决占锁的时候,值指定为uuid,每个人匹配自己的锁才删除
1
2
3
4
5
6<T> T execute(RedisScript<T> script, List<K> keys, Object... args);
// 代码实现
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
if (lock) {
System.out.println("获取分布式锁成功...");
//加锁成功... 执行业务
// 2、设置过期时间,必须和加锁是同步的,原子的;在setnx的时候就开始设置过期时间
// redisTemplate.expire("lock",30,TimeUnit.SECONDS);
try {
// 执行业务
} finally {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//删除锁
Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
}
//获取值对比+对比成功删除=原子操作 lua 脚本解锁
// String lockValue = redisTemplate.opsForValue().get("lock");
// if(uuid.equals(lockValue)){
// 删除我自己的锁
// redisTemplate.delete("lock");//删除锁
// }
return result;
} else {
//加锁失败...重试。synchronized ()
// 休眠 100ms 重试
System.out.println("获取分布式锁失败...等待重试");
try {
Thread.sleep(200);
} catch (Exception e) {
}
return testWithRedissonLock();//自旋的方式
}
}这种方式实现分布式锁,重点是原子获取锁和原子删除锁的逻辑实现
Redisson实现分布式锁
redisson可重入锁
1 | Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。 |
整合Redisson做为分布式锁的框架
引入依赖
1 | <!-- https://mvnrepository.com/artifact/org.redisson/redisson --> |
配置redisson
1 |
|
测试是否注入
1 |
|
demo演示
1 | public String hello() { |
- 模拟两台服务器集群,两个线程同时发请求,a服务先获取到锁之后,停止a服务器,人为制造a服务器不解锁的情况,b服务器一样也能完成请求
- redisson 解决了两个事情
- 1 . 锁的自动续期,如果业务时间超长,运行期间自动给锁续上新的30s(默认加的锁都是30s),不用担心业务时间长,锁自动过期被删掉的问题
- 2 . 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后会自动删除
- redisson 解决了两个事情
lock看门狗机制-redisson解决死锁
1 | // 加锁以后10秒钟自动解锁 |
1 | // 最常见的使用方法 |
两种加锁的方法,对比区别
1
2
3
4
5
6lock.lock(10, TimeUnit.SECONDS);// 10自动解锁,自动解锁的时间一定要大于业务执行的时间;因为lock.lock(10, TimeUnit.SECONDS)在设置了10自动到期之后,不会自动续期
//1、【lock.lock(10, TimeUnit.SECONDS)】如果我们传递了锁的超时时间,就发送给redis执行脚本,运行占锁,默认超时就是我们指定的时间
//2、【lock.lock()】如果我们未指定锁的超时时间,就使用30*1000【LockWatchdogTimeout看门狗默认时间30s】
// 只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间(30s)】,每隔10s【internalLockLeaseTime【看门狗时间】/3】就会续期,续满看门狗的时间【30s】
最佳实战
1
1、lock.lock(30, TimeUnit.SECONDS);// 省掉了整个续期操作,业务完成后手动解锁
redisson读写锁
基于Redis的Redisson分布式可重入读写锁
RReadWriteLockJava对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock 接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
1 | RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock"); |
另外Redisson还通过加锁的方法提供了
leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
1 | // 10秒钟以后自动解锁 |
读写锁功能作用- 保证一定能读到最新的数据
- 在修改期间。
写锁是一个排他锁(互斥锁);读锁是一个共享锁。 写锁没释放读锁就必须等待
- 在修改期间。
- 保证一定能读到最新的数据
补充细节
读锁 + 读锁:相当于无锁,并发读,只会在redis中记录好所有当前的读锁,他们都会自动加锁成功
写锁 + 读锁:等待写锁释放
写锁 + 写锁:阻塞方式
读锁 + 写锁:
1
// 写一个demo,伪造在读的时候,也sleep30s,同步在写入一条数据
有读锁,写锁也需要等待。
总结:
只要是写锁的存在,都必须等待写锁释放。
缓存数据一致性问题
双写模式
失效模式
改进方法 1-分布式读写锁
分布式读写锁。读数据等待写数据整个操作完成