ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

手动实现一个简易线程池记录

2022-01-02 19:02:13  阅读:126  来源: 互联网

标签:task 队列 lock 手动 queue 简易 任务 线程 public


**假设此时读者已经对于线程池的的七大参数配置,核心线程数最大线程数、阻塞队列拒绝策略已经有了一定的了解,那摩再看看这个代码可能会对你理解线程池的底层实现原理有一定的帮助

  1. 自定义线程池+工作线程实现

class ThreadPool{

    // 任务队列
   private BlockingQueue<Runnable> taskQueue;

     // 线程集合
   private  HashSet<Worker> workers = new HashSet<>();

    // 核心线程数
   private int coreSize;

   // 获取任务的超时时间
   private long timeout;

   private TimeUnit timeUnit;

    // 拒绝策略
   private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }


    // 执行任务
    public void execute(Runnable task){
        // 当任务数没有超过coreSize时,直接交给worker对象执行
        // 如果任务数量超过coreSize时,加入任务队列暂存

        synchronized (workers){
            if (workers.size() < coreSize){
                Worker worker = new Worker(task);
                System.out.println("新增worker{},{}"+worker+"任务对象{}"+task);
                workers.add(worker);
                worker.start();
            }else {
                System.out.println("加入任务队列{}"+task);
                  // 说明此时线程池中的核心线程都在被使用,因此新进来的任务需要被加入到任务队列中
               // taskQueue.put(task);
                /** 等待又有一些等待策略
                 *   1) 死等
                 *   2) 带超时等待
                 *   3) 放弃任务执行
                 *   4) 抛出异常
                 *   5) 调用者自己执行任务
                 *   还有更多的选择
                 */
               taskQueue.tryPut(rejectPolicy,task);



            }
        }
    }




    class Worker extends Thread{

        private Runnable task;

        public Worker(Runnable task){
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当task不为空,执行任务
            // 2) 当task执行完毕,再接着从任务队列获取任务并执行(需要复用该线程)
//            while (task != null || (task = taskQueue.take()) != null){
            while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
                try {
                    System.out.println("正在执行..{}"+task);
                    task.run();
                } catch (Exception e){
                    e.printStackTrace();
                }finally {
                      // 代表当前的这一个任务执行完毕
                    task = null;
                }
            }

            synchronized (workers){
                System.out.println("work 被移除{}"+this);

                  // 任务执行完毕,从队列中移除
                workers.remove(this);
            }

        }
    }
}

2.阻塞队列配置

class BlockingQueue<T>{

        //1. 创建任务队列(Deque底层是一个双向链表,有两种实现,linkedlist和arrayDeque)
        // arrayDequeue效率要比linkedlist效率好一些
    private Deque<T> queue = new ArrayDeque<>();

         // 2. 锁,保证一个线程一次只能从一个队列中获取一个任务
    private ReentrantLock lock = new ReentrantLock();

          // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
          // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

         //5.容量
    private int capity;

     // 设置容量
    public BlockingQueue(int capity) {
        this.capity = capity;
    }

    /**
     *  带超时的阻塞获取
     * @param timeout
     * @param unit
     * @return
     */
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try{
             // 将timeout 统一转换为纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()){
                try {
                    // 返回的是剩余时间
                    if (nanos <= 0){
                       return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 获取到该队列的队首元素,获取到之后会移除该任务
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }

    }

    /**
     * 阻塞获取,意思就是如果任务队列空了,就不能再继续往队列中获取任务了
     * 需要阻塞住
     * @return
     */
    public T take(){
        lock.lock();
        try{
            while (queue.isEmpty()){
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
              // 获取到该队列的队首元素,获取到之后会移除该任务
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

      // 阻涉添加
    public void put(T task){
        lock.lock();
        try{
               // 当任务队列满了之后,新进来的任务就进入等待
            while (queue.size() == capity){
                try {
                    System.out.println("等待加入任务队列"+task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
               // 唤醒
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }


    }

    /**
     *带超时的阻塞添加
     * @return
     */
    public boolean offer(T task, long timeout, TimeUnit timeUnit){
        lock.lock();
        try{
              // 将时间统一转换为纳秒
            long nacos = timeUnit.toNanos(timeout);
             // 当任务队列满了之后,新进来的任务就进入等待
            while (queue.size() == capity){
                try {
                    System.out.println("等待加入任务队列"+task);
                    if (nacos <=0){
                        return false;
                    }
                       // 返回的是剩余需要等待的时间
                    nacos = fullWaitSet.awaitNanos(nacos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            // 唤醒
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }


    }

    // 获取队列大小
    public int size(){
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
         // 判断队列是否满了
           if (queue.size() == capity){
              rejectPolicy.reject(this,task);
           }else {  // 工作队列有空闲
               queue.addLast(task);
               emptyWaitSet.signal();
           }
        }finally {
            lock.unlock();
        }
    }
}

3.自定义聚聚策略

@FunctionalInterface  // 拒绝策略
interface RejectPolicy<T>{
  void reject(BlockingQueue<T> queue,T task);

}

4.主线程测试

public class TestPool1 {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10,((queue, task) -> {
//            queue.put(task);  // 1.死等策略
             // 2.超时等待
            // queue.offer(task,1500,TimeUnit.MILLISECONDS);
            // 3.让调用者放弃任务执行
            //System.out.println("放弃{}"+task);
              // 4.让调用者抛出异常
//                throw new RuntimeException("任务执行失败"+task);

               // 5. 让调用者自己执行任务
                     task.run();  // 这其实就是主线程自己执行的任务

        }));
        for (int i = 0; i < 5; i++) {
            int j = i;
            threadPool.execute(()->{
                try {
                       // 此时休眠是模拟线程处理任务需要花费一些时间,这时候又有新任务进来
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            });
        }
    }
}

5.最后完整线程池结构代码,这里是在一些内部类的基础上编写的,重要理解设计和思想

package Thread.diyThreadPool;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestPool1 {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10,((queue, task) -> {
//            queue.put(task);  // 1.死等策略
             // 2.超时等待
            // queue.offer(task,1500,TimeUnit.MILLISECONDS);
            // 3.让调用者放弃任务执行
            //System.out.println("放弃{}"+task);
              // 4.让调用者抛出异常
//                throw new RuntimeException("任务执行失败"+task);

               // 5. 让调用者自己执行任务
                     task.run();  // 这其实就是主线程自己执行的任务
        }));
        for (int i = 0; i < 5; i++) {
            int j = i;
            threadPool.execute(()->{
                try {
                       // 此时休眠是模拟线程处理任务需要花费一些时间,这时候又有新任务进来
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            });
        }
    }
}

@FunctionalInterface  // 拒绝策略
interface RejectPolicy<T>{
  void reject(BlockingQueue<T> queue,T task);

}

class ThreadPool{

    // 任务队列
   private BlockingQueue<Runnable> taskQueue;

     // 线程集合
   private  HashSet<Worker> workers = new HashSet<>();

    // 核心线程数
   private int coreSize;

   // 获取任务的超时时间
   private long timeout;

   private TimeUnit timeUnit;

    // 拒绝策略
   private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    // 执行任务
    public void execute(Runnable task){
        // 当任务数没有超过coreSize时,直接交给worker对象执行
        // 如果任务数量超过coreSize时,加入任务队列暂存

        synchronized (workers){
            if (workers.size() < coreSize){
                Worker worker = new Worker(task);
                System.out.println("新增worker{},{}"+worker+"任务对象{}"+task);
                workers.add(worker);
                worker.start();
            }else {
                System.out.println("加入任务队列{}"+task);
                  // 说明此时线程池中的核心线程都在被使用,因此新进来的任务需要被加入到任务队列中
               // taskQueue.put(task);
                /** 等待又有一些等待策略
                 *   1) 死等
                 *   2) 带超时等待
                 *   3) 放弃任务执行
                 *   4) 抛出异常
                 *   5) 调用者自己执行任务
                 *   还有更多的选择
                 */
               taskQueue.tryPut(rejectPolicy,task);
            }
        }
    }
    
    class Worker extends Thread{

        private Runnable task;

        public Worker(Runnable task){
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当task不为空,执行任务
            // 2) 当task执行完毕,再接着从任务队列获取任务并执行(需要复用该线程)
//            while (task != null || (task = taskQueue.take()) != null){
            while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
                try {
                    System.out.println("正在执行..{}"+task);
                    task.run();
                } catch (Exception e){
                    e.printStackTrace();
                }finally {
                      // 代表当前的这一个任务执行完毕
                    task = null;
                }
            }

            synchronized (workers){
                System.out.println("work 被移除{}"+this);

                  // 任务执行完毕,从队列中移除
                workers.remove(this);
            }

        }
    }
}

class BlockingQueue<T>{

        //1. 创建任务队列(Deque底层是一个双向链表,有两种实现,linkedlist和arrayDeque)
        // arrayDequeue效率要比linkedlist效率好一些
    private Deque<T> queue = new ArrayDeque<>();

         // 2. 锁,保证一个线程一次只能从一个队列中获取一个任务
    private ReentrantLock lock = new ReentrantLock();

          // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
          // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

         //5.容量
    private int capity;

     // 设置容量
    public BlockingQueue(int capity) {
        this.capity = capity;
    }

    /**
     *  带超时的阻塞获取
     * @param timeout
     * @param unit
     * @return
     */
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try{
             // 将timeout 统一转换为纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()){
                try {
                    // 返回的是剩余时间
                    if (nanos <= 0){
                       return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 获取到该队列的队首元素,获取到之后会移除该任务
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }

    }

    /**
     * 阻塞获取,意思就是如果任务队列空了,就不能再继续往队列中获取任务了
     * 需要阻塞住
     * @return
     */
    public T take(){
        lock.lock();
        try{
            while (queue.isEmpty()){
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
              // 获取到该队列的队首元素,获取到之后会移除该任务
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

      // 阻涉添加
    public void put(T task){
        lock.lock();
        try{
               // 当任务队列满了之后,新进来的任务就进入等待
            while (queue.size() == capity){
                try {
                    System.out.println("等待加入任务队列"+task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
               // 唤醒
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }

    /**
     *带超时的阻塞添加
     * @return
     */
    public boolean offer(T task, long timeout, TimeUnit timeUnit){
        lock.lock();
        try{
              // 将时间统一转换为纳秒
            long nacos = timeUnit.toNanos(timeout);
             // 当任务队列满了之后,新进来的任务就进入等待
            while (queue.size() == capity){
                try {
                    System.out.println("等待加入任务队列"+task);
                    if (nacos <=0){
                        return false;
                    }
                       // 返回的是剩余需要等待的时间
                    nacos = fullWaitSet.awaitNanos(nacos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            // 唤醒
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

    // 获取队列大小
    public int size(){
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
         // 判断队列是否满了
           if (queue.size() == capity){
              rejectPolicy.reject(this,task);
           }else {  // 工作队列有空闲
               queue.addLast(task);
               emptyWaitSet.signal();
           }
        }finally {
            lock.unlock();
        }
    }
}

标签:task,队列,lock,手动,queue,简易,任务,线程,public
来源: https://blog.csdn.net/qq_45243783/article/details/122279050

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有