前端代码经常要处理各种异步逻辑。
有的是串行的:
复制 const promise1 = new Promise( function( resolve) {
// 异步逻辑 1 ...
resolve( ) ;
} ) ;
const promise2 = new Promise( function( resolve) {
// 异步逻辑 2 ...
resolve( ) ;
} ) ;
promise1.then ( ( ) => promise2) ;
复制 await promise1;
await promise2;
有的是并行的:
复制 await Promise.all ( [ promise1, promise2] ) ;
复制 await Promise.race ( [ promise1, promise2] ) ;
并行的异步逻辑有时还要做并发控制。
并发控制是常见的需求,也是面试常考的面试题。
一般我们会用 p-limit 来做:
复制 import pLimit from 'p-limit' ;
const limit = pLimit( 2 ) ;
const input = [
limit ( ( ) => fetchSomething( 'foo' ) ) ,
limit ( ( ) => fetchSomething( 'bar' ) ) ,
limit ( ( ) => doSomething( ) )
] ;
const result = await Promise.all ( input) ;
console.log ( result) ;
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12.
比如上面这段逻辑,就是几个异步逻辑并行执行,并且最大并发是 2。
那如何实现这样的并发控制呢?
我们自己来写一个:
首先,要传入并发数量,返回一个添加并发任务的函数,我们把它叫做 generator:
复制 const pLimit = ( concurrency) => {
const generator = ( fn, ...args) =>
new Promise( ( resolve) => {
// ...
} ) ;
return generator;
}
这里添加的并发任务要进行排队,所以我们准备一个 queue,并记录当前在进行中的异步任务。
复制 const queue = [ ] ;
let activeCount = 0 ;
const generator = ( fn, ...args) =>
new Promise( ( resolve) => {
enqueue( fn, resolve, ...args) ;
} ) ;
添加的异步任务就入队,也就是 enqueue。
enqueue 做的事情就是把一个异步任务添加到 queue 中,并且只要没达到并发上限就再执行一批任务:
复制 const enqueue = ( fn, resolve, ...args) => {
queue.push ( run.bind ( null , fn, resolve, ...args) ) ;
if ( activeCount < concurrency && queue.length > 0 ) {
queue.shift ( ) ( ) ;
}
} ;
具体运行的逻辑是这样的:
复制 const run = async ( fn, resolve, ...args) => {
activeCount++ ;
const result = ( async ( ) => fn( ...args) ) ( ) ;
resolve( result) ;
try {
await result;
} catch { }
next( ) ;
} ;
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13.
计数,运行这个函数,改变最后返回的那个 promise 的状态,然后执行完之后进行下一步处理:
下一步处理自然就是把活跃任务数量减一,然后再跑一个任务:
复制 const next = ( ) => {
activeCount--;
if ( queue.length > 0 ) {
queue.shift ( ) ( ) ;
}
} ;
这样就保证了并发的数量限制。
现在的全部代码如下,只有 40 行代码:
复制 const pLimit = ( concurrency) => {
const queue = [ ] ;
let activeCount = 0 ;
const next = ( ) => {
activeCount--;
if ( queue.length > 0 ) {
queue.shift ( ) ( ) ;
}
} ;
const run = async ( fn, resolve, ...args) => {
activeCount++ ;
const result = ( async ( ) => fn( ...args) ) ( ) ;
resolve( result) ;
try {
await result;
} catch { }
next( ) ;
} ;
const enqueue = ( fn, resolve, ...args) => {
queue.push ( run.bind ( null , fn, resolve, ...args) ) ;
if ( activeCount < concurrency && queue.length > 0 ) {
queue.shift ( ) ( ) ;
}
} ;
const generator = ( fn, ...args) =>
new Promise( ( resolve) => {
enqueue( fn, resolve, ...args) ;
} ) ;
return generator;
} ;
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. 36. 37. 38. 39. 40. 41.
这就已经实现了并发控制。
不信我们跑跑看:
准备这样一段测试代码:
复制 const limit = pLimit( 2 ) ;
function asyncFun( value, delay) {
return new Promise( ( resolve) => {
console.log ( 'start ' + value) ;
setTimeout( ( ) => resolve( value) , delay) ;
} ) ;
}
( async function ( ) {
const arr = [
limit ( ( ) => asyncFun( 'aaa' , 2000 ) ) ,
limit ( ( ) => asyncFun( 'bbb' , 3000 ) ) ,
limit ( ( ) => asyncFun( 'ccc' , 1000 ) ) ,
limit ( ( ) => asyncFun( 'ccc' , 1000 ) ) ,
limit ( ( ) => asyncFun( 'ccc' , 1000 ) )
] ;
const result = await Promise.all ( arr) ;
console.log ( result) ;
} ) ( ) ;
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21.
没啥好说的,就是 setTimeout + promise,设置不同的 delay 时间。
并发数量为 2。
我们试下:
先并发执行前两个任务,2s 的时候一个任务执行完,又执行了一个任务,然后再过一秒,都执行完了,有同时执行了两个任务。
经过测试,我们已经实现了并发控制!
回顾一下我们实现的过程,其实就是一个队列来保存任务,开始的时候一次性执行最大并发数的任务,然后每执行完一个启动一个新的。
还是比较简单的。
上面的 40 行代码是最简化的版本,其实还有一些可以完善的地方,我们继续完善一下。
首先,我们要把并发数暴露出去,还要让开发者可以手动清理任务队列。
我们这样写:
复制 Object.defineProperties ( generator, {
activeCount: {
get: ( ) => activeCount
} ,
pendingCount: {
get: ( ) => queue.length
} ,
clearQueue: {
value: ( ) => {
queue.length = 0 ;
}
}
} ) ;
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13.
用 Object.defineProperties 只定义 get 函数,这样 activeCount、pendingCount 就是只能读不能改的。
同时还提供了一个清空任务队列的函数。
然后传入的参数也加个校验逻辑:
复制 if ( ! ( ( Number.isInteger ( concurrency) || concurrency === Infinity) && concurrency > 0 ) ) {
throw new TypeError( 'Expected `concurrency` to be a number from 1 and up' ) ;
}
不是整数或者小于 0 就报错,当然,Infinity 也是可以的。
最后,其实还有一个特别需要完善的点,就是这里:
复制 const enqueue = ( fn, resolve, ...args) => {
queue.push ( run.bind ( null , fn, resolve, ...args) ) ;
if ( activeCount < concurrency && queue.length > 0 ) {
queue.shift ( ) ( ) ;
}
} ;
应该改成这样:
复制 const enqueue = ( fn, resolve, ...args) => {
queue.push ( run.bind ( null , fn, resolve, ...args) ) ;
( async ( ) => {
await Promise.resolve ( ) ;
if ( activeCount < concurrency && queue.length > 0 ) {
queue.shift ( ) ( ) ;
}
} ) ( ) ;
} ;
因为 activeCount-- 的逻辑是在执行完任务之后才执行的,万一任务还没执行完,这时候 activeCount 就是不准的。
所以为了保证并发数量能控制准确,要等全部的微任务执行完再拿 activeCount。
怎么在全部的微任务执行完再执行逻辑呢?
加一个新的微任务不就行了?
所以有这样的 await Promise.resolve(); 的逻辑。
这样,就是一个完善的并发控制逻辑了,p-limit 也是这么实现的。
感兴趣的同学可以自己试一下:
复制 const pLimit = ( concurrency) => {
if ( ! ( ( Number.isInteger ( concurrency) || concurrency === Infinity) && concurrency > 0 ) ) {
throw new TypeError( 'Expected `concurrency` to be a number from 1 and up' ) ;
}
const queue = [ ] ;
let activeCount = 0 ;
const next = ( ) => {
activeCount--;
if ( queue.length > 0 ) {
queue.shift ( ) ( ) ;
}
} ;
const run = async ( fn, resolve, ...args) => {
activeCount++ ;
const result = ( async ( ) => fn( ...args) ) ( ) ;
resolve( result) ;
try {
await result;
} catch { }
next( ) ;
} ;
const enqueue = ( fn, resolve, ...args) => {
queue.push ( run.bind ( null , fn, resolve, ...args) ) ;
( async ( ) => {
await Promise.resolve ( ) ;
if ( activeCount < concurrency && queue.length > 0 ) {
queue.shift ( ) ( ) ;
}
} ) ( ) ;
} ;
const generator = ( fn, ...args) =>
new Promise( ( resolve) => {
enqueue( fn, resolve, ...args) ;
} ) ;
Object.defineProperties ( generator, {
activeCount: {
get: ( ) => activeCount
} ,
pendingCount: {
get: ( ) => queue.length
} ,
clearQueue: {
value: ( ) => {
queue.length = 0 ;
}
}
} ) ;
return generator;
} ;
const limit = pLimit( 2 ) ;
function asyncFun( value, delay) {
return new Promise( ( resolve) => {
console.log ( 'start ' + value) ;
setTimeout( ( ) => resolve( value) , delay) ;
} ) ;
}
( async function ( ) {
const arr = [
limit ( ( ) => asyncFun( 'aaa' , 2000 ) ) ,
limit ( ( ) => asyncFun( 'bbb' , 3000 ) ) ,
limit ( ( ) => asyncFun( 'ccc' , 1000 ) ) ,
limit ( ( ) => asyncFun( 'ccc' , 1000 ) ) ,
limit ( ( ) => asyncFun( 'ccc' , 1000 ) )
] ;
const result = await Promise.all ( arr) ;
console.log ( result) ;
} ) ( ) ;
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. 36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53. 54. 55. 56. 57. 58. 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71. 72. 73. 74. 75. 76. 77. 78. 79. 80. 81. 82. 83. 84. 85.
总结 js 代码经常要处理异步逻辑的串行、并行,还可能要做并发控制,这也是面试常考的点。
实现并发控制的核心就是通过一个队列保存所有的任务,然后最开始批量执行一批任务到最大并发数,然后每执行完一个任务就再执行一个新的。
其中要注意的是为了保证获取的任务数量是准确的,要在所有微任务执行完之后再获取数量。
实现并发控制只要 40 多行代码,其实这就是 p-limit 的源码了,大家感兴趣也可以自己实现一下。