ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

记一次openresty协程返回结果错乱排查

2022-07-20 22:34:48  阅读:257  来源: 互联网

标签:co thread ctx lua openresty 错乱 协程 ngx wait


记一次openresty协程返回结果错乱排查

现场

在我普通的日常开发中,我写了一段普通查redis的代码,上线以后马上有报错,nginx errorlog如下

ERROR : "xxx/redis.lua:175: bad argument #1 to 'byte' (string expected, got boolean)" "POST xxx HTTP/1.1"

然后马上看了下产生报错的代码

local function _read_reply(self, sock)
    local line, err = sock:receive()
    if not line then
        if err == "timeout" and not self.subscribed then
            sock:close()
        end
        return nil, err
    end

    local prefix = byte(line)

redis.lua是agentzh写的开源库,看到这个很难理解,因为 sock:receive() 即ngx.socket.tcp.receive() 方法返回的应该是 line 和err都应该是字符串,line怎么会是bool呢

排查经过

然后打印了一下line 和 err

redis debug: line:true err:{"warning_count":0,"affected_rows":1,"insert_id":331954435,"server_status":2}
  • line 确实是个 bool,而err变成了一个table,熟悉自己的业务,这个err一看到我就知道是我们mysql db中间件返回的。看到这我也很费解,cosocket(openresty的socket)怎么会返回我们db的数据。。。。

  • 之后就是慢慢排查代码,后来发现上面有写了一个插入db的协程

    -- update db
    local sql = "INSERT INTO xxxxxxxx;"
    --db_api("write", sql)  --db_api()是我们自己的db中间件
    local co, err = ngx.thread.spawn(db_api, sql)
    if co then
        local ok, ret = ngx.thread.wait(co)
    end

然后对了一下上面这个数据库的主键,果然是到了3亿多,那无疑就是redis receive 的时候返回了db中间件的结果,line=true,err=table 就和ngx.thread.wait函数的返回对上了

  • openresty 的cosocket实现也是用了协程,而我们也在这个代码里使用了spawn的一些协程,所以盲猜可能是openresty协程有某些bug
  • 但这两个返回值串了的问题在测试环境复现的频率非常低,在线上却是必现,一头包,之后在往上翻代码看到了另外一段协程操作
    -- spawn all threads
    for i = 1, #req_list do
        ...

        local thread, err = ngx.thread.spawn(icapture, uri, options, origin)
        if thread then
            threads[i] = thread
        
        ...
        end
    end

    local res, origin =  ngx.thread.wait(threads)

感觉问题有可能出现在这里,在官方文档中wait函数在等待多个协程时,只有有任意一个终止就会返回-any。

结果反复排查 确实是wait函数的问题,写了个复现case

local thread2,err  = ngx.thread.spawn(function()
    ngx.sleep(2)
    return 2
end)
local thread3,err  = ngx.thread.spawn(function()
    ngx.sleep(1)
    return 3
end)
local ok, ret = ngx.thread.wait(thread2, thread3)

local thread1,err  = ngx.thread.spawn(function()
    ngx.sleep(3)
    return 1
end)
local ok, ret = ngx.thread.wait(thread1)

ngx.say('expect 1,got '..ret)

输出确实是

expect 1,got 2

从表现上来说是在使用wait()等待过多个子协程之后,openresty并没有抛弃同组其他的子协程的返回值,在发起新的spawn-wait thread1过程中就会返回了 thread2的结果。

  • 至于redis 为什么会拿到db协程的返回,也不难理解,就是cosocket也使用了协程

源码分析

  • 看到了具体表现,猜了部分原因,来源码找罪魁祸首

首先是wait()的实现

static int
ngx_http_lua_uthread_wait(lua_State *L)
{
    int                          i, nargs, nrets;
    lua_State                   *sub_co;
    ngx_http_request_t          *r;
    ngx_http_lua_ctx_t          *ctx;
    ngx_http_lua_co_ctx_t       *coctx, *sub_coctx;

    r = ngx_http_lua_get_req(L);
    if (r == NULL) {
        return luaL_error(L, "no request found");
    }

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx == NULL) {
        return luaL_error(L, "no request ctx found");
    }

    ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE);

    coctx = ctx->cur_co_ctx;

    nargs = lua_gettop(L);
    if (nargs == 0) {
        return luaL_error(L, "at least one coroutine should be specified");
    }

    for (i = 1; i <= nargs; i++) {
        sub_co = lua_tothread(L, i);

        ...
        luaL_argcheck(L, sub_co, i, "lua thread expected");

        sub_coctx = ngx_http_lua_get_co_ctx(sub_co, ctx);

        switch (sub_coctx->co_status) {
        case NGX_HTTP_LUA_CO_ZOMBIE:

            ngx_http_lua_probe_info("found zombie child");

            nrets = lua_gettop(sub_coctx->co);

            dd("child retval count: %d, %s: %s", (int) nrets,
               luaL_typename(sub_coctx->co, -1),
               lua_tostring(sub_coctx->co, -1));

            if (nrets) {
                lua_xmove(sub_coctx->co, L, nrets);
            }


            return nrets;

        case NGX_HTTP_LUA_CO_DEAD:
            dd("uthread already waited: %p (parent %p)", sub_coctx,
               coctx);

            if (i < nargs) {
                /* just ignore it if it is not the last one */
                continue;
            }

            /* being the last one */
            lua_pushnil(L);
            lua_pushliteral(L, "already waited or killed");
            return 2;

        default:
            dd("uthread %p still alive, status: %d, parent %p", sub_coctx,
               sub_coctx->co_status, coctx);
            break;
        }

        ngx_http_lua_probe_user_thread_wait(L, sub_coctx->co);
        sub_coctx->waited_by_parent = 1;
    }

    return lua_yield(L, 0);
}

ngx.thread.wait 在接受多个子协程作为参数的时候,
主要流程就是循环传进来的子协程,如果是LUA_CO_ZOMBIE ,说明这个子协程执行完成,会直接返回结果
对没有运行完的子协程标记 waited_by_parent = 1,标记已经被等待过,
最后yield,让出cpu,等下次唤醒

协程运行函数 ngx_http_lua_run_thread()

ngx_http_lua_run_thread(lua_State *L, ngx_http_request_t *r,
ngx_http_lua_ctx_t *ctx, volatile int nrets) 
        
        ... 
        if (ctx->cur_co_ctx->is_uthread) {
            /* being a user thread */

            lua_settop(L, 0);

            parent_coctx = ctx->cur_co_ctx->parent_co_ctx;

            if (ngx_http_lua_coroutine_alive(parent_coctx)) {
                if (ctx->cur_co_ctx->waited_by_parent) {
                    ngx_http_lua_probe_info("parent already waiting");
                    ctx->cur_co_ctx->waited_by_parent = 0;
                    success = 1;
                    goto user_co_done;
                }

                ngx_http_lua_probe_info("parent still alive");

                if (ngx_http_lua_post_zombie_thread(r, parent_coctx,
                                                    ctx->cur_co_ctx)
                    != NGX_OK)
                {
                    return NGX_ERROR;
                }

                lua_pushboolean(ctx->cur_co_ctx->co, 1);
                lua_insert(ctx->cur_co_ctx->co, 1);

                ctx->cur_co_ctx->co_status = NGX_HTTP_LUA_CO_ZOMBIE;
                ctx->cur_co_ctx = NULL;
                return NGX_AGAIN;
            }

            ngx_http_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
            ctx->uthreads--;

            if (ctx->uthreads == 0) {
                if (ngx_http_lua_entry_thread_alive(ctx)) {
                    ctx->cur_co_ctx = NULL;
                    return NGX_AGAIN;
                }

                /* all threads terminated already */
                goto done;
            }

            /* some other user threads still running */
            ctx->cur_co_ctx = NULL;
            return NGX_AGAIN;
        }
        ....

我们可以看到主体之后,is_uthread 先判断是不是子协程,ngx_http_lua_coroutine_alive() 判断父协程是否存活,然后再判断waited_by_parent 是否有被父协程wait过,如果wait过就执行 user_co_done 做结束的数据交换工作

user_co_done:

                nrets = lua_gettop(ctx->cur_co_ctx->co);

                next_coctx = ctx->cur_co_ctx->parent_co_ctx;

                if (next_coctx == NULL) {
                    /* being a light thread */
                    goto no_parent;
                }

                next_co = next_coctx->co;

                if (nrets) {
                    lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
                }

                if (ctx->cur_co_ctx->is_uthread) {
                    ngx_http_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
                    ctx->uthreads--;
                }

                if (!ctx->cur_co_ctx->is_wrap) {
                    /*
                     * ended successfully, coroutine.resume returns true plus
                     * any return values
                     */
                    lua_pushboolean(next_co, success);
                    lua_insert(next_co, 1);
                    nrets++;
                }

                ctx->cur_co_ctx = next_coctx;

                ngx_http_lua_probe_info("set parent running");

                next_coctx->co_status = NGX_HTTP_LUA_CO_RUNNING;

                ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                               "lua coroutine: lua user thread ended normally");

                continue;

lua_pushboolean() 额外添加 wait()的第一个参数 true,之后结果放入父协程ctx,循环结束resume父协程

源码分析结果

  • 很明显问题就出在wait() 函数中的waited_by_parent,在wait多个子协程时,会将循环到第一个运行结束之前的子协程都标记为 waited_by_parent=1,而在子协程运行过程中,只要waited_by_parent=1 就会将 ture 和其他返回值入栈,所以表现就是别的协程函数拿到的返回是刚刚运行结束的子协程的返回

怎么处理

这个问题发现在github issue也发现了有人提过
https://github.com/openresty/lua-nginx-module/issues/1439
不光 sock.receive,sleep也是协程,所以也拿到了完成的子协程的返回值

不过agentzh不觉得这是个bug,是feature。作者不打算调整那我们只能在使用上注意

使用注意

  • 在使用 ngx.thread.wait()的时候,要避免是wait多个子协程。像示例,用for来wait所有子协程(wait all)
 
  local capture = ngx.location.capture
  local spawn = ngx.thread.spawn
  local wait = ngx.thread.wait
  local say = ngx.say
 
  local function fetch(uri)
      return capture(uri)
  end
 
  local threads = {
      spawn(fetch, "/foo"),
      spawn(fetch, "/bar"),
      spawn(fetch, "/baz")
  }
 
  for i = 1, #threads do
      local ok, res = wait(threads[i])
      if not ok then
          say(i, ": failed to run: ", res)
      else
          say(i, ": status: ", res.status)
          say(i, ": body: ", res.body)
      end
  end

如果场景确实就是需要wait any的情况下(反正我是不会这样用了),在获取到一个结果后,可能需要:

  1. 确保所有其他子协程都执行完毕,比如使用ngx.thread.kill、ngx.sleep
  2. 然后for + ngx.sleep(0.01) 把ctx里脏数据清空(n = wait协程的数量,上文也说过ngx.sleep 也是协程,所以sleep的返回值就是之前其他子协程的返回值)

标签:co,thread,ctx,lua,openresty,错乱,协程,ngx,wait
来源: https://www.cnblogs.com/wenzaicaicai/p/16500102.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有