[ML-52574] Add covariate support in predict_timeseries for prediction table usage #169

Merged · 8 commits · Apr 29, 2025
Changes from 2 commits
61 changes: 61 additions & 0 deletions runtime/databricks/automl_runtime/forecast/prophet/model.py
@@ -156,6 +156,30 @@ def predict(self, context: mlflow.pyfunc.model.PythonModelContext, model_input:
        test_df.rename(columns={self._time_col: "ds"}, inplace=True)
        predict_df = self.model().predict(test_df)
        return predict_df["yhat"]

    def predict_with_full_df_returned(self, model_input: pd.DataFrame) -> pd.DataFrame:
        """
        Predict API for prediction tables with covariates.
        :param model_input: Input dataframe with the time column and any covariate columns.
        :return: A pd.DataFrame with the forecast components.
        """
        self._validate_cols(model_input, [self._time_col])
        test_df = model_input.copy()

        if self._preprocess_func and self._split_col:
            # Apply the same preprocessing pipeline to test_df. The preprocessing function requires the "y" column
            # and the split column to be present, as they are used in the trial notebook. These columns are added
            # temporarily and removed after preprocessing.
            # See https://src.dev.databricks.com/databricks-eng/universe/-/blob/automl/python/databricks/automl/core/sections/templates/preprocess/finish_with_transform.jinja?L3
            # and https://src.dev.databricks.com/databricks-eng/universe/-/blob/automl/python/databricks/automl/core/sections/templates/preprocess/select_columns.jinja?L8-10
            test_df["y"] = None
            test_df[self._split_col] = "prediction"
            test_df = self._preprocess_func(test_df)
            test_df.drop(columns=["y", self._split_col], inplace=True, errors="ignore")

        test_df.rename(columns={self._time_col: "ds"}, inplace=True)
        predict_df = self.model().predict(test_df)
        return predict_df

    def infer_signature(self, sample_input: pd.DataFrame = None) -> ModelSignature:
        if sample_input is None:
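
A minimal usage sketch of the new single-series API (the fitted model instance and column names below are hypothetical, not taken from this diff):

import pandas as pd

# Hypothetical ProphetModel wrapper trained with time column "date" and a
# "temperature" covariate regressor.
future_df = pd.DataFrame({
    "date": pd.date_range("2025-05-01", periods=7, freq="D"),
    "temperature": [21.0, 22.5, 19.8, 20.1, 23.4, 22.0, 21.7],
})
forecast_df = prophet_model.predict_with_full_df_returned(future_df)
# forecast_df holds the full Prophet output (yhat, yhat_lower, yhat_upper, trend, ...),
# unlike predict(), which returns only the "yhat" column.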
@@ -353,6 +377,43 @@ def model_prediction(df):
        predict_df = test_df.groupby(self._id_cols).apply(model_prediction).reset_index(drop=True)
        return_df = test_df.merge(predict_df, how="left", on=["ds"] + self._id_cols)
        return return_df["yhat"]

    def predict_with_full_df_returned(self, model_input: pd.DataFrame) -> pd.DataFrame:
        """
        Predict API for prediction tables with covariates.
        :param model_input: Input dataframe with the identifier columns, the time column, and any covariate columns.
        :return: A pd.DataFrame with the forecast components.
        """
        self._validate_cols(model_input, self._id_cols + [self._time_col])
        test_df = model_input.copy()
        test_df["ts_id"] = test_df[self._id_cols].apply(tuple, axis=1)

        if self._preprocess_func and self._split_col:
            # Apply the same preprocessing pipeline to test_df. The preprocessing function requires the "y" column
            # and the split column to be present, as they are used in the trial notebook. These columns are added
            # temporarily and removed after preprocessing.
            # See https://src.dev.databricks.com/databricks-eng/universe/-/blob/automl/python/databricks/automl/core/sections/templates/preprocess/finish_with_transform.jinja?L3
            # and https://src.dev.databricks.com/databricks-eng/universe/-/blob/automl/python/databricks/automl/core/sections/templates/preprocess/select_columns.jinja?L8-10
            test_df["y"] = None
            test_df[self._split_col] = ""
            test_df = test_df.groupby(self._id_cols).apply(self._preprocess_func).reset_index(drop=True)
            test_df.drop(columns=["y", self._split_col], inplace=True, errors="ignore")

        test_df.rename(columns={self._time_col: "ds"}, inplace=True)

        def model_prediction(df):
            model = self.model(df["ts_id"].iloc[0])
            if model:
                predicts = model.predict(df)
                # We have to explicitly assign the id columns to avoid a KeyError when model_input
                # has only one row. For multi-row model_input, ts_id is kept as an index level
                # after groupby(self._id_cols).apply(...) and can be retrieved with reset_index, but
                # for a one-row model_input the id columns are missing from the index.
                predicts[self._id_cols] = df.name
                return predicts
        predict_df = test_df.groupby(self._id_cols).apply(model_prediction).reset_index(drop=True)
        return_df = test_df.merge(predict_df, how="left", on=["ds"] + self._id_cols)
        return return_df


def mlflow_prophet_log_model(prophet_model: Union[ProphetModel, MultiSeriesProphetModel],
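
A corresponding multi-series sketch (again with a hypothetical fitted MultiSeriesProphetModel instance and assumed column names):

import pandas as pd

# Hypothetical MultiSeriesProphetModel with id column "store_id", time column
# "date", and a "promo" covariate regressor.
future_df = pd.DataFrame({
    "store_id": ["A", "A", "B", "B"],
    "date": pd.to_datetime(["2025-05-01", "2025-05-02", "2025-05-01", "2025-05-02"]),
    "promo": [0, 1, 0, 0],
})
forecast_df = multi_model.predict_with_full_df_returned(future_df)
# Forecasts are computed per series and merged back onto the input on ["ds"] plus the
# id columns; rows whose series has no fitted sub-model come back with NaN forecasts.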