158 changes: 62 additions & 96 deletions pipeline/ingestion/README.md
# Graph Ingestion Pipeline

The `GraphIngestionPipeline` is an Apache Beam-based ingestion pipeline designed to load graph data (nodes, edges, and observations) into Google Cloud Spanner.

## Key Features

- **Multi-Import Support**: Processes a list of imports and their respective graph data paths in a single execution.
- **Data Formats**: Automatically detects and handles both MCF (multi-line) and TFRecord file formats.
- **Automated Schema Management**: Automatically creates the necessary Spanner database and tables (`Node`, `Edge`, `Observation`) if they do not exist.
- **Intelligent Processing**:
- Splits graph data into schema and observation nodes.
- Combines schema nodes for specific imports (e.g., `Schema`, `Place`).
- Optimizes observation graphs for efficient Spanner storage.
- Transforms nodes before ingestion.
- **Data Integrity**: Manages data deletion for re-imports and ensures proper write ordering (writing nodes before edges).

## Pipeline Configuration

The pipeline is configured using `IngestionPipelineOptions`. Key options include:

- `--projectId`: GCP project ID (default: `datcom-store`).
- `--spannerInstanceId`: Spanner Instance ID (default: `dc-kg-test`).
- `--spannerDatabaseId`: Spanner Database ID (default: `dc_graph_5`).
- `--importList`: A JSON array of objects specifying the imports to process. Each object must contain `importName` and `graphPath`.
- `--skipDelete`: Boolean flag to skip deleting existing data for the specified imports (default: `false`).
- `--numShards`: Number of shards for writing mutations to Spanner (default: `1`).
- `--spannerNodeTableName`: Name of the Spanner Node table (default: `Node`).
- `--spannerEdgeTableName`: Name of the Spanner Edge table (default: `Edge`).
- `--spannerObservationTableName`: Name of the Spanner Observation table (default: `Observation`).

## Example Usage

To run the pipeline using the Dataflow runner:

```bash
mvn -Pdataflow-runner compile exec:java \
-pl ingestion -am \
-Dexec.mainClass=org.datacommons.ingestion.pipeline.GraphIngestionPipeline \
-Dexec.args="--project=YOUR_PROJECT_ID \
--spannerInstanceId=YOUR_INSTANCE_ID \
--spannerDatabaseId=YOUR_DATABASE_ID \
--importList='[{\"importName\": \"Schema\", \"graphPath\": \"gs://path/to/schema/mcf/\"}, {\"importName\": \"SampleImport\", \"graphPath\": \"gs://path/to/data.tfrecord\"}]' \
--runner=DataflowRunner \
--region=us-central1 \
--numWorkers=10 \
--maxNumWorkers=20"
```

## Input Format for `--importList`

The `--importList` argument expects a JSON string representing an array of import configurations.

Example:
```json
[
{
"importName": "Schema",
"graphPath": "gs://datcom-store/graph/schema/"
},
{
"importName": "Place",
"graphPath": "gs://datcom-store/graph/place/place.tfrecord"
}
]
```
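Since the JSON must survive the shell's double-quoted `-Dexec.args` string, one approach (a sketch, assuming bash; the variable names are illustrative) is to keep the list in a single-quoted variable and escape the double quotes before splicing it in:

```shell
# Keep the import list readable in a single-quoted variable, then escape
# the double quotes that -Dexec.args="..." would otherwise consume.
IMPORT_LIST='[{"importName": "Schema", "graphPath": "gs://path/to/schema/mcf/"}]'
ESCAPED=${IMPORT_LIST//\"/\\\"}
echo "--importList='${ESCAPED}'"
```

The resulting string matches the escaped form shown in the example usage above.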
99 changes: 99 additions & 0 deletions pipeline/ingestion/README_LegacyPipeline.md
# Spanner Graph Data Ingestion Pipeline

## org.datacommons.ingestion.pipeline.IngestionPipeline

This module loads all tables (observations, nodes, and edges) for all or
specified import groups into Spanner.
The import group versions to load are fetched from the DC API's version
endpoint.

### Worker configuration

- Memory requirement
  - The pipeline is more memory-intensive than CPU-intensive.
  - The memory-hungry steps are:
    - Writing to Spanner: the Spanner sink sorts and batches mutations in
      memory. To improve throughput, the pipeline uses larger batch sizes,
      which consequently require more memory.
    - Eliminating duplicates: duplicate nodes and edges are eliminated within
      the pipeline using bundle-level caches. These caches track seen items
      and are cleared only after a bundle is processed.
  - The pipeline works best with a RAM/CPU ratio (GiB of RAM per core) of at
    least:
    - 8 when importing individual import groups.
    - 16 when importing all import groups.
- Worker machine type
  - For a RAM/CPU ratio of 8, it's advisable to use 16-core workers (such as
    n2/e2-highmem-16) instead of 8-core alternatives (like n2/e2-highmem-8).
    This helps prevent excessive garbage collection and GC thrashing, which
    can otherwise lead to Dataflow terminating workers and causing workflow
    failures.
  - For a RAM/CPU ratio of 16, prefer 8-core workers (e.g.,
    n2-custom-8-131072-ext with 8 cores and 128 GiB). Using 16-core workers
    would necessitate 256 GiB of RAM, leading to a larger Java heap. This
    larger heap can cause extended GC pauses, during which worker threads are
    unresponsive. The Dataflow service might interpret this unresponsiveness
    as a failed worker and terminate it.
- Worker count
  - Dataflow autoscaling tends to increase worker counts for the Spanner
    write step.
  - Cap Dataflow's total worker threads at 6x (big rows like observations) to
    15x (small rows like edges/nodes) your Spanner node count. This prevents
    Spanner overload and helps maintain write throughput.
    - This can be done using the pipeline option `--maxNumWorkers`.
    - Note that `maxNumWorkers` counts virtual machines (VMs), not worker
      threads. Hence,
      `maxNumWorkers = total worker threads / number of cores per VM`.
  - Setting the initial worker count (`numWorkers`) to match the maximum
    worker count (`maxNumWorkers`) saves several minutes by eliminating
    Dataflow autoscaling's typical ramp-up time to reach the maximum number
    of workers.
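The sizing arithmetic above can be sketched as follows (the Spanner node count and machine type here are hypothetical, for illustration only):

```shell
# Hypothetical sizing: 10 Spanner nodes, 8-core workers, small rows
# (edges/nodes), so cap total worker threads at ~15x the node count.
SPANNER_NODES=10
THREADS_PER_NODE=15   # use 6 for big rows like observations
CORES_PER_VM=8        # e.g. n2-custom-8-131072-ext
MAX_NUM_WORKERS=$(( SPANNER_NODES * THREADS_PER_NODE / CORES_PER_VM ))
echo "--numWorkers=${MAX_NUM_WORKERS} --maxNumWorkers=${MAX_NUM_WORKERS}"
```

With these example numbers, 150 total worker threads across 8-core VMs caps the pipeline at 18 workers.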

### Example usages

### Import all import groups

```shell
mvn -Pdataflow-runner compile exec:java -pl ingestion -am -Dexec.mainClass=org.datacommons.ingestion.pipeline.IngestionPipeline \
-Dexec.args="--project=datcom-store --gcpTempLocation=gs://keyurs-dataflow/temp --runner=DataflowRunner --region=us-central1 --numWorkers=120 --maxNumWorkers=120 --dataflowServiceOptions=enable_google_cloud_profiler --workerMachineType=n2-custom-8-131072-ext"
```

### Import all import groups while skipping observations

```shell
mvn -Pdataflow-runner compile exec:java -pl ingestion -am -Dexec.mainClass=org.datacommons.ingestion.pipeline.IngestionPipeline \
-Dexec.args="--skipProcessing=SKIP_OBS --project=datcom-store --gcpTempLocation=gs://keyurs-dataflow/temp --runner=DataflowRunner --region=us-central1 --numWorkers=30 --maxNumWorkers=100 --dataflowServiceOptions=enable_google_cloud_profiler --workerMachineType=e2-highmem-16"
```

### Import specific import group

```shell
mvn -Pdataflow-runner compile exec:java -pl ingestion -am -Dexec.mainClass=org.datacommons.ingestion.pipeline.IngestionPipeline \
-Dexec.args="--importGroupVersion=ipcc_2025_04_04_03_46_23 --project=datcom-store --gcpTempLocation=gs://keyurs-dataflow/temp --runner=DataflowRunner --region=us-central1 --numWorkers=20 --maxNumWorkers=30 \
--dataflowServiceOptions=enable_google_cloud_profiler --workerMachineType=e2-highmem-16"
```

### Import specific import group while skipping graph

```shell
mvn -Pdataflow-runner compile exec:java -pl ingestion -am -Dexec.mainClass=org.datacommons.ingestion.pipeline.IngestionPipeline \
-Dexec.args="--importGroupVersion=ipcc_2025_04_04_03_46_23 --skipProcessing=SKIP_GRAPH --project=datcom-store --gcpTempLocation=gs://keyurs-dataflow/temp --runner=DataflowRunner --region=us-central1 --numWorkers=20 --maxNumWorkers=20 --dataflowServiceOptions=enable_google_cloud_profiler --workerMachineType=e2-highmem-16"
```

## Debug options

When running any pipeline, various debug options are available. Below are some
that can be useful for the ingestion pipeline:

```shell
# Log hot keys
--hotKeyLoggingEnabled=true

# Enable profiler
--dataflowServiceOptions=enable_google_cloud_profiler

# Capture output samples for display in the cloud console
--experiments=enable_data_sampling
```