diff --git a/runtime/databricks/automl_runtime/forecast/prophet/model.py b/runtime/databricks/automl_runtime/forecast/prophet/model.py index d884c69..67b3386 100644 --- a/runtime/databricks/automl_runtime/forecast/prophet/model.py +++ b/runtime/databricks/automl_runtime/forecast/prophet/model.py @@ -26,7 +26,7 @@ from databricks.automl_runtime.forecast import OFFSET_ALIAS_MAP, DATE_OFFSET_KEYWORD_MAP from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model from databricks.automl_runtime import version -from databricks.automl_runtime.forecast.utils import is_quaterly_alias, make_future_dataframe +from databricks.automl_runtime.forecast.utils import is_quaterly_alias, make_future_dataframe, apply_preprocess_func PROPHET_ADDITIONAL_PIP_DEPS = [ @@ -110,26 +110,36 @@ def make_future_dataframe(self, horizon: int = None, include_history: bool = Tru freq=pd.DateOffset(**offset_kwarg), include_history=include_history) - def _predict_impl(self, horizon: int = None, include_history: bool = True) -> pd.DataFrame: + def _predict_impl(self, future_df: pd.DataFrame) -> pd.DataFrame: """ Predict using the API from prophet model. - :param horizon: Int number of periods to forecast forward. - :param include_history: Boolean to include the historical dates in the data - frame for predictions. - :return: A pd.DataFrame with the forecast components. + :param future_df: future input dataframe. This dataframe should contain + the time series column and covariate columns if available. It is used as the + input for generating predictions. + :return: A pd.DataFrame that represents the model's output. The predicted target + column is named 'yhat'. """ - future_pd = self.make_future_dataframe(horizon=horizon or self._horizon, include_history=include_history) - return self.model().predict(future_pd) + return self.model().predict(future_df) - def predict_timeseries(self, horizon: int = None, include_history: bool = True) -> pd.DataFrame: + def predict_timeseries(self, horizon: int = None, include_history: bool = True, future_df: pd.DataFrame = None) -> pd.DataFrame: """ - Predict using the prophet model. + Predict using the prophet model. The input dataframe will be preprocessed if with covariates. :param horizon: Int number of periods to forecast forward. :param include_history: Boolean to include the historical dates in the data frame for predictions. - :return: A pd.DataFrame with the forecast components. + :param future_df: Optional future input dataframe. This dataframe should contain + the time series column and covariate columns if available. It is used as the + input for generating predictions. + :return: A pd.DataFrame that represents the model's output. The predicted target + column is named 'yhat'. """ - return self._predict_impl(horizon, include_history) + if future_df is None: + future_df = self.make_future_dataframe(horizon=horizon or self._horizon, include_history=include_history) + + if self._preprocess_func and self._split_col: + future_df = apply_preprocess_func(future_df, self._preprocess_func, self._split_col) + future_df.rename(columns={self._time_col: "ds"}, inplace=True) + return self._predict_impl(future_df) def predict(self, context: mlflow.pyfunc.model.PythonModelContext, model_input: pd.DataFrame) -> pd.Series: """ @@ -143,15 +153,7 @@ def predict(self, context: mlflow.pyfunc.model.PythonModelContext, model_input: test_df = model_input.copy() if self._preprocess_func and self._split_col: - # Apply the same preprocessing pipeline to test_df. The preprocessing function requires the "y" column - # and the split column to be present, as they are used in the trial notebook. These columns are added - # temporarily and removed after preprocessing. - # see https://src.dev.databricks.com/databricks-eng/universe/-/blob/automl/python/databricks/automl/core/sections/templates/preprocess/finish_with_transform.jinja?L3 - # and https://src.dev.databricks.com/databricks-eng/universe/-/blob/automl/python/databricks/automl/core/sections/templates/preprocess/select_columns.jinja?L8-10 - test_df["y"] = None - test_df[self._split_col] = "prediction" - test_df = self._preprocess_func(test_df) - test_df.drop(columns=["y", self._split_col], inplace=True, errors="ignore") + test_df = apply_preprocess_func(test_df, self._preprocess_func, self._split_col) test_df.rename(columns={self._time_col: "ds"}, inplace=True) predict_df = self.model().predict(test_df) @@ -260,28 +262,36 @@ def _predict_impl(self, df: pd.DataFrame, horizon: int = None, include_history: future_pd[self._id_cols] = df[self._id_cols].iloc[0] return future_pd - def predict_timeseries(self, horizon: int = None, include_history: bool = True) -> pd.DataFrame: + def predict_timeseries(self, horizon: int = None, include_history: bool = True, future_df: pd.DataFrame = None) -> pd.DataFrame: """ Predict using the prophet model. :param horizon: Int number of periods to forecast forward. :param include_history: Boolean to include the historical dates in the data frame for predictions. - :return: A pd.DataFrame with the forecast components. + :param future_df: Optional future input dataframe. This dataframe should contain + the time series column and covariate columns if available. It is used as the + input for generating predictions. + :return: A pd.DataFrame that represents the model's output. The predicted target + column is named 'yhat'. """ horizon=horizon or self._horizon - end_time = pd.Timestamp(self._timeseries_end) - future_df = make_future_dataframe( - start_time=self._timeseries_starts, - end_time=end_time, - horizon=horizon, - frequency_unit=self._frequency_unit, - frequency_quantity=self._frequency_quantity, - include_history=include_history, - groups=self._model_json.keys(), - identity_column_names=self._id_cols - ) + if future_df is None: + end_time = pd.Timestamp(self._timeseries_end) + future_df = make_future_dataframe( + start_time=self._timeseries_starts, + end_time=end_time, + horizon=horizon, + frequency_unit=self._frequency_unit, + frequency_quantity=self._frequency_quantity, + include_history=include_history, + groups=self._model_json.keys(), + identity_column_names=self._id_cols + ) future_df["ts_id"] = future_df[self._id_cols].apply(tuple, axis=1) - return future_df.groupby(self._id_cols).apply(lambda df: self._predict_impl(df, horizon, include_history)).reset_index() + if self._preprocess_func and self._split_col: + future_df = apply_preprocess_func(future_df, self._preprocess_func, self._split_col) + future_df.rename(columns={self._time_col: "ds"}, inplace=True) + return future_df.groupby(self._id_cols).apply(lambda df: self._predict_impl(df, horizon, include_history)).reset_index(drop=True) @staticmethod def get_reserved_cols() -> List[str]: @@ -354,7 +364,6 @@ def model_prediction(df): return_df = test_df.merge(predict_df, how="left", on=["ds"] + self._id_cols) return return_df["yhat"] - def mlflow_prophet_log_model(prophet_model: Union[ProphetModel, MultiSeriesProphetModel], sample_input: pd.DataFrame = None) -> None: """ diff --git a/runtime/databricks/automl_runtime/forecast/utils.py b/runtime/databricks/automl_runtime/forecast/utils.py index 195d35a..496f386 100644 --- a/runtime/databricks/automl_runtime/forecast/utils.py +++ b/runtime/databricks/automl_runtime/forecast/utils.py @@ -279,3 +279,21 @@ def calculate_period_differences( freq_alias = PERIOD_ALIAS_MAP[OFFSET_ALIAS_MAP[frequency_unit]] # It is intended to get the floor value. And in the later check we will use this floor value to find out if it is not consistent. return (end_time.to_period(freq_alias) - start_time.to_period(freq_alias)).n // frequency_quantity + +def apply_preprocess_func(df: pd.DataFrame, preprocess_func: callable, split_col: str) -> pd.DataFrame: + """ + Apply the preprocessing function to the dataframe. The preprocessing function requires the "y" column + and the split column to be present, as they are used in the trial notebook. These columns are added + temporarily and removed after preprocessing. + see https://src.dev.databricks.com/databricks-eng/universe/-/blob/automl/python/databricks/automl/core/sections/templates/preprocess/finish_with_transform.jinja?L3 + and https://src.dev.databricks.com/databricks-eng/universe/-/blob/automl/python/databricks/automl/core/sections/templates/preprocess/select_columns.jinja?L8-10 + :param df: pd.DataFrame to be preprocessed. + :param preprocess_func: preprocessing function to be applied to the dataframe. + :param split_col: name of the split column to be added to the dataframe. + :return: preprocessed pd.DataFrame. + """ + df["y"] = None + df[split_col] = "prediction" + df = preprocess_func(df) + df.drop(columns=["y", split_col], inplace=True, errors="ignore") + return df diff --git a/runtime/tests/automl_runtime/forecast/prophet/model_test.py b/runtime/tests/automl_runtime/forecast/prophet/model_test.py index 7a941ae..5ef5297 100644 --- a/runtime/tests/automl_runtime/forecast/prophet/model_test.py +++ b/runtime/tests/automl_runtime/forecast/prophet/model_test.py @@ -15,6 +15,7 @@ # import unittest +from unittest.mock import patch import datetime import pandas as pd @@ -103,6 +104,42 @@ def test_model_save_and_load(self): include_history=False ) self.assertEqual(len(forecast_future_pd), 1) + + @patch("databricks.automl_runtime.forecast.prophet.model.ProphetModel._predict_impl") + def test_predict_timeseries_with_preprocess_func(self, mock_predict_impl): + # Mock the output of _predict_impl + mock_predict_impl.side_effect = lambda df: df + + # Define a preprocess function + def preprocess_func(df): + df["feature"] = df["feature"] * 2 + return df + + # Create a ProphetModel instance with preprocess_func + prophet_model = ProphetModel( + model_json=PROPHET_MODEL_JSON, + horizon=3, + frequency_unit="d", + frequency_quantity=1, + time_col="time", + preprocess_func=preprocess_func, + split_col="split" + ) + + # Input DataFrame + input_df = pd.DataFrame({"time": ["2020-10-01", "2020-10-02", "2020-10-03"], "feature": [1, 2, 3]}) + + # Call predict_timeseries + result = prophet_model.predict_timeseries(future_df=input_df) + + # Assertions + mock_predict_impl.assert_called_once() + self.assertEqual(len(result), 3) + + # Check if the preprocess_func was applied + processed_df = mock_predict_impl.call_args[0][0] # Get the DataFrame passed to _predict_impl + self.assertTrue((processed_df["feature"] == [2, 4, 6]).all()) # Check if "y" was doubled + self.assertIn("ds", processed_df.columns) # Ensure "ds" column exists def test_make_future_dataframe(self): for feq_unit in OFFSET_ALIAS_MAP: @@ -451,3 +488,68 @@ def preprocess_func(df): ) yhat = prophet_model.predict(None, test_df) self.assertEqual(2, len(yhat)) + + @patch("databricks.automl_runtime.forecast.prophet.model.MultiSeriesProphetModel._predict_impl") + def test_predict_timeseries(self, mock_predict_impl): + # Mock the output of _predict_impl + mock_predict_impl.side_effect = lambda df, horizon, include_history: pd.DataFrame({ + "ds": df["ds"], + "feature": df["feature"], + "id": df["id"] + }) + + # Define a preprocess function + def preprocess_func(df): + df["feature"] = df["feature"] * 2 + return df + + # Create a MultiSeriesProphetModel instance + model_json = { + ("id1",): '{"model": "mock_model_1"}', + ("id2",): '{"model": "mock_model_2"}' + } + timeseries_starts = {("id1",): pd.Timestamp("2020-01-01"), ("id2",): pd.Timestamp("2020-01-01")} + timeseries_end = "2020-12-31" + prophet_model = MultiSeriesProphetModel( + model_json=model_json, + timeseries_starts=timeseries_starts, + timeseries_end=timeseries_end, + horizon=3, + frequency_unit="d", + frequency_quantity=1, + time_col="time", + id_cols=["id"], + preprocess_func=preprocess_func, + split_col="split" + ) + + # Input DataFrame + input_df = pd.DataFrame({ + "time": ["2020-10-01", "2020-10-02", "2020-10-03", "2020-10-01", "2020-10-02", "2020-10-03"], + "feature": [1, 2, 3, 4, 5, 6], + "id": ["id1", "id1", "id1", "id2", "id2", "id2"] + }) + + # Call predict_timeseries + result = prophet_model.predict_timeseries(future_df=input_df) + + # Assertions + mock_predict_impl.assert_called() + self.assertEqual(len(result), 6) + self.assertIn("feature", result.columns) + self.assertIn("ds", result.columns) + self.assertIn("id", result.columns) + + # Check the calls to _predict_impl + calls = mock_predict_impl.call_args_list + self.assertEqual(len(calls), 2) # Ensure _predict_impl is called twice (once per group) + + # Check the first call + first_call_df = calls[0][0][0] # Get the DataFrame passed in the first call + self.assertTrue((first_call_df["feature"] == [2, 4, 6]).all()) + self.assertTrue((first_call_df["id"] == ["id1", "id1", "id1"]).all()) + + # Check the second call + second_call_df = calls[1][0][0] # Get the DataFrame passed in the second call + self.assertTrue((second_call_df["feature"] == [8, 10, 12]).all()) + self.assertTrue((second_call_df["id"] == ["id2", "id2", "id2"]).all()) \ No newline at end of file