ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

JUC

2022-07-02 22:34:01  阅读:171  来源: 互联网

标签:JUC return System 线程 println CompletableFuture out


并发大纲

java.util.concurrent包涵盖三块内容

  • atomic
  • locks
  • 其他

start线程解读

  • 初始程序
public static void main(String[] args) {
        Thread t1 = new Thread(() ->{
        },"t1");
        t1.start();
    }
//start
    public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }
private native void start0();//start0是一个native方法
  • native调用了本地方法,我们可以通过下载官网OpenJDK查看其源码

    • thread.c
      java线程是通过start的方法启动执行的,主要内容在native方法start0中
      Openjdk的写JNI一般是一一对应的,Thread.java对应的就是Thread.c
      start0其实就是JVM_StartThread。此时查看源代码可以看到在jvm.h中找到了声明,jvm.cpp中有实现

    • vm.cpp

    • thread.cpp
      • 终于在这里调用了操作系统的线程启动,os::start_thread(thread);

Java多线程相关概念(1把锁 2个并(并发和并行))

  1. 并发:是在同一实体上的多个事件,是在同一台处理器上“同时”处理多个任务,同一时刻,其实是只有一个事件在发生。
  2. 并行:是在不同实体上的多个事件,是在多台处理器上同时处理多个任务,同一时刻,大家都真的在做事情,你做你的,我做我的

并发VS并行

3个程(进程 线程 管程)

  • 通过上面start线程的案例,其实进程线程都来源于操作系统。
    1. 进程:系统中运行的一个应用程序就是一个进程,每一个进程都有它自己的内存空间和系统资源。
    2. 线程:也被称为轻量级进程,在同一个进程内基本会有1一个或多个线程,是大多数操作系统进行调度的基本单元。
    3. 管程: Monitor(监视器),也就是我们平时说的锁,Monitor其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。JVM中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个Monitor对象,Monitor对象会和Java对象一同创建并销毁,它底层是由C++语言来实现的.

进程VS线程

进程是…,线程是…,进程和线程的最大不同在于进程基本上是独立的,而线程不一定,线程共享方法区和堆,线程私有栈、本地方法栈和程序计数器

用户线程和守护线程

Java线程分为用户线程和守护线程

  • 线程的daemon属性为
  • true表示是守护线程
  • false表示是用户线程。

用户线程

是系统的工作线程,它会完成这个程序需要完成的业务操作

守护线程

是一种特殊的线程,为其他线程服务的,在后台默默地完成一些系统性的服务,比如垃圾回收线程。

总结

点击查看代码
public class DaemonDemo
{
public static void main(String[] args)
{
    Thread t1 = new Thread(() -> {
        System.out.println(Thread.currentThread().getName()+"\t 开始运行,"+(Thread.currentThread().isDaemon() ? "守护线程":"用户线程"));
        while (true) {

        }
    }, "t1");
    //线程的daemon属性为true表示是守护线程,false表示是用户线程
    //---------------------------------------------
    t1.setDaemon(true);
    //-----------------------------------------------
    t1.start();
    //3秒钟后主线程再运行
    try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }

    System.out.println("----------main线程运行完毕");
}

}
  • 两种情况
    • 未加t1.setDaemon(true);,默认是用户线程,他会继续运行,所以灯亮着
    • 加了t1.setDaemon(true);是守护线程,当用户线程main方法结束后自动退出了

    • 守护线程作为一个服务线程,没有服务对象就没有必要继续运行了,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可退出了。假如当系统只剩下守护线程的时候,java虚拟机会自动退出。
    • setDaemon(true)方法必须在start()之前设置,否则报IIIegalThreadStateException异常

CompletableFuture(异步实现)

Future和Callable接口

* Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。(异步:可以被叫停,可以被取消)
* 一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
* eg.比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。老师在上课,但是口渴,于是让班长这个线程去买水,自己可以继续上课,实现了异步任务。
* 有个目的:异步多线程任务执行且有返回结果,三个特点:多线程/有返回/异步任务(班长作为老师去买水作为新启动的异步多线程任务且买到水有结果返回)

FutureTask三功能合一:

  • 作为线程:实现了runnable接口
  • 异步处理:实现了future接口
  • 有返回值:构造注入了Callable,提供了Callable功能
点击查看代码

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

ForkJoinTask

在 JDK 1.7 , ForkJoin,并行执行任务!提高效率。大数据量!
大数据:Map Reduce (把大任务拆分为小任务)

就是在必要的情况下,将一个大任务,进行拆分(fork) 成若干个小任务(拆到给出的临界值为止),再将一个个的小任务运算的结果进行join汇总

工作窃取

工作窃取模式 (work-stealing):当执行新的任务时它可以将其拆分成 更小的任务执行,并将小任务加到线程队列中,当没有任务执行时,再从一个随机线程的队列中偷一个并把它放在自己的队列中
相对于一般的线程池实现 ,fork/join 框架的优势体现在对其中包含的任务的处理方式上,在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行那么该线程会处于等待状态。而在fork/join 框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题(窃取过来)来执行,这种方式减少了线程的等待时间,提高了性能

ForkJoinTask实现类

  • RecursiveTask>:有返回值的递归任务
  • RecursiveAction:无返回值得递归事件

实现类Demo

点击查看代码
package com.kuang.forkjoin;

import java.util.concurrent.RecursiveTask;

/**
 * 求和计算的任务!
 * 3000   6000(ForkJoin)  9000(Stream并行流)
 * // 如何使用 forkjoin
 * // 1、forkjoinPool 通过它来执行
 * // 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
 * // 3. 计算类要继承 ForkJoinTask
 */
public class ForkJoinDemo extends RecursiveTask<Long> {

    private Long start;  // 1
    private Long end;    // 1990900000

    // 临界值
    private Long temp = 10000L;

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    // 计算方法
    @Override
    protected Long compute() {
        if ((end-start)<temp){
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }else { // forkjoin 递归
            long middle = (start + end) / 2; // 中间值
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork(); // 拆分任务,把任务压入线程队列
            ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end);
            task2.fork(); // 拆分任务,把任务压入线程队列

            return task1.join() + task2.join();
        }
    }
}

###测试类Demo
点击查看代码
package com.kuang.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

/**
 * 同一个任务,别人效率高你几十倍!
 */
public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // test1(); // 12224
        // test2(); // 10038
        // test3(); // 153
    }

    // 普通程序员
    public static void test1(){
        Long sum = 0L;
        long start = System.currentTimeMillis();
        for (Long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+" 时间:"+(end-start));
    }

    // 会使用ForkJoin
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务
        Long sum = submit.get();

        long end = System.currentTimeMillis();

        System.out.println("sum="+sum+" 时间:"+(end-start));
    }

    public static void test3(){
        long start = System.currentTimeMillis();
        // Stream并行流 ()  (]
        long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("sum="+"时间:"+(end-start));
    }

}

CountDownLatch

用过 CountDownLatch 么?什么场景下用的?
CountDownLatch的作用就是 允许 count 个线程阻塞在一个地方,直至这count个线程的任务都执行完毕。之前在项目中,有一个使用多线程读取多个文件处理的场景,我用到了 。具体场景是下面这样的:
我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。
为此我们定义了一个线程池和 count 为 6 的CountDownLatch对象 。使用线程池处理读取任务,每一个线程处理完之后就将 count-1,调用方法,直到所有文件读取完之后,才会接着执行后面的逻辑。
伪代码是下面这样的:

点击查看代码
public class CountDownLatchExample1 {
    // 处理文件的数量
    private static final int threadCount = 6;

    public static void main(String[] args) throws InterruptedException {
        // 创建一个具有固定线程数量的线程池对象(推荐使用构造方法创建)
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadnum = i;
            threadPool.execute(() -> {
                try {
                    //处理文件的业务操作
                    //......
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //表示一个文件已经被完成
                    countDownLatch.countDown();
                }

            });
        }
        countDownLatch.await();
        threadPool.shutdown();
        System.out.println("finish");
    }
}
###有没有可以改进的地方呢? 可以使用 CompletableFuture 类来改进!Java8 的 CompletableFuture 提供了很多对多线程友好的方法,使用它可以很方便地为我们编写多线程程序,什么异步、串行、并行或者等待所有线程执行完任务什么的都非常方便。
点击查看代码
CompletableFuture<Void> task1 =
    CompletableFuture.supplyAsync(()->{
        //自定义业务操作
    });
......
CompletableFuture<Void> task6 =
    CompletableFuture.supplyAsync(()->{
    //自定义业务操作
    });
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);

try {
    headerFuture.join();
} catch (Exception ex) {
    //......
}
System.out.println("all done. ");
上面的代码还可以接续优化,当任务过多的时候,把每一个 task 都列出来不太现实,可以考虑通过集合来添加任务。
点击查看代码
//文件夹位置
List<String> filePaths = Arrays.asList(...)
// 异步处理所有文件
List<CompletableFuture<String>> fileFutures = filePaths.stream()
    .map(filePath -> doSomeThing(filePath))
    .collect(Collectors.toList());
// 将他们合并起来
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    fileFutures.toArray(new CompletableFuture[fileFutures.size()])
);

CompletableFuture

Bi函数式接口

BiConsumer

函数式接口 BiConsumer

点击查看代码
@FunctionalInterface
public interface BiConsumer<T, U> {

    /**
     * Performs this operation on the given arguments.
     *
     * @param t the first input argument
     * @param u the second input argument
     */
    void accept(T t, U u);

    /**
     * Returns a composed {@code BiConsumer} that performs, in sequence, this
     * operation followed by the {@code after} operation. If performing either
     * operation throws an exception, it is relayed to the caller of the
     * composed operation.  If performing this operation throws an exception,
     * the {@code after} operation will not be performed.
     *
     * @param after the operation to perform after this operation
     * @return a composed {@code BiConsumer} that performs in sequence this
     * operation followed by the {@code after} operation
     * @throws NullPointerException if {@code after} is null
     */
    default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {
        Objects.requireNonNull(after);

        return (l, r) -> {
            accept(l, r);
            after.accept(l, r);
        };
    }
}    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
}
与普通的消费者接口不同的是,BiConsumer可以传入两个参数。 ####BiFunction
点击查看代码 ``` @FunctionalInterface public interface BiFunction<t, u,="" r=""> {
/**
 * Applies this function to the given arguments.
 *
 * @param t the first function argument
 * @param u the second function argument
 * @return the function result
 */
R apply(T t, U u);

/**
 * Returns a composed function that first applies this function to
 * its input, and then applies the {@code after} function to the result.
 * If evaluation of either function throws an exception, it is relayed to
 * the caller of the composed function.
 *
 * @param <V> the type of output of the {@code after} function, and of the
 *           composed function
 * @param after the function to apply after this function is applied
 * @return a composed function that first applies this function and then
 * applies the {@code after} function
 * @throws NullPointerException if after is null
 */
default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
    Objects.requireNonNull(after);
    return (T t, U u) -> after.apply(apply(t, u));
}

}

</details>
与普通的Function接口不同的是,BiFunction可以传入两个参数。

####BiPredicate

<details>
<summary>点击查看代码</summary>

@FunctionalInterface
public interface BiPredicate<T, U> {

/**
 * Evaluates this predicate on the given arguments.
 *
 * @param t the first input argument
 * @param u the second input argument
 * @return {@code true} if the input arguments match the predicate,
 * otherwise {@code false}
 */
boolean test(T t, U u);

/**
 * Returns a composed predicate that represents a short-circuiting logical
 * AND of this predicate and another.  When evaluating the composed
 * predicate, if this predicate is {@code false}, then the {@code other}
 * predicate is not evaluated.
 *
 * <p>Any exceptions thrown during evaluation of either predicate are relayed
 * to the caller; if evaluation of this predicate throws an exception, the
 * {@code other} predicate will not be evaluated.
 *
 * @param other a predicate that will be logically-ANDed with this
 *              predicate
 * @return a composed predicate that represents the short-circuiting logical
 * AND of this predicate and the {@code other} predicate
 * @throws NullPointerException if other is null
 */
default BiPredicate<T, U> and(BiPredicate<? super T, ? super U> other) {
    Objects.requireNonNull(other);
    return (T t, U u) -> test(t, u) && other.test(t, u);
}

/**
 * Returns a predicate that represents the logical negation of this
 * predicate.
 *
 * @return a predicate that represents the logical negation of this
 * predicate
 */
default BiPredicate<T, U> negate() {
    return (T t, U u) -> !test(t, u);
}

/**
 * Returns a composed predicate that represents a short-circuiting logical
 * OR of this predicate and another.  When evaluating the composed
 * predicate, if this predicate is {@code true}, then the {@code other}
 * predicate is not evaluated.
 *
 * <p>Any exceptions thrown during evaluation of either predicate are relayed
 * to the caller; if evaluation of this predicate throws an exception, the
 * {@code other} predicate will not be evaluated.
 *
 * @param other a predicate that will be logically-ORed with this
 *              predicate
 * @return a composed predicate that represents the short-circuiting logical
 * OR of this predicate and the {@code other} predicate
 * @throws NullPointerException if other is null
 */
default BiPredicate<T, U> or(BiPredicate<? super T, ? super U> other) {
    Objects.requireNonNull(other);
    return (T t, U u) -> test(t, u) || other.test(t, u);
}

}

</details>

与普通的断言接口不同的是,BiPredicate可以传入两个参数。
###CompletableFuture启动异步任务

####CompletableFuture创建方式:
<details>
<summary>点击查看代码</summary>

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the {@link ForkJoinPool#commonPool()} after
 * it runs the given action.
 *
 * @param runnable the action to run before completing the
 * returned CompletableFuture
 * @return the new CompletableFuture
 */
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the given executor after it runs the given
 * action.
 *
 * @param runnable the action to run before completing the
 * returned CompletableFuture
 * @param executor the executor to use for asynchronous execution
 * @return the new CompletableFuture
 */
public static CompletableFuture<Void> runAsync(Runnable runnable,
                                               Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the {@link ForkJoinPool#commonPool()} with
 * the value obtained by calling the given Supplier.
 *
 * @param supplier a function returning the value to be used
 * to complete the returned CompletableFuture
 * @param <U> the function's return type
 * @return the new CompletableFuture
 */
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the given executor with the value obtained
 * by calling the given Supplier.
 *
 * @param supplier a function returning the value to be used
 * to complete the returned CompletableFuture
 * @param executor the executor to use for asynchronous execution
 * @param <U> the function's return type
 * @return the new CompletableFuture
 */
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                   Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
</details>
* runAsync方法不支持返回值。其中Executor指的是可以传入我们的线程池对象
* supplyAsync可以支持返回值。其中Executor指的是可以传入我们的线程池对象
####CompletableFuture回调方法:
#####whenComplete
<details>
<summary>点击查看代码</summary>

public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}

public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(asyncPool, action);
}

public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}
</details>
**whenComplete可以处理正常的计算结果**
whenComplete和whenCompleteAsync的区别:
whenComplete:是当前线程执行当前任务,等待任务执行之后继续执行当前的whenComplete
whenCompleteAsync:是执行把whenCompleteAsync这个任务提交给线程池中的其他线程来进行执行。
方法不以Async结尾,意味着Action使用相同的线程执行
方法以Async结尾可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

<details>
<summary>点击查看代码</summary>

package com.bilibili.juc.cf;

import java.util.concurrent.*;

/**

  • @auther zzyy

  • @create 2022-01-16 16:53
    */
    public class CompletableFutureUseDemo
    {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {

     ExecutorService threadPool = Executors.newFixedThreadPool(3);
    
     try
     {
         CompletableFuture.supplyAsync(() -> {
             System.out.println(Thread.currentThread().getName() + "----come in");
             int result = ThreadLocalRandom.current().nextInt(10);
             try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
             System.out.println("-----1秒钟后出结果:" + result);
             if(result > 2)
             {
                 int i=10/0;
             }
             return result;
         },threadPool).whenComplete((v,e) -> {
             if (e == null) {
                 System.out.println("-----计算完成,更新系统UpdateValue:"+v);
             }
         }).exceptionally(e -> {
             e.printStackTrace();
             System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());
             return null;
         });
    
         System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
     }catch (Exception e){
         e.printStackTrace();
     }finally {
         threadPool.shutdown();
     }
    
    
     //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
     //try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
    

    }

    private static void future1() throws InterruptedException, ExecutionException
    {
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "----come in");
    int result = ThreadLocalRandom.current().nextInt(10);
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("-----1秒钟后出结果:" + result);
    return result;
    });

     System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
    
     System.out.println(completableFuture.get());
    

    }
    }

</details>
#####exceptionally
<details>
<summary>点击查看代码</summary>

/**
 * Returns a new CompletableFuture that is completed when this
 * CompletableFuture completes, with the result of the given
 * function of the exception triggering this CompletableFuture's
 * completion when it completes exceptionally; otherwise, if this
 * CompletableFuture completes normally, then the returned
 * CompletableFuture also completes normally with the same value.
 * Note: More flexible versions of this functionality are
 * available using methods {@code whenComplete} and {@code handle}.
 *
 * @param fn the function to use to compute the value of the
 * returned CompletableFuture if this CompletableFuture completed
 * exceptionally
 * @return the new CompletableFuture
 */
public CompletableFuture<T> exceptionally(
    Function<Throwable, ? extends T> fn) {
    return uniExceptionallyStage(fn);
}
</details>
exceptionally处理异常情况。
#####handle
handle:whenComplete和exceptionally的结合版。方法执行后的处理,无论成功与失败都可处理
代码示例:
<details>
<summary>点击查看代码</summary>

	/**
	 * 方法执行完成后的处理
	 */
    CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        System.out.println("CompletableFuture...");
        return 10/1;
    }, service).handle((t,u)->{ // R apply(T t, U u);
        System.out.println("handle:");
        if (t != null){
            System.out.println("存在返回结果:" + t);
            return 8;
        }
        if (u != null){
            System.out.println("存在日常:" + u);
            return 9;
        }
        return 5;

    });
    Integer integer = completableFuture2.get();
    System.out.println(integer);
</details>
####CompletableFuture异步任务场景
#####线程串行化
![](https://www.icode9.com/i/l/?n=22&i=blog/2052693/202207/2052693-20220702220543745-77745372.png)
* thenRun:不能获取到上一步的执行结果,无返回值
* thenAcceptAsyne 能接受上—步结果,但是无返回值
*  thenApplyAsyne 能接受上—步结果,有返回值
我们即要能感知到上一步的执行结果,也要能有自己线程执行成功的自己的返回值
<details>
<summary>点击查看代码</summary>

我是B
// CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// System.out.println("CompletableFuture...");
// return 10;//拿到A的返回值
// }, service).thenApplyAsync((u)->{
// System.out.println("返回值" + u);
// System.out.println("任务2启动");
// return 5;//自己的返回值再返回出去
// });
// System.out.println(completableFuture2.get());
/*
* main....start....
当前线程:11
CompletableFuture...
返回值10
任务2启动
5
main....end....
* */

</details>
#####双线程均完成才能后续
![](https://www.icode9.com/i/l/?n=22&i=blog/2052693/202207/2052693-20220702220750618-1597148675.png)
![](https://www.icode9.com/i/l/?n=22&i=blog/2052693/202207/2052693-20220702220756429-2125070604.png)
* runAfterBothAsync 两人任务组合,不能得到前任务的结果和无返回值
* thenAcceptBothAsync 两人任务组合,能得到前任务的结果和无返回值
* thenCombineAsync 两人任务组合,能得到前任务的结果和有返回值
传入的参数:CompletionStage是什么?其实还是我们的CompletableFuture
![](https://www.icode9.com/i/l/?n=22&i=blog/2052693/202207/2052693-20220702221118441-955053338.png)
两个任务必须都完成,触发该Runnable参数指定的任务即当前lambda表达式的内容
![](https://www.icode9.com/i/l/?n=22&i=blog/2052693/202207/2052693-20220702221255897-296473484.png)
<details>
<summary>点击查看代码</summary>

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
    return biApplyStage(screenExecutor(executor), other, fn);
}
</details>
示例代码
<details>
<summary>点击查看代码</summary>

// CompletableFuture completableFuture3 = CompletableFuture.supplyAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// System.out.println("任务1...");
// return 111;
// }, service);
// CompletableFuture completableFuture4 = CompletableFuture.supplyAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// System.out.println("任务2...");
// return 222;
// }, service);

// completableFuture3.runAfterBothAsync(completableFuture4,()->{
// System.out.println("任务3...");
// },service);
/*
* main....start....
main....end....
当前线程:11
任务1...
当前线程:12
任务2...
任务3...
* */

// completableFuture3.thenAcceptBothAsync(completableFuture4, (f1,f2) -> {
// System.out.println("任务3...");
// System.out.println("f1:" + f1 + ".f2:" + f2);
// }, service);
/*
* main....start....
main....end....
当前线程:11
任务1...
当前线程:12
任务2...
任务3...
f1:111.f2:222
* */

// CompletableFuture integerCompletableFuture = completableFuture3.thenCombineAsync(completableFuture4, (f1, f2) -> {
// System.out.println("任务3...");
// System.out.println("f1:" + f1 + ".f2:" + f2);
// return 3;
// }, service);
// System.out.println(integerCompletableFuture.get());
/*
* main....start....
当前线程:11
任务1...
当前线程:12
任务2...
任务3...
f1:111.f2:222
3
main....end....
* */

</details>
#####双线程完成其一就能后续
![](https://www.icode9.com/i/l/?n=22&i=blog/2052693/202207/2052693-20220702221403505-2041750249.png)
![](https://www.icode9.com/i/l/?n=22&i=blog/2052693/202207/2052693-20220702221453178-1544369176.png)
<details>
<summary>点击查看代码</summary>

// CompletableFuture completableFuture5 = CompletableFuture.supplyAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// System.out.println("任务1...");
// return 111;
// }, service);
//
// CompletableFuture completableFuture6 = CompletableFuture.supplyAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("任务2结束...");
// return 222;
// }, service);

// completableFuture5.runAfterEitherAsync(completableFuture6, () -> {
// System.out.println("任务3...");
// }, service);
/*
* main....start....
main....end....
当前线程:11
任务1...
当前线程:12
任务3...
任务2结束...
* */

// completableFuture5.acceptEitherAsync(completableFuture6, (f1) -> {
// System.out.println("f1:" + f1);
// System.out.println("任务3...");
// }, service);
/*
* main....start....
当前线程:11
任务1...
当前线程:12
main....end....
f1:111
任务3...
任务2...
* */

// CompletableFuture integerCompletableFuture = completableFuture5.applyToEitherAsync(completableFuture6, (f1) -> {
// System.out.println("f1:" + f1);
// System.out.println("任务3...");
// return 6;
// }, service);
// System.out.println(integerCompletableFuture.get());
/*
* main....start....
当前线程:11
任务1...
当前线程:12
f1:111
任务3...
6
main....end....
任务2结束...
* */

</details>
#####多任务组合
<details>
<summary>点击查看代码</summary>

/* ------------- Arbitrary-arity constructions -------------- */

/**
 * Returns a new CompletableFuture that is completed when all of
 * the given CompletableFutures complete.  If any of the given
 * CompletableFutures complete exceptionally, then the returned
 * CompletableFuture also does so, with a CompletionException
 * holding this exception as its cause.  Otherwise, the results,
 * if any, of the given CompletableFutures are not reflected in
 * the returned CompletableFuture, but may be obtained by
 * inspecting them individually. If no CompletableFutures are
 * provided, returns a CompletableFuture completed with the value
 * {@code null}.
 *
 * <p>Among the applications of this method is to await completion
 * of a set of independent CompletableFutures before continuing a
 * program, as in: {@code CompletableFuture.allOf(c1, c2,
 * c3).join();}.
 *
 * @param cfs the CompletableFutures
 * @return a new CompletableFuture that is completed when all of the
 * given CompletableFutures complete
 * @throws NullPointerException if the array or any of its elements are
 * {@code null}
 */
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

/**
 * Returns a new CompletableFuture that is completed when any of
 * the given CompletableFutures complete, with the same result.
 * Otherwise, if it completed exceptionally, the returned
 * CompletableFuture also does so, with a CompletionException
 * holding this exception as its cause.  If no CompletableFutures
 * are provided, returns an incomplete CompletableFuture.
 *
 * @param cfs the CompletableFutures
 * @return a new CompletableFuture that is completed with the
 * result or exception of any of the given CompletableFutures when
 * one completes
 * @throws NullPointerException if the array or any of its elements are
 * {@code null}
 */
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}
</details>
1. allOf:等待所有任务完成
2. anyOf:只要一个任务完成 
<details>
<summary>点击查看代码</summary>

	CompletableFuture<String> img = CompletableFuture.supplyAsync(() -> {
		System.out.println("查询商品图片信息");
		return "1.jpg";
	},service);

	CompletableFuture<String> attr = CompletableFuture.supplyAsync(() -> {
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("查询商品属性");
		return "麒麟990 5G  钛空银";
	},service);


	CompletableFuture<String> desc = CompletableFuture.supplyAsync(() -> {
		System.out.println("查询商品介绍");
		return "华为";
	},service);

	/**
	 * 等这三个都做完
	 */

	CompletableFuture<Void> allOf = CompletableFuture.allOf(img, attr, desc);
	allOf.join();

// System.out.println("main....end" + desc.get() + attr.get() + img.get());
// CompletableFuture anyOf = CompletableFuture.anyOf(img, attr, desc);
// anyOf.get();

	System.out.println("main....end" + img.get()+attr.get()+desc.get());

main....start
查询商品图片信息
查询商品介绍

这里卡2s

查询商品属性
main....end1.jpg麒麟990 5G 钛空银华为

</details>
####CompletableFuture+自定义线程池
#####MyThreadConfig
线程池配置
<details>
<summary>点击查看代码</summary>

/**

  • @Description: 本源码分享自 www.cx1314.cn 欢迎访问获取更多资源 线程池配置类
  • @Created: 程序源码论坛
  • @author: cx
  • @createTime: 2020-06-23 20:24
    **/

@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {

@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
    return new ThreadPoolExecutor(
            pool.getCoreSize(),
            pool.getMaxSize(),
            pool.getKeepAliveTime(),
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(100000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
}

}

</details>
#####ThreadPoolConfigProperties
将配置参数抽取出来到application.properties
加上@Component,或者MyThreadConfig导入ThreadPoolConfigProperties.class的时候加上@EnableConfigurationProperties注解即可
<details>
<summary>点击查看代码</summary>

@ConfigurationProperties(prefix = "gulimall.thread")
// @Component
@Data
public class ThreadPoolConfigProperties {

private Integer coreSize;

private Integer maxSize;

private Integer keepAliveTime;

}

</details>
#####业务实现

<details>
<summary>点击查看代码</summary>

@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {

    SkuItemVo skuItemVo = new SkuItemVo();

    CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
        //1、sku基本信息的获取  pms_sku_info
        SkuInfoEntity info = this.getById(skuId);
        skuItemVo.setInfo(info);
        return info;
    }, executor);


    CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
        //3、获取spu的销售属性组合
        List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
        skuItemVo.setSaleAttr(saleAttrVos);
    }, executor);


    CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
        //4、获取spu的介绍    pms_spu_info_desc
        SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
        skuItemVo.setDesc(spuInfoDescEntity);
    }, executor);


    CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
        //5、获取spu的规格参数信息
        List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(
            res.getSpuId(), res.getCatalogId());
        skuItemVo.setGroupAttrs(attrGroupVos);
    }, executor);


    // Long spuId = info.getSpuId();
    // Long catalogId = info.getCatalogId();

    //2、sku的图片信息    pms_sku_images
    CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
        List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
        skuItemVo.setImages(imagesEntities);
    }, executor);

    CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {
        //3、远程调用查询当前sku是否参与秒杀优惠活动
        R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
        if (skuSeckilInfo.getCode() == 0) {
            //查询成功
            SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {
            });
            skuItemVo.setSeckillSkuVo(seckilInfoData);

            if (seckilInfoData != null) {
                long currentTime = System.currentTimeMillis();
                if (currentTime > seckilInfoData.getEndTime()) {
                    skuItemVo.setSeckillSkuVo(null);
                }
            }
        }
    }, executor);


    //等到所有任务都完成
    CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture,seckillFuture).get();

    return skuItemVo;
}
</details>
#####application.properties如下
<details>
<summary>点击查看代码</summary>

spring.cache.type=redis

spring.cache.cache-names=qq,毫秒为单位

spring.cache.redis.time-to-live=3600000

如果指定了前缀就用我们指定的前缀,如果没有就默认使用缓存的名字作为前缀

spring.cache.redis.key-prefix=CACHE_

spring.cache.redis.use-key-prefix=true

是否缓存空值,防止缓存穿透

spring.cache.redis.cache-null-values=true

配置线程池

gulimall.thread.coreSize=20
gulimall.thread.maxSize=200
gulimall.thread.keepAliveTime=10

开启debug日志

logging.level.org.springframework.cloud.openfeign=debug
logging.level.org.springframework.cloud.sleuth=debug

服务追踪

spring.zipkin.base-url=http://192.168.18.80:9411/

关闭服务发现

spring.zipkin.discovery-client-enabled=false
spring.zipkin.sender.type=web

配置采样器

spring.sleuth.sampler.probability=1

</details>

标签:JUC,return,System,线程,println,CompletableFuture,out
来源: https://www.cnblogs.com/waacode/p/16438773.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有