ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

使用PThread+消息队列创建一个生产者消费者模型

2021-11-22 17:02:58  阅读:242  来源: 互联网

标签:what MessageQueue 队列 PThread 生产者 int pthread msg Message


一、概述

  案例:使用pthread+消息队列(单链表环形队列) 实现生产者消费者模型

  各个类的职责说明:

  1.message_queue.cpp消息队列

    ps:这个类最主要的方法有两个(这个类是线程安全的),一个是enqueueMessage(Message)向消息队列中放入数据,另一个是dequeueMessage(&Message)从消息队列中取出数据。其中,一旦有消息放入enqueueMessage中,此方法内部就会发送一个信号pthread_cond_signal通知需要消费的地方消费。dequeueMessage方法是一个阻塞方法,如果队列中没有数据就会调用pthread_cond_wait方法进行阻塞。enqueueMessage中的pthread_cond_signal通知的就是dequeueMessage方法中的pthread_cond_wait方法。

  2.根据上述消息队列的特性,新建两个线程:pProducer(生产者)、pCustomer(消费者),为了测试方便,生产者和消费者都用一个while(1)(死循环)来表示,即:生产者一直生产,消费者一直消费。其中生产者线程5秒生产一个产品,生产完成品后加入消息队列,然后在消息队列中调用enqueueMessage方法中的pthread_cond_signal,告诉消费者,可以进行消费了。消费者线程本来在阻塞状态,当其受到此信号的时候就会让pthread_cond_wait解除阻塞状态进入下一次循环,此时消息队列中取出的数据就是刚刚生产者生产的数据。其实整个流程就是这样,来回往复。当然在实际的项目中不能写while(1),最好定义一个变量可以控制如:isRunning

  

二、示例代码

  1.消息队列message_queue.h、message_queue.cpp

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>

#include <fcntl.h>
#include <pthread.h>

/**
 * 消息实体
 * */
class Message{
private:
        int what;
        int arg1;
        int arg2;
        void* obj;
public:
        Message();
        ~Message();
        Message(int what);
        Message(int what,int arg1,int arg2);
        Message(int what,void* obj);
        Message(int what,int arg1,int arg2,void * obj);

        int execute();
        int getWhat(){
                return what;
        }
        int getArg1(){
                return arg1;
        }
        int getArg2(){
                return arg2;
        }
        void* getObj(){
                return obj;
        }
};


/**
 * 创建环形队列节点
 * */
typedef struct MessageNode{
        Message *msg;//消息体
        struct MessageNode *next;//节点元素的下一个节点
        MessageNode(){
        msg = NULL;
        next = NULL;
        }
}MessageNode;


/**
 *单链表环形队列创建
 * */
class MessageQueue{
private:
        MessageNode *mFrist;//队列头
        MessageNode *mLast;//队列尾
        int mNbPackets;
        bool mAbortRequest;
        pthread_mutex_t mLock;//声明互斥锁
        pthread_cond_t mCondition;//声明条件变量
        const char* queueName;//队列名称

public:
        MessageQueue();
        ~MessageQueue();
        MessageQueue(const char* queueName);
        //初始化
        void init();
        void flush();
        int enqueueMessage(Message* msg);//添加元素
        int dequeueMessage(Message **msg,bool block);//取出元素
        int size();//队列大小
        void abort();
};

  

#include "message_queue.h"

MessageQueue::MessageQueue(){
    init();
}

MessageQueue::MessageQueue(const char* queueName){
    init();
    queueName = queueName;
}

void MessageQueue::init(){
    int initLockCode = pthread_mutex_init(&mLock,NULL);//初始化互斥锁
    int initConditionCode = pthread_cond_init(&mCondition,NULL);//初始化提交件变量
    mNbPackets = 0;
    mFrist = NULL;
    mLast = NULL;

    mAbortRequest = false;
}

MessageQueue::~MessageQueue(){
    flush();
    pthread_mutex_destroy(&mLock);
    pthread_cond_destroy(&mCondition);
}

//获取队列元素个数
int MessageQueue::size(){
    pthread_mutex_lock(&mLock);
    int size = mNbPackets;
    pthread_mutex_unlock(&mLock);
    return size;
}

/**
 * 清空集合中的元素
 * */
void MessageQueue::flush(){
    MessageNode *curNode ,*nextNode;
    Message *msg;
    pthread_mutex_lock(&mLock);
    for(curNode=mFrist;curNode!=NULL; curNode = nextNode){
        nextNode = curNode->next;
        msg = curNode->msg;
        if(NULL!=msg){
            delete msg;
        }
        delete curNode;
        curNode = NULL;
    }
    mLast = NULL;
    mFrist = NULL;
    mNbPackets = 0;
    pthread_mutex_unlock(&mLock);
}

int MessageQueue::enqueueMessage(Message *msg){
//    printf("enqueueMessage start.....\n");
    if(mAbortRequest){
        delete msg;
        return -1;
    }
    //printf("create MessageNode start....\n");
    MessageNode *node = new MessageNode();
    node->msg = msg;
    node->next = NULL;
//    printf("create MessageNode end ....\n");
//    printf("pthread_mutex_lock....\n");
    pthread_mutex_lock(&mLock);
    if(mFrist == NULL){
//        printf("mFirst=node\n");
        mFrist= node;
    }else{
//        printf("mLast->next=node\n");
        mLast->next = node;
    }
//    printf("mLast=node\n");
    mLast = node;
    mNbPackets++;
//    printf("pthread_cond_signal");
    pthread_cond_signal(&mCondition);
    pthread_mutex_unlock(&mLock);
    return 0;

}


/**
 * 从队列中取数据
 * */
int MessageQueue::dequeueMessage(Message **msg,bool block){
    MessageNode *node;
    int ret;
    pthread_mutex_lock(&mLock);
    for(;;){
        if(mAbortRequest){
            ret = -1;
            break;
        }
        node = mFrist;
        if(node){
            mFrist = node->next;
            if(!mFrist){
                mLast = NULL;
            }
            mNbPackets--;
            *msg = node->msg;
            delete node;
            node = NULL;
            ret = 1;
            break;
        }else if(!block){
            ret = 0;
            break;
        }else{
            pthread_cond_wait(&mCondition,&mLock);
        }
    }
    pthread_mutex_unlock(&mLock);
    return ret;
}
/**
 * 销毁队列
 * */
void MessageQueue::abort(){
    pthread_mutex_lock(&mLock);
    mAbortRequest = true;
    pthread_cond_signal(&mCondition);
    pthread_mutex_unlock(&mLock);    
}


Message::Message(){

}

Message::Message(int what){
    this->what = what;
}
Message::Message(int what,int arg1,int arg2){
    this->what = what;
    this->arg1 = arg1;
    this->arg2 = arg2;
}

Message::Message(int what ,void *obj){
    this->what = what;
    this->obj = obj;
}

Message::Message(int what,int arg1,int arg2,void *obj){
    this->what = what;
    this->arg1 = arg1;
    this->arg2 = arg2;
    this->obj = obj;
}

Message::~Message(){

}

int Message::execute(){
    
    return 0;
}

 2.测试类main.cpp

 

#include <stdio.h>
#include <iostream>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>

#include "message_queue.h"
using namespace std;
MessageQueue *queue;
//创建两个线程,一个线程生产数据,一个线程消费数据。生产线程5秒生产一个,消费线程:一旦有数据就立马消费
pthread_t pProducer;//生产者线程id
pthread_t pCustomer;//消费者线程id



void * producerHandler(void *arg){
    int index = 0;
    while(1){
        index++;
        cout << "生产第:"<<index<<"个产品"<<endl;
         char *name = (char*)"生产第";
            Message msg(index,name);
                queue->enqueueMessage(&msg);
        sleep(5);
    }

}

void *customerHandler(void* arg){
    while(1){

        Message *getMsg;
        int ret = queue->dequeueMessage(&getMsg,true);
        if(ret>0){
        cout<< "消费第:"<<getMsg->getWhat()<<"个产品"<<endl;
               // printf("what=[%d],obj=[%s]\n",getMsg->getWhat(),getMsg->getObj());
        }else{
                printf("集合出现了异常\n");
        }
}

}

int main(){
      queue = new MessageQueue();//创建线程
    pthread_create(&pProducer,NULL,producerHandler,(void*)1);
    pthread_create(&pCustomer,NULL,customerHandler,(void*)2);
    
    //等待线程结束
    pthread_join(pProducer,NULL);
    pthread_join(pCustomer,NULL);

    //实例化集合,并给集合添加元素
//    printf("入口开始了哈哈哈\n");
//    MessageQueue  *queue = new MessageQueue();
//    char *name = (char*)"你好洛洛杨";
//    Message msg(1,name);
//    printf("开始填充数据\n");
//    queue->enqueueMessage(&msg);
//    printf("结束填充数据\n");
//    
//    //从集合中取出元素
//    printf("开始取出数据\n");
//    Message *getMsg;
//    int ret = queue->dequeueMessage(&getMsg,true);
//    printf("取出数据结束\n");
//    if(ret>0){
//        printf("what=[%d],obj=[%s]\n",getMsg->getWhat(),getMsg->getObj());
//    }else{
//        printf("集合出现了异常\n");
//    }

    return 0;
}

 

  

三、示例图,从图中可以看出,确实是生产一个产品就消费一个产品,假如生产者不生产消费者就一直阻塞。

 

标签:what,MessageQueue,队列,PThread,生产者,int,pthread,msg,Message
来源: https://www.cnblogs.com/tony-yang-flutter/p/15589231.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有