ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

JFrame(用C++11特性重构系列——线程池)

2022-05-15 10:33:15  阅读:152  来源: 互联网

标签:11 std JFrame return void queue 线程 include


线程池的基本思想:有一个异步任务队列,任何地方都可以往此队列中加任务,其中任务就是一个个待执行的函数,然后还有一个线程池,线程池中有一定数量的线程,线程池一经启动,每个运行的线程都会从这个异步队列中不断取出任务并执行!

SyncQueue

  1 #ifndef SYNCQUEUE_H
  2 #define SYNCQUEUE_H
  3 
  4 #include<list>
  5 #include<mutex>
  6 #include<thread>
  7 #include<condition_variable>
  8 #include <iostream>
  9 using namespace std;
 10 
 11 template<typename T>
 12 class SyncQueue
 13 {
 14 public:
 15     SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
 16     {
 17     }
 18 
 19     void Put(const T&x)
 20     {
 21         Add(x);
 22     }
 23 
 24     void Put(T&&x)
 25     {
 26         Add(std::forward<T>(x));
 27     }
 28 
 29     void Take(std::list<T>& list)
 30     {
 31         std::unique_lock<std::mutex> locker(m_mutex);
 32         m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
 33 
 34         if (m_needStop)
 35             return;
 36         list = std::move(m_queue);
 37         m_notFull.notify_one();
 38     }
 39 
 40     void Take(T& t)
 41     {
 42         std::unique_lock<std::mutex> locker(m_mutex);
 43         m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
 44 
 45         if (m_needStop)
 46             return;
 47         t = m_queue.front();
 48         m_queue.pop_front();
 49         m_notFull.notify_one();
 50     }
 51 
 52     void Stop()
 53     {
 54         {
 55             std::lock_guard<std::mutex> locker(m_mutex);
 56             m_needStop = true;
 57         }
 58         m_notFull.notify_all();
 59         m_notEmpty.notify_all();
 60     }
 61 
 62     bool Empty()
 63     {
 64         std::lock_guard<std::mutex> locker(m_mutex);
 65         return m_queue.empty();
 66     }
 67 
 68     bool Full()
 69     {
 70         std::lock_guard<std::mutex> locker(m_mutex);
 71         return m_queue.size() == m_maxSize;
 72     }
 73 
 74     size_t Size()
 75     {
 76         std::lock_guard<std::mutex> locker(m_mutex);
 77         return m_queue.size();
 78     }
 79 
 80     int Count()
 81     {
 82         return m_queue.size();
 83     }
 84 private:
 85     bool NotFull() const
 86     {
 87         bool full = m_queue.size() >= m_maxSize;
 88         if (full)
 89             cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
 90         return !full;
 91     }
 92 
 93     bool NotEmpty() const
 94     {
 95         bool empty = m_queue.empty();
 96         if (empty)
 97             cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
 98         return !empty;
 99     }
100 
101     template<typename F>
102     void Add(F&&x)
103     {
104         std::unique_lock< std::mutex> locker(m_mutex);
105         m_notFull.wait(locker, [this]{return m_needStop || NotFull(); });
106         if (m_needStop)
107             return;
108 
109         m_queue.push_back(std::forward<F>(x));
110         m_notEmpty.notify_one();
111     }
112 
113 private:
114     std::list<T> m_queue; //缓冲区
115     std::mutex m_mutex; //互斥量和条件变量结合起来使用
116     std::condition_variable m_notEmpty;//不为空的条件变量
117     std::condition_variable m_notFull; //没有满的条件变量
118     int m_maxSize; //同步队列最大的size
119 
120     bool m_needStop; //停止的标志
121 };
122 
123 #endif // SYNCQUEUE_H

ThreadPool

 1 #ifndef THREADPOOL_H
 2 #define THREADPOOL_H
 3 
 4 #include<list>
 5 #include<thread>
 6 #include<functional>
 7 #include<memory>
 8 #include <atomic>
 9 #include "SyncQueue.hpp"
10 
11 const int MaxTaskCount = 2000;
12 class ThreadPool
13 {
14 public:
15     using Task = std::function<void()>;
16     ThreadPool(int numThreads = std::thread::hardware_concurrency())
17         : m_queue(MaxTaskCount), m_numThreads(numThreads)
18     {
19         // Start(numThreads);
20     }
21 
22     ~ThreadPool(void)
23     {
24         //如果没有停止时则主动停止线程池
25         Stop();
26     }
27 
28     void Stop()
29     {
30         std::call_once(m_flag, [this]{StopThreadGroup(); }); //保证多线程情况下只调用一次StopThreadGroup
31     }
32 
33     void AddTask(Task&&task)
34     {
35         m_queue.Put(std::forward<Task>(task));
36     }
37 
38     void AddTask(const Task& task)
39     {
40         m_queue.Put(task);
41     }
42 
43     void Start()
44     {
45         m_running = true;
46         //创建线程组
47         for (int i = 0; i <m_numThreads; ++i)
48         {
49             m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
50         }
51     }
52 private:
53     void RunInThread()
54     {
55         while (m_running)
56         {
57             //取任务分别执行
58             std::list<Task> list;
59             m_queue.Take(list);
60 
61             for (auto& task : list)
62             {
63                 if (!m_running)
64                     return;
65 
66                 task();
67             }
68         }
69     }
70 
71     void StopThreadGroup()
72     {
73         m_queue.Stop(); //让同步队列中的线程停止
74         m_running = false; //置为false,让内部线程跳出循环并退出
75 
76         for (auto thread : m_threadgroup) //等待线程结束
77         {
78             if (thread)
79                 thread->join();
80         }
81         m_threadgroup.clear();
82     }
83 
84     std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
85     SyncQueue<Task> m_queue; //同步队列
86     atomic_bool m_running; //是否停止的标志
87     std::once_flag m_flag;
88     int m_numThreads = 0;
89 };
90 
91 #endif // THREADPOOL_H

测试代码:

 1 void testThreadPool()
 2 {
 3     ThreadPool pool;
 4     std::thread thd1([&pool]{
 5         for(int i = 0; i < 10000; i++)
 6         {
 7             this_thread::sleep_for(std::chrono::seconds(1));
 8             auto thdid = this_thread::get_id();
 9             pool.AddTask([thdid]{
10                 std::cout << "同步层线程 1 的线程 ID:" << thdid << std::endl;
11             });
12         }
13     });
14     std::thread thd2([&pool]{
15         for(int i = 0; i < 10000; i++)
16         {
17             this_thread::sleep_for(std::chrono::seconds(1));
18             auto thdid = this_thread::get_id();
19             pool.AddTask([thdid]{
20                 std::cout << "同步层线程 2 的线程 ID:" << thdid << std::endl;
21             });
22         }
23     });
24     pool.Start();
25     this_thread::sleep_for(std::chrono::seconds(100));
26     pool.Stop();
27     thd1.join();
28     thd2.join();
29 }

 

标签:11,std,JFrame,return,void,queue,线程,include
来源: https://www.cnblogs.com/JackZheng/p/16272511.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有