深圳幻海软件技术有限公司 欢迎您!

手写 p-limit,40 行代码实现并发控制

2023-02-28

前端代码经常要处理各种异步逻辑。有的是串行的:复制constpromise1=newPromise(function(resolve){//异步逻辑1...resolve();});constpromise2=newPromise(function(resolve){//异步逻辑2...resolv

前端代码经常要处理各种异步逻辑。

有的是串行的:

const promise1 = new Promise(function(resolve) {
    // 异步逻辑 1...
    resolve();
});
const promise2 = new Promise(function(resolve) {
    // 异步逻辑 2...
    resolve();
});

promise1.then(() => promise2);
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
await promise1;
await promise2;
  • 1.
  • 2.

有的是并行的:

await Promise.all([promise1, promise2]);
  • 1.
await Promise.race([promise1, promise2]);
  • 1.

并行的异步逻辑有时还要做并发控制。

并发控制是常见的需求,也是面试常考的面试题。

一般我们会用 p-limit 来做:

import pLimit from 'p-limit';

const limit = pLimit(2);

const input = [
    limit(() => fetchSomething('foo')),
    limit(() => fetchSomething('bar')),
    limit(() => doSomething())
];

const result = await Promise.all(input);
console.log(result);
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

比如上面这段逻辑,就是几个异步逻辑并行执行,并且最大并发是 2。

那如何实现这样的并发控制呢?

我们自己来写一个:

首先,要传入并发数量,返回一个添加并发任务的函数,我们把它叫做 generator:

const pLimit = (concurrency) => {
    const generator = (fn, ...args) =>
      new Promise((resolve) => {
        //...
      });
      
    return generator;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

这里添加的并发任务要进行排队,所以我们准备一个 queue,并记录当前在进行中的异步任务。

const queue = [];
let activeCount = 0;

const generator = (fn, ...args) =>
  new Promise((resolve) => {
    enqueue(fn, resolve, ...args);
  });
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

添加的异步任务就入队,也就是 enqueue。

enqueue 做的事情就是把一个异步任务添加到 queue 中,并且只要没达到并发上限就再执行一批任务:

const enqueue = (fn, resolve, ...args) => {
  queue.push(run.bind(null, fn, resolve, ...args));

  if (activeCount < concurrency && queue.length > 0) {
    queue.shift()();
  }
};
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

具体运行的逻辑是这样的:

const run = async (fn, resolve, ...args) => {
  activeCount++;

  const result = (async () => fn(...args))();

  resolve(result);

  try {
    await result;
  } catch {}

  next();
};
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.

计数,运行这个函数,改变最后返回的那个 promise 的状态,然后执行完之后进行下一步处理:

下一步处理自然就是把活跃任务数量减一,然后再跑一个任务:

const next = () => {
  activeCount--;

  if (queue.length > 0) {
    queue.shift()();
  }
};
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

这样就保证了并发的数量限制。

现在的全部代码如下,只有 40 行代码:

const pLimit = (concurrency) => {  
    const queue = [];
    let activeCount = 0;
  
    const next = () => {
      activeCount--;
  
      if (queue.length > 0) {
        queue.shift()();
      }
    };
  
    const run = async (fn, resolve, ...args) => {
      activeCount++;
  
      const result = (async () => fn(...args))();

      resolve(result);
  
      try {
        await result;
      } catch {}

      next();
    };
  
    const enqueue = (fn, resolve, ...args) => {
      queue.push(run.bind(null, fn, resolve, ...args));
  
      if (activeCount < concurrency && queue.length > 0) {
          queue.shift()();
      }
    };
  
    const generator = (fn, ...args) =>
      new Promise((resolve) => {
        enqueue(fn, resolve, ...args);
      });
  
    return generator;
};
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.

这就已经实现了并发控制。

不信我们跑跑看:

准备这样一段测试代码:

const limit = pLimit(2);
  
function asyncFun(value, delay) {
    return new Promise((resolve) => {
        console.log('start ' + value);
        setTimeout(() => resolve(value), delay);
    });
}

(async function () {
    const arr = [
        limit(() => asyncFun('aaa', 2000)),
        limit(() => asyncFun('bbb', 3000)),
        limit(() => asyncFun('ccc', 1000)),
        limit(() => asyncFun('ccc', 1000)),
        limit(() => asyncFun('ccc', 1000))
    ];
  
    const result = await Promise.all(arr);
    console.log(result);
})();
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.

没啥好说的,就是 setTimeout + promise,设置不同的 delay 时间。

并发数量为 2。

我们试下:

先并发执行前两个任务,2s 的时候一个任务执行完,又执行了一个任务,然后再过一秒,都执行完了,有同时执行了两个任务。

经过测试,我们已经实现了并发控制!

回顾一下我们实现的过程,其实就是一个队列来保存任务,开始的时候一次性执行最大并发数的任务,然后每执行完一个启动一个新的。

还是比较简单的。

上面的 40 行代码是最简化的版本,其实还有一些可以完善的地方,我们继续完善一下。

首先,我们要把并发数暴露出去,还要让开发者可以手动清理任务队列。

我们这样写:

Object.defineProperties(generator, {
  activeCount: {
    get: () => activeCount
  },
  pendingCount: {
    get: () => queue.length
  },
  clearQueue: {
    value: () => {
      queue.length = 0;
    }
  }
});
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.

用 Object.defineProperties 只定义 get 函数,这样 activeCount、pendingCount 就是只能读不能改的。

同时还提供了一个清空任务队列的函数。

然后传入的参数也加个校验逻辑:

if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
  throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
  • 1.
  • 2.
  • 3.

不是整数或者小于 0 就报错,当然,Infinity 也是可以的。

最后,其实还有一个特别需要完善的点,就是这里:

const enqueue = (fn, resolve, ...args) => {
  queue.push(run.bind(null, fn, resolve, ...args));

  if (activeCount < concurrency && queue.length > 0) {
    queue.shift()();
  }
};
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

应该改成这样:

const enqueue = (fn, resolve, ...args) => {
  queue.push(run.bind(null, fn, resolve, ...args));

  (async () => {
    await Promise.resolve();

    if (activeCount < concurrency && queue.length > 0) {
      queue.shift()();
    }
  })();
};
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

因为 activeCount-- 的逻辑是在执行完任务之后才执行的,万一任务还没执行完,这时候 activeCount 就是不准的。

所以为了保证并发数量能控制准确,要等全部的微任务执行完再拿 activeCount。

怎么在全部的微任务执行完再执行逻辑呢?

加一个新的微任务不就行了?

所以有这样的 await Promise.resolve(); 的逻辑。

这样,就是一个完善的并发控制逻辑了,p-limit 也是这么实现的。

感兴趣的同学可以自己试一下:

const pLimit = (concurrency) => {
    if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
      throw new TypeError('Expected `concurrency` to be a number from 1 and up');
    }
  
    const queue = [];
    let activeCount = 0;
  
    const next = () => {
      activeCount--;
  
      if (queue.length > 0) {
        queue.shift()();
      }
    };
  
    const run = async (fn, resolve, ...args) => {
      activeCount++;
  
      const result = (async () => fn(...args))();

      resolve(result);
  
      try {
        await result;
      } catch {}

      next();
    };
  
    const enqueue = (fn, resolve, ...args) => {
      queue.push(run.bind(null, fn, resolve, ...args));
  
      (async () => {
        await Promise.resolve();
  
        if (activeCount < concurrency && queue.length > 0) {
          queue.shift()();
        }
      })();
    };
  
    const generator = (fn, ...args) =>
      new Promise((resolve) => {
        enqueue(fn, resolve, ...args);
      });
  
    Object.defineProperties(generator, {
      activeCount: {
        get: () => activeCount
      },
      pendingCount: {
        get: () => queue.length
      },
      clearQueue: {
        value: () => {
          queue.length = 0;
        }
      }
    });
  
    return generator;
  };
  
const limit = pLimit(2);
  
function asyncFun(value, delay) {
    return new Promise((resolve) => {
        console.log('start ' + value);
        setTimeout(() => resolve(value), delay);
    });
}

(async function () {
    const arr = [
        limit(() => asyncFun('aaa', 2000)),
        limit(() => asyncFun('bbb', 3000)),
        limit(() => asyncFun('ccc', 1000)),
        limit(() => asyncFun('ccc', 1000)),
        limit(() => asyncFun('ccc', 1000))
    ];
  
    const result = await Promise.all(arr);
    console.log(result);
})();
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.

总结

js 代码经常要处理异步逻辑的串行、并行,还可能要做并发控制,这也是面试常考的点。

实现并发控制的核心就是通过一个队列保存所有的任务,然后最开始批量执行一批任务到最大并发数,然后每执行完一个任务就再执行一个新的。

其中要注意的是为了保证获取的任务数量是准确的,要在所有微任务执行完之后再获取数量。

实现并发控制只要 40 多行代码,其实这就是 p-limit 的源码了,大家感兴趣也可以自己实现一下。