ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java 基础6 - 线程

2022-07-26 23:00:36  阅读:131  来源: 互联网

标签:Java 队列 基础 任务 线程 new 等待 public


线程

实现线程的两种方式:继承 Thread,实现 Runnable 接口

有了 Thread 不就够了?通过继承Thread来实现线程虽然比较简单,但 Java 中每个类最多只能有一个父类,如果类已经有父类了,就不能再继承 Thread。

启动线程调 start 而不是 run,一个线程对象只能启动一次

线程的状态

  1. NEW:没有调用 start 的线程状态为 NEW。
  2. TERMINATED:线程运行结束后状态为 TERMINATED。
  3. RUNNABLE:调用 start 后线程在执行 run 方法且没有阻塞时状态为 RUNNABLE,不过,RUNNABLE 不代表 CPU 一定在执行该线程的代码,可能正在执行也可能在等待操作系统分配时间片,只是它没有在等待其他条件。
  4. BLOCKED、WAITING、TIMED_WAITING:都表示线程被阻塞了,在等待某些条件。

除了main线程外,至少还有一个负责垃圾回收的线程,这个线程就是 daemon 线程,在 main 线程结束的时候,垃圾回收线程也会退出。

Thread 几个静态方法:

  1. sleep 方法

    用该方法会让当前线程睡眠指定的时间,单位是毫秒。睡眠期间,该线程会让出 ** CPU(CPU可以去干其他事了),睡眠期间,线程可以被中断**,如果被中断,sleep 会抛出 InterruptedException 异常。

  2. yield 方法

    调用该方法,是告诉操作系统的调度器:现在不着急占用 CPU,可以先让其他线程运行。不过,这对调度器也仅仅是建议,调度器如何处理是不一定的,它可能完全忽略该调用。

  3. join 方法

    可以让调用join的线程等待该线程结束,join 实际上就是调用了 wait 方法。

共享内存及可能存在的问题

每个线程表示一条单独的执行流,有自己的程序计数器,有自己的栈,但线程之间可以共享内存,它们可以访问和操作相同的对象。当多条执行流执行相同的程序代码时,每条执行流都有单独的栈,方法中的参数和局部变量都有自己的一份。当多条执行流可以操作相同的变量时,可能会出现一些意料之外的结果,包括竞态条件内存可见性问题。

竞态条件

所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终执行结果与执行时序有关,可能正确也可能不正确。

10个线程同时对一个变量counter执行加一,可能每次结果都不一样,因为counter++这个操作不是原子操作,它分为三个步骤:

  1. 取counter的当前值
  2. 在当前值基础上加1
  3. 将新值重新赋值给counter。

如何解决这个问题:

  1. synchronized关键字
  2. 显式锁
  3. 原子变量

内存可见性

多个线程可以共享访问和操作相同的变量,但一个线程对一个共享变量的修改,另一个线程不一定马上就能看到,甚至永远也看不到。这就是内存可见性问题。在计算机系统中,除了内存,数据还会被缓存在 CPU 的寄存器以及各级缓存中,当访问一个变量时,可能直接从寄存器或 CPU 缓存中获取,而不一定到内存中去取,当修改一个变量时,也可能是先写到缓存中,稍后才会同步更新到内存中。

怎么解决:

  1. volatile 关键字
  2. synchronized 关键字
  3. 显式锁

Synchronized关键字

synchronized 可以用于修饰类的实例方法静态方法代码块

方法加了 synchronized 后,方法内的代码就变成了原子操作。

synchronized 实例方法实际保护的是同一个对象的方法调用,确保同时只能有一个线程执行。

synchronized 保护的是对象而非代码,只要访问的是同一个对象的 synchronized 方法,即使是不同的代码,也会被同步顺序访问。synchronized 方法不能防止非 synchronized 方法被同时执行,所以一般在保护变量时,需要在所有访问该变量的方法上加上 synchronized 。

  1. 实例方法

    synchronized 实例方法保护的是当前实例对象,即this, this 对象有一个锁和一个等待队列,锁只能被一个线程持有,其他试图获得同样锁的线程需要等待。

    执行synchronized实例方法的过程大致如下:

    1. 尝试获得锁,如果能够获得锁,继续下一步,否则加入等待队列,阻塞并等待唤醒
    2. 执行实例方法体代码
    3. 释放锁,如果等待队列上有等待的线程,从中取一个并唤醒,如果有多个等待的线程,唤醒哪一个是不一定的,不保证公平性。

    当前线程不能获得锁的时候,它会加入等待队列等待,线程的状态会变为 BLOCKED。

  2. 静态方法

    对静态方法,保护的是类对象。实际上,每个对象都有一个锁和一个等待队列,类对象也是。

  3. 代码块

    synchronized 括号里面的就是保护的对象,因为任意对象都有一个锁和等待队列,或者说,任何对象都可以作为锁对象。

几个特征

  1. 可重入性

    对同一个执行线程,它在获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。

    可重入是通过记录锁的持有线程和持有数量来实现的,当调用被 synchronized 保护的代码时,检查对象是否已被锁,如果是,再检查是否被当前线程锁定,如果是,增加持有数量,如果不是被当前线程锁定,才加入等待队列,当释放锁时,减少持有数量,当数量变为0时才释放整个锁。

  2. 内存可见性

    在释放锁时,所有写入都会写回内存,而获得锁后,都会从内存中读最新数据。

  3. 死锁

    应该尽量避免在持有一个锁的同时去申请另一个锁,如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。

协作

多线程之间除了竞争访问同一个资源外,也经常需要相互协作,基本方式就是 wait/notify

Java 的根父类是 Object , Java 在 Object 类而非 Thread 类中定义了一些线程协作的基本方法,这些方法有两类,一类是 wait ,另一类是 notify 。

wait实际上做了什么?除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。

但调用wait时,线程会释放对象锁。

一个线程因为等待某个条件执行不下去,当这个条件改变之后就该调用 notify 方法了,notify 会从条件队列中选一个线程,将其从队列中移除并唤醒,选哪个是不确定的。而 notifyAll 会移除条件队列中所有的线程并全部唤醒。

调用notify会把在条件队列中等待的线程唤醒并从队列中移除,但它不会释放对象锁。

唤醒之后线程会重新尝试竞争获得锁:如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回,否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从 wait 调用中返回。

线程从wait调用中返回后,不代表其等待的条件就一定成立,它需要重新检查其等待的条件,这也是在条件附近看到 while 而不是 if 的原因。

中断

停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出。

每个线程都有一个标志位,表示该线程是否被中断了。

中断相关的方法:

public void interrupt(); // 中断线程
public boolean isInterrupted(); // 线程的中断标志位是否为true
public static boolean interrupted(); // 线程的中断标志位是否为true + 清空中断标志位

注意:interrupt方法不一定会真正“中断”线程,

不同状态对中断信号的反应

  1. RUNNABLE

    线程在运行或具备运行条件只是在等待操作系统调度。

  2. WAITING/TIMED_WAITING

    线程在等待某个条件或超时。线程调用join/wait/sleep方法会进入WAITING或TIMED_WAITING状态。调用interrupt()会使得该线程抛出InterruptedException。需要注意的是,抛出异常后,中断标志位会被清空,而不是被设置。InterruptedException是一个受检异常,线程必须进行处理。

  3. BLOCKED

    线程在等待锁,试图进入同步块。调用interrupt()只是会设置线程的中断标志位,线程依然会处于BLOCKED状态,也就是说,interrupt()并不能使一个在等待锁的线程真正“中断”。

    test方法在持有锁lock的情况下启动线程a,而线程a也去尝试获得锁lock,所以会进入锁等待队列,随后test调用线程a的interrupt方法并调用join等待线程线程a结束,线程a会结束吗?不会,interrupt方法只会设置线程的中断标志,而并不会使它从锁等待队列中出来。

        public static void test() throws InterruptedException {
            synchronized (lock) {
                A a = new A();
                a.start();
                Thread.sleep(1000);
                a.interrunpt();
                a.join();
            }
        }
    

    注意:在使用 synchronized 关键字获取锁的过程中不响应中断请求,这是 synchronized 的局限性。

  4. NEW/TERMINATE

    线程还未启动或已结束。调用 interrupt() 对它没有任何效果,中断标志位也不会被设置。

取消/关闭线程的正确方式

原子操作

CAS

原子操作依赖一个很重要的方法:

public final boolean compareAndSet(int expect, int update)

这个方法就被成为CAS。该方法有两个参数 expect 和 update ,以原子方式实现了如下功能:如果当前值等于 expect ,则更新为 update ,否则不更新,如果更新成功,返回 true,否则返回 false 。

以 AtomicInteger 为例,AtomicInteger 可以在程序中用作一个计数器,多个线程并发更新,也总能实现正确性。它的主要内部成员是:

public volatile int value; // 这个变量天生保证内存可见性

AtomicInteger 有个方法 incrementAndGet:

public final int incrementAndGet() {
    for(;;) {
        int current = get(); // 获取当前值value
        int next = current + 1; // 计算期望的值next
        // 调CAS方法进行更新,如果更新没有成功,说明value被别的线程改了,则再去取最新值并尝试更新直到成功为止。
        if(compareAndSet(current, next)) {
            return next;
        }
    }
}

与 synchronized 锁相比,这种原子更新方式代表一种不同的思维方式。synchronized 是悲观的,它假定更新很可能冲突,所以先获取锁,得到锁后才更新。原子变量的更新逻辑是乐观的,它假定冲突比较少,但使用 CAS 更新,也就是进行冲突检测,如果确实冲突了,那也没关系,继续尝试就好了。

AQS

AQS是一个抽象类AbstractQueuedSynchronizer。

AQS封装了一个状态,给子类提供了查询和设置状态的方法:

public volatile int state;
protected final int getState();
protected final void setState(int newState);
protected final boolean compareAndSetState(int expect, int update);

用于实现锁时,AQS 可以保存锁的当前持有线程,提供了方法进行查询和设置:

private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t);
protected final Thread getExclusiveOwnerThread();

AQS内部维护了一个等待队列,借助 CAS 方法实现了无阻塞算法进行更新。

BUG

使用 CAS 方式更新有一个 ABA 问题。该问题是指,假设当前值为A,如果另一个线程先将 A 修改成 B ,再修改回成 A ,当前线程的 CAS 操作无法分辨当前值发生过变化。

ABA 是不是一个问题与程序的逻辑有关,一般不是问题。而如果确实有问题,解决方法是使用 AtomicStampedReference

显示锁

简介

显式锁接口和类主要有:

  1. 锁接口Lock,主要实现类是 ReentrantLock
  2. 读写锁接口 ReadWriteLock,主要实现类是 ReentrantReadWriteLock

Lock 接口定义为:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    // 可以避免死锁。在持有一个锁获取另一个锁而获取不到的时候,可以释放已持有的锁,给其他线程获取锁的机会,然后重试获取所有锁。
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
  1. lock()/unlock():就是普通的获取锁和释放锁方法,lock()会阻塞直到成功。
  2. lockInterruptibly():与lock()的不同是,它可以响应中断。
  3. tryLock():只是尝试获取锁,立即返回,不阻塞
  4. tryLock(long time, TimeUnit unit):先尝试获取锁,如果能成功则立即返回true,否则阻塞等待
  5. newCondition:新建一个条件,一个Lock可以关联多个条件(见下文显式条件)

Lock接口的主要实现类是ReentrantLock,底层依赖CAS,AQS,ReentrantLock,它的基本用法lock/unlock实现了与synchronized一样的语义,包括:

  1. 可重入,一个线程在持有一个锁的前提下,可以继续获得该锁
  2. 可以解决竞态条件问题
  3. 可以保证内存可见性

相较于synchronized

ReentrantLock 和 synchronized 都是默认不保证公平。使用显式锁,一定要记得调用 unlock。

相比 synchronized , ReentrantLock 可以实现与 synchronized 相同的语义,而且支持以非阻塞方式获取锁,可以响应中断,可以限时,更为灵活。

synchronized代表一种声明式编程思维,程序员更多的是表达一种同步声明,由 Java 系统负责具体实现,程序员不知道其实现细节;显式锁代表一种命令式编程思维,程序员实现所有细节。

简单总结下,能用 synchronized 就用 synchronized,不满足要求时再考虑 ReentrantLock。

显式条件

显式锁与 synchronized 相对应,而显式条件与 wait/notify 相对应。wait/notify与synchronized配合使用,显式条件与显式锁配合使用。

Condition 表示条件变量,是一个接口,其中有 await、signal、signalAll 方法。

await 对应于 Object 的 wait , signal 对应于 notify, signalAll 对应于 notifyAll,语义也是一样的。

一般的 await 相关方法都是响应中断的,如果发生了中断,会抛出 InterruptedException,但中断标志位会被清空。awaitUnInterruptibly() 方法不会响应中断,它不会由于中断结束,但当它返回时,如果等待过程中发生了中断,中断标志位会被设置。

await在进入等待队列后,会释放锁,释放CPU,当其他线程将它唤醒后,或等待超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从 await 方法中退出。

示例:

    static class MyBlockQueue<E> {
        private Queue<E> queue = null;
        private int limit;
        private Lock lock = new ReentrantLock();

        private Condition notFull = lock.newCondition();
        private Condition notEmpty = lock.newCondition();

        public  MyBlockQueue(int limit) {
            this.limit = limit;
            // ArrayDeque是线程不安全的
            queue = new ArrayDeque<>();
        }

        private void put(E e) throws InterruptedException {
            lock.lockInterruptibly();
            try {
                // 队列满,在notFull等待,不让放
                while (queue.size() == limit) {
                    notFull.await();
                }
                queue.add(e);
                // 唤醒一下,现在不空了
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        public E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                // 队列空,在notEmpty等待,不让取
                while (queue.isEmpty()) {
                    notEmpty.await();
                }
                E e = queue.poll();
                // 唤醒一下,现在不满了
                notFull.signal();
                return e;
            } finally {
                lock.unlock();
            }
        }
    }

上述代码定义了两个等待条件:不满(notFull)、不空(notEmpty)。在put方法中,如果队列满,则在notFull上等待;在take方法中,如果队列空,则在notEmpty上等待。put操作后通知 notEmpty, take 操作后通知 notFull。这样,代码更清晰易读。

异步任务

基本接口:

  1. Runnable 和 Callable:表示要执行的异步任务。
  2. Executor 和 ExecutorService:表示执行服务。
  3. Future 表示异步任务的结果。

Runnable 没有返回结果,而 Callable 有,Runnable 不会抛出异常,而 Callable 会。

Executor 表示最简单的执行服务,可以执行一个 Runnable,没有返回结果。

ExecutorService 扩展了 Executor,其中的 submit 方法表示提交一个任务,返回值类型都是 Future,返回后,只是表示任务已提交,不代表已执行,通过Future可以查询异步任务的状态、获取最终结果、取消任务等。

Future中的 get 用于返回异步任务最终的结果,如果任务还未执行完成,会阻塞等待;cancel 用于取消异步任务,如果任务已完成、或已经取消、或由于某种原因不能取消, cancel 返回 false,否则返回true。isDone 和 isCancelled 用于查询任务状态。isCancelled 表示任务是否被取消,只要 cancel 方法返回了 true,随后的isCancelled 方法都会返回 true,即使执行任务的线程还未真正结束。isDone 表示任务是否结束,不管什么原因都算。

Future 是一个重要的概念,是实现“任务的提交”与“任务的执行”相分离的关键,任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作。

基本使用:

package AsyncTask;

import java.util.Random;
import java.util.concurrent.*;

public class AsyncTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Future<Integer> future = executorService.submit(new Task());
        System.out.println("这是主线程");
        Thread.sleep(100);
        try {
            System.out.println("任务结果" + future.get());
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }

    static class Task implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            int sleepSeconds = new Random().nextInt(1000);
            System.out.println("子线程开始休眠");
            Thread.sleep(sleepSeconds);
            System.out.println("子线程休眠结束");
            return sleepSeconds;
        }
    }
}

示例中的大致步骤就是:

  1. 定义一个任务描述要做的事
  2. 创建ExecutorService实例
  3. ExecutorService实例提交任务
  4. 在别处获取任务结果
  5. 关闭ExecutorService

其中 ExecutorService 有两个关闭方法:ExecutorServicshutdown 和 shutdownNow。区别是,shutdown表示不再接受新任务,shutdownNow不仅不接受新任务,而且会终止已提交但尚未执行的任务,对于正在执行的任务,一般会调用线程的interrupt方法尝试中断,不过,线程可能不响应中断,shutdownNow会返回已提交但尚未执行的任务列表。shutdown 和 shutdownNow 不会阻塞等待,它们返回后不代表所有任务都已结束,调用者可以通过awaitTermination等待所有任务结束。

ExecutorService 有两组批量提交任务的方法:invokeAll 和 invokeAny。invokeAll 等待所有任务完成,返回的 Future 列表中,每个 Future 的 isDone 方法都返回true,不过 isDone 为true不代表任务就执行成功了,可能是被取消了。而对于 invokeAny,只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务会被取消

原理

好累啊不想写了

线程池

线程池主要由两个概念组成:一个是任务队列;另一个是工作者线程。工作者线程主体就是一个循环,循环从队列中接受任务并执行,任务队列保存待执行的任务。( JavaScript 中的异步实现也是类似的套路哦)

  1. 它可以重用线程,避免线程创建的开销。
  2. 任务过多时,通过排队避免创建过多线程,减少系统资源消耗和竞争,确保任务有序完成。

线程池的实现类是 ThreadPoolExecutor,它继承自 AbstractExecutorService ,实现了 ExecutorService ,基本用法与上节异步任务介绍的类似。

构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);

第二个构造方法多了两个参数 threadFactory 和 handler,这两个参数一般不需要,第一个构造方法会设置默认值。参数 corePoolSize、maximumPoolSize、keepAliveTime、unit 用于控制线程池中线程的个数,workQueue 表示任务队列,threadFactory 用于对创建的线程进行一些配置,handler表示任务拒绝策略。

  1. 线程池大小

    1. corePoolSize:核心线程个数
    2. maximumPoolSize:最大线程个数
    3. keepAliveTime和unit:空闲线程存活时间

    一般情况下,有新任务到来的时候,如果当前线程个数小于 corePoolSize,就会创建一个新线程来执行该任务,需要说明的是,即使其他线程现在也是空闲的,也会创建新线程。不过,如果线程个数大于等于 corePoolSize,那就不会立即创建新线程了,它会先尝试排队,需要强调的是,它是“尝试”排队,而不是“阻塞等待”入队,如果队列满了或其他原因不能立即入队,它就不会排队,而是检查线程个数是否达到了 maximumPoolSize,如果没有,就会继续创建线程,直到线程数达到 maximumPoolSize。

    keepAliveTime 的目的是为了释放多余的线程资源,它表示,当线程池中的线程个数大于 corePoolSize 时额外空闲线程的存活时间。如果该值为 0 ,则表示所有线程都不会超时终止。

  2. 队列

    这里要求队列类型是阻塞队列 BlockingQueue。

    • LinkedBlockingQueue:基于链表的阻塞队列,可以指定最大长度,但默认是无界的。
    • ArrayBlockingQueue:基于数组的有界阻塞队列。
    • PriorityBlockingQueue:基于堆的无界阻塞优先级队列。
    • SynchronousQueue:没有实际存储空间的同步阻塞队列。

    注意:如果用的是无界队列,需要强调的是,线程个数最多只能达到 corePoolSize,到达 corePoolSize 后,新的任务总会排队,参数 maximumPoolSize 也就没有意义了。对于 SynchronousQueue,它没有实际存储元素的空间,当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功,否则,总是会创建新线程,直到达到 maximumPoolSize。

  3. 任务拒绝策略

    如果队列有界,且 maximumPoolSize 有限,则当队列排满,线程个数也达到了 maximumPoolSize,这时,新任务会触发线程池的任务拒绝策略。

    ThreadPoolExecuto r实现了4种处理方式。

    1. ThreadPoolExecutor.AbortPolicy:这就是默认的方式,抛出异常。
    2. ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛出异常,也不执行。
    3. ThreadPoolExecutor.DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队。
    4. ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行。

    拒绝策略可以在构造方法中进行指定,也可以通过 set 方法进行指定

  4. 工厂

    线程池还可以接受一个参数:ThreadFactory。它是一个接口,由这个接口l来定义如何创建一个 Thread。

  5. 核心线程

    线程个数小于等于 corePoolSiz e时,我们称这些线程为核心线程,默认情况下:

    • 核心线程不会预先创建,只有当有任务时才会创建。
    • 核心线程不会因为空闲而被终止,keepAliveTime 参数不适用。不过,ThreadPoolExecutor 可以调用方法可以改变这个默认行为。
  6. 死锁

    提交给线程池的任务之间有如果依赖,这种情况可能会导致出现死锁。这个死锁不是说共享资源竞争的死锁,而是单纯的等待,比如任务A,在它的执行过程中,它给同样的任务执行服务提交了一个任务B,但需要等待任务B结束。

    解决办法:可以使用 newCachedThreadPool 创建线程池,让线程数不受限制。另一个解决方法是使用 SynchronousQueue,它可以避免死锁,怎么做到的呢?对于普通队列,入队只是把任务放到了队列中,而对于 SynchronousQueue 来说,入队成功就意味着已有线程接受处理,如果入队失败,可以创建更多线程直到 maximumPoolSize,如果达到了 maximumPoolSize,会触发拒绝机制,不管怎么样,都不会死锁。

定时任务

TimerTask

示例

package AsyncTask;

import java.util.Timer;
import java.util.TimerTask;

public class TimerDemo {
    public static void main(String[] args) throws InterruptedException {
        Timer timer = new Timer();
        timer.schedule(new DelayTask(), 10);
        // 延迟指定时间后以固定时延执行
        timer.schedule(new DelayTask2(), 100, 1000);
        Thread.sleep(4000);
        timer.cancel();
    }

    static class DelayTask extends TimerTask {
        @Override
        public void run() {
            System.out.println("延迟1任务执行");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
    static class DelayTask2 extends TimerTask {
        @Override
        public void run() {
            System.out.println("延迟2任务执行");
        }
    }
}

创建一个 Timer 对象,先运行 DelayTask,再固定周期运行 DelayTask2,最后调用 Timer 的 cancel 方法取消所有定时任务。

这里会发现 DelayTask2 总是等 DelayTas k执行之后才开始输出,因为一个 Timer 对象只有一个 Timer 线程在执行,所以 DelayTask2 被 DelayTask 给强行延迟了。

注意:任务的延迟执行分为固定延时(fixed-delay)与固定频率(fixed-rate),二者都是重复执行,但后一次任务执行相对的时间是不一样的,对于固定延时,它是基于上次任务的“实际”执行时间来算的,如果由于某种原因,上次任务延时了,则本次任务也会延时,而固定频率会尽量补够运行次数。

基本原理

Timer 内部主要由任务队列Timer 线程两部分组成,一个 Timer 对象只有一个 Timer 线程。任务队列是一个基于堆实现的优先级队列,按照下次执行的时间排优先级。Timer 线程主体是一个循环,从队列中获取任务,如果队列中有任务且计划执行时间小于等于当前时间,就执行它,如果队列中没有任务或第一个任务延时还没到,就睡眠。

在执行任何一个任务的 run 方法时,一旦 run 抛出异常,Timer 线程就会退出,从而所有定时任务都会被取消。

如果希望各个定时任务不互相干扰,一定要在 run 方法内捕获所有异常

总之需要注意:

  1. 后台只有一个线程在运行
  2. 固定频率的任务被延迟后,可能会立即执行多次,将次数补够
  3. 固定延时任务的延时相对的是任务执行前的时间
  4. 不要在定时任务中使用无限循环
  5. 一个定时任务的未处理异常会导致所有定时任务被取消

ScheduledExecutorService

由于 Timer/TimerTask 的一些问题,Java 并发包引入了 ScheduledExecutorService。ScheduledExecutorService 的主要实现类是ScheduledThreadPoolExecutor,它是线程池 ThreadPoolExecutor 的子类,是基于线程池实现的。它的任务队列是一个无界的优先级队列,所以最大线程数对它没有作用,即使 corePoolSize 设为 0,它也会至少运行一个线程。

与 Timer 不同,它不支持以绝对时间作为首次运行的时间。另外,单个定时任务的异常不会再导致全部定时任务被取消,即使后台只有一个线程执行任务。不过,需要强调的是,任务发生异常不会在任何地方体现,也就是说在 run 方法里 throw 了之后什么也看不见。所以,与 Timer 中的任务类似,应该捕获所有异常。

package AsyncTask;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(10);
        timer.schedule(new LongRunTask(), 10, TimeUnit.MILLISECONDS);
        timer.scheduleWithFixedDelay(new FixedDelayTask(), 100, 1000, TimeUnit.MILLISECONDS);
        Thread.sleep(4000);
        timer.shutdown();
    }

    static class LongRunTask implements Runnable {

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("LongRunTask");

            throw new RuntimeException();
        }
    }

    static class FixedDelayTask implements Runnable {

        @Override
        public void run() {
            System.out.println("FixedDelayTask");
        }
    }
}

原理

ScheduledThreadPoolExecutor 的实现思路与 Timer 基本是类似的,都有一个基于堆的优先级队列,保存待执行的定时任务,它的主要不同是:

  • 它的背后是线程池,可以有多个线程执行任务。
  • 它在任务执行后再设置下次执行的时间,对于固定延时的任务更为合理。
  • 任务执行线程会捕获任务执行过程中的所有异常,一个定时任务的异常不会影响其他定时任务,不过,发生异常的任务(即使是一个重复任务)不会再被调度。

工具类

读写锁ReentrantReadWriteLock

synchronized 和显式锁 ReentrantLock,对于同一受保护对象的访问,无论是读还是写,它们都要求获得相同的锁。在一些场景中,这是没有必要的,多个线程的读操作完全可以并行,在读多写少的场景中,让读操作并行可以明显提高性能。

通过一个 ReadWriteLock 产生两个锁:一个读锁,一个写锁。读操作使用读锁,写操作使用写锁。需要注意的是,只有“读-读”操作是可以并行的,“读-写”和“写-写”都不可以。

内部,它们使用同一个整数变量表示锁的状态,16 位给读锁用,16 位给写锁用,使用一个变量便于进行 CAS 操作,锁的等待队列其实也只有一个。写锁的获取,就是确保当前没有其他线程持有任何锁,否则就等待。写锁释放后,也就是将等待队列中的第一个线程唤醒,唤醒的可能是等待读锁的,也可能是等待写锁的。读锁的获取不太一样,首先,只要写锁没有被持有,就可以获取到读锁,此外,在获取到读锁后,它会检查等待队列,逐个唤醒最前面的等待读锁的线程,直到第一个等待写锁的线程。如果有其他线程持有写锁,获取读锁会等待。读锁释放后,检查读锁和写锁数是否都变为了 0,如果是,唤醒等待队列中的下一个线程。

package AsyncTask;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MyCache {
    private Map<String, Object> map = new HashMap<>();
    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock readLock = readWriteLock.readLock();
    private Lock writeLock = readWriteLock.writeLock();

    public Object get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Object put(String key, Object value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public void clear() {
        writeLock.lock();
        try {
            map.clear();
        } finally {
            writeLock.unlock();
        }
    }
}

信号量Semaphore

有的单个资源即使可以被并发访问,但并发访问数多了可能影响性能,所以希望限制并发访问的线程数。

一般锁只能由持有锁的线程释放,而 Semaphore 表示的只是一个许可数,任意线程都可以调用其 release 方法。主要的锁实现类 ReentrantLock 是可重入的,而 Semaphore 不是,每一次的 acquire 调用都会消耗一个许可,acquire 是会阻塞的。

package AsyncTask;

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static class ConcurrentLimitException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    private static final int MAX_PERMITS = 10;

    private Semaphore permits = new Semaphore(MAX_PERMITS);

    public boolean login(String name, String pwd) {
        if(!permits.tryAcquire()) {
            throw new ConcurrentLimitException();
        }
        // TODO 校验密码
        return true;
    }

    public void logout(String name) {
        // TODO 登出操作
        permits.release();
    }
}

倒计时门栓CountDownLatch

门栓的两种应用场景:一种是同时开始,另一种是主从协作。

同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为1,运动员线程调用 await,主线程调用 countDown

主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用 countDown,主线程调用 await 进行等待。

package AsyncTask;

import java.util.concurrent.CountDownLatch;

// 同时开始场景
public class RacerWithCountDwnLatch {
    static class Racer extends Thread {
        CountDownLatch latch;
        public Racer(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                latch.await(); // 没有countDown信号就会卡在这
                System.out.println(getName() + "开始正式运行" + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        CountDownLatch latch = new CountDownLatch(1);

        Thread[] racers = new Thread[num];

        for(int i = 0; i < 10; i++) {
            racers[i] = new Racer(latch);
            racers[i].start();
        }

        Thread.sleep(1000);
        // 发信号让线程一起开始动作
        latch.countDown();
    }
}

package AsyncTask;


import java.util.concurrent.CountDownLatch;

// 主从协作场景
public class MasterWorkerDemo {
    static class Worker extends Thread {
        CountDownLatch latch;
        public Worker(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                int sleepTime = (int) (Math.random() * 10);
                System.out.println(sleepTime);
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                latch.countDown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        CountDownLatch latch = new CountDownLatch(num);

        Worker[] workers = new Worker[num];
        for(int i = 0; i < num; i++) {
            workers[i] = new Worker(latch);
            workers[i].start();
        }

        latch.await();
        System.out.println("全部结束");
    }
}

循环栅栏CyclicBarrier

CyclicBarrier 特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

与 CountDownLatch 类似,它也有一个数字,但表示的是参与的线程个数。

它有一个构造方法,接受一个 Runnable 参数,这个参数表示栅栏动作,当所有线程到达栅栏后,在所有线程执行下一步动作前,运行参数中的动作,这个动作由最后一个到达栅栏的线程执行。

CyclicBarrier 的主要方法就是 await,await 在等待其他线程到达栅栏,调用 await 后,表示自己已经到达,如果自己是最后一个到达的,就执行可选的命令,执行后,唤醒所有等待的线程,然后重置内部的同步计数,以循环使用。

package AsyncTask;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    static class Tourist extends Thread {
        CyclicBarrier barrier;
        public Tourist(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                Thread.sleep((int) (Math.random() * 10));

                // 第一次集合
                barrier.await();
                System.out.println(getName() + "继续" + System.currentTimeMillis());
                Thread.sleep((int) (Math.random() * 10));

                // 第二次集合
                barrier.await();
                System.out.println(getName() + "继续" + System.currentTimeMillis());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int num = 3;
        Tourist[] tourists = new Tourist[num];
        CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() {
            @Override
            public void run() {
                System.out.println("全部集合了" + System.currentTimeMillis() + " 最后执行者:" + Thread.currentThread().getName());
            }
        });

        for (int p = 0; p < num; p++) {
            tourists[p] = new Tourist(barrier);
            tourists[p].start();
        }

    }
}

ThreadLocal

线程本地变量是说,每个线程都有同一个变量的独有拷贝。

多个线程访问的虽然是同一个变量,但每个线程都有自己的独立的值,这就是线程本地变量的含义。

使用场景:日期处理、随机数和上下文信息。

  1. 日期处理

    package Threads.ThreadLocal;
    
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * 每个线程使用自己的DateFormat,就不存在安全问题了,在线程的整个使用过程中,只需要创建一次,又避免了频繁创建的开销
     */
    public class ThreadLocalDateFormat {
        static ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() {
            @Override
            protected SimpleDateFormat initialValue() {
                return new SimpleDateFormat("yyyy-MM-dd");
            }
        };
    
        public static String date2String(Date date) {
            return sdf.get().format(date);
        }
    
        public static Date string2Date(String str) throws ParseException {
            return sdf.get().parse(str);
        }
    
    }
    
  2. 随机数

    即使对象是线程安全的,使用 ThreadLocal 也可以减少竞争,它是 Random 的子类,利用了 ThreadLocal,它没有 public 的构造方法,通过静态方法current 获取对象,这个对象就是个就是一个 ThreadLocal 变量。

  3. 上下文信息

    package Threads.ThreadLocal;
    
    public class ReqContext {
        public static class Req {};
    
        private static ThreadLocal<String> localUserId = new ThreadLocal<>();
        private static ThreadLocal<Req> localReq = new ThreadLocal<>();
    
        public static String getCurrentUserId() {
            return localUserId.get();
        }
    
        public static void setCurrentUserId(String userId) {
            localUserId.set(userId);
        }
    
        public static Req getCurrentReq() {
            return localReq.get();
        }
    
        public static void setCurrentReq(Req req) {
            localReq.set(req);
        }
    }
    

    在一个 Web 服务器中,一个线程执行用户的请求,在执行过程中,很多代码都会访问一些共同的信息,比如请求信息、用户身份信息,它们是线程执行过程中的全局信息,在首次获取到信息时,调用 set 方法如 setCurrentRequest/setCurrentUserId 进行设置,然后就可以在代码的任意其他地方调用 get 相关方法进行获取了。

原理

每个线程都有一个 Map,类型为 ThreadLocalMap ,调用set实际上是在线程自己的Map里设置了一个条目,键为当前的 ThreadLocal 对象,值为 value。

每个线程都有一个 Map,对于每个 ThreadLocal 对象,调用其get/set实际上就是以 ThreadLocal 对象为键读写当前线程的 Map,这样,就实现了每个线程都有自己的独立副本的效果。

小结:

本章介绍了 Java 一些同步协作工具:

  1. 在读多写少的场景中使用 ReentrantReadWriteLock 替代 ReentrantLock,以提高性能。
  2. 使用 Semaphore 限制对资源的并发访问数。
  3. 使用 CountDownLatch 实现不同角色线程间的同步。
  4. 使用 CyclicBarrier 实现同一角色线程间的协调一致。
  5. CyclicBarrier 与 CountDownLatch 可能容易混淆,强调下它们的区别:
    • CountDownLatch 的参与线程是有不同角色的,有的负责倒计时,有的在等待倒计时变为 0,负责倒计时和等待倒计时的线程都可以有多个,用于不同角色线程间的同步。
    • CyclicBarrier 的参与线程角色是一样的,用于同一角色线程间的协调一致。
    • CountDownLatch 是一次性的,而 CyclicBarrier 是可以重复利用的。

标签:Java,队列,基础,任务,线程,new,等待,public
来源: https://www.cnblogs.com/nyfblog/p/16523010.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有