Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public static void main(String[] args) {
PCollection<McfGraph> currentNodes;
if (options.getUseOptimizedGraphFormat()) {
LOGGER.info("Using tfrecord file format");
currentNodes = PipelineUtils.readMcfGraph(options.getCurrentData(), p);
previousNodes = PipelineUtils.readMcfGraph(options.getPreviousData(), p);
currentNodes = PipelineUtils.readMcfGraph("differ", options.getCurrentData(), p);
previousNodes = PipelineUtils.readMcfGraph("differ", options.getPreviousData(), p);
} else {
LOGGER.info("Using mcf file format");
previousNodes = PipelineUtils.readMcfFiles(options.getPreviousData(), p);
currentNodes = PipelineUtils.readMcfFiles(options.getCurrentData(), p);
previousNodes = PipelineUtils.readMcfFiles("differ", options.getPreviousData(), p);
currentNodes = PipelineUtils.readMcfFiles("differ", options.getCurrentData(), p);
}

// Process the input and perform diff operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public void testDiffer() {

// Process the input.
PCollection<McfGraph> currentGraph =
PipelineUtils.readMcfFiles(Paths.get(currentFile, "*.mcf").toString(), p);
PipelineUtils.readMcfFiles("test", Paths.get(currentFile, "*.mcf").toString(), p);
PCollection<McfGraph> previousGraph =
PipelineUtils.readMcfFiles(Paths.get(previousFile, "*.mcf").toString(), p);
PipelineUtils.readMcfFiles("test", Paths.get(previousFile, "*.mcf").toString(), p);
PCollectionTuple currentNodesTuple = DifferUtils.processGraph(currentGraph);
PCollectionTuple previousNodesTuple = DifferUtils.processGraph(previousGraph);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.datacommons.ingestion.spanner.SpannerClient;
import org.datacommons.ingestion.util.GraphReader;
import org.datacommons.ingestion.util.PipelineUtils;
Expand Down Expand Up @@ -70,12 +70,6 @@ public static void buildPipeline(
Pipeline pipeline, IngestionPipelineOptions options, SpannerClient spannerClient) {
LOGGER.info("Running import pipeline for imports: {}", options.getImportList());

// Initialize lists to hold mutations from all imports.
List<PCollection<Void>> deleteOpsList = new ArrayList<>();
List<PCollection<Mutation>> obsMutationList = new ArrayList<>();
List<PCollection<Mutation>> edgeMutationList = new ArrayList<>();
List<PCollection<Mutation>> nodeMutationList = new ArrayList<>();

// Parse the input import list JSON.
JsonElement jsonElement = JsonParser.parseString(options.getImportList());
JsonArray jsonArray = jsonElement.getAsJsonArray();
Expand All @@ -97,37 +91,8 @@ public static void buildPipeline(
String graphPath = pathElement.getAsString();

// Process the individual import.
processImport(
pipeline,
spannerClient,
importName,
graphPath,
options.getSkipDelete(),
deleteOpsList,
nodeMutationList,
edgeMutationList,
obsMutationList);
}
// Finally, aggregate all collected mutations and write them to Spanner.
// 1. Process Deletes:
// First, execute all delete mutations to clear old data for the imports.
PCollection<Void> deleted =
PCollectionList.of(deleteOpsList).apply("DeleteOps", Flatten.pCollections());

// 2. Process Observations:
// Write observation mutations after deletes are complete.
if (options.getWriteObsGraph()) {
spannerClient.writeMutations(pipeline, "Observations", obsMutationList, deleted);
processImport(pipeline, spannerClient, importName, graphPath, options.getSkipDelete());
}

// 3. Process Nodes:
// Write node mutations after deletes are complete.
SpannerWriteResult writtenNodes =
spannerClient.writeMutations(pipeline, "Nodes", nodeMutationList, deleted);

// 4. Process Edges:
// Write edge mutations only after node mutations are complete to ensure referential integrity.
spannerClient.writeMutations(pipeline, "Edges", edgeMutationList, writtenNodes.getOutput());
}

/**
Expand All @@ -138,67 +103,94 @@ public static void buildPipeline(
* @param importName The name of the import.
* @param graphPath The full path to the graph data.
* @param skipDelete Whether to skip delete operations.
* @param deleteOpsList List to collect delete Ops.
* @param nodeMutationList List to collect node mutations.
* @param edgeMutationList List to collect edge mutations.
* @param obsMutationList List to collect observation mutations.
*/
private static void processImport(
Pipeline pipeline,
SpannerClient spannerClient,
String importName,
String graphPath,
boolean skipDelete,
List<PCollection<Void>> deleteOpsList,
List<PCollection<Mutation>> nodeMutationList,
List<PCollection<Mutation>> edgeMutationList,
List<PCollection<Mutation>> obsMutationList) {
boolean skipDelete) {
LOGGER.info("Import: {} Graph path: {}", importName, graphPath);

String provenance = "dc/base/" + importName;

// 1. Prepare Deletes:
// Generate mutations to delete existing data for this import/provenance.
// Create a dummy signal if deletes are skipped, so downstream dependencies are satisfied
// immediately.
PCollection<Void> deleteObsWait;
PCollection<Void> deleteEdgesWait;
if (!skipDelete) {
List<PCollection<Void>> deleteOps =
GraphReader.deleteExistingDataForImport(importName, provenance, pipeline, spannerClient);
deleteOpsList.addAll(deleteOps);
deleteObsWait =
spannerClient.deleteDataForImport(
pipeline, importName, spannerClient.getObservationTableName(), "import_name");
deleteEdgesWait =
spannerClient.deleteDataForImport(
pipeline, provenance, spannerClient.getEdgeTableName(), "provenance");
} else {
deleteObsWait =
pipeline.apply(
"CreateEmptyObsWait-" + importName, Create.empty(TypeDescriptor.of(Void.class)));
deleteEdgesWait =
pipeline.apply(
"CreateEmptyEdgesWait-" + importName, Create.empty(TypeDescriptor.of(Void.class)));
}

// 2. Read and Split Graph:
// Read the graph data (TFRecord or MCF files) and split into schema and observation nodes.
PCollection<McfGraph> graph =
graphPath.contains("tfrecord")
? PipelineUtils.readMcfGraph(graphPath, pipeline)
: PipelineUtils.readMcfFiles(graphPath, pipeline);
PCollectionTuple graphNodes = PipelineUtils.splitGraph(graph);
? PipelineUtils.readMcfGraph(importName, graphPath, pipeline)
: PipelineUtils.readMcfFiles(importName, graphPath, pipeline);
PCollectionTuple graphNodes = PipelineUtils.splitGraph(importName, graph);
PCollection<McfGraph> observationNodes = graphNodes.get(PipelineUtils.OBSERVATION_NODES_TAG);
PCollection<McfGraph> schemaNodes = graphNodes.get(PipelineUtils.SCHEMA_NODES_TAG);

// 3. Process Schema Nodes:
// Combine schema nodes if required, then convert to Node and Edge mutations.
// Combine nodes if required.
PCollection<McfGraph> combinedGraph = schemaNodes;
if (IMPORTS_TO_COMBINE.contains(importName)) {
combinedGraph = PipelineUtils.combineGraphNodes(schemaNodes);
combinedGraph = PipelineUtils.combineGraphNodes(importName, schemaNodes);
}

// Convert all nodes to mutations
PCollection<Mutation> nodeMutations =
GraphReader.graphToNodes(
importName, combinedGraph, spannerClient, nodeCounter, nodeInvalidTypeCounter)
"NodeMutations-" + importName,
combinedGraph,
spannerClient,
nodeCounter,
nodeInvalidTypeCounter)
.apply("ExtractNodeMutations-" + importName, Values.create());
PCollection<Mutation> edgeMutations =
GraphReader.graphToEdges(importName, combinedGraph, provenance, spannerClient, edgeCounter)
GraphReader.graphToEdges(
"EdgeMutations-" + importName,
combinedGraph,
provenance,
spannerClient,
edgeCounter)
.apply("ExtractEdgeMutations-" + importName, Values.create());

nodeMutationList.add(nodeMutations);
edgeMutationList.add(edgeMutations);
// Write Nodes
SpannerWriteResult writtenNodes =
spannerClient.writeMutations(pipeline, "WriteNodesToSpanner-" + importName, nodeMutations);

// Write Edges (wait for Nodes write and Edges delete)
edgeMutations.apply(
"EdgesWaitOn-" + importName, Wait.on(List.of(writtenNodes.getOutput(), deleteEdgesWait)));
spannerClient.writeMutations(pipeline, "WriteEdgesToSpanner-" + importName, edgeMutations);

// 4. Process Observation Nodes:
// Build an optimized graph from observation nodes and convert to Observation mutations.
PCollection<McfOptimizedGraph> optimizedGraph =
PipelineUtils.buildOptimizedMcfGraph(observationNodes);
PipelineUtils.buildOptimizedMcfGraph(importName, observationNodes);
PCollection<Mutation> observationMutations =
GraphReader.graphToObservations(optimizedGraph, importName, spannerClient, obsCounter)
.apply("ExtractObsMutations", Values.create());
obsMutationList.add(observationMutations);
.apply("ExtractObsMutations-" + importName, Values.create());
// Write Observations (wait for Obs delete)
observationMutations.apply("ObsWaitOn-" + importName, Wait.on(deleteObsWait));

spannerClient.writeMutations(
pipeline, "WriteObservationsToSpanner-" + importName, observationMutations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.UUID;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
Expand Down Expand Up @@ -86,7 +85,7 @@ public class GraphIngestionPipelineIntegrationTest {
private String region;
private String emulatorHost;
private boolean isLocal;
private String importName = "TestImport-" + UUID.randomUUID().toString();
private String importName = "TestImport";
private String nodeNameValue = "Test Node Name";
private SpannerClient spannerClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.datacommons.ingestion.spanner.SpannerClient;
import org.junit.Before;
Expand Down Expand Up @@ -121,33 +119,24 @@ public MockSpannerClient(PCollection<Void> deleteSignal, SpannerWriteResult mock
}

@Override
public PCollection<Void> deleteObservationsForImport(String importName, Pipeline pipeline) {
return deleteSignal;
}

@Override
public PCollection<Void> deleteEdgesForImport(String provenance, Pipeline pipeline) {
public PCollection<Void> deleteDataForImport(
Pipeline pipeline, String importName, String tableName, String columnName) {
return deleteSignal;
}

@Override
public SpannerWriteResult writeMutations(
Pipeline pipeline,
String name,
List<PCollection<Mutation>> mutationList,
PCollection<?> waitSignal) {

PCollectionList.of(mutationList)
.apply("Flatten" + name + "Mutations-Test", Flatten.pCollections())
.apply(
"Capture" + name + "Mutations",
ParDo.of(
new DoFn<Mutation, Void>() {
@ProcessElement
public void processElement(@Element Mutation m) {
capturedMutations.add(m);
}
}));
Pipeline pipeline, String name, PCollection<Mutation> mutations) {

mutations.apply(
"Capture" + name + "Mutations",
ParDo.of(
new DoFn<Mutation, Void>() {
@ProcessElement
public void processElement(@Element Mutation m) {
capturedMutations.add(m);
}
}));
return mockWriteResult;
}
}
Expand Down
Loading