From 019c6f70cbee96713a0143ab64411a3c610a1c50 Mon Sep 17 00:00:00 2001
From: Benjamin Root
Date: Thu, 21 Oct 2021 17:09:55 -0400
Subject: [PATCH] Add support for computing SPEI from the spi console script.

---
 src/climate_indices/__spi__.py | 418 ++++++++++++++++++++++++---------
 1 file changed, 307 insertions(+), 111 deletions(-)

diff --git a/src/climate_indices/__spi__.py b/src/climate_indices/__spi__.py
index 25948a22..e35cb51d 100644
--- a/src/climate_indices/__spi__.py
+++ b/src/climate_indices/__spi__.py
@@ -190,6 +190,40 @@ def validate_dimensions(
         "precipitation",
     )
 
+    if args.netcdf_pet is not None:
+        # make sure a PET variable name was specified
+        if args.var_name_pet is None:
+            msg = "A PET file was given, but not a PET variable name"
+            _logger.error(msg)
+            raise ValueError(msg)
+
+        # validate the PET file itself
+        with xr.open_dataset(args.netcdf_pet) as dataset_pet:
+
+            # make sure we have a valid PET variable name
+            if args.var_name_pet not in dataset_pet.variables:
+                msg = f"Invalid PET variable name: '{args.var_name_pet}' " + \
+                      f"does not exist in the PET file '{args.netcdf_pet}'"
+                _logger.error(msg)
+                raise ValueError(msg)
+
+            # verify that the PET variable's dimensions are in
+            # the expected order, and if so then determine the input type
+            input_type_pet = \
+                validate_dimensions(
+                    dataset_pet,
+                    args.var_name_pet,
+                    "PET",
+                )
+
+        if input_type != input_type_pet:
+            msg = f"The PET input type, '{input_type_pet}', deduced from" \
+                  f" '{args.netcdf_pet}:{args.var_name_pet}' does not" \
+                  f" match the precipitation input type, '{input_type}'," \
+                  f" deduced from '{args.netcdf_precip}:{args.var_name_precip}'."
+            _logger.error(msg)
+            raise ValueError(msg)
+
     if args.scales is None:
         msg = "Missing one or more time scales (missing --scales argument)"
         _logger.error(msg)
@@ -230,10 +264,12 @@ def _get_variable_attributes(
         distribution: indices.Distribution,
         scale: int,
         periodicity: compute.Periodicity,
+        evapo: bool = False,
 ):
 
     attrs = {
-        "long_name": "Standardized Precipitation Index ("
+        "long_name": "Standardized Precipitation"
+                     f"{'-Evapotranspiration' if evapo else ''} Index ("
                      f"{distribution.value.capitalize()}), "
                      f"{scale}-{periodicity.unit()}",
         "valid_min": -3.09,
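Note on the `evapo` flag added above: the only change to the composed
`long_name` attribute is the optional "-Evapotranspiration" segment. A quick
illustrative check (the gamma distribution and 6-month scale are example
values only, not taken from the patch):

    >>> for evapo in (False, True):
    ...     print("Standardized Precipitation"
    ...           f"{'-Evapotranspiration' if evapo else ''} Index ("
    ...           "Gamma), 6-month")
    Standardized Precipitation Index (Gamma), 6-month
    Standardized Precipitation-Evapotranspiration Index (Gamma), 6-month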
@@ -430,6 +466,7 @@ def _drop_data_into_shared_arrays_divisions(
 def build_dataset_fitting_grid(
         ds_example: xr.Dataset,
         periodicity: compute.Periodicity,
+        evapo: bool = False,
 ) -> xr.Dataset:
     """
     Builds a new Dataset object based on an example Dataset. Essentially copies
@@ -438,6 +475,8 @@
 
     :param ds_example:
     :param periodicity:
+    :param evapo:
+        Indicate whether this is for SPI (False, default) or SPEI (True).
     :return:
     """
@@ -448,13 +487,13 @@
     else:
         raise ValueError(f"Unsupported periodicity: {periodicity}")
 
-    usage_url = "https://climate-indices.readthedocs.io/en/latest/#spi-monthly"
+    usage_url = f"https://climate-indices.readthedocs.io/en/latest/#{'spei' if evapo else 'spi'}-monthly"
     global_attrs = {
         'description': f"Distribution fitting parameters for various {periodicity.unit()} "
                        f"scales computed from {periodicity} precipitation input "
                        "by the climate_indices package available from "
                        f"{_GITHUB_URL}. The variables contained herein are meant "
-                       "to be used as inputs for computing SPI datasets using "
+                       f"to be used as inputs for computing {'SPEI' if evapo else 'SPI'} datasets using "
                        f"the climate_indices package. See {usage_url} for "
                        "example usage.",
         'geospatial_lat_min': float(np.amin(ds_example.lat)),
@@ -483,6 +522,7 @@ def build_dataset_fitting_divisions(
         ds_example: xr.Dataset,
         periodicity: compute.Periodicity,
+        evapo: bool = False,
 ) -> xr.Dataset:
     """
     Builds a new Dataset object based on an example Dataset. Essentially copies
@@ -491,6 +531,8 @@
 
     :param ds_example:
     :param periodicity:
+    :param evapo:
+        Indicate whether this is for SPI (False, default) or SPEI (True).
     :return:
     """
@@ -501,13 +543,13 @@
     else:
         raise ValueError(f"Unsupported periodicity: {periodicity}")
 
-    usage_url = "https://climate-indices.readthedocs.io/en/latest/#spi-monthly"
+    usage_url = f"https://climate-indices.readthedocs.io/en/latest/#{'spei' if evapo else 'spi'}-monthly"
     global_attrs = {
         'description': f"Distribution fitting parameters for various {periodicity.unit()} "
                        f"scales computed from {periodicity} precipitation input "
                        "by the climate_indices package available from "
                        f"{_GITHUB_URL}. The variables contained herein are meant "
-                       "to be used as inputs for computing SPI datasets using "
+                       f"to be used as inputs for computing {'SPEI' if evapo else 'SPI'} datasets using "
                        f"the climate_indices package. See {usage_url} for "
                        "example usage.",
     }
@@ -533,8 +575,9 @@ def _compute_write_index(keyword_arguments):
     :param keyword_arguments:
     :return:
     """
+    evapo = keyword_arguments.get('netcdf_pet', None) is not None
 
-    _logger.info(f"Computing {keyword_arguments['periodicity']} SPI")
+    _logger.info(f"Computing {keyword_arguments['periodicity']} {'SPEI' if evapo else 'SPI'}")
 
     # open the NetCDF files as an xarray DataSet object
     if "netcdf_precip" not in keyword_arguments:
@@ -553,10 +596,33 @@
         raise ValueError(f"Invalid 'input_type' keyword argument: {input_type}")
 
     ds_precip = xr.open_dataset(keyword_arguments["netcdf_precip"], chunks=chunks)
+    var_name_pet = keyword_arguments.get("var_name_pet", None)
+    if evapo:
+        if not var_name_pet:
+            raise ValueError("No PET variable name was specified with a PET input file")
+        da_pet = xr.open_dataset(keyword_arguments["netcdf_pet"], chunks=chunks)[var_name_pet]
+        # Make sure there are no variable name conflicts
+        if var_name_pet in ds_precip:
+            cnt = 0
+            while var_name_pet in ds_precip and cnt <= 99999:
+                cnt += 1
+                var_name_pet = f"{da_pet.name}_{cnt}"
+            if var_name_pet in ds_precip:
+                _logger.error(f"This shouldn't happen. {da_pet.name=} {sorted(ds_precip.keys())=}")
+                raise ValueError("Can't find a non-conflicting variable name for PET in ds_precip."
+                                 " This should *never* happen in a sane and rational universe."
+                                 " Contact the developer with this error.")
+        ds_precip[var_name_pet] = da_pet
 
     # trim out all data variables from the dataset except the ones we'll need
     input_var_names = []
+    input_data_vars = []
     if "var_name_precip" in keyword_arguments:
         input_var_names.append(keyword_arguments["var_name_precip"])
+        input_data_vars.append(keyword_arguments["var_name_precip"])
+    if var_name_pet is not None:
+        input_var_names.append(var_name_pet)
+        input_data_vars.append(var_name_pet)
     # only keep the latitude variable if we're dealing with divisions
     if input_type == InputType.divisions:
         input_var_names.append("lat")
@@ -569,7 +635,8 @@
     if keyword_arguments["load_params"] is not None:
         ds_fitting = xr.open_dataset(keyword_arguments["load_params"])
     else:
-        ds_fitting = build_dataset_fitting_grid(ds_precip, keyword_arguments['periodicity'])
+        ds_fitting = build_dataset_fitting_grid(
+            ds_precip, keyword_arguments['periodicity'], evapo)
 
     # build DataArrays for the parameter fittings we'll compute
     # (only if not already in the Dataset when loading from existing file)
@@ -627,20 +694,19 @@
     # the shape of output variables is assumed to match that
     # of the input, so use the precipitation variable's shape
-    if "var_name_precip" in keyword_arguments:
+    if "var_name_precip" not in keyword_arguments:
+        raise ValueError("No precipitation variable name was specified.")
 
-        # convert precipitation data into millimeters
-        precip_var_name = keyword_arguments["var_name_precip"]
-        precip_unit = ds_precip[precip_var_name].units.lower()
-        if precip_unit not in ("mm", "millimeters", "millimeter", "mm/dy"):
-            if precip_unit in ("inches", "inch"):
+    for var_name in input_data_vars:
+        # convert precipitation and PET data into millimeters
+        unit = ds_precip[var_name].units.lower()
+        if unit not in ("mm", "millimeters", "millimeter", "mm/dy"):
+            if unit in ("inches", "inch"):
                 # inches to mm conversion (1 inch == 25.4 mm)
-                ds_precip[precip_var_name].values *= 25.4
+                # FIXME: Why .values?
+                ds_precip[var_name].values *= 25.4
             else:
-                raise ValueError(f"Unsupported precipitation units: {precip_unit}")
-
-    else:
-        raise ValueError("No precipitation variable name was specified.")
+                raise ValueError(f"Unsupported units for {var_name}: {unit}")
 
     if input_type == InputType.divisions:
         _drop_data_into_shared_arrays_divisions(ds_precip, input_var_names)
@@ -674,11 +740,11 @@
 
         _logger.info(
             f"Computing {scale}-{keyword_arguments['periodicity'].unit()} "
-            f"SPI ({distribution.value.capitalize()})",
+            f"{'SPEI' if evapo else 'SPI'} ({distribution.value.capitalize()})",
         )
 
         # TODO we may want to initialize the shared memory array
-        # for SPI with NaNs so it starts off empty at each iteration
+        # for SPI/SPEI with NaNs so it starts off empty at each iteration
 
         args = {
             "data_start_year": data_start_year,
@@ -687,6 +753,7 @@
             "calibration_year_initial": keyword_arguments["calibration_start_year"],
             "calibration_year_final": keyword_arguments["calibration_end_year"],
             "periodicity": keyword_arguments["periodicity"],
+            "index": 'spei' if evapo else 'spi',
         }
 
         # compute the distribution fitting parameters if necessary (i.e. not loaded)
@@ -694,44 +761,49 @@
             _parallel_fitting(
                 distribution,
                 _global_shared_arrays,
-                {"var_name_values": keyword_arguments["var_name_precip"]},
+                {"var_name_values": keyword_arguments["var_name_precip"],
+                 "var_name_values2": var_name_pet},
                 scale_fitting_var_names[str(scale)],
                 input_type=input_type,
                 number_of_workers=keyword_arguments["number_of_workers"],
                 args=args,
             )
         else:
-            # TODO load the fitting parameter arrays into shared memory
+            # Fitting parameters have already been loaded
+            # into shared arrays earlier
             pass
 
+        fitting_var_names = copy.deepcopy(scale_fitting_var_names[str(scale)])
+        fitting_var_names["var_name_precip"] = keyword_arguments["var_name_precip"]
+        fitting_var_names["var_name_pet"] = var_name_pet
         # compute SPI in parallel
-        var_names_spi = copy.deepcopy(scale_fitting_var_names[str(scale)])
-        var_names_spi["var_name_precip"] = keyword_arguments["var_name_precip"]
         _parallel_spi(
             _global_shared_arrays,
-            var_names_spi,
+            fitting_var_names,
             _KEY_RESULT,
             input_type=input_type,
             number_of_workers=keyword_arguments["number_of_workers"],
             args=args,
         )
 
-        # build a Dataset for the SPI values, copying the just
+        # build a Dataset for the SPI/SPEI values, copying the just
         # computed values into it from the shared memory array
-        spi_var_name = f"spi_{distribution.value}_{scale}_{keyword_arguments['periodicity'].unit()}"
-        ds_spi = build_dataset_spi_grid(
+        prod_var_name = f"{'spei' if evapo else 'spi'}_{distribution.value}_{scale}_{keyword_arguments['periodicity'].unit()}"
+        # FIXME: What about divisions?
+        ds_prod = build_dataset_spi_grid(
             ds_precip,
             scale,
             keyword_arguments["periodicity"],
            distribution,
            data_start_year,
-            spi_var_name,
+            prod_var_name,
+            evapo=evapo,
        )
 
-        # write the SPI dataset to NetCDF file
+        # write the SPI/SPEI dataset to NetCDF file
         netcdf_file_name = \
-            keyword_arguments["output_file_base"] + "_" + spi_var_name + ".nc"
-        ds_spi.to_netcdf(netcdf_file_name)
+            keyword_arguments["output_file_base"] + "_" + prod_var_name + ".nc"
+        ds_prod.to_netcdf(netcdf_file_name)
 
         # dump the fitting variable arrays from shared-memory
         # into the DataArrays of the fitting Dataset
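A note on the resulting file name: the output variable tag is composed as
"{index}_{distribution}_{scale}_{unit}", so (assuming
`Periodicity.monthly.unit()` returns "month"; the concrete values below are
purely illustrative):

    >>> evapo, scale = True, 6
    >>> f"{'spei' if evapo else 'spi'}_gamma_{scale}_month"
    'spei_gamma_6_month'

which, with a hypothetical --output_file_base of /data/out/nclimgrid, would be
written to /data/out/nclimgrid_spei_gamma_6_month.nc.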
@@ -758,10 +830,14 @@
 def build_dataset_spi_grid(
         distribution: indices.Distribution,
         data_start_year: int,
         spi_var_name: str,
+        evapo: bool = False,
 ) -> xr.Dataset:
-
+    """
+    Not just for SPI! Set `evapo` to True and "spi" will mean "spei".
+    """
+    var_name = spi_var_name
     global_attrs = {
-        'description': f"SPI for {scale}-{periodicity.unit()} scale computed "
+        'description': f"{'SPEI' if evapo else 'SPI'} for {scale}-{periodicity.unit()} scale computed "
                        f"from {periodicity} precipitation input "
                        "by the climate_indices package available from "
                        f"{_GITHUB_URL}.",
@@ -777,7 +853,7 @@
         "lon": ds_example.lon,
         "time": ds_example.time,
     }
-    ds_spi = xr.Dataset(
+    ds = xr.Dataset(
         coords=coords,
         attrs=global_attrs,
     )
@@ -797,17 +873,17 @@
     )
 
     # create a new variable to contain the SPI values, assign into the dataset
-    var_attrs = _get_variable_attributes(distribution, scale, periodicity)
-    da_spi = xr.DataArray(
+    var_attrs = _get_variable_attributes(distribution, scale, periodicity, evapo)
+    da = xr.DataArray(
         data=index_values,
         coords=ds_example.coords,
         dims=("lat", "lon", "time"),
-        name=spi_var_name,
+        name=var_name,
         attrs=var_attrs,
     )
-    ds_spi[spi_var_name] = da_spi
+    ds[var_name] = da
 
-    return ds_spi
+    return ds
 
 
 # ------------------------------------------------------------------------------
@@ -818,10 +894,14 @@
 def build_dataset_spi_divisions(
         distribution: indices.Distribution,
         data_start_year: int,
         spi_var_name: str,
+        evapo: bool = False,
 ) -> xr.Dataset:
-
+    """
+    Not just for SPI! Set `evapo` to True and "spi" will mean "spei".
+    """
+    var_name = spi_var_name
     global_attrs = {
-        'description': f"SPI for {scale}-{periodicity.unit()} scale computed "
+        'description': f"{'SPEI' if evapo else 'SPI'} for {scale}-{periodicity.unit()} scale computed "
                        f"from {periodicity} precipitation input "
                        "by the climate_indices package available from "
                        f"{_GITHUB_URL}.",
@@ -830,7 +910,7 @@
         "division": ds_example.division,
         "time": ds_example.time,
     }
-    ds_spi = xr.Dataset(
+    ds = xr.Dataset(
         coords=coords,
         attrs=global_attrs,
     )
@@ -850,17 +930,17 @@
     )
 
     # create a new variable to contain the SPI values, assign into the dataset
-    var_attrs = _get_variable_attributes(distribution, scale, periodicity)
-    da_spi = xr.DataArray(
+    var_attrs = _get_variable_attributes(distribution, scale, periodicity, evapo)
+    da = xr.DataArray(
         data=index_values,
         coords=ds_example.coords,
         dims=("division", "time"),
-        name=spi_var_name,
+        name=var_name,
         attrs=var_attrs,
     )
-    ds_spi[spi_var_name] = da_spi
+    ds[var_name] = da
 
-    return ds_spi
+    return ds
 
 
 # ------------------------------------------------------------------------------
@@ -907,6 +987,7 @@ def _parallel_spi(
     for i in range(number_of_workers):
         params = {
             "input_var_name": input_var_names["var_name_precip"],
+            "input2_var_name": input_var_names["var_name_pet"],
             "output_var_name": output_var_name,
             "sub_array_start": split_indices[i],
             "input_type": input_type,
@@ -985,6 +1066,7 @@ def _parallel_fitting(
     for i in range(number_of_workers):
         params = {
             "input_var_name": input_var_names["var_name_values"],
+            "input2_var_name": input_var_names["var_name_values2"],
             "output_var_names": output_var_names,
             "sub_array_start": split_indices[i],
             "input_type": input_type,
@@ -1022,6 +1104,7 @@ def _apply_to_subarray_spi(params):
     handles functions that take a single argument, and (2) this function
     can generally be imported from a module, as required by map().
+    FIXME
     :param dict params: dictionary of parameters including a function name,
         "func1d", start and stop indices for specifying the subarray to which
         the function should be applied, "sub_array_start" and "sub_array_end",
@@ -1036,6 +1119,16 @@
     values_sub_array = np_array[start_index:end_index]
     args = params["args"]
 
+    if args["index"] == "spei":
+        shape = _global_shared_arrays[params["input2_var_name"]][_KEY_SHAPE]
+        array = _global_shared_arrays[params["input2_var_name"]][_KEY_ARRAY]
+        np_array = np.frombuffer(array.get_obj()).reshape(shape)
+        values2_sub_array = np_array[start_index:end_index]
+    elif args["index"] == "spi":
+        values2_sub_array = None
+    else:
+        raise ValueError(f"Unknown index: {args['index']}. Must be one of 'spi' or 'spei'")
+
     if args["distribution"] == indices.Distribution.gamma:
 
         array_alpha = _global_shared_arrays[params["var_name_alpha"]][_KEY_ARRAY]
@@ -1048,11 +1141,6 @@
         np_array_beta = np.frombuffer(array_beta.get_obj()).reshape(shape_beta)
         sub_array_beta = np_array_beta[start_index:end_index]
 
-        fitting_params = {
-            "alpha": sub_array_alpha,
-            "beta": sub_array_beta,
-        }
-
     elif args["distribution"] == indices.Distribution.pearson:
 
         array_prob_zero = _global_shared_arrays[params["var_name_prob_zero"]][_KEY_ARRAY]
@@ -1074,19 +1162,9 @@
         shape_skew = _global_shared_arrays[params["var_name_skew"]][_KEY_SHAPE]
         np_array_skew = np.frombuffer(array_skew.get_obj()).reshape(shape_skew)
         sub_array_skew = np_array_skew[start_index:end_index]
-
-        fitting_params = {
-            "prob_zero": sub_array_prob_zero,
-            "loc": sub_array_loc,
-            "scale": sub_array_scale,
-            "skew": sub_array_skew,
-        }
-
     else:
         raise ValueError(f"Unsupported distribution: {args['distribution']}")
 
-    args["fitting_params"] = fitting_params
-
     # get the output shared memory array, convert to numpy, and get the subarray slice
     output_array = _global_shared_arrays[params["output_var_name"]][_KEY_ARRAY]
     computed_array = np.frombuffer(output_array.get_obj()).reshape(shape)[
@@ -1113,17 +1191,32 @@
                     "skew": sub_array_skew[i, j],
                 }
 
-            computed_array[i, j] = \
-                indices.spi(
-                    values[j],
-                    scale=args["scale"],
-                    distribution=args["distribution"],
-                    data_start_year=args["data_start_year"],
-                    calibration_year_initial=args["calibration_year_initial"],
-                    calibration_year_final=args["calibration_year_final"],
-                    periodicity=args["periodicity"],
-                    fitting_params=fitting_params,
-                )
+            if args["index"] == "spi":
+                computed_array[i, j] = \
+                    indices.spi(
+                        values[j],
+                        scale=args["scale"],
+                        distribution=args["distribution"],
+                        data_start_year=args["data_start_year"],
+                        calibration_year_initial=args["calibration_year_initial"],
+                        calibration_year_final=args["calibration_year_final"],
+                        periodicity=args["periodicity"],
+                        fitting_params=fitting_params,
+                    )
+            elif args["index"] == "spei":
+                computed_array[i, j] = \
+                    indices.spei(
+                        values[j],
+                        values2_sub_array[i, j],
+                        scale=args["scale"],
+                        distribution=args["distribution"],
+                        data_start_year=args["data_start_year"],
+                        calibration_year_initial=args["calibration_year_initial"],
+                        calibration_year_final=args["calibration_year_final"],
+                        periodicity=args["periodicity"],
+                        fitting_params=fitting_params,
+                    )
 
     else:  # divisions
@@ -1143,17 +1236,31 @@
                     "skew": sub_array_skew[i],
                 }
 
-            computed_array[i] = \
-                indices.spi(
-                    values_sub_array,
-                    scale=args["scale"],
-                    distribution=args["distribution"],
-                    data_start_year=args["data_start_year"],
-                    calibration_year_initial=args["calibration_year_initial"],
-                    calibration_year_final=args["calibration_year_final"],
-                    periodicity=args["periodicity"],
-                    fitting_params=fitting_params,
-                )
+            if args["index"] == "spi":
+                computed_array[i] = \
+                    indices.spi(
+                        values_sub_array,
+                        scale=args["scale"],
+                        distribution=args["distribution"],
+                        data_start_year=args["data_start_year"],
+                        calibration_year_initial=args["calibration_year_initial"],
+                        calibration_year_final=args["calibration_year_final"],
+                        periodicity=args["periodicity"],
+                        fitting_params=fitting_params,
+                    )
+            elif args["index"] == "spei":
+                computed_array[i] = \
+                    indices.spei(
+                        values_sub_array,
+                        values2_sub_array,
+                        scale=args["scale"],
+                        distribution=args["distribution"],
+                        data_start_year=args["data_start_year"],
+                        calibration_year_initial=args["calibration_year_initial"],
+                        calibration_year_final=args["calibration_year_final"],
+                        periodicity=args["periodicity"],
+                        fitting_params=fitting_params,
+                    )
 
 
 # ------------------------------------------------------------------------------
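For orientation, each worker cell above boils down to a single call into the
library. Below is a minimal, self-contained sketch of the SPEI path on one
synthetic series -- not part of the patch; it assumes `indices.spei` computes
its own fitting parameters when `fitting_params` is omitted, and it reuses
the keyword names visible in the hunk above:

    import numpy as np
    from climate_indices import compute, indices

    rng = np.random.default_rng(42)
    n_months = 40 * 12  # monthly values for 1981..2020
    precip_mm = rng.gamma(shape=2.0, scale=30.0, size=n_months)  # synthetic precipitation
    pet_mm = rng.gamma(shape=2.0, scale=25.0, size=n_months)     # synthetic PET

    # one cell's worth of work: a 6-month SPEI series from precip and PET
    spei_values = indices.spei(
        precip_mm,
        pet_mm,
        scale=6,
        distribution=indices.Distribution.gamma,
        data_start_year=1981,
        calibration_year_initial=1981,
        calibration_year_final=2010,
        periodicity=compute.Periodicity.monthly,
    )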
@@ -1166,6 +1273,7 @@ def _apply_to_subarray_gamma(params):
     handles functions that take a single argument, and (2) this function
     can generally be imported from a module, as required by map().
 
+    FIXME
     :param dict params: dictionary of parameters including a function name,
         "func1d", start and stop indices for specifying the subarray to which
         the function should be applied, "sub_array_start" and "sub_array_end",
@@ -1179,6 +1287,7 @@
     values_array_key = params["input_var_name"]
     alpha_array_key = params["output_var_names"]["alpha"]
     beta_array_key = params["output_var_names"]["beta"]
+    values2_array_key = params["input2_var_name"]
 
     values_shape = _global_shared_arrays[values_array_key][_KEY_SHAPE]
     values_array = _global_shared_arrays[values_array_key][_KEY_ARRAY]
@@ -1186,6 +1295,17 @@
     values_np_array = np.frombuffer(values_array.get_obj()).reshape(values_shape)
     sub_array_values = values_np_array[start_index:end_index]
     args = params["args"]
+    index_name = args["index"]
+
+    if index_name == 'spei':
+        values_shape = _global_shared_arrays[values2_array_key][_KEY_SHAPE]
+        values_array = _global_shared_arrays[values2_array_key][_KEY_ARRAY]
+        values_np_array = np.frombuffer(values_array.get_obj()).reshape(values_shape)
+        sub_array_values2 = values_np_array[start_index:end_index]
+    elif index_name == 'spi':
+        sub_array_values2 = None
+    else:
+        raise ValueError(f"Unknown index: {index_name}. Must be one of 'spi' or 'spei'")
 
     # get the output shared memory arrays, convert to numpy, and get the subarray slices
     fitting_shape = _global_shared_arrays[alpha_array_key][_KEY_SHAPE]
@@ -1202,7 +1322,20 @@
         for j in range(values.shape[0]):
 
             # scale the values
-            scaled_values = compute.scale_values(values[j], args["scale"], args["periodicity"])
+            if index_name == "spi":
+                scaled_values = compute.scale_values(values[j], args["scale"], args["periodicity"])
+            elif index_name == "spei":
+                if np.amin(values[j]) < 0.0:
+                    _logger.warning("Input contains negative values -- all negatives clipped to zero")
+                    values[j] = np.clip(values[j], a_min=0.0, a_max=None)
+
+                # subtract the PET from precipitation, adding an offset
+                # to ensure that all values are positive
+                p_minus_pet = (values[j] - sub_array_values2[i, j]) + 1000.0
+
+                # FIXME: No reshaping based on periodicity like is done in
+                # compute.scale_values()?
+                scaled_values = compute.sum_to_scale(p_minus_pet, args["scale"])
 
             sub_array_alpha[i, j], sub_array_beta[i, j] = \
                 compute.gamma_parameters(
                     scaled_values,
                     args["data_start_year"],
                     args["calibration_year_initial"],
                     args["calibration_year_final"],
                     periodicity=args["periodicity"],
                 )
@@ -1213,9 +1346,20 @@
     else:  # divisions
+        if index_name == "spi":
+            scaled_values = compute.scale_values(values, args["scale"], args["periodicity"])
+        elif index_name == "spei":
+            if np.amin(values) < 0.0:
+                _logger.warning("Input contains negative values -- all negatives clipped to zero")
+                values = np.clip(values, a_min=0.0, a_max=None)
+
+            # subtract the PET from precipitation, adding an offset
+            # to ensure that all values are positive
+            p_minus_pet = (values - sub_array_values2[i]) + 1000.0
 
-        # scale the values
-        scaled_values = compute.scale_values(values, args["scale"], args["periodicity"])
+            # FIXME: No reshaping based on periodicity like is done in
+            # compute.scale_values()?
+            scaled_values = compute.sum_to_scale(p_minus_pet, args["scale"])
 
         sub_array_alpha[i], sub_array_beta[i] = \
             compute.gamma_parameters(
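The SPEI branches above all apply the same transform before fitting: clip
negative inputs to zero, subtract PET from precipitation, and shift by
+1000 mm so the differences stay positive for the distribution fitting. A
small numeric illustration (values invented for the example):

    >>> import numpy as np
    >>> p = np.array([10.0, 0.0, 30.0, 20.0])    # precipitation, mm
    >>> pet = np.array([40.0, 25.0, 5.0, 15.0])  # PET, mm
    >>> (p - pet) + 1000.0                       # offset keeps every value positive
    array([ 970.,  975., 1025., 1005.])

The offset series is then accumulated to the requested timestep scale via
compute.sum_to_scale() before the gamma (or Pearson) parameters are fit.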
 
# ------------------------------------------------------------------------------
@@ -1237,6 +1381,7 @@ def _apply_to_subarray_pearson(params):
     handles functions that take a single argument, and (2) this function
     can generally be imported from a module, as required by map().
 
+    FIXME
     :param dict params: dictionary of parameters including a function name,
         "func1d", start and stop indices for specifying the subarray to which
         the function should be applied, "sub_array_start" and "sub_array_end",
@@ -1254,6 +1399,7 @@
     scale_array_key = params["output_var_names"]["scale"]
     skew_array_key = params["output_var_names"]["skew"]
     loc_array_key = params["output_var_names"]["loc"]
+    values2_array_key = params["input2_var_name"]
 
     values_shape = _global_shared_arrays[values_array_key][_KEY_SHAPE]
     values_array = _global_shared_arrays[values_array_key][_KEY_ARRAY]
@@ -1261,6 +1407,17 @@
     values_np_array = np.frombuffer(values_array.get_obj()).reshape(values_shape)
     sub_array_values = values_np_array[start_index:end_index]
     args = params["args"]
+    index_name = args["index"]
+
+    if index_name == 'spei':
+        values_shape = _global_shared_arrays[values2_array_key][_KEY_SHAPE]
+        values_array = _global_shared_arrays[values2_array_key][_KEY_ARRAY]
+        values_np_array = np.frombuffer(values_array.get_obj()).reshape(values_shape)
+        sub_array_values2 = values_np_array[start_index:end_index]
+    elif index_name == 'spi':
+        sub_array_values2 = None
+    else:
+        raise ValueError(f"Unknown index: {index_name}. Must be one of 'spi' or 'spei'")
 
     # get the output shared memory arrays, convert to numpy, and get the subarray slices
     fitting_shape = _global_shared_arrays[prob_zero_array_key][_KEY_SHAPE]
@@ -1283,14 +1440,21 @@
     for i, values in enumerate(sub_array_values):
         if params["input_type"] == InputType.grid:
             for j in range(values.shape[0]):
 
-                # scale the values
-                scaled_values = \
-                    compute.scale_values(
-                        values[j],
-                        args["scale"],
-                        args["periodicity"],
-                    )
+                if index_name == "spi":
+                    scaled_values = compute.scale_values(values[j], args["scale"], args["periodicity"])
+                elif index_name == "spei":
+                    if np.amin(values[j]) < 0.0:
+                        _logger.warning("Input contains negative values -- all negatives clipped to zero")
+                        values[j] = np.clip(values[j], a_min=0.0, a_max=None)
+
+                    # subtract the PET from precipitation, adding an offset
+                    # to ensure that all values are positive
+                    p_minus_pet = (values[j] - sub_array_values2[i, j]) + 1000.0
+
+                    # FIXME: No reshaping based on periodicity like is done in
+                    # compute.scale_values()?
+                    scaled_values = compute.sum_to_scale(p_minus_pet, args["scale"])
 
                 sub_array_prob_zero[i, j], sub_array_loc[i, j], sub_array_scale[i, j], sub_array_skew[i, j] = \
                     compute.pearson_parameters(
                         scaled_values,
                         args["data_start_year"],
                         args["calibration_year_initial"],
                         args["calibration_year_final"],
                         periodicity=args["periodicity"],
                     )
 
@@ -1302,14 +1466,20 @@
         else:  # divisions
+            if index_name == 'spi':
+                scaled_values = compute.scale_values(values, args["scale"], args["periodicity"])
+            elif index_name == 'spei':
+                if np.amin(values) < 0.0:
+                    _logger.warning("Input contains negative values -- all negatives clipped to zero")
+                    values = np.clip(values, a_min=0.0, a_max=None)
 
-            # scale the values
-            scaled_values = \
-                compute.scale_values(
-                    values,
-                    args["scale"],
-                    args["periodicity"],
-                )
+                # subtract the PET from precipitation, adding an offset
+                # to ensure that all values are positive
+                p_minus_pet = (values - sub_array_values2[i]) + 1000.0
+
+                # FIXME: No reshaping based on periodicity like is done in
+                # compute.scale_values()?
+                scaled_values = compute.sum_to_scale(p_minus_pet, args["scale"])
 
             sub_array_prob_zero[i], sub_array_loc[i], sub_array_scale[i], sub_array_skew[i] = \
                 compute.pearson_parameters(
 
 
# ------------------------------------------------------------------------------
@@ -1411,7 +1581,12 @@ def main():  # type: () -> None
     _logger.info("Start time: %s", start_datetime)
 
     # parse the command line arguments
-    parser = argparse.ArgumentParser()
+    parser = argparse.ArgumentParser(
+        description="Compute the Standardized Precipitation(-Evapotranspiration)"
+                    " Index and optionally save and/or load the fitting parameters"
+                    " used for these calculations. SPI is computed by default."
+                    " Supply the potential evapotranspiration (PET) input to"
+                    " compute the SPEI instead.")
     parser.add_argument(
         "--periodicity",
         help="Process input as either monthly or daily values",
@@ -1421,7 +1596,7 @@
     )
     parser.add_argument(
         "--scales",
-        help="Timestep scales over which the SPI values are to be computed",
+        help="Timestep scales over which the SPI or SPEI values are to be computed",
         type=int,
         nargs="*",
     )
@@ -1445,6 +1620,17 @@
         type=str,
         help="Precipitation variable name used in the precipitation NetCDF file",
     )
+    parser.add_argument(
+        "--netcdf_pet",
+        type=str,
+        help="PET NetCDF file to be used as input for SPEI computations."
+             " If given, the SPEI is computed instead of the SPI.",
+    )
+    parser.add_argument(
+        "--var_name_pet",
+        type=str,
+        help="PET variable name used in the PET NetCDF file for SPEI computations",
+    )
     parser.add_argument(
         "--output_file_base",
         type=str,
@@ -1491,14 +1677,21 @@
     else:  # default ("all_but_one")
         number_of_workers = multiprocessing.cpu_count() - 1
 
+    evapo = arguments.netcdf_pet is not None
+
     # prepare precipitation NetCDF in case dimensions not (lat, lon, time) or if any coordinates are descending
     netcdf_precip = _prepare_file(arguments.netcdf_precip, arguments.var_name_precip)
+    netcdf_pet = (_prepare_file(arguments.netcdf_pet,
+                                arguments.var_name_pet)
+                  if evapo else None)
 
-    # keyword arguments used for the SPI function
+    # keyword arguments used for the SPI/SPEI function
     kwrgs = {
         "netcdf_precip": netcdf_precip,
         "var_name_precip": arguments.var_name_precip,
+        "netcdf_pet": netcdf_pet,
+        "var_name_pet": arguments.var_name_pet,
         "input_type": input_type,
         "scales": arguments.scales,
         "periodicity": arguments.periodicity,
@@ -1511,12 +1704,15 @@
         "overwrite": arguments.overwrite,
     }
 
-    # compute and write SPI
-    _compute_write_index(kwrgs)
-
-    # remove temporary file if one was created
-    if netcdf_precip != arguments.netcdf_precip:
-        os.remove(netcdf_precip)
+    try:
+        # compute and write SPI/SPEI
+        _compute_write_index(kwrgs)
+    finally:
+        # remove temporary file if one was created
+        if netcdf_precip != arguments.netcdf_precip:
+            os.remove(netcdf_precip)
+        if netcdf_pet != arguments.netcdf_pet:
+            os.remove(netcdf_pet)
 
     # report on the elapsed time
     end_datetime = datetime.now()
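Example usage, for the record. Paths and years here are hypothetical, the
console-script name is assumed to be the package's existing spi entry point,
and the calibration flag spellings are inferred from
arguments.calibration_start_year / arguments.calibration_end_year:

    spi --periodicity monthly \
        --scales 6 12 \
        --netcdf_precip precip.nc --var_name_precip prcp \
        --netcdf_pet pet.nc --var_name_pet pet \
        --output_file_base /data/out/nclimgrid \
        --calibration_start_year 1951 --calibration_end_year 2010

Omitting --netcdf_pet and --var_name_pet leaves the behavior unchanged (SPI);
supplying them switches the same pipeline to SPEI.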