Add a data pipeline to this repo that downloads the latest HIT, SWE, SWAPI and CODICE data from the I-ALiRT APIs into CSV data files (one per day) and saves them in the datastore.
We already have data pipelines for MAG science and MAG HK, but they will need to change for the new pipelines - see this code for the existing Prefect data flows.
We have newer framework/infrastructure code for pipeline building which lets us more easily build pipelines, track progress, and reuse shared code in Stages, so that every pipeline can reuse similar logic - for example, PublishFileToDatastoreStage to publish files to the datastore, or a stage to update the workflow progress tracking table. An example of a pipeline that uses the pipeline framework is at https://github.com/ImperialCollegeLondon/imap-pipeline-core/blob/main/src/prefect_server/pollLoPivotPlatform.py, which calls the LoPivotPlatformPipeline class.
It would be helpful for reusability if the new IMAP I-ALiRT pipelines were built using the new Pipeline/Stages framework.
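To make the shape concrete, here is a rough sketch of what an I-ALiRT instrument pipeline could look like. Everything below (other than PublishFileToDatastoreStage, mentioned above) is purely illustrative - the real base classes, stage names and constructor signatures live in the framework code, so treat this as the intended shape, not the actual API:

```python
# Purely illustrative sketch - the real base classes and signatures live in
# the pipeline framework; only PublishFileToDatastoreStage is a real name.
from datetime import date


class FetchIALiRTInstrumentStage:
    """Downloads one day of I-ALiRT data for a single instrument."""

    def __init__(self, instrument: str) -> None:
        self.instrument = instrument

    def run(self, day: date) -> str:
        # ...call the I-ALiRT API client, convert JSON to CSV, return the path...
        raise NotImplementedError


class IALiRTInstrumentPipeline:
    """Chains the instrument fetch with the shared publish/progress stages."""

    def __init__(self, instrument: str) -> None:
        self.stages = [
            FetchIALiRTInstrumentStage(instrument),
            # PublishFileToDatastoreStage(...),   # shared stage from the framework
            # <workflow progress update stage>,   # shared progress-tracking stage
        ]
```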
All the pipelines can also be executed with a simple CLI, so that the Prefect flow code is really just a scheduler and wrapper around plain CLI code that is framework agnostic. For example, the MAG I-ALiRT CLI command is in https://github.com/ImperialCollegeLondon/imap-pipeline-core/blob/main/src/imap_mag/cli/fetch/ialirt.py and can be called like so: imap-mag fetch ialirt --start-date 2025-01-02 --end-date 2025-01-03
The CLI needs to be maintained and extended for the new instruments. I suggest you add a new --instrument argument that defaults to mag, e.g. imap-mag fetch ialirt --instrument swe --start-date 2025-01-02 --end-date 2025-01-03 or imap-mag fetch ialirt --instrument hit --start-date 2025-01-02 --end-date 2025-01-03
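As a sketch of the extended command, assuming the CLI is built with Typer - the enum and parameter wiring below are illustrative, not the actual code in src/imap_mag/cli/fetch/ialirt.py:

```python
# Illustrative sketch of the extended fetch command, assuming Typer.
from datetime import datetime
from enum import Enum
from typing import Optional

import typer

app = typer.Typer()


class IALiRTInstrument(str, Enum):
    mag = "mag"
    hit = "hit"
    swe = "swe"
    swapi = "swapi"
    codice_lo = "codice_lo"
    codice_hi = "codice_hi"


@app.command()
def ialirt(
    instrument: IALiRTInstrument = typer.Option(
        IALiRTInstrument.mag, help="I-ALiRT instrument to fetch (defaults to mag)."
    ),
    start_date: Optional[datetime] = typer.Option(None, help="Optional backfill start."),
    end_date: Optional[datetime] = typer.Option(None, help="Optional backfill end."),
) -> None:
    """Fetch I-ALiRT data for one instrument into daily CSV files."""
    ...  # delegate to the framework-agnostic fetch code
```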
To get the instrument data you can reuse the existing API client - see https://github.com/ImperialCollegeLondon/imap-pipeline-core/blob/main/src/imap_mag/client/IALiRTApiClient.py. This calls a package issued by the mission, which has documentation at https://pypi.org/project/ialirt-data-access/
The instrument data products we want to download are:
- hit
- mag (already done)
- codice_lo
- codice_hi
- swapi
- swe
Example API cURLs:

```bash
curl "https://ialirt.imap-mission.com/space-weather?instrument=hit&time_utc_start=2026-04-22T05:30:00&time_utc_end=2026-04-22T08:30:00"
curl "https://ialirt.imap-mission.com/space-weather?instrument=swe&time_utc_start=2026-04-22T05:30:00&time_utc_end=2026-04-22T08:30:00"
```
Each instrument's JSON data should be converted and saved to one CSV file per day. The current MAG data is in the datastore in files arranged by year and month, so 1st Feb 2026 is in [datastore root]/ialirt/2026/02/imap_ialirt_20260201.csv. Unfortunately the MAG data does not have MAG in the name - oops - but let's leave that be. I suggest you flow the instrument name into the filenames, so you get [datastore root]/ialirt/2026/02/imap_ialirt_hit_20260201.csv, [datastore root]/ialirt/2026/02/imap_ialirt_swe_20260201.csv etc. in the same folder.
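A minimal sketch of that naming scheme, assuming a pathlib-based helper (the function name is hypothetical):

```python
# Proposed naming: instrument name flowed into the filename; MAG keeps its
# legacy name without the instrument segment.
from datetime import date
from pathlib import Path


def ialirt_csv_path(datastore_root: Path, instrument: str, day: date) -> Path:
    if instrument == "mag":
        name = f"imap_ialirt_{day:%Y%m%d}.csv"  # legacy MAG naming, left as-is
    else:
        name = f"imap_ialirt_{instrument}_{day:%Y%m%d}.csv"
    return datastore_root / "ialirt" / f"{day:%Y}" / f"{day:%m}" / name


# ialirt_csv_path(Path("/data"), "hit", date(2026, 2, 1))
#   -> /data/ialirt/2026/02/imap_ialirt_hit_20260201.csv
```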
When downloading we use the Workflow Progress database table, with a key like MAG_IALIRT or MAG_IALIRT_HK, to keep track of where we have already downloaded up to, and then crawl forwards in time to download only the new data. You should track each I-ALiRT instrument separately with its own key and crawl each one in turn, as in the sketch below.
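A sketch of the per-instrument crawl - the progress accessors and new key names are hypothetical (only MAG_IALIRT and MAG_IALIRT_HK exist today), and the real table access code lives in the repo:

```python
# Hypothetical sketch of per-instrument progress keys and the forward crawl.
from datetime import datetime, timezone

INSTRUMENTS = ["hit", "swe", "swapi", "codice_lo", "codice_hi", "mag", "mag_hk"]


def progress_key(instrument: str) -> str:
    if instrument == "mag_hk":
        return "MAG_IALIRT_HK"  # existing key, kept as-is
    return f"{instrument.upper()}_IALIRT"  # e.g. HIT_IALIRT (illustrative naming)


def crawl_all(progress, fetch) -> None:
    """Crawl each instrument forwards from its own progress marker."""
    for instrument in INSTRUMENTS:
        key = progress_key(instrument)
        start = progress.get_downloaded_up_to(key)  # hypothetical accessor
        end = datetime.now(timezone.utc)
        if start >= end:
            continue
        fetch(instrument, start, end)  # download only the new window
        progress.set_downloaded_up_to(key, end)  # hypothetical setter
```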
We also have flow arguments allowing us to manually catch up/re-download earlier data by passing date args, but the default is to just download the new stuff. When downloading new data, the rows get appended to the existing CSV file if it already exists, so that we do not re-download the same data for today every 5 minutes. This code is in https://github.com/ImperialCollegeLondon/imap-pipeline-core/blob/main/src/imap_mag/download/FetchIALiRT.py
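The append behaviour could look something like this sketch (pandas for brevity; the helper name is hypothetical). Because the forward crawl means each batch only contains new rows, a plain append that skips the header when the file exists is enough:

```python
# Sketch: append new rows to the day's CSV, writing the header only once.
from pathlib import Path

import pandas as pd


def append_rows(csv_path: Path, new_rows: pd.DataFrame) -> None:
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    new_rows.to_csv(csv_path, mode="a", header=not csv_path.exists(), index=False)
```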
In prod, the 2 current I-ALiRT flows (mag, mag_hk) run (in containers) in Prefect once per hour, and poll for data every (configurable) 5 minutes. They automatically shut down just before the hour boundary. We don't really want 7 instruments plus 1 HK (so 8!) containers running all the time - that is a waste of resources. Instead, refactor to have one I-ALiRT flow that runs hourly. Within that one Prefect flow I suggest you have a main loop that runs every (configurable) 5 minutes and launches one Prefect task per I-ALiRT instrument in turn (so 8 tasks), including mag_hk and mag, to get the latest data. Then once they have all downloaded, sleep for a while and start again, launching another 8 tasks. This setup will keep logs separate, make observability easy, and not waste resources.
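A sketch of that single hourly flow, assuming Prefect 2-style @flow/@task decorators; the instrument list, hour-boundary buffer and task body are illustrative:

```python
# Illustrative single hourly flow: one task per I-ALiRT product per poll cycle.
import time
from datetime import datetime, timedelta, timezone

from prefect import flow, task

INSTRUMENTS = ["hit", "swe", "swapi", "codice_lo", "codice_hi", "mag", "mag_hk"]


@task
def fetch_instrument(instrument: str) -> None:
    ...  # same logic as `imap-mag fetch ialirt --instrument <instrument>`


@flow
def poll_ialirt(poll_interval_minutes: int = 5) -> None:
    hour_end = datetime.now(timezone.utc).replace(
        minute=0, second=0, microsecond=0
    ) + timedelta(hours=1)
    # Keep polling until just before the hour boundary, then exit so the next
    # hourly deployment run takes over.
    while datetime.now(timezone.utc) + timedelta(minutes=poll_interval_minutes) < hour_end:
        for instrument in INSTRUMENTS:
            fetch_instrument(instrument)  # one task per product keeps logs separate
        time.sleep(poll_interval_minutes * 60)
```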
For the data to appear in the database we can reuse the existing postgres data pipeline to read CSV files and map them to a corresponding postgres table using my library crump. All you need to do is add to the config files so it knows how to find the files and how to map them:
- Add the path for the new CSV files, e.g. ialirt/**/imap_ialirt_swe_*.csv, to the main app config file at https://github.com/ImperialCollegeLondon/imap-pipeline-core/blob/main/imap-mag-config.yaml#L107:
```yaml
postgres_upload:
  crump_config_path: imap-db-ingest-config.yaml
  database_url_env_var_or_block_name: 'imap-database'
  enable_history: true
  paths_to_match:
    # ...
    - "ialirt_hk/**/imap_ialirt_hk*.csv"
    - "ialirt/**/imap_ialirt_*.csv"
    - "ialirt/**/imap_ialirt_swe_*.csv"
```
- Add a new table definition, like the I-ALiRT MAG CSV data mapping at https://github.com/ImperialCollegeLondon/imap-pipeline-core/blob/main/imap-db-ingest-config.yaml#L11028
Once you have a CSV file you want to ingest, you can easily generate the crump config from it using the CLI - e.g. crump prepare imap_ialirt_swe_20260201.csv --config imap-db-ingest-config.yaml --job ialirt_swe will spit out the required config to ingest that data. You might then need to tweak the config and check the data types etc.