Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions packages/datacommons-mcp/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,13 @@ DC_TYPE=base
# =============================================================================

# Use these variables to run the server against non-prod (autopush, staging) or local instances
# of the Data Commons API and Search endpoints.
# of the Data Commons API.
# When using a local instance, you may also need to use the
# --skip-api-key-validation command-line flag if running without a DC_API_KEY.

# Root URL for a non-prod or local Data Commons API (mixer) instance
# DC_API_ROOT=http://localhost:8081/v2

# Root URL for a non-prod or local Data Commons Search (website) instance
# DC_SEARCH_ROOT=http://localhost:8080

# Root URL for Data Commons API key validation
# Configure for non-prod environments
# DC_API_KEY_VALIDATION_ROOT=https://api.datacommons.org
169 changes: 14 additions & 155 deletions packages/datacommons-mcp/datacommons_mcp/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import re
from pathlib import Path

import requests
from datacommons_client.client import DataCommonsClient

from datacommons_mcp._constrained_vars import place_statvar_constraint_mapping
Expand Down Expand Up @@ -60,35 +59,22 @@ def __init__(
self,
dc: DataCommonsClient,
search_scope: SearchScope = SearchScope.BASE_ONLY,
base_index: str = "base_uae_mem",
custom_index: str | None = None,
sv_search_base_url: str = "https://datacommons.org",
topic_store: TopicStore | None = None,
_place_like_constraints: list[str] | None = None,
*,
use_search_indicators: bool = False,
) -> None:
"""
Initialize the DCClient with a DataCommonsClient and search configuration.

Args:
dc: DataCommonsClient instance
search_scope: SearchScope enum controlling search behavior
base_index: Index to use for base DC searches
custom_index: Index to use for custom DC searches (None for base DC)
sv_search_base_url: Base URL for SV search endpoint
topic_store: Optional TopicStore for caching

# TODO(@jm-rivera): Remove this parameter once new endpoint is live.
_place_like_constraints: Optional list of place-like constraints
"""
self.dc = dc
self.search_scope = search_scope
self.base_index = base_index
self.custom_index = custom_index
# Precompute search indices to validate configuration at instantiation time
self.search_indices = self._compute_search_indices()
self.sv_search_base_url = sv_search_base_url
self.variable_cache = LruCache(128)

if topic_store is None:
Expand All @@ -100,31 +86,9 @@ def __init__(
else:
self._place_like_statvar_store = {}

self.use_search_indicators = use_search_indicators

#
# Initialization & Configuration
#
def _compute_search_indices(self) -> list[str]:
"""Compute and validate search indices based on the configured search_scope.

Raises a ValueError immediately for invalid configurations (e.g., CUSTOM_ONLY
without a custom_index).
"""
indices: list[str] = []

if self.search_scope in [SearchScope.CUSTOM_ONLY, SearchScope.BASE_AND_CUSTOM]:
if self.custom_index is not None and self.custom_index != "":
indices.append(self.custom_index)
elif self.search_scope == SearchScope.CUSTOM_ONLY:
raise ValueError(
"Custom index not configured but CUSTOM_ONLY search scope requested"
)

if self.search_scope in [SearchScope.BASE_ONLY, SearchScope.BASE_AND_CUSTOM]:
indices.append(self.base_index)

return indices

def _compute_place_like_statvar_store(self, constraints: list[str]) -> None:
"""Compute and cache place-like to statistical variable mappings.
Expand Down Expand Up @@ -275,10 +239,6 @@ def _get_topic_places_with_data(

return places_with_data

#
# New Search Indicators Endpoint (/api/nl/search-indicators)
#

def _check_topic_exists_recursive(
self, topic_dcid: str, place_dcids: list[str]
) -> bool:
Expand Down Expand Up @@ -359,60 +319,6 @@ def _expand_topics_to_variables(

return list(expanded_variables.values())

async def _call_search_indicators_temp(
self, queries: list[str], *, max_results: int = 10
) -> dict:
"""
Temporary method that mirrors search_svs but calls the new search-indicators endpoint.

This method takes the same arguments and returns the same structure as search_svs,
but uses the new /api/nl/search-indicators endpoint instead of /api/nl/search-vector.

This method is temporary to create a minimal delta between the two endpoints to minimize the impact of the change.
After the 1.0 release, this method should be removed in favor of a more complete implementation.

Returns:
Dictionary mapping query strings to lists of results with 'SV' and 'CosineScore' keys
"""
results_map = {}
endpoint_url = f"{self.sv_search_base_url}/api/nl/search-indicators"
headers = {"Content-Type": "application/json", **SURFACE_HEADER}

# Use precomputed indices based on configured search scope
indices = self.search_indices

for query in queries:
# Prepare parameters for the new endpoint
params = {
"queries": [query],
"limit_per_index": max_results,
"index": indices,
}

try:
response = await asyncio.to_thread(
requests.get,
endpoint_url,
params=params,
headers=headers, # noqa: S113
)
response.raise_for_status()
api_response = response.json()

# Transform the response to match search_svs format
transformed_results = self._transform_search_indicators_to_svs_format(
api_response, max_results=max_results
)
results_map[query] = transformed_results

except Exception as e: # noqa: BLE001
logger.error(
"An unexpected error occurred for query '%s': %s", query, e
)
results_map[query] = []

return results_map

def _call_fetch_indicators(self, queries: list[str]) -> dict:
"""
Helper method to call the datacommons-client fetch_indicators and transform the response.
Expand Down Expand Up @@ -473,43 +379,6 @@ def _call_fetch_indicators(self, queries: list[str]) -> dict:

return results_map

def _transform_search_indicators_to_svs_format(
self, api_response: dict, *, max_results: int = 10
) -> list[dict]:
"""
Transform search-indicators response to match search_svs format.

Returns:
List of dictionaries with 'SV' and 'CosineScore' keys
"""
results = []
query_results = api_response.get("queryResults", [])

for query_result in query_results:
for index_result in query_result.get("indexResults", []):
for indicator in index_result.get("results", []):
dcid = indicator.get("dcid")
if not dcid:
continue

# Extract score (default to 0.0 if not present)
score = indicator.get("score", 0.0)

results.append(
{
"SV": dcid,
"CosineScore": score,
"description": indicator.get("description"),
"alternate_descriptions": indicator.get(
"search_descriptions"
),
}
)

# Sort by score descending, then limit results
results.sort(key=lambda x: x["CosineScore"], reverse=True)
return results[:max_results]

async def fetch_indicators(
self,
query: str,
Expand Down Expand Up @@ -616,25 +485,24 @@ async def fetch_indicators(
}

async def _search_vector(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we refactor some of these methods and/or rename them? I think the current flow is
services.search_indicators -> services._search_vector -> client.fetch_indicators -> client._search_vector -> client._call_fetch_indicators
which is hard to follow and not clear what each step is actually doing or why it's grouped the way it is.

This could be a follow up as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Will do this in a follow up. And will look into the async question in the other comment as well.

self, query: str, max_results: int = 10, *, include_topics: bool = True
self,
query: str,
# TODO(keyurs): Use max_results once it's supported by the underlying client.
# The noqa: ARG002 is to suppress the unused argument error.
max_results: int = 10, # noqa: ARG002
*,
include_topics: bool = True,
) -> dict:
"""
Search for topics and variables using the search-indicators or search-vector endpoint.
Search for topics and variables using the fetch_indicators library method.
"""
# Always include topics since we need to expand topics to variables.
if self.use_search_indicators:
logger.info("Calling legacy search-indicators endpoint for: '%s'", query)
search_results = await self._call_search_indicators_temp(
queries=[query],
max_results=max_results,
)
else:
logger.info("Calling client library fetch_indicators for: '%s'", query)
# Run the synchronous client method in a thread
search_results = await asyncio.to_thread(
self._call_fetch_indicators,
queries=[query],
)
logger.info("Calling client library fetch_indicators for: '%s'", query)
# Run the synchronous client method in a thread
search_results = await asyncio.to_thread(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be in a thread and does this overall (clients._search_vector) method need to be async?

self._call_fetch_indicators,
queries=[query],
)

results = search_results.get(query, [])

Expand Down Expand Up @@ -839,19 +707,14 @@ def _create_base_dc_client(settings: BaseDCSettings) -> DCClient:
}
if settings.api_root:
logger.info("Using API root for base DC: %s", settings.api_root)
logger.info("Using search root for base DC: %s", settings.search_root)
dc_client_args["url"] = settings.api_root
dc = DataCommonsClient(**dc_client_args)

# Create DCClient
return DCClient(
dc=dc,
search_scope=SearchScope.BASE_ONLY,
base_index=settings.base_index,
custom_index=None,
sv_search_base_url=settings.search_root,
topic_store=topic_store,
use_search_indicators=settings.use_search_indicators,
)


Expand Down Expand Up @@ -884,11 +747,7 @@ def _create_custom_dc_client(settings: CustomDCSettings) -> DCClient:
return DCClient(
dc=dc,
search_scope=search_scope,
base_index=settings.base_index,
custom_index=settings.custom_index,
sv_search_base_url=settings.custom_dc_url, # Use custom_dc_url as sv_search_base_url
topic_store=topic_store,
# TODO (@jm-rivera): Remove place-like parameter once the new search endpoint is live.
_place_like_constraints=settings.place_like_constraints,
use_search_indicators=settings.use_search_indicators,
)
26 changes: 0 additions & 26 deletions packages/datacommons-mcp/datacommons_mcp/data_models/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ class DCSettings(BaseSettings):
default="", alias="DC_API_KEY", description="API key for Data Commons"
)

use_search_indicators: bool = Field(
default=False,
alias="DC_USE_SEARCH_INDICATORS",
description="Whether to use the legacy search-indicators endpoint (True) or the client library (False) for fetching indicators.",
)

instructions_dir: str | None = Field(
default=None,
alias="DC_INSTRUCTIONS_DIR",
Expand All @@ -71,16 +65,6 @@ def __init__(self, **kwargs: dict[str, Any]) -> None:
alias="DC_TYPE",
description="Type of Data Commons (must be 'base')",
)
search_root: str = Field(
default="https://datacommons.org",
alias="DC_SEARCH_ROOT",
description="Search base URL for base DC",
)
base_index: str = Field(
default="base_uae_mem",
alias="DC_BASE_INDEX",
description="Search index for base DC",
)
topic_cache_paths: list[str] | None = Field(
default=None,
alias="DC_TOPIC_CACHE_PATHS",
Expand Down Expand Up @@ -130,16 +114,6 @@ def __init__(self, **kwargs: dict[str, Any]) -> None:
alias="DC_SEARCH_SCOPE",
description="Search scope for queries",
)
base_index: str = Field(
default="medium_ft",
alias="DC_BASE_INDEX",
description="Search index for base DC",
)
custom_index: str = Field(
default="user_all_minilm_mem",
alias="DC_CUSTOM_INDEX",
description="Search index for custom DC",
)
root_topic_dcids: list[str] | None = Field(
default=None,
alias="DC_ROOT_TOPIC_DCIDS",
Expand Down
Loading