ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

如何利用 JavaScript 实现并发控制

2021-07-14 21:34:50  阅读:192  来源: 互联网

标签:resolve return args 利用 shift JavaScript 并发 const fn


一、前言

在开发过程中,有时会遇到需要控制任务并发执行数量的需求。

例如一个爬虫程序,可以通过限制其并发任务数量来降低请求频率,从而避免由于请求过于频繁被封禁问题的发生。

接下来,本文介绍如何实现一个并发控制器。

 

二、示例

  const task = timeout => new Promise((resolve) => setTimeout(() => {
    resolve(timeout);
  }, timeout))

  const taskList = [1000, 3000, 200, 1300, 800, 2000];

  async function startNoConcurrentControl() {
    console.time(NO_CONCURRENT_CONTROL_LOG);
    await Promise.all(taskList.map(item => task(item)));
    console.timeEnd(NO_CONCURRENT_CONTROL_LOG);
  }

  startNoConcurrentControl();

上述示例代码利用 Promise.all 方法模拟6个任务并发执行的场景,执行完所有任务的总耗时为 3000 毫秒。

下面会采用该示例来验证实现方法的正确性。

 

三、实现

由于任务并发执行的数量是有限的,那么就需要一种数据结构来管理不断产生的任务。

队列的「先进先出」特性可以保证任务并发执行的顺序,在 JavaScript 中可以通过「数组来模拟队列」

  class Queue {
    constructor() {
      this._queue = [];
    }

    push(value) {
      return this._queue.push(value);
    }

    shift() {
      return this._queue.shift();
    }

    isEmpty() {
      return this._queue.length === 0;
    }
  }

对于每一个任务,需要管理其执行函数和参数:

  class DelayedTask {
    constructor(resolve, fn, args) {
      this.resolve = resolve;
      this.fn = fn;
      this.args = args;
    }
  }

接下来实现核心的 TaskPool 类,该类主要用来控制任务的执行:

  class TaskPool {
    constructor(size) {
      this.size = size;
      this.queue = new Queue();
    }

    addTask(fn, args) {
      return new Promise((resolve) => {
        this.queue.push(new DelayedTask(resolve, fn, args));
        if (this.size) {
          this.size--;
          const { resolve: taskResole, fn, args } = this.queue.shift();
          taskResole(this.runTask(fn, args));
        }
      })
    }

    pullTask() {
      if (this.queue.isEmpty()) {
        return;
      }

      if (this.size === 0) {
        return;
      }

      this.size++;
      const { resolve, fn, args } = this.queue.shift();
      resolve(this.runTask(fn, args));
    }

    runTask(fn, args) {
      const result = Promise.resolve(fn(...args));

      result.then(() => {
        this.size--;
        this.pullTask();
      }).catch(() => {
        this.size--;
        this.pullTask();
      })

      return result;
    }
  }

TaskPool 包含三个关键方法:

 

addTask: 将新的任务放入队列当中,并触发任务池状态检测,如果当前任务池非满载状态,则从队列中取出任务放入任务池中执行。

 

 

runTask: 执行当前任务,任务执行完成之后,更新任务池状态,此时触发主动拉取新任务的机制。

 

 

pullTask: 如果当前队列不为空,且任务池不满载,则主动取出队列中的任务执行。

 

接下来,将前面示例的并发数控制为2个:

  const cc = new ConcurrentControl(2);

  async function startConcurrentControl() {
    console.time(CONCURRENT_CONTROL_LOG);
    await Promise.all(taskList.map(item => cc.addTask(task, [item])))
    console.timeEnd(CONCURRENT_CONTROL_LOG);
  }

  startConcurrentControl();

执行流程如下:

最终执行任务的总耗时为 5000 毫秒。

http://www.ssnd.com.cn 化妆品OEM代加工

四、高阶函数优化参数传递

  await Promise.all(taskList.map(item => cc.addTask(task, [item])))

手动传递每个任务的参数的方式显得非常繁琐,这里可以通过「高阶函数实现参数的自动透传」

  addTask(fn) {
    return (...args) => {
      return new Promise((resolve) => {
        this.queue.push(new DelayedTask(resolve, fn, args));

        if (this.size) {
          this.size--;
          const { resolve: taskResole, fn: taskFn, args: taskArgs } = this.queue.shift();
          taskResole(this.runTask(taskFn, taskArgs));
        }
      })
    }
  }

改造之后的代码显得简洁了很多:

  await Promise.all(taskList.map(cc.addTask(task)))
 

五、优化出队操作

数组一般都是基于一块「连续内存」来存储,当调用数组的 shift 方法时,首先是删除头部元素(时间复杂度 O(1)),然后需要将未删除元素左移一位(时间复杂度 O(n)),所以 shift 操作的时间复杂度为 O(n)。

由于 JavaScript 语言的特性,V8 在实现 jsArray 的时候给出了一种空间和时间权衡的解决方案,在不同的场景下,jsArray 会在 FixedArray 和 HashTable 两种模式间切换。

在 hashTable 模式下,shift 操作省去了左移的时间复杂度,其时间复杂度可以降低为 O(1),即使如此,shift 仍然是一个耗时的操作。

在数组元素比较多且需要频繁执行 shift 操作的场景下,可以通过 「reverse + pop」 的方式优化。

  const Benchmark = require('benchmark');
  const suite = new Benchmark.Suite;

  suite.add('shift', function() {
    let count = 10;
    const arr = generateArray(count);
    while (count--) {
      arr.shift();
    }
  })
  .add('reverse + pop', function() {
    let count = 10;
    const arr = generateArray(count);
    arr.reverse();
    while (count--) {
      arr.pop();
    }
  })
  .on('cycle', function(event) {
    console.log(String(event.target));
  })
  .on('complete', function() {
    console.log('Fastest is ' + this.filter('fastest').map('name'));
    console.log('\n')
  })
  .run({
    async: true
  })

通过 benchmark.js 跑出的基准测试数据,可以很容易地看出哪种方式的效率更高:

回顾之前 Queue 类的实现,由于只有一个数组来存储任务,直接使用 reverse + pop 的方式,必然会影响任务执行的次序。

这里就需要引入双数组的设计,一个数组负责入队操作,一个数组负责出队操作。

  class HighPerformanceQueue {
    constructor() {
      this.q1 = []; // 用于 push 数据
      this.q2 = []; // 用于 shift 数据
    }

    push(value) {
      return this.q1.push(value);
    }

    shift() {
      let q2 = this.q2;
      if (q2.length === 0) {
        const q1 = this.q1;
        if (q1.length === 0) {
          return;
        }
        this.q1 = q2; // 感谢 @shaonialife 同学指正
        q2 = this.q2 = q1.reverse();
      }

      return q2.pop();
    }

    isEmpty() {
      if (this.q1.length === 0 && this.q2.length === 0) {
        return true;
      }
      return false;
    }
  }

最后通过基准测试来验证优化的效果:

 

标签:resolve,return,args,利用,shift,JavaScript,并发,const,fn
来源: https://www.cnblogs.com/Qooo/p/15013018.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有