ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java中多线程之Future入门

2021-07-03 15:34:48  阅读:227  来源: 互联网

标签:java int InterruptedException private Future static 多线程 public


前言

Future可以看做一个异步的计算结果的票据,类似我们排队过程中获取的号,后面根据这个号去操作。

简单使用

Future需要配合Callable接口和线程池使用

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable就是有返回值的Runnable。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Client {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    Future<Integer> future = executorService.submit(() -> {
      System.out.println("start execute");
      try {
        Thread.sleep(2_000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("end execute");
      return 5;
    });
    executorService.shutdown();
    System.out.println(future.get());
  }

}

创建一个线程池,提交一个Callable,返回一个Future,get()方法会使当前线程阻塞,直到Callable执行结束。

原理分析

public abstract interface Future<V> {
 public abstract boolean cancel(boolean paramBoolean);
 public abstract boolean isCancelled();
 public abstract boolean isDone();
 public abstract V get() throws InterruptedException, ExecutionException;
 public abstract V get(long paramLong, TimeUnit paramTimeUnit)
 throws InterruptedException, ExecutionException, TimeoutException; 
}

Future在等待结果过程中可以取消,但不一定取消成功,因为任务可能已经完成。
java提供了一个Future的实现类FutureTask

/**
 *
 * state可能的状态转变路径如下:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

内部维护一个运行状态,刚开始为NEW,正常结束为NORMAL,异常结束为EXCEPTIONAL

/** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

使用一个Treiber栈来存储多个阻塞线程,可以简单看做一个线程安全且高效的栈。

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

调用get方法时,如果任务未完成,将当前线程加入等待队列并阻塞。使用CAS无锁技术来修改内部状态和等待队列。更多原理请参考 FutureTask源码分析

参考

FutureTask源码分析
FutureTask中Treiber堆的实现
理解与使用Treiber Stack

标签:java,int,InterruptedException,private,Future,static,多线程,public
来源: https://www.cnblogs.com/strongmore/p/14966439.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有