ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

一个独立的线程函数示例

2022-07-13 15:03:33  阅读:128  来源: 互联网

标签:函数 示例 ThreadLock taf 线程 tempMsgQue include TC CPushCacheThread


 

 

一个独立的线程函数示例

#ifndef  __PUSH_CACHE_THREAD_H__
#define  __PUSH_CACHE_THREAD_H__
#include "util/tc_singleton.h"
#include "util/tc_thread.h"
#include "util/tc_config.h"  //taf::TC_Config 
#include "util/tc_thread_rwlock.h"
#include <vector>
#include <string>
#include "PushInterface.h"
#include "PushCache.h"

class CPushCacheThread :public taf::TC_Thread , public taf::TC_Singleton<CPushCacheThread>
{
public:
    CPushCacheThread();
    ~CPushCacheThread();

    bool initialize(const taf::TC_Config  &conf);
    void run();
    void terminate();

public:
    
    void pushToCache(const HQExtend::HPushUserMsgReq& oneMsg);

private:
    std::queue<HQExtend::HPushUserMsgReq> m_msgQue;

    taf::TC_ThreadLock m_ThreadLock;//线程锁 
    taf::TC_ThreadRWLocker m_rwLock;//读写锁
    HQExtend::PushCachePrx m_pushCachePrx;
    std::vector<std::string>  m_vObjName;
};


#endif 

 

#include "pushCacheThread.h"
#include "CountTimeApp.h"        //TELL_TIME_COST_THIS
#include "servant/Application.h" //ServerConfig::LocalIp
#include "servant/taf_logger.h"  //FDLOG
#include "Pusher.h"
using namespace std;

CPushCacheThread::CPushCacheThread()
{
    FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << std::this_thread::get_id() << endl;
}

CPushCacheThread::~CPushCacheThread()
{
    FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << std::this_thread::get_id() << endl;
    terminate();
}

bool CPushCacheThread::initialize(const taf::TC_Config &conf)
{
    bool bRet = false;
    try
    {
        string pushCacheObj = conf.get("/conf/push<pushCache>");
        if (!pushCacheObj.empty())
        { //预警历史缓存服务 接口
            m_pushCachePrx = Communicator::getInstance()->stringToProxy<HQExtend::PushCachePrx>(pushCacheObj);
            FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
                           << "Push Cache prx init succ " << endl;
            bRet = true;
        }
        return bRet;
    }
    catch (std::exception &ex)
    {
        LOG_ERROR << "Exception: " << ex.what() << endl;
    }
    catch (...)
    {
        LOG_ERROR << "Unkonwn exception" << endl;
    }

    return bRet;
}

void CPushCacheThread::run()
{
    FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
                  << " =========== run START =========== " << std::this_thread::get_id() << endl;
    while (_running)
    {
        try
        {
            // do nothing...
            std::queue<HQExtend::HPushUserMsgReq> tempMsgQue;
            {
                taf::TC_ThreadWLock wlock(m_rwLock);
                tempMsgQue.swap(m_msgQue);
            }
            while (tempMsgQue.size() > 0)
            {
                HQExtend::HPushUserMsgReq req = tempMsgQue.front();
                HQExtend::PushCachePrxCallbackPtr callback(new PushCacheCallback());
                m_pushCachePrx->async_pushUserMsg(callback, req);
                tempMsgQue.pop();
            }

            {
                TC_ThreadLock::Lock lock(m_ThreadLock);
                m_ThreadLock.timedWait(5000);
            }
        }
        catch (std::exception &ex)
        {
            LOG_ERROR << "Exception:" << ex.what() << endl;
        }
        catch (...)
        {
            LOG_ERROR << "Unkown exception." << endl;
        }
    }

    FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
                  << " =========== run END =========== " << std::this_thread::get_id() << endl;
}

void CPushCacheThread::terminate()
{
    FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
                  << " =========== terminate =========== " << std::this_thread::get_id() << endl;
    if (_running)
    {
        _running = false;
        try
        {
            TC_ThreadLock::Lock lock(m_ThreadLock);
            m_ThreadLock.notifyAll();
            LOG_DEBUG << __FILE__ << " CPushCacheThread::terminate " << endl;
        }
        catch (std::exception &ex)
        {
            LOG_ERROR << __FILE__ << "Exception:" << ex.what() << endl;
        }
        catch (...)
        {
            LOG_ERROR << __FILE__ << "Unknown exception." << endl;
        }
    }
}

void CPushCacheThread::pushToCache(const HQExtend::HPushUserMsgReq &oneMsg)
{
    taf::TC_ThreadWLock wlock(m_rwLock);
    m_msgQue.push(oneMsg);
}

 

标签:函数,示例,ThreadLock,taf,线程,tempMsgQue,include,TC,CPushCacheThread
来源: https://www.cnblogs.com/music-liang/p/16473892.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有