diff --git a/pipeline/differ/src/main/java/org/datacommons/ingestion/differ/DifferPipeline.java b/pipeline/differ/src/main/java/org/datacommons/ingestion/differ/DifferPipeline.java index 405269b5..7cd5b0e9 100644 --- a/pipeline/differ/src/main/java/org/datacommons/ingestion/differ/DifferPipeline.java +++ b/pipeline/differ/src/main/java/org/datacommons/ingestion/differ/DifferPipeline.java @@ -30,12 +30,12 @@ public static void main(String[] args) { PCollection currentNodes; if (options.getUseOptimizedGraphFormat()) { LOGGER.info("Using tfrecord file format"); - currentNodes = PipelineUtils.readMcfGraph(options.getCurrentData(), p); - previousNodes = PipelineUtils.readMcfGraph(options.getPreviousData(), p); + currentNodes = PipelineUtils.readMcfGraph("differ", options.getCurrentData(), p); + previousNodes = PipelineUtils.readMcfGraph("differ", options.getPreviousData(), p); } else { LOGGER.info("Using mcf file format"); - previousNodes = PipelineUtils.readMcfFiles(options.getPreviousData(), p); - currentNodes = PipelineUtils.readMcfFiles(options.getCurrentData(), p); + previousNodes = PipelineUtils.readMcfFiles("differ", options.getPreviousData(), p); + currentNodes = PipelineUtils.readMcfFiles("differ", options.getCurrentData(), p); } // Process the input and perform diff operation. diff --git a/pipeline/differ/src/test/java/org/datacommons/ingestion/differ/DifferTest.java b/pipeline/differ/src/test/java/org/datacommons/ingestion/differ/DifferTest.java index c11becdf..752f001b 100644 --- a/pipeline/differ/src/test/java/org/datacommons/ingestion/differ/DifferTest.java +++ b/pipeline/differ/src/test/java/org/datacommons/ingestion/differ/DifferTest.java @@ -36,9 +36,9 @@ public void testDiffer() { // Process the input. 
PCollection currentGraph = - PipelineUtils.readMcfFiles(Paths.get(currentFile, "*.mcf").toString(), p); + PipelineUtils.readMcfFiles("test", Paths.get(currentFile, "*.mcf").toString(), p); PCollection previousGraph = - PipelineUtils.readMcfFiles(Paths.get(previousFile, "*.mcf").toString(), p); + PipelineUtils.readMcfFiles("test", Paths.get(previousFile, "*.mcf").toString(), p); PCollectionTuple currentNodesTuple = DifferUtils.processGraph(currentGraph); PCollectionTuple previousNodesTuple = DifferUtils.processGraph(previousGraph); diff --git a/pipeline/ingestion/src/main/java/org/datacommons/ingestion/pipeline/GraphIngestionPipeline.java b/pipeline/ingestion/src/main/java/org/datacommons/ingestion/pipeline/GraphIngestionPipeline.java index 9f25efc1..dfa918ea 100644 --- a/pipeline/ingestion/src/main/java/org/datacommons/ingestion/pipeline/GraphIngestionPipeline.java +++ b/pipeline/ingestion/src/main/java/org/datacommons/ingestion/pipeline/GraphIngestionPipeline.java @@ -4,18 +4,18 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; -import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TypeDescriptor; import org.datacommons.ingestion.spanner.SpannerClient; import org.datacommons.ingestion.util.GraphReader; import org.datacommons.ingestion.util.PipelineUtils; @@ -70,12 +70,6 @@ public static void 
buildPipeline( Pipeline pipeline, IngestionPipelineOptions options, SpannerClient spannerClient) { LOGGER.info("Running import pipeline for imports: {}", options.getImportList()); - // Initialize lists to hold mutations from all imports. - List> deleteOpsList = new ArrayList<>(); - List> obsMutationList = new ArrayList<>(); - List> edgeMutationList = new ArrayList<>(); - List> nodeMutationList = new ArrayList<>(); - // Parse the input import list JSON. JsonElement jsonElement = JsonParser.parseString(options.getImportList()); JsonArray jsonArray = jsonElement.getAsJsonArray(); @@ -97,37 +91,8 @@ public static void buildPipeline( String graphPath = pathElement.getAsString(); // Process the individual import. - processImport( - pipeline, - spannerClient, - importName, - graphPath, - options.getSkipDelete(), - deleteOpsList, - nodeMutationList, - edgeMutationList, - obsMutationList); - } - // Finally, aggregate all collected mutations and write them to Spanner. - // 1. Process Deletes: - // First, execute all delete mutations to clear old data for the imports. - PCollection deleted = - PCollectionList.of(deleteOpsList).apply("DeleteOps", Flatten.pCollections()); - - // 2. Process Observations: - // Write observation mutations after deletes are complete. - if (options.getWriteObsGraph()) { - spannerClient.writeMutations(pipeline, "Observations", obsMutationList, deleted); + processImport(pipeline, spannerClient, importName, graphPath, options.getSkipDelete()); } - - // 3. Process Nodes: - // Write node mutations after deletes are complete. - SpannerWriteResult writtenNodes = - spannerClient.writeMutations(pipeline, "Nodes", nodeMutationList, deleted); - - // 4. Process Edges: - // Write edge mutations only after node mutations are complete to ensure referential integrity. - spannerClient.writeMutations(pipeline, "Edges", edgeMutationList, writtenNodes.getOutput()); } /** @@ -138,67 +103,94 @@ public static void buildPipeline( * @param importName The name of the import. 
* @param graphPath The full path to the graph data. * @param skipDelete Whether to skip delete operations. - * @param deleteOpsList List to collect delete Ops. - * @param nodeMutationList List to collect node mutations. - * @param edgeMutationList List to collect edge mutations. - * @param obsMutationList List to collect observation mutations. */ private static void processImport( Pipeline pipeline, SpannerClient spannerClient, String importName, String graphPath, - boolean skipDelete, - List> deleteOpsList, - List> nodeMutationList, - List> edgeMutationList, - List> obsMutationList) { + boolean skipDelete) { LOGGER.info("Import: {} Graph path: {}", importName, graphPath); String provenance = "dc/base/" + importName; // 1. Prepare Deletes: // Generate mutations to delete existing data for this import/provenance. + // Create a dummy signal if deletes are skipped, so downstream dependencies are satisfied + // immediately. + PCollection deleteObsWait; + PCollection deleteEdgesWait; if (!skipDelete) { - List> deleteOps = - GraphReader.deleteExistingDataForImport(importName, provenance, pipeline, spannerClient); - deleteOpsList.addAll(deleteOps); + deleteObsWait = + spannerClient.deleteDataForImport( + pipeline, importName, spannerClient.getObservationTableName(), "import_name"); + deleteEdgesWait = + spannerClient.deleteDataForImport( + pipeline, provenance, spannerClient.getEdgeTableName(), "provenance"); + } else { + deleteObsWait = + pipeline.apply( + "CreateEmptyObsWait-" + importName, Create.empty(TypeDescriptor.of(Void.class))); + deleteEdgesWait = + pipeline.apply( + "CreateEmptyEdgesWait-" + importName, Create.empty(TypeDescriptor.of(Void.class))); } // 2. Read and Split Graph: // Read the graph data (TFRecord or MCF files) and split into schema and observation nodes. PCollection graph = graphPath.contains("tfrecord") - ? 
PipelineUtils.readMcfGraph(graphPath, pipeline) - : PipelineUtils.readMcfFiles(graphPath, pipeline); - PCollectionTuple graphNodes = PipelineUtils.splitGraph(graph); + ? PipelineUtils.readMcfGraph(importName, graphPath, pipeline) + : PipelineUtils.readMcfFiles(importName, graphPath, pipeline); + PCollectionTuple graphNodes = PipelineUtils.splitGraph(importName, graph); PCollection observationNodes = graphNodes.get(PipelineUtils.OBSERVATION_NODES_TAG); PCollection schemaNodes = graphNodes.get(PipelineUtils.SCHEMA_NODES_TAG); // 3. Process Schema Nodes: - // Combine schema nodes if required, then convert to Node and Edge mutations. + // Combine nodes if required. PCollection combinedGraph = schemaNodes; if (IMPORTS_TO_COMBINE.contains(importName)) { - combinedGraph = PipelineUtils.combineGraphNodes(schemaNodes); + combinedGraph = PipelineUtils.combineGraphNodes(importName, schemaNodes); } + + // Convert all nodes to mutations PCollection nodeMutations = GraphReader.graphToNodes( - importName, combinedGraph, spannerClient, nodeCounter, nodeInvalidTypeCounter) + "NodeMutations-" + importName, + combinedGraph, + spannerClient, + nodeCounter, + nodeInvalidTypeCounter) .apply("ExtractNodeMutations-" + importName, Values.create()); PCollection edgeMutations = - GraphReader.graphToEdges(importName, combinedGraph, provenance, spannerClient, edgeCounter) + GraphReader.graphToEdges( + "EdgeMutations-" + importName, + combinedGraph, + provenance, + spannerClient, + edgeCounter) .apply("ExtractEdgeMutations-" + importName, Values.create()); - nodeMutationList.add(nodeMutations); - edgeMutationList.add(edgeMutations); + // Write Nodes + SpannerWriteResult writtenNodes = + spannerClient.writeMutations(pipeline, "WriteNodesToSpanner-" + importName, nodeMutations); + + // Write Edges (wait for Nodes write and Edges delete). Note: Wait.on returns a NEW + // delayed PCollection; we must reassign it, otherwise the write below uses the + // un-delayed collection and the ordering guarantee is silently lost. + edgeMutations = edgeMutations.apply( + "EdgesWaitOn-" + importName, Wait.on(List.of(writtenNodes.getOutput(), deleteEdgesWait))); + spannerClient.writeMutations(pipeline, 
"WriteEdgesToSpanner-" + importName, edgeMutations); // 4. Process Observation Nodes: // Build an optimized graph from observation nodes and convert to Observation mutations. PCollection optimizedGraph = - PipelineUtils.buildOptimizedMcfGraph(observationNodes); + PipelineUtils.buildOptimizedMcfGraph(importName, observationNodes); PCollection observationMutations = GraphReader.graphToObservations(optimizedGraph, importName, spannerClient, obsCounter) - .apply("ExtractObsMutations", Values.create()); - obsMutationList.add(observationMutations); + .apply("ExtractObsMutations-" + importName, Values.create()); + // Write Observations (wait for Obs delete). Wait.on returns a NEW delayed + // PCollection, so reassign before writing or the wait is a no-op. + observationMutations = + observationMutations.apply("ObsWaitOn-" + importName, Wait.on(deleteObsWait)); + + spannerClient.writeMutations( + pipeline, "WriteObservationsToSpanner-" + importName, observationMutations); } } diff --git a/pipeline/ingestion/src/test/java/org/datacommons/ingestion/pipeline/GraphIngestionPipelineIntegrationTest.java b/pipeline/ingestion/src/test/java/org/datacommons/ingestion/pipeline/GraphIngestionPipelineIntegrationTest.java index f6611582..78cf063b 100644 --- a/pipeline/ingestion/src/test/java/org/datacommons/ingestion/pipeline/GraphIngestionPipelineIntegrationTest.java +++ b/pipeline/ingestion/src/test/java/org/datacommons/ingestion/pipeline/GraphIngestionPipelineIntegrationTest.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.util.UUID; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; @@ -86,7 +85,7 @@ public class GraphIngestionPipelineIntegrationTest { private String region; private String emulatorHost; private boolean isLocal; - private String importName = "TestImport-" + UUID.randomUUID().toString(); + private String importName = "TestImport"; private String nodeNameValue = "Test Node Name"; private 
SpannerClient spannerClient; diff --git a/pipeline/ingestion/src/test/java/org/datacommons/ingestion/pipeline/GraphIngestionPipelineTest.java b/pipeline/ingestion/src/test/java/org/datacommons/ingestion/pipeline/GraphIngestionPipelineTest.java index 6a621d85..720d8d08 100644 --- a/pipeline/ingestion/src/test/java/org/datacommons/ingestion/pipeline/GraphIngestionPipelineTest.java +++ b/pipeline/ingestion/src/test/java/org/datacommons/ingestion/pipeline/GraphIngestionPipelineTest.java @@ -19,10 +19,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TypeDescriptor; import org.datacommons.ingestion.spanner.SpannerClient; import org.junit.Before; @@ -121,33 +119,24 @@ public MockSpannerClient(PCollection deleteSignal, SpannerWriteResult mock } @Override - public PCollection deleteObservationsForImport(String importName, Pipeline pipeline) { - return deleteSignal; - } - - @Override - public PCollection deleteEdgesForImport(String provenance, Pipeline pipeline) { + public PCollection deleteDataForImport( + Pipeline pipeline, String importName, String tableName, String columnName) { return deleteSignal; } @Override public SpannerWriteResult writeMutations( - Pipeline pipeline, - String name, - List> mutationList, - PCollection waitSignal) { - - PCollectionList.of(mutationList) - .apply("Flatten" + name + "Mutations-Test", Flatten.pCollections()) - .apply( - "Capture" + name + "Mutations", - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(@Element Mutation m) { - capturedMutations.add(m); - } - })); + Pipeline pipeline, String name, PCollection mutations) { + + mutations.apply( + "Capture" + name + "Mutations", + ParDo.of( + new DoFn() { 
+ @ProcessElement + public void processElement(@Element Mutation m) { + capturedMutations.add(m); + } + })); return mockWriteResult; } } diff --git a/pipeline/spanner/src/main/java/org/datacommons/ingestion/spanner/SpannerClient.java b/pipeline/spanner/src/main/java/org/datacommons/ingestion/spanner/SpannerClient.java index e364869e..137c3ced 100644 --- a/pipeline/spanner/src/main/java/org/datacommons/ingestion/spanner/SpannerClient.java +++ b/pipeline/spanner/src/main/java/org/datacommons/ingestion/spanner/SpannerClient.java @@ -38,13 +38,9 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.TypeDescriptor; import org.datacommons.ingestion.data.Edge; import org.datacommons.ingestion.data.Node; import org.datacommons.ingestion.data.Observation; @@ -89,35 +85,16 @@ protected SpannerClient(Builder builder) { } /** - * Helper method to flatten and write mutations to Spanner, optionally waiting on a signal. + * Helper method to write mutations to Spanner. * * @param pipeline The Beam pipeline. * @param name The name prefix for the transforms (e.g., "Node", "Edge"). - * @param mutationList The list of mutation PCollections to flatten. - * @param waitSignal Optional PCollection to wait on before writing. + * @param mutations The PCollection of mutations to write. * @return The result of the Spanner write operation. 
*/ public SpannerWriteResult writeMutations( - Pipeline pipeline, - String name, - List> mutationList, - PCollection waitSignal) { - PCollection mutations; - if (mutationList.isEmpty()) { - mutations = - pipeline.apply( - "CreateEmpty" + name + "Mutations", Create.empty(TypeDescriptor.of(Mutation.class))); - } else { - mutations = - PCollectionList.of(mutationList) - .apply("Flatten" + name + "Mutations", Flatten.pCollections()); - } - - if (waitSignal != null) { - mutations = mutations.apply("WaitOn" + name, Wait.on(waitSignal)); - } - - return mutations.apply("Write" + name + "ToSpanner", getWriteTransform()); + Pipeline pipeline, String name, PCollection mutations) { + return mutations.apply(name, getWriteTransform()); } /** @@ -210,35 +187,25 @@ public void createDatabase() { } } - public PCollection deleteObservationsForImport(String importName, Pipeline pipeline) { - return pipeline - .apply("StartDeleteObservations", Create.of(importName)) - .apply( - "ExecuteDeleteObservationsDML", - ParDo.of( - new DeleteByColumnFn(this, observationTableName, "import_name", "importName"))); - } - - public PCollection deleteEdgesForImport(String provenance, Pipeline pipeline) { + public PCollection deleteDataForImport( + Pipeline pipeline, String importName, String tableName, String columnName) { + String stageName = tableName + "-" + importName.replaceFirst("^dc/base/", ""); return pipeline - .apply("StartDeleteEdges", Create.of(provenance)) + .apply("StartDelete" + stageName, Create.of(importName)) .apply( - "ExecuteDeleteEdgesDML", - ParDo.of(new DeleteByColumnFn(this, edgeTableName, "provenance", "provenance"))); + "ExecuteDelete" + stageName, + ParDo.of(new DeleteByColumnFn(this, tableName, columnName))); } static class DeleteByColumnFn extends DoFn { private final SpannerClient spannerClient; private final String tableName; private final String columnName; - private final String paramName; - public DeleteByColumnFn( - SpannerClient spannerClient, String tableName, 
String columnName, String paramName) { + public DeleteByColumnFn(SpannerClient spannerClient, String tableName, String columnName) { this.spannerClient = spannerClient; this.tableName = tableName; this.columnName = columnName; - this.paramName = paramName; } @ProcessElement @@ -258,8 +225,8 @@ public void processElement(ProcessContext c) { spannerClient.spannerInstanceId, spannerClient.spannerDatabaseId)); String dml = - String.format("DELETE FROM %s WHERE %s = @%s", tableName, columnName, paramName); - Statement statement = Statement.newBuilder(dml).bind(paramName).to(value).build(); + String.format("DELETE FROM %s WHERE %s = @%s", tableName, columnName, columnName); + Statement statement = Statement.newBuilder(dml).bind(columnName).to(value).build(); long rowCount = dbClient.executePartitionedUpdate(statement); LOGGER.info("Deleted {} rows from {} for {} {}", rowCount, tableName, columnName, value); c.output(null); diff --git a/pipeline/util/src/main/java/org/datacommons/ingestion/util/GraphReader.java b/pipeline/util/src/main/java/org/datacommons/ingestion/util/GraphReader.java index 19078295..3c7df45c 100644 --- a/pipeline/util/src/main/java/org/datacommons/ingestion/util/GraphReader.java +++ b/pipeline/util/src/main/java/org/datacommons/ingestion/util/GraphReader.java @@ -5,11 +5,9 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -97,6 +95,9 @@ public static List graphToEdges(McfGraph graph, String provenance) { String dcid = GraphUtils.getPropertyValue(pv, "dcid"); String subjectId = !dcid.isEmpty() ? 
dcid : McfUtil.stripNamespace(nodeEntry.getKey()); for (Map.Entry entry : pv.entrySet()) { + if (entry.getKey().equals("dcid")) { + continue; + } for (TypedValue val : entry.getValue().getTypedValuesList()) { if (val.getType() != ValueType.RESOLVED_REF) { int valSize = val.getValue().getBytes(StandardCharsets.UTF_8).length; @@ -155,20 +156,13 @@ public static Observation graphToObservations(McfOptimizedGraph graph, String im return obs.build(); } - public static List> deleteExistingDataForImport( - String importName, String provenance, Pipeline pipeline, SpannerClient spannerClient) { - return Arrays.asList( - spannerClient.deleteObservationsForImport(importName, pipeline), - spannerClient.deleteEdgesForImport(provenance, pipeline)); - } - public static PCollection> graphToObservations( PCollection graph, String importName, SpannerClient spannerClient, Counter obsCounter) { return graph.apply( - "GraphToObs", + "GraphToObs-" + importName, ParDo.of( new DoFn>() { @ProcessElement diff --git a/pipeline/util/src/main/java/org/datacommons/ingestion/util/PipelineUtils.java b/pipeline/util/src/main/java/org/datacommons/ingestion/util/PipelineUtils.java index 657ca21e..6d869eec 100644 --- a/pipeline/util/src/main/java/org/datacommons/ingestion/util/PipelineUtils.java +++ b/pipeline/util/src/main/java/org/datacommons/ingestion/util/PipelineUtils.java @@ -87,9 +87,10 @@ public static McfOptimizedGraph parseToOptimizedGraph(byte[] element) { * @param p dataflow pipeline * @return PCollection of MCF graph proto */ - public static PCollection readMcfGraph(String files, Pipeline p) { - PCollection graph = readOptimizedMcfGraph(files, p); + public static PCollection readMcfGraph(String name, String files, Pipeline p) { + PCollection graph = readOptimizedMcfGraph(name, files, p); return graph.apply( + "MapObsToSeries-" + name, ParDo.of( new DoFn() { @ProcessElement @@ -109,15 +110,16 @@ public void processElement( * @param p Dataflow pipeline. 
* @return PCollection of McfOptimizedGraph proto. */ - public static PCollection readOptimizedMcfGraph(String files, Pipeline p) { + public static PCollection readOptimizedMcfGraph( + String name, String files, Pipeline p) { PCollection nodes = p.apply( - "ReadMcfGraph", + "ReadTFRecordFiles-" + name, TFRecordIO.read().from(files).withCompression(GZIP).withoutValidation()); PCollection graph = nodes.apply( - "ProcessGraph", + "ConvertToOptimizedGraph-" + name, ParDo.of( new DoFn() { @ProcessElement @@ -137,11 +139,11 @@ public void processElement( * @param p Dataflow pipeline. * @return PCollection of McfGraph proto. */ - public static PCollection readMcfFiles(String files, Pipeline p) { + public static PCollection readMcfFiles(String name, String files, Pipeline p) { String delimiter = "\n\n"; PCollection nodes = p.apply( - "ReadMcfFiles", + "ReadMcfFiles-" + name, TextIO.read() .withDelimiter(delimiter.getBytes()) .from(files) @@ -149,7 +151,7 @@ public static PCollection readMcfFiles(String files, Pipeline p) { PCollection mcf = nodes.apply( - "MapToGraph", + "MapMcfToGraph-" + name, MapElements.via( new SimpleFunction() { @Override @@ -160,9 +162,9 @@ public McfGraph apply(String input) { return mcf; } - public static PCollectionTuple splitGraph(PCollection graph) { + public static PCollectionTuple splitGraph(String name, PCollection graph) { return graph.apply( - "SplitGraph", + "SplitGraph-" + name, ParDo.of( new DoFn() { @ProcessElement @@ -188,11 +190,12 @@ public void process(ProcessContext c) { * @param graph PCollection of McfGraph protos.() * @return PCollection of McfOptimizedGraph protos. 
*/ - public static PCollection buildOptimizedMcfGraph(PCollection graph) { + public static PCollection buildOptimizedMcfGraph( + String name, PCollection graph) { PCollection svObs = graph .apply( - "ExtractObs", + "ExtractObs-" + name, ParDo.of( new DoFn< McfGraph, KV>() { @@ -211,9 +214,9 @@ public void processElement( } } })) - .apply(GroupByKey.create()) + .apply("GroupByForObs-" + name, GroupByKey.create()) .apply( - "BuildOptimizedGraph", + "BuildOptimizedGraph-" + name, ParDo.of( new DoFn< KV>, @@ -250,10 +253,10 @@ public void processElement( * @param graph A PCollection of McfGraph protos to combine. * @return A PCollection of McfGraph protos, each containing a single combined node. */ - public static PCollection combineGraphNodes(PCollection graph) { + public static PCollection combineGraphNodes(String name, PCollection graph) { PCollection> graphNodes = graph.apply( - "MapGraphToNodes", + "MapGraphToNodes-" + name, ParDo.of( new DoFn>() { @ProcessElement @@ -269,7 +272,7 @@ public void processElement( PCollection> combined = graphNodes.apply( - "CombineGraphNodes", + "CombineGraphNodes-" + name, Combine.perKey( new Combine.CombineFn, PropertyValues>() { @Override @@ -330,7 +333,7 @@ public PropertyValues extractOutput(List accumulator) { PCollection combinedGraph = combined.apply( - "MapCombinedNodesToGraph", + "MapCombinedNodesToGraph-" + name, ParDo.of( new DoFn, McfGraph>() { @ProcessElement diff --git a/pipeline/util/src/test/java/org/datacommons/ingestion/util/GraphReaderTest.java b/pipeline/util/src/test/java/org/datacommons/ingestion/util/GraphReaderTest.java index f05a32a4..fd715628 100644 --- a/pipeline/util/src/test/java/org/datacommons/ingestion/util/GraphReaderTest.java +++ b/pipeline/util/src/test/java/org/datacommons/ingestion/util/GraphReaderTest.java @@ -173,6 +173,14 @@ public void testGraphToEdges() { .setType(ValueType.TEXT) .setValue("Subject Node")) .build()) + .putPvs( + "dcid", + McfGraph.Values.newBuilder() + .addTypedValues( + 
TypedValue.newBuilder() + .setType(ValueType.TEXT) + .setValue("dcid_subject")) + .build()) .putPvs( "typeOf", McfGraph.Values.newBuilder() diff --git a/pipeline/util/src/test/java/org/datacommons/ingestion/util/PipelineUtilsTest.java b/pipeline/util/src/test/java/org/datacommons/ingestion/util/PipelineUtilsTest.java index 28e6b87a..dc1a9bfd 100644 --- a/pipeline/util/src/test/java/org/datacommons/ingestion/util/PipelineUtilsTest.java +++ b/pipeline/util/src/test/java/org/datacommons/ingestion/util/PipelineUtilsTest.java @@ -105,7 +105,7 @@ public void testBuildOptimizedMcfGraph() { createStatVarObservationGraph( "obs4", "count_person", "country/India", "2022", "36.0"))); - PCollection result = PipelineUtils.buildOptimizedMcfGraph(input); + PCollection result = PipelineUtils.buildOptimizedMcfGraph("test", input); McfOptimizedGraph expected1 = McfOptimizedGraph.newBuilder() @@ -170,7 +170,7 @@ public void testCombineGraphNodes() { Map.of("propE", List.of("valE1")))); PCollection input = p.apply("CreateInput", Create.of(graph1, graph2)); - PCollection output = PipelineUtils.combineGraphNodes(input); + PCollection output = PipelineUtils.combineGraphNodes("test", input); PCollection mergedOutput = output.apply(