44import sqlalchemy .sql .sqltypes as sqltypes
55import pandas as pd
66from dataregistry .registrar .registrar_util import _form_dataset_path
7- from dataregistry .exceptions import DataRegistryNYI , DataRegistryException
8- from functools import reduce
9- import numpy as np
7+ from dataregistry .exceptions import DataRegistryException
8+ # from dataregistry.exceptions import DataRegistryNYI, DataRegistryException
9+ # from functools import reduce
10+ # import numpy as np
1011
1112try :
1213 import sqlalchemy .dialects .postgresql as pgtypes
3839except ModuleNotFoundError :
3940 LITE_TYPES = {}
4041
41- from sqlalchemy .exc import DBAPIError , NoSuchColumnError
42+ from sqlalchemy .exc import DBAPIError
43+ # from sqlalchemy.exc import DBAPIError, NoSuchColumnError
4244
4345__all__ = ["Query" , "Filter" ]
4446
@@ -594,6 +596,7 @@ def find_datasets(
594596 filters = [],
595597 return_format = "property_dict" ,
596598 strip_table_names = False ,
599+ schema = None ,
597600 ):
598601 """
599602 Get specified properties for datasets satisfying all filters. Both
@@ -621,6 +624,8 @@ def find_datasets(
621624 strip_table_names : bool, optional
622625 True to remove the table name in the results columns
623626 This only works if a single table is needed for the query
627+ schema : optional
628+ May be "production", "working" or None. Defaults to None
624629
625630 Returns
626631 -------
@@ -648,18 +653,22 @@ def find_datasets(
648653 )
649654
650655 # Construct query
651- for schema in column_list .keys (): # Loop over each schema
652- # Do we want to search this schema given current query_mode?
653- if self .db_connection ._query_mode != "both" :
654- if self .db_connection .dialect != "sqlite" :
655- if self .db_connection ._query_mode != schema .split ("_" )[- 1 ]:
656- continue
656+ for sch in column_list .keys (): # Loop over each schema
657+ if not schema :
658+ # Do we want to search this schema given current query_mode?
659+ if self .db_connection ._query_mode != "both" :
660+ if self .db_connection .dialect != "sqlite" :
661+ if self .db_connection ._query_mode != sch .split ("_" )[- 1 ]:
662+ continue
663+ else :
664+ if not sch .endswith (schema ):
665+ continue
657666
658- schema_str = "" if self .db_connection .dialect == "sqlite" else f"{ schema } ."
659- columns = [f"{ p .table .name } .{ p .name } " for p in column_list [schema ]]
667+ schema_str = "" if self .db_connection .dialect == "sqlite" else f"{ sch } ."
668+ # columns = [f"{p.table.name}.{p.name}" for p in column_list[sch ]]
660669
661670 stmt = select (
662- * [p .label (f"{ p .table .name } .{ p .name } " ) for p in column_list [schema ]]
671+ * [p .label (f"{ p .table .name } .{ p .name } " ) for p in column_list [sch ]]
663672 )
664673
665674 # Create joins
@@ -711,7 +720,7 @@ def find_datasets(
711720 # Append filters if acceptable
712721 if len (filters ) > 0 :
713722 for f in filters :
714- stmt = self ._render_filter (f , stmt , schema )
723+ stmt = self ._render_filter (f , stmt , sch )
715724
716725 # Report the constructed SQL query
717726 self .db_connection .logger .debug (f"Executing query: { stmt } " )
@@ -729,7 +738,7 @@ def find_datasets(
729738 results .append (pd .DataFrame (result ))
730739
731740 # Combine results across schemas
732- if self .db_connection ._query_mode != "both" :
741+ if schema or self .db_connection ._query_mode != "both" :
733742 return_result = results [0 ]
734743 else :
735744 return_result = pd .concat (results , ignore_index = True )
@@ -775,18 +784,19 @@ def gen_filter(self, property_name, bin_op, value):
775784
776785 def get_dataset_absolute_path (self , dataset_id , schema = None ):
777786 """
778- Return full absolute path of specified dataset.
779-
780- If `query_mode="both"`, there may be two paths for the same `dataset_id`.
781- Either set `query_mode` to a single schema or pass "working" or "production"
782- to specify the schema.
787+ Return full absolute path of specified dataset in specified schema
788+ Note as used here `schema` is not an actual schema name, but a
789+ schema type (one of "production", "working" if specified at all)
783790
784791 Parameters
785792 ----------
786793 dataset_id : int
787794 Identifies dataset
788795 schema : str, optional
789- Which schema to search, defaults to `query_mode` if None.
796+ Which schema to search. May be "working", "production" or None.
797+ If None, it defaults to
798+ `query_mode` is not "both"
799+ "working" if `query_mode` is "both"
790800
791801 Returns
792802 -------
@@ -795,20 +805,28 @@ def get_dataset_absolute_path(self, dataset_id, schema=None):
795805 """
796806
797807 # Handle ambiguous `query_mode`
798- if self .db_connection ._query_mode == "both" and schema is None :
799- self .db_connection .logger .warning (
800- "Query mode is set to 'both', which may lead to ambiguous results. "
801- "Specify a schema by setting `query_mode` to 'working' or 'production', "
802- "or pass the schema explicitly to this function."
803- )
804- return None
808+ if not schema :
809+ if self .db_connection ._query_mode == "both" :
810+ schema = "working"
811+ else :
812+ schema = self .db_connection ._query_mode
813+ elif schema not in ("production" , "working" ):
814+ raise ValueError (
815+ f"Unknown schema value { schema } . Schema must be either 'working' or 'production'." )
816+
817+ # if self.db_connection._query_mode == "both" and schema is None:
818+ # self.db_connection.logger.warning(
819+ # "Query mode is set to 'both', which may lead to ambiguous results. "
820+ # "Specify a schema by setting `query_mode` to 'working' or 'production', "
821+ # "or pass the schema explicitly to this function."
822+ # )
823+ # return None
805824
806- # Validate schema
807- if schema is not None and schema not in {"working" , "production" }:
808- raise ValueError ("Schema must be either 'working' or 'production'." )
825+ # # Validate schema
826+ # if schema is not None and schema not in {"working", "production"}:
809827
810- # Default schema to query_mode if not provided
811- schema = schema or self .db_connection ._query_mode
828+ # # Default schema to query_mode if not provided
829+ # schema = schema or self.db_connection._query_mode
812830
813831 # Query the database
814832 results = self .find_datasets (
@@ -818,6 +836,7 @@ def get_dataset_absolute_path(self, dataset_id, schema=None):
818836 "dataset.relative_path" ,
819837 ],
820838 filters = [("dataset.dataset_id" , "==" , dataset_id )],
839+ schema = schema
821840 )
822841
823842 # Handle case where no results are found
@@ -827,31 +846,40 @@ def get_dataset_absolute_path(self, dataset_id, schema=None):
827846 )
828847 return None
829848
830- # Filter results if there are multiple entries (query_mode="both")
831- if len (results ["dataset.owner_type" ]) == 2 :
832- filtered_indices = [
833- i
834- for i , owner_type in enumerate (results ["dataset.owner_type" ])
835- if (schema == "production" and owner_type == "production" )
836- or (schema == "working" and owner_type != "production" )
837- ]
838-
839- if not filtered_indices :
840- self .db_connection .logger .warning (
841- f"No dataset found with dataset_id={ dataset_id } in schema '{ schema } '"
842- )
843- return None
844-
845- index = filtered_indices [0 ]
849+ # Don't need this because ambiguous case has been eliminated
850+ # # Filter results if there are multiple entries (query_mode="both")
851+ # if len(results["dataset.owner_type"]) == 2:
852+ # filtered_indices = [
853+ # i
854+ # for i, owner_type in enumerate(results["dataset.owner_type"])
855+ # if (schema == "production" and owner_type == "production")
856+ # or (schema == "working" and owner_type != "production")
857+ # ]
858+
859+ # if not filtered_indices:
860+ # self.db_connection.logger.warning(
861+ # f"No dataset found with dataset_id={dataset_id} in schema '{schema}'"
862+ # )
863+ # return None
864+
865+ # index = filtered_indices[0]
866+ # else:
867+ # index = 0
868+
869+ index = 0
870+
871+ # Find actual schema name to pass to _form_dataset_path
872+ if not self .db_connection ._namespace :
873+ schema_name = None
846874 else :
847- index = 0
875+ schema_name = self . db_connection . _namespace + '_' + schema
848876
849877 # Construct and return the absolute path
850878 return _form_dataset_path (
851879 results ["dataset.owner_type" ][index ],
852880 results ["dataset.owner" ][index ],
853881 results ["dataset.relative_path" ][index ],
854- schema = schema ,
882+ schema = schema_name ,
855883 root_dir = self ._root_dir ,
856884 )
857885
0 commit comments