diff --git a/pipeline/data/pom.xml b/pipeline/data/pom.xml index c43e68f4..0d1b53ee 100644 --- a/pipeline/data/pom.xml +++ b/pipeline/data/pom.xml @@ -43,10 +43,16 @@ protobuf-java ${protobuf.java.version} - + com.google.cloud google-cloud-storage + + junit + junit + ${junit.version} + test + diff --git a/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java b/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java new file mode 100644 index 00000000..02ec744c --- /dev/null +++ b/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java @@ -0,0 +1,62 @@ +package org.datacommons.ingestion.data; + +import com.google.common.base.Joiner; +import java.nio.charset.StandardCharsets; + +/** Util functions for the pipeline data model. */ +public class DataUtils { + + // Standard FNV-1a 32-bit constants + private static final int FNV_32_INIT = 0x811c9dc5; + private static final int FNV_32_PRIME = 0x01000193; + + /** + * Generates a consistent facet ID using the FNV-1a 32-bit hash algorithm. + * + *

This is designed to replicate the legacy Go facet ID generation implementation in Mixer's + * GetFacetID function. See + * https://github.com/datacommonsorg/mixer/blob/0618c1f3ef80703c98fc97f6c6c6e5cd3d7c13d3/internal/util/util.go#L497-L515 + * + * @param builder The Observation builder containing the fields to hash. + * @return A consistent facet ID string. + */ + public static String generateFacetId(Observation.Builder builder) { + // Only include fields that are set in hash. + // This is so the hashes stay consistent if more fields are added. + String s = + Joiner.on("-") + .join( + builder.getImportName(), + builder.getMeasurementMethod(), + builder.getObservationPeriod(), + builder.getScalingFactor(), + builder.getUnit()); + if (builder.getIsDcAggregate()) { + s += "-IsDcAggregate"; + } + + int hash = fnv1a32(s); + + // Go's fmt.Sprint on a uint32 treats it as unsigned. + // We must do the same in Java to avoid negative string values. + return Integer.toUnsignedString(hash); + } + + /** + * Computes the 32-bit FNV-1a hash of a string. + * + *

Note: Java does not provide a built-in FNV-1a implementation, so we implement it manually + * here. + * + * @param data The input string to hash. + * @return The FNV-1a 32-bit hash as an integer. + */ + private static int fnv1a32(String data) { + int hash = FNV_32_INIT; + for (byte b : data.getBytes(StandardCharsets.UTF_8)) { + hash ^= (b & 0xff); // Bitwise XOR with the unsigned byte value + hash *= FNV_32_PRIME; + } + return hash; + } +} diff --git a/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java b/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java index cf046947..1ca7d43e 100644 --- a/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java +++ b/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java @@ -269,12 +269,32 @@ public Builder provenanceUrl(String provenanceUrl) { return this; } + public String getImportName() { + return importName; + } + + public String getMeasurementMethod() { + return measurementMethod; + } + + public String getObservationPeriod() { + return observationPeriod; + } + + public String getScalingFactor() { + return scalingFactor; + } + + public String getUnit() { + return unit; + } + + public boolean getIsDcAggregate() { + return isDcAggregate; + } + public Observation build() { - int intHash = - Objects.hash( - importName, measurementMethod, observationPeriod, scalingFactor, unit, isDcAggregate); - // Convert to positive long and then to string - this.facetId = String.valueOf((long) intHash & 0x7fffffffL); + this.facetId = DataUtils.generateFacetId(this); return new Observation(this); } } diff --git a/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java b/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java new file mode 100644 index 00000000..5d81e9cc --- /dev/null +++ b/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java @@ -0,0 +1,106 @@ +package org.datacommons.ingestion.data; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class DataUtilsTest { + + private final String expectedId; + private final String importName; + private final String measurementMethod; + private final String observationPeriod; + private final String scalingFactor; + private final String unit; + private final boolean isDcAggregate; + + public DataUtilsTest( + String expectedId, + String importName, + String measurementMethod, + String observationPeriod, + String scalingFactor, + String unit, + boolean isDcAggregate) { + this.expectedId = expectedId; + this.importName = importName; + this.measurementMethod = measurementMethod; + this.observationPeriod = observationPeriod; + this.scalingFactor = scalingFactor; + this.unit = unit; + this.isDcAggregate = isDcAggregate; + } + + // This method provides the data for the test below + @Parameters(name = "Test {index}: expected {0}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + // Format: expectedId, importName, measurementMethod, observationPeriod, scalingFactor, + // unit, isDcAggregate + {"3981252704", "WorldDevelopmentIndicators", "", "P1Y", "", "", false}, + { + "10983471", + "CensusACS5YearSurvey_SubjectTables_S2601A", + "CensusACS5yrSurveySubjectTable", + "", + "", + "", + false + }, + {"2825511676", "CDC_Mortality_UnderlyingCause", "", "", "", "", false}, + {"1226172227", "CensusACS1YearSurvey", "CensusACS1yrSurvey", "", "", "", false}, + {"2176550201", "USCensusPEP_Annual_Population", "CensusPEPSurvey", "P1Y", "", "", false}, + {"2645850372", "CensusACS5YearSurvey_AggCountry", "CensusACS5yrSurvey", "", "", "", true}, + { + "1541763368", + "USDecennialCensus_RedistrictingRelease", + "USDecennialCensus", + "", + "", + "", + false + }, + { + "4181918134", + "OECDRegionalDemography_Population", + "OECDRegionalStatistics", + "P1Y", + "", + "", + false + }, + { + "1964317807", + "CensusACS5YearSurvey_SubjectTables_S0101", + "CensusACS5yrSurveySubjectTable", + "", + "", + "", + false + }, + {"2517965213", "CensusPEP", "CensusPEPSurvey", "", "", "", false} + }); + } + + @Test + public void testGenerateFacetId() { + Observation.Builder builder = + Observation.builder() + .importName(importName) + .measurementMethod(measurementMethod) + .observationPeriod(observationPeriod) + .scalingFactor(scalingFactor) + .unit(unit) + .isDcAggregate(isDcAggregate); + String facetId = DataUtils.generateFacetId(builder); + + assertEquals(expectedId, facetId); + } +} diff --git a/pipeline/util/src/test/java/org/datacommons/ingestion/util/CacheReaderTest.java b/pipeline/util/src/test/java/org/datacommons/ingestion/util/CacheReaderTest.java index 636fd7b7..5a913365 100644 --- a/pipeline/util/src/test/java/org/datacommons/ingestion/util/CacheReaderTest.java +++ b/pipeline/util/src/test/java/org/datacommons/ingestion/util/CacheReaderTest.java @@ -206,40 +206,45 @@ public void testParseTimeSeriesRow() { new NodesEdges() .addNode( Node.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") - .value("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") - .name("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 870755137") + .subjectId( + "dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") + .value("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") + .name("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 4134842815") .types(List.of("StatVarObsSeries")) .build()) .addNode( Node.builder() - .subjectId("jVWNIHt73yOspqKD0fnvTCH8GCW7m38F3gW+JB+aWms=") - .value("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 870755137") + .subjectId("jKdXZgRFpeibUoXpgItXgC+oCoPMFsqP5UFyqNJ+Xss=") + .value("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 4134842815") .build()) .addEdge( Edge.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") + .subjectId( + "dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") .predicate("variableMeasured") .objectId("Mean_PrecipitableWater_Atmosphere") .provenance("dc/base/NOAA_GFS_WeatherForecast") .build()) .addEdge( Edge.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") + .subjectId( + "dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") .predicate("observationAbout") .objectId("geoId/sch2915390") .provenance("dc/base/NOAA_GFS_WeatherForecast") .build()) .addEdge( Edge.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") + .subjectId( + "dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") .predicate("name") - .objectId("jVWNIHt73yOspqKD0fnvTCH8GCW7m38F3gW+JB+aWms=") + .objectId("jKdXZgRFpeibUoXpgItXgC+oCoPMFsqP5UFyqNJ+Xss=") .provenance("dc/base/NOAA_GFS_WeatherForecast") .build()) .addEdge( Edge.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") + .subjectId( + "dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") .predicate("typeOf") .objectId("StatVarObsSeries") .provenance("dc/base/NOAA_GFS_WeatherForecast")