ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

postgres pg_receivewal代码分析

2022-06-26 22:04:18  阅读:173  来源: 互联网

标签:postgres stream receivewal pg query copybuf conn socket


src\bin\pg_basebackup\pg_receivewal.c
main 468
    初始化,参数解析,获取连接681,检查wal size 714
    StreamLog(); src\bin\pg_basebackup\pg_receivewal.c 760
        stream分配内存,获得数据库连接,检查版本,检查系统
        FindStreamingStart(&stream.timeline); 406---》200  //根据目的文件夹中最大id的文件
                                                                ,找最后的xlog segment,决定本次的起点。
            获取目的目录src\port\dirent.c
            得到文件
            看是否是partical,是否压缩
        XLogSegmentOffset(stream.startpos, WalSegSz);    //找到在segment开始的地方复制下一个
        设置stream
        ReceiveXlogStream(conn, &stream);   //从特定的position开始复制log stream
                                                                src\bin\pg_basebackup\receivelog.c 437
            while(1){
                if(!existsTimeLineHistoryFile){     //查看本地是否存在wal,如果是从上次结束的地方在开始
                    res = PQexec(conn, "TIMELINE_HISTORY %u");    //538
                    //参考http://www.postgres.cn/docs/14/protocol-replication.html
                }  
                res = PQexec(conn, START_REPLICATION %s%X/%X TIMELINE %u"); 
                //执行语句,指示服务器开始启动流WAL,从 WAL 位置XXX/XXX开始。如果TIMELINE选项被指定,流传送会在时间线tli上开始,否则会选择服务器的当前时间线。完成这个操作后,后面只需要从本地buffer中读取和处理。参考http://www.postgres.cn/docs/14/protocol-replication.html
                    PQexecStart(conn)
                    PQsendQuery(conn, query)
                        PQsendQueryInternal(conn, query, true);
                            pqPutMsgStart('Q', conn)
                            pqPuts(query, conn)
                                pqPutMsgBytes(s, strlen(s) + 1, conn)
                                    memcpy(conn->outBuffer + conn->outMsgEnd, buf, len);//这里放到了socket发送buffer中就可以自动完成发送的任务。
                            pqPutMsgEnd(conn)
                    PQexecFinish(conn)

                HandleCopyStream(conn, stream, &stoppos); //处理query返回的stream包
                    CopyStreamReceive()
                        rawlen = PQgetCopyData //从socket buffer中copy到用户态
                            pqGetCopyData3
                                getCopyDataMessage
                                    pqGetc //获取一个char,表示类型
                                    pqGetInt //int 表示这个包的size length
                            malloc //如果rawlen!=0, 根据获得的长度,分配内存,并从socketbuf中拷贝到用户态
                            memcpy
                        if(rawlen == 0)
                            CopyStreamPoll
                                select //使用linux select等待数据到达,直到超时
                            pqReadData //select拿到通知,数据已经到达
                                pqReadData
                                    pqsecure_read
                                        pqsecure_raw_read
                                            recv //linux socket api 从socket中收数据
                    while(1) { 
                        r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
                            rawlen = PQgetCopyData(conn, &copybuf, 1);
                                return pqGetCopyData3(conn, buffer, async);
                                    for;;{
                                        msgLength = getCopyDataMessage(conn); //获取一行的message数据,message是stream的控制信息,后面的keep live,或者是wal data等信息。
                                            pqGetc(&id, conn) //从socket连接中获取
                                            pqGetInt //int 表示这个包的size length
                                        malloc //根据获得的长度,分配内存,并从socketbuf中拷贝到用户态
                                        memcpy
                                    }
                        while (r != 0)//一次query持续在这个循环中接收和处理服务器发送来的package,除非超时退出此循环
		        {
                              根据获取到的服务器massage,不同操作
                              if (copybuf[0] == 'k')
                              {
                                  ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
                                                          &last_status)
                              }
                              else if (copybuf[0] == 'w')
                              {
                                  ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos)
                                      write(walfile, copybuf + hdr_len + bytes_written,bytes_to_write) //这里完成写本地文件,完成备份。
                              }
                              CopyStreamReceive //同上
                        }
                    }
            }

标签:postgres,stream,receivewal,pg,query,copybuf,conn,socket
来源: https://www.cnblogs.com/skpupil/p/16414475.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有