ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

muduo源码解析30-网络库8:tcpserver类

2020-09-02 19:31:20  阅读:265  来源: 互联网

标签:muduo const eventloop 30 tcpserver 源码 连接 loop acceptor


tcpserver:

说明:

之前说的acceptor负责接收连接,tcpconnection负责对这个连接进行操作。

那么这两个合起来就有一个tcpserver的基本架构了

tcpserver使用acceptor来接受一个连接,使用tcpconnection来对这个连接进行处理。

tcpserver.h

/*
TcpServer实现了对于TCP服务的封装,功能是管理accept获得的TcpConnection
TcpServer是供用户直接使用的,生命期由用户控制,用户只需要设置好callback函数,再调用start即可

tcpserver维护当前所有的tcpconenction集合,便于对其进行管理
tcpserver构造函数中完成了对于acceptor类对象的构造,因此socket(),bind()操作在
tcpserver构造时就已经做好,而listen()则在tcpserver::start()中做好


构造函数中acceptor设置发生连接事件的回调函数就是tcpserver::newConnection,也就是说
acceptor中的acceptorsocket发生了可读网络事件(连接到来),首先acceptorcahnnel会调用
acceptor::handleRead()先accept()这个连接,此时连接已经接收完成.
但是还需要调用连接建立完成时的回调函数tcpserver::newConnection,在newConnection中
则是完成新建一个tcpconnection对象,并把它加入tcpconnection集合中来方便对所有的
tcpconnection连接进行管理.

需要注意一点就是,tcpserver并不是单线程的,其内部使用一个eventloopthreadpool
也就是说有多个IO线程,每个IO线程都有一个eventloop对象,因此也就有多个
while(1)
{
poll();
handleEvent();
}

这样的好处就是提高并发性,多个连接到来时,单eventloop可能会来不及处理.
这样子会带来一个问题,怎么统计当前所有连接进来的客户机呢?因此是多线程处理IO,
每个线程都有一个poller::m_pollfds,只对该线程的套接字集合进行管理,而tcpserver
如何知道哪些套接字正处于链接呢?
tcpserver使用tcpconenction来维护一个tcp连接集合,每次acceptor接受一个新的连接时,会
回调tcpserver::newConnection()新建一个tcpconnection加入到tcp连接集合,
每次断开连接时,会回调tcpserver::removeConnection()把退出的tcpconnection从tcp
连接集合中删除.


*/

#ifndef TCPSERVER_H
#define TCPSERVER_H

#include"base/atomic.h"
#include"base/types.h"
#include"net/tcpconnection.h"

#include<map>

namespace mymuduo {

namespace net {

class acceptor;
class eventloop;
class eventloopthreadpool;

class tcpserver
{
public:
    typedef std::function<void(eventloop*)> ThreadInitCallback;

    //端口复用/地址复用
    enum Option
    {
        kNoReusePort,
        kReusePort
    };

    tcpserver(eventloop* loop,const inetaddress& listenAddr,const string& name,
              Option option=kNoReusePort);
    ~tcpserver();

    //获得ipPort,name,eventloop*
    const string& ipPort()const{return m_ipPort;}
    const string& name() const{return m_name;}
    eventloop* getLoop() const{return m_loop;}

    //设置线程池内线程数量,设置线程回调函数
    void setThreadNum(int numThreads);
    void setThreadInitCallback(const ThreadInitCallback& cb){m_threadInitCallback=cb;}
    std::shared_ptr<eventloopthreadpool> threadPool(){return m_threadPool;}

    //tcpserver启动...
    void start();

    //设置连接建立时,读消息时,写完成时的回调函数
    void setConnectionCallback(const ConnectionCallback& cb){m_connectionCallback=cb;}
    void setMessageCallback(const MessageCallback& cb){m_messageCallback=cb;}
    void setWriteCompleteCallback(const WriteCompleteCallback& cb){m_writeCompleteCallback=cb;}


private:

    //新建一个连接,accpetor默认的接受连接时的回调函数
    void newConnection(int sockfd,const inetaddress& peerAddr);
    //移除一个连接
    void removeConnection(const TcpConnectionPtr& conn);
    //loop内部移除一个连接
    void removeConnectionInLoop(const TcpConnectionPtr& conn);

    //每个tcpconnection都有一个名字
    typedef std::map<string, TcpConnectionPtr> ConnectionMap;

    eventloop* m_loop;                  //tcpserver所在的那个eventloop
    const string m_ipPort;              //tcpserver的ipPort
    const string m_name;                //name
     //acceptor智能指针,用于创建套接字,绑定,监听和接收一个连接,
    std::unique_ptr<acceptor> m_acceptor;
    std::shared_ptr<eventloopthreadpool> m_threadPool;  //eventloop线程池智能指针
    ConnectionCallback m_connectionCallback;        //建立连接时的回调函数
    MessageCallback m_messageCallback;              //读消息时的回调函数
    WriteCompleteCallback m_writeCompleteCallback;  //写事件完成时的回调函数
    ThreadInitCallback m_threadInitCallback;        //线程初始化回调
    atomicInt32 m_start;                            //tcpserver是否开始
    int m_nextConnId;                               //先一个连接client的fd
    ConnectionMap m_connections;     //名字到tcpconenction的映射

};

}//namespace net

}//namespace mymuduo

#endif // TCPSERVER_H

tcpserver.cpp

#include "tcpserver.h"

#include"base/logging.h"
#include"net/acceptor.h"
#include"net/eventloop.h"
#include"eventloopthreadpool.h"
#include"net/socketsops.h"

namespace mymuduo {

namespace net {

//构造函数,初始化成员,设置默认连接建立回调,读消息回调
//acceptor完成套接字的创建,绑定,监听和接受一个新的连接操作
tcpserver::tcpserver(eventloop* loop,const inetaddress& listenAddr,
                     const string& name,Option option)
    :m_loop(loop),m_ipPort(listenAddr.toIpPort()),m_name(name),
      m_acceptor(new acceptor(loop,listenAddr,option==kReusePort)),
      m_threadPool(new eventloopthreadpool(loop,name)),
      m_connectionCallback(defaultConnectionCallback),
      m_messageCallback(defaultMessageCallback),
      m_nextConnId(1)
{
    //m_acceptor接受一个新的连接时,回调tcpserver::newConnection
    m_acceptor->setNewConnectionCallback(
                std::bind(&tcpserver::newConnection,this,_1,_2));
}

//析构函数,关闭所有tcpconnection时,让tcpconnection回调&tcpconnection::connectDestroyed关闭连接
tcpserver::~tcpserver()
{
    m_loop->assertInLoopThread();
    LOG_TRACE << "TcpServer::~TcpServer [" << m_name << "] destructing";

    for (auto& item : m_connections)
    {
      TcpConnectionPtr conn(item.second);
      item.second.reset();
      conn->getLoop()->runInLoop(
        std::bind(&tcpconnection::connectDestroyed, conn));
    }
}

//设置线程池线程数量
void tcpserver::setThreadNum(int numThreads)
{
    assert(0<=numThreads);
    m_threadPool->setThreadNum(numThreads);
}

//启动tcpserver,实质上启动内部线程池,并让eventloop回调acceptor::listen(),完成监听操作
void tcpserver::start()
{
    if(m_start.getAndSet(1)==0)
    {
        m_threadPool->start(m_threadInitCallback);
        assert(!m_acceptor->listenning());
        m_loop->runInLoop(std::bind(&acceptor::listen,get_pointer(m_acceptor)));
    }
}

//在新连接到达时,acceptor会先回调acceptor::handleRead()接受一个连接
//此时连接已经被acceptor接受完成,然后acceptor会调用&tcpserver::newConnection()
//调用的目的是把这个连接本身封装成一个tcpconnection,并把它加入到ConnectionMap,
//tcpconenction目的是便于对tcp连接进行管理
void tcpserver::newConnection(int sockfd, const inetaddress &peerAddr)
{
    m_loop->assertInLoopThread();
    //在线程池内找到一个eventloopthread,让其执行新建一个tcpconenction操作
    eventloop* ioloop=m_threadPool->getNextLoop();
    char buf[64];
    snprintf(buf,sizeof buf,"-%s#%d",ipPort().data(),m_nextConnId);
    ++m_nextConnId;
    string connName=m_name+buf;
    LOG_INFO << "TcpServer::newConnection [" << m_name
             << "] - new connection [" << connName
             << "] from " << peerAddr.toIpPort();
    inetaddress localAddr(sockets::getLocalAddr(sockfd));
    //新建一个tcp连接,这个操作在线程池内的ioloop线程中执行
    TcpConnectionPtr conn(new tcpconnection(ioloop,connName,sockfd,localAddr,peerAddr));
    m_connections[connName]=conn;   //更新tcpserver内部连接集合
    //设置tcpconnection的四个回调函数
    conn->setConnectionCallback(m_connectionCallback);
    conn->setMessageCallback(m_messageCallback);
    conn->setWriteCompleteCallback(m_writeCompleteCallback);
    conn->setCloseCallback(std::bind(&tcpserver::removeConnection,this,_1));
    //让eventloop回调tcpconnection::connectEstablished完成连接建立完成时操作
    ioloop->runInLoop(std::bind(&tcpconnection::connectEstablished,conn));
}

//conn关闭时eventloop回调tcpserver::removeConnectionInLoop用于关闭tcp连接
void tcpserver::removeConnection(const TcpConnectionPtr &conn)
{
    m_loop->runInLoop(std::bind(&tcpserver::removeConnectionInLoop,this,conn));
}

//
void tcpserver::removeConnectionInLoop(const TcpConnectionPtr &conn)
{
    m_loop->assertInLoopThread();
    LOG_INFO << "TcpServer::removeConnectionInLoop [" << m_name
             << "] - connection " << conn->name();
    //在m_connections中移除conn
    size_t n=m_connections.erase(conn->name());
    eventloop* ioloop=conn->getLoop();
    //让eventloop回调tcpconnection::connectDestroyed完成tcpconenction的销毁
    ioloop->queueInLoop(std::bind(&tcpconnection::connectDestroyed,conn));

}

}//namespace net

}//namespace mymuduo

测试:

#include "net/tcpserver.h"

#include "base/logging.h"
#include "base/thread.h"
#include "net/eventloop.h"
#include "net/inetaddress.h"

#include <utility>

#include <stdio.h>
#include <unistd.h>

using namespace mymuduo;
using namespace mymuduo::net;

int numthreads = 0;

class EchoServer
{
 public:
  EchoServer(eventloop* loop, const inetaddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "EchoServer")
  {
    server_.setConnectionCallback(
        std::bind(&EchoServer::onConnection, this, _1));
    server_.setMessageCallback(
        std::bind(&EchoServer::onMessage, this, _1, _2, _3));
    server_.setThreadNum(numthreads);
  }

  void start()
  {
    server_.start();
  }
  // void stop();

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_TRACE << conn->peerAddress().toIpPort() << " -> "
        << conn->localAddress().toIpPort() << " is "
        << (conn->connected() ? "UP" : "DOWN");
    LOG_INFO << conn->getTcpInfoString();

    conn->send("hello\n");
  }

  void onMessage(const TcpConnectionPtr& conn, buffer* buf, timestamp time)
  {
    string msg(buf->retrieveAllAsString());
    LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toString();
    if (msg == "exit\n")
    {
      conn->send("bye\n");
      conn->shutdown();
    }
    if (msg == "quit\n")
    {
      loop_->quit();
    }
    //打印一下读到的数据
    LOG_INFO<<msg.data();
    conn->send(msg);
  }

  eventloop* loop_;
  tcpserver server_;
};

int main()
{

  LOG_INFO << "pid = " << getpid() << ", tid = " << currentthread::tid();
  LOG_INFO << "sizeof TcpConnection = " << sizeof(tcpconnection);

  numthreads = 8;

  eventloop loop;
  inetaddress listenAddr("192.168.1.103",12306);
  EchoServer server(&loop, listenAddr);

  server.start();

  loop.loop();
}

实现了一个echo服务器,可以接受连接并echo客户机发送过来的消息。

 

标签:muduo,const,eventloop,30,tcpserver,源码,连接,loop,acceptor
来源: https://www.cnblogs.com/woodineast/p/13603596.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有