ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

基于libevent线程池实现

2022-04-14 01:04:03  阅读:216  来源: 互联网

标签:基于 int void XThread base 线程 libevent include


XThreadPool.h

#ifndef XTHREADPOOL_H
#define XTHREADPOOL_H
#include <vector>

class XThread;
class XTask;
class XThreadPool
{
private:
    int threadCount = 0; // 线程数量
    int lastThread = -1;
    std::vector<XThread *> threads;

public:
    // 单例模式
    static XThreadPool *Get()
    {
        static XThreadPool p;
        return &p;
    }

    // 初始化所有线程并启动线程
    void Init(int threadCount);

    // 分发线程
    void Dispatch(XTask *task);
};
#endif

XThreadPool.cpp

#include <XTask.h>
#include <XThread.h>
#include <XThreadPool.h>
#include <iostream>
#include <thread>

using namespace std;

void XThreadPool::Init(int threadCount)
{
    this->threadCount = threadCount;
    this->lastThread = -1;
    for (int i = 0; i < threadCount; i++)
    {
        XThread *t = new XThread();
        t->id = i + 1; // 传递线程编号
        cout << "create thread " << i << endl;
        t->Start(); // 启动线程
        threads.push_back(t);
        this_thread::sleep_for(chrono::microseconds(10)); // 10ms
    }
}

// 分发线程
void XThreadPool::Dispatch(XTask *task)
{
    // 轮询
    if (!task)
    {
        return;
    }
    int tid = (lastThread + 1) % threadCount;
    lastThread = tid;
    XThread *t = threads[tid];

    t->AddTask(task);
    // 激活线程
    t->Activate();
}

XThread.h

#ifndef XTHREAD_H
#define XTHREAD_H
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <list>
#include <mutex>
class XTask;
class XThread
{
public:
    // 启动线程
    void Start();

    // 线程入口函数
    void Main();

    //安装线程,初始化event_base和管道监听事件,用于激活线程
    bool Setup();

    // 收到主线程发出的激活消息,(线程池的分发调用)
    void Notify(evutil_socket_t fd, short which);

    // 线程激活
    void Activate();

    // 添加处理的任务,一个线程可以同时处理多个任务,共用一个event_base
    void AddTask(XTask *t);

    int id = 0; // 线程编号
private:
    int notify_send_fd = 0;
    event_base *base = 0;

    std::list<XTask *> tasks; // 任务列表
    std::mutex tasks_mutex;   // 线程安全互斥
};
#endif

XThread.cpp

#include "XThread.h"
#include <XTask.h>
#include <event2/buffer.h>
#include <event2/event.h>
#include <iostream>
#include <thread>
#include <unistd.h>

using namespace std;

// 激活线程任务的事件回调函数
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{
    XThread *t = (XThread *)arg;
    t->Notify(fd, which);
}

void XThread::Notify(evutil_socket_t fd, short which)
{
    // 水平触发,只要没有接收完成,会再次进来
    char buf[2] = {0};
    int re = read(fd, buf, 1);
    if (re <= 0)
    {
        return;
    }
    cout << "id = " << id << buf << endl;
    XTask *task = NULL;

    // 获取任务,并初始化任务
    tasks_mutex.lock();
    if (tasks.empty())
    {
        tasks_mutex.unlock();
        return;
    }
    task = tasks.front(); // 先进先出
    tasks.pop_front();
    tasks_mutex.unlock();
    task->Init();
}

// 添加处理的任务,一个线程可以同时处理多个任务,共用一个event_base
void XThread::AddTask(XTask *t)
{
    if (!t)
        return;
    t->set_base(this->base);
    tasks_mutex.lock();
    tasks.push_back(t);
    tasks_mutex.unlock();
}

// 线程激活
void XThread::Activate()
{
    int re = write(this->notify_send_fd, "c", 1);
    if (re <= 0)
    {
        cerr << "Activate failed" << endl;
    }
}

// 启动线程
void XThread::Start()
{
    Setup();
    // 启动线程
    thread th(&XThread::Main, this);

    // 断开与主线程联系
    th.detach();
}

// 线程入口函数
void XThread::Main()
{
    cout << id << "void XThread::Main() begin" << endl;
    if (!base)
    {
        cerr << "Thread::Main() failed! base is null" << endl;
    }
    event_base_dispatch(base);
    event_base_free(base);
    cout << id << "void XThread::Main() end" << endl;
}

//安装线程,初始化event_base和管道监听事件,用于激活线程
bool XThread::Setup()
{
    // linux用管道 创建为管道,不能用send recv ,用read write
    // fds[0] 读 fds[1]写
    int fds[2];
    if (pipe(fds))
    {
        cerr << "pipe failed" << endl;
        return false;
    }

    // 读取绑定到event事件中,写入要保存
    notify_send_fd = fds[1];

    // 创建libevent上下文,无锁
    event_config *ev_config = event_config_new();
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    this->base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
    if (!base)
    {
        cerr << "event_base_new_with_config failed in thread" << endl;
    }

    // 添加管道监听事件,用于激活线程执行任务
    event *ev = event_new(base, fds[0], EV_READ | EV_PERSIST, NotifyCB, this);
    event_add(ev, NULL);

    return true;
}

XTask.h

#ifndef XTASK_H
#define XTASK_H
#include <event2/event.h>
class XTask
{
public:
//初始化任务
    virtual bool Init() = 0;
    
    void set_sock(int sock)
    {
        this->sock = sock;
    }

    void set_threadid(int thread_id)
    {
        this->thread_id = thread_id;
    }

    int thread_idfunc()
    {
        return thread_id;
    }

    int sockfunc()
    {
        return sock;
    }

    event_base *basefunc()
    {
        return base;
    }

    void set_base(event_base *base)
    {
        this->base = base;
    }

private:
    event_base *base = 0;
    int sock = 0;
    int thread_id = 0;

    
};
#endif

标签:基于,int,void,XThread,base,线程,libevent,include
来源: https://www.cnblogs.com/meng-chao/p/16142827.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有