ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Redis缓存四大问题

2021-03-25 09:34:04  阅读:161  来源: 互联网

标签:缓存 Redis 查询 四大 当中 过滤器 new id


前言

观看笔记:https://www.bilibili.com/video/BV1gZ4y1s7Aw

之前这个的视频下架了,我就没办法再回过头去再看一遍;现在知道了,一边看一定得一定得做笔记。不然下次悄悄地没有了。想哭都没地方。

缓存穿透详解及实战

  1. 如何使用缓存?

(思路十分简单)

controller层(对于数据的访问):

@Autowired
OrderService orderService;

@Autowired
RedisTemplate redisTemplate;

@GetMapping("/test")
public Integer insertOrder(Order order){ return orderService.insertOrder(order);}

@GetMapping("/updateOrder")
public Integer updateOrder(Order order){ return orderService.updateOrder(order);}

//对于一条数据的访问;通过id进行查询数据;
@GetMapping("/selectid")
public R selectOrderById(Integer id){ return orderService.selectOrderById(id);}

service层:

package com.xxx.testcache.service;

import ...

public interface OrderService{
  Integer insertOrder(Order order);

  R selectOrderById(Integer id);

  List<Order> selectOrderAll();

  Integer updateOrder(Order order);  
}

service impl(实现)层:

//提高性能
//保护数据库
public R selectOrderById(Integer id){
  //查询缓存;
  //第一次查询缓存,即查询redis;
  //valueOperations这个对象专门针对于redis当中String类型的一个操作;
  //valueOperations是与SpringBoot整合之后的一个对象;valueOperations的上级即为redis;
  //valueOperations专用于操作字符串String类型;
  Object redisObj = valueOperations.get(String.valueOf(id));

  //命中缓存
  //如果缓存当中存在该对象则直接返回;
  if(redisObj != null){
    //正常返回数据
    return new R().setCode(200).setData(redisObj).setMsg("OK");
  }

  //否则如果缓存当中不存在该对象则查询数据库;
  //查询数据库之后,再将该数据加入到缓存当中;
  //方便下一次查询的时候更够查询得到该记录,该对象;
  //然后返回给前端
  try{
      Order order = orderMapper.selectOrderById(id);
      if(order != null){
        valueOperations.set(String.valueOf(id), order, 10, TimeUnit.MINUTES);// 加入缓存
      }
  }finally{

  }

  //如果数据库也查询不到;则返回前端:无此数据
  return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");

  //思路简单:缓存当中有则返回该记录给前端;缓存当中没有则再查询数据库;
}

mapper层:

package com.xxx.testcache.mapper;

import ...

public interface OrderMapper{
  
    @Select("insert into t_order (id,name) values (#{id}, #{name})")
    Integer insertOrder(Order order);

    @Select("select * from t_order where id=#{id}")
    Order selectOrderById(Integer id);

    @Select("select * from t_order")
    List<Order> selectOrderAll();

    @Update("update t_order set  `name`=#{name} where id=#{id}")
    Integer updateOrder(Order order);
}
  1. 为什么要使用缓存?

(1)提高性能

查询redis比查询数据库快
现在的nosql数据库一般多用为redis;
那么关系型数据库一般多用为mysql;
所以当前举例关系型数据库以mysql为主;
非关系型数据库以redis为主进行讲解;

(2)保护数据库

如果没有缓存redis,则每一次请求都将要去访问数据库;
那么由此也就造成了数据库的访问压力
特别是在一些大公司当中,其访问量特别特别多的;
那么如果此时所有的查询请求都去往数据库的话,那么服务器数据库的压力也是非常大的;
所以当使用缓存之后;可以看到使用了缓存则,大部分请求都将会在[ 命中缓存 ]该步骤时进行返回给前端(就不再去访问数据库了);
第一次查询将查询数据库;第二次查询就不再会去查询数据库了而是会去查询缓存;

//提高性能
//保护数据库
public R selectOrderById(Integer id){
//查询缓存
Object redisObj = valueOperations.get(String.valueOf(id));

//命中缓存
if(redisObj != null){
  //正常返回数据
  return new R().setCode(200).setData(redisObj).setMsg("OK");
}
···

一般对于加入到缓存当中的数据,会加一个过期时间;
比如当前有一个促销活动,需要卖出商品;
且该商品的促销时间只做三天;
那么就可以给该商品数据的缓存数据设置一个三天的过期时间;
一般会给缓存当中的数据设置一个过期时间

valueOperations.set(String.valueOf(id), order, 10, TimeUnit.MINUTES);// 加入缓存

此段代码即普通程序员所写代码;
如果该段代码在中小型公司存在问题不大;
但是如果该段代码放在大公司当中,该段代码就不能够这样写了;

因为大公司当中要求特别高(╮(╯▽╰)╭);

且该段代码当中存在缓存三大问题中的两大问题没有解决:第一个问题是缓存穿透;第二个问题就是缓存击穿

  1. 缓存有三个问题?

(1) 缓存穿透

指的是 查询数据库和缓存当中都没有的数据查询的为空数据

(当前测试库当中的数据仅仅只有8条数据;但是现在查询数据id为2000的数据记录)

//id为2000的该数据在测试库当中并不存在
localhost:8080/selectid?id=2000  
//响应:{"code":500 , "data":{}, "msg":"查询无果"}
//尽管查询无果;但是该操作,即查询id为2000的该操作依然查询了数据库;
//前面说查询缓存,是为了保护数据库;
//但是现在一个不存在的数据,进行查询的时候每一次都进行查询了数据库;
//由于数据库当中不存在该数据记录行,那么缓存当中也肯定不会存在有该数据记录行

localhost:8080/selectid?id=1001
//响应:{"code":200 , "data":{"id":1001, "name":"王昭君"}, "msg":"OK"}

问题代码分析:
1、第一次查询缓存,查询无果之后,查询数据库;
2、数据库也查询不到则查询无果;
3、在数据记录行在数据库当中记录为空该情况下,该段代码每一次都将会去查询数据库;

当前市面上有两种解决缓存穿透问题的方案

(1)缓存空对象:代码简单,效果不好
(2)布隆过滤器:代码复杂,效果很好(一般用布隆过滤器

缓存空对象的实现思路:

当去查询一条在数据库当中不存在的数据记录行的时候;
不管该数据记录行在数据库当中查询得到或者查询不到,都将该信息交给缓存;

service impl(实现)层:

//提高性能
//保护数据库
public R selectOrderById(Integer id){
  //查询缓存;
  Object redisObj = valueOperations.get(String.valueOf(id));

  //命中缓存
  if(redisObj != null){
    //查询出来的对象进行判断是否其实例属于空对象
    if( redisObj instanceOf NullValueResultDO){
      //如果属于空对象则直接返回查询无果该响应结果
      return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");
    }

    //正常返回数据
    return new R().setCode(200).setData(redisObj).setMsg("OK");
  }

  try{
      Order order = orderMapper.selectOrderById(id);
      if(order != null){
        valueOperations.set(String.valueOf(id), order, 10, TimeUnit.MINUTES);// 加入缓存
      }else{

        //此处添加else判断,当该数据记录行在数据库当中查询不到的时候的处理操作
        //如果数据库当中查询不到,那么此时使用一个空对象加入到缓存当中去;
        //当加入的内容为空对象的时候,那么每次查询开头的时候也就同样需要判断;是否为空对象,如果为空对象则直接返回前端查询无果;就将不再去进行查询数据库这一操作了;
        valueOperations.set(String.valueOf(id), new NullValueResultDO(), 10, TimeUnit.MINUTES);//加入缓存

      }
  }finally{

  }
  return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");
}

空对象 NullValueResultDO(无属性):

package com.xxx.testcache.entity;

import lombok.*;

@Getter
@Setter
@ToString
public class NullValueResultDO{
  //private String name;
}

查询数据记录行在数据库当中为空的情况分析:
1、 第一次查询缓存,缓存当中没有命中,即缓存当中不存在该数据记录;

2、 此时查询数据库,查询数据库中发现该数据行为空;则将该一个空对象加入到缓存当中,其id为查询的内容,其值为空对象;则此时进行返回

3、 第二次进行查询缓存,同样查询的数据在数据库当中数据记录行为空;
则此时先进行查询缓存,缓存当中存在该数据记录行,也就是存在id为查询id的记录值;
则此时当缓存命中之后进行判断缓存的该实例是否为空对象;
如果为空对象则直接返回查询无果否则返回查询数据;

缓存穿透该第一种解决方案(缓存空对象)的效果为什么不好
原因:

  • 每一次换不同的查询,且每一次不同的查询(即id不同)明知该数据在数据库当中查询不到;但是都还是会去查询一次
  • 缓存空对象解决的问题是一个key(即一个id)对其进行多次访问的这样一个问题(当过期时间失效则又将会去查询一遍数据库);
  • 如果存在查询数据为大量空数据则将会导致redis当中存在有大量的空对象(空数据)(恶意大规模查询明知道数据库当中为记录行为空的数据(空数据);)弊端就在于占用redis的内存
//id为2000的该数据在测试库当中并不存在
localhost:8080/selectid?id=2000  
//响应:{"code":500 , "data":{}, "msg":"查询无果"}
localhost:8080/selectid?id=2000  
//此时响应的内容是查询的缓存当中的数据;
localhost:8080/selectid?id=2000  
//此时响应的内容是查询的缓存当中的数据;
localhost:8080/selectid?id=2000  
//此时响应的内容是查询的缓存当中的数据;
localhost:8080/selectid?id=2000  
//此时响应的内容是查询的缓存当中的数据;

---------------------------------------------------------------

localhost:8080/selectid?id=2002
//第一次查询缓存当中不存在的内容就将会去查询一遍数据库;
//该数据在数据库当中不存在;
//响应:{"code":500 , "data":{}, "msg":"查询无果"}
localhost:8080/selectid?id=2003
//第一次查询缓存当中不存在的内容就将会去查询一遍数据库;
//该数据在数据库当中不存在;
//响应:{"code":500 , "data":{}, "msg":"查询无果"}
localhost:8080/selectid?id=2004
//第一次查询缓存当中不存在的内容就将会去查询一遍数据库;
//该数据在数据库当中不存在;
//响应:{"code":500 , "data":{}, "msg":"查询无果"}

谷歌有一个框架叫做 Guava;在这个 Guava 框架当中已经写好了布隆过滤器;即已经提供了布隆过滤器;缺陷在于 Guava 框架当中的该布隆过滤器不支持分布式;

布隆过滤器应用

maven安装依赖:

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>27.0.1-jre</version>
</dependency>

安装依赖之后就可以直接使用;
调用其静态方法就可以直接创建出一个布隆过滤器;
如果对布隆过滤器没有认识的道友,可以将布隆过滤器理解为一个ArrayList集合,就理解为java当中的一个集合即可;

public class TestBloomFilter{
  private static int size = 1000000;

//size         预计要插入多少条数据
//fpp          容错率-->出现误判的概率是多少
//bloomFilter  位数组
//list         创建的是object数组
//bit          数组

//位数组  21亿  JVM内存     数据不会进行持久化  256M
//redis   42亿  redis内存   redis的持久化数据   512M==42亿位

//bit[]
private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size, 0.001);

//List list = new ArrayList();
public void main(String[] args){

  //Integer[] integers = new Integer[Integer.MAX_VALUE];
  
  //当前该循环会生成1~100,0000 当中所有的数值;
  for( int i = 1 ; i < = size ; i++ ){
    //将这些1~100,0000的数值全部添加到布隆过滤器bloomFilter(集合)当中
    bloomFilter.put(i);
    //bloomFilter.put("老王");
  }

  List<Integer> list = new ArrayList<>(10000);
  //故意取10000个不在过滤器里的值,看看有多少个会被认为在过滤器里
  //size + 10000 = 100,0000+1000;
  //size + 20000 = 100,0000+2000;
  //该数值肯定大于100,0000;肯定与第一次循环当中的数值相同;
  for( int i = size + 10000 ; i < size + 20000 ; i++ ){

    //调用该数值i在布隆过滤器当中是否存在的方法;如果存在则加入布隆过滤器;否则不加入布隆过滤器;
    //之前将布隆过滤器比喻为java当中的ArrayList集合;
    //则当前期望的是:此时当前该循环当中是不会任何一个数值加入到该布隆过滤器bloomFilter当中去的,也就是该ArrayList集合当中去;
    if(bloomFiilter.mightContain(i)){//误判
      list.add(i);
    }
  }

  System.out.println("误判的数量:"+list.size);
  //响应为:10
}
}
  • 布隆过滤器确确实实是一个集合对象
  • 布隆过滤器当中仅存在两个方法,
    • 第一个方法是put(Object obj);(将数据取值加入到布隆过滤器当中);
    • 第二个方法是mightContain(Object obj);(判断数据取值是否在布隆过滤器当中存在)
  • 加入到布隆过滤器当中的数据无法被修改(update)被删除(delete)被取出来(get);只能够进行添加(put)以及判断是否存在(mightContain);
  • mightContain 该方法有可能判断失误;
  • 此时理解布隆过滤器可以理解为一个特殊的集合对象;

定义布隆过滤器仅需要做(定义):
(1) size 预计要插入多少数据(预计插入量)
(2) fpp 容错率(False positive probability)—>出现误判的概率是多少(即上述当中误判的次数是可以由自己定义的;容错率是百分之百会有的;不能定义为0;如果定义为0就将会抱错)

误判:传一个key到该布隆过滤器(“ArrayList”)当中进行判断;该数据是否存在(mightContain)在该布隆过滤器当中;而该方法mightContain有可能会判断失误;而导致不存在在布隆过滤器当中的key也被判断出存在在该布隆过滤器当中了;(明明不存在,有可能会被判断为存在)

判断的该容错率取值越低;其底层的内存消耗则将越大
容错率取值越高;则内存开销

  private static int size = 1000000;

  private static BloomFilter<Integer> bloomFilter=BloomFilter.create(Funnels.integerFunnel(),size, 0.001);

  //此时第一次打印出现的误判次数为10次;
------------------------------------------------------------------
  private static int size = 1000000;

  private static BloomFilter<Integer> bloomFilter=BloomFilter.create(Funnels.integerFunnel(),size, 0.1);

  //此时第二次打印出现的误判次数为1033次;
------------------------------------------------------------------
  private static int size = 1000000;

  private static BloomFilter<Integer> bloomFilter=BloomFilter.create(Funnels.integerFunnel(),size, 0);

  //此时第二次打印出现的抛出异常,报错;
  //即布隆过滤器的容错率不能为0
  //java.lang.ExceptionInInitializerError
  //Caused by: java.lang.IllegalArgumentException:False positive probability(0.0) must be > 0.0

布隆过滤器原理
布隆过滤器的底层实现依赖于bit数组;
1个字节byte为8位;
1024个字节为1M;
所以其单位是很小;
512M=524288byte

由于位数组,即byte只有一位;所以其取值就只能够是0和1;
默认该数组当中的白色格子取值value全部为0;而蓝色格子取值value为1;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-q8RrAaTR-1616635337557)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANXy93sfkaGWRUBapr9y3pHbtQ6yRyawhbsd7AS4G1Kc*Gb56IM9A4pA41f7IAo4fIom1PvvFF11WuEdaLRX51c!/r)]

左边即为往布隆过滤器当中进行put方法操作的结果(把数据加入到布隆过滤器);

那么其位数组就会发生变化;
变化的表现有:

  • 该布隆过滤器其数组当中所有的key取值value默认都为0
  • 当添加完一条数据(“老王”)之后,就会出现三个格子,变成黄色;也就代表着,该三个格子所处index位置(可以不为3个,只是说该图经过计算得为3个(举个例子不要当真);
    • 为什么被计算出来为3个index位置的格子被改成了1;
    • 其实也可以改下,将其改成10个index位置被改颜色为黄色,即将默认取值0改成1),其value值被改变称为1了;默认为0;
  • 如何修改的呢?
    • 上述为3个index位置被改变颜色为黄色,即将默认取值改变为1;
    • 那么此时可以认为是3个hash函数对该加入到布隆过滤器当中的该取值“老王”进行了hash;
    • 那么3个hash函数对“老王”该取值进行了hash之后,就会得到有一个数字;
    • 打个比方:hash(“老王”)=1212121;hash某一字符串都将得到一个数值;
    • 通过hash获取的到一个数值之后,在对该布隆过滤器该数组的长度进行取余
    • 比如说该布隆过滤器该数组的长度是40;
    • 那么就会将对“老王”进行hash得到的数值 对 该布隆过滤器数组的长度进行 取余;即1212121%40=3;(该3是随意捏造的;不要当真;只是一个例子)
    • 那么此时布隆过滤器就将会将index取值为3的该位置的默认取值0改为1;
    • 步骤:首先对加入到布隆过滤器的数值进行hash,经过hash之后就会得到一个数值;将该数值进行取余该布隆过滤器数组的长度得到的数值就是在该布隆过滤器数组当中的一个下标;则将该下标的默认取值0进行修改为1;
    • 当前为什么会有3个不同的位置被改变颜色为黄色呢(即改变默认取值为1)?原因就在于有3个不同的hash函数,存在有不同的hash函数则将有3个不同位置的数组index被改变默认取值0为1;(通过hash函数对加入到布隆过滤器当中的取值进行hash得到下标位置再进行取余数组长度得到在该数组当中的下标index位置)
    • 使用了3个不同的hash函数对该put(Object obj) obj对象进行了hash
    • hash算法存在有多个,比如说hash32,hash16…(都是进行hash,只是hash的函数不一样;为什么是三个不同的hash呢?可以是10个100个n个不同的hash;自己设定;自己喜欢就好;喜欢多少个就多少个)
    • hash函数多并不一定导致容错率低,只能说是一种因素;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mQK39pgE-1616635337562)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANX1IOvYMzLIGr4hMfNmq0SLpSUjWiYQqvcQRMK3kZarTxU24HFj0FaxTXP9dLIO.fiVY2eH4cPYFmojOxc8M!/r)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RRx5HCF8-1616635337566)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANX1nwjJZCSD2X9riQotNbwt9nUD*060h1sSAaCk5hI9WOQJEcFG3tGpPrepg4ztfFPKkDeVMefdqHGj9ns7B99jw!/r)]

位数组(布隆过滤器)当中默认全部为0,当时当插入的数据量过大,即位数组当中将有大部分原本默认取值为0的index将被改变取值为1;

当前插入到布隆过滤器当中的数据有21条数据记录行:
1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20、老王

可能存在(误判的原因)
(1)当进行查找“老王”时,是可以查找得到的;
(2)当进行查找 1 时,是可以查找得到的;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hgPdv5mh-1616635337570)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANXw6xgVJROJh3BJY6xFr9VJp6l4Zz7PwB3XR97Ym8Flrt07pQJDvc4W10SFkqDBy5AkMwUaqHzOjviy6g9514Q!/r)]

(3)当进行查找 111 时,也是可以查找得到的;此时就出现了误判
该取值111并不是加入到布隆过滤器当中的数值;
而是21条数据添加通过3个不同的hash取余数组长度得到各个不同的在数组当中的下标位置index;
恰巧就将 111 该取值进行3个不同的hash函数得出的3个不同的数值取余数组长度得到的3个不同位置下标index的默认取值0进行了改值为1;
而111该数值的三个在布隆过滤器数组当中index下标位置取值默认为0被修改为1;
并非是111进行mightContain时进行修改的;
而是当添加21条数据记录时进行修改的;
也就是当[因素1]添加到布隆过滤器当中的数据量过大时,如果[因素2]有多个hash函数(hash冲突),那么布隆过滤器当中就可能存在多个下标位置index的取值由0改变为1的这种情况;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V7OLiP5e-1616635337573)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANX6G6Nkh9oP4j1AI.xWFqKAqs9pJ6hpqT9bcebJgsD1MnDW3AU*HyNTcnfjGcwoSGm1Y5nNUuczxo97CBCFXTZsA!/r)]

1、为什么布隆过滤器没有取出的方法即get方法;—>因为其布隆过滤器底层没有进行保存其加入到布隆顾虑器当中真实的取值;只有位数组;
2、布隆过滤器也没有delete方法;—>由于在布隆过滤器当中该位数组当中其下标位置index要删除的位置其位置,同一个位置也有可能是其他数值所占的下标位;(hash冲突);如果将该下标位置index的取值改为0;那么也将会影响其他的数值的下标取值;也就是说;当其他的数据进行判断是否存在时,原本其他数据的下标位置index取值为1;当前因为有需要要进行删除的数据所以对该布隆过滤器的下标位置index取值进行更改为从1变为0;其他数据的下标位置与当前要删除的数据的下标位置相撞相同了(hash冲突);如果当前要删除的数据将该布隆过滤器数组当中下标位置的取值改为了0;则其他数据下次进行mightContain判断的时候将会被判断为不存在;所以布隆过滤器没有删除delete该方法;
3、布隆过滤器为什么会出现误判?—>误判的缘故就在于存在hash冲突

出现误判的概率,即容错率与什么因素有关?

  • 数组长度
  • hash算法的个数

**(1)**首先,数组的长度是肯定会影响容错率的;如果数组的长度是100,0000;同样是添加21条数据;那么误判的概率就不会又这么高了(即111不存在的数据也能够判断mightContain为存在);所以数组的长度是一个影响误判的因素;
**(2)**第二,hash算法的个数;如果数组的长度是100,0000;且该布隆过滤器当中存在有10个hash算法
(不同hash函数越多,则误判的概率越小,即不同hash函数个数越多,即算的在布隆过滤器当中的下标位置越多,而只有当满足算出来的所有的下标位置的取自为1,才将返回可能存在;否则只要满足有一个算出来的index其下标取值不为1,即为0的这种情况则返回绝对不存在;),

(不同hash个数函数取余数组长度所得出的位置必须是全部为1才能够判断其值
[即判断值是否存在在布隆过滤当中]存在于布隆过滤器当中;
即存在有3个不同的hash函数,查询数值117;
布隆过滤器数组当中仅只有2个位置可证明该117存在于布隆过滤器当中,即这两个下标位置取值value为1,还有一个index value为0;
而hash个数有3个,此时能够证明的只有2个,所以判断为绝对不存在;
即只有当查询的数值通过n个不同的hash算法在布隆过滤器数组当中需要有n个不同下标地址的index其取值value必须全部为1;
才能够判断为可能存在在该布隆过滤器数组当中;否则判断为绝对不存在;)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-65yQxSK3-1616635337575)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANX1loeM8HGC*c1AuLK0zUPiHTgZ1ljiSnfjWcgvZCtpiFflSI8eyoV.J9xxKSa2wiJDTtvnfmwuqphXXn4wPQ7B4!/r)]

即如果数组长度越小;而不同的hash函数个数越多;
则容错率就将会增大
(即误判的几率增大,也就是说在数组长度小的情况下,而hash函数多,即算出来的在布隆过滤器数组当中的下标位置越多,即这些算出来的下标位置的取值就都将会被改为1,到时候布隆过滤器数组当中的下标位置index就将会有大量取值变动由0变为1;这样就会增大误判的几率,也就是说容错率也就增大了;)

所以说;跟数组的长度以及hash函数的个数必须要配合使用才能够达到布隆过滤器最好的效果;(hash函数的个数必须要与数组长度相匹配;)

numBits 即Google Guava框架中创建出来的位数组的长度;
size:100,0000(预计插入数值记录行),fpp:0.001(容错率)=(创建出来)=>numBits:14377587(位数组长度),numHashFunction:10(hash函数的个数)

size:100,0000(预计插入数值记录行),fpp:0.1(容错率)=(创建出来)=>numBits:4792539(位数组长度),numHashFunction:3(hash函数的个数)

  • hash个数越多(容错率越高),运算开销越大;(性能和容错率之间取一个);
  • 要性能高则hash个数少点;要准确率越高则hash个数越多则容错率越高;
    容错率低好还是高好?看具体业务需求;
...
Preconditions.checkNotNull(strategy);
if(expectedInsertions == 0L){
  expectedInsertions = 1L;
}

//numBits 位数组长度
//expectInsertions 预计插入布隆过滤器数组的数据量
//fpp 容错率(判断出错的概率)
long numBits = optimalNumOfBits(expectedInsertions, fpp);

//hash函数个数
int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);

//numBits与numHashFunctions 需要进行配合使用,一起使用更佳哦~
//只需要关注预计插入布隆过滤器数据记录行与容错率即可;位数组长度与hash函数个数不需要进行关心;因为

try{
    return new BloomFilter(new LockFreeBitArray(numBits), numHashFunctions, funnel, strategy);
}catch(IllegalArgumentException var10){
  ....
}
/**
  * long n   预计插入数据量
  * double m 容错率
  * description 算出hash函数个数
  */
@VisibleForTesting
static int optimalNumOfHashFunctions(long n,double m){
  return Math.max(1, (int)Math.round( double(m) / (double)n * Math.log(2.0D) ) );
}

/**
  * long n   预计插入数据量
  * double p 容错率
  * description 算出位数组长度
  */
@VisibleForTesting
static long optimalNumOfBits(long n,double p){
  if(p == 0.0D){
    p = 4.9E-324D;
  }

  return (long)((double)(-n) * Math.log(p) / (Math.log(2.0D) * Math.log(2.0D) ) );
}

自己手写分布式的布隆过滤器

手写布隆过滤器之前;怎么来实现;

  • 需要有一个位数组
    • 该位数组如何来的呢?
    • 其实redis当中也存在有布隆过滤器;redis当中的位数组;redis当中的位数组哪里来;即首先redis当中需要存在这样一个数据结构;
FLUSHALL

keys *

set laowang abc #key:laowang;value:abc

# set api底层如何如何存储value的取值(redis底层如何来存储该key:laowang的取值value abc的?)
# redis底层进行存储key的取值value是通过位数组来进行存储的;

首先将 abc 该字符串转换为二进制取值;

/**
  *  a ~ z 97 ~ 122
  *  第 33 ~ 126 号(共 94 个)是字符,其中第 48 ~ 57 号为 0 ~ 9 这十个阿拉伯数字
  */
public static void main(String[] args){
  toBinary("abc");// 24---999   0   1000  1
  //打印结果为 1100001 1100010 1100011
  //1100001和1100010类似,其实在二进制当中a和b只需要将第六位和第七位调换一下顺序即可;
  //redis用位数组,二进制的方式来进行保存set key所对应的value取值
}

验证redis使用位数组(二进制)进行保存key的取值value;

setbit laowang 6 1 # 设置key为laowang其value值的二进制位;设置其value值的二进制位的第6位取值为1;

setbit laowang 7 0 # 设置key为laowang其value值的二进制位;设置其value值的二进制位的第7位取值为0;

get laowang # 获取redis当中key为laowang的取值value
#当前响应为 “bbc”

# 原本key:laowang存储的value取值为:abc
# abc 在redis底层存储结构为位数组,即二进制数值;即为1100001 1100010 1100011
# 当前setbit这两个操作即将 1100001 1100010 这两个进行了修改为 11000011 1100011
# 所以redis再次查询的时候其取值变为了 “bbc”

在redis当中,String类型的value值其在redis底层,就是通过位数组来进行保存存储的;
在底层当中,当前“abc” or “bbc” 其在redis底层当中位数长度为 3 * 8 = 24 位;
底层的位数组的长度只有这么长;

setbit laowang 1000 0 # 设置进行修改key为laowang 其value值在redis底层的位数组(二进制)的第1000个位置,即下标为999的这个位置index修改取值为0;
# 当时当前 key:laowang 该value值在redis底层的其位数组长度是没有1000的;
# 在java当中,执行setbit laowang 1000 0 就将会造成数组越界
# 那么在redis当中,当进行设置第1000个位置上取值为0时,abc 该value字符串在redis底层的位数组(二进制)是如何保存的就依然是如何保存,即依然保存为1100001 1100010 1100011;
# redis底层会自动进行扩容的操作;即abc的位数组长度为24;所占据的index为0 ~ 23,即前面index为 0 ~ 23时,存储二进制值依然为 1100001 1100010 1100011 ; 那么即从index为24开始到999,这之间则全部用0进行代替;(index为999即为第1000个位置的index)
#setbit laowang 1000 1 #即index为24到998全部为0,而下标为999(即第1000个位置)的取值为1;
# 中间扩容的取值全部使用 0 这一默认取值来进行代替;

get laowang;
#此时响应为“bbc\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\\x00\x00\x00\x00\x00\x00\x00\x00\x00....”
#redis底层进行扩容,扩容其默认取值为0;即扩容时是扩容的位数组,因为其二进制全部是0;而转化出来之后就是该结果

现在位数组已经找到了,且还能够进行自动扩容;也就是redis底层对String类型操作set key value这一api,底层所使用的的数据结构(位数组);

setbit laowang666 10000 0
# key:laowang666 ; value: 相当于创建了一个length为 10000 的位数组;并且其底层的value值为 0(二进制取自);即到第10000位置之前二进制数值全部是0;

# 在redis当中称之为 位图;本质还是属于String类型;
# redis版本5.0.5
keys *
# laowang666
# laowang

实现

application.yml

mybatis:
  mapper-locations: classpath:mybatis/mapper/*.xml
bloom:
  filter:
    expectedInsertions: 1000
    fpp: 0.001F
package com.xxx.testcache.filter;

import ...

#ConfigurationProperties 自动装配属性
@ConfigurationProperties("bloom.filter")
@Component
public class RedisBloomFilter{

  //预计插入量
  private long expectedInsertions;

  //误判率
  private double fpp;

  @Autowired
  private RedisTemplate redisTemplate;

  //bit数组长度
  private long numBits;

  //hash函数数量
  private int numHashFunctions;

  public long getExpectedInsertions(){return expectedInsertions;}

  public void setExpectedInsertions(long expectedInsertions){this.expectedInsertions = expectedInsertions;}

  public double getFpp(){return fpp;}

  public void setFpp(double fpp){this.fpp = fpp;}

  //@PostConstruct
  //即Spring容器初始化的时候
  //在该类进行初始化的时候,就将会回调该方法init;对numBits以及numHashFunctions进行初始化值
  @PostConstruct
  public void init(){
    this.numBits = optimalNumOfBits(expectedInsertions, fpp);
    this.numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, fpp);
  }

//计算hash函数个数
/**
  * long n   预计插入数据量
  * double m 容错率
  * description 算出hash函数个数
  */
//@VisibleForTesting
static int optimalNumOfHashFunctions(long n,double m){
  return Math.max(1, (int)Math.round( double(m) / (double)n * Math.log(2.0D) ) );
}

//计算bit数组长度
/**
  * long n   预计插入数据量
  * double p 容错率
  * description 算出位数组长度
  */
//@VisibleForTesting
static long optimalNumOfBits(long n,double p){
  if(p == 0.0D){
    p = 4.9E-324D;
  }

  return (long)((double)(-n) * Math.log(p) / (Math.log(2.0D) * Math.log(2.0D) ) );
}

/**
  * 判断 keys 是否存在于集合,
  * 是 则返回 true
  * 否 则返回 false
  */
  public boolean isExists(String key){
    long[] indexs = getIndexs(key);
    List list = redisTemplate.executePipelined(new RedisCallback<Object>(){

      @Nullable
      @Override
      public Object doInRedis(RedisConnection redisConnection) throws DataAccessException{
        redisConnection.openPipeline();
        for(long index : indexs){
          redisConnection.getBit("bf:laowang".getBytes(),index);
        }
        redisConnection.close();
        return null;
      }
    });

    return !list.contains(false);
  }

/**
  * 将 key 存入 redis bitmap 位图
  */
public void put(String key){// 老王laowang
  long[] indexs = getIndexs(key);
  redisTemplate.executePipeline(new RedisCallback<Object>(){

    @Nullable
    @Override
    public Object doInRedis(RedisConnection redisConnection) throws DataAccessException{
      redisConnection.openPipeline();//打开管道,提高效率
      for(long index:indexs){
        redisConnection.setBit("bf:laowang".getBytes(), index, true);
        //setbit laowang 7262 1
        //此时key写死;
        //index将修改的动态的index下标位置;
        //0代表false;1代表true;即与扩容默认为0修改取值为1做法一致;在redis当中0是位数组进行扩容时的默认值;1则为put进来的数据其位置下标index就需要改变该index上的取值0变成1;在java当中true即代表1;false即代表0
      }
      redisConnection.close();
      return null;
    }
  })
}

/**
  * 根据 key 获取 bitmap 下标; 一个hash函数对 20+1  20+2 3个不同的hash函数
  */
  private long[] getIndexs(String key){ //laowang 老王
    long hash1 = hash(key);
    long hash2 = hash1 >>> 16;
    long[] result = new long[numHashFunctions]; // 10
    for( int i = 0 ; i < numHashFunctions ; i++ ){ //numHashFunctions hash函数的数量
    long combinedHash = hash1 + i * hash2;
    //i 动态值,所以2个不同的hash再加上该一个带有动态值i则将会有三个不同的取值
    //引用三个不同的hash算法对同一个字符串key进行运算也可以的;
    if(combinedHash < 0){
        combinedHash = ^combinedHash;
    }
    result[i] = combinedHash % numBits;//对数组的长度numBits进行取余
    }
    return result;
  }
}

测试

expectedInsertions = 1000
fpp = 0.001
numBits = 14377
numHashFunctions = 10

key = "1001"
indexs = {long[10]@7667}
0 = 7262
1 = 6438
2 = 5614
3 = 4790
4 = 3966
5 = 3142
6 = 2318
7 = 1494
8 = 670
9 = 14223
# 0 ~ 9 :10个下标

RedisDataInit

package com.xxx.testcache.datainit;

import ...

@Component
public class RedisDataInit{
  @Autowired
  OrderService orderService;

  @Autowired
  RedisBloomFilter redisBloomFilter;

  @PostConstruct
  public void init(){
    List<Order> orders = orderService.selectOrderAll();
    for(Order order:orders){
      redisBloomFilter.put(String.valueOf(order.getId));//订单表的id
      //将订单表中的id添加到布隆过滤器当中在容器启动的时候
    }
  }
}

进行查询

service impl(实现)层:

//提高性能
//保护数据库
public R selectOrderById(Integer id){

  //解决缓存穿透
  //如果布隆过滤器当中存在id则说明数据库当中存在该条记录;则可以继续往下走
  //返回false说明布隆过滤器当中并不存在该id也就说明数据库以及缓存当中都不会有该条数据id
  //此处布隆过滤器由于存在误判的情况也就是容错率;但是现在应用的场景是缓存穿透也就是避免去进行查询数据库;而布隆过滤器误判的结果也就是仅仅让数据库多查一次而已;所以设想下如果有100000个恶意查询空对象请求;那么相比于100000个恶意查询空对象请求而言,布隆过滤器则要显得十分优越了;即便是存在有误判也比100000好;也需要看容错率的概率是多少;
  if (!bloomFilter.isExist(String.valueOf(id))){
      return new R().setCode(500).setData(new NullValueResultDO()).setMsg("非法访问");
  }

  //查询缓存;
  Object redisObj = valueOperations.get(String.valueOf(id));

  //命中缓存
  if(redisObj != null){
    //正常返回数据
    return new R().setCode(200).setData(redisObj).setMsg("OK");
  }

  try{
      Order order = orderMapper.selectOrderById(id);
      if(order != null){
        valueOperations.set(String.valueOf(id), order, 10, TimeUnit.MINUTES);// 加入缓存
      }
  }finally{

  }
  return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");

}

测试

localhost:8080/selectid?id=2002
//由于有布隆过滤器的存在将不会去进行查询redis缓存也不会去查询数据库
//该数据在数据库当中不存在;
//响应:{"code":600 , "data":{}, "msg":"非法访问"}
localhost:8080/selectid?id=2003
//由于有布隆过滤器的存在将不会去进行查询redis缓存也不会去查询数据库
//该数据在数据库当中不存在;
//响应:{"code":600 , "data":{}, "msg":"非法访问"}
localhost:8080/selectid?id=2004
//由于有布隆过滤器的存在将不会去进行查询redis缓存也不会去查询数据库
//该数据在数据库当中不存在;
//响应:{"code":600 , "data":{}, "msg":"非法访问"}

------------------------------------------------------------------

localhost:8080/selectid?id=1001
//响应:{"code":200 , "data":{"id":1001, "name":"老王1"}, "msg":"OK"}
localhost:8080/selectid?id=1002
//响应:{"code":200 , "data":{"id":1002, "name":"老王2"}, "msg":"OK"}
localhost:8080/selectid?id=1003
//响应:{"code":200 , "data":{"id":1003, "name":"老王3"}, "msg":"OK"}
localhost:8080/selectid?id=1004
//响应:{"code":200 , "data":{"id":1004, "name":"老王4"}, "msg":"OK"}
localhost:8080/selectid?id=1005
//响应:{"code":200 , "data":{"id":1005, "name":"老王5"}, "msg":"OK"}

复杂:

  • 维护麻烦
  • 往数据库当中进行添加数据时,同时也需要往布隆过滤器当中进行添加put;不然的就就会被布隆过滤器当做是非法请求被拦截下来;
  • 如果订单表当中存在10万的数据;删除了8万的数据量;现在仅剩下2万的数据量;但是布隆过滤器没有办法进行删除;这个时候就需要进行考虑重建布隆过滤器即重新初始化该布隆过滤器;即从头到尾的再去跑一边put方法,即又重新将订单表当中的记录进行查询出来将其order id重新put入布隆过滤器当中
  • 这个重建布隆过滤器得使用定时任务来进行;定时任务多少合适需要看删除的数据量大小多不多;

缓存击穿详解及实战

上述代码存在两大问题,一是缓存穿透;二是缓存击穿;
缓存击穿问题也就是对于热点数据访问的问题;

缓存击穿:在查询时,数据库有数据,缓存当中没有(数据刚好失效);

情况分析:

(1)这条数据没有人访问过;说明该数据并非为热点数据;
(2)一般为数据刚好失效(由于将数据加入到缓存当中时,会加入一个过期时间;比如该数据当好面临过期时间失效,即理解为redis自动将该数据进行删除了[“自动删除”需要加双引号;涉及到redis当中的一个删除特点;可以先这么认为redis现在自动将该数据进行了删除],但是来了并发访问,由于该数据又是一个热点数据);

缓存击穿和缓存穿透是两个不同的问题;

模拟热点数据失效访问情形:

keys *
#laowang666
#laowang
#bf:laowang
package com.xxx.testcache.test;

import ...

public class Test{
  private static CountDownLatch countDownLatch = new CountDownLatch(99);

  @org.junit.Test
  public void test() throws InterruptedException{
    TicketsRunBle ticketsRunBle = new TicketsRunBle();
    for( int i = 0 ; i < 99 ; i++){
      Thread thread = new Thread(ticketsRunBle, "窗口");
      thread.start();
      countDownLatch.countDown();
    }
    Thread.currentThread.join();
  }

  public class TicketsRunBle implements Runnable{
    try{
      countDownLatch.await();
    }catch(InterruptedException e){
      e.printStackTrace();
    }

//    try{
//        Socket socket = new Socket("127.0.0.1", 1333);
//    }catch(IOException e){
//      e.printStackTrace();
//    }

    RestTemplate restTemplate = new RestTemplate();
    List<HttpMessageConverter<?>> fastJsonHttpMessageConverters = new ArrayList<>();
    fastJsonHttpMessageConverters.add(new FastJsonHttpMessageConverters);
    restTemplate.setMessageConverters(fastJsonHttpMessageConverters);
    R forObject = restTemplate.getForObject("http://localhost:8080/selectid?id=1001",R.class);
    System.out.println(forObject);
  }
}

即可以认为该段代码被99个人(线程)同时执行;

  //99个线程同时进行布隆过滤器的判断
 if (!bloomFilter.isExist(String.valueOf(id))){
      return new R().setCode(500).setData(new NullValueResultDO()).setMsg("非法访问");
  }

  //99个线程同时查询缓存;
  Object redisObj = valueOperations.get(String.valueOf(id));

  //当热点数据失效时,则此时redisObj则为空
  //命中缓存
  if(redisObj != null){
    //正常返回数据
    return new R().setCode(200).setData(redisObj).setMsg("OK");
  }

  //所以此时就会有99个线程同时去查询数据库;
  try{
      Order order = orderMapper.selectOrderById(id);
      if(order != null){
        valueOperations.set(String.valueOf(id), order, 10, TimeUnit.MINUTES);// 加入缓存
      }
  }finally{

  }
  return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");

即上述代码将会有99次进行查询数据库的记录;

解决方案
使用分布式锁,即查询数据库之前进行加锁;
OrderServiceImpl

@Autowired
valueOperations valueOperations;

@Autowired
CacheTemplate cacheTemplate;

@Autowired
RedisBloomFilter RedisBloomFilter;

@Autowired
private RedisLock redisLock;

@Autowired
RedisTemplate redisTemplate;

@Autowired
SendMessageUtil sendMessageUtil;

@Override
public Integer insertOrder(Order order){
  Integer integer = orderMapper.insertOrder(order);
  return integer;
}
...

//提高性能
//保护数据库
public R selectOrderById(Integer id){
  //解决缓存穿透
  if (!bloomFilter.isExist(String.valueOf(id))){
      return new R().setCode(500).setData(new NullValueResultDO()).setMsg("非法访问");
  }

  //查询缓存;
  Object redisObj = valueOperations.get(String.valueOf(id));

  //命中缓存
  if(redisObj != null){
    if( redisObj instanceOf NullValueResultDO){
      return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");
    }

    //正常返回数据
    return new R().setCode(200).setData(redisObj).setMsg("OK");
  }

  //在查询数据库之前进行加锁
  redisLock.lock(String.valueOf(id));

    try{

    //查询缓存;
    redisObj = valueOperations.get(String.valueOf(id));
    //命中缓存
    if(redisObj != null){
      if( redisObj instanceOf NullValueResultDO){
        return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");
      }
      //正常返回数据
      return new R().setCode(200).setData(redisObj).setMsg("OK");
    }

      //查询数据库
      Order order = orderMapper.selectOrderById(id);
      if(order != null){
        valueOperations.set(String.valueOf(id), order, 10, TimeUnit.MINUTES);// 加入缓存
      }
  }finally{
        //解锁
        redisLock.unlock(String.valueOf(id));
  }
  return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");
}

大并发访问数据库最容易导致数据库崩掉;

测试并模拟;

keys *
del 1001

上述代码仅会有1次访问数据库的机会;
分析:

//提高性能
//保护数据库
public R selectOrderById(Integer id){
  //99个线程同时访问布隆过滤器判断该id是否非法访问
  if (!bloomFilter.isExist(String.valueOf(id))){
      return new R().setCode(500).setData(new NullValueResultDO()).setMsg("非法访问");
  }

  //99个线程同时查询缓存;
  Object redisObj = valueOperations.get(String.valueOf(id));

  //由于热点数据的失效;所以导致99个线程同时都没有命中缓存
  //命中缓存
  if(redisObj != null){
    if( redisObj instanceOf NullValueResultDO){
      return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");
    }

    //正常返回数据
    return new R().setCode(200).setData(redisObj).setMsg("OK");
  }

  //由于在查询数据库之前进行加锁,所以此时99个线程将会依次进行查询数据库操作;(并发)
  //即此时有一个线程拿着锁进入了代码块;还有98个线程在外面等着;
  redisLock.lock(String.valueOf(id));

    try{

      //相当于99个线程中第一个进入代码块的那一个线程会查询两次redis缓存;查询一次数据库;查询完成之后将数据加入到缓存当中去;finally最后进行解锁;紧接着98个线程第二个线程又会拿着锁进来;.....
      //转折点就在这里;当第一个进来的线程将数据查询完成并加入到缓存之后;后面的98个线程再次到redis缓存当中进行取值的时候那么这个时候就能够有值获取出来了;由于第一个线程已经查询过数据库并将取值加入到了缓存当中去了;

      //虽然有锁阻塞,但是却只有一次操作数据库此处;98个线程都去进行操作缓存;所以说这个效率是十分明显的;非常快;

    //查询缓存;
    redisObj = valueOperations.get(String.valueOf(id));
    //命中缓存
    if(redisObj != null){
      if( redisObj instanceOf NullValueResultDO){
        return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");
      }
      //正常返回数据
      return new R().setCode(200).setData(redisObj).setMsg("OK");
    }

      //查询数据库
      Order order = orderMapper.selectOrderById(id);
      if(order != null){
        valueOperations.set(String.valueOf(id), order, 10, TimeUnit.MINUTES);// 加入缓存
      }
  }finally{
        //解锁
        redisLock.unlock(String.valueOf(id));
  }
  return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");
}

如果此处没有使用分布式锁而是使用的synchronized则;selectOrderById查询数据为1001、1002、1003、1004、1005查询id不同,查询数据并不相同,但是却还是会被阻塞;这个效率就很低了;
而刚刚那种处理方式是:redisLock.lock(String.valueOf(id));是对查询id相同;即该这一类都是查询相同同一个id的并发请求的锁;
针对的是对于同一个查询内容的多个线程进行阻塞;
而对于不同查询内容的多个线程是进行分别阻塞的;
也就是说假设有99*2个线程,该99个线程都是进行查询id为1001的查询请求;则redisLock则对该98个线程进行阻塞,其中一个线程拿着id为1001的锁先进入代码块进行查询;(此时id为1001的数据过期失效)
而另外99个线程都是进行查询id为1002的查询请求所以此时这另外的这99个线程(查询id为1002的这99个线程),也会存在有一个线程拿着id为1002的锁先进入代码块,而其余的98个线程在外等候,阻塞;(此时id为1001的数据过期失效)

synchronized与redisLock这两者锁的粒度不一样;

RedisLock

//ThreadLock用于保存某个线程共享变量
//对于同一个static ThreadLocl,不同线程只能从中get,set,remove自己的变量,而不会影响其他的线程
private ThreadLocal<String> threadLocal = new ThreadLocal<>();

@Override
public void lock(String key){
  boolean b = tryLock(key);
  if(b){
    return;
  }
  try{
      Thread.sleep(50);
  }catch(InterruptedException e){
    e.printStackTrace();
  }
  lock(key);
}

@Override
public boolean tryLock(String key){
  String uuid = UUID.randomUUID().toString();
  ...
}

代码封装
OrderServiceImpl

@Override
public R selectOrderById(Integer id){
  return cacheTemplate.redisFindCache(String.valueOf(id), 10, TimeUnit.MINUTES, new CacheLoadble{
    @Override
    public Order load(){
      return orderMapper.selectOrderById(id);//此处仅需要写自己的业务逻辑即可
    }
  },true);
}

...

@Override
public List<Order> selectOrderAll(){ return orderMapper.selectOrderAll();}

@Override
public Integer updateOrder(Order order){
  redisTemplate.delete(String.value(order.getId()));
  Integer integer = orderMapper.updateOrder(order);
  order.setName("0");
  sendMessageUtil.placeOrderMessage(order);
  return integer;
}
}

CacheTemplate

package com.xxx.testcache.template;

import ...

@Component
public class CacheTemplate<T>{

@Autowired
private ValueOperations valueOperations;

@Autowired
RedisBloomFilter redisBloomFilter;

@Autowired
private Redisson redisson;

//private Lock lock = new ReentrantLock();

//查询缓存  有     直接返回前端
//          没有   查询数据库,加入缓存放回
//为什么要使用缓存:保护数据库;提高性能
/**
  * key 键
  * expire 过期时间
  * unit 过期时间单位
  * cacheLoadble 回调接口方法
  * 
  */
public R redisFindCache(String key, long expire, TimeUnit unit, CacheLoadble<T> cacheLoadble, boolean b){
 //--------------------- 缓存穿透-------------------------------------
  //解决缓存穿透
  if(!bloomFilter.isExist(key)){
    return new R().setCode(600).setData(new NullValueResultDO()).setMsg("非法访问");
  }

  //查询缓存
  Object redisObj = valueOperations.get(String.valueOf(id));

//-----------------------缓存击穿--------------------------------------

  //命中缓存
  if(redisObj != null){
    //正常返回数据
    return new R().setCode(200).setData(redisObj).setMsg("OK");
  }

  redisLock.lock(key);
  try{
      //查询缓存;
      redisObj = valueOperations.get(String.valueOf(id));

      //命中缓存
      if(redisObj != null){
          //正常返回数据
          return new R().setCode(200).setData(redisObj).setMsg("OK");
      }

     T load = cacheLoadble.load();//查询数据库
    if(load != null){
      valueOperations.set(key, load, expire, unit);//加入缓存
      return new R().setCode(200).setData(redisObj).setMsg("OK");
    }

  }finally{
      //解锁
      redisLock.unlock(String.valueOf(key));
  }
  return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查询无果");
}
}

缓存雪崩及解决方案

缓存雪崩问题:

  1. redis服务器挂了
  2. 大部分数据失效

比如有100W的商品数据,设置的过期时间统一是10天;则10天后大部分数据统一失效;而查询请求不断;这个时候也就容易导致数据库崩掉;即引发雪崩问题;

(redis缓存击穿指的是热点数据失效解决的是并发请求)

针对这种情况的处理方式:

[规避]
(1)redis搭建高可用集群(cluster)
(2)错开数据过期时间

如果已经出现了缓存雪崩问题;则处理方式为降级 熔断;

数据一致性及解决方案

什么时候出现数据不一致的情况

  • 更新数据的时候;
    • 先更新数据库,再更新缓存
      • (步骤1)先更新数据库(步骤2)再更新缓存;
        • 如果此时更新缓存失败;则会导致数据库中是新数据,缓存中是旧数据,数据就出现了不一致
      • 1.线程A更新数据库;2.线程B更新数据库;4.线程A更新缓存;3.线程B更新缓存;
        • 更新数据库如果因为网络等原因,B却比A更早更新了缓存,这就导致了脏数据(不一致)
        • 解决方案:先删除缓存;再修改数据库。如果数据库修改失败了,那么数据库中是旧数据,缓存中是空的,那么数据不会不一致;因为读的时候缓存没有,则读数据库中旧数据,然后更新到缓存中。
      • 解决方案一[延时双删]:1.线程A删除缓存 2.线程B查询缓存,发现为空 3.线程B查询数据库并加入缓存 4.线程A更新数据库 5.线程A删除缓存(第五步一定要执行成功)
      • 解决方案二[串行化]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-URoR88yy-1616635337577)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANX4pgLmAu8VahKl7KcdIHB0TUz5G4e7vKThDgFA4qONmduoQbaGmQQkOcSL4ZRToJCp.Uf19bAH3AwkYd90x7O1U!/r)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PpFRz7Yl-1616635337577)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANX.XryE.7uV.ikOi43KtZHMMeFc5xE9CdpCZxb813hOndwi1w1ZBWEexHrxFZIBZfelNT79b7aY12qTQp7NauUZ8!/r)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D7f6vIYG-1616635337578)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANX3mOzKCW7tUhJUWL8ztU7xPjPeJoo30G7me5QGmt7Sxi5j5klGAArIsMW*y8bO.byfXwfDCTDh.6x5I.sLatg!/r)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iUqUc26a-1616635337580)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANXS1qKXmK2eYWqUYxPcUsgsF6LV72VsmLXrkJ.UnQ7ePKGiWhTZhWMp1b6T2ekmO6bUvjPP2Xe6sCMSxuyCrPc!/r)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b5eYR78i-1616635337580)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANXS1qKXmK2eYWqUYxPcUsgsF6LV72VsmLXrkJ.UnQ7ePKGiWhTZhWMp1b6T2ekmO6bUvjPP2Xe6sCMSxuyCrPc!/r)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3MMvQvIr-1616635337581)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANXzQqwZMYfv*FkdsRt6pLKtRDNNQ8Pv2ce3feyq1.6u.ytk1wClBaFMU15A2dvCGK8aMDt0c6XmLkiGohBxyNLTo!/r)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-blbRvqHM-1616635337582)(http://r.photo.store.qq.com/psc?/V13IdniL4CDhqM/TCfiP1YaPeRT4Jil9RANX2CGSzajo.b8bZBcqLFBQeGmscwTpIsRf3EyyWT3nGZ8bOZlQ82XlsrsHCkdZ7RoJHLOsDtirE785Nk*g.jaA!/r)]

第五步一定要执行成功,实现:
StockListener采用了延时队列;

@Autowired
SendMessageUtil sendMessageUtil;

@RabbitListener(queue = "stock_queue", containerFactory="simpleRabbitListenerContainerFactory")
public void getStockMessage(Message message, Channel channel) throws Exception{
  System.out.println("接收到了消息"+new String(message.getBody(),"UTF-8"));

  Order order = JSON.parseObject(new String(message.getBody(),"UTF-8"), Order.class);
  try{
      //System.out.println(1/0);
      redisTemplate.delete(String.valueOf(Order.getId()));
      channel.basicAck(message.getMessageProperties().getDeliveryTag,false);
  }catch(Exception e){
      int count = Integer.parseInt(order.getName());
      System.out.println(order.getName());
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
      //重试5次;如果5次仍然不成功则记录到日志;到时候人为进行处理(人为干预);
      if(count!=5){
        count++;
        order.setName(String.valueOf(count));
        sendMessageUtil.placeOrderMessage(order);
        return;
      }
      loggerFactory.error(String.valueOf(order.getId));
      e.printStackTrace();
  }
}

redis 版本6.0之后即为多线程;

标签:缓存,Redis,查询,四大,当中,过滤器,new,id
来源: https://blog.csdn.net/qq_43409111/article/details/115195883

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有