diff --git a/CHANGELOG.md b/CHANGELOG.md index 7262a1c..bef2003 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] (post 4.2.0 release) ### Added +- Support for new OGC endpoints ### Changed ### Deprecated ### Removed +- Functions calling old WPST endpoints ### Fixed ### Security @@ -17,7 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added ### Changed - listJobs no longer takes username as an argument, you can only list jobs for the current `MAAP_PGT` token user -- submitJob gets the username from the `MAAP_PGT` token and not username being submitted as an argument +- submit_job gets the username from the `MAAP_PGT` token rather than accepting it as an argument ### Deprecated ### Removed ### Fixed diff --git a/README.md b/README.md index 2e840ad..189036c 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ $ python >>> from maap.maap import MAAP >>> maap = MAAP() ->>> granules = maap.searchGranule(sitename='lope', instrument='uavsar') +>>> granules = maap.search_granule(sitename='lope', instrument='uavsar') >>> for res in granules: print(res.getDownloadUrl()) res.download() @@ -64,17 +64,19 @@ where: With named attribute parameters, this query: ```python -lidarGranule = maap.searchGranule(instrument='lvis', attribute='string,Site Name,lope') +lidarGranule = maap.search_granule(instrument='lvis', attribute='string,Site Name,lope') ``` Simplifies to: ```python -lidarGranule = maap.searchGranule(instrument='lvis', site_name='lope') +lidarGranule = maap.search_granule(instrument='lvis', site_name='lope') ``` ## Test ```bash -python setup.py test +poetry install +poetry run pytest --cov=maap +poetry run pytest test/specific_test.py ``` diff --git a/docs/api/dps.md b/docs/api/dps.md index 3a30b6e..33ece6c 100644 --- a/docs/api/dps.md +++ b/docs/api/dps.md @@ -32,7 +32,7 @@ from maap.maap import
MAAP maap = MAAP() # Submit a job -job = maap.submitJob( +job = maap.submit_job( identifier='my_analysis', algo_id='my_algorithm', version='main', diff --git a/docs/api/maap.md b/docs/api/maap.md index 5b06c20..2f28189 100644 --- a/docs/api/maap.md +++ b/docs/api/maap.md @@ -19,13 +19,13 @@ from maap.maap import MAAP maap = MAAP() # Search granules -granules = maap.searchGranule(short_name='GEDI02_A', limit=10) +granules = maap.search_granule(short_name='GEDI02_A', limit=10) # Search collections -collections = maap.searchCollection(provider='MAAP') +collections = maap.search_collection(provider='MAAP') # Submit a job -job = maap.submitJob( +job = maap.submit_job( identifier='analysis', algo_id='my_algo', version='main', diff --git a/docs/api/result.md b/docs/api/result.md index 9cbafa3..6a677c2 100644 --- a/docs/api/result.md +++ b/docs/api/result.md @@ -49,7 +49,7 @@ from maap.maap import MAAP maap = MAAP() # Search granules -granules = maap.searchGranule(short_name='GEDI02_A', limit=5) +granules = maap.search_granule(short_name='GEDI02_A', limit=5) for granule in granules: # Get URLs diff --git a/docs/index.md b/docs/index.md index c1bc030..402a72c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -30,7 +30,7 @@ from maap.maap import MAAP maap = MAAP() # Search for granules -granules = maap.searchGranule( +granules = maap.search_granule( short_name='GEDI02_A', bounding_box='-122.5,37.5,-121.5,38.5', limit=10 @@ -42,7 +42,7 @@ for granule in granules: print(f"Downloaded: {local_path}") # Submit a job -job = maap.submitJob( +job = maap.submit_job( identifier='my_analysis', algo_id='my_algorithm', version='main', diff --git a/examples/BrowseExample.ipynb b/examples/BrowseExample.ipynb index a18d022..6f0ca7e 100644 --- a/examples/BrowseExample.ipynb +++ b/examples/BrowseExample.ipynb @@ -386,7 +386,7 @@ } ], "source": [ - "granule = maap.searchGranule(granule_ur='uavsar_AfriSAR_v1_SLC-lopenp_14043_16015_001_160308_L090.vrt')[0]\n", + "granule = 
maap.search_granule(granule_ur='uavsar_AfriSAR_v1_SLC-lopenp_14043_16015_001_160308_L090.vrt')[0]\n", "maap.show(granule)" ] }, @@ -747,7 +747,7 @@ } ], "source": [ - "granule = maap.searchGranule(granule_ur='ILVIS2_GA2016_0220_R1611_038024')[0]\n", + "granule = maap.search_granule(granule_ur='ILVIS2_GA2016_0220_R1611_038024')[0]\n", "maap.show(granule)" ] } diff --git a/examples/Search Collection - Basics-checkpoint.ipynb b/examples/Search Collection - Basics-checkpoint.ipynb index 49f3ddf..544c247 100644 --- a/examples/Search Collection - Basics-checkpoint.ipynb +++ b/examples/Search Collection - Basics-checkpoint.ipynb @@ -53,7 +53,7 @@ "metadata": {}, "outputs": [], "source": [ - "results = maap.searchCollection(keyword='precipitation')" + "results = maap.search_collection(keyword='precipitation')" ] }, { diff --git a/examples/Search Granule-checkpoint.ipynb b/examples/Search Granule-checkpoint.ipynb index 11c7f47..d3eb1bd 100644 --- a/examples/Search Granule-checkpoint.ipynb +++ b/examples/Search Granule-checkpoint.ipynb @@ -63,7 +63,7 @@ } ], "source": [ - "results = maap.searchCollection(keyword='land',data_center='modaps')\n", + "results = maap.search_collection(keyword='land',data_center='modaps')\n", "print(len(results))\n", "\n", "for res in results:\n", @@ -159,7 +159,7 @@ } ], "source": [ - "results = maap.searchGranule(limit=10,short_name=\"MOD11A1\")\n", + "results = maap.search_granule(limit=10,short_name=\"MOD11A1\")\n", "\n", "print(len(results))\n", "for res in results:\n", diff --git a/maap/AWS.py b/maap/AWS.py index 3803c84..fe2a53f 100644 --- a/maap/AWS.py +++ b/maap/AWS.py @@ -336,7 +336,7 @@ def workspace_bucket_credentials(self): See Also -------- :meth:`requester_pays_credentials` : For accessing external data - :meth:`maap.maap.MAAP.uploadFiles` : Upload files to shared storage + :meth:`maap.maap.MAAP.upload_files` : Upload files to shared storage """ headers = self._api_header headers["Accept"] = "application/json" diff --git 
a/maap/Profile.py b/maap/Profile.py index 3bc2ce5..eb3a62d 100644 --- a/maap/Profile.py +++ b/maap/Profile.py @@ -100,7 +100,7 @@ def account_info(self, proxy_ticket=None): Notes ----- - This method is used internally by :meth:`~maap.maap.MAAP.submitJob` + This method is used internally by :meth:`~maap.maap.MAAP.submit_job` to automatically include the username with job submissions. See Also diff --git a/maap/Result.py b/maap/Result.py index 2f96012..39e4abb 100644 --- a/maap/Result.py +++ b/maap/Result.py @@ -21,7 +21,7 @@ from maap.maap import MAAP maap = MAAP() - granules = maap.searchGranule(short_name='GEDI02_A', limit=5) + granules = maap.search_granule(short_name='GEDI02_A', limit=5) for granule in granules: # Get download URLs @@ -439,7 +439,7 @@ class Collection(Result): -------- Search for collections:: - >>> collections = maap.searchCollection(short_name='GEDI02_A') + >>> collections = maap.search_collection(short_name='GEDI02_A') >>> for c in collections: ... print(c['Collection']['ShortName']) ... 
print(c['Collection']['Description']) @@ -458,7 +458,7 @@ class Collection(Result): See Also -------- :class:`Granule` : Individual data file results - :meth:`maap.maap.MAAP.searchCollection` : Search for collections + :meth:`maap.maap.MAAP.search_collection` : Search for collections """ def __init__(self, metaResult, maap_host): @@ -512,7 +512,7 @@ class Granule(Result): -------- Search and access granule metadata:: - >>> granules = maap.searchGranule(short_name='GEDI02_A', limit=5) + >>> granules = maap.search_granule(short_name='GEDI02_A', limit=5) >>> granule = granules[0] >>> print(granule['Granule']['GranuleUR']) @@ -540,7 +540,7 @@ class Granule(Result): See Also -------- :class:`Collection` : Dataset metadata results - :meth:`maap.maap.MAAP.searchGranule` : Search for granules + :meth:`maap.maap.MAAP.search_granule` : Search for granules """ def __init__( diff --git a/maap/__init__.py b/maap/__init__.py index a436ba3..adf9538 100644 --- a/maap/__init__.py +++ b/maap/__init__.py @@ -24,7 +24,7 @@ maap = MAAP() # Search for granules - granules = maap.searchGranule( + granules = maap.search_granule( short_name='GEDI02_A', limit=10 ) @@ -34,7 +34,7 @@ local_path = granule.getData(destpath='/tmp') # Submit a job - job = maap.submitJob( + job = maap.submit_job( identifier='my_job', algo_id='my_algorithm', version='main', diff --git a/maap/config_reader.py b/maap/config_reader.py index e2f82e3..5e6423e 100644 --- a/maap/config_reader.py +++ b/maap/config_reader.py @@ -261,6 +261,9 @@ def __init__(self, maap_host): self.algorithm_build = self._get_api_endpoint("algorithm_build") self.mas_algo = self._get_api_endpoint("mas_algo") self.dps_job = self._get_api_endpoint("dps_job") + self.processes_ogc = self._get_api_endpoint("processes_ogc") + self.deployment_jobs_ogc = self._get_api_endpoint("deployment_jobs_ogc") + self.jobs_ogc = self._get_api_endpoint("jobs_ogc") self.member_dps_token = self._get_api_endpoint("member_dps_token") self.requester_pays = 
self._get_api_endpoint("requester_pays") self.edc_credentials = self._get_api_endpoint("edc_credentials") diff --git a/maap/dps/dps_job.py b/maap/dps/dps_job.py index 6c395dd..8b1a3d0 100644 --- a/maap/dps/dps_job.py +++ b/maap/dps/dps_job.py @@ -21,7 +21,7 @@ maap = MAAP() # Submit a job - job = maap.submitJob( + job = maap.submit_job( identifier='my_analysis', algo_id='my_algorithm', version='main', @@ -38,8 +38,8 @@ See Also -------- -:meth:`maap.maap.MAAP.submitJob` : Submit a new job -:meth:`maap.maap.MAAP.getJob` : Retrieve an existing job +:meth:`maap.maap.MAAP.submit_job` : Submit a new job +:meth:`maap.maap.MAAP.get_job` : Retrieve an existing job """ import json @@ -131,12 +131,12 @@ class DPSJob: -------- Get job status:: - >>> job = maap.getJob('f3780917-92c0-4440-8a84-9b28c2e64fa8') + >>> job = maap.get_job('f3780917-92c0-4440-8a84-9b28c2e64fa8') >>> print(f"Status: {job.status}") Wait for completion:: - >>> job = maap.submitJob(...) + >>> job = maap.submit_job(...) >>> job.wait_for_completion() >>> print(f"Final status: {job.status}") @@ -159,7 +159,7 @@ class DPSJob: See Also -------- - :meth:`maap.maap.MAAP.submitJob` : Submit new jobs + :meth:`maap.maap.MAAP.submit_job` : Submit new jobs :meth:`maap.maap.MAAP.listJobs` : List all jobs """ @@ -243,7 +243,7 @@ def wait_for_completion(self): -------- :: - >>> job = maap.submitJob(...) + >>> job = maap.submit_job(...) >>> job.wait_for_completion() >>> if job.status == 'Succeeded': ... print("Job completed successfully!") diff --git a/maap/maap.py b/maap/maap.py index 22a805b..7e19240 100644 --- a/maap/maap.py +++ b/maap/maap.py @@ -23,7 +23,7 @@ maap = MAAP() # Search for granules - granules = maap.searchGranule( + granules = maap.search_granule( short_name='GEDI02_A', limit=10 ) @@ -111,7 +111,7 @@ class MAAP(object): Search for granules:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... short_name='GEDI02_A', ... bounding_box='-122.5,37.5,-121.5,38.5', ... 
limit=5 @@ -121,7 +121,7 @@ class MAAP(object): Submit a job:: - >>> job = maap.submitJob( + >>> job = maap.submit_job( ... identifier='my_analysis', ... algo_id='my_algorithm', ... version='main', @@ -226,12 +226,12 @@ def _upload_s3(self, filename, bucket, objectKey): Notes ----- - This is an internal method primarily used by :meth:`uploadFiles`. + This is an internal method primarily used by :meth:`upload_files`. It uses the boto3 S3 client configured at module level. """ return s3_client.upload_file(filename, bucket, objectKey) - def searchGranule(self, limit=20, **kwargs): + def search_granule(self, limit=20, **kwargs): """ Search for granules in the CMR (Common Metadata Repository). @@ -275,14 +275,14 @@ def searchGranule(self, limit=20, **kwargs): -------- Search by collection name:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... short_name='GEDI02_A', ... limit=10 ... ) Search with spatial bounds:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... collection_concept_id='C1234567890-MAAP', ... bounding_box='-122.5,37.5,-121.5,38.5', ... limit=5 @@ -290,7 +290,7 @@ def searchGranule(self, limit=20, **kwargs): Search with temporal filter:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... short_name='AFLVIS2', ... temporal='2019-01-01T00:00:00Z,2019-12-31T23:59:59Z', ... limit=100 @@ -298,7 +298,7 @@ def searchGranule(self, limit=20, **kwargs): Search with pattern matching:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... readable_granule_name='*2019*', ... short_name='GEDI02_A' ... 
) @@ -317,7 +317,7 @@ def searchGranule(self, limit=20, **kwargs): See Also -------- - :meth:`searchCollection` : Search for collections + :meth:`search_collection` : Search for collections :class:`~maap.Result.Granule` : Granule result class """ results = self._CMR.get_search_results(url=self.config.search_granule_url, limit=limit, **kwargs) @@ -328,7 +328,7 @@ def searchGranule(self, limit=20, **kwargs): self._get_api_header(), self._DPS) for result in results][:limit] - def downloadGranule(self, online_access_url, destination_path=".", overwrite=False): + def download_granule(self, online_access_url, destination_path=".", overwrite=False): """ Download a granule directly from an HTTP URL. @@ -356,7 +356,7 @@ def downloadGranule(self, online_access_url, destination_path=".", overwrite=Fal -------- Download a granule by URL:: - >>> local_file = maap.downloadGranule( + >>> local_file = maap.download_granule( ... 'https://data.maap-project.org/file/data.h5', ... destination_path='/tmp/downloads' ... ) @@ -364,7 +364,7 @@ def downloadGranule(self, online_access_url, destination_path=".", overwrite=Fal Force overwrite of existing files:: - >>> local_file = maap.downloadGranule( + >>> local_file = maap.download_granule( ... url, ... destination_path='/tmp', ... 
overwrite=True @@ -383,7 +383,7 @@ def downloadGranule(self, online_access_url, destination_path=".", overwrite=Fal See Also -------- - :meth:`searchGranule` : Search for granules + :meth:`search_granule` : Search for granules :meth:`~maap.Result.Granule.getData` : Download granule data """ filename = os.path.basename(urllib.parse.urlparse(online_access_url).path) @@ -397,7 +397,7 @@ def downloadGranule(self, online_access_url, destination_path=".", overwrite=Fal # noinspection PyProtectedMember return proxy._getHttpData(online_access_url, overwrite, final_destination) - def getCallFromEarthdataQuery(self, query, variable_name='maap', limit=1000): + def get_call_from_earthdata_query(self, query, variable_name='maap', limit=1000): """ Generate a MAAP API call string from an Earthdata search query. @@ -426,9 +426,9 @@ def getCallFromEarthdataQuery(self, query, variable_name='maap', limit=1000): Convert an Earthdata query:: >>> query = '{"instrument_h": ["GEDI"], "bounding_box": "-180,-90,180,90"}' - >>> code = maap.getCallFromEarthdataQuery(query) + >>> code = maap.get_call_from_earthdata_query(query) >>> print(code) - maap.searchGranule(instrument="GEDI", bounding_box="-180,-90,180,90", limit=1000) + maap.search_granule(instrument="GEDI", bounding_box="-180,-90,180,90", limit=1000) Notes ----- @@ -438,12 +438,12 @@ def getCallFromEarthdataQuery(self, query, variable_name='maap', limit=1000): See Also -------- - :meth:`getCallFromCmrUri` : Generate call from CMR URI - :meth:`searchGranule` : Execute a granule search + :meth:`get_call_from_cmr_uri` : Generate call from CMR URI + :meth:`search_granule` : Execute a granule search """ return self._CMR.generateGranuleCallFromEarthDataRequest(query, variable_name, limit) - def getCallFromCmrUri(self, search_url, variable_name='maap', limit=1000, search='granule'): + def get_call_from_cmr_uri(self, search_url, variable_name='maap', limit=1000, search='granule'): """ Generate a MAAP API call string from a CMR REST API URL. 
@@ -475,16 +475,16 @@ def getCallFromCmrUri(self, search_url, variable_name='maap', limit=1000, search Convert a CMR granule search URL:: >>> url = 'https://cmr.earthdata.nasa.gov/search/granules?short_name=GEDI02_A' - >>> code = maap.getCallFromCmrUri(url) + >>> code = maap.get_call_from_cmr_uri(url) >>> print(code) - maap.searchGranule(short_name="GEDI02_A", limit=1000) + maap.search_granule(short_name="GEDI02_A", limit=1000) Convert a collection search:: >>> url = 'https://cmr.earthdata.nasa.gov/search/collections?provider=MAAP' - >>> code = maap.getCallFromCmrUri(url, search='collection') + >>> code = maap.get_call_from_cmr_uri(url, search='collection') >>> print(code) - maap.searchCollection(provider="MAAP", limit=1000) + maap.search_collection(provider="MAAP", limit=1000) Notes ----- @@ -494,13 +494,13 @@ def getCallFromCmrUri(self, search_url, variable_name='maap', limit=1000, search See Also -------- - :meth:`getCallFromEarthdataQuery` : Generate call from Earthdata query - :meth:`searchGranule` : Execute a granule search - :meth:`searchCollection` : Execute a collection search + :meth:`get_call_from_earthdata_query` : Generate call from Earthdata query + :meth:`search_granule` : Execute a granule search + :meth:`search_collection` : Execute a collection search """ return self._CMR.generateCallFromEarthDataQueryString(search_url, variable_name, limit, search) - def searchCollection(self, limit=100, **kwargs): + def search_collection(self, limit=100, **kwargs): """ Search for collections in the CMR (Common Metadata Repository). @@ -541,20 +541,20 @@ def searchCollection(self, limit=100, **kwargs): -------- Search by short name:: - >>> collections = maap.searchCollection(short_name='GEDI02_A') + >>> collections = maap.search_collection(short_name='GEDI02_A') >>> for c in collections: ... print(c['Collection']['ShortName']) Search by provider:: - >>> collections = maap.searchCollection( + >>> collections = maap.search_collection( ... provider='MAAP', ... 
limit=50 ... ) Search by keyword:: - >>> collections = maap.searchCollection( + >>> collections = maap.search_collection( ... keyword='biomass forest', ... limit=20 ... ) @@ -562,18 +562,18 @@ def searchCollection(self, limit=100, **kwargs): Notes ----- Collections contain metadata about datasets but not the actual data - files. Use :meth:`searchGranule` to find individual data files within + files. Use :meth:`search_granule` to find individual data files within a collection. See Also -------- - :meth:`searchGranule` : Search for granules within collections + :meth:`search_granule` : Search for granules within collections :class:`~maap.Result.Collection` : Collection result class """ results = self._CMR.get_search_results(url=self.config.search_collection_url, limit=limit, **kwargs) return [Collection(result, self.config.maap_host) for result in results][:limit] - def getQueues(self): + def get_queues(self): """ Get available DPS processing queues (resources). @@ -591,7 +591,7 @@ def getQueues(self): -------- List available queues:: - >>> response = maap.getQueues() + >>> response = maap.get_queues() >>> queues = response.json() >>> for queue in queues: ... print(f"{queue['name']}: {queue['memory']} RAM") @@ -603,8 +603,8 @@ def getQueues(self): See Also -------- - :meth:`submitJob` : Submit a job to a queue - :meth:`registerAlgorithm` : Register an algorithm to run on queues + :meth:`submit_job` : Submit a job to a queue + :meth:`register_algorithm` : Register an algorithm to run on queues """ url = os.path.join(self.config.algorithm_register, 'resource') headers = self._get_api_header() @@ -617,813 +617,22 @@ def getQueues(self): ) return response - def registerAlgorithm(self, arg): - """ - Register an algorithm with the MAAP DPS. - - Registers a new algorithm configuration that can be executed on the - MAAP Data Processing System (DPS). - - Parameters - ---------- - arg : dict or str - Algorithm configuration as a dictionary or JSON string. 
Required - fields include: - - algorithm_name : str - Unique name for the algorithm. - code_version : str - Version identifier (e.g., Git branch or tag). - algorithm_description : str - Human-readable description. - docker_container_url : str - URL of the Docker container image. - script_command : str - Command to execute inside the container. - inputs : list of dict - Input parameter definitions with ``field`` and ``download`` keys. Format should be like - {'file': [{'name': 'input_file'}],'config': [{'name': 'config_param'}],'positional': [{'name': 'pos_arg'}]} - repo_url : str - Git repository URL for the algorithm source code. - - Returns - ------- - requests.Response - HTTP response indicating success or failure of registration. - - Examples - -------- - Register using a dictionary:: - - >>> config = { - ... 'algorithm_name': 'my_algorithm', - ... 'code_version': 'main', - ... 'algorithm_description': 'Processes satellite data', - ... 'docker_container_url': 'registry/image:tag', - ... 'script_command': 'python run.py', - ... 'inputs': { - ... 'file': [{'name': 'input_file'}], - ... 'config': [{'name': 'config_param'}], - ... 'positional': [{'name': 'pos_arg'}] - ... }, - ... 'repo_url': 'https://github.com/org/repo' - ... } - >>> response = maap.registerAlgorithm(config) - - Register using a JSON string:: - - >>> import json - >>> response = maap.registerAlgorithm(json.dumps(config)) - - Notes - ----- - After registration, algorithms need to be built before they can be - executed. The build process creates the Docker image on the DPS - infrastructure. 
- - See Also - -------- - :meth:`register_algorithm_from_yaml_file` : Register from YAML file - :meth:`listAlgorithms` : List registered algorithms - :meth:`deleteAlgorithm` : Delete an algorithm - """ - logger.debug('Registering algorithm with args ') - if type(arg) is dict: - arg = json.dumps(arg) - logger.debug(arg) - response = requests_utils.make_request(url=self.config.algorithm_register, config=self.config, - content_type='application/json', request_type=requests_utils.POST, - data=arg) - logger.debug('POST request sent to {}'.format(self.config.algorithm_register)) - return response - - def register_algorithm_from_yaml_file(self, file_path): - """ - Register an algorithm from a YAML configuration file. - - Reads algorithm configuration from a YAML file and registers it with - the MAAP DPS. - - Parameters - ---------- - file_path : str - Path to the YAML configuration file. - - Returns - ------- - requests.Response - HTTP response indicating success or failure of registration. - - Examples - -------- - Register from a YAML file:: - - >>> response = maap.register_algorithm_from_yaml_file('algorithm.yaml') - - Example YAML file structure:: - - algorithm_name: my_algorithm - code_version: main - algorithm_description: Process satellite data - docker_container_url: registry/image:tag - script_command: python run.py - inputs: - file: - - name: input_file - config: - - name: config_param - positional: - - name: pos_arg - repo_url: https://github.com/org/repo - - See Also - -------- - :meth:`registerAlgorithm` : Register from dict or JSON - :meth:`register_algorithm_from_yaml_file_backwards_compatible` : Legacy format - """ - algo_config = algorithm_utils.read_yaml_file(file_path) - return self.registerAlgorithm(algo_config) - - def register_algorithm_from_yaml_file_backwards_compatible(self, file_path): - """ - Register an algorithm from a legacy YAML configuration file. 
- - Reads algorithm configuration from an older YAML format and converts - it to the current format before registration. - - Parameters - ---------- - file_path : str - Path to the legacy YAML configuration file. - - Returns - ------- - requests.Response - HTTP response indicating success or failure of registration. - - Notes - ----- - This method supports the legacy YAML format with different field names: - - - ``algo_name`` -> ``algorithm_name`` - - ``version`` -> ``code_version`` - - ``environment`` -> ``environment_name`` - - ``description`` -> ``algorithm_description`` - - ``docker_url`` -> ``docker_container_url`` - - ``inputs`` -> ``algorithm_params`` - - ``run_command`` -> ``script_command`` - - ``repository_url`` -> ``repo_url`` - - See Also - -------- - :meth:`register_algorithm_from_yaml_file` : Current format - :meth:`registerAlgorithm` : Register from dict - """ - algo_yaml = algorithm_utils.read_yaml_file(file_path) - key_map = {"algo_name": "algorithm_name", "version": "code_version", "environment": "environment_name", - "description": "algorithm_description", "docker_url": "docker_container_url", - "inputs": "algorithm_params", "run_command": "script_command", "repository_url": "repo_url"} - output_config = {} - for key, value in algo_yaml.items(): - if key in key_map: - if key == "inputs": - inputs = [] - for argument in value: - inputs.append({"field": argument.get("name"), "download": argument.get("download")}) - output_config.update({"algorithm_params": inputs}) - else: - output_config.update({key_map.get(key): value}) - else: - output_config.update({key: value}) - logger.debug("Registering with config %s " % json.dumps(output_config)) - return self.registerAlgorithm(json.dumps(output_config)) - - def listAlgorithms(self): - """ - List all registered algorithms. - - Retrieves a list of all algorithms registered by the current user - on the MAAP DPS. - - Returns - ------- - requests.Response - HTTP response containing JSON list of algorithms. 
Each algorithm - entry includes name, version, description, and status information. - - Examples - -------- - List all algorithms:: - - >>> response = maap.listAlgorithms() - >>> algorithms = response.json() - >>> for algo in algorithms: - ... print(f"{algo['algorithm_name']}:{algo['code_version']}") - - See Also - -------- - :meth:`describeAlgorithm` : Get details for specific algorithm - :meth:`registerAlgorithm` : Register a new algorithm - :meth:`deleteAlgorithm` : Delete an algorithm - """ - url = self.config.mas_algo - headers = self._get_api_header() - logger.debug('GET request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) - response = requests.get( - url=url, - headers=headers - ) - return response - - def describeAlgorithm(self, algoid): - """ - Get detailed information about a registered algorithm. - - Retrieves the full configuration and status of a specific algorithm. - - Parameters - ---------- - algoid : str - The algorithm identifier, typically in the format - ``algorithm_name:code_version``. - - Returns - ------- - requests.Response - HTTP response containing JSON with algorithm details including - configuration, build status, and parameter definitions. - - Examples - -------- - Get algorithm details:: - - >>> response = maap.describeAlgorithm('my_algorithm:main') - >>> details = response.json() - >>> print(f"Description: {details['algorithm_description']}") - >>> print(f"Docker: {details['docker_container_url']}") - - See Also - -------- - :meth:`listAlgorithms` : List all algorithms - :meth:`publishAlgorithm` : Publish an algorithm - """ - url = os.path.join(self.config.mas_algo, algoid) - headers = self._get_api_header() - logger.debug('GET request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) - response = requests.get( - url=url, - headers=headers - ) - return response - - def publishAlgorithm(self, algoid): - """ - Publish an algorithm for public use. 
- - Makes a registered algorithm available for other MAAP users to - discover and execute. - - Parameters - ---------- - algoid : str - The algorithm identifier to publish, typically in the format - ``algorithm_name:code_version``. - - Returns - ------- - requests.Response - HTTP response indicating success or failure of publication. - - Examples - -------- - Publish an algorithm:: - - >>> response = maap.publishAlgorithm('my_algorithm:v1.0') - >>> if response.ok: - ... print("Algorithm published successfully") - - Notes - ----- - Published algorithms are visible to all MAAP users and can be - executed by anyone with DPS access. - - See Also - -------- - :meth:`registerAlgorithm` : Register an algorithm - :meth:`deleteAlgorithm` : Delete an algorithm - """ - url = self.config.mas_algo.replace('algorithm', 'publish') - headers = self._get_api_header() - body = { "algo_id": algoid} - logger.debug('POST request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) - logger.debug('body:') - logger.debug(body) - response = requests.post( - url=url, - headers=headers, - data=body - ) - return response - - def deleteAlgorithm(self, algoid): - """ - Delete a registered algorithm. - - Removes an algorithm registration from the MAAP DPS. This does not - affect any completed jobs that used the algorithm. - - Parameters - ---------- - algoid : str - The algorithm identifier to delete, typically in the format - ``algorithm_name:code_version``. - - Returns - ------- - requests.Response - HTTP response indicating success or failure of deletion. - - Examples - -------- - Delete an algorithm:: - - >>> response = maap.deleteAlgorithm('my_algorithm:main') - >>> if response.ok: - ... print("Algorithm deleted") - - Warnings - -------- - This action cannot be undone. The algorithm configuration will be - permanently removed. 
- - See Also - -------- - :meth:`registerAlgorithm` : Register an algorithm - :meth:`listAlgorithms` : List algorithms - """ - url = os.path.join(self.config.mas_algo, algoid) - headers = self._get_api_header() - logger.debug('DELETE request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) - response = requests.delete( - url=url, - headers=headers - ) - return response - - - def getJob(self, jobid): - """ - Get a DPS job with all available attributes. - - Retrieves a job object with its current status, results (if available), - and metrics (if available). - - Parameters - ---------- - jobid : str - The unique job identifier (UUID). - - Returns - ------- - DPSJob - A :class:`~maap.dps.dps_job.DPSJob` object with populated attributes - including status, outputs, and metrics. - - Examples - -------- - Get a job and inspect its status:: - - >>> job = maap.getJob('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> print(f"Status: {job.status}") - >>> print(f"Outputs: {job.outputs}") - >>> print(f"Duration: {job.job_duration_seconds} seconds") - - See Also - -------- - :meth:`getJobStatus` : Get status only - :meth:`getJobResult` : Get results only - :meth:`getJobMetrics` : Get metrics only - :meth:`submitJob` : Submit a new job - """ - job = DPSJob(self.config) - job.id = jobid - job.retrieve_attributes() - return job - - def getJobStatus(self, jobid): - """ - Get the current status of a DPS job. - - Parameters - ---------- - jobid : str - The unique job identifier (UUID). - - Returns - ------- - str - The job status. 
Possible values are: - - - ``'Accepted'``: Job is queued - - ``'Running'``: Job is executing - - ``'Succeeded'``: Job completed successfully - - ``'Failed'``: Job failed - - ``'Dismissed'``: Job was cancelled - - Examples - -------- - Check job status:: - - >>> status = maap.getJobStatus('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> print(f"Job status: {status}") - - See Also - -------- - :meth:`getJob` : Get full job object - :meth:`cancelJob` : Cancel a running job - """ - job = DPSJob(self.config) - job.id = jobid - return job.retrieve_status() - - def getJobResult(self, jobid): - """ - Get the output URLs from a completed DPS job. - - Parameters - ---------- - jobid : str - The unique job identifier (UUID). - - Returns - ------- - list of str - List of URLs pointing to job output files. Typically includes - HTTP, S3, and console URLs for the output directory. - - Examples - -------- - Get job outputs:: - - >>> outputs = maap.getJobResult('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> for url in outputs: - ... print(url) - - Notes - ----- - This method only returns results for jobs that have completed - (succeeded or failed). For running jobs, the output list will be empty. - - See Also - -------- - :meth:`getJob` : Get full job object - :meth:`getJobMetrics` : Get job performance metrics - """ - job = DPSJob(self.config) - job.id = jobid - return job.retrieve_result() - - def getJobMetrics(self, jobid): - """ - Get performance metrics from a completed DPS job. - - Retrieves resource usage and timing information for a job. - - Parameters - ---------- - jobid : str - The unique job identifier (UUID). 
- - Returns - ------- - dict - Dictionary containing job metrics including: - - - ``machine_type``: EC2 instance type used - - ``job_start_time``: ISO timestamp of job start - - ``job_end_time``: ISO timestamp of job end - - ``job_duration_seconds``: Total execution time - - ``cpu_usage``: CPU time in nanoseconds - - ``mem_usage``: Memory usage in bytes - - ``max_mem_usage``: Peak memory usage in bytes - - ``directory_size``: Output directory size in bytes - - Examples - -------- - Get job metrics:: - - >>> metrics = maap.getJobMetrics('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> print(f"Duration: {metrics['job_duration_seconds']} seconds") - >>> print(f"Max memory: {metrics['max_mem_usage']} bytes") - - See Also - -------- - :meth:`getJob` : Get full job object - :meth:`getJobResult` : Get job outputs - """ - job = DPSJob(self.config) - job.id = jobid - return job.retrieve_metrics() - - def cancelJob(self, jobid): - """ - Cancel a running or queued DPS job. - - Attempts to stop execution of a job that is currently running or - waiting in the queue. - - Parameters - ---------- - jobid : str - The unique job identifier (UUID) to cancel. - - Returns - ------- - str - Response from the DPS indicating the cancellation result. - - Examples - -------- - Cancel a job:: - - >>> result = maap.cancelJob('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> print(result) - - Notes - ----- - Jobs that are already completed (Succeeded or Failed) cannot be - cancelled. The job status will be set to ``'Dismissed'`` upon - successful cancellation. - - See Also - -------- - :meth:`submitJob` : Submit a job - :meth:`getJobStatus` : Check job status - """ - job = DPSJob(self.config) - job.id = jobid - return job.cancel_job() - - def listJobs(self, *, - algo_id=None, - end_time=None, - get_job_details=True, - offset=0, - page_size=10, - queue=None, - start_time=None, - status=None, - tag=None, - version=None): - """ - List jobs submitted by the current user. 
- - Retrieves a paginated list of DPS jobs matching the specified filter - criteria. - - Parameters - ---------- - algo_id : str, optional - Filter by algorithm name. Must be provided together with ``version``. - end_time : str, optional - Filter for jobs completed before this time. Format: ISO 8601 - (e.g., ``'2024-01-01'`` or ``'2024-01-01T00:00:00.000000Z'``). - get_job_details : bool, optional - If ``True`` (default), return detailed job information. If ``False``, - return only job IDs and tags for faster response. - offset : int, optional - Number of jobs to skip for pagination. Default is 0. - page_size : int, optional - Number of jobs to return per page. Default is 10. - queue : str, optional - Filter by processing queue name. - start_time : str, optional - Filter for jobs started after this time. Format: ISO 8601. - status : str, optional - Filter by job status. Valid values: - - - ``'Accepted'``: Queued jobs - - ``'Running'``: Currently executing - - ``'Succeeded'``: Completed successfully - - ``'Failed'``: Completed with errors - - ``'Dismissed'``: Cancelled jobs - - tag : str, optional - Filter by user-defined job tag/identifier. - version : str, optional - Filter by algorithm version. Must be provided together with ``algo_id``. - - Returns - ------- - requests.Response - HTTP response containing JSON list of jobs matching the criteria. - - Raises - ------ - ValueError - If only one of ``algo_id`` or ``version`` is provided. Both must - be provided together or neither should be provided. - - Examples - -------- - List recent jobs:: - - >>> response = maap.listJobs(page_size=20) - >>> jobs = response.json() - >>> for job in jobs: - ... print(f"{job['job_id']}: {job['status']}") - - Filter by algorithm and version:: - - >>> response = maap.listJobs( - ... algo_id='my_algorithm', - ... version='main', - ... status='Succeeded' - ... 
) - - Paginate through results:: - - >>> response = maap.listJobs(offset=0, page_size=10) - >>> # Get next page - >>> response = maap.listJobs(offset=10, page_size=10) - - Filter by time range:: - - >>> response = maap.listJobs( - ... start_time='2024-01-01', - ... end_time='2024-01-31' - ... ) - - See Also - -------- - :meth:`getJob` : Get details of a specific job - :meth:`submitJob` : Submit a new job - """ - url = "/".join( - segment.strip("/") - for segment in (self.config.dps_job, endpoints.DPS_JOB_LIST) - ) - - params = { - k: v - for k, v in ( - ("algo_id", algo_id), - ("end_time", end_time), - ("get_job_details", get_job_details), - ("offset", offset), - ("page_size", page_size), - ("queue", queue), - ("start_time", start_time), - ("status", status), - ("tag", tag), - ("version", version), - ) - if v is not None - } - - if (not algo_id) != (not version): - # Either algo_id or version was supplied as a non-empty string, but not both. - # Either both must be non-empty strings or both must be None. - raise ValueError("Either supply non-empty strings for both algo_id and version, or supply neither.") - - # DPS requests use 'job_type', which is a concatenation of 'algo_id' and 'version' - if algo_id and version: - params['job_type'] = f"{algo_id}:{version}" - - algo_id = params.pop('algo_id', None) - version = params.pop('version', None) - - if status is not None: - params['status'] = job.validate_job_status(status) - - headers = self._get_api_header() - logger.debug('GET request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) - response = requests.get( - url=url, - headers=headers, - params=params, - ) - return response - - def submitJob(self, identifier, algo_id, version, queue, retrieve_attributes=False, **kwargs): - """ - Submit a job to the MAAP Data Processing System (DPS). - - Submits an algorithm for execution on the DPS infrastructure with the - specified parameters and compute resources. 
- - Parameters - ---------- - identifier : str - A user-defined tag or identifier for the job. Used for tracking - and organizing jobs. - algo_id : str - The algorithm name to execute. - version : str - The algorithm version (e.g., Git branch or tag). - queue : str - The compute queue/resource to use (e.g., ``'maap-dps-worker-8gb'``). - Use :meth:`getQueues` to list available queues. - retrieve_attributes : bool, optional - If ``True``, immediately retrieve job status after submission. - Default is ``False``. - **kwargs : dict - Algorithm input parameters. Parameter names must match those - defined in the algorithm registration. - - Returns - ------- - DPSJob - A :class:`~maap.dps.dps_job.DPSJob` object representing the - submitted job. Use the job's methods to monitor status and - retrieve results. - - Examples - -------- - Submit a basic job:: - - >>> job = maap.submitJob( - ... identifier='my_analysis_run', - ... algo_id='my_algorithm', - ... version='main', - ... queue='maap-dps-worker-8gb', - ... input_file='s3://bucket/input.tif' - ... ) - >>> print(f"Job ID: {job.id}") - - Submit with multiple parameters:: - - >>> job = maap.submitJob( - ... identifier='batch_processing', - ... algo_id='processor', - ... version='v2.0', - ... queue='maap-dps-worker-32gb', - ... input_granule='s3://bucket/data.h5', - ... output_format='geotiff', - ... resolution=30 - ... ) - - Submit and immediately get status:: - - >>> job = maap.submitJob( - ... identifier='urgent_job', - ... algo_id='my_algorithm', - ... version='main', - ... queue='maap-dps-worker-8gb', - ... retrieve_attributes=True - ... ) - >>> print(f"Status: {job.status}") - - Monitor job completion:: - - >>> job = maap.submitJob(...) - >>> job.wait_for_completion() - >>> print(f"Final status: {job.status}") - >>> print(f"Outputs: {job.outputs}") - - Notes - ----- - - The job executes asynchronously; this method returns immediately - after submission. 
- - Use :meth:`~maap.dps.dps_job.DPSJob.wait_for_completion` to block - until the job finishes. - - Input parameters with ``download=True`` in the algorithm config - will be downloaded to the job's working directory. - - See Also - -------- - :meth:`getJob` : Retrieve job information - :meth:`listJobs` : List submitted jobs - :meth:`cancelJob` : Cancel a running job - :meth:`getQueues` : List available queues - :class:`~maap.dps.dps_job.DPSJob` : Job management class - """ - # Note that this is temporary and will be removed when we remove the API not requiring username to submit a job - # Also this now overrides passing someone else's username into submitJob since we don't want to allow that - if self.profile is not None and self.profile.account_info() is not None and 'username' in self.profile.account_info().keys(): - kwargs['username'] = self.profile.account_info()['username'] - response = self._DPS.submit_job(request_url=self.config.dps_job, - identifier=identifier, algo_id=algo_id, version=version, queue=queue, **kwargs) - job = DPSJob(self.config) - job.set_submitted_job_result(response) - try: - if retrieve_attributes: - job.retrieve_attributes() - except: - logger.debug(f"Unable to retrieve attributes for job: {job}") - return job - - def uploadFiles(self, filenames): + # def register_algorithm_from_cwl_file(self, file_path): + # """ + # Registers an algorithm from a CWL file + # """ + # # Read cwl file returns a dict in the format to register an algorithm without a CWL + # algo_config = algorithm_utils.read_cwl_file(file_path) + # headers = self._get_api_header(content_type='application/json') + # logger.debug('POST request sent to {}'.format(self.config.processes_ogc)) + # response = requests.post( + # url=self.config.processes_ogc, + # headers=headers, + # json=algo_config + # ) + # return response + + def upload_files(self, filenames): """ Upload files to MAAP shared storage. 
@@ -1445,13 +654,13 @@ def uploadFiles(self, filenames): -------- Upload files to share:: - >>> result = maap.uploadFiles(['data.csv', 'config.json']) + >>> result = maap.upload_files(['data.csv', 'config.json']) >>> print(result) Upload file subdirectory: a1b2c3d4-e5f6-... (keep a record of...) Upload a single file:: - >>> result = maap.uploadFiles(['output.tif']) + >>> result = maap.upload_files(['output.tif']) Notes ----- @@ -1461,7 +670,7 @@ def uploadFiles(self, filenames): See Also -------- - :meth:`submitJob` : Use uploaded files as job inputs + :meth:`submit_job` : Use uploaded files as job inputs """ bucket = self.config.s3_user_upload_bucket prefix = self.config.s3_user_upload_dir @@ -1529,7 +738,7 @@ def show(self, granule, display_config={}): ---------- granule : dict A granule result dictionary, typically obtained from - :meth:`searchGranule`. Must contain ``Granule.GranuleUR``. + :meth:`search_granule`. Must contain ``Granule.GranuleUR``. display_config : dict, optional Configuration options for rendering. 
Common options include: @@ -1542,7 +751,7 @@ def show(self, granule, display_config={}): -------- Display a granule on a map:: - >>> granules = maap.searchGranule(short_name='AFLVIS2', limit=1) + >>> granules = maap.search_granule(short_name='AFLVIS2', limit=1) >>> maap.show(granules[0]) Display with custom rendering:: @@ -1560,7 +769,7 @@ def show(self, granule, display_config={}): See Also -------- - :meth:`searchGranule` : Search for granules to visualize + :meth:`search_granule` : Search for granules to visualize """ from mapboxgl.viz import RasterTilesViz @@ -1584,6 +793,289 @@ def show(self, granule, display_config={}): ) viz.show() + # OGC-compliant endpoint functions + def list_algorithms(self): + """ + Search all OGC processes + :return: Response object with all deployed processes + """ + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(self.config.processes_ogc)) + + response = requests.get( + url=self.config.processes_ogc, + headers=headers + ) + return response + + def register_algorithm(self, execution_unit_href): + """ + Deploy a new OGC process + :param execution_unit_href: URL to the CWL file + :return: Response object with deployment information + """ + headers = self._get_api_header(content_type='application/json') + data = { + "executionUnit": { + "href": execution_unit_href + } + } + logger.debug('POST request sent to {}'.format(self.config.processes_ogc)) + response = requests.post( + url=self.config.processes_ogc, + headers=headers, + json=data + ) + return response + + def get_deployment_status(self, deployment_id): + """ + Query the current status of an algorithm being deployed + :param deployment_id: The deployment job ID + :return: Response object with deployment status + """ + url = os.path.join(self.config.deployment_jobs_ogc, str(deployment_id)) + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) + response = requests.get( + url=url, + headers=headers + ) + return 
response + + def describe_algorithm(self, process_id): + """ + Get detailed information about a specific OGC process + :param process_id: The process ID to describe + :return: Response object with process details + """ + url = os.path.join(self.config.processes_ogc, str(process_id)) + headers = self._get_api_header() + response = requests.get( + url=url, + headers=headers + ) + return response + + def update_algorithm(self, process_id, execution_unit_href): + """ + Replace an existing OGC process (must be the original deployer) + :param process_id: The process ID to update + :param execution_unit_href: URL to the new CWL file + :return: Response object with update information + """ + url = os.path.join(self.config.processes_ogc, str(process_id)) + headers = self._get_api_header(content_type='application/json') + data = { + "executionUnit": { + "href": execution_unit_href + } + } + logger.debug('PUT request sent to {}'.format(url)) + response = requests.put( + url=url, + headers=headers, + json=data + ) + return response + + def delete_algorithm(self, process_id): + """ + Delete an existing OGC process (must be the original deployer) + :param process_id: The process ID to delete + :return: Response object with deletion confirmation + """ + url = os.path.join(self.config.processes_ogc, str(process_id)) + headers = self._get_api_header() + logger.debug('DELETE request sent to {}'.format(url)) + response = requests.delete( + url=url, + headers=headers + ) + return response + + def get_algorithm_package(self, process_id): + """ + Access the formal description that can be used to deploy an OGC process + :param process_id: The process ID + :return: Response object with process package description + """ + url = os.path.join(self.config.processes_ogc, str(process_id), 'package') + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) + response = requests.get( + url=url, + headers=headers + ) + return response + + def submit_job(self, 
process_id, inputs, queue, dedup=None, tag=None): + """ + Execute an OGC process job + :param process_id: The process ID to execute + :param inputs: Dictionary of input parameters for the process + :param queue: Queue to run the job on + :param dedup: Optional deduplication flag + :param tag: Optional user-defined tag for the job + :return: Response object with job execution information + """ + url = os.path.join(self.config.processes_ogc, str(process_id), 'execution') + headers = self._get_api_header(content_type='application/json') + data = { + "inputs": inputs, + "queue": queue + } + if dedup is not None: + data["dedup"] = dedup + if tag is not None: + data["tag"] = tag + + logger.debug('POST request sent to {}'.format(url)) + + response = requests.post( + url=url, + headers=headers, + json=data + ) + return response + + def get_job_status(self, job_id): + """ + Get the status of an OGC job + :param job_id: The job ID to check status for + :return: Response object with job status + """ + url = os.path.join(self.config.jobs_ogc, str(job_id)) + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) + + response = requests.get( + url=url, + headers=headers + ) + return response + + def cancel_job(self, job_id, wait_for_completion=False): + """ + Cancel a running OGC job or delete a queued job + :param job_id: The job ID to cancel + :param wait_for_completion: Whether to wait for the cancellation to complete + :return: Response object with cancellation status + """ + url = os.path.join(self.config.jobs_ogc, str(job_id)) + params = {} + if wait_for_completion: + params['wait_for_completion'] = str(wait_for_completion).lower() + + headers = self._get_api_header() + logger.debug('DELETE request sent to {}'.format(url)) + + response = requests.delete( + url=url, + headers=headers, + params=params + ) + return response + + def get_job_result(self, job_id): + """ + Get the results of a completed OGC job + :param job_id: The job ID to get 
results for + :return: Response object with job results + """ + url = os.path.join(self.config.jobs_ogc, str(job_id), 'results') + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) + response = requests.get( + url=url, + headers=headers + ) + return response + + def list_jobs(self, *, + process_id=None, + limit=None, + get_job_details=True, + offset=0, + page_size=10, + queue=None, + status=None, + tag=None, + min_duration=None, + max_duration=None, + type=None, + datetime=None, + priority=None): + """ + Returns a list of jobs for a given user that matches query params provided. + + Args: + process_id (str, optional): Algorithm ID to only show jobs submitted for this algorithm + limit (int, optional): Limit of jobs to send back + get_job_details (bool, optional): Flag that determines whether to return a detailed job list or a compact list containing just the job ids and their associated job tags. Default is True. + offset (int, optional): Offset for pagination. Default is 0. + page_size (int, optional): Page size for pagination. Default is 10. + queue (str, optional): Job processing resource. + status (str, optional): Job status, e.g. job-completed, job-failed, job-started, job-queued. + tag (str, optional): User job tag/identifier. + min_duration (int, optional): Minimum duration in seconds + max_duration (int, optional): Maximum duration in seconds + type (str, optional): Type, available values: process + datetime (str, optional): Either a date-time or an interval, half-bounded or bounded. Date and time expressions adhere to RFC 3339. Half-bounded intervals are expressed using double-dots. + priority (int, optional): Job priority, 0-9 + + Returns: + list: List of jobs for a given user that matches query params provided. + + Raises: + requests.exceptions.RequestException: If the HTTP request to the jobs endpoint fails.
+ """ + params = { + k: v + for k, v in ( + ("processID", process_id), + ("limit", limit), + ("getJobDetails", get_job_details), + ("offset", offset), + ("pageSize", page_size), + ("queue", queue), + ("status", status), + ("tag", tag), + ("minDuration", min_duration), + ("maxDuration", max_duration), + ("type", type), + ("datetime", datetime), + ("priority", priority), + ) + if v is not None + } + + url = os.path.join(self.config.jobs_ogc) + headers = self._get_api_header() + + logger.debug('GET request sent to {}'.format(url)) + response = requests.get( + url=url, + headers=headers, + params=params + ) + return response + + def get_job_metrics(self, job_id): + """ + Get metrics for an OGC job + :param job_id: The job ID to get metrics for + :return: Response object with job metrics + """ + url = os.path.join(self.config.jobs_ogc, str(job_id), 'metrics') + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) + response = requests.get( + url=url, + headers=headers + ) + return response + if __name__ == "__main__": print("initialized") \ No newline at end of file diff --git a/maap/utils/CMR.py b/maap/utils/CMR.py index 18ebf94..665228f 100644 --- a/maap/utils/CMR.py +++ b/maap/utils/CMR.py @@ -129,7 +129,7 @@ def generateGranuleCallFromEarthDataRequest(self, query, variable_name='maap', l params.append("limit=" + str(limit)) - result = variable_name + ".searchGranule(" + ", ".join(params) + ")" + result = variable_name + ".search_granule(" + ", ".join(params) + ")" return result @@ -157,7 +157,7 @@ def generateCallFromEarthDataQueryString(self, search_url, variable_name='maap', # e.g., # granules?collection_concept_id[]=C1&collection_concept_id[]=C2 # will be converted to - # maap.searchGranule(collection_concept_id="C1|C2") + # maap.search_granule(collection_concept_id="C1|C2") if any(x for x in params if x.startswith(p_key_assignment)): params[i - 1] = params[i - 1].replace(p_key_assignment, p_key_assignment + p_val + "|") else: @@ 
-167,8 +167,8 @@ def generateCallFromEarthDataQueryString(self, search_url, variable_name='maap', params.append("limit=" + str(limit)) if search == 'granule': - result = variable_name + ".searchGranule(" + ", ".join(params) + ")" + result = variable_name + ".search_granule(" + ", ".join(params) + ")" else: - result = variable_name + ".searchCollection(" + ", ".join(params) + ")" + result = variable_name + ".search_collection(" + ", ".join(params) + ")" return result diff --git a/maap/utils/algorithm_utils.py b/maap/utils/algorithm_utils.py index db12dd6..1717c84 100644 --- a/maap/utils/algorithm_utils.py +++ b/maap/utils/algorithm_utils.py @@ -18,6 +18,29 @@ def read_yaml_file(algo_yaml): algo_config = yaml_load(fr, Loader=Loader) return validate_algorithm_config(algo_config) +def read_cwl_file(algo_cwl): + """ + Parses the CWL file and returns the payload that the POST to register a + process for OGC is expecting + https://github.com/MAAP-Project/joint-open-api-specs/blob/nasa-adaptation/ogc-api-processes/openapi-template/schemas/processes-core/postProcess.yaml + """ + try: + with open(algo_cwl, 'r') as f: + cwl_data = yaml.safe_load(f) + print(f"Successfully read and parsed '{algo_cwl}'") + return parse_cwl_data(cwl_data) + except FileNotFoundError: + print(f"Error: The file '{algo_cwl}' was not found.") + return None + except yaml.YAMLError as e: + print(f"Error parsing the YAML in '{algo_cwl}': {e}") + return None + +def parse_cwl_data(cwl_data): + algo_config = dict() + # TODO implement this and return cwl_data as a dictionary with important variables like + # the API is expecting + return algo_config def validate_algorithm_config(algo_config): return algo_config diff --git a/test/functional_test.py b/test/functional_test.py index de55154..d2ef8b7 100644 --- a/test/functional_test.py +++ b/test/functional_test.py @@ -79,7 +79,7 @@ def submit_job(maap: MAAP, wait_for_completion=False, queue="maap-dps-worker-8gb "output_filename": "output.tif",
"outsize": "20" } - job = maap.submitJob(identifier="maap_functional_test", + job = maap.submit_job(identifier="maap_functional_test", algo_id=algo_name, version=algo_version, queue=queue, diff --git a/test/test_CMR.py b/test/test_CMR.py index 3a9ad33..d8878e7 100644 --- a/test/test_CMR.py +++ b/test/test_CMR.py @@ -16,79 +16,79 @@ def setUpClass(cls): cls._test_ur = 'uavsar_AfriSAR_v1-cov_lopenp_14043_16008_140_001_160225-geo_cov_4-4.bin' cls._test_site_name = 'lope' - def test_searchGranuleByInstrumentAndTrackNumber(self): - results = self.maap.searchGranule( + def test_search_granule_by_instrument_and_track_number(self): + results = self.maap.search_granule( instrument=self._test_instrument_name_uavsar, track_number=self._test_track_number, polarization='HH') self.assertTrue('concept-id' in results[0].keys()) - def test_searchGranuleByGranuleUR(self): - results = self.maap.searchGranule( + def test_search_granule_by_granule_ur(self): + results = self.maap.search_granule( granule_ur=self._test_ur) self.assertTrue('concept-id' in results[0].keys()) - def test_granuleDownload(self): - results = self.maap.searchGranule( + def test_granule_download(self): + results = self.maap.search_granule( granule_ur=self._test_ur) - download = results[0].getLocalPath('/Users/satorius/source') + download = results[0].get_local_path('/Users/satorius/source') self.assertTrue(len(download) > 0) - def test_granuleDownloadExternalDAAC(self): - # results = self.maap.searchGranule( + def test_granule_download_external_daac(self): + # results = self.maap.search_granule( # collection_concept_id='C1200231010-NASA_MAAP') - results = self.maap.searchGranule( + results = self.maap.search_granule( cmr_host='cmr.earthdata.nasa.gov', collection_concept_id='C2067521974-ORNL_CLOUD', granule_ur='GEDI_L3_Land_Surface_Metrics.GEDI03_elev_lowestmode_stddev_2019108_2020106_001_08.tif') - download = results[0].getData() + download = results[0].get_data() self.assertTrue(len(download) > 0) - def 
test_direct_granuleDownload(self): - results = self.maap.downloadGranule( + def test_direct_granule_download(self): + results = self.maap.download_granule( online_access_url='https://datapool.asf.alaska.edu/GRD_HD/SA/S1A_S3_GRDH_1SDH_20140615T034444_20140615T034512_001055_00107C_8977.zip', destination_path='./tmp' ) self.assertTrue(len(results) > 0) - def test_searchGranuleByInstrumentAndSiteName(self): - results = self.maap.searchGranule( + def test_search_granule_by_instrument_and_site_name(self): + results = self.maap.search_granule( instrument=self._test_instrument_name_lvis, site_name=self._test_site_name) self.assertTrue('concept-id' in results[0].keys()) - def test_searchGranuleWithPipeDelimiters(self): - results = self.maap.searchGranule( + def test_search_granule_with_pipe_delimiters(self): + results = self.maap.search_granule( instrument="LVIS|UAVSAR", platform="AIRCRAFT") self.assertTrue('concept-id' in results[0].keys()) - def test_searchFromEarthdata(self): - results = self.maap.searchCollection( + def test_search_from_earthdata(self): + results = self.maap.search_collection( instrument="LVIS|UAVSAR", platform="AIRCRAFT|B-200|COMPUTERS", data_center="MAAP Data Management Team|ORNL_DAAC") self.assertTrue('concept-id' in results[0].keys()) - def test_searchCollection(self): - results = self.maap.searchCollection( + def test_search_collection(self): + results = self.maap.search_collection( instrument=self._test_instrument_name_uavsar) self.assertTrue('concept-id' in results[0].keys()) - def test_searchGranuleWithWildcards(self): - results = self.maap.searchGranule(collection_concept_id="C1200110748-NASA_MAAP", + def test_search_granule_with_wildcards(self): + results = self.maap.search_granule(collection_concept_id="C1200110748-NASA_MAAP", readable_granule_name='*185*') self.assertTrue('concept-id' in results[0].keys()) - def test_getUrl(self): - results = self.maap.searchGranule(page_num="1", concept_id="C1214470488-ASF", sort_key="-start_date", limit=1) 
+ def test_get_url(self): + results = self.maap.search_granule(page_num="1", concept_id="C1214470488-ASF", sort_key="-start_date", limit=1) - url = results[0].getHttpUrl() + url = results[0].get_http_url() self.assertTrue(url.startswith("http")) - url = results[0].getS3Url() + url = results[0].get_s3_url() self.assertTrue(url.startswith("s3")) diff --git a/test/test_DPS.py b/test/test_DPS.py deleted file mode 100644 index cb6dc47..0000000 --- a/test/test_DPS.py +++ /dev/null @@ -1,61 +0,0 @@ -from unittest import TestCase - -import yaml - -from maap.maap import MAAP -import logging -from yaml import load as yaml_load, dump as yaml_dump -try: - from yaml import CLoader as Loader, CDumper as Dumper -except ImportError: - from yaml import Loader, Dumper - -class TestDPS(TestCase): - logging.basicConfig(level=logging.DEBUG) - logger = logging.getLogger(__name__) - - @classmethod - def setUpClass(cls): - cls.logger.debug("Initializing MAAP") - cls.maap = MAAP() - - def test_registerAlgorithm(self): - self.maap.register_algorithm_from_yaml_file("dps_test_algo_config.yaml") - - def test_deleteAlgorithm(self): - pass - - def test_deleteJob(self): - pass - - def test_describeAlgorithm(self): - pass - - def test_dismissJob(self): - pass - - def test_getJobMetrics(self): - pass - - def test_getJobResult(self): - pass - - def test_getJobStatus(self): - pass - - def test_getQueues(self): - pass - - def test_listAlgorithms(self): - pass - - def test_listJobs(self): - pass - - def test_publishAlgorithm(self): - pass - - - def test_submitJob(self): - pass - diff --git a/test/test_MAAP.py b/test/test_MAAP.py index 5dcc082..aef8238 100644 --- a/test/test_MAAP.py +++ b/test/test_MAAP.py @@ -54,18 +54,18 @@ def test_genFromEarthdata(self): """ var_name = 'maapVar' - testResult = self.maap.getCallFromEarthdataQuery(query=input, variable_name=var_name) + testResult = self.maap.get_call_from_earthdata_query(query=input, variable_name=var_name) self.assertTrue( - testResult == var_name + 
'.searchGranule('\ + testResult == var_name + '.search_granule('\ 'processing_level_id="1A|1B|2|4", '\ 'instrument="LVIS|UAVSAR", '\ 'platform="AIRCRAFT|B-200|COMPUTERS", '\ 'data_center="MAAP Data Management Team", '\ 'bounding_box="-35.4375,-55.6875,-80.4375,37.6875")') - def test_uploadFiles(self): + def test_upload_files(self): self.maap._upload_s3 = MagicMock(return_value=None) - result = self.maap.uploadFiles(['test/s3-upload-testfile1.txt', 'test/s3-upload-testfile2.txt']) + result = self.maap.upload_files(['test/s3-upload-testfile1.txt', 'test/s3-upload-testfile2.txt']) upload_msg_regex = re.compile('Upload file subdirectory: .+ \\(keep a record of this if you want to share these files with other users\\)') self.assertTrue(re.match(upload_msg_regex, result)) diff --git a/test/test_ogc.py b/test/test_ogc.py new file mode 100644 index 0000000..84d3252 --- /dev/null +++ b/test/test_ogc.py @@ -0,0 +1,472 @@ +""" +Test module for algorithm and job functions in maap.py +""" + +import pytest +from maap.maap import MAAP + + +def test_list_algorithms(): + """Test list_algorithms function calls OGC algorithms endpoint and returns 200 with JSON""" + maap = MAAP(maap_host='api.dit.maap-project.org') + + response = maap.list_algorithms() + + # Check that we get a 200 status code + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + + # Check that response is valid JSON + try: + json_data = response.json() + assert isinstance(json_data, (dict, list)), "Response should be valid JSON (dict or list)" + except ValueError as e: + pytest.fail(f"Response is not valid JSON: {e}") + + +def test_register_algorithm(): + """Test register_algorithm function with a valid CWL URL""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + # Test that list_algorithms works first to ensure we have proper authentication + list_response = maap.list_algorithms() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping 
# Shared fixture: a small, publicly hosted example CWL workflow used for
# registration/update tests.
_SAMPLE_CWL_URL = "https://raw.githubusercontent.com/MAAP-Project/maap-algorithms/master/examples/hello-world/hello-world.cwl"


def _response_or_skip(call, test_name):
    """Invoke ``call()`` and return its response.

    Skips ``test_name`` when the MAAP API is unreachable (connection error)
    or the caller is not authenticated (non-200 status). The skip calls are
    deliberately outside the ``try`` so ``pytest.skip`` can never be swallowed
    by the broad ``except``.
    """
    try:
        response = call()
    except Exception:
        pytest.skip(f"Cannot connect to MAAP API - skipping {test_name} test")
    if response.status_code != 200:
        pytest.skip(f"Authentication required - skipping {test_name} test")
    return response


def _first_process_id(list_response):
    """Return the ID of the first algorithm in a ``list_algorithms`` response.

    Prefers the ID embedded in the process's ``self`` link
    (e.g. ``/ogc/processes/3``), falling back to the ``id`` field.
    Returns ``None`` when no algorithm (or no usable ID) is present.
    Fails the test when the response body is not JSON.
    """
    try:
        data = list_response.json()
    except ValueError as exc:
        pytest.fail(f"Algorithm list response is not valid JSON: {exc}")
    # The endpoint may return {"processes": [...]} or a bare list.
    processes = data.get("processes") if isinstance(data, dict) else data
    if not processes:
        return None
    first_process = processes[0]
    for link in first_process.get("links", []):
        if link.get("rel") == "self":
            href = link.get("href", "")
            if "/ogc/processes/" in href:
                return href.split("/ogc/processes/")[-1]
    return first_process.get("id")


def test_register_algorithm():
    """Test register_algorithm function."""
    # NOTE(review): the opening lines of this test fell outside the reviewed
    # hunk; the preamble below was reconstructed from the identical skip
    # pattern used by every other test in this module - confirm.
    maap = MAAP()
    _response_or_skip(maap.list_algorithms, "register_algorithm")

    response = maap.register_algorithm(_SAMPLE_CWL_URL)

    # Registration of a valid CWL URL should succeed outright.
    assert response.status_code in (200, 201), (
        f"Expected successful registration, got {response.status_code}: {response.text}"
    )
    json_data = response.json()
    assert isinstance(json_data, dict), "Registration response should be a JSON object"
    assert "deploymentID" in json_data or "id" in json_data, "Response should contain deployment ID"


def test_get_deployment_status():
    """Test get_deployment_status function."""
    maap = MAAP()
    _response_or_skip(maap.list_algorithms, "get_deployment_status")

    # No real deployment ID is available, so 404 is an acceptable outcome.
    sample_deployment_id = "test-deployment-123"
    response = maap.get_deployment_status(sample_deployment_id)

    assert response.status_code in (200, 404), (
        f"Expected 200 or 404, got {response.status_code}: {response.text}"
    )
    # If the deployment exists, the body must carry status information.
    if response.status_code == 200:
        json_data = response.json()
        assert isinstance(json_data, dict), "Status response should be a JSON object"
        assert "status" in json_data, "Response should contain status information"

    # The request must have been routed to the deployment-specific endpoint.
    assert str(sample_deployment_id) in response.url


def test_describe_algorithm():
    """Test describe_algorithm by describing the first listed algorithm."""
    maap = MAAP(maap_host="api.dit.maap-project.org")

    list_response = maap.list_algorithms()
    assert list_response.status_code == 200, (
        f"Failed to get algorithm list: {list_response.status_code}"
    )

    process_id = _first_process_id(list_response)
    if not process_id:
        pytest.skip("No algorithms available to test describe_algorithm")

    describe_response = maap.describe_algorithm(process_id)
    assert describe_response.status_code == 200, (
        f"Expected 200, got {describe_response.status_code}"
    )
    try:
        describe_data = describe_response.json()
        assert isinstance(describe_data, dict), "Describe response should be a JSON object"
    except ValueError as exc:
        pytest.fail(f"Describe response is not valid JSON: {exc}")

    # The request must have targeted the chosen algorithm.
    assert str(process_id) in describe_response.url


def test_update_algorithm():
    """Test update_algorithm function."""
    maap = MAAP()
    _response_or_skip(maap.list_algorithms, "update_algorithm")

    # Non-existent algorithm ID: 404 is the expected outcome; 403 covers
    # deployments that reject unauthorized updates before the lookup.
    sample_process_id = "non-existent-algorithm-123"
    response = maap.update_algorithm(sample_process_id, _SAMPLE_CWL_URL)

    assert response.status_code in (200, 404, 403), (
        f"Expected 200, 404, or 403, got {response.status_code}: {response.text}"
    )
    if response.status_code == 200:
        json_data = response.json()
        assert isinstance(json_data, dict), "Update response should be a JSON object"

    assert str(sample_process_id) in response.url


def test_delete_algorithm():
    """Test delete_algorithm function."""
    maap = MAAP()
    _response_or_skip(maap.list_algorithms, "delete_algorithm")

    # Non-existent algorithm ID: 404 is the expected outcome.
    sample_process_id = "non-existent-algorithm-123"
    response = maap.delete_algorithm(sample_process_id)

    assert response.status_code in (200, 204, 404, 403), (
        f"Expected 200, 204, 404, or 403, got {response.status_code}: {response.text}"
    )
    # A successful delete may legitimately return an empty body (204).
    if response.status_code in (200, 204) and response.content:
        json_data = response.json()
        assert isinstance(json_data, dict), "Delete response should be a JSON object"

    assert str(sample_process_id) in response.url


def test_get_algorithm_package():
    """Test get_algorithm_package using the first listed algorithm."""
    maap = MAAP(maap_host="api.dit.maap-project.org")

    list_response = maap.list_algorithms()
    assert list_response.status_code == 200, (
        f"Failed to get algorithm list: {list_response.status_code}"
    )

    process_id = _first_process_id(list_response)
    if not process_id:
        pytest.skip("No algorithms available to test get_algorithm_package")

    # Fixed comment drift: this exercises get_algorithm_package (the original
    # comment referred to a non-existent "package_response function").
    package_response = maap.get_algorithm_package(process_id)
    assert package_response.status_code == 200, (
        f"Expected 200, got {package_response.status_code}"
    )
    try:
        package_data = package_response.json()
        assert isinstance(package_data, dict), "Algorithm Package response should be a JSON object"
    except ValueError as exc:
        pytest.fail(f"Algorithm package response is not valid JSON: {exc}")

    assert str(process_id) in package_response.url


def test_submit_job():
    """Test submit_job against the first available algorithm."""
    maap = MAAP()
    list_response = _response_or_skip(maap.list_algorithms, "submit_job")

    try:
        algorithms_data = list_response.json()
    except Exception as exc:
        pytest.skip(f"Could not parse algorithms list: {exc}")
    algorithms = (
        algorithms_data.get("processes") if isinstance(algorithms_data, dict) else algorithms_data
    )
    if not algorithms:
        pytest.skip("No algorithms available to test submit_job")
    # submit_job's listing may expose the ID under either key.
    first_algorithm = algorithms[0]
    algorithm_id = first_algorithm.get("id") or first_algorithm.get("processId")
    if not algorithm_id:
        pytest.skip("Could not determine algorithm ID to test submit_job")

    sample_inputs = {}  # minimal payload: exercises the endpoint, not the algorithm
    sample_queue = "maap-dps-worker-32gb"  # a real DPS worker queue name

    response = maap.submit_job(algorithm_id, sample_inputs, sample_queue)

    # 200/201 on success, 400 for rejected inputs, 404 if the algorithm vanished.
    assert response.status_code in (200, 201, 400, 404), (
        f"Expected valid response, got {response.status_code}: {response.text}"
    )
    if response.status_code in (200, 201):
        json_data = response.json()
        assert isinstance(json_data, dict), "Job submission response should be a JSON object"
        assert "jobID" in json_data or "id" in json_data, "Response should contain job ID"

    assert str(algorithm_id) in response.url


def test_get_job_status():
    """Test get_job_status function."""
    maap = MAAP()
    _response_or_skip(maap.list_jobs, "get_job_status")

    # Non-existent job ID: 404 is the expected outcome.
    sample_job_id = "non-existent-job-123"
    response = maap.get_job_status(sample_job_id)

    assert response.status_code in (200, 404), (
        f"Expected 200 or 404, got {response.status_code}: {response.text}"
    )
    if response.status_code == 200:
        json_data = response.json()
        assert isinstance(json_data, dict), "Job status response should be a JSON object"
        assert "status" in json_data, "Response should contain status information"

    assert str(sample_job_id) in response.url


def test_cancel_job():
    """Test cancel_job function."""
    maap = MAAP()
    _response_or_skip(maap.list_jobs, "cancel_job")

    # Non-existent job ID: 404 expected; 409 covers already-completed jobs.
    sample_job_id = "non-existent-job-123"
    response = maap.cancel_job(sample_job_id)

    assert response.status_code in (200, 204, 404, 409), (
        f"Expected 200, 204, 404, or 409, got {response.status_code}: {response.text}"
    )
    # A successful cancel may return an empty body.
    if response.status_code in (200, 204) and response.content:
        json_data = response.json()
        assert isinstance(json_data, dict), "Cancel response should be a JSON object"

    assert str(sample_job_id) in response.url


def test_get_job_result():
    """Test get_job_result function."""
    maap = MAAP()
    _response_or_skip(maap.list_jobs, "get_job_result")

    # Non-existent job ID: 404 expected; 425 covers results not yet ready.
    sample_job_id = "non-existent-job-123"
    response = maap.get_job_result(sample_job_id)

    assert response.status_code in (200, 404, 425), (
        f"Expected 200, 404, or 425, got {response.status_code}: {response.text}"
    )
    if response.status_code == 200:
        json_data = response.json()
        assert isinstance(json_data, dict), "Job result response should be a JSON object"

    # The request must have hit the job-specific results endpoint.
    assert str(sample_job_id) in response.url
    assert "results" in response.url


def test_list_jobs():
    """Test list_jobs function."""
    maap = MAAP()
    # Copy-paste fix: the original skip messages referenced get_job_result.
    response = _response_or_skip(maap.list_jobs, "list_jobs")

    # The endpoint may return {"jobs": [...]} or a bare list; either is JSON.
    # (The original `len(...) >= 0` assertions were vacuously true and dropped.)
    json_data = response.json()
    assert isinstance(json_data, (dict, list)), "Jobs list response should be JSON (dict or list)"


def test_get_job_metrics():
    """Test get_job_metrics function."""
    maap = MAAP()
    _response_or_skip(maap.list_jobs, "get_job_metrics")

    # Non-existent job ID: 404 expected; 425 covers metrics not yet available.
    sample_job_id = "non-existent-job-123"
    response = maap.get_job_metrics(sample_job_id)

    assert response.status_code in (200, 404, 425), (
        f"Expected 200, 404, or 425, got {response.status_code}: {response.text}"
    )
    if response.status_code == 200:
        json_data = response.json()
        assert isinstance(json_data, dict), "Job metrics response should be a JSON object"

    # The request must have hit the job-specific metrics endpoint.
    assert str(sample_job_id) in response.url
    assert "metrics" in response.url