ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

基于POISX线程的线程池实现

2021-12-31 21:35:24  阅读:147  来源: 互联网

标签:基于 task struct worker POISX 线程 mutex pool


一、线程池的功能

  减少线程的创建和销毁的内存资源消耗。

二、线程池基础结构

  • 消息队列  
  • 任务执行队列
  • 管理组件

2.1 简易数据结构

2.1.1 消息队列

  使用双向链表实现队列结构,节点消息包括执行的消息函数以及函数参数数据。

1 struct nTask {
2     void (*task_func)(struct nTask *task);
3     void *user_data;
4 
5     struct nTask *prev;
6     struct nTask *next;
7 };
2.1.2 任务执行队列

  使用双向链表实现队列结构,节点信息包括线程id、线程存活状态、所属管理组件。

struct nWorker {
    pthread_t threadid;
    int terminate;
    struct nManager *manager;

    struct nWorker *prev;
    struct nWorker *next;
};
2.1.3 管理组件

  管理组件管理消息队列、任务执行队列。同时,使用锁和条件变量完成对共享数据的保护操作。

typedef struct nManager {
    struct nTask *tasks;
    struct nWorker *workers;

    pthread_mutex_t mutex;
    pthread_cond_t cond;
} ThreadPool;

2.2 底层数据结构接口实现

2.2.1 向队列头插入节点
#define LIST_INSERT(item, list) do {    \
    item->prev = NULL;                    \
    item->next = list;                    \
    if ((list) != NULL) (list)->prev = item; \
    (list) = item;                        \
} while(0)
2.2.2 删除队列中指定节点
#define LIST_REMOVE(item, list) do {    \
    if (item->prev != NULL) item->prev->next = item->next; \
    if (item->next != NULL) item->next->prev = item->prev; \
    if (list == item) list = item->next;                     \
    item->prev = item->next = NULL;    \
} while(0)

2.3 线程任务定义

  线程池线程执行相同的线程回调函数,在回调函数内,获取消息队列中待处理的消息并执行。

// callback != task
static void *nThreadPoolCallback(void *arg) {

    struct nWorker *worker = (struct nWorker*)arg;

    while (1) {

        pthread_mutex_lock(&worker->manager->mutex);
        while (worker->manager->tasks == NULL) {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);
        }
        if (worker->terminate) {
            pthread_mutex_unlock(&worker->manager->mutex);
            break;
        }

        struct nTask *task = worker->manager->tasks;
        LIST_REMOVE(task, worker->manager->tasks);

        pthread_mutex_unlock(&worker->manager->mutex);

        task->task_func(task);
    }

    free(worker);
    
}

  对共享数据消息队列的操作要在锁中进行。执行任务在锁外进行使锁尽可能小。

三、业务接口实现

3.1 线程池创建函数

  此函数参数为线程池对象指针和最大线程数。

  互斥锁和条件变量的初始化有静态初始化和动态初始化两种。此处用静态初始化的互斥锁和条件变量和memset来初始化线程池的互斥锁和条件变量。

  在进行内存申请时要判断内存申请是否成功。

// API
int nThreadPoolCreate(ThreadPool *pool, int numWorkers) {

    if (pool == NULL) return -1;
    if (numWorkers < 1) numWorkers = 1;
    memset(pool, 0, sizeof(ThreadPool));

    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));

    //pthread_mutex_init(&pool->mutex, NULL);
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
    

    int i = 0;
    for (i = 0;i < numWorkers;i ++) {
        struct nWorker *worker = (struct nWorker*)malloc(sizeof(struct nWorker));
        if (worker == NULL) {
            perror("malloc");
            return -2;
        }
        memset(worker, 0, sizeof(struct nWorker));
        worker->manager = pool; //

        int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
        if (ret) {
            perror("pthread_create");
            free(worker);
            return -3;
        }
        
        LIST_INSERT(worker, pool->workers);
    }

    // success
    return 0; 

}

3.2 线程池销毁函数

  线程池销毁主要工作是将线程的存活状态设置为销毁,并广播条件变量通知线程回调函数销毁线程。条件变量要配合锁使用。

// API
int nThreadPoolDestory(ThreadPool *pool, int nWorker) {

    struct nWorker *worker = NULL;

    for (worker = pool->workers;worker != NULL;worker = worker->next) {
        worker->terminate;
    }

    pthread_mutex_lock(&pool->mutex);

    pthread_cond_broadcast(&pool->cond);

    pthread_mutex_unlock(&pool->mutex);

    pool->workers = NULL;
    pool->tasks = NULL;

    return 0;
    
}

3.3 消息添加函数

  此函数操作共享数据消息队列,所以需要加锁操作。pthread_cond_signal()函数作用是随机唤醒一个等待条件变量的线程。

// API
int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task) {

    pthread_mutex_lock(&pool->mutex);

    LIST_INSERT(task, pool->tasks);

    pthread_cond_signal(&pool->cond);

    pthread_mutex_unlock(&pool->mutex);

}

四、测试样例

 

#if 1

#define THREADPOOL_INIT_COUNT    20
#define TASK_INIT_SIZE            1000


void task_entry(struct nTask *task) { //type 

    //struct nTask *task = (struct nTask*)task;
    int idx = *(int *)task->user_data;

    printf("idx: %d\n", idx);

    free(task->user_data);
    free(task);
}


int main(void) {

    ThreadPool pool = {0};
    
    nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
    // pool --> memset();
    
    int i = 0;
    for (i = 0;i < TASK_INIT_SIZE;i ++) {
        struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
        if (task == NULL) {
            perror("malloc");
            exit(1);
        }
        memset(task, 0, sizeof(struct nTask));

        task->task_func = task_entry;
        task->user_data = malloc(sizeof(int));
        *(int*)task->user_data  = i;

        
        nThreadPoolPushTask(&pool, task);
    }

    getchar();
    
}


#endif

 

标签:基于,task,struct,worker,POISX,线程,mutex,pool
来源: https://www.cnblogs.com/unrealCat/p/15754444.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有