ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

openGauss数据库源码解析系列文章——公共组件源码解析(上)

2021-05-27 02:32:49  阅读:233  来源: 互联网

标签:函数 database 数据库 源码 线程 pg openGauss 解析


在数据库组件中,一些组件是专用的,如词法解析只用于SQL引擎;而另外一些组件是公共的,用于整个数据库系统。openGauss的公共组件包括系统表、数据库初始化、多线程架构、线程池、内存管理、多维监控和模拟信号机制等。每个组件实现了一个独立的功能。本篇将从系统表、数据库初始化、多线程架构及线程池技术四个方面对公共组件的源代码实现进行介绍。

一、系统表

系统表又称为数据字典或者元数据,存储管理数据库对象的定义信息,如表、索引、触发器等。用户可通过系统表查询用户定义的具体对象信息,如表的每个字段类型。因为openGauss支持一个实例管理多个数据库,所以系统表分为实例级别的系统表和数据库级别的系统表。实例级别的系统表在一个实例管理的多个数据库之间共享,整个实例只有一份,这些系统表为pg_authid、pg_auth_members、pg_database、pg_db_role_setting 、pg_tablespace、pg_shdepend、pg_shdescription、pg_shseclabel。数据库级别的系统表比如pg_class,pg_depend,pg_index,pg_attribute等,每个数据库各有一份。

(一)系统表的定义

openGauss系统表定义全部在src/include/catalog目录下,每个头文件就是一个系统表的定义。如pg_database.h就是对pg_database的定义。在pg_database.h中,主要包括pg_database的表OID(object identifier,对象标识符)、类型OID、结构体定义、字段个数和每个字段ID(identifier,标识符)枚举值、数据库初始化默认值。下面是代码及其具体解释:
/*  pg_database本身也是一张表,DatabaseRelationId表示pg_database在系统表pg_class中的OID为1262(pg_class系统表中保存的是表的定义信息) */ 
#define DatabaseRelationId    1262
/*  pg_database本身也是一个结构类型,DatabaseRelation_Rowtype_Id表示pg_database在系统表pg_type中的OID为1248  */ 
#define DatabaseRelation_Rowtype_Id  1248
/* pg_database本身也是一个结构类型,DatabaseRelation_Rowtype_Id表示pg_database在系统表pg_type中的OID为1248  */
/*  BKI_SHARED_RELATION表示pg_database是实例级别的系统表 */
CATALOG(pg_database,1262) BKI_SHARED_RELATION BKI_ROWTYPE_OID(1248) BKI_SCHEMA_MACRO
{
    NameData    datname;              /* 数据库名称 */
    Oid         datdba;               /* 数据库拥有者 */
    int4        encoding;             /* 字符集编码 */
    NameData    datcollate;           /* LC_COLLATE 设置值 */
    NameData    datctype;             /* LC_CTYPE 设置值 */
    bool        datistemplate;        /* 是否允许作为模板数据库*/
    bool        datallowconn;         /* 是否允许链接 */
    int4        datconnlimit;         /* 最大连接数*/
    Oid         datlastsysoid;        /* 系统OID最大值*/
    ShortTransactionId datfrozenxid;  /* 冻结事务ID,所有小于这个值的事务ID已经冷冻。为了兼容原来的版本,使用32位事务ID*/
    Oid         dattablespace;     /* 数据库的缺省表空间 */
    NameData    datcompatibility;/* 数据库兼容模式,比如除0是报错还是当做正常处理*/
#ifdef CATALOG_VARLEN          /* 下面字段是变长字段 */
    aclitem        datacl[1];        /* 访问权限*/
#endif
    TransactionId datfrozenxid64; /* 冷冻事务ID,64-bit(比特)事务ID */
} FormData_pg_database;

CATALOG的宏定义代码为:

#define CATALOG(name,oid)  typedef struct CppConcat(FormData_,name)

因此CATALOG(pg_database,1262)就是对结构体FormData_pg_database的定义。之所以采用CATALOG,是因为这个格式是和BKI(backend interface,后端接口)脚本约定的格式,BKI脚本根据这个格式生成数据库的建表脚本。

接下来是数据库对象字段总数和每个字段ID的值的定义代码,定义这些值的目的主要代码访问数据库对象时清晰、方便维护、避免魔鬼数字(魔鬼数字指在代码中没有具体含义的数字、字符串。魔鬼数字会影响代码可读性,使读者无法理解看到的数字对应的含义,从而难以理解程序的意图)。

#define Natts_pg_database                14
#define Anum_pg_database_datname        1
#define Anum_pg_database_datdba            2
#define Anum_pg_database_encoding        3
#define Anum_pg_database_datcollate        4
#define Anum_pg_database_datctype        5
#define Anum_pg_database_datistemplate    6
#define Anum_pg_database_datallowconn    7
#define Anum_pg_database_datconnlimit    8
#define Anum_pg_database_datlastsysoid    9
#define Anum_pg_database_datfrozenxid    10
#define Anum_pg_database_dattablespace    11
#define Anum_pg_database_compatibility    12
#define Anum_pg_database_datacl            13
#define Anum_pg_database_datfrozenxid64    14

最后是创建数据库时的默认数值。代码中的值表示默认创建template1数据库,DATA的字段值和数据库结构体的值一一对应。对应代码如下:

DATA(insert OID = 1 (  template1 PGUID ENCODING "LC_COLLATE" "LC_CTYPE" t t -1 0 0 1663 "DB_COMPATIBILITY" _null_ 3));
SHDESCR("default template for new databases");
#define TemplateDbOid            1
#define DEFAULT_DATABASE  "postgres"

系统表头文件的内容和格式基本类似。

(二)系统表的访问

系统表定义后,接下来介绍数据库在运行过程中对系统表的访问。openGauss对系统表的访问主要是通过syscache机制。syscache机制是一个通用的机制,主要对系统表的数据进行缓存,提升系统表数据的访问速度,详细细节参照具体章节描述。这里主要描述与pg_database相关的部分。pg_database在枚举类型enum SysCacheIdentifier中定义的枚举值有一个:DATABASEOID,表示根据数据库OID访问pg_database系统表。同时需要把pg_database系统表访问模式添加到“struct cachedesc cacheinfo”中。与pg_database相关的代码如下:

 {DatabaseRelationId, /* DATABASEOID */
        DatabaseOidIndexId,
        1,
        {ObjectIdAttributeNumber, 0, 0, 0},
        4},

这几个值与cachedesc结构体的字段对应,表示pg_database表的OID值、索引的OID值、搜索时有1个key字段、搜索key字段ID为ObjectIdAttributeNumber、初始化为4个hash桶。相关的代码如下:

struct cachedesc {
    Oid reloid;   /* 缓存的表的OID */
    Oid indoid;   /* 缓存数据的索引OID */
    int nkeys;    /*  缓存搜索的key的个数 */
    int key[4];   /* key属性的编号*/
    int nbuckets; /* 缓存hash桶的个数*/
};

系统表的定义和访问主要逻辑如上问所述。与pg_database相关的SQL命令是ALTER DATABASE、CREATE DATABASE、DROP DATABASE,这些命令执行的结果是把数据库相关的信息存储到pg_database系统表中。

其他系统表的逻辑与pg_database相似,不再重复。

二、数据库初始化

数据库正常启动时需要指定数据目录,数据目录中包括了系统表的初始化数据。数据库初始化的过程会生成这些初始系统表数据文件,该过程由initdb和openGauss进程配合生成。initdb控制执行过程,创建目录和基本的配置文件;openGauss进程负责系统表的初始化。initdb通过PG_CMD_OPEN宏启动openGauss进程,同时打开一个管道流,然后通过解析系统表文件中的SQL命令,并把命令通过PG_CMD_PUTS宏的管道流发给openGauss进程,最后通过PG_CMD_CLOSE宏关闭管道流。PG_CMD_OPEN宏是系统函数popen的封装宏,PG_CMD_PUTS宏是系统函数fputs的封装宏,PG_CMD_CLOSE宏是系统函数pclose的封装宏。交互过程如图1所示。

在这里插入图片描述

图1 初始化交互过程图|

initdb在创建template1模板数据库时,命令参数指定了“snprintf_s(cmd, sizeof(cmd), sizeof(cmd) - 1, ""%s" --boot -x1 %s %s", backend_exec, boot_options, talkargs);”,其中“--boot”表示openGauss进程以一个特殊的bootstrap模式运行。在其他初始化系统表时,initdb命令参数指定了“snprintf_s(cmd, sizeof(cmd), sizeof(cmd) - 1, ""%s" %s template1 >%s", backend_exec, backend_options, DEVNULL); ”,其中“static const char* backend_options = "--single "”表示openGauss进程以单用户模式运行。

下面以setup_schema函数为例详细介绍这个过程。相关代码如下:

static void setup_schema(void)
{
    PG_CMD_DECL;
    char** line;
    char** lines;
    int nRet = 0;
    char* buf_features = NULL;
    fputs(_("creating information schema ... "), stdout);
    (void)fflush(stdout);
    lines = readfile(info_schema_file);
    /*
     * 使用-j 避免在information_schema.sql反斜杠处理
     */
    nRet = snprintf_s(
        cmd, sizeof(cmd), sizeof(cmd) - 1, "\"%s\" %s -j template1 >%s", backend_exec, backend_options, DEVNULL);
    securec_check_ss_c(nRet, "\0", "\0");
    PG_CMD_OPEN;
    for (line = lines; *line != NULL; line++) {
        PG_CMD_PUTS(*line);
        FREE_AND_RESET(*line);
    }
    FREE_AND_RESET(lines);
    PG_CMD_CLOSE;
    nRet = snprintf_s(
        cmd, sizeof(cmd), sizeof(cmd) - 1, "\"%s\" %s template1 >%s", backend_exec, backend_options, DEVNULL);
    securec_check_ss_c(nRet, "\0", "\0");
    PG_CMD_OPEN;
    PG_CMD_PRINTF1("UPDATE information_schema.sql_implementation_info "
                   "  SET character_value = '%s' "
                   "  WHERE implementation_info_name = 'DBMS VERSION';\n",
        infoversion);

    buf_features = escape_quotes(features_file);
    PG_CMD_PRINTF1("COPY information_schema.sql_features "
                   "  (feature_id, feature_name, sub_feature_id, "
                   "  sub_feature_name, is_supported, comments) "
                   " FROM E'%s';\n",
        buf_features);
    FREE_AND_RESET(buf_features);
    PG_CMD_CLOSE;
    check_ok();
}

在这个函数中,PG_CMD_DECL是一个变量定义宏,通过语句“char cmd[MAXPGPATH]”和“FILE* cmdfd = NULL”定义了两个变量。这样的作用是代码格式统一、阅读方便。

语句“readfile(info_schema_file)”表示读取info_schema_file文件,这个文件中存放了系统表初始化的SQL命令。

语句“snprintf_s(cmd, sizeof(cmd), sizeof(cmd) - 1, ""%s" %s -j template1 >%s", backend_exec, backend_options, DEVNULL)”是格式化openGauss后台进程的命令。语句“PG_CMD_OPEN”是以popen的方式运行cmd命令,启动openGauss进程。

语句“for (line = lines; *line != NULL; line++)” 表示遍历info_schema_file文件中的每条SQL命令,宏PG_CMD_PUTS把每个SQL命令发送给openGauss进程执行。

整个文件执行完毕,调用宏PG_CMD_CLOSE停止进程,关闭管道。setup_schema函数的后面代码处理是类似的,只是SQL命令是函数内生成的,使用宏PG_CMD_PRINTF1写入管道,发给openGauss进程。

setup_sysviews、setup_dictionary、setup_privileges等其他系统对象初始化函数过程都是类似的,不再重复描述。

initdb的整个初始化过程如下。

(1)对命令行参数进行解析。

(2)查找openGauss程序,设置PGDATA、PGDATA、PGPATH等环境变量。设置数据库初始化原始文件postgres.bki、postgres.description、postgres.shdescription、postgresql.conf.sample、mot.conf.sample、pg_hba.conf.sample、pg_ident.conf.sample。这些文件在shell命令make install执行安装后,默认都在“openGauss-server/dest/share/postgresql”目录下。

(3)数据库本地初始化,locale默认初始化为en_US.UTF-8,数据库编码默认初始化为UTF8,文本搜索默认初始化为english。

(4)检查数据库数据目录pg_data是否为空,是否需要创建,权限是否正确。

(5)创建subdirs变量指定的子目录。

(6)初始化postgresql.conf配置文件。

(7)创建template1数据库bootstrap_template1。这一步需要启动后台openGauss进程执行数据库的SQL语句,创建系统表。bootstrap_template1这个函数主要是读取postgres.bki文件中的SQL语句,发送到openGauss进程去执行,主要功能是创建系统表。SQL语句举例如下,其中语句create pg_type表示创建pg_type系统表,语句INSERT OID表示插入这个系统表的默认数据。这里的语法是专门为initdb定制的bootstrap解析语法,不是正式的SQL语法,语法文件也是单独的,可参照“openGauss-server\src\gausskernel\bootstrap”目录下的bootscanner.l和bootparse.y文件。pg_type系统对象在initdb初始化中的bootstrap语法相关代码如下,在初始化时就是解析下面语法格式完成pg_type系统对象的创建:

create pg_type 1247 bootstrap rowtype_oid 71
 (
 typname = name ,
 typnamespace = oid ,
 typowner = oid ,
 typlen = int2 ,
 typbyval = bool ,
 typtype = char ,
 typcategory = char ,
 typispreferred = bool ,
 typisdefined = bool ,
 typdelim = char ,
 typrelid = oid ,
 typelem = oid ,
 typarray = oid ,
 typinput = regproc ,
 typoutput = regproc ,
 typreceive = regproc ,
 typsend = regproc ,
 typmodin = regproc ,
 typmodout = regproc ,
 typanalyze = regproc ,
 typalign = char ,
 typstorage = char ,
 typnotnull = bool ,
 typbasetype = oid ,
 typtypmod = int4 ,
 typndims = int4 ,
 typcollation = oid ,
 typdefaultbin = pg_node_tree ,
 typdefault = text ,
 typacl = aclitem[]
 )
INSERT OID = 16 ( bool 11 10 1 t b B t t \054 0 0 1000 boolin boolout boolrecv boolsend - - - c p f 0 -1 0 0 _null_ _null_ _null_ )
INSERT OID = 17 ( bytea 11 10 -1 f b U f t \054 0 0 1001 byteain byteaout bytearecv byteasend - - - i x f 0 -1 0 0 _null_ _null_ _null_ )
...........
close pg_type

(8)使用setup_auth函数初始化pg_authid权限。该函数执行的SQL语句是在函数内静态定义“static const char* pg_authid_setup[]”。

(9)使用setup_depend函数创建系统表依赖关系。该函数执行的SQL语句是在函数内静态定义“static const char* pg_depend_setup[]”。

(10)使用load_plpgsql函数加载plpgsql扩展组件。该函数只执行一条SQL语句“CREATE EXTENSION plpgsql;”。

(11)使用setup_sysviews函数创建系统视图。该函数会读取system_views.sql文件中的SQL语句,发送到openGauss去执行,主要功能是创建系统视图。

(12)使用setup_perfviews函数创建性能视图。该函数会读取performance_views.sql文件中的SQL语句,发送到openGauss去执行,主要功能是创建性能视图。

(13)使用setup_conversion函数创建编码转换。该函数会读取conversion_create.sql文件中的SQL语句,发送到openGauss去执行,主要功能是创建编码转换函数。

(14)使用setup_dictionary函数创建词干数据字典。该函数会读取snowball_create.sql文件中的SQL语句,发送到openGauss去执行,主要功能是创建文本搜索函数。

(15)使用setup_privileges函数设置权限。setup_privileges函数通过xstrdu复制SQL常量字符串到一个动态数组内,然后遍历执行指定的SQL语句。

(16)使用load_supported_extension函数加载外表。该函数执行相应扩展组件的CREATE EXTENSION语句。

(17)使用setup_update函数更新系统表。该函数执行语句COPY pg_cast_oid.txt到数据库中,主要功能是创建类型强制转换处理函数。

(18)对template1进行垃圾数据清理,即执行三个SQL语句“ANALYZE;”、“VACUUM FULL;”、“VACUUM FREEZE;”。

(19)创建template0数据库,即复制template1到template0。

(20)创建postgres数据库,即复制template1到postgres。

(21)对template0、template1、postgres进行垃圾数据清理和事务ID冻结。

三、多线程架构

openGauss内核源自PostgreSQL,但在架构上进行了大量改造,其中一个调整就是将多进程架构修改为多线程架构。openGauss在启动后只有一个进程,后台任务都是以一个进程中的线程来运行。对于客户端的新连接,在非线程池模式下也是以启动一个业务线程来处理。在多线程架构下更容易实现多个线程资源的共享,如并行查询、线程池等。

(一)openGauss主要线程

openGauss的后台线程是不对等的,其中Postmaster是主线程,其他线程都是它创建出来的。openGauss后台线程的功能介绍如表1所示。

表1 后台线程的功能 | 后台线程 | 功能介绍 | |--|--| | Postmaster | openGauss数据库主线程。主要有两个功能:一是对连接进行监听,接收新的连接;二是监控所有子线程的状态,并且根据子线程退出状态进行处理,如果线程是FATAL退出,则重新拉起子线程。如果线程是PANIC退出,则进行整个数据库重新初始化。保证数据库的正常运行 | | Startup | 数据库启动线程。数据库启动时Postmaster主线程拉起的第一个子线程,主要完成数据库的日志REDO(重做)操作,进行数据库的恢复。日志REDO操作结束,数据库完成恢复后,如果不是备机,Startup线程就退出了。如果是备机,那么Startup线程一直在运行,REDO备机接收到新的日志| | Bgwriter | 后台数据写线程。周期性的把数据库数据缓冲区的内容同步到磁盘上 | | Checkpointer | 检查点线程。进行检查点操作,完成数据库的周期性检查点和执行检查点命令 | | Walwriter | 后台WAL写线程。主要功能是周期性的把日志缓冲区的内容同步到磁盘上| | Stat | 数据库运行信息统计收集线程。主要功能是收集各个线程操作数据库数据的统计信息,进行汇总后写入数据库的统计文件中,供查询优化分析和垃圾清理使用 | | Sysloger |运行日志写线程。主要功能是把各个线程的运行日志信息写到运行日志文件中 | | Vacuum Launch | 垃圾清理启动线程。主要有两个功能:一是通知Postmaster 启动一个垃圾清理线程;二是平衡各个垃圾清理线程的负载 | |Vacuum worker |垃圾清理线程。主要功能是对openGauss数据库的垃圾数据进行清理 | |Arch | 日志归档线程。主要功能是完成归档操作,把在线日志拷贝到归档目录 | | Postgres | 服务线程。在非线程池模式下,每个客户端连接对应一个服务线程,主要功能是接收客户端的操作请求,代表客户端在服务器完成数据库操作 |

(二)线程间通信

openGauss后台线程之间紧密配合,共同完成了数据库的数据处理任务。这些后台线程之间需要交换信息来协调彼此的行为。openGauss多线程通信使用了原来的PostgreSQL的多进程通信方式。具体如表2所示。

表2 多线程通信方式 | 通信方式 | 说明 | |--|--| |共享内存 |在数据库初始化时,Postmaster线程通过OS(操作系统)申请一块大的共享内存,并且完成初始化工作。openGauss使用到的所有共享内存都是这块内存的一部分。线程之间的一些信息交换就是通过共享内存完成的,共享内存的访问需要加锁保护 | |信号 | 对于一些紧急任务的处理,openGauss使用信号通知作为线程间通信的手段。因为信号可以中断处理线程当前的任务,立即响应信号对应的任务| | TCP | 客户端连接数据库服务器时,一般使用TCP进行通信 | |UNIX域套接字协议 | 如果是本地客户端,即客户端和服务器在同一个机器上,并且是UNIX操作系统,可以使用UNIX域套结字协议建立客户端和服务器进程的通信 | | UDP |UDP(user datagram protocol,用户数据报协议)是不可靠协议,主要用于后台线程向统计线程发送统计信息时使用 | | 管道 |管道可以是双向的,也可以是单向的。在openGauss中,主要使用了单向管道,用在后台线程向运行日志守护线程发送运行日志信息时使用 | | 文件 | 主要用于一些不太重要的场合,并且通信量比较大。在openGauss中,主要用在统计线程汇总统计信息,写到统计文件,供垃圾清理线程和后台服务器线程成本优化使用 | | 全局变量 |一种线程间共享信息的机制。openGauss对原来的PostgreSQL中进程内的全局变量添加THR_LOCAL定义为线程的局部变量,避免线程之间误用 |

(三)线程初始化流程

下面介绍线程的初始化流程。首先介绍openGauss进程的启动。openGauss进程的主函数入口在“\openGauss-server\src\gausskernel\process\main\main.cpp”文件中。在main.cpp文件中,主要完成实例Context(上下文)的初始化、本地化设置,根据main.cpp文件的入口参数调用BootStrapProcessMain函数、GucInfoMain函数、PostgresMain函数和PostmasterMain函数。BootStrapProcessMain函数和PostgresMain函数是在initdb场景下初始化数据库使用的。GucInfoMain函数作用是显示GUC(grand unified configuration,大统一配置,在数据库中指的是运行参数)参数信息。正常的数据库启动会进入PostmasterMain函数。下面对这个函数进行更详细的介绍。

(1) 进行Postmaster的Context初始化,初始化GUC参数,解析命令行参数。

(2) 调用StreamServerPort函数启动服务器监听和双机监听(如果配置了双机),调用reset_shared函数初始化共享内存和LWLock锁,调用gs_signal_monitor_startup函数注册信号处理线程,调用InitPostmasterDeathWatchHandle函数注册Postmaster死亡监控管道,把openGauss进程信息写入pid_file文件中,调用gspqsignal函数注册Postmaster的信号处理函数。

(3) 根据配置初始化黑匣子,调用pgstat_init函数初始化统计信息传递使用的UDP套接字通信,调用InitializeWorkloadManager函数初始化负载管理器,调用InitUniqueSQL函数初始化UniqueSQL,调用SysLogger_Start函数初始化运行日志的通信管道和SYSLOGGER线程,调用load_hba函数加载hba鉴权文件。

(4) 调用initialize_util_thread函数启动STARTUP线程,调用ServerLoop函数进入一个周期循环。在ServerLoop函数的周期循环中,进行客户端请求监听,如果有客户端连接请求,在非线程池模式下,则调用BackendStartup函数创建一个后台线程worker处理客户请求。在线程池模式下,把新的链接加入一个线程池组中。在ServerLoop函数的周期循环中,检查其他线程的运行状态。如果数据库是第一次启动,则调用initialize_util_thread函数启动其他后台线程。如果有后台线程FATAL级别错误退出,则调用initialize_util_thread函数重新启动该线程。如果是PANIC级别错误退出,则整个实例进行重新初始化。

PostmasterMain完成了线程之间的通信初始化和线程的启动,无论是后台线程的启动函数initialize_util_thread,还是工作线程的启动函数initialize_worker_thread,最后都是调用initialize_thread函数完成线程的启动。下面进行initialize_thread函数的介绍。

initialize_thread函数调用gs_thread_create函数创建线程,调用InternalThreadFunc函数处理线程。它的相关代码如下所示:

ThreadId initialize_thread(ThreadArg* thr_argv)
{
    gs_thread_t thread;
    if (0 != gs_thread_create(&thread, InternalThreadFunc, 1, (void*)thr_argv)) {
        gs_thread_release_args_slot(thr_argv);
        return InvalidTid;
    }
    return gs_thread_id(thread);
}

InternalThreadFunc函数的代码如下。该函数根据角色调用GetThreadEntry函数,GetThreadEntry函数直接以角色为下标,返回对应GaussdbThreadEntryGate数组对应的元素。数组的元素是处理具体任务的回调函数指针,指针指向的函数为GaussDbThreadMain。相关代码如下所示:

static void* InternalThreadFunc(void* args)
{
    knl_thread_arg* thr_argv = (knl_thread_arg*)args;
    gs_thread_exit((GetThreadEntry(thr_argv->role))(thr_argv));
    return (void*)NULL;
}
GaussdbThreadEntry GetThreadEntry(knl_thread_role role)
{
    Assert(role > MASTER && role < THREAD_ENTRY_BOUND);
    return GaussdbThreadEntryGate[role];
}
static GaussdbThreadEntry GaussdbThreadEntryGate[] = {GaussDbThreadMain<MASTER>,
    GaussDbThreadMain<WORKER>,
    GaussDbThreadMain<THREADPOOL_WORKER>,
    GaussDbThreadMain<THREADPOOL_LISTENER>,
  ......};

在GaussDbThreadMain函数中,首先初始化线程基本信息,Context和信号处理函数,接着就是根据thread_role角色的不同调用不同角色的处理函数,进入各个线程的main函数,比如GaussDbAuxiliaryThreadMain函数、AutoVacLauncherMain函数、WLMProcessThreadMain函数等。其中GaussDbAuxiliaryThreadMain函数是后台辅助线程处理函数。该函数的处理也类似GaussDbThreadMain函数,根据thread_role角色的不同调用不同角色的处理函数,进入各个线程的main函数,比如StartupProcessMain函数、CheckpointerMain函数、WalWriterMain函数、walrcvWriterMain函数等。

总结上面整个过程,openGauss多线程架构主要包括3个方面:

(1) 多线程之间的通信,由主线程在初始化阶段完成。

(2) 多线程的启动,由主线程创建各个角色线程,调用不同角色的处理函数完成。

(3) 主线程负责监控各个线程的运行,异常退出和重新拉起。

四、线程池技术

openGauss在多线程架构的基础上,实现了线程池。线程池机制实现了会话和处理线程分离,在大并发连接的情况下仍然能够保证系统有很好的SLA响应。另外不同的线程组可绑到不同的NUMA(non-uniform memory access,非一致性内存访问)核上,天然匹配NUMA化的CPU架构,从而提升openGauss的整体性能。

(一)线程池原理

openGauss线程池机制原理如图2所示,图中的主要对象如表3所示。

在这里插入图片描述

图2 线程池机制原理

表3 线程池对象 | 对象 | 说明| |--|--| | Postmaster |主线程。负责监听客户端发出的请求 | | ThreadPoolControler| 线程池总控。负责线程池的初始化和资源管理 | | ThreadSessionControler | 会话生命周期管理| | ThreadPoolGroup | 线程组。可以定义灵活的线程数量和绑核策略 | | ThreadPoolListener | 线程组监听线程。负责事件的分发和管理 | | ThreadPoolWorker | 工作线程 | | session | 客户端连接的一个会话 | | NUMA NODE | NUMA节点。表示一个线程组在NUMA结构下可以映射到一个NUMA节点上|

这些对象相互配合实现了线程池机制,它们的主要交互过程如下。

(1)客户端向数据库发起连接请求,Postmaster线程接收到连接请求并被唤醒。Postmaster线程创建该连接对应的socket,调用ThreadPoolControler函数创建会话(session)数据结构。ThreadPoolControler函数遍历当前所有的Thread Group(线程组),找到当前活跃会话数量最少的Thread Group,并把最新的会话分发给该Thread Group,加入该Thread Group的epoll列表之中。

(2)Thread Group的listener线程负责监听epoll列表中所有的客户连接。

(3)客户端发起任务请求,listener线程被唤醒。listener线程检查当前的Thread Group是否有空闲worker线程;如果有,则把当前会话分配给该worker线程,并唤醒该worker线程;如果没有,则把该会话放在等待队列之中。

(4)worker线程被唤醒后,读取客户端连接上的请求,执行相应请求,并返回请求结果。在一次事务结束(提交、回滚)或者事务超时退出的时候,worker线程的一次任务完成。worker线程将session返回给listener线程,listener线程继续等待该会话的下一次请求。worker线程返还会话后,检查会话等待队列;如果存在等待响应请求的会话,则直接从该队列中取出新的会话并继续工作;如果没有等待响应的session,则将自身标记为free(空闲)状态,等待listener线程唤醒。

(5)客户端断开连接时,worker线程被唤醒,关闭连接,同时清理会话相关结构,释放内存和fd(文件句柄)等资源。

(6)如果worker线程FATAL级别错误退出,退出时worker线程会从worker队列中注销掉。此时listener线程会重新启动一个新的worker线程,直到达到指定数量的worker线程。

(二)线程池实现

线程池功能由GUC参数enable_thread_pool控制,该变量设置为true时才能使用线程池功能。代码主要在“openGauss-server/src/gausskernel/process/threadpool”目录中,下面介绍主要代码实现流程。

Postmaster线程在ServerLoop中判断如果启用了线程池功能,则会调用“ThreadPoolControler::Init”函数进行线程池的初始化。在线程池初始化时,会判断NUMA节点的个数进行NUMA结构处理。相关代码如下:

    if (threadPoolActivated) {
        bool enableNumaDistribute = (g_instance.shmem_cxt.numaNodeNum > 1);
        g_threadPoolControler->Init(enableNumaDistribute);
}

“ThreadPoolControler::Init”函数的主要作用是创建m_sessCtrl成员和m_groups成员对象,根据绑核策略分配线程个数,调用“ThreadPoolGroup::init”函数进行线程组的初始化,调用“ThreadPoolGroup::WaitReady”函数等待各个线程组初始化结束。创建m_scheduler成员对象,并且调用“ThreadPoolScheduler::StartUp”函数启动线程池调度线程。在“ThreadPoolGroup::init”函数中,创建m_listener对象,启动listener线程。为ThreadWorkerSentry函数分配内存,初始化每个worker的互斥量和条件变量。调用“ThreadPoolGroup::AddWorker”函数创建worker对象,启动worker线程。

Postmaster线程在ServerLoop中如果监听到有客户端链接请求,判断启用了线程池功能,则会调用“ThreadPoolControler::DispatchSession”函数进行会话分发。相关代码如下:

if (threadPoolActivated &&!(i < MAXLISTEN && t_thrd.postmaster_cxt.listen_sock_type[i] == HA_LISTEN_SOCKET))
result = g_threadPoolControler->DispatchSession(port);
/*  ThreadPoolControler::DispatchSession的代码实现如下,找到一个会话数最少的线程组,创建会话,把会话添加到线程组的监听线程中  */ 
int ThreadPoolControler::DispatchSession(Port* port)
{
    ThreadPoolGroup* grp = NULL;
    knl_session_context* sc = NULL;
    grp = FindThreadGroupWithLeastSession();
    if (grp == NULL) {
        Assert(false);
        return STATUS_ERROR;
    }
    sc = m_sessCtrl->CreateSession(port);
    if (sc == NULL)
        return STATUS_ERROR;
    grp->GetListener()->AddNewSession(sc);
    return STATUS_OK;
}

listener线程的主函数为“TpoolListenerMain(ThreadPoolListener* listener)”。在该函数中设置线程的名字和信号处理函数,创建epoll等待事件,通知Postmaster线程已经准备好,调用t_pool_listener_loop函数(其实是调用“ThreadPoolListener::WaitTask()”函数进入等待事件状态)。如果有事件到来,调用“ThreadPoolListener::HandleConnEvent”函数找到事件对应的会话。调用“ThreadPoolListener::DispatchSession”函数,如果有空闲的worker线程,通知worker线程进行处理;如果没有空闲的worker线程,则把会话挂到等待队列中。

worker线程的主函数就是正常的SQL处理函数PostgresMain,与非线程模式相比,主要多了3处处理:

(1)worker线程准备就绪的通知。

(2)等待会话通知。

(3)连接退出处理。

worker线程的相关代码如下:

  if (IS_THREAD_POOL_WORKER) {
        u_sess->proc_cxt.MyProcPort->sock = PGINVALID_SOCKET;
        t_thrd.threadpool_cxt.worker->NotifyReady();
}
       if (IS_THREAD_POOL_WORKER) {
            t_thrd.threadpool_cxt.worker->WaitMission();
            Assert(u_sess->status != KNL_SESS_FAKE);
        }
    case 'X':
    case EOF:
                RemoveTempNamespace();
                InitThreadLocalWhenSessionExit();
                if (IS_THREAD_POOL_WORKER) {
                    t_thrd.threadpool_cxt.worker->CleanUpSession(false);
                    break;
                }

“ThreadPoolWorker::WaitMission”函数的主要作用是阻塞所有系统信号,避免系统信号比如SIGHUP等中断当前的处理。清除线程上的会话信息,保证没有上一个会话的内容,等待会话上新的请求,把会话给线程进行处理,允许系统信号中断。

“ThreadPoolWorker::CleanUpSession”函数的主要作用是清除会话,从Listener中去除会话,释放会话资源。

上面介绍了线程池的主要机制,综上所述,线程池主要是解决大并发的用户连接,在一定程度上可以起到流量控制的作用,即使用户的连接数很多,后端也不需要分配太多的线程。线程是OS的一种资源,如果线程太多,OS资源占用很多,并且大量线程的调度和切换会带来昂贵的开销。如果没有线程池,随着连接数的增多,系统的吞吐量会逐渐降低。另外一方面,把线程池划分为线程组,可以很好地匹配NUMA CPU架构的节点,提升多核情况下的访问性能。每个线程组一个监听者,避免了线程池的“惊群效应”。

Gauss松鼠会是汇集数据库爱好者和关注者的大本营, 大家共同学习、探索、分享数据库前沿知识和技术, 互助解决问题,共建数据库技术交流圈。

标签:函数,database,数据库,源码,线程,pg,openGauss,解析
来源: https://www.cnblogs.com/0591jb/p/14815769.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有