Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrency mode #77

Merged
merged 7 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"commonjs": true
},
"rules": {
"import/no-extraneous-dependencies": "off",
"no-restricted-syntax": "off",
"no-await-in-loop": "off",
"no-plusplus": "off",
Expand Down
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ export type Hook = (task: Task, mode: "warmup" | "run") => void | Promise<void>;
```

- `async run()`: run the added tasks that were registered using the `add` method
- `async runConcurrently(limit: number = Infinity)`: similar to the `run` method but runs concurrently rather than sequentially
- `async runConcurrently(threshold: number = Infinity, mode: "bench" | "task" = "bench")`: similar to the `run` method but runs concurrently rather than sequentially. See the [Concurrency](#Concurrency) section.
- `async warmup()`: warm up the benchmark tasks
- `async warmupConcurrently(threshold: number = Infinity, mode: "bench" | "task" = "bench")`: warm up the benchmark tasks concurrently
- `reset()`: reset each task and remove its result
- `add(name: string, fn: Fn, opts?: FnOpts)`: add a benchmark task to the task map
- `Fn`: `() => any | Promise<any>`
Expand Down Expand Up @@ -370,6 +371,24 @@ import { hrtimeNow } from 'tinybench';
```
It may make your benchmarks slower, check #42.

## Concurrency

- When `mode` is set to `null` (default), concurrency is disabled.
- When `mode` is set to 'task', each task's iterations (calls of a task function) run concurrently.
- When `mode` is set to 'bench', different tasks within the bench run concurrently. Concurrent cycles.

```ts
// options way (recommended)
bench.threshold = 10 // The maximum number of concurrent tasks to run. Defaults to Infinity.
bench.concurrency = "task" // The concurrency mode to determine how tasks are run.
// await bench.warmup()
await bench.run()

// standalone method way
// await bench.warmupConcurrently(10, "task")
await bench.runConcurrently(10, "task") // with runConcurrently, mode is set to 'bench' by default
```

## Prior art

- [Benchmark.js](https://github.com/bestiejs/benchmark.js)
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"eslint-config-airbnb-base": "^15.0.0",
"eslint-plugin-import": "^2.26.0",
"nano-staged": "^0.5.0",
"p-limit": "^4.0.0",
"size-limit": "^7.0.8",
"tsup": "^5.11.7",
"typescript": "^5.2.2",
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 65 additions & 23 deletions src/bench.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pLimit from 'p-limit';
import type {
Hook,
Options,
Expand All @@ -24,6 +25,20 @@ export default class Bench extends EventTarget {

_todos: Map<string, Task> = new Map();

/**
* Executes tasks concurrently based on the specified concurrency mode.
*
* - When `mode` is set to `null` (default), concurrency is disabled.
* - When `mode` is set to 'task', each task's iterations (calls of a task function) run concurrently.
* - When `mode` is set to 'bench', different tasks within the bench run concurrently.
*/
concurrency: 'task' | 'bench' | null = null;

/**
* The maximum number of concurrent tasks to run. Defaults to Infinity.
*/
threshold = Infinity;

signal?: AbortSignal;

throws: boolean;
Expand Down Expand Up @@ -67,7 +82,7 @@ export default class Bench extends EventTarget {
}
}

runTask(task: Task) {
private runTask(task: Task) {
if (this.signal?.aborted) return task;
return task.run();
}
Expand All @@ -77,7 +92,11 @@ export default class Bench extends EventTarget {
* {@link add} method.
* Note: This method does not do any warmup. Call {@link warmup} for that.
*/
async run() {
async run(): Promise<Task[]> {
if (this.concurrency === 'bench') {
// TODO: in the next major, we should remove runConcurrently
return this.runConcurrently(this.threshold, this.concurrency);
}
this.dispatchEvent(createBenchEvent('start'));
const values: Task[] = [];
for (const task of [...this._tasks.values()]) {
Expand All @@ -88,32 +107,26 @@ export default class Bench extends EventTarget {
}

/**
* similar to the {@link run} method but runs concurrently rather than sequentially
* default limit is Infinity
* See Bench.{@link concurrency}
*/
async runConcurrently(limit = Infinity) {
this.dispatchEvent(createBenchEvent('start'));
async runConcurrently(threshold = Infinity, mode: NonNullable<Bench['concurrency']> = 'bench'): Promise<Task[]> {
this.threshold = threshold;
this.concurrency = mode;

const remainingTasks = [...this._tasks.values()];
const values: Task[] = [];
if (mode === 'task') {
return this.run();
}

const handleConcurrency = async () => {
while (remainingTasks.length > 0) {
const runningTasks: (Promise<Task> | Task)[] = [];
this.dispatchEvent(createBenchEvent('start'));

// Start tasks up to the concurrency limit
while (runningTasks.length < limit && remainingTasks.length > 0) {
const task = remainingTasks.pop()!;
runningTasks.push(this.runTask(task));
}
const limit = pLimit(threshold);

// Wait for all running tasks to complete
const completedTasks = await Promise.all(runningTasks);
values.push(...completedTasks);
}
};
const promises: Promise<Task>[] = [];
for (const task of [...this._tasks.values()]) {
promises.push(limit(() => this.runTask(task)));
}

await handleConcurrency();
const values = await Promise.all(promises);

this.dispatchEvent(createBenchEvent('complete'));

Expand All @@ -124,13 +137,42 @@ export default class Bench extends EventTarget {
* warmup the benchmark tasks.
* This is not run by default by the {@link run} method.
*/
async warmup() {
async warmup(): Promise<void> {
if (this.concurrency === 'bench') {
// TODO: in the next major, we should remove *Concurrently methods
await this.warmupConcurrently(this.threshold, this.concurrency);
return;
}
this.dispatchEvent(createBenchEvent('warmup'));
for (const [, task] of this._tasks) {
await task.warmup();
}
}

/**
* warmup the benchmark tasks concurrently.
* This is not run by default by the {@link runConcurrently} method.
*/
async warmupConcurrently(threshold = Infinity, mode: NonNullable<Bench['concurrency']> = 'bench'): Promise<void> {
this.threshold = threshold;
this.concurrency = mode;

if (mode === 'task') {
await this.warmup();
return;
}

this.dispatchEvent(createBenchEvent('warmup'));
const limit = pLimit(threshold);
const promises: Promise<void>[] = [];

for (const [, task] of this._tasks) {
promises.push(limit(() => task.warmup()));
}

await Promise.all(promises);
}

/**
* reset each task and remove its result
*/
Expand Down
56 changes: 36 additions & 20 deletions src/task.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pLimit from 'p-limit';
import type {
Fn,
TaskEvents,
Expand Down Expand Up @@ -52,6 +53,8 @@ export default class Task extends EventTarget {
}

private async loop(time: number, iterations: number): Promise<{ error?: unknown, samples?: number[] }> {
const concurrent = this.bench.concurrency === 'task';
const { threshold } = this.bench;
let totalTime = 0; // ms
const samples: number[] = [];
if (this.opts.beforeAll != null) {
Expand All @@ -63,33 +66,46 @@ export default class Task extends EventTarget {
}
const isAsync = await isAsyncTask(this);

const executeTask = async () => {
if (this.opts.beforeEach != null) {
await this.opts.beforeEach.call(this);
}

let taskTime = 0;
if (isAsync) {
const taskStart = this.bench.now();
await this.fn.call(this);
taskTime = this.bench.now() - taskStart;
} else {
const taskStart = this.bench.now();
this.fn.call(this);
taskTime = this.bench.now() - taskStart;
}

samples.push(taskTime);
totalTime += taskTime;

if (this.opts.afterEach != null) {
await this.opts.afterEach.call(this);
}
};

const limit = pLimit(threshold);
try {
const promises: Promise<void>[] = []; // only for task level concurrency
while (
(totalTime < time || samples.length < iterations)
(totalTime < time || ((samples.length + limit.activeCount + limit.pendingCount) < iterations))
&& !this.bench.signal?.aborted
) {
if (this.opts.beforeEach != null) {
await this.opts.beforeEach.call(this);
}

let taskTime = 0;
if (isAsync) {
const taskStart = this.bench.now();
await this.fn.call(this);
taskTime = this.bench.now() - taskStart;
if (concurrent) {
promises.push(limit(executeTask));
} else {
const taskStart = this.bench.now();
this.fn.call(this);
taskTime = this.bench.now() - taskStart;
}

samples.push(taskTime);
totalTime += taskTime;

if (this.opts.afterEach != null) {
await this.opts.afterEach.call(this);
await executeTask();
}
}
if (promises.length) {
await Promise.all(promises);
}
} catch (error) {
return { error };
}
Expand Down
69 changes: 67 additions & 2 deletions test/sequential.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ test('sequential', async () => {
expect(isFirstTaskDefined).toBe(true);
});

test('concurrent', async () => {
test.each(['warmup', 'run'])('%s concurrent (bench level)', async (mode) => {
const concurrentBench = new Bench({
time: 0,
iterations: 100,
throws: true,
});

let shouldBeDefined1: true;
Expand All @@ -52,7 +53,12 @@ test('concurrent', async () => {
shouldNotBeDefinedFirst2 = true;
});

concurrentBench.runConcurrently();
if (mode === 'warmup') {
concurrentBench.warmupConcurrently();
} else {
concurrentBench.runConcurrently();
}

await setTimeout(0);
expect(shouldBeDefined1!).toBeDefined();
expect(shouldBeDefined2!).toBeDefined();
Expand All @@ -62,3 +68,62 @@ test('concurrent', async () => {
expect(shouldNotBeDefinedFirst1!).toBeDefined();
expect(shouldNotBeDefinedFirst2!).toBeDefined();
});

test.each(['warmup', 'run'])('%s concurrent (task level)', async (mode) => {
const iterations = 10;
const concurrentBench = new Bench({
time: 0,
warmupTime: 0,
iterations,
warmupIterations: iterations,
});
const key = 'sample 1';

const runs = { value: 0 };
concurrentBench
.add(key, async () => {
runs.value++;
await setTimeout(10);
// all task function should be here after 10ms
expect(runs.value).toEqual(iterations);
await setTimeout(10);
});

if (mode === 'warmup') {
await concurrentBench.warmup();
} else {
await concurrentBench.run();
for (const result of concurrentBench.results) {
expect(result?.error).toMatch(/AssertionError/);
}
}
expect(concurrentBench.getTask(key)!.runs).toEqual(0);

concurrentBench.reset();
runs.value = 0;

if (mode === 'warmup') {
await concurrentBench.warmupConcurrently();
} else {
await concurrentBench.runConcurrently();
for (const result of concurrentBench.results) {
expect(result?.error).toMatch(/AssertionError/);
}
}
expect(concurrentBench.getTask(key)!.runs).toEqual(0);
concurrentBench.reset();
runs.value = 0;

if (mode === 'warmup') {
await concurrentBench.warmupConcurrently(Infinity, 'task');
expect(runs.value).toEqual(10);
} else {
await concurrentBench.runConcurrently(Infinity, 'task');

for (const result of concurrentBench.results) {
expect(result?.error).toBeUndefined();
}
expect(runs.value).toEqual(10);
expect(concurrentBench.getTask(key)!.runs).toEqual(10);
}
});
Loading