Distributed idempotency 2 (idempotency based on request parameters): a custom idempotency annotation for endpoints, @AvoidResubmit(isLoc = false, interval = 3000)

Published: 2022-08-05 11:05:22


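Scenario: request-parameter idempotency for endpoints. Based on the request parameters, the aspect decides whether the same request has been submitted again within the interval (3 s by default). A duplicate submission is answered directly with {"code":2500,"message":"数据重复提交"} (the message reads "duplicate data submission").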
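
The rule above can be illustrated in isolation. The following is a minimal in-memory sketch, not the implementation used in this article: the class name ResubmitWindowSketch and the method tryAccept are hypothetical, and the clock is passed in as a parameter so the behaviour is deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch: accept a request key at most once per interval. */
class ResubmitWindowSketch {
    // request key -> timestamp (ms) of the last accepted request
    private final ConcurrentMap<String, Long> lastAccepted = new ConcurrentHashMap<>();
    private final long intervalMs;

    ResubmitWindowSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns true when the request is accepted, false when it is a duplicate within the interval. */
    boolean tryAccept(String requestKey, long nowMs) {
        boolean[] accepted = {false};
        // compute() runs atomically per key, so two concurrent identical requests cannot both pass
        lastAccepted.compute(requestKey, (key, previous) -> {
            if (previous != null && nowMs - previous < intervalMs) {
                return previous; // duplicate: keep the old timestamp
            }
            accepted[0] = true;
            return nowMs;        // first request, or the interval has elapsed
        });
        return accepted[0];
    }

    public static void main(String[] args) {
        ResubmitWindowSketch window = new ResubmitWindowSketch(3000);
        System.out.println(window.tryAccept("k", 0));    // first request
        System.out.println(window.tryAccept("k", 1000)); // 1 s later, inside the interval
        System.out.println(window.tryAccept("k", 3500)); // 3.5 s after the accepted one
    }
}
```

The classes in this article follow the same idea, but add a short-lived lock, a Redis-backed variant for multiple instances, and a rollback when the business call does not succeed.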

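Env: Spring Boot 2.3.12 + JDK 8

Usage: 1. Add @AvoidResubmit to every endpoint that needs this kind of idempotency check. interval: the minimum time (ms) between two identical requests; an identical request arriving sooner than that is treated as a duplicate submission.

      isLoc = true: single-instance mode, uses a local cache and needs no Redis configuration. isLoc = false: for distributed services, requires Redis to be configured.

     2. Every request must carry the Authorization header. If the endpoint does not require authentication, the header value can be the current sessionId or userId; it only has to identify the caller uniquely and stay the same across that caller's repeated requests.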

    @PostMapping("add")
    @AvoidResubmit(isLoc = false, interval = 2000)
    public JSONObject add(@RequestBody @Validated AgreeDO agree)
    {
        // business logic goes here
        return ResponseUtil.getResult(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getMessage(), null);
    }

Example: sending the request below twice within the interval makes the second call return the duplicate-submission response.

curl -X POST "http://localhost:7213/agree/add" -H "accept: */*" -H "Content-Type: application/json" -H "Authorization:a369361156674bb496fc94ee49c84bd6_1659490855779" \
-d "{ \"avatar\": \"string\", \"remark\": \"string\", \"ts\": 0, \"type\": \"string\", \"typeId\": \"string\", \"userId\": \"string\", \"userName\": \"string\"}"

    {"code":2500,"message":"数据重复提交"}

Dependencies:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
 <dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
</dependency>

 

Add the following three classes:

AvoidResubmit, AvoidResubmitHandler, SeaRequestBodyHolder (works around the fact that the request body can be read only once)

AvoidResubmit
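import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Custom annotation that switches on duplicate-submission detection for a method.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AvoidResubmit {
    // true: single instance, local cache; false: distributed, Redis
    boolean isLoc() default true;
    // interval in ms; an identical request arriving sooner is treated as a duplicate submission
    int interval() default 3000;
}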
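
Because the annotation is retained at runtime, its attributes can be read from the annotated method, which is how the aspect later obtains isLoc and interval. The snippet below is a small standalone sketch of that lookup using plain reflection; DemoController, its two methods and the read helper are hypothetical names, and the annotation is redeclared locally only to keep the example self-contained.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AvoidResubmitReflectionSketch {
    // local copy of the annotation from this article, so the sketch compiles on its own
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @interface AvoidResubmit {
        boolean isLoc() default true;
        int interval() default 3000;
    }

    // hypothetical controller with one default and one customised usage
    static class DemoController {
        @AvoidResubmit
        public void withDefaults() { }

        @AvoidResubmit(isLoc = false, interval = 2000)
        public void distributed() { }
    }

    /** Looks up the annotation on a public no-arg method of DemoController. */
    static AvoidResubmit read(String methodName) {
        try {
            Method method = DemoController.class.getMethod(methodName);
            return method.getAnnotation(AvoidResubmit.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("unknown method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        AvoidResubmit defaults = read("withDefaults");
        AvoidResubmit custom = read("distributed");
        System.out.println(defaults.isLoc() + " " + defaults.interval());
        System.out.println(custom.isLoc() + " " + custom.interval());
    }
}
```

Without RetentionPolicy.RUNTIME the getAnnotation call would return null, and an @annotation pointcut could not match the method either.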

 

AvoidResubmitHandler
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.sea.common.config.SeaRequestBodyHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
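/**
 * Dependencies:
 *         <dependency>
 *               <groupId>org.springframework.boot</groupId>
 *               <artifactId>spring-boot-starter-data-redis</artifactId>
 *            </dependency>
 *             <dependency>
 *             <groupId>com.google.guava</groupId>
 *             <artifactId>guava</artifactId>
 *             <version>22.0</version>
 *         </dependency>
 */
/***************************
 *<pre>
 * @Project Name : bx-blog-service
 * @Package      : com.icil.bx.handler
 * @File Name    : ResubmitLock
 * @Author       :  Sea
 * @Date         : 8/2/22 3:25 PM
 * @Purpose      : endpoint idempotency
 * @History      :
 *</pre>
 ************* idempotency based on checking the request parameters =>  **************/
/**
 * Purpose: a custom aspect that prevents duplicate form submissions caused by double clicks, network jitter
 *     and, in particular, retries in service-to-service calls.
 *     If the same user sends the same data again within the configured time (for example 2 s), it is treated as a duplicate submission.
 * 1. The request must carry authentication: AUTHORIZATION_TOKEN_HEADER = "Authorization";
 * 2. For each request a unique key is built from uri + token + body.md5
 * 3. Check whether that key already exists and whether the elapsed time is below the configured interval
 * 4. If it exists and the elapsed time is below the interval, the request is a duplicate and the response is {"code":2500,"message":"数据重复提交"}
 */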
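@Slf4j
@Aspect
@Component
@Order(-1) // aspect precedence is read from the aspect class, not from the advice method
public class AvoidResubmitHandler{
    private final static  int RESUBMIT_CODE = 2500;
    private final static  int RESUBMIT_NO_TOKEN = 2501;
    private final static  String CODE = "code";
    private final static  String OK = "200";
    /**
     * Intercepts every method annotated with @AvoidResubmit.
     * @param joinPoint     the intercepted controller call
     * @param avoidResubmit the annotation instance found on that method
     * @return the controller result, or a JSONObject with code 2500/2501 when the request is rejected
     * @throws Throwable whatever the controller method throws
     */
    @Around("@annotation(avoidResubmit)")
    public Object handlerAvoidResubmit(ProceedingJoinPoint joinPoint, AvoidResubmit avoidResubmit) throws Throwable
    {
        List<Object> cacheKeyList = new ArrayList<>();
        try
        {
            HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
            // Keep the serialized arguments in the list as a fallback source for the body hash:
            // @RequestBody has already consumed the stream, so the body cannot be read from HttpServletRequest any more.
            // This also covers non-POST methods (PUT, DELETE), whose parameters are debounced the same way.
            Object[] bodyArgs = joinPoint.getArgs();
            cacheKeyList.add((bodyArgs!=null&&bodyArgs.length>0)?JSON.toJSONString(bodyArgs):null);
            String resubmitToken = request.getHeader(AUTHORIZATION_TOKEN_HEADER);// Authorization
            if(!this.checkTokenExist(resubmitToken,avoidResubmit,request,cacheKeyList))
            {
                // my controller layer returns JSONObject by default
                JSONObject rejected = new JSONObject();
                if(StringUtils.isBlank(resubmitToken)){ // 2501: no authentication info
                    rejected.put(CODE,RESUBMIT_NO_TOKEN);
                    rejected.put("message","没有认证信息,bad token");
                    return rejected;
                }
                // 2500: duplicate submission
                rejected.put(CODE,RESUBMIT_CODE);
                rejected.put("message","数据重复提交");
                return rejected;
            }
        }catch (Exception e)
        {   // the check itself failed: roll back and let the request through
            log.error("resubmit check failed", e);
            ifNotOkClearLockAndCache(null,avoidResubmit.isLoc(),  cacheKeyList.isEmpty()?"":cacheKeyList.get(0)+"");
        }
        finally
        {       // release the thread-local body to avoid a memory leak
               SeaRequestBodyHolder.resetRequestBody();
        }

        Object result;
        try
        {
            result = joinPoint.proceed();
        }catch (Throwable t)
        {   // the business call failed: clear the key so that an immediate retry is not rejected
            ifNotOkClearLockAndCache(null, avoidResubmit.isLoc(),  cacheKeyList.isEmpty()?"":cacheKeyList.get(0)+"");
            throw t;
        }
        // The response may be not OK (for example because of a network problem) and the client may need to resubmit at once,
        // so when the code is not 200 the key is cleared and an immediate retry is allowed
        ifNotOkClearLockAndCache( result, avoidResubmit.isLoc(),  cacheKeyList.isEmpty()?"":cacheKeyList.get(0)+"");
        return result;
    }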

        /**
         * Rolls back (clears the key and the lock) when the code is not 200, so the client may resubmit at once.
         * @param result   the controller result, expected to be a JSONObject such as {"code":2500,"message":"xx","data":""} built by my ResponseUtil
         * @param isLoc    true for the local cache, false for Redis
         * @param cacheKey the key stored by the duplicate check
         */
        private void ifNotOkClearLockAndCache(Object result,Boolean isLoc, String cacheKey){
          try
          {
              if(StringUtils.isBlank(cacheKey)){return;}
              JSONObject myResult =result==null ? new JSONObject(): (JSONObject) result ;
              String code =  myResult.get(CODE)==null?null:myResult.get(CODE)+"";// "code"
              code = StringUtils.isBlank(code)?myResult.get(CODE.toUpperCase())+"":code;// fall back to "CODE"
              // 2500 means duplicate submission; neither 200 nor 2500 triggers a rollback
              if(!OK.equalsIgnoreCase(code)&&!(RESUBMIT_CODE+"").equalsIgnoreCase(code)){
                  log.warn("roll back resubmit key {}", cacheKey);
                    if(isLoc){
                        cacheLoc.invalidate(cacheKey);
                        MyNxLockUtils.unlock("lc"+cacheKey,"1");
                    }else
                    {
                        redisTemplate.delete(cacheKey);
                        redisTemplate.delete("lc"+cacheKey);
                    }
              }
          }catch (Exception e)
          {
            log.error("roll back failed", e);
          }
        }



    // ################### base logic handler  ###################
    // the header value must be unique per user
    private static final String  AUTHORIZATION_TOKEN_HEADER = "Authorization";
    private static final String  RESUBMIT_PREFIX =  "resubmit_";
    // local entries expire after 10 s, so in local mode interval must stay below 10000 ms
    private static final long    cacheTimeOutSec = 10;
    public  static  Cache<String, Long> cacheLoc =  CacheBuilder.newBuilder().expireAfterWrite(cacheTimeOutSec, TimeUnit.SECONDS).build();

    private static RedisTemplate<String,Object> redisTemplate ;
    @Autowired
    public  void setRedisTemplate(RedisTemplate redisTemplate) {
        // note: this changes the serializers of the injected RedisTemplate bean itself
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(stringRedisSerializer);
        AvoidResubmitHandler.redisTemplate = redisTemplate;
    }

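    /**
     * Returns true when the request may pass, false when the token is missing or the request is a duplicate.
     * @param token         value of the Authorization header
     * @param avoidResubmit the annotation instance (isLoc, interval)
     * @param request       the current request
     * @param cacheKeyList  in: serialized arguments; out: the stored key, kept for a possible rollback
     * @return true to pass, false to reject
     */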
    public Boolean checkTokenExist(String token, AvoidResubmit avoidResubmit,HttpServletRequest request,List<Object> cacheKeyList)
    {   //token is Authorization
        if(StringUtils.isBlank(token)){return false;}
        if(avoidResubmit.isLoc())
        {
         return checkExistInLoc(token,avoidResubmit,request,cacheKeyList);
        }else
        {
         return checkExistInRedis(token,avoidResubmit,request,cacheKeyList);
        }
    }



    /**
     * Computes the MD5 of the request body cached by SeaRequestBodyHolder; it is used to detect repeated clicks (jitter).
     * Remark: once an endpoint uses @RequestBody, the body cannot be read from the request again; trying to do so fails with:
     * getInputStream() has already been called for this request
     * @param request the current request (only needed by the commented-out reader approach)
     * @return Base64 of the MD5 digest, or null when no body was cached
     */
    private String getBodyMD5(HttpServletRequest request) {
        try {
          /*BufferedReader reader = request.getReader();
            if(reader==null){return null;}
            String body = IOUtils.toString(reader);*/
            String body = SeaRequestBodyHolder.getRequestBody();
            if(body==null){return null;}
            log.debug("body---{}", body);
            //MD5
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return Base64.getEncoder().encodeToString(md5.digest(body.getBytes("utf-8")));
        } catch (Exception e) {
            log.warn("get body from HttpServletRequest error , I will get it from request Args later "+e);
            return null;
        }
    }

    /**
     * Local-cache variant of the duplicate check.
     * @param token         value of the Authorization header
     * @param avoidResubmit the annotation instance (interval)
     * @param request       the current request
     * @return false if the same request was already accepted within the interval (duplicate), true otherwise
     */
    private Boolean checkExistInLoc(String token,AvoidResubmit avoidResubmit,HttpServletRequest request,List<Object> cacheKeyList){
        Boolean result = true;
        // hash the body; fall back to the serialized method arguments when no raw body was cached
        String bodyMD5 = getBodyMD5(request);
        if(bodyMD5==null&&cacheKeyList.get(0)!=null){
           bodyMD5 =  DigestUtils.md5DigestAsHex(cacheKeyList.remove(0).toString().getBytes());
        }
        cacheKeyList.clear();
        if(bodyMD5!=null)
        {
            String key = DigestUtils.md5DigestAsHex((token + request.getRequestURI()+bodyMD5).getBytes());
            // lock, similar to a distributed lock, so that concurrent identical requests are checked one at a time
            String lcKey="lc"+key;
            Boolean lock = MyNxLockUtils.getLock(lcKey, "1");
            if(!lock){return false;}
            try
            {
                // check whether this body was sent before (repeated clicks)
                long now = System.currentTimeMillis();
                Long reqBeforeTs = cacheLoc.getIfPresent(key);
                log.debug("reqBeforeTs {}", reqBeforeTs);
                if(reqBeforeTs!=null && now - reqBeforeTs < avoidResubmit.interval())
                {   // seen within the interval: duplicate submission
                    result = false;
                }else if(reqBeforeTs!=null)
                {   // the interval has elapsed: overwrite the old timestamp
                    cacheLoc.put(key,now);
                }else
                {
                    cacheKeyList.add(key); // remembered so the caller can roll back if this request does not end OK
                    cacheLoc.put(key,now);
                }
            }finally
            {   // always release the lock, even if the check throws
                MyNxLockUtils.unlock(lcKey,"1");
            }
        }
        return  result;
    }

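    /**
     * Redis variant of the duplicate check.
     * @param token         value of the Authorization header
     * @param avoidResubmit the annotation instance (interval)
     * @param request       the current request
     * @return false if the same request was already accepted within the interval (duplicate), true otherwise
     */
    private Boolean checkExistInRedis(String token,AvoidResubmit avoidResubmit,HttpServletRequest request,List<Object> cacheKeyList){
        Boolean result = true;
        // hash the body; fall back to the serialized method arguments when no raw body was cached
        String bodyMD5 = getBodyMD5(request);
        if(bodyMD5==null&&cacheKeyList.get(0)!=null){
            bodyMD5 =  DigestUtils.md5DigestAsHex(cacheKeyList.remove(0).toString().getBytes());
        }
        cacheKeyList.clear();
        if(bodyMD5!=null)
        {
            String key = RESUBMIT_PREFIX+DigestUtils.md5DigestAsHex((token + request.getRequestURI()+bodyMD5).getBytes());
            // distributed lock (SET NX with a 5 s expiry)
            String lcKey="lc"+key;
            Boolean getLock = redisTemplate.opsForValue().setIfAbsent( lcKey , "1", 5, TimeUnit.SECONDS);
            if(!Boolean.TRUE.equals(getLock)){return false;} // null-safe: the call may return null in a pipeline or transaction
            try
            {
                // check whether this body was sent before (repeated clicks)
                long now = System.currentTimeMillis();
                // the key is only meaningful for one interval, so let it expire instead of staying in Redis forever
                long ttlMs = Math.max(avoidResubmit.interval(), 1);
                Object reqBeforeTs = redisTemplate.opsForValue().get(key);
                log.debug("reqBeforeTs {}", reqBeforeTs);
                if(reqBeforeTs!=null && now - Long.parseLong(reqBeforeTs.toString()) < avoidResubmit.interval())
                {   // seen within the interval: duplicate submission
                    result = false;
                }else if(reqBeforeTs!=null)
                {   // the interval has elapsed: overwrite the old timestamp
                    redisTemplate.opsForValue().set(key,now+"",ttlMs,TimeUnit.MILLISECONDS);
                }else
                {
                    cacheKeyList.add(key); // remembered so the caller can roll back if this request does not end OK
                    redisTemplate.opsForValue().set(key,now+"",ttlMs,TimeUnit.MILLISECONDS);
                }
            }finally
            {   // always release the lock, even if the check throws
                redisTemplate.delete(lcKey);
            }
        }
        return  result;
    }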


    /**
     * A home-made in-process lock that mimics the get/unlock shape of the Redis NX lock.
     */
    public static class MyNxLockUtils {
        // ten dedicated monitor objects; string literals are interned and shared JVM-wide, so they must not be used as monitors
        private static final Object[] LOCK_POINTS = new Object[10];
        // a lock entry expires after 10 s in case unlock is never called
        private static final Cache<String, String> lockCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();
        static {
            for (int i = 0; i < LOCK_POINTS.length; i++) {
                LOCK_POINTS[i] = new Object();
            }
        }
        /**
         * Segmented locking: keys are spread over the monitors to reduce contention.
         * @param k the lock key
         * @return the monitor object responsible for this key
         */
        private static Object getLcPoint(String k){
            // floorMod keeps the index non-negative even when hashCode() is negative
            return LOCK_POINTS[Math.floorMod(k.hashCode(), LOCK_POINTS.length)];
        }
        /**
         * @param k the lock key
         * @param v the owner value, checked again on unlock
         * @return true if the lock was acquired
         */
        public static Boolean getLock(String k, String v){
            if(lockCache.getIfPresent(k)==null)
            {
                synchronized (getLcPoint(k)){
                    if(lockCache.getIfPresent(k)==null){
                        lockCache.put(k,v);
                        return true;
                    }
                }
            }
            return false;
        }
        public static  void unlock(String k, String v)
        {
            String ifPresent = lockCache.getIfPresent(k);
            if(ifPresent!=null&&ifPresent.equals(v)){
                lockCache.invalidate(k);
            }
        }
    }
}
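
The request key is the central piece of the handler above, so it may help to see its derivation on its own. What follows is a simplified sketch using only the JDK; ResubmitKeySketch, md5Hex and requestKey are hypothetical names, and unlike the handler (which Base64-encodes the body digest when the raw body is available) it hex-encodes both digests.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class ResubmitKeySketch {
    /** Hex-encoded MD5 of a UTF-8 string. */
    static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform has to provide
            throw new IllegalStateException(e);
        }
    }

    /** Same shape as the article's key: prefix + md5(token + uri + md5(body)). */
    static String requestKey(String token, String uri, String body) {
        return "resubmit_" + md5Hex(token + uri + md5Hex(body));
    }

    public static void main(String[] args) {
        String first = requestKey("user-1", "/agree/add", "{\"type\":\"a\"}");
        String same = requestKey("user-1", "/agree/add", "{\"type\":\"a\"}");
        String other = requestKey("user-1", "/agree/add", "{\"type\":\"b\"}");
        System.out.println(first.equals(same));  // identical requests share one key
        System.out.println(first.equals(other)); // a different body gives a different key
    }
}
```

Hashing keeps the key short and of fixed length regardless of body size. MD5 is used here only as a fingerprint, not for security; note also that two bodies that differ only in whitespace or field order produce different keys.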

 

SeaRequestBodyHolder
import com.sea.handler.AvoidResubmit;
import org.apache.commons.io.IOUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.NamedThreadLocal;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.web.servlet.mvc.method.annotation.RequestBodyAdviceAdapter;
import javax.servlet.http.HttpServletRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/***************************
 *<pre>
 * @Project Name : Sea-blog-service
 * @Package      : com.sea.common.config
 * @File Name    : SeaRequestBodyHolder
 * @Author       :  Sea
 * @Date         : 8/4/22 2:11 PM
 * @Purpose      :  By default the body can be read only once: if an endpoint uses @RequestBody, the body is read from HttpServletRequest
 *                  for the endpoint and cannot be read again afterwards; a second attempt fails with:
 *                  getInputStream() has already been called for this request
 *                  This class therefore caches a copy of the body at the moment it is first read
 * @History      :
 *</pre>
 ***************************/
@ControllerAdvice
public class SeaRequestBodyHolder extends RequestBodyAdviceAdapter {
    private static final String  requestBody = "body";
    private static final ThreadLocal<Map<String,String>> requestBodyHolder = new NamedThreadLocal<>("Sea Request body holder");
    // returns the cached body (or null if none was cached) and clears the holder
    public static String getRequestBody(){
        Map<String,String> holder = requestBodyHolder.get();
        resetRequestBody();
        return holder==null ? null : holder.get(requestBody);
    }
    public static void resetRequestBody(){
          requestBodyHolder.remove();
    }
    @Override
    public boolean supports(MethodParameter methodParameter, Type type, Class<? extends HttpMessageConverter<?>> aClass) {
        return true;
    }
    // If only POST requests need debouncing, the duplicate check could be done right here
    @Override
    public HttpInputMessage beforeBodyRead(HttpInputMessage httpInputMessage, MethodParameter methodParameter, Type type, Class<? extends HttpMessageConverter<?>> aClass) throws IOException {
        // Methods without @AvoidResubmit do not need the body cached; return directly
        if(!methodParameter.hasMethodAnnotation(AvoidResubmit.class)){
            return super.beforeBodyRead(httpInputMessage, methodParameter, type, aClass);
        }
        // read the body for the first (and only) time
        String bodyStr = IOUtils.toString(httpInputMessage.getBody(), StandardCharsets.UTF_8);
        // wrap it in a new HttpInputMessage
        return new HttpInputMessage() {
            @Override
            public InputStream getBody() throws IOException {
                String body = bodyStr;
                // store it in requestBodyHolder so the aspect can pick it up later
                requestBodyHolder.set(new HashMap<String, String>(){{put(requestBody,body);}});
                // hand a fresh stream over the same bytes to the message converter
                return new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8));
            }
            @Override
            public HttpHeaders getHeaders() {
                return httpInputMessage.getHeaders();
            }
        };
    }
}
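
The holder above relies on a thread-local that is read once and then cleared. The reason for clearing is that servlet containers reuse worker threads, so a value left behind by one request would be visible to the next request served by the same thread. Here is a minimal standalone sketch of that take-and-clear pattern; BodyHolderSketch, store and take are hypothetical names, not part of the classes in this article.

```java
class BodyHolderSketch {
    private static final ThreadLocal<String> BODY = new ThreadLocal<>();

    /** Called where the body is first read. */
    static void store(String body) {
        BODY.set(body);
    }

    /** Returns the stored body (or null) and clears the slot in the same call. */
    static String take() {
        String body = BODY.get();
        BODY.remove(); // without this, a pooled thread would carry the body into its next request
        return body;
    }

    public static void main(String[] args) throws InterruptedException {
        store("{\"type\":\"a\"}");
        System.out.println(take()); // the stored body
        System.out.println(take()); // null: the slot was cleared by the first take

        store("visible only on the main thread");
        Thread other = new Thread(() -> System.out.println(take())); // null: each thread has its own slot
        other.start();
        other.join();
        take(); // clean up the main thread's slot
    }
}
```

In the aspect this corresponds to calling SeaRequestBodyHolder.resetRequestBody() in the finally block, which covers the paths where the body was cached but never read.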

 

Source: https://www.cnblogs.com/lshan/p/16553596.html
