diff --git a/benchmarks/cdk/.gitignore b/benchmarks/cdk/.gitignore
index f60797b6..965593e2 100644
--- a/benchmarks/cdk/.gitignore
+++ b/benchmarks/cdk/.gitignore
@@ -1,4 +1,5 @@
 *.js
+*.log
 !jest.config.js
 *.d.ts
 node_modules
diff --git a/benchmarks/cdk/bin/spark-bench.ts b/benchmarks/cdk/bin/spark-bench.ts
new file mode 100644
index 00000000..ef4462b4
--- /dev/null
+++ b/benchmarks/cdk/bin/spark-bench.ts
@@ -0,0 +1,177 @@
+import path from "path";
+import {Command} from "commander";
+import {ROOT, runBenchmark, BenchmarkRunner} from "./@bench-common";
+import * as hive from 'hive-driver';
+
+const { TCLIService, TCLIService_types } = hive.thrift;
+
+// Prerequisites:
+// Port-forward Spark Thrift Server: aws ssm start-session --target {instance-0-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=10000,localPortNumber=10000"
+
+async function main() {
+  const program = new Command();
+
+  program
+    .option('--sf <sf>', 'Scale factor', '1')
+    .option('-i, --iterations <iterations>', 'Number of iterations', '3')
+    .option('--query <query>', 'A specific query to run', undefined)
+    .option('--host <host>', 'Thrift server host', 'localhost')
+    .option('--port <port>', 'Thrift server port', '10000')
+    .parse(process.argv);
+
+  const options = program.opts();
+
+  const sf = parseInt(options.sf);
+  const iterations = parseInt(options.iterations);
+  const specificQuery = options.query ? parseInt(options.query) : undefined;
+  const host = options.host;
+  const port = parseInt(options.port);
+
+  const runner = new SparkRunner({ sf, host, port });
+  const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "spark-results.json");
+
+  try {
+    await runBenchmark(runner, {
+      sf,
+      iterations,
+      specificQuery,
+      outputPath,
+    });
+  } finally {
+    await runner.close();
+  }
+}
+
+class SparkRunner implements BenchmarkRunner {
+  private client: any = null;
+  private session: any = null;
+
+  constructor(private readonly options: {
+    sf: number;
+    host: string;
+    port: number;
+  }) {
+  }
+
+  private async getSession() {
+    if (!this.session) {
+      const client = new hive.HiveClient(TCLIService, TCLIService_types);
+
+      this.client = await client.connect(
+        {
+          host: this.options.host,
+          port: this.options.port
+        },
+        new hive.connections.TcpConnection(),
+        new hive.auth.NoSaslAuthentication()
+      );
+
+      this.session = await this.client.openSession({
+        client_protocol: TCLIService_types.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
+      });
+
+      console.log('Connected to Spark Thrift Server');
+    }
+    return this.session;
+  }
+
+  async close() {
+    if (this.session) {
+      await this.session.close();
+      this.session = null;
+      this.client = null;
+    }
+  }
+
+  async executeQuery(sql: string): Promise<{ rowCount: number }> {
+    // Handle query 15 which has multiple statements
+    if (sql.includes("create view")) {
+      const statements = sql.split(";").map(s => s.trim()).filter(s => s);
+      const [createView, query, dropView] = statements;
+
+      await this.executeSingleStatement(createView);
+      const result = await this.executeSingleStatement(query);
+      await this.executeSingleStatement(dropView);
+
+      return result;
+    } else {
+      return await this.executeSingleStatement(sql);
+    }
+  }
+
+  private async executeSingleStatement(sql: string): Promise<{ rowCount: number }> {
+    const session = await this.getSession();
+
+    const operation = await session.executeStatement(sql);
+    await operation.finished();
+
+    // Fetch all result rows so the row count can be reported
+    let rowCount;
+
+    const tableResult = await operation.fetchAll();
+    rowCount = tableResult.length;
+
+    await operation.close();
+
+    return { rowCount };
+  }
+
+  async createTables(sf: number): Promise<void> {
+    const database = `tpch_sf${sf}`;
+
+    console.log(`Creating database ${database}...`);
+
+    await this.executeSingleStatement(`CREATE DATABASE IF NOT EXISTS ${database}`);
+    await this.executeSingleStatement(`USE ${database}`);
+
+    console.log('Creating tables...');
+
+    const tables = [
+      {
+        name: 'customer',
+        schema: `c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey BIGINT, c_phone STRING, c_acctbal DECIMAL(15,2), c_mktsegment STRING, c_comment STRING`
+      },
+      {
+        name: 'lineitem',
+        schema: `l_orderkey BIGINT, l_partkey BIGINT, l_suppkey BIGINT, l_linenumber INT, l_quantity DECIMAL(15,2), l_extendedprice DECIMAL(15,2), l_discount DECIMAL(15,2), l_tax DECIMAL(15,2), l_returnflag STRING, l_linestatus STRING, l_shipdate DATE, l_commitdate DATE, l_receiptdate DATE, l_shipinstruct STRING, l_shipmode STRING, l_comment STRING`
+      },
+      {
+        name: 'nation',
+        schema: `n_nationkey BIGINT, n_name STRING, n_regionkey BIGINT, n_comment STRING`
+      },
+      {
+        name: 'orders',
+        schema: `o_orderkey BIGINT, o_custkey BIGINT, o_orderstatus STRING, o_totalprice DECIMAL(15,2), o_orderdate DATE, o_orderpriority STRING, o_clerk STRING, o_shippriority INT, o_comment STRING`
+      },
+      {
+        name: 'part',
+        schema: `p_partkey BIGINT, p_name STRING, p_mfgr STRING, p_brand STRING, p_type STRING, p_size INT, p_container STRING, p_retailprice DECIMAL(15,2), p_comment STRING`
+      },
+      {
+        name: 'partsupp',
+        schema: `ps_partkey BIGINT, ps_suppkey BIGINT, ps_availqty INT, ps_supplycost DECIMAL(15,2), ps_comment STRING`
+      },
+      {
+        name: 'region',
+        schema: `r_regionkey BIGINT, r_name STRING, r_comment STRING`
+      },
+      {
+        name: 'supplier',
+        schema: `s_suppkey BIGINT, s_name STRING, s_address STRING, s_nationkey BIGINT, s_phone STRING, s_acctbal DECIMAL(15,2), s_comment STRING`
+      }
+    ];
+
+    for (const table of tables) {
+      await this.executeSingleStatement(`DROP TABLE IF EXISTS ${table.name}`);
+      await this.executeSingleStatement(`CREATE EXTERNAL TABLE ${table.name} (${table.schema}) STORED AS PARQUET LOCATION 's3a://datafusion-distributed-benchmarks/tpch_sf${sf}/${table.name}/'`);
+    }
+
+    console.log('Tables created successfully!');
+  }
+}
+
+main()
+  .catch(err => {
+    console.error(err)
+    process.exit(1)
+  })
diff --git a/benchmarks/cdk/lib/cdk-stack.ts b/benchmarks/cdk/lib/cdk-stack.ts
index c821217f..8b597bd8 100644
--- a/benchmarks/cdk/lib/cdk-stack.ts
+++ b/benchmarks/cdk/lib/cdk-stack.ts
@@ -1,13 +1,14 @@
-import { CfnOutput, RemovalPolicy, Stack, StackProps, Tags } from 'aws-cdk-lib';
+import {CfnOutput, RemovalPolicy, Stack, StackProps, Tags} from 'aws-cdk-lib';
 import * as ec2 from 'aws-cdk-lib/aws-ec2';
 import * as s3 from 'aws-cdk-lib/aws-s3';
 import * as iam from 'aws-cdk-lib/aws-iam';
 import * as s3assets from 'aws-cdk-lib/aws-s3-assets';
 import * as cr from 'aws-cdk-lib/custom-resources';
-import { Construct } from 'constructs';
+import {Construct} from 'constructs';
 import * as path from 'path';
-import { execSync } from 'child_process';
-import { trinoWorkerCommands, trinoUserDataCommands } from "./trino";
+import {execSync} from 'child_process';
+import {trinoWorkerCommands, trinoUserDataCommands} from "./trino";
+import {sparkMasterCommands, sparkWorkerCommands, sparkUserDataCommands} from "./spark";

 const ROOT = path.join(__dirname, '../../..')

@@ -138,7 +139,8 @@ EOF`,
         'systemctl daemon-reload',
         'systemctl enable worker',
         'systemctl start worker',
-        ...trinoUserDataCommands(i, this.region)
+        ...trinoUserDataCommands(i, this.region),
+        ...sparkUserDataCommands(i, this.region)
       );

       const instance = new ec2.Instance(this, `BenchmarkInstance${i}`, {
@@ -149,7 +151,8 @@ EOF`,
         machineImage: ec2.MachineImage.latestAmazonLinux2023(),
         securityGroup,
         role,
-        userData
+        userData,
+        userDataCausesReplacement: true
       });

       // Tag for peer discovery
@@ -189,6 +192,11 @@ sudo journalctl -u worker.service -f -o cat
     const [coordinator, ...workers] = instances
     sendCommandsUnconditionally(this, 'TrinoCoordinatorCommands', [coordinator], ['systemctl start trino'])
     sendCommandsUnconditionally(this, 'TrinoWorkerCommands', workers, trinoWorkerCommands(coordinator))
+
+    // Start Spark master and workers
+    const [sparkMaster, ...sparkWorkers] = instances
+    sendCommandsUnconditionally(this, 'SparkMasterCommands', [sparkMaster], sparkMasterCommands())
+    sendCommandsUnconditionally(this, 'SparkWorkerCommands', sparkWorkers, sparkWorkerCommands(sparkMaster))
   }
 }

diff --git a/benchmarks/cdk/lib/spark.ts b/benchmarks/cdk/lib/spark.ts
new file mode 100644
index 00000000..393ecc7f
--- /dev/null
+++ b/benchmarks/cdk/lib/spark.ts
@@ -0,0 +1,180 @@
+import * as ec2 from 'aws-cdk-lib/aws-ec2';
+
+const SPARK_VERSION = '4.0.1'
+const HADOOP_VERSION = '3'
+
+export function sparkUserDataCommands(instanceIndex: number, region: string): string[] {
+  const isMaster = instanceIndex === 0;
+
+  return [
+    // Install Java 17 for Spark
+    'yum install -y java-17-amazon-corretto-headless',
+
+    // Download and install Spark
+    'cd /opt',
+    `curl -L -o spark.tgz https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz`,
+    'tar -xzf spark.tgz',
+    `mv spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} spark`,
+    'rm spark.tgz',
+
+    // Create Spark directories
+    'mkdir -p /var/spark/logs',
+    'mkdir -p /var/spark/work',
+    'mkdir -p /var/spark/tmp',
+
+    // Download AWS SDK JARs for S3 access
+    'cd /opt/spark/jars',
+    'curl -L -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar',
+    'curl -L -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar',
+
+    // Set JAVA_HOME and SPARK_HOME
+    `cat > /opt/spark/conf/spark-env.sh << 'SPARK_EOF'
+export JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64
+export SPARK_HOME=/opt/spark
+export SPARK_LOG_DIR=/var/spark/logs
+export SPARK_WORKER_DIR=/var/spark/work
+export SPARK_LOCAL_DIRS=/var/spark/tmp
+export SPARK_MASTER_HOST=localhost
+export SPARK_MASTER_PORT=7077
+export SPARK_MASTER_WEBUI_PORT=8080
+export SPARK_WORKER_WEBUI_PORT=8081
+export SPARK_WORKER_MEMORY=8g
+export SPARK_WORKER_CORES=4
+SPARK_EOF`,
+    'chmod +x /opt/spark/conf/spark-env.sh',
+
+    // Configure Spark defaults
+    `cat > /opt/spark/conf/spark-defaults.conf << 'SPARK_EOF'
+spark.master spark://localhost:7077
+spark.eventLog.enabled true
+spark.eventLog.dir /var/spark/logs
+spark.history.fs.logDirectory /var/spark/logs
+spark.sql.warehouse.dir /var/spark/warehouse
+spark.driver.memory 8g
+spark.executor.memory 8g
+spark.sql.adaptive.enabled true
+spark.sql.adaptive.coalescePartitions.enabled true
+spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
+spark.hadoop.fs.s3a.aws.credentials.provider com.amazonaws.auth.InstanceProfileCredentialsProvider
+SPARK_EOF`,
+
+    // Create Spark Thrift Server systemd service (only on master)
+    ...(isMaster ? [
+      `cat > /etc/systemd/system/spark-thrift.service << 'SPARK_THRIFT_EOF'
+[Unit]
+Description=Spark Thrift Server
+After=network.target spark-master.service spark-worker.service
+Requires=spark-master.service
+
+[Service]
+Type=forking
+ExecStart=/opt/spark/sbin/start-thriftserver.sh --master spark://localhost:7077 --hiveconf hive.server2.thrift.port=10000
+ExecStop=/opt/spark/sbin/stop-thriftserver.sh
+Restart=on-failure
+User=root
+WorkingDirectory=/opt/spark
+Environment="JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64"
+Environment="SPARK_HOME=/opt/spark"
+
+[Install]
+WantedBy=multi-user.target
+SPARK_THRIFT_EOF`,
+      'systemctl enable spark-thrift',
+    ] : []),
+
+    // Create Spark Master systemd service
+    isMaster
+      ? `cat > /etc/systemd/system/spark-master.service << 'SPARK_EOF'
+[Unit]
+Description=Spark Master
+After=network.target
+
+[Service]
+Type=forking
+ExecStart=/opt/spark/sbin/start-master.sh
+ExecStop=/opt/spark/sbin/stop-master.sh
+Restart=on-failure
+User=root
+WorkingDirectory=/opt/spark
+Environment="JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64"
+Environment="SPARK_HOME=/opt/spark"
+
+[Install]
+WantedBy=multi-user.target
+SPARK_EOF`
+      : 'echo "Worker node - master service not created"',
+
+    // Create Spark Worker systemd service
+    `cat > /etc/systemd/system/spark-worker.service << 'SPARK_EOF'
+[Unit]
+Description=Spark Worker
+After=network.target
+
+[Service]
+Type=forking
+ExecStart=/opt/spark/sbin/start-worker.sh spark://localhost:7077
+ExecStop=/opt/spark/sbin/stop-worker.sh
+Restart=on-failure
+User=root
+WorkingDirectory=/opt/spark
+Environment="JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64"
+Environment="SPARK_HOME=/opt/spark"
+
+[Install]
+WantedBy=multi-user.target
+SPARK_EOF`,
+
+    // Enable services (but don't start yet - will be started lazily after all instances are up)
+    'systemctl daemon-reload',
+    ...(isMaster
+      ? [
+        'systemctl enable spark-master',
+        'systemctl enable spark-worker'
+      ]
+      : [
+        'systemctl enable spark-worker'
+      ])
+  ];
+}
+
+export function sparkMasterCommands() {
+  return [
+    'systemctl start spark-master',
+    'sleep 5', // Wait for master to start
+    'systemctl start spark-worker', // Master also runs a worker
+    'sleep 3', // Wait for worker to register
+    'systemctl start spark-thrift', // Start Spark Thrift Server for SQL access
+  ];
+}
+
+export function sparkWorkerCommands(master: ec2.Instance) {
+  return [
+    `cat > /opt/spark/conf/spark-env.sh << SPARK_EOF
+export JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64
+export SPARK_HOME=/opt/spark
+export SPARK_LOG_DIR=/var/spark/logs
+export SPARK_WORKER_DIR=/var/spark/work
+export SPARK_LOCAL_DIRS=/var/spark/tmp
+export SPARK_MASTER_HOST=${master.instancePrivateIp}
+export SPARK_MASTER_PORT=7077
+export SPARK_MASTER_WEBUI_PORT=8080
+export SPARK_WORKER_WEBUI_PORT=8081
+export SPARK_WORKER_MEMORY=8g
+export SPARK_WORKER_CORES=4
+SPARK_EOF`,
+    `cat > /opt/spark/conf/spark-defaults.conf << SPARK_EOF
+spark.master spark://${master.instancePrivateIp}:7077
+spark.eventLog.enabled true
+spark.eventLog.dir /var/spark/logs
+spark.history.fs.logDirectory /var/spark/logs
+spark.sql.warehouse.dir /var/spark/warehouse
+spark.driver.memory 8g
+spark.executor.memory 8g
+spark.sql.adaptive.enabled true
+spark.sql.adaptive.coalescePartitions.enabled true
+spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
+spark.hadoop.fs.s3a.aws.credentials.provider com.amazonaws.auth.InstanceProfileCredentialsProvider
+SPARK_EOF`,
+    'systemctl start spark-worker',
+  ];
+}
diff --git a/benchmarks/cdk/package-lock.json b/benchmarks/cdk/package-lock.json
index 7c55a38b..62b28a36 100644
--- a/benchmarks/cdk/package-lock.json
+++ b/benchmarks/cdk/package-lock.json
@@ -11,6 +11,7 @@
         "aws-cdk-lib": "2.215.0",
         "commander": "^14.0.2",
         "constructs": "^10.0.0",
+        "hive-driver": "^1.0.0",
         "zod": "^4.1.12"
       },
       "bin": {
@@ -1244,6 +1245,12 @@
         "sprintf-js": "~1.0.2"
       }
     },
+    "node_modules/async-limiter": {
+      "version": "1.0.1",
+      "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz",
+      "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==",
+      "license": "MIT"
+    },
     "node_modules/aws-cdk": {
       "version": "2.1031.2",
       "resolved": "https://registry.npmjs.org/aws-cdk/-/aws-cdk-2.1031.2.tgz",
@@ -1767,6 +1774,12 @@
         "node": ">=8"
       }
     },
+    "node_modules/browser-or-node": {
+      "version": "1.3.0",
+      "resolved": "https://registry.npmjs.org/browser-or-node/-/browser-or-node-1.3.0.tgz",
+      "integrity": "sha512-0F2z/VSnLbmEeBcUrSuDH5l0HxTXdQQzLjkmBR4cYfvg1zJrKSlmIZFqyFR8oX0NrwPhy3c3HQ6i3OxMbew4Tg==",
+      "license": "MIT"
+    },
     "node_modules/browserslist": {
       "version": "4.28.0",
       "resolved": "https://registry.npmjs.org/browserslist/-/browserslist-4.28.0.tgz",
@@ -2435,6 +2448,16 @@
         "node": ">= 0.4"
       }
     },
+    "node_modules/hive-driver": {
+      "version": "1.0.0",
+      "resolved": "https://registry.npmjs.org/hive-driver/-/hive-driver-1.0.0.tgz",
+      "integrity": "sha512-shUlHN7PAiVp+7UMT3aryUa5nrtMtb3ht+mF5irHvn2qoRwrN4hwhE8c4Gn62qzdfyXBkGYcyov9rRPw9aoC4Q==",
+      "license": "MIT",
+      "dependencies": {
+        "node-int64": "^0.4.0",
+        "thrift": "^0.20.0"
+      }
+    },
     "node_modules/html-escaper": {
       "version": "2.0.2",
      "resolved": "https://registry.npmjs.org/html-escaper/-/html-escaper-2.0.2.tgz",
@@ -2574,6 +2597,15 @@
       "dev": true,
       "license": "ISC"
     },
+    "node_modules/isomorphic-ws": {
+      "version": "4.0.1",
+      "resolved": "https://registry.npmjs.org/isomorphic-ws/-/isomorphic-ws-4.0.1.tgz",
"https://registry.npmjs.org/isomorphic-ws/-/isomorphic-ws-4.0.1.tgz", + "integrity": "sha512-BhBvN2MBpWTaSHdWRb/bwdZJ1WaehQ2L1KngkCkfLUGF0mAWAT1sQUQacEmQ0jXkFw/czDXPNQSL5u2/Krsz1w==", + "license": "MIT", + "peerDependencies": { + "ws": "*" + } + }, "node_modules/istanbul-lib-coverage": { "version": "3.2.2", "resolved": "https://registry.npmjs.org/istanbul-lib-coverage/-/istanbul-lib-coverage-3.2.2.tgz", @@ -3490,7 +3522,6 @@ "version": "0.4.0", "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", "integrity": "sha512-O5lz91xSOeoXP6DulyHfllpq+Eg00MWitZIbtPfoSEvqIHdl5gfcY6hYzDWnj0qD5tz52PI08u9qUvSVeUBeHw==", - "dev": true, "license": "MIT" }, "node_modules/node-releases": { @@ -3762,6 +3793,17 @@ ], "license": "MIT" }, + "node_modules/q": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/q/-/q-1.5.1.tgz", + "integrity": "sha512-kV/CThkXo6xyFEZUugw/+pIOywXcDbFYgSct5cT3gqlbkBE1SJdwy6UQoZvodiWF/ckQLZyDE/Bu1M6gVu5lVw==", + "deprecated": "You or someone you depend on is using Q, the JavaScript Promise library that gave JavaScript developers strong feelings about promises. They can almost certainly migrate to the native JavaScript promise now. Thank you literally everyone for joining me in this bet against the odds. Be excellent to each other.\n\n(For a CapTP with native promises, see @endo/eventual-send and @endo/captp)", + "license": "MIT", + "engines": { + "node": ">=0.6.0", + "teleport": ">=0.2.0" + } + }, "node_modules/react-is": { "version": "18.3.1", "resolved": "https://registry.npmjs.org/react-is/-/react-is-18.3.1.tgz", @@ -4047,6 +4089,22 @@ "node": ">=8" } }, + "node_modules/thrift": { + "version": "0.20.0", + "resolved": "https://registry.npmjs.org/thrift/-/thrift-0.20.0.tgz", + "integrity": "sha512-oSmJTaoIAGolpupVHFfsWcmdEKX81fcDI6ty0hhezzdgZvp0XyXgMe9+1YusI8Ahy0HK4n8jlNrkPjOPeHZjdQ==", + "license": "Apache-2.0", + "dependencies": { + "browser-or-node": "^1.2.1", + "isomorphic-ws": "^4.0.1", + "node-int64": "^0.4.0", + "q": "^1.5.0", + "ws": "^5.2.3" + }, + "engines": { + "node": ">= 10.18.0" + } + }, "node_modules/tmpl": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/tmpl/-/tmpl-1.0.5.tgz", @@ -4373,6 +4431,15 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, + "node_modules/ws": { + "version": "5.2.4", + "resolved": "https://registry.npmjs.org/ws/-/ws-5.2.4.tgz", + "integrity": "sha512-fFCejsuC8f9kOSu9FYaOw8CdO68O3h5v0lg4p74o8JqWpwTf9tniOD+nOB78aWoVSS6WptVUmDrp/KPsMVBWFQ==", + "license": "MIT", + "dependencies": { + "async-limiter": "~1.0.0" + } + }, "node_modules/y18n": { "version": "5.0.8", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz", diff --git a/benchmarks/cdk/package.json b/benchmarks/cdk/package.json index 3c006f89..b89eb02f 100644 --- a/benchmarks/cdk/package.json +++ b/benchmarks/cdk/package.json @@ -11,7 +11,8 @@ "cdk": "cdk", "sync-bucket": "aws s3 sync ../data s3://datafusion-distributed-benchmarks/", "datafusion-bench": "npx ts-node bin/datafusion-bench.ts", - "trino-bench": "npx ts-node bin/trino-bench.ts" + "trino-bench": "npx ts-node bin/trino-bench.ts", + "spark-bench": "npx ts-node bin/spark-bench.ts" }, "devDependencies": { "@types/jest": "^29.5.14", @@ -24,8 +25,9 @@ }, "dependencies": { "aws-cdk-lib": "2.215.0", - "constructs": "^10.0.0", "commander": "^14.0.2", + "constructs": "^10.0.0", + "hive-driver": "^1.0.0", "zod": "^4.1.12" } }