ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

TransmittableThreadLocal使用踩坑-(主线程set,异步线程get)

2021-12-26 19:35:54  阅读:482  来源: 互联网

标签:线程 set Thread get static new import 上下文


背景:为了获取相关字段方便,项目里使用了TransmittableThreadLocal上下文,在异步逻辑中get值时发现并非当前请求的值,且是偶发状况。

发现:TransmittableThreadLocal是阿里开源的可以实现父子线程值传递的工具,其子线程必须使用TtlRunnable\TtlCallable修饰或者线程池使用TtlExecutors修饰(防止数据“污染”),而Tomcat线程池是没有被TtlExecutors修饰的,所以请求中存在异步操作,且使用了TransmittableThreadLocal上下文,那么就有可能出现线程不安全的问题。

话不多说上代码:

封装的上下文,成员变量RequestHeader

package org.example.ttl.threadLocal;

import com.alibaba.ttl.TransmittableThreadLocal;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.ObjectUtils;

/**
 * description:
 * author: JohnsonLiu
 * create at:  2021/12/24  23:19
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RequestContext {

    private static final ThreadLocal<RequestContext> transmittableThreadLocal = new TransmittableThreadLocal();


    private static final RequestContext INSTANCE = new RequestContext();
    private RequestHeader requestHeader;


    public static void create(RequestHeader requestHeader) {
        transmittableThreadLocal.set(new RequestContext(requestHeader));
    }

    public static RequestContext current() {
        return ObjectUtils.defaultIfNull(transmittableThreadLocal.get(), INSTANCE);
    }

    public static RequestHeader get() {
        return current().getRequestHeader();
    }

    public static void remove() {
        transmittableThreadLocal.set(null);
    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    static class RequestHeader {
        private String requestUrl;
        private String requestType;
    }
}

获取上下文内容的case:


package org.example.ttl.threadLocal;

import com.alibaba.ttl.threadpool.TtlExecutors;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* description: TransmittableThreadLocal正确使用
* author: JohnsonLiu
* create at: 2021/12/24 22:24
* 验证结论:
* 1.线程池必须使用TtlExecutors修饰,或者Runnable\Callable必须使用TtlRunnable\TtlCallable修饰
* ---->原因:子线程复用如果有修改上下文内容会对下次使用造成“污染”,而修饰后的子线程在执行run方法后会进行“回放”,防止污染;
* <p>
* 2.特殊情况下会存在线程安全问题:Tomcat线程池没有被TtlExecutors修饰(所以不存在回放),其子线程也是复用,如果我们业务存在 :
* 请求(Thread-1)-->创建上下文(Thread-1)-->异步任务(任务中存在获取上下文行为)Thread-2-->响应(Thread-1),恰巧在异步任务还未执行到获取上下文的时候且Tomcat回收的该线程又执行请求打到同一台机器上并新创建了新的上下文,
* 上下文写法如RequestContext,此时上次请求的异步任务获取的上下文就会被这次请求新创建的上下文覆盖,造成数据错误,存在“同线程”安全问题。
* --->原因:TransmittableThreadLocal也好,ThreadLocal也罢,其内部都是维护一个ThreadLocalMap,并且是跟Thread绑定,正是由于线程复用,所以同一个Thread可能会存在连续短时间内请求到同一台机器的含有异步任务的接口上,
* 所以也就可能存在ThreadLocalMap复用(缓存),ThreadLocalMap.Entry覆盖,从而造成线程不安全问题。
*/
public class TransmittableThreadLocalCase2 {

// 为达到线程100%复用便于测试,线程池核心数1

private static final Executor executor = TtlExecutors.getTtlExecutor(new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000)));
// 如果使用一般的线程池或者Runnable\Callable时,会存在线程“污染”,比如线程池中线程会复用,如果复用的线程修改了上下文的值,会“污染”该线程执行下一次任务
// private static final Executor executor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000));


public static void main(String[] args) {

RequestContext.create(new RequestContext.RequestHeader("url", "get"));
System.out.println(Thread.currentThread().getName() + " 子线程(rm之前 同步):" + RequestContext.get());
// 模拟另一个线程修改上下文内容
executor.execute(() -> {
RequestContext.create(new RequestContext.RequestHeader("url", "put"));
});

// 保证上面子线程修改成功
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 异步获取上下文内容
executor.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 子线程(rm之前 异步):" + RequestContext.get());
});

// 主线程修改上下文内容
RequestContext.create(new RequestContext.RequestHeader("url", "post"));
System.out.println(Thread.currentThread().getName() + " 子线程(rm之前 同步<reCreate>):" + RequestContext.get());

// 主线程remove
RequestContext.remove();

// 子线程获取remove后的上下文内容
executor.execute(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 子线程(rm之后 异步):" + RequestContext.get());
});
}
}
 
使用一般线程池结果:
使用修饰后的线程池结果:

 

这种问题的解决办法:

如果大家跟我一样存在这样的使用,那么也会低概率存在这样的问题,一种优化的方式是:

主线程创建上下文后,执行子任务的时候传递都子任务中,子任务中方法透传或者在子任务中把传递过来的数据再次创建一个上下文供子任务单独使用,使用完记得remove。

 

另外还有一种不同于上面的上下文用法,同样使用不当也会存在线程安全问题:

上代码样例

package org.example.ttl.threadLocal;

import com.alibaba.ttl.TransmittableThreadLocal;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * description: TransmittableThreadLocal正确使用
 * author: JohnsonLiu
 * create at:  2021/12/24  23:19
 */
public class ServiceContext {

    private static final ThreadLocal<Map<Integer, Integer>> transmittableThreadLocal = new TransmittableThreadLocal() {
        /**
         * 如果使用的是TtlExecutors装饰的线程池或者TtlRunnable、TtlCallable装饰的任务
         * 重写copy方法且重新赋值给新的LinkedHashMap,不然会导致父子线程都是持有同一个引用,只要有修改取值都会变化。引用值线程不安全
         * parentValue是父线程执行子任务那个时刻的快照值,后续父线程再次set值也不会影响子线程get,因为已经不是同一个引用
         * @param parentValue
         * @return
         */
        @Override
        public Object copy(Object parentValue) {
            if (parentValue instanceof Map) {
                System.out.println("copy");
                return new LinkedHashMap<Integer, Integer>((Map) parentValue);
            }
            return null;
        }

        /**
         * 如果使用普通线程池执行异步任务,重写childValue即可实现子线程获取的是父线程执行任务那个时刻的快照值,重新赋值给新的LinkedHashMap,父线程修改不会影响子线程(非共享)
         * 但是如果使用的是TtlExecutors装饰的线程池或者TtlRunnable、TtlCallable装饰的任务,此时就会变成引用共享,必须得重写copy方法才能实现非共享
         * @param parentValue
         * @return
         */
        @Override
        protected Object childValue(Object parentValue) {
            if (parentValue instanceof Map) {
                System.out.println("childValue");
                return new LinkedHashMap<Integer, Integer>((Map) parentValue);
            }
            return null;
        }

        /**
         * 初始化,每次get时都会进行初始化
         * @return
         */
        @Override
        protected Object initialValue() {
            System.out.println("initialValue");
            return new LinkedHashMap<Integer, Integer>();
        }
    };

    public static void set(Integer key, Integer value) {
        transmittableThreadLocal.get().put(key, value);
    }

    public static Map<Integer, Integer> get() {
        return transmittableThreadLocal.get();
    }

    public static void remove() {
        transmittableThreadLocal.remove();
    }
}

使用case:

package org.example.ttl.threadLocal;

import com.alibaba.ttl.threadpool.TtlExecutors;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * description: TransmittableThreadLocal正确使用
 * author: JohnsonLiu
 * create at:  2021/12/24  22:24
 */
public class TransmittableThreadLocalCase {


    private static final Executor executor = TtlExecutors.getTtlExecutor(new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000)));
//    private static final Executor executor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000));

    static int i = 0;

    public static void main(String[] args) {

        ServiceContext.set(++i, i);

        executor.execute(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 子线程(rm之前):" + ServiceContext.get());
        });

        ServiceContext.set(++i, i);
        ServiceContext.remove();

        executor.execute(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 子线程(rm之后):" + ServiceContext.get());
        });
    }
}

 

代码中已有详细的说明,大家可自行copy代码验证。

 

以上仅代表个人理解,如有错误之处,还请留言指教,进行更正!

 

 

 

 

 

 


标签:线程,set,Thread,get,static,new,import,上下文
来源: https://www.cnblogs.com/JohnsonLiu/p/15733680.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有