Skip to content

Commit 14b311f

Browse files
committed
feat: add pLimit & createLimitedVersionOfFn; refactor Dispatcher idleState
1 parent 6b90d94 commit 14b311f

File tree

12 files changed

+192
-61
lines changed

12 files changed

+192
-61
lines changed

package.json

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,10 @@
3434
"gen-readme": "swig render ./readme/readme.md > README.md && prettier --write README.md",
3535
"dev": "tsup --watch",
3636
"build": "tsup",
37+
"typecheck": "tsc --noEmit",
3738
"prepublishOnly": "npm run build"
3839
},
39-
"repository": {
40-
"type": "git",
41-
"url": "git+https://github.com/magicdawn/promise.map.git"
42-
},
43-
"keywords": [
44-
"promise",
45-
"map",
46-
"parallel"
47-
],
48-
"author": "magicdawn",
49-
"license": "MIT",
50-
"bugs": {
51-
"url": "https://github.com/magicdawn/promise.map/issues"
52-
},
53-
"homepage": "https://github.com/magicdawn/promise.map#readme",
40+
"packageManager": "[email protected]+sha512.76e2379760a4328ec4415815bcd6628dee727af3779aaa4c914e3944156c4299921a89f976381ee107d41f12cfa4b66681ca9c718f0668fa0831ed4c6d8ba56c",
5441
"devDependencies": {
5542
"@magicdawn/eslint-config": "^0.1.0",
5643
"@magicdawn/prettier-config": "^0.0.4",
@@ -68,6 +55,21 @@
6855
"typescript": "^5.7.2",
6956
"vitest": "^2.1.8"
7057
},
58+
"repository": {
59+
"type": "git",
60+
"url": "git+https://github.com/magicdawn/promise.map.git"
61+
},
62+
"keywords": [
63+
"promise",
64+
"map",
65+
"parallel"
66+
],
67+
"author": "magicdawn",
68+
"license": "MIT",
69+
"bugs": {
70+
"url": "https://github.com/magicdawn/promise.map/issues"
71+
},
72+
"homepage": "https://github.com/magicdawn/promise.map#readme",
7173
"husky": {
7274
"hooks": {
7375
"pre-commit": "lint-staged"
@@ -78,7 +80,6 @@
7880
"prettier --write"
7981
]
8082
},
81-
"packageManager": "[email protected]+sha512.76e2379760a4328ec4415815bcd6628dee727af3779aaa4c914e3944156c4299921a89f976381ee107d41f12cfa4b66681ca9c718f0668fa0831ed4c6d8ba56c",
8283
"publishConfig": {
8384
"registry": "https://registry.npmjs.org/"
8485
}

src/dispatcher.ts

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
1-
export type UniversalMap<E, V> = E extends object ? WeakMap<E, V> : Map<E, V>
1+
type Ref<T> = { val: T }
22

33
export class Dispatcher<E> {
4-
private executors: E[]
5-
private idleMap: UniversalMap<E, boolean>
4+
static fromConcurrency(concurrency: number, label = '') {
5+
if (!isFinite(concurrency)) throw new Error('concurrency must be finite')
6+
if (!(concurrency > 0)) throw new Error('concurrency must be greater than 0')
7+
const executors = new Array(concurrency).fill(0).map((_, index) =>
8+
[label, `executors(${concurrency})`, index]
9+
.map((x) => x && x.toString().trim())
10+
.filter(Boolean)
11+
.join('.'),
12+
)
13+
return new Dispatcher(executors)
14+
}
15+
16+
private unwrapRef<T>(ref: Ref<T>): T {
17+
return ref.val
18+
}
19+
20+
private executorRefs: Ref<E>[]
21+
private idleState: WeakMap<Ref<E>, boolean>
622
constructor(executors: E[]) {
723
if (!executors.length) throw new Error('executors can not be empty')
8-
this.executors = executors
9-
10-
this.idleMap = (
11-
executors[0] && typeof executors[0] === 'object'
12-
? new WeakMap<E & object, boolean>()
13-
: new Map<E, boolean>()
14-
) as UniversalMap<E, boolean>
15-
executors.forEach((x) => this.idleMap.set(x, true))
24+
this.executorRefs = executors.map((x) => ({ val: x })) // weakmap 需要 reference type 作为 key
25+
this.idleState = new WeakMap()
26+
this.executorRefs.forEach((x) => this.idleState.set(x, true))
1627
}
1728

1829
private aborted = false
@@ -22,35 +33,36 @@ export class Dispatcher<E> {
2233
}
2334

2435
pendingResolves: Array<() => void> = []
25-
replenish = (executor: E) => {
36+
replenish = (executor: Ref<E>) => {
2637
if (this.aborted) return
2738
if (!this.pendingResolves.length) return
2839
this.pendingResolves.shift()?.()
2940
}
3041

31-
private async getExecutor() {
32-
const find = () => this.executors.find((x) => this.idleMap.get(x))
42+
private async getExecutorRef() {
43+
const find = () => this.executorRefs.find((x) => this.idleState.get(x))
3344

34-
let executor = find()
35-
while (!executor) {
45+
let executorRef = find()
46+
while (!executorRef) {
3647
const { promise, resolve } = Promise.withResolvers<void>()
3748
this.pendingResolves.push(resolve)
3849
await promise
39-
executor = find()
50+
executorRef = find()
4051
}
4152

42-
this.idleMap.set(executor, false) // mark used
43-
return executor
53+
this.idleState.set(executorRef, false) // mark used
54+
return executorRef
4455
}
4556

46-
async dispatch<T>(action: (executor: E) => T) {
47-
const executor = await this.getExecutor()
57+
async dispatch<R>(action: (executor: E) => R): Promise<Awaited<R>> {
58+
const executorRef = await this.getExecutorRef()
59+
const executor = this.unwrapRef(executorRef)
4860
try {
4961
return await action(executor)
5062
} finally {
51-
this.idleMap.set(executor, true)
63+
this.idleState.set(executorRef, true)
5264
// replenish run as a `macro task`, before this macro task, `abort` can be called
53-
setTimeout(() => this.replenish(executor))
65+
setTimeout(() => this.replenish(executorRef))
5466
}
5567
}
5668
}

src/index.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
11
export { Dispatcher } from './dispatcher'
22
export { pmapWorker } from './worker'
3-
export { pmap } from './pmap'
4-
5-
// pmap default
6-
import { pmap } from './pmap'
7-
export default pmap
3+
export { pLimit, createLimitedVersionOfFn } from './limit'
4+
export { pmap, pmap as default } from './pmap'

src/limit.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { Dispatcher } from './dispatcher'
2+
3+
// api same as `p-limit` package
4+
export function pLimit(concurrency: number) {
5+
const dispatcher = Dispatcher.fromConcurrency(concurrency, 'pLimit')
6+
return function limit<T extends unknown[], R>(fn: (...args: T) => R, ...args: NoInfer<T>) {
7+
return dispatcher.dispatch(() => fn(...args))
8+
}
9+
}
10+
11+
export function createLimitedVersionOfFn<T extends unknown[], R>(
12+
fn: (...args: T) => R,
13+
concurrency: number,
14+
) {
15+
const dispatcher = Dispatcher.fromConcurrency(concurrency, 'createLimitedVersionOfFn')
16+
return function limitedVersionOfFn(...args: NoInfer<T>) {
17+
return dispatcher.dispatch(() => fn(...args))
18+
}
19+
}

src/pmap.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ export async function pmap<T, R>(
66
concurrency: number,
77
) {
88
concurrency = Math.min(concurrency, arr.length)
9-
const executors = new Array(concurrency).fill(0).map((_, index) => `pmap.executor.${index}`)
10-
const dispatcher = new Dispatcher(executors)
9+
const dispatcher = Dispatcher.fromConcurrency(concurrency, 'pmap')
1110
try {
1211
return await Promise.all(
1312
arr.map((item, index) => dispatcher.dispatch(() => fn(item, index, arr))),

test/.eslintrc.yml

Lines changed: 0 additions & 4 deletions
This file was deleted.

test/_shared.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export function approximateCostTime(time: number, tolerance: number) {
2+
return function satisfy(val: number) {
3+
return val >= time && val - time <= Math.abs(tolerance)
4+
}
5+
}

test/dispatcher.test.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import { describe, expect, it } from 'vitest'
2+
import { Dispatcher } from '../src'
3+
4+
describe('constructor check', () => {
5+
it('should throws when executors is empty', () => {
6+
expect(() => new Dispatcher([])).toThrow('executors can not be empty')
7+
})
8+
})

test/limit.test.ts

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import { delay, noop, range } from 'es-toolkit'
2+
import { expect, describe, it } from 'vitest'
3+
import { createLimitedVersionOfFn, pLimit } from '../src'
4+
import { approximateCostTime } from './_shared'
5+
6+
describe('concurrency invalid check', () => {
7+
describe('pLimit(concurrency)', () => {
8+
it('should throws when concurrency is invalid', () => {
9+
expect(() => pLimit(0)).toThrowError('concurrency must be greater than 0')
10+
expect(() => pLimit(Infinity)).toThrowError('concurrency must be finite')
11+
})
12+
})
13+
14+
describe('createLimitedVersionOfFn(fn, concurrency)', () => {
15+
it('should throws when concurrency is invalid', () => {
16+
expect(() => createLimitedVersionOfFn(noop, 0)).toThrowError(
17+
'concurrency must be greater than 0',
18+
)
19+
expect(() => createLimitedVersionOfFn(noop, Infinity)).toThrowError(
20+
'concurrency must be finite',
21+
)
22+
})
23+
})
24+
})
25+
26+
describe('concurrency control is correct', () => {
27+
it('pLimit', async function () {
28+
let arr = range(5) // [0 .. 4]
29+
async function measureCostTime(concurrency: number) {
30+
let start = performance.now()
31+
32+
async function work(x: number) {
33+
await delay(x * 10)
34+
return x * 10
35+
}
36+
37+
const limit = pLimit(concurrency)
38+
const result = await Promise.all(arr.map((value) => limit(() => work(value))))
39+
expect(result).toEqual(arr.map((value) => value * 10))
40+
41+
const cost = performance.now() - start
42+
return cost
43+
}
44+
45+
function approximateCostTime(time: number, tolerance: number) {
46+
return function satisfy(val: number) {
47+
return Math.abs(val - time) <= Math.abs(tolerance)
48+
}
49+
}
50+
51+
const tolerance = 10
52+
53+
// Infinity
54+
expect(await measureCostTime(arr.length)).toSatisfy(approximateCostTime(40, tolerance))
55+
56+
// executor-0: 0ms 20ms 40ms
57+
// executor-1: 10ms 30ms
58+
expect(await measureCostTime(2)).toSatisfy(approximateCostTime(60, tolerance))
59+
60+
// executor-0: 0ms 30ms
61+
// executor-1: 10ms 40ms
62+
// executor-2: 20ms
63+
expect(await measureCostTime(3)).toSatisfy(approximateCostTime(50, tolerance))
64+
})
65+
66+
it('createLimitedVersionOfFn', async function () {
67+
let arr = range(5) // [0 .. 4]
68+
async function measureCostTime(concurrency: number) {
69+
let start = performance.now()
70+
71+
async function work(x: number) {
72+
await delay(x * 10)
73+
return x * 10
74+
}
75+
76+
const workWithLimit = createLimitedVersionOfFn(work, concurrency)
77+
const result = await Promise.all(arr.map((x) => workWithLimit(x)))
78+
expect(result).toEqual(arr.map((value) => value * 10))
79+
80+
const cost = performance.now() - start
81+
return cost
82+
}
83+
84+
const tolerance = 10
85+
86+
// Infinity
87+
expect(await measureCostTime(arr.length)).toSatisfy(approximateCostTime(40, tolerance))
88+
89+
// executor-0: 0ms 20ms 40ms
90+
// executor-1: 10ms 30ms
91+
expect(await measureCostTime(2)).toSatisfy(approximateCostTime(60, tolerance))
92+
93+
// executor-0: 0ms 30ms
94+
// executor-1: 10ms 40ms
95+
// executor-2: 20ms
96+
expect(await measureCostTime(3)).toSatisfy(approximateCostTime(50, tolerance))
97+
})
98+
})

test/pmap.test.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { delay, range } from 'es-toolkit'
22
import { describe, it, expect } from 'vitest'
33
import pmap from '../src'
4+
import { approximateCostTime } from './_shared'
45

56
describe('pmap', function () {
67
it('it works', async function () {
@@ -32,12 +33,6 @@ describe('pmap', function () {
3233
return performance.now() - start
3334
}
3435

35-
function approximateCostTime(time: number, tolerance: number) {
36-
return function satisfy(val: number) {
37-
return Math.abs(val - time) <= Math.abs(tolerance)
38-
}
39-
}
40-
4136
const tolerance = 10
4237

4338
// Infinity
@@ -56,7 +51,7 @@ describe('pmap', function () {
5651
it('should not start new after errored', async () => {
5752
const arr = [1, 2, 3, 4, 5, 6]
5853
const called: number[] = []
59-
await expect(
54+
await expect(() =>
6055
pmap(
6156
arr,
6257
async (x: number) => {
@@ -68,7 +63,7 @@ describe('pmap', function () {
6863
},
6964
2,
7065
),
71-
).rejects.toThrow('test error')
66+
).rejects.toThrowError('test error')
7267
expect(called).toEqual([1, 2, 3])
7368
})
7469
})

0 commit comments

Comments
 (0)