forked from Baffelan/sdmx-mcp-gateway
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain_server.py
More file actions
5053 lines (4388 loc) · 188 KB
/
main_server.py
File metadata and controls
5053 lines (4388 loc) · 188 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
SDMX MCP Gateway Server
A Model Context Protocol server for progressive SDMX data discovery.
Provides tools, resources, and prompts for exploring statistical data.
Supports both STDIO (development) and Streamable HTTP (production) transports.
Usage:
# Development (STDIO)
python main_server.py
# Production (Streamable HTTP)
python main_server.py --transport http
# With MCP Inspector
mcp dev ./main_server.py
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from typing import Any
from mcp.server.fastmcp import Context, FastMCP
from pydantic import BaseModel, Field
# Import lifespan and context
from app_context import AppContext, app_lifespan
# Import structured output models
from models.schemas import (
CodeChange,
CodeInfo,
CodeOverlap,
ComparisonSummary,
ComponentInfo,
ConceptChange,
ConceptRef,
DataAvailabilityResult,
DataflowDiagramResult,
DataflowDimensionComparisonResult,
DataflowInfo,
DataflowListResult,
DataflowStructureResult,
DataflowSummary,
DataUrlResult,
DimensionChange,
DimensionCodesResult,
DimensionComparison,
DimensionInfo,
EndpointInfo,
EndpointListResult,
EndpointSwitchConfirmation,
EndpointSwitchResult,
FilterInfo,
KeyBuildResult,
PaginationInfo,
ReferenceChange,
RepresentationInfo,
StructureComparisonResult,
StructureDiagramResult,
StructureEdge,
StructureInfo,
StructureNode,
TimeOverlap,
TimeRange,
ValidationResult,
)
# Import resources and prompts
from prompts.sdmx_prompts import (
sdmx_best_practices,
sdmx_discovery_guide,
sdmx_query_builder,
sdmx_troubleshooting_guide,
)
from resources.sdmx_resources import (
get_agency_info,
get_sdmx_format_guide,
get_sdmx_query_syntax_guide,
list_known_agencies,
)
from sdmx_progressive_client import SDMXProgressiveClient
# Logger - configured lazily in main() to avoid early writes
logger = logging.getLogger(__name__)

# Initialize FastMCP server with lifespan; app_lifespan yields the AppContext
# that holds per-session endpoint configuration.
mcp = FastMCP(
    "SDMX Data Gateway",
    lifespan=app_lifespan,
)
# =============================================================================
# Helper Functions for Session Management
# =============================================================================
def get_session_client(ctx: Context[Any, Any, Any] | None) -> SDMXProgressiveClient:
    """
    Get the SDMX client for the current session from lifespan context.

    This ensures each session uses its own endpoint configuration,
    preventing interference between users in multi-user deployments.

    Args:
        ctx: MCP Context object, or None when called outside a request.

    Returns:
        SDMXProgressiveClient configured for the current session's endpoint,
        or the process-wide default client when no session context is
        available (no ctx, or lifespan context missing / of unexpected type).
    """
    if ctx is not None:
        try:
            # The lifespan context is an AppContext when app_lifespan is active.
            lifespan_ctx = ctx.request_context.lifespan_context
            if isinstance(lifespan_ctx, AppContext):
                return lifespan_ctx.get_client(ctx)
        except (AttributeError, TypeError):
            # Lifespan context not available (e.g. in tests) — fall through
            # to the shared default client below.
            pass

    # Single fallback path (previously duplicated three times).
    # Imported lazily to avoid a circular import at module load time.
    from tools.sdmx_tools import get_default_client

    return get_default_client()
def get_app_context(ctx: Context[Any, Any, Any] | None) -> AppContext | None:
    """
    Retrieve the AppContext attached to the current request's lifespan.

    Args:
        ctx: MCP Context object, or None.

    Returns:
        The session's AppContext, or None when ctx is missing, the lifespan
        context is unavailable, or it is not an AppContext instance.
    """
    if ctx is None:
        return None
    try:
        candidate = ctx.request_context.lifespan_context
        return candidate if isinstance(candidate, AppContext) else None
    except (AttributeError, TypeError):
        # Request/lifespan context not wired up (e.g. outside a request).
        return None
# =============================================================================
# Discovery Tools
# =============================================================================
@mcp.tool()
async def list_dataflows(
    keywords: list[str] | None = None,
    agency_id: str | None = None,
    limit: int = 10,
    offset: int = 0,
    ctx: Context[Any, Any, Any] | None = None,
) -> DataflowListResult:
    """
    List available SDMX dataflows, optionally filtered by keywords.

    This is typically the first step in SDMX data discovery. Returns a list of
    statistical domains (dataflows) available from the specified agency.

    If you already know a country code or indicator code, consider using
    find_code_usage_across_dataflows() instead — it directly discovers all
    dataflows with data for that code, across all topics.

    Args:
        keywords: Optional list of keywords to filter dataflows
        agency_id: The agency to query (uses session endpoint if not specified)
        limit: Number of results to return (default: 10)
        offset: Number of results to skip for pagination (default: 0)
        ctx: MCP Context; used to resolve the session-specific client/endpoint

    Returns:
        Structured result with dataflows, pagination info, and navigation hints
    """
    # Imported lazily to avoid circular imports at module load time.
    from config import get_dataflow_agency
    from tools.sdmx_tools import list_dataflows as list_dataflows_impl

    # Get session-specific client for multi-user support
    client = get_session_client(ctx)
    user_provided_agency = agency_id is not None
    agency_id = agency_id or client.agency_id
    # When user didn't explicitly provide agency_id, check for dataflow listing override
    # (e.g. OECD publishes under sub-agencies, requiring "all" for listing)
    if not user_provided_agency:
        ep_key = _get_session_endpoint_key(ctx)
        df_agency = get_dataflow_agency(ep_key)
        if df_agency:
            agency_id = df_agency
    result = await list_dataflows_impl(client, keywords, agency_id, limit, offset, ctx)
    # Convert the implementation's plain-dict response to structured output.
    if "error" in result:
        # Return minimal result on error; the error text is surfaced via
        # next_step so the caller still receives a well-formed model.
        return DataflowListResult(
            discovery_level="overview",
            agency_id=agency_id,
            total_found=0,
            showing=0,
            offset=offset,
            limit=limit,
            keywords=keywords,
            dataflows=[],
            pagination=PaginationInfo(
                has_more=False, next_offset=None, total_pages=0, current_page=1
            ),
            next_step=f"Error: {result['error']}",
        )
    # Build structured result
    dataflows = [
        DataflowSummary(
            id=df["id"],
            name=df["name"],
            description=df.get("description", ""),
        )
        for df in result.get("dataflows", [])
    ]
    pagination = PaginationInfo(
        has_more=result.get("pagination", {}).get("has_more", False),
        next_offset=result.get("pagination", {}).get("next_offset"),
        total_pages=result.get("pagination", {}).get("total_pages", 0),
        current_page=result.get("pagination", {}).get("current_page", 1),
    )
    # filter_info is only present when keyword filtering was applied upstream.
    filter_info = None
    if result.get("filter_info"):
        fi = result["filter_info"]
        filter_info = FilterInfo(
            keywords_used=fi.get("keywords_used", []),
            total_before_filter=fi.get("total_before_filter", 0),
            total_after_filter=fi.get("total_after_filter", 0),
            filter_reduced_by=fi.get("filter_reduced_by", 0),
        )
    return DataflowListResult(
        discovery_level=result.get("discovery_level", "overview"),
        agency_id=result.get("agency_id", agency_id),
        total_found=result.get("total_found", len(dataflows)),
        showing=result.get("showing", len(dataflows)),
        offset=result.get("offset", offset),
        limit=result.get("limit", limit),
        keywords=result.get("keywords"),
        dataflows=dataflows,
        pagination=pagination,
        filter_info=filter_info,
        next_step=result.get("next_step", "Use get_dataflow_structure() to explore a dataflow"),
    )
@mcp.tool()
async def get_dataflow_structure(
    dataflow_id: str,
    agency_id: str | None = None,
    ctx: Context[Any, Any, Any] | None = None,
) -> DataflowStructureResult:
    """
    Get detailed structure information for a specific dataflow.

    Returns dimensions, attributes, measures, and codelist references.
    Use this after list_dataflows() to understand data organization.

    Args:
        dataflow_id: The dataflow identifier
        agency_id: The agency (uses session endpoint if not specified)
        ctx: MCP Context; used to resolve the session-specific client

    Returns:
        Structured result with dataflow metadata and structure definition
    """
    # Imported lazily to avoid circular imports at module load time.
    from tools.sdmx_tools import get_dataflow_structure as get_structure_impl

    # Get session-specific client for multi-user support
    client = get_session_client(ctx)
    agency_id = agency_id or client.agency_id
    result = await get_structure_impl(client, dataflow_id, agency_id, ctx)
    if "error" in result:
        # Return minimal structure on error; error text is echoed in the
        # dataflow description and next_steps so callers still get a model.
        return DataflowStructureResult(
            discovery_level="structure",
            dataflow=DataflowInfo(
                id=dataflow_id,
                name=f"Error loading {dataflow_id}",
                description=result["error"],
                version="latest",
            ),
            structure=StructureInfo(
                id="unknown",
                key_template="",
                key_example="",
                dimensions=[],
                attributes=[],
                measure=None,
            ),
            next_steps=[f"Error: {result['error']}"],
        )
    # Build structured result from simplified response
    dataflow = DataflowInfo(
        id=dataflow_id,
        name=result.get("dataflow_name", ""),
        description="",
        version="latest",
    )
    struct_data = result.get("structure", {})
    dimensions = [
        DimensionInfo(
            id=dim.get("id", ""),
            position=dim.get("position", 0),
            type=dim.get("type", "Dimension"),
            codelist=dim.get("codelist"),
        )
        for dim in struct_data.get("dimensions", [])
    ]
    structure = StructureInfo(
        id=struct_data.get("id", ""),
        key_template=struct_data.get("key_template", ""),
        key_example=struct_data.get("key_example", ""),
        dimensions=dimensions,
        attributes=struct_data.get("attributes", []),
        measure=struct_data.get("measure"),
    )
    return DataflowStructureResult(
        discovery_level=result.get("discovery_level", "structure"),
        dataflow=dataflow,
        structure=structure,
        next_steps=result.get("next_steps", []),
    )
@mcp.tool()
async def get_codelist(
    codelist_id: str,
    agency_id: str | None = None,
    version: str = "latest",
    search_term: str | None = None,
    ctx: Context[Any, Any, Any] | None = None,
) -> dict[str, Any]:
    """
    Get codes and values for a specific codelist.

    Codelists define the allowed values for dimensions (e.g., country codes,
    commodity codes). Use this to find the exact codes needed for your
    data query.

    Args:
        codelist_id: The codelist identifier
        agency_id: The agency (uses session endpoint if not specified)
        version: Version (default: "latest")
        search_term: Optional search term to filter codes
        ctx: MCP Context; used to resolve the session-specific client

    Returns:
        Dictionary with codelist information and codes
    """
    # Resolve the per-session client, defaulting the agency to the one the
    # session's endpoint is configured for when the caller omits it.
    session_client = get_session_client(ctx)
    resolved_agency = agency_id or session_client.agency_id
    return await session_client.browse_codelist(
        codelist_id, resolved_agency, version, search_term
    )
@mcp.tool()
async def get_dimension_codes(
    dataflow_id: str,
    dimension_id: str,
    limit: int = 50,
    offset: int = 0,
    agency_id: str | None = None,
    ctx: Context[Any, Any, Any] | None = None,
) -> DimensionCodesResult:
    """
    Get codes for a specific dimension of a dataflow.

    This allows drilling down into specific dimensions without loading all codelists at once.
    Useful for finding valid values for a particular dimension in your data query.

    Args:
        dataflow_id: The dataflow identifier
        dimension_id: The dimension identifier
        limit: Maximum codes to return (default: 50)
        offset: Number of codes to skip for pagination (default: 0)
        agency_id: The agency (uses session endpoint if not specified)
        ctx: MCP Context; used to resolve the session-specific client

    Returns:
        Structured result with codes for the dimension
    """
    # Imported lazily to avoid circular imports at module load time.
    from tools.sdmx_tools import get_dimension_codes as get_codes_impl

    # Get session-specific client for multi-user support
    client = get_session_client(ctx)
    agency_id = agency_id or client.agency_id
    result = await get_codes_impl(client, dataflow_id, dimension_id, agency_id, limit, offset, ctx)
    if "error" in result:
        # Well-formed empty result on error; error text is surfaced via `usage`.
        return DimensionCodesResult(
            discovery_level="codes",
            dataflow_id=dataflow_id,
            dimension_id=dimension_id,
            position=0,
            codelist_id=None,
            total_codes=0,
            showing=0,
            search_term=None,
            codes=[],
            usage=f"Error: {result['error']}",
            example_keys=[],
        )
    codes = [
        CodeInfo(
            id=code.get("id", ""),
            name=code.get("name", ""),
            description=code.get("description"),
        )
        for code in result.get("codes", [])
    ]
    return DimensionCodesResult(
        discovery_level=result.get("discovery_level", "codes"),
        dataflow_id=result.get("dataflow_id", dataflow_id),
        dimension_id=result.get("dimension_id", dimension_id),
        position=result.get("position", 0),
        codelist_id=result.get("codelist_id"),
        total_codes=result.get("total_codes", len(codes)),
        showing=result.get("showing", len(codes)),
        search_term=None,
        codes=codes,
        # Prefer the impl's usage text, falling back to its next_step hint.
        usage=result.get("usage", result.get("next_step", "")),
        example_keys=result.get("example_keys", []),
    )
# =============================================================================
# Code Usage Discovery Tools
# =============================================================================
class CodeUsageInfo(BaseModel):
    """Usage status for one code, as reported by get_code_usage()."""

    # The code that was checked (e.g. "FJ").
    code: str = Field(description="The code being checked")
    # True when the constraint lists this code (i.e. it has data).
    is_used: bool = Field(description="Whether this code has actual data")
    # Dimension in which the code was found; None when not found.
    dimension_id: str | None = Field(default=None, description="Dimension where code is used")
class CodeUsageResult(BaseModel):
    """Result from get_code_usage() tool."""

    discovery_level: str = Field(default="code_usage", description="Discovery workflow level")
    dataflow_id: str = Field(description="Dataflow checked")
    dimension_id: str | None = Field(default=None, description="Dimension checked (if specific)")
    constraint_id: str | None = Field(default=None, description="Actual constraint used")
    # Per-code status; empty when no specific codes were requested.
    codes_checked: list[CodeUsageInfo] = Field(description="Usage status for each code")
    summary: dict[str, int] = Field(description="Summary counts: total_checked, used, unused")
    # Populated only when the caller did not request specific codes.
    all_used_codes: dict[str, list[str]] | None = Field(
        default=None,
        description="All codes with actual data per dimension (if no specific codes requested)",
    )
    interpretation: list[str] = Field(description="Human-readable explanation")
    api_calls_made: int = Field(default=1, description="Number of API calls made")
class CrossDataflowUsageInfo(BaseModel):
    """Usage of one code inside one dataflow (cross-dataflow search hit)."""

    dataflow_id: str = Field(description="Dataflow ID")
    dataflow_version: str | None = Field(
        default=None, description="Dataflow version from ConstraintAttachment"
    )
    dataflow_name: str | None = Field(default=None, description="Dataflow name (if available)")
    dimension_id: str = Field(description="Dimension where code is used")
    is_used: bool = Field(description="Whether code has actual data in this dataflow")
class CrossDataflowCodeUsageResult(BaseModel):
    """Result from find_code_usage_across_dataflows() tool."""

    discovery_level: str = Field(default="cross_dataflow_usage", description="Discovery level")
    # None means the search spanned every dimension, not a specific one.
    dimension_id: str | None = Field(
        default=None, description="Dimension filter (None = searched all dimensions)"
    )
    code: str = Field(description="Code checked")
    total_dataflows_checked: int = Field(description="Dataflows checked for actual usage")
    dataflows_with_data: list[CrossDataflowUsageInfo] = Field(
        description="Dataflows where code has actual data"
    )
    summary: dict[str, int] = Field(
        description="Summary: dataflows_checked, with_data, without_data"
    )
    interpretation: list[str] = Field(description="Human-readable explanation")
    api_calls_made: int = Field(description="Number of API calls made")
@mcp.tool()
async def get_code_usage(
    dataflow_id: str,
    codes: list[str] | None = None,
    dimension_id: str | None = None,
    agency_id: str | None = None,
    ctx: Context[Any, Any, Any] | None = None,
) -> CodeUsageResult:
    """
    Efficiently check if specific codes are actually used in a dataflow's data.

    This uses the Actual ContentConstraint (if available) to determine which
    codes have real data, WITHOUT iterating through data queries. This is
    much faster than trial-and-error data requests.

    Use cases:
    - "Is country code 'FJ' actually used in DF_SDG?"
    - "Which indicator codes have data?" (leave codes empty)
    - "Are these 5 codes I want to use valid AND have data?"

    Args:
        dataflow_id: The dataflow to check
        codes: Optional list of specific codes to check. If empty, returns all used codes.
        dimension_id: Optional dimension to check. If empty, checks all dimensions.
        agency_id: The agency (uses session endpoint if not specified)
        ctx: MCP Context; used for progress messages and session resolution

    Returns:
        CodeUsageResult with:
        - codes_checked: List of codes with their usage status
        - all_used_codes: All codes that have data (by dimension)
        - summary: Counts of used/unused codes

    Examples:
        >>> get_code_usage("DF_SDG", codes=["FJ", "WS", "XX"], dimension_id="GEO_PICT")
        # Checks if Fiji, Samoa, and "XX" have SDG data

        >>> get_code_usage("DF_SDG", dimension_id="INDICATOR")
        # Returns all indicator codes that actually have data
    """
    client = get_session_client(ctx)
    agency = agency_id or client.agency_id
    ep_key = _get_session_endpoint_key(ctx)
    # Tracks API calls actually made, reported in every return path.
    api_calls = 0
    if ctx:
        await ctx.info("Checking code usage for " + dataflow_id + "...")
    try:
        # _fetch_constraint_info returns the parsed constraint plus how many
        # API calls it needed.
        info, fetch_calls = await _fetch_constraint_info(
            client, dataflow_id, agency, endpoint_key=ep_key
        )
        api_calls += fetch_calls
        if not info.used_codes:
            # No constraint on this endpoint/dataflow — usage is undecidable
            # from metadata alone.
            return CodeUsageResult(
                dataflow_id=dataflow_id,
                dimension_id=dimension_id,
                constraint_id=None,
                codes_checked=[],
                summary={"total_checked": 0, "used": 0, "unused": 0},
                all_used_codes=None,
                interpretation=[
                    "No ContentConstraint found for " + dataflow_id + ".",
                    "Cannot efficiently determine code usage.",
                ],
                api_calls_made=api_calls,
            )
        constraint_id = info.constraint_id or ""
        constraint_type = info.constraint_type or "Actual"
        # Convert sets to sorted lists (stable, JSON-serializable output).
        all_used_codes: dict[str, list[str]] = {
            dim_id: sorted(code_set)
            for dim_id, code_set in info.used_codes.items()
        }
        # Check specific codes if provided
        codes_checked: list[CodeUsageInfo] = []
        if codes:
            if dimension_id:
                # Restrict the check to the requested dimension only.
                used_in_dim = info.used_codes.get(dimension_id, set())
                for code in codes:
                    codes_checked.append(
                        CodeUsageInfo(
                            code=code, is_used=code in used_in_dim, dimension_id=dimension_id
                        )
                    )
            else:
                # No dimension given: report the first dimension that
                # contains each code (or None if absent everywhere).
                for code in codes:
                    found_in = None
                    for dim_id, dim_codes in info.used_codes.items():
                        if code in dim_codes:
                            found_in = dim_id
                            break
                    codes_checked.append(
                        CodeUsageInfo(
                            code=code, is_used=found_in is not None, dimension_id=found_in
                        )
                    )
        used_count = sum(1 for c in codes_checked if c.is_used)
        summary = {
            "total_checked": len(codes_checked),
            "used": used_count,
            "unused": len(codes_checked) - used_count,
        }
        interpretation = [
            "**Dataflow:** " + dataflow_id,
            "**Constraint:** " + constraint_id + " (" + constraint_type + ")",
        ]
        if constraint_type == "Allowed":
            # Allowed constraints list permitted codes, which is weaker
            # evidence than an Actual constraint.
            interpretation.append(
                "**Note:** Using Allowed constraint (permitted codes, not confirmed in data)."
            )
        if codes:
            interpretation.append(
                "**Codes checked:** " + str(len(codes))
                + " - " + str(used_count) + " used, "
                + str(len(codes) - used_count) + " unused"
            )
        else:
            interpretation.append("**Codes with data by dimension:**")
            for dim_id, dim_codes in sorted(all_used_codes.items()):
                interpretation.append(" - " + dim_id + ": " + str(len(dim_codes)) + " codes")
        return CodeUsageResult(
            dataflow_id=dataflow_id,
            dimension_id=dimension_id,
            constraint_id=constraint_id,
            codes_checked=codes_checked,
            summary=summary,
            # Full per-dimension dump only when no specific codes requested.
            all_used_codes=all_used_codes if not codes else None,
            interpretation=interpretation,
            api_calls_made=api_calls,
        )
    except Exception as e:
        # Broad catch: this is a tool boundary; the error is logged and
        # surfaced to the caller inside a well-formed result.
        logger.exception("Failed to check code usage")
        return CodeUsageResult(
            dataflow_id=dataflow_id,
            dimension_id=dimension_id,
            constraint_id=None,
            codes_checked=[],
            summary={"total_checked": 0, "used": 0, "unused": 0},
            all_used_codes=None,
            interpretation=["Error: " + str(e)],
            api_calls_made=api_calls,
        )
class TimeAvailabilityResult(BaseModel):
    """Result from check_time_availability() tool."""

    discovery_level: str = Field(default="time_availability")
    dataflow_id: str = Field(description="Dataflow checked")
    query_period: str = Field(description="Period that was queried")
    # Frequency inferred from the period format (e.g. "2010-Q1" -> "Q").
    implied_frequency: str = Field(description="Implied frequency: A, S, Q, M, W, or D")
    query_start: str = Field(description="Start of query period (ISO date)")
    query_end: str = Field(description="End of query period (ISO date)")
    # Three-valued: the constraint can rule data out, never confirm it.
    availability: str = Field(
        description="'no' (ruled out), 'plausible' (worth querying), "
        "or 'plausible_different_frequency' (data exists but at different granularity)"
    )
    available_frequencies: list[str] = Field(description="FREQ codes from the constraint")
    constraint_time_start: str | None = Field(
        default=None, description="Earliest date in constraint TimeRange"
    )
    constraint_time_end: str | None = Field(
        default=None, description="Latest date in constraint TimeRange"
    )
    overlap: str = Field(description="Time overlap: 'full', 'partial', or 'none'")
    interpretation: list[str] = Field(description="Step-by-step reasoning")
    recommendation: str = Field(description="Suggested next action")
    api_calls_made: int = Field(default=1, description="Number of API calls made")
@mcp.tool()
async def check_time_availability(
    dataflow_id: str,
    query_period: str,
    agency_id: str | None = None,
    ctx: Context[Any, Any, Any] | None = None,
) -> TimeAvailabilityResult:
    """
    Check whether a specific time period is likely to have data in a dataflow.

    Uses the Actual ContentConstraint (FREQ values + TimeRange) to quickly
    rule out periods that definitely have no data, without querying the data
    itself. The constraint only tells us what CAN'T exist — a "plausible"
    result means "worth querying", not "guaranteed to have data".

    Use after identifying a dataflow and before building a data URL.
    For confirmed availability, query the data directly via build_data_url().

    Three-valued result:
    - "no": constraint rules this out — don't bother querying
    - "plausible": period within range and frequency matches — worth trying
    - "plausible_different_frequency": data exists in this time window but
      at different granularity (e.g. querying monthly but only annual exists)

    Args:
        dataflow_id: The dataflow to check
        query_period: The period to check (e.g. "2010", "2010-Q1", "2010-01", "2010-W05")
        agency_id: The agency (uses session endpoint if not specified)
        ctx: MCP Context; used for progress messages and session resolution

    Returns:
        TimeAvailabilityResult with availability classification and reasoning
    """
    from datetime import date as date_type

    from utils import classify_time_overlap, parse_query_period

    client = get_session_client(ctx)
    agency = agency_id or client.agency_id
    ep_key = _get_session_endpoint_key(ctx)
    api_calls = 0
    if ctx:
        await ctx.info("Checking time availability for " + dataflow_id + " period " + query_period + "...")
    # Parse the query period
    try:
        q_start, q_end, implied_freq = parse_query_period(query_period)
    except ValueError as exc:
        # NOTE(review): api_calls_made is omitted here, so it defaults to 1
        # even though no API call has been made yet — confirm intended.
        return TimeAvailabilityResult(
            dataflow_id=dataflow_id,
            query_period=query_period,
            implied_frequency="?",
            query_start="",
            query_end="",
            availability="no",
            available_frequencies=[],
            overlap="none",
            interpretation=["Invalid period format: " + str(exc)],
            recommendation="Fix the period format and try again. "
            "Valid examples: 2010, 2010-Q1, 2010-01, 2010-M01, 2010-W01, 2010-01-15",
        )
    try:
        info, fetch_calls = await _fetch_constraint_info(
            client, dataflow_id, agency, endpoint_key=ep_key
        )
        api_calls += fetch_calls
        if not info.used_codes and info.time_start is None:
            # Constraint carries neither codes nor a time range — nothing to
            # reason from, so availability cannot be classified.
            return TimeAvailabilityResult(
                dataflow_id=dataflow_id,
                query_period=query_period,
                implied_frequency=implied_freq,
                query_start=q_start.isoformat(),
                query_end=q_end.isoformat(),
                availability="no",
                available_frequencies=[],
                overlap="none",
                interpretation=[
                    "No ContentConstraint found for " + dataflow_id + ".",
                    "Cannot determine time availability from metadata alone.",
                ],
                recommendation="No constraint available. Use get_data_availability() or query the data directly.",
                api_calls_made=api_calls,
            )
        # Extract FREQ values from used_codes
        available_freqs = sorted(info.used_codes.get("FREQ", set()))
        # Parse time range from _ConstraintInfo (ISO strings, date part only).
        time_start: date_type | None = None
        time_end: date_type | None = None
        if info.time_start:
            try:
                time_start = date_type.fromisoformat(info.time_start[:10])
            except (ValueError, TypeError):
                pass
        if info.time_end:
            try:
                time_end = date_type.fromisoformat(info.time_end[:10])
            except (ValueError, TypeError):
                pass
        # Determine overlap
        if time_start is not None and time_end is not None:
            overlap = classify_time_overlap(q_start, q_end, time_start, time_end)
        else:
            # No time range in constraint — can't rule out on time
            overlap = "full"
        # Determine frequency match: an unconstrained FREQ counts as a match.
        freq_match = len(available_freqs) == 0 or implied_freq in available_freqs
        # Build interpretation
        interpretation: list[str] = [
            "**Dataflow:** " + dataflow_id,
            "**Query period:** " + query_period + " -> " + q_start.isoformat() + " to " + q_end.isoformat() + " (implied freq: " + implied_freq + ")",
        ]
        if time_start and time_end:
            interpretation.append(
                "**Constraint time range:** " + time_start.isoformat() + " to " + time_end.isoformat()
            )
        else:
            interpretation.append("**Constraint time range:** not specified")
        if available_freqs:
            interpretation.append("**Available frequencies:** " + ", ".join(available_freqs))
        else:
            interpretation.append("**Available frequencies:** unconstrained (FREQ not in constraint)")
        interpretation.append("**Time overlap:** " + overlap)
        interpretation.append("**Frequency match:** " + ("yes" if freq_match else "no"))
        if info.constraint_type == "Allowed":
            interpretation.append(
                "**Note:** Using Allowed constraint (permitted values, not confirmed in data)."
            )
        # Decision logic: time rules out first, then frequency mismatch.
        if overlap == "none":
            availability = "no"
            if time_start and time_end:
                recommendation = (
                    "No data for " + query_period + ". "
                    "Available range: " + time_start.isoformat() + " to " + time_end.isoformat() + "."
                )
            else:
                recommendation = "Period outside available range."
        elif freq_match:
            availability = "plausible"
            freq_label = implied_freq + " data" if available_freqs else "Data"
            if overlap == "partial":
                recommendation = (
                    query_period + " partially overlaps the constraint range"
                    + (" (" + time_start.isoformat() + " to " + time_end.isoformat() + ")" if time_start and time_end else "")
                    + ". Data might exist for the covered portion. Query to confirm."
                )
            else:
                recommendation = (
                    freq_label + " exists in range"
                    + (" " + time_start.isoformat() + " to " + time_end.isoformat() if time_start and time_end else "")
                    + "; " + query_period + " falls within. Query to confirm."
                )
        else:
            availability = "plausible_different_frequency"
            recommendation = (
                "No " + implied_freq + " data exists. "
                "Available frequencies: " + ", ".join(available_freqs) + ". "
                "Data spans this time window but at different granularity. Try a different frequency."
            )
        return TimeAvailabilityResult(
            dataflow_id=dataflow_id,
            query_period=query_period,
            implied_frequency=implied_freq,
            query_start=q_start.isoformat(),
            query_end=q_end.isoformat(),
            availability=availability,
            available_frequencies=available_freqs,
            constraint_time_start=time_start.isoformat() if time_start else None,
            constraint_time_end=time_end.isoformat() if time_end else None,
            overlap=overlap,
            interpretation=interpretation,
            recommendation=recommendation,
            api_calls_made=api_calls,
        )
    except Exception as e:
        # Broad catch at the tool boundary: log and return a well-formed
        # "no" result carrying the error text.
        logger.exception("Failed to check time availability")
        return TimeAvailabilityResult(
            dataflow_id=dataflow_id,
            query_period=query_period,
            implied_frequency=implied_freq,
            query_start=q_start.isoformat(),
            query_end=q_end.isoformat(),
            availability="no",
            available_frequencies=[],
            overlap="none",
            interpretation=["Error: " + str(e)],
            recommendation="Error checking time availability. Try get_data_availability() instead.",
            api_calls_made=api_calls,
        )
@mcp.tool()
async def find_code_usage_across_dataflows(
code: str,
dimension_id: str | None = None,
agency_id: str | None = None,
ctx: Context[Any, Any, Any] | None = None,
) -> CrossDataflowCodeUsageResult:
"""
Discover all dataflows that have data for a given code.
Use this as your starting point when exploring what data exists for a
country, indicator, or any other code. For example, to find everything
available for Fiji: find_code_usage_across_dataflows("FJ", dimension_id="GEO_PICT").
Searches all constraints in a single API call.
**When to use this tool:**
- "What datasets have data for Vanuatu?" -> code="VU", dimension_id="GEO_PICT"
- "Which dataflows cover GDP indicators?" -> code="GDP", dimension_id="INDICATOR"
- "What data exists for this country across all topics?" -> start here, then use
compare_dataflow_dimensions() to check how the discovered dataflows relate.
**Workflow A -- search by dimension (direct):**
find_code_usage_across_dataflows("FJ", dimension_id="GEO_PICT")
Returns only matches where "FJ" appears in the GEO_PICT dimension.
**Workflow B -- search by codelist (two steps):**
If you know a code belongs to a codelist (e.g., CL_COM_GEO_PICT) but
not which dimensions use it:
1. Call this tool WITHOUT dimension_id to get all dataflows/dimensions
where the code appears.
2. For each matched dataflow, call get_dataflow_structure() to inspect
the DSD and verify which codelist each matched dimension uses.
**Provider support:** Bulk search requires endpoint support. Currently
supported by SPC (Actual), ECB (Allowed), and UNICEF (Actual). Other
endpoints will return a message explaining the limitation.
Args:
code: The specific code to check (e.g., "FJ")
dimension_id: Optional dimension to restrict search (e.g., "GEO_PICT").
If provided, only matches in this dimension are returned.
If omitted, all dimensions are searched.
agency_id: The agency (uses session endpoint if not specified)
Returns:
CrossDataflowCodeUsageResult with:
- dataflows_with_data: Dataflows where code is actually used
- summary: Counts of usage
"""
import xml.etree.ElementTree as ET
from config import get_constraint_strategy
from utils import SDMX_NAMESPACES
client = get_session_client(ctx)
agency = agency_id or client.agency_id
ep_key = _get_session_endpoint_key(ctx)
ns = SDMX_NAMESPACES
api_calls = 0
bulk_strategy = get_constraint_strategy(ep_key, "bulk")
if bulk_strategy is None:
return CrossDataflowCodeUsageResult(
dimension_id=dimension_id,
code=code,
total_dataflows_checked=0,
dataflows_with_data=[],
summary={"dataflows_checked": 0, "with_data": 0, "without_data": 0},
interpretation=[
"**Endpoint " + ep_key + " does not support bulk cross-dataflow search.**",
"",
"Alternative: use get_code_usage(dataflow_id, codes=['"
+ code + "']) to check individual dataflows.",
],
api_calls_made=0,
)
if ctx:
await ctx.info("Searching all constraints for code '" + code + "' (" + ep_key + ")...")
headers = {"Accept": "application/vnd.sdmx.structure+xml;version=2.1"}
try:
session = await client._get_session()
# Build URL based on bulk strategy
if bulk_strategy == "contentconstraint":
constraints_url = (
client.base_url + "/contentconstraint/"
+ agency + "/all/latest?detail=full"
)
elif bulk_strategy == "availableconstraint":
constraints_url = (
client.base_url + "/availableconstraint/all/all/all/all"
)
else:
return CrossDataflowCodeUsageResult(
dimension_id=dimension_id,