Source code for diviner.grouped_pmdarima

import inspect
import os
import warnings
from copy import deepcopy
from typing import Tuple, List, Dict
from pmdarima import ARIMA, AutoARIMA
from pmdarima.pipeline import Pipeline
from pmdarima.warnings import ModelFitWarning

import numpy as np
import pandas as pd
from statsmodels.tools.sm_exceptions import ConvergenceWarning
from diviner.model.base_model import GroupedForecaster
from diviner.serialize.pmdarima_serializer import (
    grouped_pmdarima_save,
    grouped_pmdarima_load,
)
from diviner.utils.common import (
    _validate_keys_in_df,
    _restructure_fit_payload,
    _restructure_predictions,
    create_reporting_df,
    _get_last_datetime_per_group,
    _get_datetime_freq_per_group,
)
from diviner.utils.pmdarima_utils import (
    _extract_arima_model,
    _get_arima_params,
    _get_arima_training_metrics,
    _generate_prediction_config,
    _generate_prediction_datetime_series,
    _generate_group_subset_prediction_config,
)
from diviner.data.pandas_group_generator import PandasGroupGenerator
from diviner.data.utils.dataframe_utils import apply_datetime_index_to_groups
from diviner.exceptions import DivinerException


[docs]class GroupedPmdarima(GroupedForecaster): def __init__( self, model_template, ): """ A class for constructing multiple ``pmdarima`` models from a single normalized input DataFrame. This implementation supports submission of a model template that is one of: ``pmdarima.arima.arima.ARIMA``, ``pmdarima.arima.auto.AutoARIMA``, or ``pmdarima.pipeline.Pipeline``. The constructor argument of ``model_template`` will apply the settings specified as part of instantiation of these classes to all groups within the input DataFrame. :param model_template: The type of model to build for each of the groups identified. Supported templates: * ``pmdarima.arima.arima.ARIMA`` - A wrapper around ``statsmodels.api.SARIMAX``. See: https://alkaline-ml.com/pmdarima/modules/generated/pmdarima.\ arima.ARIMA.html#pmdarima.arima.ARIMA * ``pmdarima.arima.auto.AutoARIMA`` - An auto-tunable order and seasonal order SARIMAX implementation. See: https://alkaline-ml.com/pmdarima/modules/generated/pmdarima.\ arima.AutoARIMA.html * ``pmdarima.pipeline.Pipeline`` - An sklearn-like pipeline orchestrator for building preprocessing and model components for ``pmdarima``. See: https://alkaline-ml.com/pmdarima/modules/generated/pmdarima.\ pipeline.Pipeline.html#pmdarima.pipeline.Pipeline For examples showing the usage of each of these template paradigms, see the examples section of this package. """ super().__init__() self._y_col = None self._datetime_col = None self._model_template = model_template self._exog_cols = None self._master_key = "grouping_key" self._predict_col = None self._max_datetime_per_group = None self._datetime_freq_per_group = None self._ndiffs = None self._nsdiffs = None def _extract_individual_model(self, group_key): self._fit_check() model_instance = self.model.get(group_key) if not model_instance: raise DivinerException(f"The model for group {group_key} was not trained.") return model_instance def _fit_individual_model(self, group_key, group_df, silence_warnings, **fit_kwargs): y = group_df[self._y_col] model = deepcopy(self._model_template) if self._exog_cols: exog = group_df[self._exog_cols] else: exog = None # Set 'd' term if pre-calculated with `PmdarimaAnalyzer.calculate_ndiffs` if self._ndiffs: d_term = self._ndiffs.get(group_key, None) if d_term: if isinstance(model, ARIMA): setattr(model, "order", (model.order[0], d_term, model.order[2])) elif isinstance(model, Pipeline): final_stage = model.steps[-1][1] if isinstance(final_stage, AutoARIMA): setattr(final_stage, "d", d_term) elif isinstance(final_stage, ARIMA): setattr( final_stage, "order", (final_stage.order[0], d_term, final_stage.order[2]), ) elif isinstance(model, AutoARIMA): setattr(model, "d", d_term) # Set 'D' term if pre-calculated with `PmdarimaAnalyzer.calculate_nsdiffs` if self._nsdiffs: sd_term = self._nsdiffs.get(group_key, None) if sd_term: if isinstance(model, ARIMA): setattr( model, "seasonal_order", ( model.seasonal_order[0], sd_term, model.seasonal_order[2], model.seasonal_order[3], ), ) elif isinstance(model, Pipeline): final_stage = model.steps[-1][1] if isinstance(final_stage, AutoARIMA): setattr(final_stage, "D", sd_term) elif isinstance(final_stage, ARIMA): setattr( final_stage, "seasonal_order", ( final_stage.seasonal_order[0], sd_term, final_stage.seasonal_order[2], final_stage.seasonal_order[3], ), ) elif isinstance(model, AutoARIMA): setattr(model, "D", sd_term) with warnings.catch_warnings(): # Suppress SARIMAX RuntimeWarning if silence_warnings: warnings.filterwarnings("ignore", category=RuntimeWarning) warnings.filterwarnings("ignore", category=ConvergenceWarning) warnings.filterwarnings("ignore", category=ModelFitWarning) warnings.filterwarnings("ignore", category=UserWarning) return {group_key: model.fit(y=y, X=exog, **fit_kwargs)}
[docs] def fit( self, df, group_key_columns, y_col: str, datetime_col: str, exog_cols: List[str] = None, ndiffs: Dict = None, nsdiffs: Dict = None, silence_warnings: bool = False, **fit_kwargs, ): """ Fit method for training a ``pmdarima`` model on the submitted normalized DataFrame. When initialized, the input DataFrame will be split into an iterable collection of grouped data sets based on the ``group_key_columns`` arguments, which is then used to fit individual ``pmdarima`` models (or a supplied ``Pipeline``) upon the templated object supplied as a class instance argument `model_template`. For API information for ``pmdarima``'s ``ARIMA``, ``AutoARIMA``, and ``Pipeline`` APIs, see: https://alkaline-ml.com/pmdarima/modules/classes.html#api-ref :param df: A normalized group data set consisting of a datetime column that defines ordering of the series, an endogenous regressor column that specifies the series data for training (e.g. ``y_col``), and column(s) that define the grouping of the series data. An example normalized data set: =========== ===== ======== ============ ====== region zone country ds y =========== ===== ======== ============ ====== 'northeast' 1 "US" "2021-10-01" 1234.5 'northeast' 2 "US" "2021-10-01" 3255.6 'northeast' 1 "US" "2021-10-02" 1255.9 =========== ===== ======== ============ ====== Wherein the grouping_key_columns could be one, some, or all of ``['region', 'zone', 'country']``, the datetime_col would be the `'ds'` column, and the series ``y_col`` (endogenous regressor) would be `'y'`. :param group_key_columns: The columns in the ``df`` argument that define, in aggregate, a unique time series entry. For example, with the DataFrame referenced in the ``df`` param, group_key_columns could be: ``('region', 'zone')`` or ``('region')`` or ``('country', 'region', 'zone')`` :param y_col: The name of the column within the DataFrame input to any method within this class that contains the endogenous regressor term (the raw data that will be used to train and use as a basis for forecasting). :param datetime_col: The name of the column within the DataFrame input that defines the datetime or date values associated with each row of the endogenous regressor (``y_col``) data. :param exog_cols: An optional collection of column names within the submitted data to class methods that contain exogenous regressor elements to use as part of model fitting and predicting. Default: ``None`` :param ndiffs: optional overrides to the ``d`` ``ARIMA`` differencing term for stationarity enforcement. The structure of this argument is a dictionary in the form of: ``{<group_key>: <d_term>}``. To calculate, use ``diviner.PmdarimaAnalyzer.calculate_ndiffs()`` Default: ``None`` :param nsdiffs: optional overrides to the ``D`` SARIMAX seasonal differencing term for seasonal stationarity enforcement. The structure of this argument is a dictionary in the form of: ``{<group_key>: <D_term>}``. To calculate, use :py:meth:``diviner.PmdarimaAnalyzer.calculate_nsdiffs`` Default: ``None`` :param silence_warnings: If ``True``, removes ``SARIMAX`` and underlying optimizer warning message from stdout printing. With a sufficiently large nubmer of groups to process, the volume of these messages to stdout may become very large. Default: ``False`` :param fit_kwargs: ``fit_kwargs`` for ``pmdarima``'s ``ARIMA``, ``AutoARIMA``, or ``Pipeline`` stage overrides. For more information, see the ``pmdarima`` docs: https://alkaline-ml.com/pmdarima/index.html :return: object instance of ``GroupedPmdarima`` with the persisted fit model attached. """ self._model_init_check() self._y_col = y_col self._datetime_col = datetime_col self._exog_cols = exog_cols self._group_key_columns = group_key_columns if ndiffs and isinstance(ndiffs, dict): self._ndiffs = ndiffs if nsdiffs and isinstance(nsdiffs, dict): self._nsdiffs = nsdiffs _validate_keys_in_df(df, self._group_key_columns) grouped_data = PandasGroupGenerator( self._group_key_columns, self._datetime_col, self._y_col ).generate_processing_groups(df) dt_indexed_group_data = apply_datetime_index_to_groups(grouped_data, self._datetime_col) self._max_datetime_per_group = _get_last_datetime_per_group(dt_indexed_group_data) self._datetime_freq_per_group = _get_datetime_freq_per_group(dt_indexed_group_data) fit_model = [ self._fit_individual_model(group_key, group_df, silence_warnings, **fit_kwargs) for group_key, group_df in dt_indexed_group_data ] self.model = _restructure_fit_payload(fit_model) return self
def _predict_single_group(self, row_entry, n_periods_col, exog, **predict_kwargs): group_key = row_entry[self._master_key] return_conf_int = row_entry.get("return_conf_int", False) alpha = row_entry.get("alpha", 0.05) periods = row_entry[n_periods_col] inverse_transform = row_entry.get("inverse_transform", True) model = self._extract_individual_model(group_key) if isinstance(self._model_template, Pipeline): prediction = model.predict( n_periods=periods, X=exog, return_conf_int=return_conf_int, alpha=alpha, inverse_transform=inverse_transform, **predict_kwargs, ) else: prediction = model.predict( n_periods=periods, X=exog, return_conf_int=return_conf_int, alpha=alpha, **predict_kwargs, ) if return_conf_int: prediction_raw = pd.DataFrame.from_records(prediction).T prediction_raw.columns = [self._predict_col, "_yhat_err"] prediction_df = pd.DataFrame( prediction_raw["_yhat_err"].to_list(), columns=["yhat_lower", "yhat_upper"], ) prediction_df.insert( loc=0, column=self._predict_col, value=prediction_raw[self._predict_col] ) else: prediction_df = pd.DataFrame.from_dict({self._predict_col: prediction}) prediction_df[self._master_key] = prediction_df.apply(lambda x: group_key, 1) prediction_df[self._datetime_col] = _generate_prediction_datetime_series( self._max_datetime_per_group.get(group_key), self._datetime_freq_per_group.get(group_key), periods, ) return prediction_df def _run_predictions(self, df, n_periods_col="n_periods", exog=None, **predict_kwargs): self._fit_check() processing_data = PandasGroupGenerator( self._group_key_columns, self._datetime_col, self._y_col )._get_df_with_master_key_column(df) prediction_collection = [ self._predict_single_group(row, n_periods_col, exog, **predict_kwargs) for idx, row in processing_data.iterrows() ] return _restructure_predictions( prediction_collection, self._group_key_columns, self._master_key )
[docs] def predict( self, n_periods: int, predict_col: str = "yhat", alpha: float = 0.05, return_conf_int: bool = False, inverse_transform: bool = True, exog=None, **predict_kwargs, ): """ Prediction method for generating forecasts for each group that has been trained as part of a call to ``fit()``. Note that ``pmdarima``'s API does not support predictions outside of the defined datetime frequency that was validated during training (i.e., if the series endogenous data is at an hourly frequency, the generated predictions will be at an hourly frequency and cannot be modified from within this method). :param n_periods: The number of future periods to generate. The start of the generated predictions will be 1 frequency period after the maximum datetime value per group during training. For example, a data set used for training that has a datetime frequency in days that ends on 7/10/2021 will, with a value of ``n_periods=7``, start its prediction on 7/11/2021 and generate daily predicted values up to and including 7/17/2021. :param predict_col: The name to be applied to the column containing predicted data. Default: ``'yhat'`` :param alpha: Optional value for setting the confidence intervals for error estimates. Note: this is only utilized if ``return_conf_int`` is set to ``True``. Default: ``0.05`` (representing a 95% CI) :param return_conf_int: Boolean flag for whether to calculate confidence interval error estimates for predicted values. The intervals of ``yhat_upper`` and ``yhat_lower`` are based on the ``alpha`` parameter. Default: ``False`` :param inverse_transform: Optional argument used only for ``Pipeline`` models that include either a ``BoxCoxEndogTransformer`` or a ``LogEndogTransformer``. Default: ``True`` :param exog: Exogenous regressor components as a 2-D array. Note: if the model is trained with exogenous regressor components, this argument is required. Default: ``None`` :param predict_kwargs: Extra ``kwarg`` arguments for any of the transform stages of a ``Pipeline`` or for additional ``predict`` ``kwargs`` to the model instance. ``Pipeline`` ``kwargs`` are specified in the manner of ``sklearn`` ``Pipeline`` format (i.e., ``<stage_name>__<arg name>=<value>``. e.g., to change the values of a fourier transformer at prediction time, the override would be: ``{'fourier__n_periods': 45})`` :return: A consolidated (unioned) single DataFrame of predictions per group. """ self._fit_check() self._predict_col = predict_col prediction_config = _generate_prediction_config( self, n_periods, alpha, return_conf_int, inverse_transform, ) return self._run_predictions(prediction_config, exog=exog, **predict_kwargs)
[docs] def predict_groups( self, groups: List[Tuple[str]], n_periods: int, predict_col: str = "yhat", alpha: float = 0.05, return_conf_int: bool = False, inverse_transform: bool = False, exog=None, on_error: str = "raise", **predict_kwargs, ): """ This is a prediction method that allows for generating a subset of forecasts based on the collection of keys. By specifying individual groups in the ``groups`` argument, a limited scope forecast can be performed without incurring the runtime costs associated with predicting all groups. :param groups: ``List[Tuple[str]]`` the collection of group (s) to generate forecast predictions. The group definitions must be the values within the ``group_key_columns`` that were used during the ``fit`` of the model in order to return valid forecasts. .. Note:: The positional ordering of the values are important and must match the order of ``group_key_columns`` for the ``fit`` argument to provide correct prediction forecasts. :param n_periods: The number of row events to forecast :param predict_col: The name of the column in the output ``DataFrame`` that contains the forecasted series data. Default: ``"yhat"`` :param alpha: Optional value for setting the confidence intervals for error estimates. Note: this is only utilized if ``return_conf_int`` is set to ``True``. Default: ``0.05`` (representing a 95% CI) :param return_conf_int: Boolean flag for whether to calculate confidence interval error estimates for predicted values. The intervals of ``yhat_upper`` and ``yhat_lower`` are based on the ``alpha`` parameter. Default: ``False`` :param inverse_transform: Optional argument used only for ``Pipeline`` models that include either a ``BoxCoxEndogTransformer`` or a ``LogEndogTransformer``. Default: ``False`` :param exog: Exogenous regressor components as a 2-D array. Note: if the model is trained with exogenous regressor components, this argument is required. Default: ``None`` :param predict_kwargs: Extra ``kwarg`` arguments for any of the transform stages of a ``Pipeline`` or for additional ``predict`` ``kwargs`` to the model instance. ``Pipeline`` ``kwargs`` are specified in the manner of ``sklearn`` ``Pipeline`` format (i.e., ``<stage_name>__<arg name>=<value>``. e.g., to change the values of a fourier transformer at prediction time, the override would be: ``{'fourier__n_periods': 45})`` :param on_error: Alert level setting for handling mismatched group keys. Default: ``"raise"`` The valid modes are: * "ignore" - no logging or exception raising will occur if a submitted group key in the ``groups`` argument is not present in the model object. .. Note:: This is a silent failure mode and will not present any indication of a failure to generate forecast predictions. * "warn" - any keys that are not present in the fit model will be recorded as logged warnings. * "raise" - any keys that are not present in the fit model will cause a ``DivinerException`` to be raised. :return: A consolidated (unioned) single DataFrame of forecasts for all groups specified in the ``groups`` argument. """ self._fit_check() self._predict_col = predict_col prediction_config = _generate_group_subset_prediction_config( self, groups, n_periods, alpha, return_conf_int, inverse_transform, on_error ) return self._run_predictions(prediction_config, exog=exog, **predict_kwargs)
[docs] def get_metrics(self): """ Retrieve the ``ARIMA`` fit metrics that are generated during the ``AutoARIMA`` or ``ARIMA`` training event. Note: These metrics are not validation metrics. Use the ``cross_validate()`` method for retrieving back-testing error metrics. :return: ``Pandas`` ``DataFrame`` with metrics provided as columns and a row entry per group. """ self._fit_check() metric_extract = {} for group in self.model.keys(): arima_model = _extract_arima_model(self._extract_individual_model(group)) metric_extract[group] = _get_arima_training_metrics(arima_model) return create_reporting_df(metric_extract, self._master_key, self._group_key_columns)
[docs] def get_model_params(self): """ Retrieve the parameters from the ``fit`` ``model_template`` that was passed in and return them in a denormalized ``Pandas`` ``DataFrame``. Parameters in the return ``DataFrame`` are columns with a row for each group defined during ``fit()``. :return: ``Pandas`` ``DataFrame`` with ``fit`` parameters for each group. """ self._fit_check() params_extract = {} for group in self.model.keys(): arima_model = _extract_arima_model(self._extract_individual_model(group)) params_extract[group] = _get_arima_params(arima_model) return create_reporting_df(params_extract, self._master_key, self._group_key_columns)
[docs] def cross_validate(self, df, metrics, cross_validator, error_score=np.nan, verbosity=0): """ Method for performing cross validation on each group of the fit model. The supplied cross_validator to this method will be used to perform either rolling or shifting window prediction validation throughout the data set. Windowing behavior for the cross validation must be defined and configured through the cross_validator that is submitted. See: https://alkaline-ml.com/pmdarima/modules/classes.html#cross-validation-split-utilities for details on the underlying implementation of cross validation with ``pmdarima``. :param df: A ``DataFrame`` that contains the endogenous series and the grouping key columns that were defined during training. Any missing key entries will not be scored. Note that each group defined within the model will be retrieved from this ``DataFrame``. keys that do not exist will raise an Exception. :param metrics: A list of metric names or string of single metric name to use for cross validation metric calculation. :param cross_validator: A cross validator instance from ``pmdarima.model_selection`` (``RollingForecastCV`` or ``SlidingWindowForecastCV``). Note: setting low values of ``h`` or ``step`` will dramatically increase execution time). :param error_score: Default value to assign to a score calculation if an error occurs in a given window iteration. Default: ``np.nan`` (a silent ignore of the failure) :param verbosity: print verbosity level for ``pmdarima``'s cross validation stages. Default: ``0`` (no printing to stdout) :return: ``Pandas DataFrame`` containing the group information and calculated cross validation metrics for each group. """ from diviner.scoring.pmdarima_cross_validate import ( _cross_validate_grouped_pmdarima, ) self._fit_check() group_data = PandasGroupGenerator( self._group_key_columns, self._datetime_col, self._y_col ).generate_processing_groups(df) dt_group_data = apply_datetime_index_to_groups(group_data, self._datetime_col) cv_results = _cross_validate_grouped_pmdarima( self.model, dt_group_data, self._y_col, metrics, cross_validator, error_score, self._exog_cols, verbosity, ) return create_reporting_df(cv_results, self._master_key, self._group_key_columns)
[docs] def save(self, path: str): """ Serialize and write the instance of this class (if it has been fit) to the path specified. Note: The serialized model is base64 encoded for top-level items and ``pickle``'d for ``pmdarima`` individual group models and any ``Pandas`` ``DataFrame``. :param path: Path to write this model's instance to. :return: None """ self._fit_check() directory = os.path.dirname(path) os.makedirs(directory, exist_ok=True) grouped_pmdarima_save(self, path)
[docs] @classmethod def load(cls, path: str): """ Load a ``GroupedPmdarima`` instance from a saved serialized version. Note: This is a class instance and as such, a ``GroupedPmdarima`` instance does not need to be initialized in order to load a saved model. For example: ``loaded_model = GroupedPmdarima.load(<location>)`` :param path: The path to a serialized instance of ``GroupedPmdarima`` :return: The ``GroupedPmdarima`` instance that was saved. """ attr_dict = grouped_pmdarima_load(path) init_args = inspect.signature(cls.__init__).parameters.keys() cleaned_attr_dict = {key.lstrip("_"): value for key, value in attr_dict.items()} init_cls = [cleaned_attr_dict[arg] for arg in init_args if arg != "self"] instance = cls(*init_cls) for key, value in attr_dict.items(): if key not in init_args: setattr(instance, key, value) return instance