Merged · Changes from 3 commits
13 changes: 2 additions & 11 deletions packages/datacommons-api/datacommons_api/api_cli.py
@@ -18,7 +18,7 @@
from datacommons_api.app import app
from datacommons_api.core.config import get_config, initialize_config
from datacommons_api.core.logging import get_logger, setup_logging
-from datacommons_db.session import get_session, initialize_db
+from datacommons_db.session import get_session
from datacommons_api.services.graph_service import GraphService

setup_logging()
@@ -56,16 +56,7 @@ def start(
gcp_spanner_database_name=gcp_spanner_database_name,
)

-# Initialize the database
-logger.info("Initializing database...")
-logger.info("GCP Project ID: %s", config.GCP_PROJECT_ID)
-logger.info("GCP Spanner Instance ID: %s", config.GCP_SPANNER_INSTANCE_ID)
-logger.info("GCP Spanner Database Name: %s", config.GCP_SPANNER_DATABASE_NAME)
-initialize_db(
-    config.GCP_PROJECT_ID,
-    config.GCP_SPANNER_INSTANCE_ID,
-    config.GCP_SPANNER_DATABASE_NAME,
-)

logger.info("Starting API server...")
uvicorn.run(
app,
13 changes: 2 additions & 11 deletions packages/datacommons-api/datacommons_api/services/graph_service.py
@@ -202,17 +202,8 @@ def node_model_to_graph_node(node: NodeModel) -> GraphNode:

property_value = {}

-if edge.object_bytes:
-    # If the edge has bytes, decode them and add them to the property value
-    property_value["@value"] = base64.b64decode(edge.object_bytes).decode(
-        "utf-8"
-    )
-elif edge.object_value:
-    # If the edge has a literal value, add it to the property value
-    property_value["@value"] = edge.object_value
-else:
-    # If the edge has an object id, add it to the property value
-    property_value["@id"] = edge.object_id
+# All edges are object IDs
+property_value["@id"] = edge.object_id
Contributor (severity: medium):
The assumption that all edges are object IDs might lead to semantically incorrect JSON-LD output for literal properties (such as names, descriptions, or numerical values). If the ingestion pipeline now stores literal values within the object_id column, they should be represented using the @value key in the JSON-LD document instead of @id. In JSON-LD, @id is reserved for IRIs/CURIEs, and using it for literal data can break downstream consumers expecting valid RDF semantics.
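The branching this comment wants preserved can be sketched as follows. This is an illustrative helper, not the service's actual code; the keyword arguments merely mirror the removed `EdgeModel` columns.

```python
import base64


def edge_to_property_value(object_id=None, object_value=None, object_bytes=None):
    """Sketch of the removed branching: literals map to JSON-LD
    "@value", node references map to "@id"."""
    if object_bytes is not None:
        # Bytes-backed literals are stored base64-encoded; decode to UTF-8 text.
        return {"@value": base64.b64decode(object_bytes).decode("utf-8")}
    if object_value is not None:
        # A plain literal (name, description, numeric value) must use "@value".
        return {"@value": object_value}
    # Only IRIs/CURIEs such as DCIDs belong under "@id".
    return {"@id": object_id}
```

Collapsing all three cases into `{"@id": ...}` makes a literal like `"California"` masquerade as an IRI, which is exactly what breaks RDF-aware consumers.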

Contributor (author):
@dwnoble can you confirm?

Contributor:
Gemini's right, it's a little more complicated than this. Now literal values are stored back on the node table.

Christie has had a PR up for a few weeks (😓) that also addresses these schema updates. Let's both review that PR, get it in, and then rebase this PR on top of it. Wdyt @gmechali?


# If the edge has provenance, add it to the property value
if edge.provenance:
15 changes: 1 addition & 14 deletions packages/datacommons-db/datacommons_db/models/edge.py
@@ -35,25 +35,12 @@ class EdgeModel(Base):
)
predicate = sa.Column(String(1024), primary_key=True)
object_id = sa.Column(String(1024), primary_key=True)
-object_value = sa.Column(String(OBJECT_VALUE_MAX_LENGTH), nullable=True)
-object_bytes = sa.Column(sa.LargeBinary(), nullable=True)
-object_hash = sa.Column(String(64), primary_key=True, nullable=True)
-provenance = sa.Column(String(1024), primary_key=True, nullable=True)
-# Use deferred to avoid loading the node data into memory
-object_value_tokenlist = deferred(
-    sa.Column(Text(), nullable=True)
-)  # TOKENLIST is a Spanner type, but represented as String in SQLAlchemy
+provenance = sa.Column(String(1024), primary_key=True, nullable=False)
Contributor (severity: high):
The removal of the object_value, object_bytes, and object_hash columns, along with the EdgeByObjectValue index, is a breaking change that will cause runtime failures in GraphService.

Specifically:

  • AttributeError: GraphService.node_model_to_graph_node and GraphService.coerce_edge_val_for_db_write still attempt to access these deleted attributes on EdgeModel instances.
  • Database Error: The drop_tables command in api_cli.py calls GraphService.drop_tables(), which executes a raw SQL DROP INDEX EdgeByObjectValue. This will fail if the index is no longer present in the schema.
  • Integrity Error: Changing provenance to nullable=False while it is part of the primary key will cause failures in GraphService.create_edge_model if a provenance is not provided (as it defaults to None).

Please update GraphService to align with the new schema or include those changes in this PR to maintain system integrity.

Contributor (severity: high):

The provenance column is now nullable=False and part of the primary key. However, the create_edge_model function in graph_service.py (line 127) still treats provenance as an optional argument. This inconsistency will lead to IntegrityError during database inserts if a provenance value is not provided. Additionally, the removal of object_value, object_bytes, and object_hash fields will cause AttributeError in coerce_edge_val_for_db_write and insert_node_models_batch within graph_service.py. While the write path is planned for removal, it is best practice to update or stub these dependent methods in this PR to avoid leaving the codebase in a broken state.
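One way to keep the write path aligned with the new NOT NULL primary-key column is to fail fast in the factory. The function below is a hypothetical sketch, not the actual `create_edge_model` signature; it only illustrates the guard the comment is asking for.

```python
def create_edge_model_args(subject_id, predicate, object_id, provenance):
    """Hypothetical guard: reject a missing provenance up front instead of
    deferring the failure to a Spanner IntegrityError at insert time."""
    if not provenance:
        raise ValueError(
            "provenance is required: it is now a non-nullable primary-key column"
        )
    # Return the keyword arguments an EdgeModel-shaped insert would need.
    return {
        "subject_id": subject_id,
        "predicate": predicate,
        "object_id": object_id,
        "provenance": provenance,
    }
```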


# Define relationships to both source and target nodes
source_node = relationship(
"NodeModel", foreign_keys=[subject_id], back_populates="outgoing_edges"
)

# Indexes
__table_args__ = (
-# Index for object_value lookups
-sa.Index("EdgeByObjectValue", "object_value"),
Contributor (severity: high):
Removing the EdgeByObjectValue index here will cause the GraphService.drop_tables() method (located in packages/datacommons-api/datacommons_api/services/graph_service.py, line 512) to fail. That method contains a hardcoded SQL command DROP INDEX EdgeByObjectValue which will now target a non-existent index. Please update the drop_tables method to reflect this schema change.
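A defensive variant of that cleanup can consult the live schema before issuing the raw DDL, so `drop_tables` works whether or not the index exists. This is a sketch; the function name is hypothetical, and only the SQLAlchemy inspector calls are assumed.

```python
from sqlalchemy import inspect


def drop_edge_index_if_present(engine, index_name="EdgeByObjectValue"):
    # Check the live schema first, so the raw DROP INDEX is only issued
    # when the index actually exists on the Edge table.
    names = {ix["name"] for ix in inspect(engine).get_indexes("Edge")}
    if index_name in names:
        with engine.begin() as conn:
            conn.exec_driver_sql(f"DROP INDEX {index_name}")
```

Depending on the Spanner DDL version in use, `DROP INDEX IF EXISTS` may be an even simpler option.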

)

def __repr__(self):
return f"<EdgeModel(subject_id='{self.subject_id}', predicate='{self.predicate}', object_id='{self.object_id}')>"
31 changes: 2 additions & 29 deletions packages/datacommons-db/datacommons_db/session.py
@@ -14,14 +14,14 @@

import logging

-from sqlalchemy import Engine, create_engine, inspect
+from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker

from datacommons_db.models.base import Base

logger = logging.getLogger(__name__)

-REQUIRED_TABLES = ["Edge", "Node", "Observation"]



def get_engine(project_id: str, instance_id: str, database_name: str) -> Engine:
@@ -56,31 +56,4 @@ def get_session(project_id: str, instance_id: str, database_name: str) -> Session:
return session()


-def initialize_db(project_id: str, instance_id: str, database_name: str):
Contributor:
With this function removed, can you add some instructions to the README on how users can initialize Spanner?

It might include something like:
(1) Option 1: use Terraform to deploy to GCP, and give instructions for provisioning Spanner from there
(2) Option 2: without Terraform, clone the https://github.com/datacommonsorg/import/ repo and use the import pipeline w/ DirectRunner to initialize a Spanner DB

Contributor (author):
Yes, but I will do it in a follow-up so I can reference the Terraform configs once they're checked in! Will do it tomorrow at the latest!

-"""Initialize the Spanner database.
-
-Args:
-    project_id: GCP project ID
-    instance_id: Cloud Spanner instance ID
-    database_name: Cloud Spanner database name
-"""
-engine = get_engine(project_id, instance_id, database_name)
-
-# Check if database is empty by inspecting existing tables
-inspector = inspect(engine)
-existing_tables = inspector.get_table_names()
-
-# Check if all required tables exist
-missing_tables = [
-    table for table in REQUIRED_TABLES if table not in existing_tables
-]
-if missing_tables:
-    logger.warning(
-        "Missing required tables in database %s: %s", database_name, missing_tables
-    )
-
-# Only create tables if database is completely empty
-if not existing_tables or missing_tables:
-    # Import all models so they are properly initialized with the call to Base.metadata.create_all
-    logger.info("Creating tables %s in database %s", REQUIRED_TABLES, database_name)
-    Base.metadata.create_all(engine)