The NOTIFY mechanism in postgresql/lightdb: a lifesaver for reliable caches and MQ message transactions

2022-09-10 15:33:53

Tags: postgresql lightdb -- void queue notify Notify commit LISTEN


http://www.light-pg.com/docs/lightdb/13.3-22.2/sql-notify.html

http://www.light-pg.com/docs/lightdb/13.3-22.2/sql-listen.html

https://wiki.postgresql.org/wiki/PgNotificationHelper

https://jdbc.postgresql.org/documentation/head/listennotify.html

https://tapoueh.org/blog/2018/07/postgresql-listen-notify/

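Just before the commit is recorded in clog (pg_xact), the kernel adds the transaction's pending notifications to the notification queue, as shown below: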

xact.c

static void
CommitTransaction(void)
{
......
    /*
     * Insert notifications sent by NOTIFY commands into the queue.  This
     * should be late in the pre-commit sequence to minimize time spent
     * holding the notify-insertion lock.  However, this could result in
     * creating a snapshot, so we must do it before serializable cleanup.
     */
    PreCommit_Notify();
......

async.c implements the NOTIFY-related logic:

/*
 * PreCommit_Notify
 *
 *        This is called at transaction commit, before actually committing to
 *        clog.
 *
 *        If there are pending LISTEN actions, make sure we are listed in the
 *        shared-memory listener array.  This must happen before commit to
 *        ensure we don't miss any notifies from transactions that commit
 *        just after ours.
 *
 *        If there are outbound notify requests in the pendingNotifies list,
 *        add them to the global queue.  We do that before commit so that
 *        we can still throw error if we run out of queue space.
 */
void
PreCommit_Notify(void)
{
    ListCell   *p;

    if (!pendingActions && !pendingNotifies)
        return;                    /* no relevant statements in this xact */

    if (Trace_notify)
        elog(DEBUG1, "PreCommit_Notify");

    /* Preflight for any pending listen/unlisten actions */
    if (pendingActions != NULL)
    {
        foreach(p, pendingActions->actions)
        {
            ListenAction *actrec = (ListenAction *) lfirst(p);

            switch (actrec->action)
            {
                case LISTEN_LISTEN:
                    Exec_ListenPreCommit();
                    break;
                case LISTEN_UNLISTEN:
                    /* there is no Exec_UnlistenPreCommit() */
                    break;
                case LISTEN_UNLISTEN_ALL:
                    /* there is no Exec_UnlistenAllPreCommit() */
                    break;
            }
        }
    }

    /* Queue any pending notifies (must happen after the above) */
    if (pendingNotifies)
    {
        ListCell   *nextNotify;

        /*
         * Make sure that we have an XID assigned to the current transaction.
         * GetCurrentTransactionId is cheap if we already have an XID, but not
         * so cheap if we don't, and we'd prefer not to do that work while
         * holding NotifyQueueLock.
         */
        (void) GetCurrentTransactionId();

        /*
         * Serialize writers by acquiring a special lock that we hold till
         * after commit.  This ensures that queue entries appear in commit
         * order, and in particular that there are never uncommitted queue
         * entries ahead of committed ones, so an uncommitted transaction
         * can't block delivery of deliverable notifications.
         *
         * We use a heavyweight lock so that it'll automatically be released
         * after either commit or abort.  This also allows deadlocks to be
         * detected, though really a deadlock shouldn't be possible here.
         *
         * The lock is on "database 0", which is pretty ugly but it doesn't
         * seem worth inventing a special locktag category just for this.
         * (Historical note: before PG 9.0, a similar lock on "database 0" was
         * used by the flatfiles mechanism.)
         */
        LockSharedObject(DatabaseRelationId, InvalidOid, 0,
                         AccessExclusiveLock);

        /* Now push the notifications into the queue */
        backendHasSentNotifications = true;

        nextNotify = list_head(pendingNotifies->events);
        while (nextNotify != NULL)
        {
            /*
             * Add the pending notifications to the queue.  We acquire and
             * release NotifyQueueLock once per page, which might be overkill
             * but it does allow readers to get in while we're doing this.
             *
             * A full queue is very uncommon and should really not happen,
             * given that we have so much space available in the SLRU pages.
             * Nevertheless we need to deal with this possibility. Note that
             * when we get here we are in the process of committing our
             * transaction, but we have not yet committed to clog, so at this
             * point in time we can still roll the transaction back.
             */
            LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
            asyncQueueFillWarning();
            if (asyncQueueIsFull())
                ereport(ERROR,
                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                         errmsg("too many notifications in the NOTIFY queue")));
            nextNotify = asyncQueueAddEntries(nextNotify);
            LWLockRelease(NotifyQueueLock);
        }
    }
}
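The comments above give the reason for queuing before the clog update: if the queue is full, the transaction can still be rolled back. The following is a minimal toy model of that ordering, not PostgreSQL code. Every name in it (ToyNotifyState, toy_commit, the tiny capacity, the committed[] array standing in for the commit log) is a hypothetical introduced for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_QUEUE_CAPACITY 4   /* hypothetical; tiny so overflow is easy to trigger */
#define TOY_MAX_XID        16  /* hypothetical bound on transaction ids */

typedef struct
{
    int         xid;        /* transaction that queued the entry */
    const char *payload;    /* notification text */
} ToyEntry;

typedef struct
{
    ToyEntry entries[TOY_QUEUE_CAPACITY];
    size_t   count;
    bool     committed[TOY_MAX_XID];    /* stand-in for the commit log */
} ToyNotifyState;

/* Pre-commit step: append entries, reporting failure if the queue is full. */
static bool
toy_precommit_notify(ToyNotifyState *st, int xid,
                     const char *const *payloads, size_t n)
{
    assert(xid >= 0 && xid < TOY_MAX_XID);
    for (size_t i = 0; i < n; i++)
    {
        if (st->count == TOY_QUEUE_CAPACITY)
            return false;   /* nothing is committed yet, so the caller can abort */
        st->entries[st->count].xid = xid;
        st->entries[st->count].payload = payloads[i];
        st->count++;
    }
    return true;
}

/* Commit: queue first, and mark the xid committed only if queuing succeeded. */
static bool
toy_commit(ToyNotifyState *st, int xid,
           const char *const *payloads, size_t n)
{
    if (!toy_precommit_notify(st, xid, payloads, n))
        return false;       /* committed[xid] stays false: transaction aborted */
    st->committed[xid] = true;
    return true;
}

/* Count the entries a listener would deliver: only those from committed xids. */
static size_t
toy_deliverable(const ToyNotifyState *st)
{
    size_t n = 0;

    for (size_t i = 0; i < st->count; i++)
        if (st->committed[st->entries[i].xid])
            n++;
    return n;
}
```

In this sketch, a transaction that overflows the queue leaves committed[xid] false. Any entries it had already appended are never counted as deliverable, so listeners see either all or none of a transaction's notifications.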

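After RecordTransactionCommit() has updated the transaction's commit status in pg_xact (by which point the WAL record has already been flushed to pg_wal), CommitTransaction() calls AtCommit_Notify() to finish the notification work. The notifications themselves were already placed in the queue by PreCommit_Notify(); at this stage the function applies the pending LISTEN/UNLISTEN actions and clears transaction-local state. As shown below: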

    smgrDoPendingDeletes(true);

    AtCommit_Notify();
    AtEOXact_GUC(true, 1);
    AtEOXact_SPI(true);

In async.c:

/*
 * AtCommit_Notify
 *
 *        This is called at transaction commit, after committing to clog.
 *
 *        Update listenChannels and clear transaction-local state.
 */
void
AtCommit_Notify(void)
{
    ListCell   *p;

    /*
     * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
     * return as soon as possible
     */
    if (!pendingActions && !pendingNotifies)
        return;

    if (Trace_notify)
        elog(DEBUG1, "AtCommit_Notify");

    /* Perform any pending listen/unlisten actions */
    if (pendingActions != NULL)
    {
        foreach(p, pendingActions->actions)
        {
            ListenAction *actrec = (ListenAction *) lfirst(p);

            switch (actrec->action)
            {
                case LISTEN_LISTEN:
                    Exec_ListenCommit(actrec->channel);
                    break;
                case LISTEN_UNLISTEN:
                    Exec_UnlistenCommit(actrec->channel);
                    break;
                case LISTEN_UNLISTEN_ALL:
                    Exec_UnlistenAllCommit();
                    break;
            }
        }
    }

    /* If no longer listening to anything, get out of listener array */
    if (amRegisteredListener && listenChannels == NIL)
        asyncQueueUnregister();

    /* And clean up */
    ClearPendingActionsAndNotifies();
}
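The commit-time handling of LISTEN/UNLISTEN above can be pictured with a small toy model, again not PostgreSQL code. All names (ToyListener, toy_pre_commit, toy_at_commit, the fixed-size channel array) are hypothetical. The sketch assumes, as in the excerpts above, that registration in the listener array happens at pre-commit, and that unregistration happens at commit once no channels remain.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_CHANNELS 8      /* hypothetical limit for the sketch */

typedef enum { TOY_LISTEN, TOY_UNLISTEN, TOY_UNLISTEN_ALL } ToyActionKind;

typedef struct
{
    ToyActionKind kind;
    const char   *channel;      /* ignored for TOY_UNLISTEN_ALL */
} ToyAction;

typedef struct
{
    const char *channels[TOY_MAX_CHANNELS];
    size_t      count;
    bool        registered;     /* stand-in for membership in the listener array */
} ToyListener;

static bool
toy_is_listening(const ToyListener *l, const char *ch)
{
    for (size_t i = 0; i < l->count; i++)
        if (strcmp(l->channels[i], ch) == 0)
            return true;
    return false;
}

/* Pre-commit: register as a listener if any LISTEN is pending. */
static void
toy_pre_commit(ToyListener *l, const ToyAction *acts, size_t n)
{
    for (size_t i = 0; i < n; i++)
        if (acts[i].kind == TOY_LISTEN)
            l->registered = true;
}

/* At commit: apply the actions in statement order, then unregister if idle. */
static void
toy_at_commit(ToyListener *l, const ToyAction *acts, size_t n)
{
    for (size_t i = 0; i < n; i++)
    {
        switch (acts[i].kind)
        {
            case TOY_LISTEN:
                if (!toy_is_listening(l, acts[i].channel))
                {
                    assert(l->count < TOY_MAX_CHANNELS);
                    l->channels[l->count++] = acts[i].channel;
                }
                break;
            case TOY_UNLISTEN:
                for (size_t j = 0; j < l->count; j++)
                    if (strcmp(l->channels[j], acts[i].channel) == 0)
                    {
                        /* fill the hole with the last channel */
                        l->channels[j] = l->channels[--l->count];
                        break;
                    }
                break;
            case TOY_UNLISTEN_ALL:
                l->count = 0;
                break;
        }
    }

    /* No channels left: leave the listener array. */
    if (l->registered && l->count == 0)
        l->registered = false;
}
```

Because the actions are replayed in order, a transaction that runs LISTEN a, UNLISTEN a, LISTEN b ends up listening only on b in this model.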

 

Source: https://www.cnblogs.com/zhjh256/p/15485431.html
