Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pipeline/data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf.java.version}</version>
</dependency>
<dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.datacommons.ingestion.data;

import com.google.common.base.Joiner;
import java.nio.charset.StandardCharsets;

/** Util functions for the pipeline data model. */
public class DataUtils {

// Standard FNV-1a 32-bit constants
private static final int FNV_32_INIT = 0x811c9dc5;
private static final int FNV_32_PRIME = 0x01000193;

/**
* Generates a consistent facet ID using the FNV-1a 32-bit hash algorithm.
*
* <p>This is designed to replicate the legacy Go facet ID generation implementation in Mixer's
* GetFacetID function. See
* https://github.com/datacommonsorg/mixer/blob/0618c1f3ef80703c98fc97f6c6c6e5cd3d7c13d3/internal/util/util.go#L497-L515
*
* @param importName The name of the import this observation belongs to.
* @param measurementMethod The measurement method of the observation.
* @param observationPeriod The observation period of the observation.
* @param scalingFactor The scaling factor of the observation.
* @param unit The unit of the observation.
* @param isDcAggregate Whether the observation is a DC aggregate.
* @return A consistent facet ID string.
*/
public static String generateFacetId(
String importName,
String measurementMethod,
String observationPeriod,
String scalingFactor,
String unit,
boolean isDcAggregate) {
// Only include fields that are set in hash.
// This is so the hashes stay consistent if more fields are added.
String s =
Joiner.on("-").join(importName, measurementMethod, observationPeriod, scalingFactor, unit);
if (isDcAggregate) {
s += "-IsDcAggregate";
}

int hash = fnv1a32(s);

// Go's fmt.Sprint on a uint32 treats it as unsigned.
// We must do the same in Java to avoid negative string values.
return Integer.toUnsignedString(hash);
}

/**
* Computes the 32-bit FNV-1a hash of a string.
*
* <p>Note: Java does not provide a built-in FNV-1a implementation, so we implement it manually
* here.
*
* @param data The input string to hash.
* @return The FNV-1a 32-bit hash as an integer.
*/
private static int fnv1a32(String data) {
int hash = FNV_32_INIT;
for (byte b : data.getBytes(StandardCharsets.UTF_8)) {
hash ^= (b & 0xff); // Bitwise XOR with the unsigned byte value
hash *= FNV_32_PRIME;
}
return hash;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,9 @@ public Builder provenanceUrl(String provenanceUrl) {
}

public Observation build() {
int intHash =
Objects.hash(
this.facetId =
DataUtils.generateFacetId(
importName, measurementMethod, observationPeriod, scalingFactor, unit, isDcAggregate);
// Convert to positive long and then to string
this.facetId = String.valueOf((long) intHash & 0x7fffffffL);
return new Observation(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.datacommons.ingestion.data;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class DataUtilsTest {

private final String expectedId;
private final String importName;
private final String measurementMethod;
private final String observationPeriod;
private final String scalingFactor;
private final String unit;
private final boolean isDcAggregate;

public DataUtilsTest(
String expectedId,
String importName,
String measurementMethod,
String observationPeriod,
String scalingFactor,
String unit,
boolean isDcAggregate) {
this.expectedId = expectedId;
this.importName = importName;
this.measurementMethod = measurementMethod;
this.observationPeriod = observationPeriod;
this.scalingFactor = scalingFactor;
this.unit = unit;
this.isDcAggregate = isDcAggregate;
}

// This method provides the data for the test below
@Parameters(name = "Test {index}: expected {0} for {1}")
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
// Format: expectedId, importName, measurementMethod, observationPeriod, scalingFactor,
// unit, isDcAggregate
{"3981252704", "WorldDevelopmentIndicators", "", "P1Y", "", "", false},
{
"10983471",
"CensusACS5YearSurvey_SubjectTables_S2601A",
"CensusACS5yrSurveySubjectTable",
"",
"",
"",
false
},
{"2825511676", "CDC_Mortality_UnderlyingCause", "", "", "", "", false},
{"1226172227", "CensusACS1YearSurvey", "CensusACS1yrSurvey", "", "", "", false},
{"2176550201", "USCensusPEP_Annual_Population", "CensusPEPSurvey", "P1Y", "", "", false},
{"2645850372", "CensusACS5YearSurvey_AggCountry", "CensusACS5yrSurvey", "", "", "", true},
{
"1541763368",
"USDecennialCensus_RedistrictingRelease",
"USDecennialCensus",
"",
"",
"",
false
},
{
"4181918134",
"OECDRegionalDemography_Population",
"OECDRegionalStatistics",
"P1Y",
"",
"",
false
},
{
"1964317807",
"CensusACS5YearSurvey_SubjectTables_S0101",
"CensusACS5yrSurveySubjectTable",
"",
"",
"",
false
},
{"2517965213", "CensusPEP", "CensusPEPSurvey", "", "", "", false}
});
}

@Test
public void testGenerateFacetId() {
String facetId =
DataUtils.generateFacetId(
importName, measurementMethod, observationPeriod, scalingFactor, unit, isDcAggregate);

assertEquals(expectedId, facetId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,40 +206,45 @@ public void testParseTimeSeriesRow() {
new NodesEdges()
.addNode(
Node.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.value("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.name("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.value("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.name("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 4134842815")
.types(List.of("StatVarObsSeries"))
.build())
.addNode(
Node.builder()
.subjectId("jVWNIHt73yOspqKD0fnvTCH8GCW7m38F3gW+JB+aWms=")
.value("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 870755137")
.subjectId("jKdXZgRFpeibUoXpgItXgC+oCoPMFsqP5UFyqNJ+Xss=")
.value("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 4134842815")
.build())
.addEdge(
Edge.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.predicate("variableMeasured")
.objectId("Mean_PrecipitableWater_Atmosphere")
.provenance("dc/base/NOAA_GFS_WeatherForecast")
.build())
.addEdge(
Edge.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.predicate("observationAbout")
.objectId("geoId/sch2915390")
.provenance("dc/base/NOAA_GFS_WeatherForecast")
.build())
.addEdge(
Edge.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.predicate("name")
.objectId("jVWNIHt73yOspqKD0fnvTCH8GCW7m38F3gW+JB+aWms=")
.objectId("jKdXZgRFpeibUoXpgItXgC+oCoPMFsqP5UFyqNJ+Xss=")
.provenance("dc/base/NOAA_GFS_WeatherForecast")
.build())
.addEdge(
Edge.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.predicate("typeOf")
.objectId("StatVarObsSeries")
.provenance("dc/base/NOAA_GFS_WeatherForecast")
Expand Down