[DCP] Remove DB and Table Auto Creation from DCP Service + Modify the schema to match Ingestion Pipeline #26
Changes from 2 commits
5433649
c55f013
e676023
4baac14
54ef5b3
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,25 +35,12 @@ class EdgeModel(Base): | |
| ) | ||
| predicate = sa.Column(String(1024), primary_key=True) | ||
| object_id = sa.Column(String(1024), primary_key=True) | ||
| object_value = sa.Column(String(OBJECT_VALUE_MAX_LENGTH), nullable=True) | ||
| object_bytes = sa.Column(sa.LargeBinary(), nullable=True) | ||
| object_hash = sa.Column(String(64), primary_key=True, nullable=True) | ||
| provenance = sa.Column(String(1024), primary_key=True, nullable=True) | ||
| # Use deferred to avoid loading the node data into memory | ||
| object_value_tokenlist = deferred( | ||
| sa.Column(Text(), nullable=True) | ||
| ) # TOKENLIST is a Spanner type, but represented as String in SQLAlchemy | ||
| provenance = sa.Column(String(1024), primary_key=True, nullable=False) | ||
|
Contributor
The |
||
|
|
||
| # Define relationships to both source and target nodes | ||
| source_node = relationship( | ||
| "NodeModel", foreign_keys=[subject_id], back_populates="outgoing_edges" | ||
| ) | ||
|
|
||
| # Indexes | ||
| __table_args__ = ( | ||
| # Index for object_value lookups | ||
| sa.Index("EdgeByObjectValue", "object_value"), | ||
|
Contributor
Removing the |
||
| ) | ||
|
|
||
| def __repr__(self): | ||
| return f"<EdgeModel(subject_id='{self.subject_id}', predicate='{self.predicate}', object_id='{self.object_id}')>" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,14 +14,14 @@ | |
|
|
||
| import logging | ||
|
|
||
| from sqlalchemy import Engine, create_engine, inspect | ||
| from sqlalchemy import Engine, create_engine | ||
| from sqlalchemy.orm import Session, sessionmaker | ||
|
|
||
| from datacommons_db.models.base import Base | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| REQUIRED_TABLES = ["Edge", "Node", "Observation"] | ||
|
|
||
|
|
||
|
|
||
| def get_engine(project_id: str, instance_id: str, database_name: str) -> Engine: | ||
|
|
@@ -56,31 +56,4 @@ def get_session(project_id: str, instance_id: str, database_name: str) -> Sessio | |
| return session() | ||
|
|
||
|
|
||
| def initialize_db(project_id: str, instance_id: str, database_name: str): | ||
|
Contributor
With this function removed, can you add some instructions to the README on how users can initialize Spanner? It might include something like:
Contributor
Author
Yes, but I will do it in a follow-up so I can reference the Terraform configs once they're checked in. Will do it tomorrow at the latest! |
||
| """Initialize the Spanner database. | ||
|
|
||
| Args: | ||
| project_id: GCP project ID | ||
| instance_id: Cloud Spanner instance ID | ||
| database_name: Cloud Spanner database name | ||
| """ | ||
| engine = get_engine(project_id, instance_id, database_name) | ||
|
|
||
| # Check if database is empty by inspecting existing tables | ||
| inspector = inspect(engine) | ||
| existing_tables = inspector.get_table_names() | ||
|
|
||
| # Check if all required tables exist | ||
| missing_tables = [ | ||
| table for table in REQUIRED_TABLES if table not in existing_tables | ||
| ] | ||
| if missing_tables: | ||
| logger.warning( | ||
| "Missing required tables in database %s: %s", database_name, missing_tables | ||
| ) | ||
|
|
||
| # Only create tables if database is completely empty | ||
| if not existing_tables or missing_tables: | ||
| # Import all models so they are properly initialized with the call to Base.metadata.create_all | ||
| logger.info("Creating tables %s in database %s", REQUIRED_TABLES, database_name) | ||
| Base.metadata.create_all(engine) | ||
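With automatic table creation removed, callers may still want a read-only check that the expected tables exist before using the service. The following is a minimal sketch of that check, not code from this PR: `find_missing_tables` and `require_tables` are hypothetical helper names, and the list of existing tables is assumed to come from something like the `inspector.get_table_names()` call shown in the removed function.

```python
from typing import Iterable, List

# Same table names as REQUIRED_TABLES in the removed code above.
REQUIRED_TABLES = ["Edge", "Node", "Observation"]


def find_missing_tables(
    existing_tables: Iterable[str],
    required: Iterable[str] = REQUIRED_TABLES,
) -> List[str]:
    """Return the required tables that are absent, in the order of `required`."""
    existing = set(existing_tables)
    return [table for table in required if table not in existing]


def require_tables(existing_tables: Iterable[str]) -> None:
    """Hypothetical guard: raise instead of creating tables when the schema is incomplete."""
    missing = find_missing_tables(existing_tables)
    if missing:
        raise RuntimeError(f"Missing required tables: {missing}")
```

Failing fast here, rather than calling `Base.metadata.create_all`, keeps schema ownership with whatever provisions the database outside the service.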
The removal of the `object_value`, `object_bytes`, and `object_hash` columns, along with the `EdgeByObjectValue` index, is a breaking change that will cause runtime failures in `GraphService`. Specifically:

- `GraphService.node_model_to_graph_node` and `GraphService.coerce_edge_val_for_db_write` still attempt to access these deleted attributes on `EdgeModel` instances.
- The `drop_tables` command in `api_cli.py` calls `GraphService.drop_tables()`, which executes a raw SQL `DROP INDEX EdgeByObjectValue`. This will fail if the index is no longer present in the schema.
- Changing `provenance` to `nullable=False` while it is part of the primary key will cause failures in `GraphService.create_edge_model` if a provenance is not provided (as it defaults to `None`).

Please update `GraphService` to align with the new schema or include those changes in this PR to maintain system integrity.
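The `provenance` concern can be illustrated with a small sketch that rejects a missing value before it reaches the database. This is only an illustration under stated assumptions, not the actual `GraphService.create_edge_model` implementation: `EdgeKey` and `make_edge_key` are hypothetical names standing in for the primary-key columns of `EdgeModel` and the code that builds them.

```python
from typing import NamedTuple, Optional


class EdgeKey(NamedTuple):
    """Hypothetical stand-in for the primary-key columns of EdgeModel."""

    subject_id: str
    predicate: str
    object_id: str
    provenance: str


def make_edge_key(
    subject_id: str,
    predicate: str,
    object_id: str,
    provenance: Optional[str] = None,
) -> EdgeKey:
    """Build the key, rejecting a missing provenance up front."""
    # With provenance declared nullable=False, a None default would otherwise
    # surface later as a database error rather than a clear validation error.
    if not provenance:
        raise ValueError("provenance is required: it is a non-nullable primary-key column")
    return EdgeKey(subject_id, predicate, object_id, provenance)
```

Whether to raise, as here, or to substitute a default provenance is a design decision for the maintainers; the sketch only shows where the check could sit.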