ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

如何实现一个简单的并发控制?

2021-10-18 17:02:42  阅读:205  来源: 互联网

标签:控制 const executing asyncPool ret 并发 Promise 简单 array


并发控制的概念相信大家都非常熟悉,比如浏览器请求的并发控制等。今天,我们结合 async-pool 这个开源工具来看看如何实现一个简单的并发控制。

async-pool 的代码分为 es6 和 es7 两个版本,都非常简单,我们主要基于 es6 版本进行说明。

在去除参数校验等逻辑以后,核心代码如下,非常短小精悍:

function asyncPool(poolLimit, array, iteratorFn) {
  let i = 0;
  const ret = [];
  const executing = [];
  const enqueue = function() {
    if (i === array.length) {
      return Promise.resolve();
    }
    const item = array[i++];
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    let r = Promise.resolve();

    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        r = Promise.race(executing);
      }
    }

    return r.then(() => enqueue());
  };
  return enqueue().then(() => Promise.all(ret));
}

asyncPool 支持三个参数,第一个是并发数量,第二个是一组请求输入,第三个是返回 promise 的迭代函数。我们举一个例子来进行说明。

假设我们现在有 500 个请求需要发送,并发数量控制是 50。那么我们可以这样使用 asyncPool

asyncPool(50, [/* 500 个请求的参数数据 */], () => {/* 发起请求的函数 */})

我们现在来详细说明 asyncPool 的工作原理。

首先,asyncPool 中初始化了两个数组,ret 保存返回结果,其顺序要与输入顺序一致,executing 用于记录当前正在执行的请求。

asyncPool 中创建了一个 enqueue 函数,负责具体的并发控制逻辑。

enqueue 函数中,通过变量 i 来逐个获取请求输入参数,调用迭代函数发起请求,然后将返回的 promise 保存在 ret 中。

const item = array[i++];
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);

之后就是并发数量控制的核心逻辑:

let r = Promise.resolve();

if (poolLimit <= array.length) {
    const e = p.then(() => executing.splice(executing.indexOf(e), 1));
    executing.push(e);
    if (executing.length >= poolLimit) {
        r = Promise.race(executing);
    }
}

return r.then(() => enqueue());

如果并发数量限制大于要发起的请求数量,则无需通过 executing 数组来记录正在执行的请求,直接循环发起请求即可。

如果并发数量限制小于要发起的请求数量,则首先通过之前调用迭代函数返回的 promise 生成一个新的 promise,放入 executing 中。在这个新的 promise 完成时,将其从 executing 中删除。

如果 executing 数组长度大于并发数量控制,则使用 Promise.race(executing) 获取最先返回的 promsie,并通过它进行下一次迭代。

通过变量 r 我们可以看到,在整个循环过程中,enqueue 函数会形成一个 promise 链,在最后一个 promise 返回之后,asyncPool 通过 Promise.all 将所有的结果返回。

return enqueue().then(() => Promise.all(ret));

至此,async-pool 的核心逻辑我们就分析完了。上面的分析过程是基于 es6 版本的代码,es7 版本更加简洁,如下,看官们可以自行分析:

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}

我们知道,不管是 Promise.race 还是 Promise.all,只要有一个 promise 达到 Fufilled 或者 Rejected 状态,整个就会返回。这在接口请求的的场景中是不合适的。我们应该如何改造呢?

其实也非常简单,只要在迭代函数的调用处做一些特殊处理即可。

iteratorFn(item, array).then(resp => resp).catch(error => error);

常见面试知识点、技术解决方案、教程,都可以扫码关注公众号“众里千寻”获取,或者来这里 https://everfind.github.io

众里千寻

标签:控制,const,executing,asyncPool,ret,并发,Promise,简单,array
来源: https://www.cnblogs.com/everfind/p/async-pool.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有