Skip to content
3 changes: 3 additions & 0 deletions nmdc_runtime/site/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def apply_changesheet():

@graph
def apply_metadata_in():
"""
TODO: Document this function.
"""
# Note: We use `_` as a "placeholder" variable.
outputs = perform_mongo_updates(get_json_in())
_ = add_output_run_event(outputs)
Expand Down
7 changes: 6 additions & 1 deletion nmdc_runtime/site/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ def get_json_in(context):

@op(required_resource_keys={"runtime_api_site_client", "mongo"})
def perform_mongo_updates(context, json_in):
"""
TODO: Document this function.
"""
mongo = context.resources.mongo
client: RuntimeApiSiteClient = context.resources.runtime_api_site_client
op_id = context.op_config.get("operation_id")
Expand All @@ -389,7 +392,9 @@ def perform_mongo_updates(context, json_in):
context.log.debug(f"{docs}")

rv = validate_json(
docs, mongo.db
docs,
mongo.db,
check_inter_document_references=True,
) # use *exact* same check as /metadata/json:validate
if rv["result"] == "errors":
raise Failure(str(rv["detail"]))
Expand Down
73 changes: 67 additions & 6 deletions tests/test_ops/test_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)

from nmdc_runtime.site.graphs import apply_metadata_in
from tests.lib.faker import Faker


@pytest.fixture
Expand Down Expand Up @@ -46,17 +47,30 @@ def op_context():


def test_apply_metadata_in_functional_annotation_agg(op_context):
mongo = op_context.resources.mongo
db = op_context.resources.mongo.db # concise alias

# Insert the _referenced_ document first, so that referential integrity is maintained.
faker = Faker()
workflow_execution_id = "nmdc:wfmtan-13-hemh0a82.1"
workflow_execution: dict = faker.generate_metagenome_annotations(
1,
was_informed_by=['nmdc:dgns-00-000001'],
has_input=['nmdc:bsm-00-000001']
)[0]
workflow_execution["id"] = workflow_execution_id
assert db.workflow_execution.count_documents({"id": workflow_execution_id}) == 0
db.workflow_execution_set.insert_one(workflow_execution)

docs = {
"functional_annotation_agg": [
{
"was_generated_by": "nmdc:wfmtan-13-hemh0a82.1",
"was_generated_by": workflow_execution_id,
"gene_function_id": "KEGG.ORTHOLOGY:K00005",
"count": 10,
"type": "nmdc:FunctionalAnnotationAggMember",
},
{
"was_generated_by": "nmdc:wfmtan-13-hemh0a82.1",
"was_generated_by": workflow_execution_id,
"gene_function_id": "KEGG.ORTHOLOGY:K01426",
"count": 5,
"type": "nmdc:FunctionalAnnotationAggMember",
Expand All @@ -65,12 +79,12 @@ def test_apply_metadata_in_functional_annotation_agg(op_context):
}
# Ensure the docs are not already in the test database.
for doc_spec in docs["functional_annotation_agg"]:
mongo.db.functional_annotation_agg.delete_many(doc_spec)
db.functional_annotation_agg.delete_many(doc_spec)

extra_run_config_data = _ensure_job__metadata_in(
docs,
op_context.resources.runtime_api_user_client.username,
mongo.db,
db,
op_context.resources.runtime_api_site_client.client_id,
drs_object_exists_ok=True, # If there exists a DRS object with a matching checksum, use it.
)
Expand All @@ -80,8 +94,55 @@ def test_apply_metadata_in_functional_annotation_agg(op_context):
)

assert (
mongo.db.functional_annotation_agg.count_documents(
db.functional_annotation_agg.count_documents(
{"$or": docs["functional_annotation_agg"]}
)
== 2
)

# 🧹 Clean up the test database.
db.workflow_execution_set.delete_many({"id": workflow_execution_id})
for doc_spec in docs["functional_annotation_agg"]:
db.functional_annotation_agg.delete_many(doc_spec)


def test_apply_metadata_in_checks_referential_integrity(op_context):
db = op_context.resources.mongo.db # concise alias

# Confirm the _referenced_ document does not exist (anywhere the schema says it can).
workflow_execution_id = "nmdc:wfmtan-00-000000.1"
assert db.workflow_execution_set.count_documents({"id": workflow_execution_id}) == 0

functional_annotation_agg_member = {
"was_generated_by": workflow_execution_id,
"gene_function_id": "KEGG.ORTHOLOGY:K00005",
"count": 10,
"type": "nmdc:FunctionalAnnotationAggMember",
}

docs = {"functional_annotation_agg": [functional_annotation_agg_member]}

# Confirm the `functional_annotation_agg` document does not exist.
assert db.functional_annotation_agg.count_documents(
functional_annotation_agg_member
) == 0

# Note: This statement was copied from the `test_apply_metadata_in_functional_annotation_agg` test above.
extra_run_config_data = _ensure_job__metadata_in(
docs,
op_context.resources.runtime_api_user_client.username,
db,
op_context.resources.runtime_api_site_client.client_id,
drs_object_exists_ok=True, # If there exists a DRS object with a matching checksum, use it.
)

with pytest.raises(Exception) as exc_info:
apply_metadata_in.to_job(**preset_normal).execute_in_process(
run_config=extra_run_config_data
)
assert "references" in str(exc_info.value)

# Confirm the `functional_annotation_agg` document _still_ does not exist.
assert db.functional_annotation_agg.count_documents(
functional_annotation_agg_member
) == 0