adding files related to NOA GFS poc #1824
base: master
@@ -0,0 +1,45 @@
# NOAA: Global Forecast System Dataset

## Overview
The NOAA GFS 0.25 Atmos dataset provides high-resolution global atmospheric and land-surface data on a 0.25-degree (~28 km) grid. It includes a wide range of meteorological variables, such as temperature, wind, humidity, precipitation, and soil moisture, generated four times daily with forecasts extending up to 16 days (384 hours).

The dataset provides standardized global output on a 0.25-degree (~28 km) equidistant cylindrical grid covering the entire Earth's surface, with up to 127 vertical atmospheric layers. It is distributed in GRIB2 (Gridded Binary, Edition 2) format via the NOAA Operational Model Archive and Distribution System (NOMADS) and is a public domain product of the United States Government.
## Data Source
**Source URL:**
https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/
**Provenance Description:**
The NOAA Global Forecast System (GFS) 0.25 Atmos dataset is produced and maintained by the National Centers for Environmental Prediction (NCEP), a component of the National Oceanic and Atmospheric Administration (NOAA). The data is generated through the Global Data Assimilation System (GDAS), which integrates global observations from satellites, weather balloons, radar, and commercial aircraft into the Finite-Volume Cubed-Sphere (FV3) dynamical core.
## How To Download Input Data
The source contains a very large number of data files. To locate the correct file:
- Go to the source URL.
- Choose the date of observation.
- Select one of the 4 cycle directories. These represent the data produced 4 times a day.
- Select the `atmos` directory for atmospheric data.
- The directory holds multiple file types: some contain vertical soundings, others hold raw, unstructured data intended for supercomputers, surface flux files, etc.
- For general mapping and analysis of the GFS data, use files of the form `gfs.t00z.pgrb2.0p25.f000`, `gfs.t00z.pgrb2.0p25.f001`, and so on, up to `gfs.t00z.pgrb2.0p25.f384`.
- `t00z` identifies the cycle of the day (one of the 4); `0p25` denotes the 0.25-degree horizontal resolution; `fXXX` is the forecast hour.
- Through forecast hour 120 (f000 to f120), files are published in 1-hour increments. After f120 (Day 5), they switch to 3-hour increments.
- The `.idx` file holds the metadata and lists the variables present in the main data file.
- The main file is binary (GRIB2) and can be converted using NOAA's wgrib2 tool.
The wgrib2 tool is available on GitHub from NOAA:
- Clone the repository and install the wgrib2 tool.
- Once the raw data file is downloaded, convert the binary file into the desired format (CSV) with: `wgrib2 input_file.grib2 -csv output.csv`
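The download steps above can be sketched as a small URL builder. This is illustrative only: the filename pattern is from this README, and the `gfs.YYYYMMDD/CC/atmos/` directory layout is inferred from the steps above and the example path in this PR's config file; file availability on NOMADS varies by date.

```python
from datetime import date

# Base URL from the "Data Source" section above.
BASE_URL = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"

def gfs_file_url(day: date, cycle: int, fhour: int) -> str:
    """Build the URL of one 0.25-degree pgrb2 file for a date, cycle, and forecast hour."""
    if cycle not in (0, 6, 12, 18):
        raise ValueError("cycle must be 00, 06, 12 or 18 UTC")
    if fhour > 120 and fhour % 3 != 0:
        # After f120 (Day 5) only 3-hour increments are published.
        raise ValueError("after f120 only 3-hour steps are available")
    name = f"gfs.t{cycle:02d}z.pgrb2.0p25.f{fhour:03d}"
    return f"{BASE_URL}/gfs.{day:%Y%m%d}/{cycle:02d}/atmos/{name}"

# The downloaded file can then be converted with wgrib2, e.g.:
#   wgrib2 gfs.t00z.pgrb2.0p25.f000 -csv output.csv
```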
**Inventory URL:**
https://www.nco.ncep.noaa.gov/pmb/products/gfs/gfs.t00z.pgrb2.0p25.anl.shtml
This URL describes the variables present in the file.
## Processing Instructions
The data is processed with a custom script which:
- connects to a Google Cloud Storage bucket and opens the local CSV file containing raw NOAA GFS weather data;
- uses a parameter mapping to translate short meteorological codes (like `TMP` or `UGRD`) into descriptive terms (like `Temperature_Place` or `WindSpeed`) and assigns their corresponding scientific units;
- runs a cleaning function to standardize levels, converting human-readable strings like "2 m above ground" or "1000 mb" into structured IDs;
- combines the parameter and the level to create a Data Commons Identifier (DCID); for example, temperature at the surface becomes `dcid:Temperature_Place_SurfaceLevel`;
- processes the data in batches of 1,000 rows, writing them to a memory buffer before streaming each chunk directly to the Google Cloud bucket.
After processing the input CSV, the structured output.csv is stored in the bucket.
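As a concrete illustration of the DCID construction described above, here is a simplified, self-contained sketch. It uses only a two-entry excerpt of the parameter mapping and handles only the "surface" and "N m above ground" level forms; the script in this PR covers many more parameters, levels, and special cases (e.g. U/V wind-component suffixes).

```python
# Two-entry excerpt of the parameter mapping; illustrative only.
PARAM_MAP_EXCERPT = {
    "TMP": ("Temperature_Place", "Kelvin"),
    "UGRD": ("WindSpeed_Place", "MeterPerSecond"),
}

def level_to_id(level: str) -> str:
    """Simplified level cleaner: handles only the cases shown in the examples above."""
    l = level.lower().strip()
    if l == "surface":
        return "SurfaceLevel"
    if "m above ground" in l:
        return f"{l.split(' ')[0]}Meter"  # "2 m above ground" -> "2Meter"
    return ""

def to_dcid(param: str, level: str) -> str:
    """Combine a mapped parameter name and a cleaned level into a DCID string."""
    base = PARAM_MAP_EXCERPT.get(param, (param, ""))[0]
    level_id = level_to_id(level)
    return f"dcid:{base}_{level_id}" if level_id else f"dcid:{base}"
```

For example, `to_dcid("TMP", "surface")` yields `dcid:Temperature_Place_SurfaceLevel`, matching the example in the processing instructions.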
@@ -0,0 +1,222 @@
import csv
import io
import re
import time

from google.cloud import storage

# --- CONFIGURATION ---
BUCKET_NAME = "unresolved_mcf"
> **Contributor comment:** Can we make the config variables flags? Use flags from `absl` for this.
INPUT_LOCAL = "../noa_gfs/input_files/gfs.t00z.pgrb2.0p25.f000.csv"
OUTPUT_BLOB_NAME = "noaa_gfs/noaa_gfs_output.csv"
# 1. Parameter Mapping (Original)
param_map = {
    'PRMSL': ('Pressure_Place', 'Pascal'),
    'MSLET': ('MSLPEtaReduction_Pressure_Atmosphere', 'Pascal'),
    'TMP': ('Temperature_Place', 'Kelvin'),
    'DPT': ('DewPointTemperature_Atmosphere', 'Kelvin'),
    'APTMP': ('Apparent_Temperature_Place', 'Kelvin'),
    'HGT': ('GeopotentialHeight_Place', 'GeopotentialMeters'),
    'RH': ('Humidity_Place', 'Percent'),
    'SPFH': ('Humidity_Place', ''),
    'UGRD': ('WindSpeed_Place', 'MeterPerSecond'),
    'VGRD': ('WindSpeed_Place', 'MeterPerSecond'),
    'VIS': ('Visibility_Place', 'Meter'),
    'GUST': ('Max_WindSpeed_Place', 'MeterPerSecond'),
    'PRES': ('Pressure_Atmosphere', 'Pascal'),
    'CLMR': ('MixingRatio_Cloud', ''),
    'ICMR': ('MixingRatio_Ice', ''),
    'RWMR': ('MixingRatio_Rainwater', ''),
    'SNMR': ('MixingRatio_Snow', ''),
    'GRLE': ('Count_Graupel', ''),
    'REFD': ('Reflectivity_Place', 'Decibel'),
    'REFC': ('Max_CompositeReflectivity_Place', 'Decibel'),
    'VVEL': ('PressureVerticalVelocity_Velocity_Place', 'PascalPerSecond'),
    'DZDT': ('GeometricVerticalVelocity_Velocity_Place', 'MeterPerSecond'),
    'ABSV': ('AbsoluteVorticity_Place', 'InverseSecond'),
    'O3MR': ('Ozone_MixingRatio_Atmosphere', ''),
    'VRATE': ('VentilationRate_Place', 'SquareMeterPerSecond'),
    'TSOIL': ('Temperature_Soil', 'Kelvin'),
    'SOILW': ('VolumetricSoilMoisture_Soil', ''),
    'SOILL': ('LiquidWaterContent_Soil', ''),
    'TCDC': ('CloudCover_Place', 'Percent'),
    'HINDEX': ('HainesIndex_Place', ''),
    'CNWAT': ('CloudWaterContent_Atmosphere', 'KilogramPerMeterSquared'),
    'WEASD': ('SnowWaterEquivalent_Place', 'KilogramPerMeterSquared'),
    'SNOD': ('Depth_Snow', 'Meter'),
    'ICETK': ('Thickness_Ice', 'Meter'),
    'ICEG': ('GrowthRate_Count_Ice', 'MeterPerSecond'),
    'CPOFP': ('FrozenPrecipitation_Place', 'Percent'),
    'PRATE': ('PrecipitationRate_Place', ''),
    'CSNOW': ('Occurrence_Place_SurfaceLevel_Snow', ''),
    'CICEP': ('Occurrence_Place_SurfaceLevel_IcePellets', ''),
    'CFRZR': ('Occurrence_Place_SurfaceLevel_FreezingRain', ''),
    'CRAIN': ('Occurrence_Place_SurfaceLevel_Rain', ''),
    'VEG': ('Area_Place_SurfaceLevel_Vegetation', 'Percent'),
    'SFCR': ('SurfaceRoughness_Place', 'Meter'),
    'FRICV': ('FrictionalVelocity_Place', 'MeterPerSecond'),
    'SOTYP': ('SoilType_Soil', ''),
    'WILT': ('WiltingPoint_Soil', ''),
    'FLDCP': ('FieldCapacity_Soil', ''),
    'SUNSD': ('SunshineDuration_Place', 'Second'),
    'LFTX': ('SurfaceLiftedIndex_Atmosphere', 'Kelvin'),
    '4LFTX': ('BestLiftedIndex_Atmosphere', 'Kelvin'),
    'CAPE': ('ConvectiveAvailablePotentialEnergy_Atmosphere', 'JoulePerKilogram'),
    'CIN': ('ConvectiveInhibition_Atmosphere', 'JoulePerKilogram'),
    'PWAT': ('PrecipitableWater_Place', 'KilogramPerMeterSquared'),
    'CWAT': ('CloudWater_Place', 'KilogramPerMeterSquared'),
    'TOZNE': ('Concentration_Atmosphere_Ozone', ''),
    'LCDC': ('CloudCover_Place_LowCloudLayer', 'Percent'),
    'MCDC': ('CloudCover_Place_MiddleCloudLayer', 'Percent'),
    'HCDC': ('CloudCover_Place_HighCloudLayer', 'Percent'),
    'HLCY': ('StormRelativeHelicity_Atmosphere', 'MetersSquaredPerSecondSquared'),
    'USTM': ('StormMotion_Atmosphere', 'MeterPerSecond'),
    'VSTM': ('StormMotion_Atmosphere', 'MeterPerSecond'),
    'ICAHT': ('ICAOStandardAtmosphere_Altitude_Atmosphere', 'Meter'),
    'VWSH': ('WindShear_Atmosphere', 'InverseSecond'),
    'POT': ('PotentialTemperature_Atmosphere', 'Kelvin'),
    'HPBL': ('PlanetaryBoundaryLayer_Altitude_Atmosphere', 'Meter'),
    'PLPL': ('LiftedParcelLevel_Pressure_Atmosphere', 'Pascal'),
    'LAND': ('Area_LandCover', 'SquareDegree'),
> **Contributor comment:** Is the same logic in pvmap achieved through the custom script? If yes, where is the multiplier logic implemented?
    'ICEC': ('Area_IceCover', 'SquareDegree'),
    'ICETMP': ('Temperature_SeaIce', 'Kelvin'),
}
# 2. Helper Function to Clean Level for DCID
def format_level_dcid(level):
    """Convert a human-readable GRIB2 level string into a DCID-safe identifier."""
    l = str(level).lower().strip()

    if l == "mean sea level":
        return "0MetersAboveMeanSeaLevel"
    if "m above mean sea level" in l:
        val = l.split(" ")[0].replace("-", "To")
        return f"{val}MetersAboveMeanSeaLevel"

    if l == "surface":
        return "SurfaceLevel"
    if "entire atmosphere" in l:
        return ""
    if l == "planetary boundary layer":
        return "PlanetaryBoundaryLayer"
    if "low cloud layer" in l:
        return "LowCloudLayer"
    if "middle cloud layer" in l:
        return "MiddleCloudLayer"
    if "high cloud layer" in l:
        return "HighCloudLayer"
    if l == "0c isotherm":
        return "Isotherm0C"
    if l == "highest tropospheric freezing level":
        return "HighestTroposphericFreezingLevel"

    if "hybrid level" in l:
        val = l.split(" ")[0]
        return "LowestHybridLevel" if val == "1" else f"{val}HybridLevel"

    if "m below ground" in l:
        match = re.search(r'([0-9.]+)-?([0-9.]*)', l)
        if match:
            start, end = match.group(1), match.group(2)
            return f"{start}To{end}Meter" if end else f"{start}Meter"

    if "m above ground" in l:
        val = l.split(" ")[0].replace("-", "To")
        return f"{val}Meter"

    if "mb" in l:
        # Extracts values from "30-0 mb" -> "30To0Millibar".
        # Prevents "GroundLevel" from being attached to Millibar layers later.
        val = l.split(" ")[0].replace("-", "To")
        return f"{val}Millibar"

    if "sigma" in l:
        val = l.split(" ")[0].replace("-", "To")
        suffix = "SigmaLayer" if "layer" in l else "SigmaLevel"
        return f"{val}{suffix}"

    if "pv=" in l:
        return "PotentialVorticityNeg2PVU" if ("neg" in l or "-2" in l) else "PotentialVorticity2PVU"

    # Fallback: CamelCase whatever is left.
    return "".join(word.capitalize() for word in l.replace("-", " ").split() if word)
# 3. DCID Constructor Logic
def construct_dcid(param_raw, level_raw):
    """Combine a GRIB2 parameter code and a raw level string into a Data Commons DCID."""
    param = str(param_raw).upper()
    level_clean = format_level_dcid(level_raw)

    mapping = param_map.get(param)
    base = mapping[0] if mapping else param

    if param == 'RH' and not level_clean:
        return "dcid:Humidity_RelativeHumidity"

    # Skip the level suffix when there is no level or it is already part of the base name.
    if not level_clean or level_clean in base:
        dcid = f"dcid:{base}"
    else:
        dcid = f"dcid:{base}_{level_clean}"

    if param in ['UGRD', 'VGRD', 'USTM', 'VSTM']:
        suffix = "UComponent" if param in ['UGRD', 'USTM'] else "VComponent"
        if param in ['UGRD', 'VGRD'] and level_clean == "10Meter":
            return f"dcid:WindSpeed_{suffix}_Height10Meters"
        return f"{dcid}_{suffix}"

    if param == 'RH':
        return f"{dcid}_RelativeHumidity"
    if param == 'SPFH':
        return f"{dcid}_SpecificHumidity"
    if param == 'REFC':
        return f"dcid:{base}"

    return dcid
def process_and_upload_true_stream():
    """Stream rows from the local GFS CSV to a GCS blob in the import schema."""
> **Contributor comment:** Use a docstring with a brief description for all the functions.
    client = storage.Client()
    bucket = client.bucket(BUCKET_NAME)
    blob = bucket.blob(OUTPUT_BLOB_NAME)
    blob.chunk_size = 64 * 1024 * 1024  # 64 MiB upload chunks

    with open(INPUT_LOCAL, mode='r') as f_in:
        reader = csv.DictReader(f_in)
        output_buffer = io.StringIO()
        writer = csv.writer(output_buffer)
        writer.writerow(['observationDate', 'value', 'variableMeasured',
                         'measurementMethod', 'latitude', 'longitude',
                         'placeName', 'unit'])

        with blob.open("w", content_type='text/csv') as cloud_file:
            # Write the header row first, then stream data rows in batches.
            cloud_file.write(output_buffer.getvalue())
            output_buffer.seek(0)
            output_buffer.truncate(0)

            for i, row in enumerate(reader):
                param = row['Parameter']
                level = row['Level']
                obs_date = row['Valid_Time'].replace(' ', 'T')
                dcid = construct_dcid(param, level)

                l_low = level.lower()

                # measurementMethod must be empty for Millibar and Mean Sea
                # Level levels; "GroundLevel" applies only to ground-relative levels.
                if "mb" in l_low or "mean sea level" in l_low:
                    method = ""
                else:
                    method = "GroundLevel" if "ground" in l_low else ""

                writer.writerow([
                    obs_date,
                    row['Value'],
                    dcid,
                    method,
                    row['Latitude'],
                    row['Longitude'],
                    f"latLong/{row['Latitude']}_{row['Longitude']}",
                    param_map.get(param.upper(), ('', ''))[1],
                ])

                # Flush the buffer to GCS after every 1,000 rows.
                if (i + 1) % 1000 == 0:
                    cloud_file.write(output_buffer.getvalue())
                    output_buffer.seek(0)
                    output_buffer.truncate(0)

            # Flush any remaining rows.
            cloud_file.write(output_buffer.getvalue())
if __name__ == "__main__":
    start_time = time.perf_counter()
    print(f"Process started: {time.strftime('%Y-%m-%d %H:%M:%S')}")
> **Contributor comment:** Use the `logging` module instead of `print`.
    try:
        process_and_upload_true_stream()
        print("Upload complete.")
    except Exception as e:
        print(f"Error: {e}")
    duration = time.perf_counter() - start_time
    mins, secs = divmod(duration, 60)
    print(f"Total Execution Time: {int(mins)}m {secs:.2f}s")
@@ -0,0 +1,5 @@
parameter,value
dc_api_root,https://api.datacommons.org/
output_columns,"observationDate,value,variableMeasured,measurementMethod,latitude,longitude,placeName,unit"
observation_date_format,%Y-%m-%dT%H:%M:%S
#sourceUrl,https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.20251224/00/atmos/
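The script in this PR hard-codes its configuration rather than reading this CSV. If the file were consumed (an assumption; no code here does so), a minimal reader that skips `#`-commented rows might look like this:

```python
import csv
import io

# Inline copy of the config above, including the commented-out sourceUrl row.
CONFIG_TEXT = """parameter,value
dc_api_root,https://api.datacommons.org/
output_columns,"observationDate,value,variableMeasured,measurementMethod,latitude,longitude,placeName,unit"
observation_date_format,%Y-%m-%dT%H:%M:%S
#sourceUrl,https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.20251224/00/atmos/
"""

def load_config(text):
    """Parse parameter,value rows into a dict, skipping blank and '#'-commented lines."""
    config = {}
    for row in csv.DictReader(io.StringIO(text)):
        if not row["parameter"] or row["parameter"].startswith("#"):
            continue
        config[row["parameter"]] = row["value"]
    return config
```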
> **Contributor comment:** Can we rename the file to a more meaningful name?