ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

微服务:多级缓存下

2022-04-09 01:03:34  阅读:194  来源: 互联网

标签:缓存 服务 -- ngx 多级 redis 查询 item local


多级缓存下

Redis缓存预热

Redis缓存会面临冷启动问题:

冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。

缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。

1)利用Docker安装Redis

docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes

2)在item-service服务中引入Redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3)配置Redis地址

spring:
  redis:
    host: 192.168.150.101

4)编写初始化类

缓存预热需要在项目启动时完成,并且必须是拿到RedisTemplate之后。

这里我们利用InitializingBean接口来实现,因为InitializingBean可以在对象被Spring创建并且成员变量全部注入后执行。

@Component
public class RedisHandler implements InitializingBean {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService iItemStockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();
    @Override
    public void afterPropertiesSet() throws Exception {
        //初始化缓存
        //1.查询商品信息
        List<Item> itemList = itemService.list();
        //2.放入缓存
        for (Item item:itemList) {
            //2.1 item序列化为json
            String json = MAPPER.writeValueAsString(item);
            //2.2将json存入
            redisTemplate.opsForValue().set("item:id:"+item.getId(),json) ;
        }

        //3.查询库存信息
        List<ItemStock> stockList = iItemStockService.list();
        //4.放入缓存
        for (ItemStock stock:stockList) {
            //4.1 item序列化为json
            String json = MAPPER.writeValueAsString(stock);
            //4.2将json存入
            redisTemplate.opsForValue().set("stock:id:"+stock.getId(),json) ;
        }
    }
}

Redis缓存

封装Redis工具

OpenResty提供了操作Redis的模块,我们只要引入该模块就能直接使用。但是为了方便,我们将Redis操作封装到之前的common.lua工具库中。

修改/usr/local/openresty/lualib/common.lua文件:

1)引入Redis模块,并初始化Redis对象

-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

2)封装函数,用来释放Redis连接,其实是放入连接池

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 --连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
    end
end

3)封装函数,根据key查询Redis数据

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败 : ", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    end
    --得到的数据为空处理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    end
    close_redis(red)
    return resp
end

4)导出

-- 将方法导出
local _M = {  
    read_http = read_http,
    read_redis = read_redis
}  
return _M

完整的common.lua:

-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 --连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
    end
end

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败 : ", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    end
    --得到的数据为空处理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    end
    close_redis(red)
    return resp
end

-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
        method = ngx.HTTP_GET,
        args = params,
    })
    if not resp then
        -- 记录错误信息,返回404
        ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end
-- 将方法导出
local _M = {  
    read_http = read_http,
    read_redis = read_redis
}  
return _M

实现Redis查询

接下来,我们就可以去修改item.lua文件,实现对Redis的查询了。

查询逻辑是:

  • 根据id查询Redis
  • 如果查询失败则继续查询Tomcat
  • 将查询结果返回

1)修改/usr/local/openresty/lua/item.lua文件,添加一个查询函数:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 封装查询函数
function read_data(key, path, params)
    -- 查询本地缓存
    local val = read_redis("127.0.0.1", 6379, key)
    -- 判断查询结果
    if not val then
        ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
        -- redis查询失败,去查询http
        val = read_http(path, params)
    end
    -- 返回数据
    return val
end

2)而后修改商品查询、库存查询的业务:

image-20210821114528954

3)完整的item.lua代码:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')

-- 封装查询函数
function read_data(key, path, params)
    -- 查询本地缓存
    local val = read_redis("127.0.0.1", 6379, key)
    -- 判断查询结果
    if not val then
        ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
        -- redis查询失败,去查询http
        val = read_http(path, params)
    end
    -- 返回数据
    return val
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJSON = read_data("item:id:" .. id,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

nginx本地缓存

现在,整个多级缓存中只差最后一环,也就是nginx的本地缓存了。如图:

image-20210821114742950

本地缓存API

OpenResty为Nginx提供了shard dict的功能,可以在nginx的多个worker之间共享数据,实现缓存功能。

1)开启共享字典,在nginx.conf的http下添加配置:

 # 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
 lua_shared_dict item_cache 150m; 

2)操作共享字典:

-- 获取本地缓存对象
local item_cache = ngx.shared.item_cache
-- 存储, 指定key、value、过期时间,单位s,默认为0代表永不过期
item_cache:set('key', 'value', 1000)
-- 读取
local val = item_cache:get('key')

案例:在查询商品时,优先查询OpenResty的本地缓存

需求:

  • 修改item.lua中的read_data函数,优先查询本地缓存,未命中时再查询Redis、Tomcat

  • 查询Redis或Tomcat成功后,将数据写入本地缓存,并设置有效期

  • 商品基本信息,有效期30分钟

  • 库存信息,有效期1分钟

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
--导入cjson
local cjson = require('cjson')
--导入共享词典
local item_cache = ngx.shared.item_cache


--封装查询函数
function read_data(key,expire,path,params)
    --查询本地缓存
    local val = item_cache:get(key)
    ngx.log(ngx.ERR,"val:",val)
    if not val then
        ngx.log(ngx.ERR,"本地缓存查询失败,key:",key) 
        --如果没空,则查询redis
        val = read_redis("127.0.0.1",6379,key)
        --判断查询结果
        if not resp then
            --redis查询失败查http
            ngx.log(ngx.ERR,"redis查询失败,key:",key)
            val = read_http(path,params)
        end
    end
    --将数据写入本地缓存
    item_cache:set(key,val,expire)
    return val
    
end

local id =ngx.var[1]

--查询商品信息
local itemJson = read_data("item:id:" .. id,1800,"/item/" .. id,nil)
--查询库存信息
local stockJson = read_data("stock:id:" .. id,60,"/item/stock/" .. id,nil)
--json转化为lua的table,反序列化
local item = cjson.decode(itemJson)
local stock = cjson.decode(stockJson)
--组合数据
item.stock = stock.stock
item.sold = stock.sold
--序列化后返回结果
ngx.say(cjson.encode(item))

缓存同步

缓存数据同步的常见方式有三种:

设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

  • 优势:简单、方便
  • 缺点:时效性差,缓存过期之前可能不一致
  • 场景:更新频率较低,时效性要求低的业务

同步双写:在修改数据库的同时,直接修改缓存

  • 优势:时效性强,缓存与数据库强一致
  • 缺点:有代码侵入,耦合度高;
  • 场景:对一致性、时效性要求较高的缓存数据

异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

  • 优势:低耦合,可以同时通知多个缓存服务
  • 缺点:时效性一般,可能存在中间不一致状态
  • 场景:时效性要求一般,有多个服务需要同步

而异步实现又可以基于MQ或者Canal来实现:

1)基于MQ的异步通知:

image-20210821115552327

解读:

  • 商品服务完成对数据的修改后,只需要发送一条消息到MQ中。
  • 缓存服务监听MQ消息,然后完成对缓存的更新

依然有少量的代码侵入。

2)基于Canal的通知

image-20210821115719363

解读:

  • 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
  • Canal监听MySQL变化,当发现变化后,立即通知缓存服务
  • 缓存服务接收到canal通知,更新缓存

代码零侵入


认识Canal

Canal [kə'næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal

Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:

image-20210821115914748

  • 1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
  • 2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
  • 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。

image-20210821115948395

监听Canal

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。

image-20210821120049024

我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。

不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client

与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。

  • 引入依赖
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
  • 编写配置

    canal:
      destination: heima # canal的集群名字,要与安装canal时设置的名称一致
      server: 192.168.150.101:11111 # canal服务地址
    
  • 编写逻辑

    //首先给实体类加上对应的注解
    @Data
    @TableName("tb_item")
    public class Item {
        @TableId(type = IdType.AUTO)
        @Id //代表主键
        private  Long id;//商品id
        private String name;//商品名称
        private String title;//商品标题
        private Long price;//价格(分)
        private String image;//商品图片
        private String category;//分类名称
        private String brand;//品牌名称
        private String spec;//规格
        private Integer status;//商品状态 1-正常,2-下架
        private Date createTime;//创建时间
        private Date updateTime;//更新时间
        @TableField(exist = false)
        @Transient //代表不存在于表中
        private Integer stock;
        @TableField(exist = false)
        @Transient
        private Integer sold;
    }
    
  • 编写监听逻辑

    @Component
    @CanalTable("tb_item")
    public class ItemHandle implements EntryHandler<Item> {
        @Autowired
        private RedisHandler redisHandler;
    
        @Autowired
        private Cache<Long,Item> itemCache;
    
        @Override
        public void insert(Item item) {
            //JVM新增
            itemCache.put(item.getId(),item);
            //redis新增
            redisHandler.saveItem(item);
        }
    
        @Override
        public void update(Item before, Item after) {
            itemCache.put(after.getId(),after);
            redisHandler.saveItem(after);
        }
    
        @Override
        public void delete(Item item) {
            //JVM删除
            itemCache.invalidate(item.getId());
            //redis删除
            redisHandler.deleteById(item.getId());
    
        }
    }
    //来自redisHandler中的方法
    public void saveItem(Item item){
        //2.1 item序列化为json
        String json = null;
        try {
            json = MAPPER.writeValueAsString(item);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        //2.2将json存入
        redisTemplate.opsForValue().set("item:id:"+item.getId(),json);
    }
    public void deleteById(Long id){
        redisTemplate.delete("item:id:"+id);
    }
    

RabbitMQ高级特性

消息队列在使用过程中,面临着很多实际问题需要思考:

image-20210718155003157

消息可靠性

消息从发送,到消费者接收,会经理多个过程:

image-20210718155059371

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

image-20210718160907166

注意:

image-20210718161707992

案例:SpringAMQP实现生产者确认

  • 修改配置

首先,修改publisher服务中的application.yml文件,添加下面的内容:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
   

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

编写逻辑

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

修改publisher服务,添加一个:

//编写publisher-return
@Configuration
@Slf4j
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取rabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //lambda表达式
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //记录日志
            log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{}",
                    replyCode,replyText,exchange,routingKey);
            //重发消息等。。
        });
    }
}
//编写publisher-confirm
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
    //准备消息
    String message = "hello, spring amqp!";

    //准备correlationData
    //准备消息id
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

    correlationData.getFuture().addCallback(result -> {
        if(result.isAck()){
            //ACK
            log.debug("消息成功投递到交换机!消息id:{}",correlationData.getId());
        }else{
            //NACK
            log.error("发送失败,消息id:{}",correlationData.getId());
            //重发消息
        }
    }, ex -> {
        log.error("消息发送失败!"+ex);
        //重发消息
    });

    //发送消息
    rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
}
//消息成功投递到交换机!消息id:ec49a9e8-d218-4279-b314-c3d12f896c85
//发送失败,消息id:3d5d2b55-d194-4c29-833e-eb921b0e265f
//消息发送到队列失败,响应码:312,失败原因:NO_ROUTE,交换机:amq.topic,路由key:saimple.test

小结

SpringAMQP中处理消息确认的几种情况:

publisher-comfirm:

  • 消息成功发送到exchange,返回ack

  • 消息发送失败,没有到达交换机,返回nack

  • 消息发送过程中出现异常,没有收到回执

  • 消息成功发送到exchange,但没有路由到queue,调用ReturnCallback


消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

持久化交换机

/**
 * 持久化的交换机
 * @return
 */
@Bean
public DirectExchange simpleDirect(){
    return new DirectExchange("simpleDirect",true,false);
}

持久化队列

/**
 * 持久化的队列
 * @return
 */
@Bean
public Queue simpleQueue(){
    return QueueBuilder.durable("simple.queue").build();
}

消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化
@Test
public void testD(){
    Message msg = MessageBuilder.withBody("hello_world".getBytes(StandardCharsets.UTF_8))//消息体
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息持久化
        .build();
    rabbitTemplate.convertAndSend("simple.queue",msg);
}

消费者确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。

  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

使用spring提供的默认消息确认

  • 修改配置文件
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):

image-20210718171705383

抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:

image-20210718171759179


失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

image-20210718172746378

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

失败重试策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

标签:缓存,服务,--,ngx,多级,redis,查询,item,local
来源: https://www.cnblogs.com/Boerk/p/16120599.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有