diff --git a/bin/mvn b/bin/mvn
index f142133d8..a81a911ca 100755
--- a/bin/mvn
+++ b/bin/mvn
@@ -36,6 +36,7 @@ docker run $INTERACTIVE_FLAGS --rm \
 -v "$GIT_TOPLEVEL":/var/maven/project \
 -w /var/maven/project/"$GIT_PREFIX" \
 -v ~/.m2:/var/maven/.m2 \
+-e MAVEN_OPTS \
 -e MAVEN_CONFIG=/var/maven/.m2 \
 -e GOOGLE_APPLICATION_CREDENTIALS \
 $MOUNT_CREDENTIALS_FLAGS \
diff --git a/ingestion-beam/bin/run-pioneer-benchmark b/ingestion-beam/bin/run-pioneer-benchmark
new file mode 100755
index 000000000..109395046
--- /dev/null
+++ b/ingestion-beam/bin/run-pioneer-benchmark
@@ -0,0 +1,103 @@
+#!/bin/bash
+# Runs a benchmark that can be used to quantify the overhead of the
+# DecryptPioneerPayloads step. A document sample of ingested data is encrypted
+# using JOSE and is run through the Pioneer-enabled decoder. This is compared
+# against the plaintext data.
+#
+# Initial findings show that for every 1 minute spent on ParsePayload, there are
+# 3 minutes spent on DecryptPioneerPayloads. Peak throughput in ParsePayload
+# goes from 3360 elements/sec to 2466 elements/sec when adding encryption.
+#
+# This script requires the use of GNU Coreutils. This may be installed on macOS
+# via homebrew: `brew install coreutils`.
+
+set -ex
+
+export MAVEN_OPTS="-Xms8g -Xmx8g"
+reset=${RESET:-false}
+project=$(gcloud config get-value project)
+bucket="gs://${BUCKET?bucket value must be specified}"
+prefix="ingestion-beam-benchmark"
+staging="benchmark_staging"
+
+cd "$(dirname "$0")/.."
+
+if [[ ! -f document_sample.ndjson ]]; then
+  echo "missing document_sample.ndjson, run download-document-sample"
+  exit 1
+fi
+
+if [[ ! -f pioneer_benchmark_data.ndjson ]] || [[ ${reset} == "true" ]]; then
+  # generate the data using the document sample, this may take a while depending on your machine
+  ./bin/mvn clean compile exec:java -Dexec.mainClass=com.mozilla.telemetry.PioneerBenchmarkGenerator
+fi
+
+# assert bucket can be read
+gsutil ls "$bucket" &> /dev/null
+
+# generate a new folder and sync it to gcs, assumes a bucket value
+if [[ ! -d $staging ]]; then
+  plaintext=$staging/input/plaintext
+  ciphertext=$staging/input/ciphertext
+  metadata=$staging/metadata
+  mkdir -p $plaintext
+  mkdir -p $ciphertext
+  mkdir -p $metadata
+  # shuffle to avoid data skew, and to prepare for file splitting if necessary
+  shuf document_sample.ndjson > $plaintext/part-0.ndjson
+  shuf pioneer_benchmark_data.ndjson > $ciphertext/part-0.ndjson
+  cp pioneer_benchmark_key.json $metadata/key.json
+  # compute the location of the remote key and insert it into the metadata
+  remote_key="$bucket/$prefix/metadata/key.json"
+  jq "(.. | .private_key_uri?) |= \"$remote_key\"" pioneer_benchmark_metadata.json > $metadata/metadata.json
+  cp schemas.tar.gz $metadata
+  cp cities15000.txt $metadata
+  cp GeoLite2-City.mmdb $metadata
+fi
+
+# this can take a while, depending on your upload speed (~1 GB of data)
+gsutil -m rsync -R -d $staging/ "$bucket/$prefix/"
+
+# plaintext
+./bin/mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.Decoder -Dexec.args="\
+    --runner=Dataflow \
+    --profilingAgentConfiguration='{\"APICurated\": true}' \
+    --project=$project \
+    --autoscalingAlgorithm=NONE \
+    --workerMachineType=n1-standard-1 \
+    --gcpTempLocation=$bucket/tmp \
+    --numWorkers=2 \
+    --geoCityDatabase=$bucket/$prefix/metadata/GeoLite2-City.mmdb \
+    --geoCityFilter=$bucket/$prefix/metadata/cities15000.txt \
+    --schemasLocation=$bucket/$prefix/metadata/schemas.tar.gz \
+    --inputType=file \
+    --input=$bucket/$prefix/input/plaintext/'part-*' \
+    --outputType=file \
+    --output=$bucket/$prefix/output/plaintext/ \
+    --errorOutputType=file \
+    --errorOutput=$bucket/$prefix/error/plaintext/ \
+"
+
+# ciphertext
+./bin/mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.Decoder -Dexec.args="\
+    --runner=Dataflow \
+    --profilingAgentConfiguration='{\"APICurated\": true}' \
+    --project=$project \
+    --autoscalingAlgorithm=NONE \
+    --workerMachineType=n1-standard-1 \
+    --gcpTempLocation=$bucket/tmp \
+    --numWorkers=2 \
+    --pioneerEnabled=true \
+    --pioneerMetadataLocation=$bucket/$prefix/metadata/metadata.json \
+    --pioneerKmsEnabled=false \
+    --pioneerDecompressPayload=false \
+    --geoCityDatabase=$bucket/$prefix/metadata/GeoLite2-City.mmdb \
+    --geoCityFilter=$bucket/$prefix/metadata/cities15000.txt \
+    --schemasLocation=$bucket/$prefix/metadata/schemas.tar.gz \
+    --inputType=file \
+    --input=$bucket/$prefix/input/ciphertext/'part-*' \
+    --outputType=file \
+    --output=$bucket/$prefix/output/ciphertext/ \
+    --errorOutputType=file \
+    --errorOutput=$bucket/$prefix/error/ciphertext/ \
+"
diff --git a/ingestion-beam/src/main/java/com/mozilla/telemetry/PioneerBenchmarkGenerator.java b/ingestion-beam/src/main/java/com/mozilla/telemetry/PioneerBenchmarkGenerator.java
new file mode 100644
index 000000000..ae0a0b31e
--- /dev/null
+++ b/ingestion-beam/src/main/java/com/mozilla/telemetry/PioneerBenchmarkGenerator.java
@@ -0,0 +1,108 @@
+package com.mozilla.telemetry;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Charsets;
+import com.mozilla.telemetry.util.Json;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.PublicKey;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.jose4j.jwe.ContentEncryptionAlgorithmIdentifiers;
+import org.jose4j.jwe.JsonWebEncryption;
+import org.jose4j.jwe.KeyManagementAlgorithmIdentifiers;
+import org.jose4j.jwk.EcJwkGenerator;
+import org.jose4j.jwk.EllipticCurveJsonWebKey;
+import org.jose4j.jwk.JsonWebKey.OutputControlLevel;
+import org.jose4j.keys.EllipticCurves;
+import org.jose4j.lang.JoseException;
+
+/** Generate a dataset for benchmarking the DecryptPioneerPayloads transform. */
+public class PioneerBenchmarkGenerator {
+
+  static final ObjectMapper mapper = new ObjectMapper();
+
+  /** Encrypt a payload using a public key and insert it into an envelope. */
+  public static byte[] encrypt(byte[] data, PublicKey key) throws IOException, JoseException {
+    JsonWebEncryption jwe = new JsonWebEncryption();
+    jwe.setPayload(new String(data, Charsets.UTF_8));
+    jwe.setAlgorithmHeaderValue(KeyManagementAlgorithmIdentifiers.ECDH_ES);
+    jwe.setEncryptionMethodHeaderParameter(ContentEncryptionAlgorithmIdentifiers.AES_256_GCM);
+    jwe.setKey(key);
+    String serializedJwe = jwe.getCompactSerialization();
+    ObjectNode node = mapper.createObjectNode();
+    node.put("payload", serializedJwe);
+    return Json.asString(node).getBytes(Charsets.UTF_8);
+  }
+
+  /** Read a Pubsub ndjson message wrapping failures in an Optional. */
+  public static Optional<PubsubMessage> parsePubsub(String data) {
+    try {
+      return Optional.of(Json.readPubsubMessage(data));
+    } catch (Exception e) {
+      e.printStackTrace();
+      return Optional.empty();
+    }
+  }
+
+  /** Encrypt the payload in a Pubsub message and place it into an envelope. */
+  public static Optional<String> transform(PubsubMessage message, PublicKey key) {
+    try {
+      PubsubMessage encryptedMessage = new PubsubMessage(encrypt(message.getPayload(), key),
+          message.getAttributeMap());
+      return Optional.of(Json.asString(encryptedMessage));
+    } catch (IOException | JoseException e) {
+      e.printStackTrace();
+      return Optional.empty();
+    }
+  }
+
+  /** Read in documents in the shape of Pubsub ndjson files and encrypt using
+   * Pioneer parameters and envelope. Write out the relevant metadata files for
+   * the single key in use. */
+  public static void main(final String[] args) throws JoseException, IOException {
+    final Path inputPath = Paths.get("document_sample.ndjson");
+    final Path outputPath = Paths.get("pioneer_benchmark_data.ndjson");
+    final Path keyPath = Paths.get("pioneer_benchmark_key.json");
+    final Path metadataPath = Paths.get("pioneer_benchmark_metadata.json");
+
+    EllipticCurveJsonWebKey key = EcJwkGenerator.generateJwk(EllipticCurves.P256);
+    HashSet<String> namespaces = new HashSet<>();
+
+    // runs in ~2:30 min
+    try (Stream<String> stream = Files.lines(inputPath)) {
+      // side-effects to get the set of attributes
+      Files.write(outputPath, (Iterable<String>) stream.map(PioneerBenchmarkGenerator::parsePubsub)
+          .filter(Optional::isPresent).map(Optional::get).map(message -> {
+            // side-effects and side-input
+            namespaces.add(message.getAttribute("document_namespace"));
+            return transform(message, key.getPublicKey());
+          }).filter(Optional::isPresent).map(Optional::get)::iterator);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // write out the key
+    Files.write(keyPath, key.toJson(OutputControlLevel.INCLUDE_PRIVATE).getBytes(Charsets.UTF_8));
+
+    // write out the metadata
+    ArrayNode metadata = mapper.createArrayNode();
+    for (String namespace : namespaces) {
+      ObjectNode node = mapper.createObjectNode();
+      node.put("private_key_id", namespace);
+      // TODO: write this to the appropriate location like gcs, if relevant
+      node.put("private_key_uri", keyPath.toString());
+      // empty value
+      node.put("kms_resource_id", "");
+      metadata.add(node);
+    }
+    Files.write(metadataPath, metadata.toPrettyString().getBytes(Charsets.UTF_8));
+
+  }
+}