Module panama.ml.auto.hyperoptimizer

Classes

class MachineLearningHyperOptimizer (opt_matrix: Dict[str, Dict[str, Union[SearchSpace, ForwardRef(None), BaseTunableModel]]])

A class for hyperparameter optimization.

Initializes the HyperOptimizer with a dict containing for each model, the model and the corresponding search space.

Args

opt_matrix
A dictionary containing the models and their search spaces to be tuned. The keys of the dictionary are the names of the models and the values are dictionaries containing the following keys: - 'model': A BaseTunableModel instance to be tuned. - 'space': A SearchSpace instance specifying the search space for the given model. If not provided, a default search space will be used.

Returns

None.

Expand source code
class MachineLearningHyperOptimizer(_HyperOptimizer):
    """Hyperparameter optimizer for machine-learning models.

    For every model of the optimization matrix a hyperopt search is run with
    ``SparkTrials``. Each trial is logged as a nested MLflow run and is scored
    either on a holdout split or through time-series cross-validation.
    """

    def __init__(self, opt_matrix: Dict[str, Dict[str, Union[SearchSpace, None, BaseTunableModel]]]):
        """Initialize the optimizer from a model / search-space matrix.

        Args:
            opt_matrix: Maps each model name to a dictionary with the keys
                ``'model'`` (the BaseTunableModel to tune) and ``'space'``
                (the SearchSpace of that model; a default search space is
                used when it is not provided).
        """
        super().__init__(opt_matrix)

    def _objective(
        self,
        params: Dict,
        model: BaseTunableModel,
        X_train: DataFrame,
        y_train: Union[DataFrame, Series, None],
        X_val: Union[DataFrame, None],
        y_val: Union[DataFrame, Series, None],
        metric: Union[str, Callable],
        experiment_id: str,
    ) -> Dict:
        """Evaluate one hyperparameter configuration (hyperopt objective).

        Args:
            params: The hyperparameters to evaluate.
            model: The model to evaluate; it is configured with ``params``.
            X_train: The training feature matrix.
            y_train: The training target.
            X_val: The validation feature matrix, or None to cross-validate
                on the training data.
            y_val: The validation target, or None to cross-validate on the
                training data.
            metric: Either a key of ``METRICS_MAP`` or a metric callable.
            experiment_id: The MLflow experiment the nested run belongs to.

        Returns:
            Dict: The trial status, the loss (the raw metric value) and the
                evaluated model under the key ``'trained_model'``.
        """
        if isinstance(metric, str):
            scorer_fn = self.METRICS_MAP[metric]
            metric_name = metric
        else:
            scorer_fn = metric
            metric_name = metric.__name__

        # Work on the resolved name rather than on `metric` itself: `metric`
        # may be a callable, which has no `startswith`.
        if metric_name.startswith("neg_"):
            metric_name = metric_name.split("_", 1)[-1]

        # Force this parameter to prevent catboost from writing files on
        # executors for which it has no permissions.
        if isinstance(model, TunableCatBoostRegressor):
            params["allow_writing_files"] = False

        with mlflow.start_run(experiment_id=experiment_id, nested=True):
            # greater_is_better=False makes the scorer return the negated
            # metric; the sign is flipped back below.
            scorer = make_scorer(scorer_fn, greater_is_better=False)
            model.set_params(params)
            mlflow.log_params(params)
            if X_val is not None and y_val is not None:
                model.fit(X_train, y_train)  # type: ignore
                loss = scorer(model, X_val, y_val)
            else:
                loss = cross_val_score(
                    estimator=model.get_model(),
                    X=X_train,
                    y=y_train,
                    scoring=scorer,
                    cv=TimeSeriesSplit(n_splits=5),
                ).mean()
            mlflow.log_metric(metric_name, -loss)
        return {"status": STATUS_OK, "loss": -loss, "trained_model": model}

    def _setup_tuning_data(
        self,
        X: DataFrame,
        y: Union[DataFrame, Series],
        validation_mode: str = "cv",
        time_cols: Union[str, List[str], None] = None,
    ) -> Tuple[DataFrame, Union[DataFrame, None], Union[DataFrame, Series], Union[DataFrame, Series, None]]:
        """Prepare the data for hyperparameter tuning.

        Args:
            X: The feature matrix.
            y: The target variable.
            validation_mode: 'val' holds out the last 15% of the rows for
                validation; 'cv' keeps all rows for cross-validation.
                Defaults to 'cv'.
            time_cols: The name(s) of the time column(s); when given, the data
                is ordered through ``_order_by_time`` before any split.
                Defaults to None.

        Returns:
            Tuple: ``(X_train, X_val, y_train, y_val)``; the two validation
                entries are None in 'cv' mode.

        Raises:
            ValueError: If ``validation_mode`` is neither 'val' nor 'cv'.
        """
        if time_cols:
            X, y = self._order_by_time(X, y, time_cols)
        if validation_mode == "val":
            # shuffle=False keeps the row order, so the holdout is the tail.
            X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.15, shuffle=False, random_state=42)
        elif validation_mode == "cv":
            X_train = X
            y_train = y
            X_val = None
            y_val = None
        else:
            raise ValueError(f"Validation mode {validation_mode} not supported, accepted are 'val' and 'cv'")
        return X_train, X_val, y_train, y_val

    def _build_objective_function(
        self,
        X_train: DataFrame,
        y_train: Union[DataFrame, Series, None],
        X_val: Union[DataFrame, None],
        y_val: Union[DataFrame, Series, None],
        model: BaseTunableModel,
        metric: Union[str, None],
        experiment_id: str,
    ) -> Callable:
        """Bind data, model and metric to ``_objective``.

        Args:
            X_train: The training feature matrix.
            y_train: The training target variable.
            X_val: The validation feature matrix, None in 'cv' mode.
            y_val: The validation target variable, None in 'cv' mode.
            model: The tunable model to be trained.
            metric: The evaluation metric to optimize.
            experiment_id: The ID of the MLflow experiment.

        Returns:
            Callable: A function of the hyperparameters only, suitable as the
                objective passed to ``fmin``.
        """
        return partial(
            self._objective,
            metric=metric,  # type: ignore
            X_train=X_train,
            y_train=y_train,
            X_val=X_val,
            y_val=y_val,
            model=model,
            experiment_id=experiment_id,
        )

    def tune(
        self,
        X: DataFrame,
        y: Union[DataFrame, Series],
        n_evals: int,
        parallelism: int,
        validation_mode: str = "cv",
        metric: Optional[str] = None,
        time_cols: Union[str, List[str], None] = None,
        experiment_name: Optional[str] = None,
        spark: Optional[SparkSession] = None,
    ) -> Tuple[BaseTunableModel, Dict, float]:
        """Tune every model over its search space and return the best one.

        Args:
            X: The features for training.
            y: The target values for training.
            n_evals: The maximum number of evaluations for each model.
            parallelism: The parallelism passed to ``SparkTrials``.
            validation_mode: 'val' for holdout validation or 'cv' for
                cross-validation. Defaults to 'cv'.
            metric: The metric to optimize.
                NOTE(review): the default None is not handled by
                ``_objective``; presumably callers always pass a metric -
                confirm.
            time_cols: The name(s) of the time column(s) used to order the
                data before splitting. Defaults to None.
            experiment_name: The name of the MLflow experiment to log to.
                Defaults to None, in which case no experiment ID is passed to
                the nested runs.
            spark: The Spark session handed to ``SparkTrials``.
                Defaults to None.

        Returns:
            Tuple[BaseTunableModel, Dict, float]: The best model, its
                hyperparameters and its loss. The first two entries are None
                when no trial produced a finite loss.

        Raises:
            ValueError: If ``validation_mode`` is neither 'val' nor 'cv'.
        """
        best_model = None
        best_hyp = None
        best_loss = float("inf")

        X_train, X_val, y_train, y_val = self._setup_tuning_data(X, y, validation_mode, time_cols)

        # Resolve the MLflow experiment only when a name was given.
        if experiment_name:
            experiment_id = get_or_create_experiment(experiment_name)
            mlflow.set_experiment(experiment_name=experiment_name)
        else:
            experiment_id = None
        for i, model in enumerate(self.models):
            # A fresh trials object per model keeps the searches independent.
            trials = SparkTrials(spark_session=spark, parallelism=parallelism)
            space = self.spaces[i]
            objective_fn = self._build_objective_function(X_train, y_train, X_val, y_val, model, metric, experiment_id)  # type: ignore
            hyp = fmin(
                objective_fn,
                space.get_space(),
                algo=tpe.suggest,
                max_evals=n_evals,
                trials=trials,
            )
            loss = trials.best_trial["result"]["loss"]  # type: ignore
            if loss < best_loss:
                best_model = trials.best_trial["result"]["trained_model"]  # type: ignore
                best_loss = loss
                # fmin returns a raw assignment; space_eval evaluates the
                # search space with it to obtain the hyperparameters.
                best_hyp = space_eval(space.get_space(), hyp)
        return best_model, best_hyp, best_loss  # type: ignore

Ancestors

  • panama.ml.auto.hyperoptimizer._HyperOptimizer

Methods

def tune(self, X: pandas.core.frame.DataFrame, y: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], n_evals: int, parallelism: int, validation_mode: str = 'cv', metric: Optional[str] = None, time_cols: Union[str, List[str], ForwardRef(None)] = None, experiment_name: Optional[str] = None, spark: Optional[pyspark.sql.session.SparkSession] = None) ‑> Tuple[BaseTunableModel, Dict, float]

Tunes hyperparameters for the models using the specified search spaces.

Args

X
The features for training.
y
The target values for training.
n_evals
The maximum number of evaluations for each model.
parallelism
The number of Spark workers to use for parallel evaluation.
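validation_mode
The validation strategy: 'val' (holdout validation on the last 15% of the rows) or 'cv' (cross-validation). Defaults to 'cv'.
metric
The name of the metric to optimize.
time_cols
The name(s) of the time column(s) used to order the data before splitting. Defaults to None.
experiment_name
The name of the MLflow experiment under which the tuning runs are logged. Defaults to None.
spark
The Spark session handed to SparkTrials. Defaults to None.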

Returns

Tuple[BaseTunableModel, Dict, float]
The best model found during tuning, its hyperparameters and its loss.
class TimeSeriesHyperOptimizer (opt_matrix: Dict[str, Dict[str, Union[SearchSpace, ForwardRef(None), BaseTunableModel]]])

A class for hyperparameter optimization.

Initializes the HyperOptimizer with a dict containing for each model, the model and the corresponding search space.

Args

opt_matrix
A dictionary containing the models and their search spaces to be tuned. The keys of the dictionary are the names of the models and the values are dictionaries containing the following keys: - 'model': A BaseTunableModel instance to be tuned. - 'space': A SearchSpace instance specifying the search space for the given model. If not provided, a default search space will be used.
Expand source code
class TimeSeriesHyperOptimizer(_HyperOptimizer):
    """Hyperparameter optimizer for time-series models.

    For every model of the optimization matrix a hyperopt search is run with
    ``SparkTrials``. Each trial is logged as a nested MLflow run and is scored
    either on the fitted values of the whole series ('val') or on rolling
    forecasts over the final part of the series ('cv').
    """

    def __init__(
        self,
        opt_matrix: Dict[str, Dict[str, Union[SearchSpace, None, BaseTunableModel]]],
    ):
        """Initialize the optimizer from a model / search-space matrix.

        Args:
            opt_matrix: Maps each model name to a dictionary with the keys
                ``'model'`` (the BaseTunableModel to tune) and ``'space'``
                (the SearchSpace of that model; a default search space is
                used when it is not provided).
        """
        super().__init__(opt_matrix)

    def _resolve_scorer(self, metric: Union[str, Callable]) -> Tuple[Callable, str]:
        """Return the metric function and the name it is logged under."""
        if isinstance(metric, str):
            return self.METRICS_MAP[metric], metric
        return metric, metric.__name__

    def _objective(
        self,
        params: Dict,
        model: BaseTunableModel,
        X: DataFrame,
        y: Series,
        metric: Union[str, Callable],
        experiment_id: str,
    ) -> Dict:
        """Evaluate one configuration on the fitted values of the full series.

        Args:
            params: The hyperparameters to evaluate.
            model: The model to evaluate; it is configured with ``params``.
            X: The exogenous features passed to ``model.fit``.
            y: The target series.
            metric: Either a key of ``METRICS_MAP`` or a metric callable
                taking ``(y_true, y_pred)``.
            experiment_id: The MLflow experiment the nested run belongs to.

        Returns:
            Dict: The trial status, the loss and the fitted model under the
                key ``'trained_model'``.
        """
        scorer, metric_name = self._resolve_scorer(metric)
        with mlflow.start_run(experiment_id=experiment_id, nested=True):
            model.set_params(params)
            model.fit(y, X)
            mlflow.log_params(params)
            y_hat = model.get_fittedvalues()
            loss = scorer(y, y_hat)
            mlflow.log_metric(metric_name, loss)
        return {"status": STATUS_OK, "loss": loss, "trained_model": model}

    def _cv_objective(
        self,
        params: Dict,
        model: BaseTunableModel,
        X: DataFrame,
        y: Series,
        metric: Union[str, Callable],
        experiment_id: str,
    ) -> Dict:
        """Evaluate one configuration with rolling forecasts.

        Starting one test period before the end of the series, the model is
        refitted at every date of the series frequency on the data strictly
        before that date and predicts the window that follows. All forecasts
        are scored together against the matching observations.

        Args:
            params: The hyperparameters to evaluate.
            model: The model to evaluate; it is configured with ``params``.
            X: The exogenous features, or None for a model without them.
            y: The target series, indexed by date.
            metric: Either a key of ``METRICS_MAP`` or a metric callable
                taking ``(y_true, y_pred)``.
            experiment_id: The MLflow experiment the nested run belongs to.

        Returns:
            Dict: The trial status, the loss and the model (as last fitted)
                under the key ``'trained_model'``.

        Raises:
            ValueError: If the frequency of ``y`` is not supported.
        """
        scorer, metric_name = self._resolve_scorer(metric)

        # An index without an explicit frequency reports `freq` as None
        # instead of raising, so the fallback must test for None.
        freq = getattr(y.index, "freq", None)
        if freq is None:
            freq = pd.infer_freq(y.index)

        # NOTE(review): the alias strings returned by pd.infer_freq differ
        # across pandas versions - confirm these literals match the one in use.
        if freq in ["M", "MS", "D", "H"]:
            window_size = pd.DateOffset(months=1)
            test_size = pd.DateOffset(years=1)
        elif freq == "Y":
            window_size = pd.DateOffset(years=1)
            test_size = pd.DateOffset(years=2)
        else:
            raise ValueError(f"Unsupported frequency: {freq}")

        with mlflow.start_run(experiment_id=experiment_id, nested=True):
            model.set_params(params)
            mlflow.log_params(params)

            end_date = y.index[-1]
            start_test_date = end_date - test_size
            y_hat = []
            y_true = []
            for d in pd.date_range(start=start_test_date, end=end_date, freq=freq):
                train_y = y[y.index < d]
                test_y = y[(y.index >= d) & (y.index < (d + window_size))]
                # Nothing to fit on, or nothing to score against: skip the
                # fold (an empty window would otherwise fail on index[0]).
                if len(train_y.index) == 0 or len(test_y.index) == 0:
                    continue
                y_true.extend(test_y)

                if X is not None:
                    train_X = X[X.index < d]
                    test_X = X[(X.index >= d) & (X.index < (d + window_size))]
                    model.fit(train_y, train_X)
                    preds = model.predict(future=test_X)
                else:
                    model.fit(train_y)
                    preds = model.predict(future=(test_y.index[0], test_y.index[-1]))

                y_hat.extend(preds)

            loss = scorer(y_true, y_hat)
            mlflow.log_metric(metric_name, loss)
        return {"status": STATUS_OK, "loss": loss, "trained_model": model}

    def _build_objective_function(
        self,
        X: DataFrame,
        y: Union[DataFrame, Series, None],
        model: BaseTunableModel,
        metric: Union[str, Callable],
        experiment_id: str,
        validation_mode: str,
    ) -> Callable:
        """Bind data, model and metric to the objective of the chosen mode.

        Args:
            X: The exogenous features.
            y: The target series.
            model: The tunable model to be trained.
            metric: The evaluation metric to optimize.
            experiment_id: The ID of the MLflow experiment.
            validation_mode: 'val' selects ``_objective``, 'cv' selects
                ``_cv_objective``.

        Returns:
            Callable: A function of the hyperparameters only, suitable as the
                objective passed to ``fmin``.

        Raises:
            ValueError: If ``validation_mode`` is neither 'val' nor 'cv'.
        """
        if validation_mode == "val":
            objective = self._objective
        elif validation_mode == "cv":
            objective = self._cv_objective
        else:
            raise ValueError(f"{validation_mode} is not valid, supported values are 'val' and 'cv'")
        return partial(
            objective,
            metric=metric,
            X=X,
            y=y,
            model=model,
            experiment_id=experiment_id,
        )

    def tune(
        self,
        X: DataFrame,
        y: Union[DataFrame, Series],
        n_evals: int,
        parallelism: int,
        validation_mode: str = "cv",
        metric: Union[str, Callable, None] = None,
        experiment_name: Optional[str] = None,
        spark: Optional[SparkSession] = None,
    ) -> Tuple[BaseTunableModel, Dict, float]:
        """Tune every model over its search space and return the best one.

        Args:
            X: The features for training.
            y: The target values for training.
            n_evals: The maximum number of evaluations for each model.
            parallelism: The parallelism passed to ``SparkTrials``.
            validation_mode: 'val' to score the fitted values of the full
                series or 'cv' to score rolling forecasts. Defaults to 'cv'.
            metric: A key of ``METRICS_MAP`` or a metric callable.
                NOTE(review): the default None is not handled by the
                objectives; presumably callers always pass a metric -
                confirm.
            experiment_name: The name of the MLflow experiment to log to.
                Defaults to None, in which case no experiment ID is passed to
                the nested runs.
            spark: The Spark session handed to ``SparkTrials``.
                Defaults to None.

        Returns:
            Tuple[BaseTunableModel, Dict, float]: The best model, its
                hyperparameters and its loss. The first two entries are None
                when no trial produced a finite loss.

        Raises:
            ValueError: If ``validation_mode`` is neither 'val' nor 'cv'.
        """
        best_model = None
        best_hyp = None
        best_loss = float("inf")

        # Resolve the MLflow experiment only when a name was given.
        if experiment_name:
            experiment_id = get_or_create_experiment(experiment_name)
            mlflow.set_experiment(experiment_name=experiment_name)
        else:
            experiment_id = None
        for i, model in enumerate(self.models):
            # A fresh trials object per model keeps the searches independent.
            trials = SparkTrials(spark_session=spark, parallelism=parallelism)
            space = self.spaces[i]
            objective_fn = self._build_objective_function(X, y, model, metric, experiment_id, validation_mode)
            hyp = fmin(
                objective_fn,
                space.get_space(),
                algo=tpe.suggest,
                max_evals=n_evals,
                trials=trials,
            )
            loss = trials.best_trial["result"]["loss"]
            if loss < best_loss:
                best_model = trials.best_trial["result"]["trained_model"]
                best_loss = loss
                # fmin returns a raw assignment; space_eval evaluates the
                # search space with it to obtain the hyperparameters.
                best_hyp = space_eval(space.get_space(), hyp)
        return best_model, best_hyp, best_loss

Ancestors

  • panama.ml.auto.hyperoptimizer._HyperOptimizer

Methods

def tune(self, X: pandas.core.frame.DataFrame, y: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], n_evals: int, parallelism: int, validation_mode: str = 'cv', metric: Union[str, collections.abc.Callable, ForwardRef(None)] = None, experiment_name: Optional[str] = None, spark: Optional[pyspark.sql.session.SparkSession] = None) ‑> Tuple[BaseTunableModel, Dict, float]

Tunes hyperparameters for the models using the specified search spaces.

Args

X
The features for training.
y
The target values for training.
n_evals
The maximum number of evaluations for each model.
parallelism
The number of Spark workers to use for parallel evaluation.
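validation_mode
The validation strategy: 'val' (score the fitted values of the full series) or 'cv' (score rolling forecasts over the final part of the series). Defaults to 'cv'.
metric
The metric to optimize, given as a metric name or as a callable.
experiment_name
The name of the MLflow experiment under which the tuning runs are logged. Defaults to None.
spark
The Spark session handed to SparkTrials. Defaults to None.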

Returns

Tuple[BaseTunableModel, Dict, float]
The best model found during tuning, its hyperparameters and its loss.