diff --git a/benchmarks/cdk/bin/@bench-common.ts b/benchmarks/cdk/bin/@bench-common.ts
new file mode 100644
index 0000000..2d978b6
--- /dev/null
+++ b/benchmarks/cdk/bin/@bench-common.ts
@@ -0,0 +1,135 @@
+import path from "path";
+import fs from "fs/promises";
+import { z } from 'zod';
+
+export const ROOT = path.join(__dirname, '../../..')
+
+// Simple data structures
+export type QueryResult = {
+  query: string;
+  iterations: { elapsed: number; row_count: number }[];
+}
+
+export type BenchmarkResults = {
+  queries: QueryResult[];
+}
+
+export const BenchmarkResults = z.object({
+  queries: z.array(z.object({
+    query: z.string(),
+    iterations: z.array(z.object({
+      elapsed: z.number(),
+      row_count: z.number()
+    }))
+  }))
+})
+
+export const IDS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
+
+export async function writeJson(results: BenchmarkResults, outputPath?: string) {
+  if (!outputPath) return;
+  await fs.mkdir(path.dirname(outputPath), { recursive: true });
+  await fs.writeFile(outputPath, JSON.stringify(results, null, 2));
+}
+
+export async function compareWithPrevious(results: BenchmarkResults, outputPath: string) {
+  let prevResults: BenchmarkResults;
+  try {
+    const prevContent = await fs.readFile(outputPath, 'utf-8');
+    prevResults = BenchmarkResults.parse(JSON.parse(prevContent));
+  } catch {
+    return; // No previous results to compare
+  }
+
+  console.log('\n==== Comparison with previous run ====');
+
+  for (const query of results.queries) {
+    const prevQuery = prevResults.queries.find(q => q.query === query.query);
+    if (!prevQuery || prevQuery.iterations.length === 0 || query.iterations.length === 0) {
+      continue;
+    }
+
+    const avgPrev = Math.round(
+      prevQuery.iterations.reduce((sum, i) => sum + i.elapsed, 0) / prevQuery.iterations.length
+    );
+    const avg = Math.round(
+      query.iterations.reduce((sum, i) => sum + i.elapsed, 0) / query.iterations.length
+    );
+
+    const factor = avg < avgPrev ? avgPrev / avg : avg / avgPrev;
+    const tag = avg < avgPrev ? "faster" : "slower";
+    const emoji = factor > 1.2 ? (avg < avgPrev ? "✅" : "❌") : (avg < avgPrev ? "✔" : "✖");
+
+    console.log(
+      `${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}`
+    );
+  }
+}
+
+export interface BenchmarkRunner {
+  createTables(sf: number): Promise<void>;
+
+  executeQuery(query: string): Promise<{ rowCount: number }>;
+}
+
+export async function runBenchmark(
+  runner: BenchmarkRunner,
+  options: {
+    sf: number;
+    iterations: number;
+    specificQuery?: number;
+    outputPath: string;
+  }
+) {
+  const { sf, iterations, specificQuery, outputPath } = options;
+
+  const results: BenchmarkResults = { queries: [] };
+  const queriesPath = path.join(ROOT, "testdata", "tpch", "queries")
+
+  console.log("Creating tables...");
+  await runner.createTables(sf);
+
+  for (let id of IDS) {
+    if (specificQuery && specificQuery !== id) {
+      continue;
+    }
+
+    const queryId = `q${id}`;
+    const filePath = path.join(queriesPath, `${queryId}.sql`)
+    const queryToExecute = await fs.readFile(filePath, 'utf-8')
+
+    const queryResult: QueryResult = {
+      query: queryId,
+      iterations: []
+    };
+
+    console.log(`Warming up query ${id}...`)
+    await runner.executeQuery(queryToExecute);
+
+    for (let i = 0; i < iterations; i++) {
+      const start = new Date()
+      const response = await runner.executeQuery(queryToExecute);
+      const elapsed = Math.round(new Date().getTime() - start.getTime())
+
+      queryResult.iterations.push({
+        elapsed,
+        row_count: response.rowCount
+      });
+
+      console.log(
+        `Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.rowCount} rows`
+      );
+    }
+
+    const avg = Math.round(
+      queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length
+    );
+    console.log(`Query ${id} avg time: ${avg} ms`);
+
+    results.queries.push(queryResult);
+  }
+
+  // Compare against the previous run, then persist the new results
+  await compareWithPrevious(results, outputPath);
+  await writeJson(results, outputPath);
+}
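The `BenchmarkRunner` interface plus `runBenchmark` above is the extension point for wiring in additional engines. A minimal sketch of what a third runner would look like — the `EchoRunner` name and its no-op behavior are hypothetical, shown only to illustrate the contract:

import path from "path";
import { BenchmarkRunner, ROOT, runBenchmark } from "./@bench-common";

// Hypothetical stub runner: satisfies the two-method contract with no real engine.
class EchoRunner implements BenchmarkRunner {
  async createTables(_sf: number): Promise<void> {
    // A real runner would issue DDL against its engine here.
  }

  async executeQuery(_query: string): Promise<{ rowCount: number }> {
    return { rowCount: 0 }; // a real runner returns the engine-reported row count
  }
}

runBenchmark(new EchoRunner(), {
  sf: 1,
  iterations: 3,
  outputPath: path.join(ROOT, "benchmarks", "data", "tpch_sf1", "echo-results.json"),
}).catch(console.error);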
"✔" : "✖"); + + console.log( + `${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}` + ); + } +} + +export interface BenchmarkRunner { + createTables(sf: number): Promise; + + executeQuery(query: string): Promise<{ rowCount: number }>; +} + +export async function runBenchmark( + runner: BenchmarkRunner, + options: { + sf: number; + iterations: number; + specificQuery?: number; + outputPath: string; + } +) { + const { sf, iterations, specificQuery, outputPath } = options; + + const results: BenchmarkResults = { queries: [] }; + const queriesPath = path.join(ROOT, "testdata", "tpch", "queries") + + console.log("Creating tables..."); + await runner.createTables(sf); + + for (let id of IDS) { + if (specificQuery && specificQuery !== id) { + continue; + } + + const queryId = `q${id}`; + const filePath = path.join(queriesPath, `${queryId}.sql`) + const queryToExecute = await fs.readFile(filePath, 'utf-8') + + const queryResult: QueryResult = { + query: queryId, + iterations: [] + }; + + console.log(`Warming up query ${id}...`) + await runner.executeQuery(queryToExecute); + + for (let i = 0; i < iterations; i++) { + const start = new Date() + const response = await runner.executeQuery(queryToExecute); + const elapsed = Math.round(new Date().getTime() - start.getTime()) + + queryResult.iterations.push({ + elapsed, + row_count: response.rowCount + }); + + console.log( + `Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.rowCount} rows` + ); + } + + const avg = Math.round( + queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length + ); + console.log(`Query ${id} avg time: ${avg} ms`); + + results.queries.push(queryResult); + } + + // Write results and compare + await compareWithPrevious(results, outputPath); + await writeJson(results, outputPath); +} diff --git a/benchmarks/cdk/bin/datafusion-bench.ts b/benchmarks/cdk/bin/datafusion-bench.ts index 7363130..c12dcb0 100644 --- a/benchmarks/cdk/bin/datafusion-bench.ts +++ b/benchmarks/cdk/bin/datafusion-bench.ts @@ -1,193 +1,124 @@ import path from "path"; -import fs from "fs/promises"; -import { Command } from "commander"; -import { z } from 'zod'; - -const ROOT = path.join(__dirname, '../../..') +import {Command} from "commander"; +import {z} from 'zod'; +import {BenchmarkRunner, ROOT, runBenchmark} from "./@bench-common"; // Remember to port-forward a worker with // aws ssm start-session --target {host-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000" -async function main () { - const program = new Command(); - - program - .option('--sf ', 'Scale factor', '1') - .option('-i, --iterations ', 'Number of iterations', '3') - .option('--files-per-task ', 'Files per task', '4') - .option('--cardinality-task-sf ', 'Cardinality task scale factor', '2') - .option('--shuffle-batch-size ', 'Shuffle batch coalescing size (number of rows)', '8192') - .option('--query ', 'A specific query to run', undefined) - .parse(process.argv); - - const options = program.opts(); - - const sf = parseInt(options.sf); - const iterations = parseInt(options.iterations); - const filesPerTask = parseInt(options.filesPerTask); - const cardinalityTaskSf = parseInt(options.cardinalityTaskSf); - const shuffleBatchSize = parseInt(options.shuffleBatchSize); - - // Compare with previous results first - const results: BenchmarkResults = { queries: [] }; - const queriesPath = path.join(ROOT, 
"testdata", "tpch", "queries") - - console.log("Creating tables...") - await query(createTablesSql(sf)) - await query(` - SET distributed.files_per_task=${filesPerTask}; - SET distributed.cardinality_task_count_factor=${cardinalityTaskSf}; - SET distributed.shuffle_batch_size=${shuffleBatchSize} - `) - - for (let id of IDS) { - if (options.query && parseInt(options.query) !== id) { - continue - } - const queryId = `q${id}`; - const filePath = path.join(queriesPath, `${queryId}.sql`) - const content = await fs.readFile(filePath, 'utf-8') - - const queryResult: QueryResult = { - query: queryId, - iterations: [] - }; +async function main() { + const program = new Command(); + + program + .option('--sf ', 'Scale factor', '1') + .option('-i, --iterations ', 'Number of iterations', '3') + .option('--files-per-task ', 'Files per task', '4') + .option('--cardinality-task-sf ', 'Cardinality task scale factor', '2') + .option('--shuffle-batch-size ', 'Shuffle batch coalescing size (number of rows)', '8192') + .option('--query ', 'A specific query to run', undefined) + .parse(process.argv); + + const options = program.opts(); + + const sf = parseInt(options.sf); + const iterations = parseInt(options.iterations); + const filesPerTask = parseInt(options.filesPerTask); + const cardinalityTaskSf = parseInt(options.cardinalityTaskSf); + const shuffleBatchSize = parseInt(options.shuffleBatchSize); + const specificQuery = options.query ? parseInt(options.query) : undefined; + + const runner = new DataFusionRunner({ + filesPerTask, + cardinalityTaskSf, + shuffleBatchSize, + }); + + const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json"); + + await runBenchmark(runner, { + sf, + iterations, + specificQuery, + outputPath, + }); +} - for (let i = 0; i < iterations; i++) { - const start = new Date() - const response = await query(content) - const elapsed = Math.round(new Date().getTime() - start.getTime()) +const QueryResponse = z.object({ + count: z.number(), + plan: z.string() +}) +type QueryResponse = z.infer - queryResult.iterations.push({ - elapsed, - row_count: response.count - }); +class DataFusionRunner implements BenchmarkRunner { + private url = 'http://localhost:9000'; - console.log( - `Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.count} rows` - ); + constructor(private readonly options: { + filesPerTask: number; + cardinalityTaskSf: number; + shuffleBatchSize: number; + }) { } - const avg = Math.round( - queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length - ); - console.log(`Query ${id} avg time: ${avg} ms`); - - results.queries.push(queryResult); - } + async executeQuery(sql: string): Promise<{ rowCount: number }> { + let response + if (sql.includes("create view")) { + // This is query 15 + let [createView, query, dropView] = sql.split(";") + await this.query(createView); + response = await this.query(query) + await this.query(dropView); + } else { + response = await this.query(sql) + } + + return {rowCount: response.count}; + } - // Write results and compare - const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json"); - await compareWithPrevious(results, outputPath); - await writeJson(results, outputPath); -} + private async query(sql: string): Promise { + const url = new URL(this.url); + url.searchParams.set('sql', sql); -// Simple data structures -type QueryResult = { - query: string; - iterations: { elapsed: number; row_count: number }[]; -} + const 
+    const response = await fetch(url.toString());
 
-type BenchmarkResults = {
-  queries: QueryResult[];
-}
+    if (!response.ok) {
+      const msg = await response.text();
+      throw new Error(`Query failed: ${response.status} ${msg}`);
+    }
 
-const BenchmarkResults = z.object({
-  queries: z.array(z.object({
-    query: z.string(),
-    iterations: z.array(z.object({
-      elapsed: z.number(),
-      row_count: z.number()
-    }))
-  }))
-})
-
-async function writeJson (results: BenchmarkResults, outputPath?: string) {
-  if (!outputPath) return;
-  await fs.writeFile(outputPath, JSON.stringify(results, null, 2));
-}
-
-async function compareWithPrevious (results: BenchmarkResults, outputPath: string) {
-  let prevResults: BenchmarkResults;
-  try {
-    const prevContent = await fs.readFile(outputPath, 'utf-8');
-    prevResults = BenchmarkResults.parse(JSON.parse(prevContent));
-  } catch {
-    return; // No previous results to compare
-  }
-
-  console.log('\n==== Comparison with previous run ====');
-
-  for (const query of results.queries) {
-    const prevQuery = prevResults.queries.find(q => q.query === query.query);
-    if (!prevQuery || prevQuery.iterations.length === 0 || query.iterations.length === 0) {
-      continue
+    const unparsed = await response.json();
+    return QueryResponse.parse(unparsed);
   }
 
-    const avgPrev = Math.round(
-      prevQuery.iterations.reduce((sum, i) => sum + i.elapsed, 0) / prevQuery.iterations.length
-    );
-    const avg = Math.round(
-      query.iterations.reduce((sum, i) => sum + i.elapsed, 0) / query.iterations.length
-    );
-
-    const factor = avg < avgPrev ? avgPrev / avg : avg / avgPrev;
-    const tag = avg < avgPrev ? "faster" : "slower";
-    const emoji = factor > 1.2 ? (avg < avgPrev ? "✅" : "❌") : (avg < avgPrev ? "✔" : "✖");
-
-    console.log(
-      `${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}`
-    );
-  }
-}
-
-
-function createTablesSql (sf: number): string {
-  let stmt = ''
-  for (const tbl of [
-    "lineitem",
-    "orders",
-    "part",
-    "partsupp",
-    "customer",
-    "nation",
-    "region",
-    "supplier",
-  ]) {
-    // language=SQL format=false
-    stmt += `
+  async createTables(sf: number): Promise<void> {
+    let stmt = '';
+    for (const tbl of [
+      "lineitem",
+      "orders",
+      "part",
+      "partsupp",
+      "customer",
+      "nation",
+      "region",
+      "supplier",
+    ]) {
+      // language=SQL format=false
+      stmt += `
       DROP TABLE IF EXISTS ${tbl};
       CREATE EXTERNAL TABLE IF NOT EXISTS ${tbl}
       STORED AS PARQUET
      LOCATION 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/${tbl}/';
-    `
-  }
-  return stmt
-}
-
-const IDS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
-
-const QueryResponse = z.object({
-  count: z.number(),
-  plan: z.string()
-})
-type QueryResponse = z.infer<typeof QueryResponse>
-
-async function query (sql: string): Promise<QueryResponse> {
-  const url = new URL('http://localhost:9000')
-  url.searchParams.set('sql', sql)
-
-  const response = await fetch(url.toString())
-
-  if (!response.ok) {
-    const msg = await response.text()
-    throw new Error(`Query failed: ${response.status} ${msg}`)
-  }
+      `;
+    }
+    await this.query(stmt);
+    await this.query(`
+      SET distributed.files_per_task=${this.options.filesPerTask};
+      SET distributed.cardinality_task_count_factor=${this.options.cardinalityTaskSf};
+      SET distributed.shuffle_batch_size=${this.options.shuffleBatchSize}
+    `);
+  }
 
-  const unparsed = await response.json()
-  return QueryResponse.parse(unparsed)
 }
 
 main()
-  .catch(err => {
-    console.error(err)
-    process.exit(1)
-  })
+  .catch(err => {
+    console.error(err)
+    process.exit(1)
+  })
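Before a full run it can be useful to verify the port-forward is alive. A standalone connectivity check against the same endpoint the runner uses — it assumes only what the code above shows: the worker accepts a ?sql= query parameter and answers with { count, plan }:

// Smoke test for the port-forwarded DataFusion worker.
async function smokeTest() {
  const url = new URL("http://localhost:9000");
  url.searchParams.set("sql", "SELECT 1");
  const response = await fetch(url.toString());
  if (!response.ok) throw new Error(`worker not reachable: ${response.status}`);
  const body = await response.json();
  console.log(`ok: count=${body.count}`); // expects the { count, plan } shape
}

smokeTest().catch(err => { console.error(err); process.exit(1); });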
process.exit(1) + }) diff --git a/benchmarks/cdk/bin/trino-bench.ts b/benchmarks/cdk/bin/trino-bench.ts new file mode 100644 index 0000000..d353637 --- /dev/null +++ b/benchmarks/cdk/bin/trino-bench.ts @@ -0,0 +1,249 @@ +import path from "path"; +import { Command } from "commander"; +import { ROOT, runBenchmark, BenchmarkRunner } from "./@bench-common"; + +// Remember to port-forward Trino coordinator with +// aws ssm start-session --target {instance-0-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=8080,localPortNumber=8080" + +async function main() { + const program = new Command(); + + program + .option('--sf ', 'Scale factor', '1') + .option('-i, --iterations ', 'Number of iterations', '3') + .option('--query ', 'A specific query to run', undefined) + .parse(process.argv); + + const options = program.opts(); + + const sf = parseInt(options.sf); + const iterations = parseInt(options.iterations); + const specificQuery = options.query ? parseInt(options.query) : undefined; + + const runner = new TrinoRunner({ sf }); + const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json"); + + await runBenchmark(runner, { + sf, + iterations, + specificQuery, + outputPath, + }); +} + +class TrinoRunner implements BenchmarkRunner { + private trinoUrl = 'http://localhost:8080'; + + constructor(private readonly options: { + sf: number + }) { + } + + + async executeQuery(sql: string): Promise<{ rowCount: number }> { + // Fix query 4: Add DATE prefix to date literals that don't have it. + sql = sql.replace(/(? { + // Submit query + const submitResponse = await fetch(`${this.trinoUrl}/v1/statement`, { + method: 'POST', + headers: { + 'X-Trino-User': 'benchmark', + 'X-Trino-Catalog': 'hive', + 'X-Trino-Schema': `tpch_sf${this.options.sf}`, + }, + body: sql.trim().replace(/;+$/, ''), + }); + + if (!submitResponse.ok) { + const msg = await submitResponse.text(); + throw new Error(`Query submission failed: ${submitResponse.status} ${msg}`); + } + + let result: any = await submitResponse.json(); + let rowCount = 0; + + // Poll for results + while (result.nextUri) { + const pollResponse = await fetch(result.nextUri); + + if (!pollResponse.ok) { + const msg = await pollResponse.text(); + throw new Error(`Query polling failed: ${pollResponse.status} ${msg}`); + } + + result = await pollResponse.json(); + + // Count rows if data is present + if (result.data) { + if (typeof result.data?.[0]?.[0] === 'string') { + // Extract row count from EXPLAIN ANALYZE output + const outputMatch = result.data[0][0].match(/Output.*?(\d+)\s+rows/i); + if (outputMatch) { + rowCount = parseInt(outputMatch[1]); + } + } else { + rowCount += result.data.length; + } + } + + // Check for errors + if (result.error) { + throw new Error(`Query failed: ${result.error.message}`); + } + } + + return { rowCount }; + } + + async createTables(sf: number): Promise { + const schema = `tpch_sf${sf}`; + + // Create schema first + await this.executeSingleStatement(`CREATE SCHEMA IF NOT EXISTS hive.${schema} WITH (location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/')`); + + // Create customer table + await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.customer`); + await this.executeSingleStatement(`CREATE TABLE hive.${schema}.customer + ( + c_custkey bigint, + c_name varchar(25), + c_address varchar(40), + c_nationkey bigint, + c_phone varchar(15), + c_acctbal decimal(15, 2), + c_mktsegment varchar(10), + c_comment varchar(117) + ) + WITH (external_location = 
+
+    // Create lineitem table
+    await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.lineitem`);
+    await this.executeSingleStatement(`CREATE TABLE hive.${schema}.lineitem
+    (
+        l_orderkey      bigint,
+        l_partkey       bigint,
+        l_suppkey       bigint,
+        l_linenumber    integer,
+        l_quantity      decimal(15, 2),
+        l_extendedprice decimal(15, 2),
+        l_discount      decimal(15, 2),
+        l_tax           decimal(15, 2),
+        l_returnflag    varchar(1),
+        l_linestatus    varchar(1),
+        l_shipdate      date,
+        l_commitdate    date,
+        l_receiptdate   date,
+        l_shipinstruct  varchar(25),
+        l_shipmode      varchar(10),
+        l_comment       varchar(44)
+    )
+    WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/lineitem/', format = 'PARQUET')`);
+
+    // Create nation table
+    await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.nation`);
+    await this.executeSingleStatement(`CREATE TABLE hive.${schema}.nation
+    (
+        n_nationkey bigint,
+        n_name      varchar(25),
+        n_regionkey bigint,
+        n_comment   varchar(152)
+    )
+    WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/nation/', format = 'PARQUET')`);
+
+    // Create orders table
+    await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.orders`);
+    await this.executeSingleStatement(`CREATE TABLE hive.${schema}.orders
+    (
+        o_orderkey      bigint,
+        o_custkey       bigint,
+        o_orderstatus   varchar(1),
+        o_totalprice    decimal(15, 2),
+        o_orderdate     date,
+        o_orderpriority varchar(15),
+        o_clerk         varchar(15),
+        o_shippriority  integer,
+        o_comment       varchar(79)
+    )
+    WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/orders/', format = 'PARQUET')`);
+
+    // Create part table
+    await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.part`);
+    await this.executeSingleStatement(`CREATE TABLE hive.${schema}.part
+    (
+        p_partkey     bigint,
+        p_name        varchar(55),
+        p_mfgr        varchar(25),
+        p_brand       varchar(10),
+        p_type        varchar(25),
+        p_size        integer,
+        p_container   varchar(10),
+        p_retailprice decimal(15, 2),
+        p_comment     varchar(23)
+    )
+    WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/part/', format = 'PARQUET')`);
+
+    // Create partsupp table
+    await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.partsupp`);
+    await this.executeSingleStatement(`CREATE TABLE hive.${schema}.partsupp
+    (
+        ps_partkey    bigint,
+        ps_suppkey    bigint,
+        ps_availqty   integer,
+        ps_supplycost decimal(15, 2),
+        ps_comment    varchar(199)
+    )
+    WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/partsupp/', format = 'PARQUET')`);
+
+    // Create region table
+    await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.region`);
+    await this.executeSingleStatement(`CREATE TABLE hive.${schema}.region
+    (
+        r_regionkey bigint,
+        r_name      varchar(25),
+        r_comment   varchar(152)
+    )
+    WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/region/', format = 'PARQUET')`);
+
+    // Create supplier table
+    await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.supplier`);
+    await this.executeSingleStatement(`CREATE TABLE hive.${schema}.supplier
+    (
+        s_suppkey   bigint,
+        s_name      varchar(25),
+        s_address   varchar(40),
+        s_nationkey bigint,
+        s_phone     varchar(15),
+        s_acctbal   decimal(15, 2),
+        s_comment   varchar(101)
+    )
+    WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/supplier/', format = 'PARQUET')`);
+  }
+}
+
+main()
+  .catch(err => {
+    console.error(err)
+    process.exit(1)
+  })
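The date-literal rewrite in executeQuery can be checked in isolation. A small sketch, assuming the lookbehind pattern used above and an unprefixed ISO date literal as found in TPC-H q4:

// Sanity check for the DATE-prefix rewrite applied before submitting queries.
const rewrite = (sql: string) =>
  sql.replace(/(?<!DATE )'(\d{4}-\d{2}-\d{2})'/g, "DATE '$1'");

console.log(rewrite("where o_orderdate >= '1993-07-01'"));
// -> where o_orderdate >= DATE '1993-07-01'
console.log(rewrite("where o_orderdate >= DATE '1993-07-01'"));
// -> unchanged: the lookbehind skips literals that already carry the prefix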
diff --git a/benchmarks/cdk/lib/cdk-stack.ts b/benchmarks/cdk/lib/cdk-stack.ts
index 96e15f2..c821217 100644
--- a/benchmarks/cdk/lib/cdk-stack.ts
+++ b/benchmarks/cdk/lib/cdk-stack.ts
@@ -7,6 +7,7 @@ import * as cr from 'aws-cdk-lib/custom-resources';
 import { Construct } from 'constructs';
 import * as path from 'path';
 import { execSync } from 'child_process';
+import { trinoWorkerCommands, trinoUserDataCommands } from "./trino";
 
 const ROOT = path.join(__dirname, '../../..')
 
@@ -18,7 +19,7 @@ interface CdkStackProps extends StackProps {
 }
 
 export class CdkStack extends Stack {
-  constructor (scope: Construct, id: string, props: CdkStackProps) {
+  constructor(scope: Construct, id: string, props: CdkStackProps) {
     super(scope, id, props);
 
     const { config } = props;
@@ -83,6 +84,25 @@ export class CdkStack extends Stack {
       resources: ['*'],
     }));
 
+    // Grant Glue permissions for Trino Hive metastore
+    role.addToPolicy(new iam.PolicyStatement({
+      actions: [
+        'glue:GetDatabase',
+        'glue:GetDatabases',
+        'glue:GetTable',
+        'glue:GetTables',
+        'glue:GetPartition',
+        'glue:GetPartitions',
+        'glue:CreateTable',
+        'glue:UpdateTable',
+        'glue:DeleteTable',
+        'glue:CreateDatabase',
+        'glue:UpdateDatabase',
+        'glue:DeleteDatabase',
+      ],
+      resources: ['*'],
+    }));
+
     // Grant read access to the bucket and worker binary
     bucket.grantRead(role);
     workerBinary.grantRead(role);
@@ -117,7 +137,8 @@ EOF`,
       // Enable and start the service
       'systemctl daemon-reload',
       'systemctl enable worker',
-      'systemctl start worker'
+      'systemctl start worker',
+      ...trinoUserDataCommands(i, this.region)
     );
 
     const instance = new ec2.Instance(this, `BenchmarkInstance${i}`, {
@@ -158,33 +179,47 @@ sudo journalctl -u worker.service -f -o cat
     // Downloads the latest version of the worker binary and restarts the systemd service.
     // This is done instead of the userData.addS3Download() so that the instance does not need
     // to restart every time a new worker binary is available.
-    const restartWorker = new cr.AwsCustomResource(this, 'RestartWorkerService', {
-      onUpdate: {
-        service: 'SSM',
-        action: 'sendCommand',
-        parameters: {
-          DocumentName: 'AWS-RunShellScript',
-          InstanceIds: instances.map(inst => inst.instanceId),
-          Parameters: {
-            commands: [
-              `aws s3 cp s3://${workerBinary.s3BucketName}/${workerBinary.s3ObjectKey} /usr/local/bin/worker`,
-              'chmod +x /usr/local/bin/worker',
-              'systemctl restart worker',
-            ],
-          },
-        },
-        physicalResourceId: cr.PhysicalResourceId.of(`restart-${Date.now()}`),
-        ignoreErrorCodesMatching: '.*',
-      },
-      policy: cr.AwsCustomResourcePolicy.fromStatements([
-        new iam.PolicyStatement({
-          actions: ['ssm:SendCommand'],
-          resources: ['*'],
-        }),
-      ]),
-    });
-
-    // Ensure instances are created before restarting
-    restartWorker.node.addDependency(...instances)
-  }
+    sendCommandsUnconditionally(this, 'RestartWorkerService', instances, [
+      `aws s3 cp s3://${workerBinary.s3BucketName}/${workerBinary.s3ObjectKey} /usr/local/bin/worker`,
+      'chmod +x /usr/local/bin/worker',
+      'systemctl restart worker',
+    ])
+
+    // Start the Trino coordinator, then re-point the workers at it (they discover it via discovery.uri)
+    const [coordinator, ...workers] = instances
+    sendCommandsUnconditionally(this, 'TrinoCoordinatorCommands', [coordinator], ['systemctl start trino'])
+    sendCommandsUnconditionally(this, 'TrinoWorkerCommands', workers, trinoWorkerCommands(coordinator))
+  }
+}
+
+function sendCommandsUnconditionally(
+  construct: Construct,
+  name: string,
+  instances: ec2.Instance[],
+  commands: string[]
+) {
+  const cmd = new cr.AwsCustomResource(construct, name, {
+    onUpdate: {
+      service: 'SSM',
+      action: 'sendCommand',
+      parameters: {
+        DocumentName: 'AWS-RunShellScript',
+        InstanceIds: instances.map(inst => inst.instanceId),
+        Parameters: {
+          commands
+        },
+      },
+      physicalResourceId: cr.PhysicalResourceId.of(`${name}-${Date.now()}`),
+      ignoreErrorCodesMatching: '.*',
+    },
+    policy: cr.AwsCustomResourcePolicy.fromStatements([
+      new iam.PolicyStatement({
+        actions: ['ssm:SendCommand'],
+        resources: ['*'],
+      }),
+    ]),
+  });
+
+  // Ensure instances are created before the commands run
+  cmd.node.addDependency(...instances)
 }
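A note on sendCommandsUnconditionally: the Date.now()-based physical resource ID forces the SSM command to re-run on every deploy, but CloudFormation gives no ordering guarantee between the three invocations. If strict coordinator-before-workers ordering ever matters, a hypothetical sketch of how it could be wired — this assumes the helper is changed to return its AwsCustomResource, which the version above does not do:

// Hypothetical ordering sketch: requires sendCommandsUnconditionally to end
// with `return cmd;` so the custom resources can be chained together.
const coordinatorCmd = sendCommandsUnconditionally(
  this, 'TrinoCoordinatorCommands', [coordinator], ['systemctl start trino']);
const workerCmd = sendCommandsUnconditionally(
  this, 'TrinoWorkerCommands', workers, trinoWorkerCommands(coordinator));
workerCmd.node.addDependency(coordinatorCmd); // workers re-point only after the coordinator starts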
diff --git a/benchmarks/cdk/lib/trino.ts b/benchmarks/cdk/lib/trino.ts
new file mode 100644
index 0000000..ee97a45
--- /dev/null
+++ b/benchmarks/cdk/lib/trino.ts
@@ -0,0 +1,108 @@
+import * as ec2 from 'aws-cdk-lib/aws-ec2';
+
+const TRINO_VERSION = 476
+
+export function trinoUserDataCommands(instanceIndex: number, region: string): string[] {
+  const isCoordinator = instanceIndex === 0;
+
+  return [
+    // Install Java 24 for Trino (recent Trino releases require it)
+    'yum install -y java-24-amazon-corretto-headless python',
+
+    // Download and install the Trino server version pinned by TRINO_VERSION
+    'cd /opt',
+    `curl -L -o trino-server.tar.gz https://repo1.maven.org/maven2/io/trino/trino-server/${TRINO_VERSION}/trino-server-${TRINO_VERSION}.tar.gz`,
+    'tar -xzf trino-server.tar.gz',
+    `mv trino-server-${TRINO_VERSION} trino-server`,
+    'rm trino-server.tar.gz',
+
+    // Create Trino directories
+    'mkdir -p /var/trino/data',
+    'mkdir -p /opt/trino-server/etc/catalog',
+
+    // Configure Trino node properties
+    `cat > /opt/trino-server/etc/node.properties << 'TRINO_EOF'
+node.environment=benchmark
+node.id=instance-${instanceIndex}
+node.data-dir=/var/trino/data
+TRINO_EOF`,
+
+    // Configure Trino JVM settings (minimal - using a conservative 8GB heap)
+    `cat > /opt/trino-server/etc/jvm.config << 'TRINO_EOF'
+-server
+-Xmx8G
+-XX:+UseG1GC
+-XX:G1HeapRegionSize=32M
+-XX:+ExplicitGCInvokesConcurrent
+-XX:+HeapDumpOnOutOfMemoryError
+-XX:+ExitOnOutOfMemoryError
+-Djdk.attach.allowAttachSelf=true
+TRINO_EOF`,
+
+    // Configure Trino config.properties (workers are reconfigured later, once the coordinator is known)
+    isCoordinator
+      ? `cat > /opt/trino-server/etc/config.properties << 'TRINO_EOF'
+coordinator=true
+node-scheduler.include-coordinator=true
+http-server.http.port=8080
+discovery.uri=http://localhost:8080
+TRINO_EOF`
+      : `cat > /opt/trino-server/etc/config.properties << 'TRINO_EOF'
+coordinator=false
+http-server.http.port=8080
+discovery.uri=http://localhost:8080
+TRINO_EOF`,
+
+    // Configure Hive catalog with AWS Glue metastore
+    `cat > /opt/trino-server/etc/catalog/hive.properties << 'TRINO_EOF'
+connector.name=hive
+hive.metastore=glue
+hive.metastore.glue.region=${region}
+fs.native-s3.enabled=true
+s3.region=${region}
+TRINO_EOF`,
+
+    // Configure TPCH catalog for reference
+    `cat > /opt/trino-server/etc/catalog/tpch.properties << 'TRINO_EOF'
+connector.name=tpch
+TRINO_EOF`,
+
+    // Download the Trino CLI
+    'curl -L -o /usr/local/bin/trino https://repo1.maven.org/maven2/io/trino/trino-cli/478/trino-cli-478-executable.jar',
+    'chmod +x /usr/local/bin/trino',
+
+    // Create Trino systemd service
+    `cat > /etc/systemd/system/trino.service << 'TRINO_EOF'
+[Unit]
+Description=Trino Server
+After=network.target
+
+[Service]
+Type=forking
+ExecStart=/opt/trino-server/bin/launcher start
+ExecStop=/opt/trino-server/bin/launcher stop
+Restart=on-failure
+User=root
+WorkingDirectory=/opt/trino-server
+
+[Install]
+WantedBy=multi-user.target
+TRINO_EOF`,
+
+    // Enable and start Trino; workers come up pointing at localhost and are
+    // re-pointed at the coordinator (and restarted) via SSM once it is known
+    'systemctl daemon-reload',
+    'systemctl enable trino',
+    'systemctl start trino'
+  ];
+}
+
+export function trinoWorkerCommands(coordinator: ec2.Instance) {
+  return [
+    `cat > /opt/trino-server/etc/config.properties << TRINO_EOF
+coordinator=false
+http-server.http.port=8080
+discovery.uri=http://${coordinator.instancePrivateIp}:8080
+TRINO_EOF`,
+    'systemctl restart trino',
+  ]
+}
\ No newline at end of file
diff --git a/benchmarks/cdk/package.json b/benchmarks/cdk/package.json
index fc4fb94..3c006f8 100644
--- a/benchmarks/cdk/package.json
+++ b/benchmarks/cdk/package.json
@@ -10,7 +10,8 @@
     "test": "jest",
     "cdk": "cdk",
     "sync-bucket": "aws s3 sync ../data s3://datafusion-distributed-benchmarks/",
-    "datafusion-bench": "npx ts-node bin/datafusion-bench.ts"
+    "datafusion-bench": "npx ts-node bin/datafusion-bench.ts",
+    "trino-bench": "npx ts-node bin/trino-bench.ts"
   },
   "devDependencies": {
     "@types/jest": "^29.5.14",
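With the script wired up, a Trino run mirrors the DataFusion one, e.g. npm run trino-bench -- --sf 1 -i 3, optionally with --query 4 to target a single query. Note that both runners write to the same remote-results.json, so the compareWithPrevious output in @bench-common also doubles as a cross-engine comparison when the two benchmarks are run back to back.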