Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ axum = "0.7"
object_store = { version = "0.12.4", features = ["aws"] }
aws-config = "1"
aws-sdk-ec2 = "1"
openssl = { version = "0.10", features = ["vendored"] }

[[bin]]
name = "dfbench"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,5 @@ Several arguments can be passed for running the benchmarks against different sca
for example:

```shell
npm run datafusion-bench -- --sf 10 --files-per-task 4 --query 7
npm run datafusion-bench -- --dataset tpch_sf10 --files-per-task 4 --query 7
```
263 changes: 166 additions & 97 deletions benchmarks/cdk/bin/@bench-common.ts
Original file line number Diff line number Diff line change
@@ -1,135 +1,204 @@
import path from "path";
import fs from "fs/promises";
import { z } from 'zod';
import {z} from 'zod';

// Directory three levels above this file's directory; used in this file as the base for
// the `benchmarks/data/<dataset>` and `testdata/<prefix>/queries` lookups.
export const ROOT = path.join(__dirname, '../../..')
// S3 URI prefix from which table locations are built as `${BUCKET}/<dataset>/<table>/`.
export const BUCKET = 's3://datafusion-distributed-benchmarks' // hardcoded in CDK code

// Simple data structures
/**
 * Outcome of benchmarking a single query.
 */
export type QueryResult = {
    /** Name identifying the query these measurements belong to. */
    query: string;
    /** One entry per successfully timed run: elapsed wall-clock milliseconds and rows returned. */
    iterations: { elapsed: number; row_count: number }[];
    /** Stringified error, set when a run of the query threw. */
    failure?: string
}

/**
 * Full output of a benchmark run: one `QueryResult` per query that was attempted.
 * This is the shape that gets serialized to the results JSON file.
 */
export type BenchmarkResults = {
    queries: QueryResult[];
}

/**
 * Runtime (zod) schema used to validate a previously written results file before
 * comparing against it. Shares its name with the `BenchmarkResults` type; the two live
 * in separate (value vs. type) namespaces.
 */
export const BenchmarkResults = z.object({
    queries: z.array(z.object({
        query: z.string(),
        iterations: z.array(z.object({
            elapsed: z.number(),
            row_count: z.number()
        })),
        // Must be spelled `failure`, the key used by `QueryResult` and therefore the key
        // present in the JSON on disk. zod object schemas drop undeclared keys when
        // parsing, so any other spelling silently loses the recorded failure.
        failure: z.string().optional()
    }))
})

/** The integers 1 through 22, in ascending order. */
export const IDS = Array.from({ length: 22 }, (_, index) => index + 1)

/**
 * Writes `results` to `outputPath` as pretty-printed (2-space indented) JSON.
 *
 * Parent directories of `outputPath` are created if they do not exist.
 *
 * @param results benchmark results to serialize
 * @param outputPath destination file; when omitted or empty nothing is written
 */
export async function writeJson(results: BenchmarkResults, outputPath?: string): Promise<void> {
    if (!outputPath) return;
    await fs.mkdir(path.dirname(outputPath), { recursive: true });
    await fs.writeFile(outputPath, JSON.stringify(results, null, 2));
}

/** Mean of the `elapsed` values, rounded to the nearest integer. Callers pass a non-empty list. */
function averageElapsed(iterations: { elapsed: number }[]): number {
    const total = iterations.reduce((sum, i) => sum + i.elapsed, 0);
    return Math.round(total / iterations.length);
}

/**
 * Prints, for every query in `results`, how its average elapsed time compares with the
 * results previously stored at `outputPath`.
 *
 * Best-effort: if the previous file cannot be read, is not valid JSON, or does not match
 * the `BenchmarkResults` schema, the function returns without printing anything.
 * Queries that are missing from the previous run, or that have no iterations on either
 * side, are skipped.
 *
 * @param results results of the current run
 * @param outputPath path of the results file written by a previous run
 */
export async function compareWithPrevious(results: BenchmarkResults, outputPath: string) {
    let prevResults: BenchmarkResults;
    try {
        const prevContent = await fs.readFile(outputPath, 'utf-8');
        prevResults = BenchmarkResults.parse(JSON.parse(prevContent));
    } catch {
        return; // No previous results to compare
    }

    console.log('\n==== Comparison with previous run ====');

    for (const query of results.queries) {
        const prevQuery = prevResults.queries.find(q => q.query === query.query);
        if (!prevQuery || prevQuery.iterations.length === 0 || query.iterations.length === 0) {
            continue;
        }

        const avgPrev = averageElapsed(prevQuery.iterations);
        const avg = averageElapsed(query.iterations);

        // `factor` is always expressed as "how many times" faster or slower (>= 1 for
        // finite non-zero averages); differences above 1.2x get the emphasized emoji.
        const factor = avg < avgPrev ? avgPrev / avg : avg / avgPrev;
        const tag = avg < avgPrev ? "faster" : "slower";
        const emoji = factor > 1.2 ? (avg < avgPrev ? "✅" : "❌") : (avg < avgPrev ? "✔" : "✖");

        console.log(
            `${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}`
        );
    }
}

const queryId = `q${id}`;
const filePath = path.join(queriesPath, `${queryId}.sql`)
const queryToExecute = await fs.readFile(filePath, 'utf-8')
/**
 * Describes one table to be registered by a `BenchmarkRunner`.
 */
export interface TableSpec {
    /** Schema the table belongs to; this file fills it with the dataset name. */
    schema: string;
    /** Table name; this file fills it with the name of the table's directory. */
    name: string;
    /** S3 location of the table's files. */
    s3Path: string;
}

/**
 * Engine-specific backend driven by `runBenchmark`.
 */
export interface BenchmarkRunner {
    /**
     * Registers the given tables so that subsequent queries can reference them.
     * Called once, before any query is executed.
     */
    createTables(s3Paths: TableSpec[]): Promise<void>;

    /**
     * Executes one SQL query and resolves with the number of rows it returned.
     * A rejection is recorded by `runBenchmark` as that query's failure.
     */
    executeQuery(query: string): Promise<{ rowCount: number }>;
}

console.log(`Warming up query ${id}...`)
await runner.executeQuery(queryToExecute);
/**
 * Builds the table specs for `dataset` by scanning the local directory
 * `<ROOT>/benchmarks/data/<dataset>`.
 *
 * Every entry there that is a readable directory holding only `.parquet` files becomes
 * one table, named after the entry and located at `${BUCKET}/<dataset>/<entry>/`.
 * Other entries are ignored.
 *
 * @param dataset dataset directory name, also used as the schema of every table
 * @returns one spec per table directory, in directory-listing order
 */
async function tablePathsForDataset(dataset: string): Promise<TableSpec[]> {
    const datasetDir = path.join(ROOT, "benchmarks", "data", dataset)
    const entries = await fs.readdir(datasetDir)

    const tables: TableSpec[] = []
    for (const tableName of entries) {
        const isTableDir = await isDirWithAllParquetFiles(path.join(datasetDir, tableName))
        if (!isTableDir) {
            continue
        }
        const s3Path = `${BUCKET}/${dataset}/${tableName}/`
        tables.push({ name: tableName, schema: dataset, s3Path })
    }
    return tables
}

for (let i = 0; i < iterations; i++) {
const start = new Date()
const response = await runner.executeQuery(queryToExecute);
const elapsed = Math.round(new Date().getTime() - start.getTime())
/**
 * Tells whether `dir` can be listed as a directory and every entry name in it ends with
 * `.parquet`.
 *
 * @param dir path to inspect
 * @returns `false` when listing `dir` fails (for example it does not exist or is not a
 *          directory) or when any entry lacks the `.parquet` suffix; `true` otherwise,
 *          which includes an empty directory
 */
async function isDirWithAllParquetFiles(dir: string): Promise<boolean> {
    let entries: string[]
    try {
        entries = await fs.readdir(dir)
    } catch {
        // Anything that cannot be listed is simply not a table directory.
        return false
    }
    return entries.every(entry => entry.endsWith(".parquet"))
}

queryResult.iterations.push({
elapsed,
row_count: response.rowCount
});
/**
 * Loads every query file for `dataset` from `<ROOT>/testdata/<prefix>/queries`, where
 * `<prefix>` is the part of the dataset name before its first underscore
 * (e.g. `tpch_sf10` -> `tpch`).
 *
 * @param dataset dataset name
 * @returns `[fileName, sql]` pairs ordered by the numeric id embedded in the file name
 * @throws if the queries directory cannot be read, or a file name contains no digits
 */
async function queriesForDataset(dataset: string): Promise<[string, string][]> {
    // `split` always yields at least one element; the default only satisfies the checker.
    const [datasetPrefix = dataset] = dataset.split("_")
    const queriesPath = path.join(ROOT, "testdata", datasetPrefix, "queries")

    const queries: [string, string][] = []
    for (const queryName of await fs.readdir(queriesPath)) {
        const sql = await fs.readFile(path.join(queriesPath, queryName), 'utf-8');
        queries.push([queryName, sql])
    }
    // Subtraction gives a proper three-way comparator (0 for equal ids), so files that
    // share an id keep their relative order instead of being compared inconsistently.
    queries.sort(([name1], [name2]) => numericId(name1) - numericId(name2))
    return queries
}

const avg = Math.round(
queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length
);
console.log(`Query ${id} avg time: ${avg} ms`);
/**
 * Extracts the first run of decimal digits in `queryName` as a number
 * (e.g. `"q12.sql"` -> `12`).
 *
 * @param queryName query file name
 * @returns the parsed base-10 id
 * @throws Error when `queryName` contains no digits
 */
function numericId(queryName: string): number {
    const firstDigits = /\d+/.exec(queryName)
    if (firstDigits === null) {
        throw new Error(`Query name "${queryName}" does not contain a numeric id`)
    }
    return parseInt(firstDigits[0], 10)
}

results.queries.push(queryResult);
}
/**
 * Runs the benchmark queries of a dataset against `runner` and records the timings.
 *
 * Steps: register the dataset's tables, then for each selected query run it once as an
 * untimed warm-up followed by `iterations` timed runs. Finally the results are compared
 * with whatever is stored at `outputPath` and then written there.
 *
 * A query that throws does not abort the run: the stringified error is stored in its
 * `failure` field (keeping any iterations completed so far) and the next query starts.
 *
 * @param runner backend that registers tables and executes SQL
 * @param options.dataset dataset name used to locate both tables and query files
 * @param options.iterations number of timed runs per query
 * @param options.queries numeric ids of the queries to run; empty means all
 * @param options.outputPath results file to compare against and overwrite
 */
export async function runBenchmark(
    runner: BenchmarkRunner,
    options: {
        dataset: string
        iterations: number;
        queries: number[];
        outputPath: string;
    }
) {
    const { dataset, iterations, queries, outputPath } = options;

    const results: BenchmarkResults = { queries: [] };

    console.log("Creating tables...");
    const s3Paths = await tablePathsForDataset(dataset)
    await runner.createTables(s3Paths);

    for (const [queryName, sql] of await queriesForDataset(dataset)) {
        const id = numericId(queryName)

        if (queries.length > 0 && !queries.includes(id)) {
            continue;
        }

        const queryResult: QueryResult = {
            query: queryName,
            iterations: [],
        };
        // Register the result up front so that every exit path below, including a
        // failed warm-up, leaves a record of the query in the output.
        results.queries.push(queryResult);

        console.log(`Warming up query ${id}...`)
        try {
            await runner.executeQuery(sql);
        } catch (e: unknown) {
            queryResult.failure = String(e);
            console.error(`Query ${queryResult.query} failed: ${queryResult.failure}`)
            continue
        }

        for (let i = 0; i < iterations; i++) {
            const start = Date.now()
            let response: { rowCount: number }
            try {
                response = await runner.executeQuery(sql);
            } catch (e: unknown) {
                queryResult.failure = String(e);
                break
            }
            const elapsed = Date.now() - start

            queryResult.iterations.push({
                elapsed,
                row_count: response.rowCount
            });

            console.log(
                `Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.rowCount} rows`
            );
        }

        // With no successful iteration the average would be 0 / 0, so skip the line.
        if (queryResult.iterations.length > 0) {
            const avg = Math.round(
                queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length
            );
            console.log(`Query ${id} avg time: ${avg} ms`);
        }

        if (queryResult.failure) {
            console.error(`Query ${queryResult.query} failed: ${queryResult.failure}`)
        }
    }

    // Compare first: writing overwrites the previous results stored at `outputPath`.
    await compareWithPrevious(results, outputPath);
    await writeJson(results, outputPath);
}
Loading