Module panama.ml.auto.hyperoptimizer
Classes
class MachineLearningHyperOptimizer (opt_matrix: Dict[str, Dict[str, Union[SearchSpace, None, BaseTunableModel]]])
-
A class for hyperparameter optimization of machine-learning models.
Initializes the optimizer with a dict that, for each model, contains the model instance and the corresponding search space.
Args
opt_matrix
- A dictionary containing the models and their search spaces to be tuned. The keys of the dictionary are the names of the models and the values are dictionaries containing the following keys:
  - 'model': A BaseTunableModel instance to be tuned.
  - 'space': A SearchSpace instance specifying the search space for the given model. If not provided, a default search space will be used.
Returns
None.
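The opt_matrix layout above can be sketched as follows. This is a minimal sketch: the import path of TunableCatBoostRegressor and its no-argument constructor are assumptions; only the 'model'/'space' dictionary shape comes from this page.

    from panama.ml.auto.hyperoptimizer import MachineLearningHyperOptimizer
    # Assumed import path; TunableCatBoostRegressor is referenced in this module's source.
    from panama.ml.auto.tunable import TunableCatBoostRegressor

    opt_matrix = {
        "catboost": {
            "model": TunableCatBoostRegressor(),  # any BaseTunableModel subclass
            "space": None,                        # None -> a default SearchSpace is used
        },
    }
    optimizer = MachineLearningHyperOptimizer(opt_matrix)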
Expand source code
class MachineLearningHyperOptimizer(_HyperOptimizer):
    """A class for hyperparameter optimization."""

    def __init__(self, opt_matrix: Dict[str, Dict[str, Union[SearchSpace, None, BaseTunableModel]]]):
        super().__init__(opt_matrix)

    def _objective(
        self,
        params: Dict,
        model: BaseTunableModel,
        X_train: DataFrame,
        y_train: Union[DataFrame, Series, None],
        X_val: Union[DataFrame, None],
        y_val: Union[DataFrame, Series, None],
        metric: str,
        experiment_id: str,
    ) -> Dict:
        """The objective function to be minimized during hyperparameter tuning.

        Args:
            params: A dictionary of hyperparameters to be evaluated.
            model: The model to be evaluated.
            X_train: The training feature matrix.
            y_train: The training target variable.
            X_val: The validation feature matrix. None when cross-validation is used.
            y_val: The validation target variable. None when cross-validation is used.
            metric: The evaluation metric to optimize.
            experiment_id: The ID of the MLflow experiment.

        Returns:
            Dict: A dictionary containing the status of the optimization and the loss value.
        """
        if isinstance(metric, str):
            scorer_fn = self.METRICS_MAP[metric]
            metric_name = metric
        else:
            scorer_fn = metric
            metric_name = metric.__name__
        # Force params to stop CatBoost from writing files on executors where it has no permissions
        if isinstance(model, TunableCatBoostRegressor):
            params["allow_writing_files"] = False
        with mlflow.start_run(experiment_id=experiment_id, nested=True) as run:
            scorer = make_scorer(scorer_fn, greater_is_better=False)
            model.set_params(params)
            mlflow.log_params(params)
            if X_val is not None and y_val is not None:
                model.fit(X_train, y_train)  # type: ignore
                loss = scorer(model, X_val, y_val)
            else:
                loss = cross_val_score(
                    estimator=model.get_model(),
                    X=X_train,
                    y=y_train,
                    scoring=scorer,
                    cv=TimeSeriesSplit(n_splits=5),
                ).mean()
            if metric.startswith("neg_"):
                metric_name = metric.split("_", 1)[-1]
            mlflow.log_metric(metric_name, -loss)
            return {"status": STATUS_OK, "loss": -loss, "trained_model": model}

    def _setup_tuning_data(
        self,
        X: DataFrame,
        y: Union[DataFrame, Series],
        validation_mode: str = "cv",
        time_cols: Union[str, List[str], None] = None,
    ) -> Tuple[DataFrame, Union[DataFrame, None], Union[DataFrame, Series], Union[DataFrame, Series, None]]:
        """Prepare the data for hyperparameter tuning.

        Args:
            X (pd.DataFrame): The feature matrix.
            y (Union[pd.DataFrame, pd.Series]): The target variable.
            validation_mode (str): The validation mode for tuning. Accepts 'val' (for holdout validation)
                or 'cv' (for cross-validation). Defaults to 'cv'.
            time_cols (Union[str, List[str]]): The name(s) of the time columns if time-based ordering is
                required. Defaults to None.

        Returns:
            Tuple[DataFrame, Union[DataFrame, None], Union[DataFrame, Series], Union[DataFrame, Series, None]]:
                A tuple containing the training and validation feature matrices and target vectors.
        """
        if time_cols:
            X, y = self._order_by_time(X, y, time_cols)
        if validation_mode == "val":
            X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.15, shuffle=False, random_state=42)
        elif validation_mode == "cv":
            X_train = X
            y_train = y
            X_val = None
            y_val = None
        else:
            raise ValueError(f"Validation mode {validation_mode} not supported, accepted are 'val' and 'cv'")
        return X_train, X_val, y_train, y_val

    def _build_objective_function(
        self,
        X_train: DataFrame,
        y_train: Union[DataFrame, Series, None],
        X_val: Union[DataFrame, None],
        y_val: Union[DataFrame, Series, None],
        model: BaseTunableModel,
        metric: Union[str, None],
        experiment_id: str,
    ) -> Callable:
        """Build the objective function for hyperparameter tuning.

        Args:
            X_train (pd.DataFrame): The training feature matrix.
            y_train (Union[pd.DataFrame, pd.Series]): The training target variable.
            X_val (Union[pd.DataFrame, None]): The validation feature matrix. None if validation_mode is 'cv'.
            y_val (Union[pd.DataFrame, pd.Series, None]): The validation target variable. None if validation_mode is 'cv'.
            model (BaseTunableModel): The tunable model to be trained.
            metric (str): The evaluation metric to optimize.
            experiment_id (str): The ID of the experiment.

        Returns:
            Callable: A callable function representing the objective function for hyperparameter tuning.
        """
        return partial(
            self._objective,
            metric=metric,  # type: ignore
            X_train=X_train,
            y_train=y_train,
            X_val=X_val,
            y_val=y_val,
            model=model,
            experiment_id=experiment_id,
        )

    def tune(
        self,
        X: DataFrame,
        y: Union[DataFrame, Series],
        n_evals: int,
        parallelism: int,
        validation_mode: str = "cv",
        metric: Optional[str] = None,
        time_cols: Union[str, List[str], None] = None,
        experiment_name: Optional[str] = None,
        spark: Optional[SparkSession] = None,
    ) -> Tuple[BaseTunableModel, Dict, float]:
        """Tunes hyperparameters for the models using the specified search spaces.

        Args:
            X: The features for training.
            y: The target values for training.
            n_evals: The maximum number of evaluations for each model.
            parallelism: The number of Spark workers to use for parallel evaluation.
            validation_mode: The validation mode, either 'val' (holdout) or 'cv' (cross-validation). Defaults to 'cv'.
            metric: The evaluation metric to optimize. Defaults to None.
            time_cols: The name(s) of the time columns if time-based ordering is required. Defaults to None.
            experiment_name: The name of the MLflow experiment to log to. Defaults to None.
            spark: The SparkSession used by SparkTrials. Defaults to None.

        Returns:
            Tuple[BaseTunableModel, Dict, float]: The best model found during tuning, its hyperparameters,
                and the corresponding loss.
        """
        best_model = None
        best_hyp = None
        best_loss = float("inf")
        X_train, X_val, y_train, y_val = self._setup_tuning_data(X, y, validation_mode, time_cols)
        # Check if experiment exists, if not, create it
        if experiment_name:
            experiment_id = get_or_create_experiment(experiment_name)
            mlflow.set_experiment(experiment_name=experiment_name)
        else:
            experiment_id = None
        for i, model in enumerate(self.models):
            trials = SparkTrials(spark_session=spark, parallelism=parallelism)
            space = self.spaces[i]
            objective_fn = self._build_objective_function(X_train, y_train, X_val, y_val, model, metric, experiment_id)  # type: ignore
            hyp = fmin(
                objective_fn,
                space.get_space(),
                algo=tpe.suggest,
                max_evals=n_evals,
                trials=trials,
            )
            loss = trials.best_trial["result"]["loss"]  # type: ignore
            if loss < best_loss:
                best_model = trials.best_trial["result"]["trained_model"]  # type: ignore
                best_loss = loss
                best_hyp = space_eval(space.get_space(), hyp)
        return best_model, best_hyp, best_loss  # type: ignore
Ancestors
- panama.ml.auto.hyperoptimizer._HyperOptimizer
Methods
def tune(self, X: pandas.core.frame.DataFrame, y: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], n_evals: int, parallelism: int, validation_mode: str = 'cv', metric: Optional[str] = None, time_cols: Union[str, List[str], None] = None, experiment_name: Optional[str] = None, spark: Optional[pyspark.sql.session.SparkSession] = None) -> Tuple[BaseTunableModel, Dict, float]
-
Tunes hyperparameters for the models using the specified search spaces.
Args
X
- The features for training.
y
- The target values for training.
n_evals
- The maximum number of evaluations for each model.
parallelism
- The number of Spark workers to use for parallel evaluation.
validation_mode
- The validation mode, either 'val' (holdout validation) or 'cv' (cross-validation). Defaults to 'cv'.
metric
- The evaluation metric to optimize. Defaults to None.
time_cols
- The name(s) of the time columns if time-based ordering is required. Defaults to None.
experiment_name
- The name of the MLflow experiment to log to. Defaults to None.
spark
- The SparkSession used by SparkTrials. Defaults to None.
Returns
Tuple[BaseTunableModel, Dict, float]
- The best model found during tuning, its hyperparameters, and the corresponding loss.
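A hedged usage sketch of tune, assuming the optimizer built in the earlier example, pandas inputs X and y, an active SparkSession named spark, and that 'neg_mean_absolute_error' is one of the keys of METRICS_MAP (the 'neg_' prefix handling in _objective suggests scikit-learn-style names, but the exact keys are not shown on this page):

    best_model, best_hyp, best_loss = optimizer.tune(
        X=X,                                # pandas DataFrame of features (assumed)
        y=y,                                # pandas Series/DataFrame target (assumed)
        n_evals=50,
        parallelism=4,
        validation_mode="val",              # or "cv" for TimeSeriesSplit cross-validation
        metric="neg_mean_absolute_error",   # assumed METRICS_MAP key
        time_cols="date",                   # orders rows by time before the holdout split
        experiment_name="hyperopt-demo",    # MLflow experiment name (illustrative)
        spark=spark,
    )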
class TimeSeriesHyperOptimizer (opt_matrix: Dict[str, Dict[str, Union[SearchSpace, None, BaseTunableModel]]])
-
A class for hyperparameter optimization of time-series models.
Initializes the optimizer with a dict that, for each model, contains the model instance and the corresponding search space.
Args
opt_matrix
- A dictionary containing the models and their search spaces to be tuned. The keys of the dictionary are the names of the models and the values are dictionaries containing the following keys:
  - 'model': A BaseTunableModel instance to be tuned.
  - 'space': A SearchSpace instance specifying the search space for the given model. If not provided, a default search space will be used.
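A construction sketch mirroring the machine-learning example above; TunableProphet is a hypothetical BaseTunableModel subclass used purely for illustration, since no concrete time-series model class appears on this page:

    from panama.ml.auto.hyperoptimizer import TimeSeriesHyperOptimizer

    opt_matrix = {
        "prophet": {
            "model": TunableProphet(),  # hypothetical BaseTunableModel for time series
            "space": None,              # None -> a default SearchSpace is used
        },
    }
    ts_optimizer = TimeSeriesHyperOptimizer(opt_matrix)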
Expand source code
class TimeSeriesHyperOptimizer(_HyperOptimizer):
    """A class for hyperparameter optimization."""

    def __init__(
        self,
        opt_matrix: Dict[str, Dict[str, Union[SearchSpace, None, BaseTunableModel]]],
    ):
        """Initializes the HyperOptimizer with a dict containing for each model, the model and the
        corresponding search space.

        Args:
            opt_matrix: A dictionary containing the models and their search spaces to be tuned.
                The keys of the dictionary are the names of the models and the values are dictionaries
                containing the following keys:
                - 'model': A BaseTunableModel instance to be tuned.
                - 'space': A SearchSpace instance specifying the search space for the given model.
                  If not provided, a default search space will be used.
        """
        super().__init__(opt_matrix)

    def _objective(
        self,
        params: Dict,
        model: BaseTunableModel,
        X: DataFrame,
        y: Series,
        metric: Union[str, Callable],
        experiment_id: str,
    ) -> Dict:
        """The objective function to be minimized during hyperparameter tuning.

        Args:
            params: A dictionary of hyperparameters to be evaluated.
            model: The model to be evaluated.
            X: The exogenous features.
            y: The target time series.
            metric: The evaluation metric to optimize, given as a name or a callable.
            experiment_id: The ID of the MLflow experiment.

        Returns:
            Dict: A dictionary containing the status of the optimization and the loss value.
        """
        if isinstance(metric, str):
            scorer = self.METRICS_MAP[metric]
            metric_name = metric
        else:
            scorer = metric
            metric_name = metric.__name__
        with mlflow.start_run(experiment_id=experiment_id, nested=True) as run:
            model.set_params(params)
            model.fit(y, X)
            mlflow.log_params(params)
            y_hat = model.get_fittedvalues()
            loss = scorer(y, y_hat)
            mlflow.log_metric(metric_name, loss)
            return {"status": STATUS_OK, "loss": loss, "trained_model": model}

    def _cv_objective(
        self,
        params: Dict,
        model: BaseTunableModel,
        X: DataFrame,
        y: Series,
        metric: Union[str, Callable],
        experiment_id: str,
    ) -> Dict:
        """The objective function to be minimized during hyperparameter tuning with rolling-origin
        cross-validation.

        Args:
            params: A dictionary of hyperparameters to be evaluated.
            model: The model to be evaluated.
            X: The exogenous features.
            y: The target time series.
            metric: The evaluation metric to optimize, given as a name or a callable.
            experiment_id: The ID of the MLflow experiment.

        Returns:
            Dict: A dictionary containing the status of the optimization and the loss value.
        """
        if isinstance(metric, str):
            scorer = self.METRICS_MAP[metric]
            metric_name = metric
        else:
            scorer = metric
            metric_name = metric.__name__
        try:
            freq = y.index.freq
        except Exception:
            # Fall back to inferring the frequency when the index does not expose one
            freq = pd.infer_freq(y.index)
        if freq in ["M", "MS", "D", "H"]:
            window_size = pd.DateOffset(months=1)
            test_size = pd.DateOffset(years=1)
        elif freq == "Y":
            window_size = pd.DateOffset(years=1)
            test_size = pd.DateOffset(years=2)
        else:
            raise ValueError("Unsupported frequency: {}".format(freq))
        with mlflow.start_run(experiment_id=experiment_id, nested=True) as run:
            model.set_params(params)
            mlflow.log_params(params)
            end_date = y.index[-1]
            start_test_date = end_date - test_size
            y_hat = []
            y_true = []
            for d in pd.date_range(start=start_test_date, end=end_date, freq=freq):
                if d > end_date:
                    break
                else:
                    train_y = y[y.index < d]
                    test_y = y[(y.index >= d) & (y.index < (d + window_size))]
                    if len(train_y.index) == 0:
                        continue
                    y_true.extend(test_y)
                    if X is not None:
                        train_X = X[X.index < d]
                        test_X = X[(X.index >= d) & (X.index < (d + window_size))]
                        model.fit(train_y, train_X)
                        preds = model.predict(future=test_X)
                    else:
                        model.fit(train_y)
                        preds = model.predict(future=(test_y.index[0], test_y.index[-1]))
                    y_hat.extend(preds)
            loss = scorer(y_true, y_hat)
            mlflow.log_metric(metric_name, loss)
            return {"status": STATUS_OK, "loss": loss, "trained_model": model}

    def _build_objective_function(
        self,
        X: DataFrame,
        y: Union[DataFrame, Series, None],
        model: BaseTunableModel,
        metric: Union[str, Callable],
        experiment_id: str,
        validation_mode: str,
    ) -> Callable:
        if validation_mode == "val":
            return partial(
                self._objective,
                metric=metric,
                X=X,
                y=y,
                model=model,
                experiment_id=experiment_id,
            )
        elif validation_mode == "cv":
            return partial(
                self._cv_objective,
                metric=metric,
                X=X,
                y=y,
                model=model,
                experiment_id=experiment_id,
            )
        else:
            raise ValueError(f"Validation mode {validation_mode} not valid, supported values are 'val' and 'cv'")

    def tune(
        self,
        X: DataFrame,
        y: Union[DataFrame, Series],
        n_evals: int,
        parallelism: int,
        validation_mode: str = "cv",
        metric: Union[str, Callable, None] = None,
        experiment_name: Optional[str] = None,
        spark: Optional[SparkSession] = None,
    ) -> Tuple[BaseTunableModel, Dict, float]:
        """Tunes hyperparameters for the models using the specified search spaces.

        Args:
            X: The features for training.
            y: The target values for training.
            n_evals: The maximum number of evaluations for each model.
            parallelism: The number of Spark workers to use for parallel evaluation.
            validation_mode: The validation mode, either 'val' (in-sample fit) or 'cv' (rolling-origin
                cross-validation). Defaults to 'cv'.
            metric: The evaluation metric to optimize, given as a name or a callable. Defaults to None.
            experiment_name: The name of the MLflow experiment to log to. Defaults to None.
            spark: The SparkSession used by SparkTrials. Defaults to None.

        Returns:
            Tuple[BaseTunableModel, Dict, float]: The best model found during tuning, its hyperparameters,
                and the corresponding loss.
        """
        best_model = None
        best_hyp = None
        best_loss = float("inf")
        # Check if experiment exists, if not, create it
        if experiment_name:
            experiment_id = get_or_create_experiment(experiment_name)
            mlflow.set_experiment(experiment_name=experiment_name)
        else:
            experiment_id = None
        for i, model in enumerate(self.models):
            trials = SparkTrials(spark_session=spark, parallelism=parallelism)
            space = self.spaces[i]
            objective_fn = self._build_objective_function(X, y, model, metric, experiment_id, validation_mode)
            hyp = fmin(
                objective_fn,
                space.get_space(),
                algo=tpe.suggest,
                max_evals=n_evals,
                trials=trials,
            )
            loss = trials.best_trial["result"]["loss"]
            if loss < best_loss:
                best_model = trials.best_trial["result"]["trained_model"]
                best_loss = loss
                best_hyp = space_eval(space.get_space(), hyp)
        return best_model, best_hyp, best_loss
Ancestors
- panama.ml.auto.hyperoptimizer._HyperOptimizer
Methods
def tune(self, X: pandas.core.frame.DataFrame, y: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], n_evals: int, parallelism: int, validation_mode: str = 'cv', metric: Union[str, collections.abc.Callable, None] = None, experiment_name: Optional[str] = None, spark: Optional[pyspark.sql.session.SparkSession] = None) -> Tuple[BaseTunableModel, Dict, float]
-
Tunes hyperparameters for the models using the specified search spaces.
Args
X
- The features for training.
y
- The target values for training.
n_evals
- The maximum number of evaluations for each model.
parallelism
- The number of Spark workers to use for parallel evaluation.
validation_mode
- The validation mode, either 'val' (in-sample fit) or 'cv' (rolling-origin cross-validation). Defaults to 'cv'.
metric
- The evaluation metric to optimize, given as a name or a callable. Defaults to None.
experiment_name
- The name of the MLflow experiment to log to. Defaults to None.
spark
- The SparkSession used by SparkTrials. Defaults to None.
Returns
Tuple[BaseTunableModel, Dict, float]
- The best model found during tuning, its hyperparameters, and the corresponding loss.
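A hedged usage sketch, assuming the ts_optimizer built above, a pandas Series target with a DatetimeIndex whose frequency can be inferred (required by the 'cv' mode, see _cv_objective), and an active SparkSession named spark; scikit-learn's mean_absolute_error stands in for any callable metric:

    from sklearn.metrics import mean_absolute_error

    best_model, best_hyp, best_loss = ts_optimizer.tune(
        X=exog_features,             # exogenous features DataFrame, indexed by time (assumed)
        y=target_series,             # pandas Series target with a DatetimeIndex (assumed)
        n_evals=30,
        parallelism=4,
        validation_mode="cv",        # rolling-origin evaluation over the final test window
        metric=mean_absolute_error,  # a callable; a string key of METRICS_MAP also works
        experiment_name="ts-hyperopt-demo",  # MLflow experiment name (illustrative)
        spark=spark,
    )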