Module panama.ml.flow.utils

Functions

def build_experiment_name_from_str(spark: pyspark.sql.session.SparkSession, experiment_name: str) ‑> str
def build_experiment_name_from_str(spark: SparkSession, experiment_name: str) -> str:
    """
    Build an experiment name from a string for a given user.

    Args:
        spark (SparkSession): The active SparkSession, used to access dbutils.
        experiment_name (str): The name of the experiment.

    Returns:
        str: The experiment name with the user's name included in the path.
    """
    dbutils = get_db_utils(spark)
    current_user = json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())["tags"][
        "user"
    ]
    return f"/Users/{current_user}/{experiment_name}"

Build an experiment name from a string for a given user.

Args

spark : SparkSession
The active SparkSession, used to access dbutils.
experiment_name : str
The name of the experiment.

Returns

str
The experiment name with the user's name included in the path.
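
A minimal usage sketch (assumes a Databricks notebook where spark is already defined; the experiment name is illustrative):

from panama.ml.flow.utils import build_experiment_name_from_str

# Returns e.g. "/Users/<current user>/churn_forecast" for the user running the notebook.
experiment_path = build_experiment_name_from_str(spark, "churn_forecast")
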
def change_model_stage(model_name: str,
model_version: int | str,
new_status: str,
archive_existing_versions: bool = False) ‑> None
def change_model_stage(
    model_name: str, model_version: Union[int, str], new_status: str, archive_existing_versions: bool = False
) -> None:
    """Change the stage of a model version in the MLflow model registry.

    Args:
        model_name: The name of the model to change the version status for.
        model_version: The version of the model to change the status for.
        new_status: The new status to assign to the model version.
        archive_existing_versions: If True, all existing model versions in the stage will automatically be moved to the "Archived" stage. Only valid when new_status is "Staging" or "Production"; otherwise an error will be raised.
    """
    if new_status not in ["None", "Staging", "Production", "Archived"]:
        raise ValueError(
            f"{new_status} value for new_status not supported, allowed values are 'Development', 'Staging', 'Production', 'Archived'"
        )
    MlflowClient().transition_model_version_stage(
        name=model_name, version=model_version, stage=new_status, archive_existing_versions=archive_existing_versions
    )

Change the stage of a model version in the MLflow model registry.

Args

model_name
The name of the model to change the version status for.
model_version
The version of the model to change the status for.
new_status
The new status to assign to the model version.
archive_existing_versions
If True, all existing model versions in the stage will automatically be moved to the "Archived" stage. Only valid when new_status is "Staging" or "Production"; otherwise an error will be raised.
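
A minimal usage sketch (model name and version are illustrative):

from panama.ml.flow.utils import change_model_stage

# Promote version 3 to Production and archive any versions currently in that stage.
change_model_stage("churn_forecast", model_version=3, new_status="Production", archive_existing_versions=True)
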
def change_model_stage_remotely(model_name: str,
model_version: int | str,
new_status: str,
registry_scope: str,
registry_key: str) ‑> None
def change_model_stage_remotely(
    model_name: str, model_version: Union[int, str], new_status: str, registry_scope: str, registry_key: str
) -> None:
    """Change the stage of a model version in a remote MLflow model registry.

    Args:
        model_name: The name of the model to change the version status for.
        model_version: The version of the model to change the status for.
        new_status: The new status to assign to the model version.
        registry_scope: The scope of the registry to change the model status in.
        registry_key: The key for the registry to change the model status in.
    """
    if new_status not in ["None", "Staging", "Production", "Archived"]:
        raise ValueError(
            f"{new_status} value for new_status not supported, allowed values are 'Development', 'Staging', 'Production', 'Archived'"
        )
    registry_uri = f"databricks://{registry_scope}:{registry_key}"
    mlflow.set_registry_uri(registry_uri)
    client = MlflowClient(tracking_uri=None, registry_uri=registry_uri)
    client.transition_model_version_stage(model_name, model_version, new_status)

Change the stage of a model version in a remote MLflow model registry.

Args

model_name
The name of the model to change the version status for.
model_version
The version of the model to change the status for.
new_status
The new status to assign to the model version.
registry_scope
The scope of the registry to change the model status in.
registry_key
The key for the registry to change the model status in.
def delete_model_from_registry(model_name: str, version: int | str | None = None) ‑> None
def delete_model_from_registry(model_name: str, version: Optional[Union[int, str]] = None) -> None:
    """Delete a model from the MLflow model registry.

    Args:
        model_name: The name of the model to delete.
        version: The version of the model to delete. If omitted, the whole registered model is deleted.
    """
    client = MlflowClient()
    if version:
        client.delete_model_version(model_name, version=str(version))
    else:
        client.delete_registered_model(model_name)

Delete a model from the MLflow model registry.

Args

model_name
The name of the model to delete.
version
The version of the model to delete. If omitted, the whole registered model is deleted.
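
A minimal usage sketch (model name and version are illustrative):

from panama.ml.flow.utils import delete_model_from_registry

delete_model_from_registry("churn_forecast", version=2)  # delete a single version
delete_model_from_registry("churn_forecast")             # delete the whole registered model
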
def get_best_model_by_experiment(experiment_name: str, metric_name: str, model_class: str, ascending: bool = True) ‑> Any
def get_best_model_by_experiment(
    experiment_name: str, metric_name: str, model_class: str, ascending: bool = True
) -> Any:
    """Retrieve the best model from an MLflow experiment based on a specified metric and filtering on model_class name if saved as model_type during logging.

    Args:
        experiment_name: The name of the experiment to query.
        metric_name: The name of the metric to order the runs by.
        model_class: The class of the model to filter the runs.
        ascending: Whether to order the runs in ascending or descending order.
            Defaults to True (ascending).

    Returns:
        The best model with the underlying implementation.
    """
    experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
    filter_by = f"params.model_type = '{model_class}'"
    best_run = mlflow.search_runs(
        experiment_ids=[experiment_id],
        filter_string=filter_by,
        order_by=[f"metrics.{metric_name} {'' if ascending else 'DESC'}"],
        max_results=1,
    ).run_id[0]
    model_uri = f"runs:/{best_run}/model"
    pyfunc_model = mlflow.pyfunc.load_model(model_uri)
    return unwrap_model_impl(pyfunc_model)

Retrieve the best model from an MLflow experiment based on a specified metric, filtering runs whose model_type parameter (logged at training time) matches model_class.

Args

experiment_name
The name of the experiment to query.
metric_name
The name of the metric to order the runs by.
model_class
The class of the model to filter the runs.
ascending
Whether to order the runs in ascending or descending order. Defaults to True (ascending).

Returns

The best model with the underlying implementation.
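
A minimal usage sketch (assumes the experiment's runs logged a model_type parameter; experiment path, metric and class names are illustrative):

from panama.ml.flow.utils import get_best_model_by_experiment

best_model = get_best_model_by_experiment(
    experiment_name="/Users/someone@example.com/churn_forecast",
    metric_name="rmse",
    model_class="RandomForestRegressor",
    ascending=True,  # lower rmse is better
)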

def get_best_runs_by_experiment(experiment_name: str, metric_name: str, ascending: bool = True, n_best: int = 5) ‑> pandas.core.frame.DataFrame
def get_best_runs_by_experiment(
    experiment_name: str, metric_name: str, ascending: bool = True, n_best: int = 5
) -> pd.DataFrame:
    """Query the given experiment for runs and order them based on the given metric.

    Args:
        experiment_name: The name of the experiment to query.
        metric_name: The name of the metric to order the runs by.
        ascending: Whether to order the runs in ascending or descending order.
            Defaults to True (ascending).
        n_best: The number of best runs to return.

    Returns:
        A DataFrame containing the best runs, sorted by the given metric.
    """
    experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
    best_runs = mlflow.search_runs(
        experiment_ids=[experiment_id],
        order_by=[f"metrics.{metric_name} {'' if ascending else 'DESC'}"],
        max_results=n_best,
    )
    return best_runs

Query the given experiment for runs and order them based on the given metric.

Args

experiment_name
The name of the experiment to query.
metric_name
The name of the metric to order the runs by.
ascending
Whether to order the runs in ascending or descending order. Defaults to True (ascending).
n_best
The number of best runs to return.

Returns

A DataFrame containing the best runs, sorted by the given metric.
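
A minimal usage sketch (experiment path and metric name are illustrative):

from panama.ml.flow.utils import get_best_runs_by_experiment

top_runs = get_best_runs_by_experiment("/Users/someone@example.com/churn_forecast", "rmse", ascending=True, n_best=3)
print(top_runs[["run_id", "metrics.rmse"]])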

def get_experiment_id(experiment_name: str) ‑> int
def get_experiment_id(experiment_name: str) -> int:
    """Retrieve the ID of the experiment with the given name.

    Args:
        experiment_name: The name of the experiment to retrieve.

    Returns:
        The ID of the experiment with the given name.
    """
    experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
    return experiment_id

Retrieve the ID of the experiment with the given name.

Args

experiment_name
The name of the experiment to retrieve.

Returns

The ID of the experiment with the given name.

def get_flavour_from_model(model: mlflow.pyfunc.PyFuncModel) ‑> str
def get_flavour_from_model(model: mlflow.pyfunc.PyFuncModel) -> str:
    """Derive the MLflow flavour from a model, based on the top-level module of the model's class.

    Args:
        model: the model whose flavour is to be extracted.

    Returns:
        The flavour name, i.e. the top-level package of the model's class.
    """

    klass = model.__class__
    module = klass.__module__

    if module == "builtins":
        flavour = klass.__qualname__
    else:
        flavour = module + "." + klass.__qualname__

    return flavour.split(".")[0]

Derive the MLflow flavour from a model, based on the top-level module of the model's class.

Args

model
the model whose flavour is to be extracted.

Returns

The flavour name, i.e. the top-level package of the model's class.
def get_model_run_id_by_run_name(experiment_id: str, run_name: str) ‑> str | None
def get_model_run_id_by_run_name(experiment_id: str, run_name: str) -> Union[str, None]:
    """Retrieve the run ID of a specific run in an MLflow experiment based on the run name.

    Args:
        experiment_id: The ID of the MLflow experiment.
        run_name: The name of the run.

    Returns:
        The run ID of the specified run, or None if the run is not found.
    """
    run_ids = mlflow.search_runs(experiment_ids=[experiment_id], filter_string=f"run_name='{run_name}'")["run_id"]
    if len(run_ids.index) > 0:
        return run_ids.values[0]
    else:
        return None

Retrieve the run ID of a specific run in an MLflow experiment based on the run name.

Args

experiment_id
The ID of the MLflow experiment.
run_name
The name of the run.

Returns

The run ID of the specified run, or None if the run is not found.

def get_model_run_id_from_model_name_stage(model_name: str, model_stage: str) ‑> str
def get_model_run_id_from_model_name_stage(model_name: str, model_stage: str) -> str:
    """
    Get model run id from registered model's name and stage.

    Parameters:
        model_name (str): Name of registered model.
        model_stage (str): Registered stage of model. Accepted values Staging|Archived|Production|None

    Returns:
        str: run id.
    """
    if model_stage not in ["None", "Staging", "Production", "Archived"]:
        raise ValueError(
            f"{model_stage} value for stage not supported, allowed values are 'Development', 'Staging', 'Production', 'Archived'"
        )
    model_uri = f"models:/{model_name}/{model_stage}"  # Get model uri
    model_info = mlflow.models.get_model_info(model_uri=model_uri)  # From uri, get model info

    return model_info.run_id

Get model run id from registered model's name and stage.

Parameters

model_name : str
Name of registered model.
model_stage : str
Registered stage of model. Accepted values: Staging|Archived|Production|None.

Returns

str
run id.
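
A minimal usage sketch (model name is illustrative):

from panama.ml.flow.utils import get_model_run_id_from_model_name_stage

run_id = get_model_run_id_from_model_name_stage("churn_forecast", "Production")
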
def get_or_create_experiment(experiment_name: str) ‑> int
def get_or_create_experiment(experiment_name: str) -> int:
    """Get the ID of an existing experiment with the given name, or create a new experiment.

    Args:
        experiment_name: The name of the experiment to get or create.

    Returns:
        The ID of the existing or newly created experiment.
    """
    client = MlflowClient()
    experiment = client.get_experiment_by_name(experiment_name)
    if experiment is not None:
        return experiment.experiment_id
    else:
        return client.create_experiment(experiment_name)

Get the ID of an existing experiment with the given name, or create a new experiment.

Args

experiment_name
The name of the experiment to get or create.

Returns

The ID of the existing or newly created experiment.
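
A minimal usage sketch (experiment path is illustrative):

from panama.ml.flow.utils import get_or_create_experiment

experiment_id = get_or_create_experiment("/Users/someone@example.com/churn_forecast")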

def get_parent_run_id(experiment_id: str, run_name: str = None) ‑> str | None
def get_parent_run_id(experiment_id: str, run_name: str = None) -> Union[str, None]:
    """Retrieve the run ID of the parent run in an MLflow experiment.

    Args:
        experiment_id: The ID of the MLflow experiment.
        run_name: Optional. The name of the parent run.

    Returns:
        The run ID of the parent run, or None if no matching run is found.
    """
    runs = mlflow.search_runs([experiment_id])
    if len(runs.index) > 0:
        if run_name is not None:
            runs = runs[runs["tags.mlflow.runName"] == run_name]
        if len(runs.index) > 0:
            return runs.sort_values("start_time").iloc[-1].run_id
        else:
            return None
    else:
        return None

Retrieve the run ID of the parent run in an MLflow experiment.

Args

experiment_id
The ID of the MLflow experiment.
run_name
Optional. The name of the parent run.

Returns

The run ID of the parent run, or None if no matching run is found.

def get_run_id(experiment_id: int, run_name: str) ‑> str
def get_run_id(experiment_id: int, run_name: str) -> str:
    """Get the run ID of a run in a specific experiment.

    Args:
        experiment_id: The ID of the experiment.
        run_name: The name of the run.

    Returns:
        The run ID of the run.
    """
    client = MlflowClient()
    runs = client.search_runs(experiment_ids=[experiment_id], filter_string=f"tags.`mlflow.runName`='{run_name}'")
    if len(runs) == 0:
        raise ValueError(f"No run found for experiment {experiment_id} and run name {run_name}")
    return runs[0].info.run_id

Get the run ID of a run in a specific experiment.

Args

experiment_id
The ID of the experiment.
run_name
The name of the run.

Returns

The run ID of the run.

def get_run_name_from_run_id(run_id: str) ‑> str
def get_run_name_from_run_id(run_id: str) -> str:
    """Retrieve the run name in an MLflow experiment from run ID.

    Args:
        run_id (str): the run ID.

    Returns:
        str: The run name.
    """
    # From id get info
    run_info = mlflow.get_run(run_id)

    # From info get name
    return run_info.data.tags.get("mlflow.runName")

Retrieve the run name in an MLflow experiment from run ID.

Args

run_id : str
the run ID.

Returns

str
The run name.

def list_experiments() ‑> List[str]
def list_experiments() -> List[str]:
    """List the names of all available experiments.

    Returns:
        A list of strings, where each string corresponds to the name of an experiment.
    """
    experiments = mlflow.search_experiments()
    experiment_names = [e.name for e in experiments]
    return experiment_names

List the names of all available experiments.

Returns

A list of strings, where each string corresponds to the name of an experiment.

def list_registered_models() ‑> List[Dict]
def list_registered_models() -> List[Dict]:
    """List alll registered models.

    Returns:
        A list of registered model informations dictionaries.
    """
    client = MlflowClient()
    return client.search_registered_models()

List all registered models.

Returns

A list with information about each registered model.

def load_model_from_registry(model_name: str, model_stage: str) ‑> mlflow.models.model.Model | None
def load_model_from_registry(model_name: str, model_stage: str) -> Union[None, mlflow.models.Model]:
    """Load a model from the MLflow model registry.

    Args:
        model_name: The name of the model to load.
        model_stage: The stage of the model to load.

    Returns:
        The loaded model, or None if the model was not found.
    """
    model_path = f"models:/{model_name}/{model_stage}"
    try:
        model = mlflow.pyfunc.load_model(model_path)
    except Exception:
        print(f"Model {model_name} in stage {model_stage} not found")
        model = None
    return model

Load a model from the MLflow model registry.

Args

model_name
The name of the model to load.
model_stage
The stage of the model to load.

Returns

The loaded model, or None if the model was not found.
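
A minimal usage sketch (model name, stage and the feature frame are illustrative):

import pandas as pd

from panama.ml.flow.utils import load_model_from_registry

model = load_model_from_registry("churn_forecast", "Production")
if model is not None:
    predictions = model.predict(pd.DataFrame({"x1": [0.5], "x2": [1.0]}))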

def load_model_from_remote_registry(registry_scope: str, registry_key: str, model_name: str, model_stage: str) ‑> mlflow.models.model.Model | None
def load_model_from_remote_registry(
    registry_scope: str, registry_key: str, model_name: str, model_stage: str
) -> Union[None, mlflow.models.Model]:
    """Load a model from a remote MLflow model registry.

    Args:
        registry_scope: The scope of the registry to load the model from.
        registry_key: The key for the registry to load the model from.
        model_name: The name of the model to load.
        model_stage: The stage of the model to load.

    Returns:
        The loaded model, or None if the model was not found.
    """
    try:
        model = mlflow.pyfunc.load_model(
            f"models://{registry_scope}:{registry_key}@databricks/{model_name}/{model_stage}"
        )
    except Exception:
        print(f"Model {model_name} in stage {model_stage} not found")
        model = None
    return model

Load a model from a remote MLflow model registry.

Args

registry_scope
The scope of the registry to load the model from.
registry_key
The key for the registry to load the model from.
model_name
The name of the model to load.
model_stage
The stage of the model to load.

Returns

The loaded model, or None if the model was not found.

def load_model_from_uc_registry(spark: pyspark.sql.session.SparkSession,
model_name: str,
alias: str = 'production') ‑> mlflow.pyfunc.model.PythonModel
def load_model_from_uc_registry(spark: SparkSession, model_name: str, alias: str = "production") -> PythonModel:
    """
    Loads a model from the MLflow Model Registry in the specified environment and alias.

    Args:
        spark (SparkSession): The SparkSession object.
        model_name (str): The name of the registered model.
        alias (str, optional): The alias of the model version. Defaults to "production".

    Returns:
        PythonModel: The loaded model.

    Raises:
        Exception: If the specified model or alias is not found in the registry.
    """
    env = get_env_from_cluster_tag(spark)
    model_uri = f"models:/{env}_analytics.models.{model_name}@{alias}"
    return mlflow.pyfunc.load_model(model_uri)

Loads a model from the MLflow Model Registry in the specified environment and alias.

Args

spark : SparkSession
The SparkSession object.
model_name : str
The name of the registered model.
alias : str, optional
The alias of the model version. Defaults to "production".

Returns

PythonModel
The loaded model.

Raises

Exception
If the specified model or alias is not found in the registry.
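
A minimal usage sketch (assumes a Databricks cluster whose tag resolves the environment; model name is illustrative):

from panama.ml.flow.utils import load_model_from_uc_registry

# Resolves to e.g. "models:/dev_analytics.models.churn_forecast@production" depending on the cluster's environment tag.
model = load_model_from_uc_registry(spark, "churn_forecast", alias="production")
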
def log_model_in_experiment(model: mlflow.models.model.Model,
experiment_id: int,
run_name: str,
run_id: str = None,
input_example=None,
signature: mlflow.models.signature.ModelSignature = None,
run_status_running: bool = False) ‑> None
def log_model_in_experiment(
    model: mlflow.models.Model,
    experiment_id: int,
    run_name: str,
    run_id: str = None,
    input_example=None,
    signature: mlflow.models.signature.ModelSignature = None,
    run_status_running: bool = False,
) -> None:
    """Log a model to an experiment. Use run_id combined with run_name if you need to update an existing run.

    Args:
        model: The model to log.
        experiment_id: The ID of the experiment to log the model to.
        run_name (Optional if run_id is provided): The name of the run to log the model to.
        run_id (Optional if run_name is provided): The id of the run to log the model to.
        input_example: One or several instances of valid model input. The input example is used as a hint of what data to feed the model. It will be converted to a Pandas DataFrame and then serialized to json using the Pandas split-oriented format, or a numpy array where the example will be serialized to json by converting it to a list. Bytes are base64-encoded. When the signature parameter is None, the input example is used to infer a model signature.
        signature: An instance of the ModelSignature class that describes the model's inputs and outputs.
        run_status_running (Optional, bool): If True, the provided run is already in RUNNING status and the function will not try to start it; otherwise a new run is started.
    """

    model_flavour = get_flavour_from_model(model)

    if not run_status_running:
        with mlflow.start_run(experiment_id=experiment_id, run_name=run_name, run_id=run_id) as run:
            mlflow.__getattribute__(model_flavour).log_model(
                model, "model", input_example=input_example, signature=signature
            )
            model_params = model.get_params()
            for param in model_params:
                mlflow.log_param(param, model_params[param])
    else:
        mlflow.__getattribute__(model_flavour).log_model(
            model, "model", input_example=input_example, signature=signature
        )
        model_params = model.get_params()
        for param in model_params:
            mlflow.log_param(param, model_params[param])

Log a model to an experiment. Use run_id combined with run_name if you need to update an existing run.

Args

model
The model to log.
experiment_id
The ID of the experiment to log the model to.
run_name : Optional if run_id is provided
The name of the run to log the model to.
run_id : Optional if run_name is provided
The id of the run to log the model to.
input_example
One or several instances of valid model input. The input example is used as a hint of what data to feed the model. It will be converted to a Pandas DataFrame and then serialized to json using the Pandas split-oriented format, or a numpy array where the example will be serialized to json by converting it to a list. Bytes are base64-encoded. When the signature parameter is None, the input example is used to infer a model signature.
signature
An instance of the ModelSignature class that describes the model's inputs and outputs.
run_status_running : Optional, bool
If True, the provided run is already in RUNNING status and the function will not try to start it; otherwise a new run is started.
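
A minimal end-to-end sketch (the scikit-learn model, training data, experiment path and run name are illustrative):

import pandas as pd
from sklearn.ensemble import RandomForestRegressor

from panama.ml.flow.utils import get_or_create_experiment, log_model_in_experiment

X = pd.DataFrame({"x1": [0.0, 1.0, 2.0, 3.0], "x2": [1.0, 0.0, 1.0, 0.0]})
y = [0.1, 1.1, 1.9, 3.2]
model = RandomForestRegressor(n_estimators=10).fit(X, y)

experiment_id = get_or_create_experiment("/Users/someone@example.com/churn_forecast")
log_model_in_experiment(model, experiment_id, run_name="rf_baseline", input_example=X.head())
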
def register_model(model_run_id: str, model_name: str, tags: Dict[str, Any] | None = None) ‑> mlflow.entities.model_registry.model_version.ModelVersion
def register_model(
    model_run_id: str, model_name: str, tags: Optional[Dict[str, Any]] = None
) -> mlflow.entities.model_registry.ModelVersion:
    """Register a model in the MLflow model registry.

    Args:
        model_run_id: The run ID of the model to register.
        model_name: The name to give the registered model.
        tags: A dictionary of key-value pairs that are converted into mlflow.entities.model_registry.ModelVersionTag objects.
    """
    return mlflow.register_model(model_uri=f"runs:/{model_run_id}/model", name=model_name, tags=tags)

Register a model in the MLflow model registry.

Args

model_run_id
The run ID of the model to register.
model_name
The name to give the registered model.
tags
A dictionary of key-value pairs that are converted into mlflow.entities.model_registry.ModelVersionTag objects.
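
A minimal usage sketch (experiment path, run name, model name and tags are illustrative):

from panama.ml.flow.utils import get_experiment_id, get_parent_run_id, register_model

experiment_id = get_experiment_id("/Users/someone@example.com/churn_forecast")
run_id = get_parent_run_id(experiment_id, run_name="rf_baseline")
model_version = register_model(model_run_id=run_id, model_name="churn_forecast", tags={"owner": "ml-team"})
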
def register_model_remotely(model_run_id: str,
model_name: str,
registry_scope: str,
registry_key: str,
tags: Dict[str, Any] | None = None) ‑> None
def register_model_remotely(
    model_run_id: str, model_name: str, registry_scope: str, registry_key: str, tags: Optional[Dict[str, Any]] = None
) -> None:
    """Register a model in a remote MLflow model registry.

    Args:
        model_run_id: The run ID of the model to register.
        model_name: The name to give the registered model.
        registry_scope: The scope of the registry to register the model in.
        registry_key: The key for the registry to register the model in.
        tags: A dictionary of key-value pairs that are converted into mlflow.entities.model_registry.ModelVersionTag objects.
    """
    registry_uri = f"databricks://{registry_scope}:{registry_key}"
    mlflow.set_registry_uri(registry_uri)
    mlflow.register_model(model_uri=f"runs:/{model_run_id}/model", name=model_name, tags=tags)

Register a model in a remote MLflow model registry.

Args

model_run_id
The run ID of the model to register.
model_name
The name to give the registered model.
registry_scope
The scope of the registry to register the model in.
registry_key
The key for the registry to register the model in.
tags
A dictionary of key-value pairs that are converted into mlflow.entities.model_registry.ModelVersionTag objects.
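
A minimal usage sketch (the secret scope, key prefix, run ID and model name are illustrative):

from panama.ml.flow.utils import register_model_remotely

run_id = "<run-id-from-tracking-server>"  # illustrative placeholder
register_model_remotely(
    model_run_id=run_id,
    model_name="churn_forecast",
    registry_scope="prod-registry",  # Databricks secret scope holding the remote registry credentials
    registry_key="prod",             # key prefix within that scope
    tags={"owner": "ml-team"},
)
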
def register_model_uc(spark: pyspark.sql.session.SparkSession,
model: mlflow.pyfunc.model.PythonModel,
model_name: str,
input_example: pandas.core.frame.DataFrame,
alias: str = 'production') ‑> None
def register_model_uc(
    spark: SparkSession, model: PythonModel, model_name: str, input_example: pd.DataFrame, alias: str = "production"
) -> None:
    """
    Registers a model in MLflow Model Registry using the specified model flavor.

    Args:
        spark (SparkSession): The SparkSession object.
        model (PythonModel): The trained model object to be registered.
        model_name (str): The desired name for the registered model.
        input_example (pd.DataFrame): input example to infer model signature. Pass first rows of training dataset.
        alias (str, optional): The alias to assign to the newly registered version. Defaults to "production".

    Returns:
        None
    """
    model_flavour = get_flavour_from_model(model)
    env = get_env_from_cluster_tag(spark)

    model_name = f"{env}_analytics.models.{model_name}"
    mlflow.set_registry_uri("databricks-uc")
    mlflow.__getattribute__(model_flavour).log_model(
        model,
        input_example=input_example,
        artifact_path="model",
        registered_model_name=model_name,
    )
    client = MlflowClient()
    versions = client.search_model_versions(f"name = '{model_name}'")
    newer_version = max(int(model_version_info.version) for model_version_info in versions)
    client.set_registered_model_alias(model_name, alias, newer_version)

Registers a model in MLflow Model Registry using the specified model flavor.

Args

spark : SparkSession
The SparkSession object.
model : PythonModel
The trained model object to be registered.
model_name : str
The desired name for the registered model.
input_example : pd.DataFrame
input example to infer model signature. Pass first rows of training dataset.
alias : str, optional
The alias to assign to the newly registered version. Defaults to "production".

Returns

None
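
A minimal usage sketch (assumes a Databricks cluster whose tag resolves the environment; the scikit-learn model, training data and model name are illustrative):

import pandas as pd
from sklearn.linear_model import LinearRegression

from panama.ml.flow.utils import register_model_uc

X = pd.DataFrame({"x1": [0.0, 1.0, 2.0], "x2": [1.0, 0.0, 1.0]})
y = [0.0, 1.0, 2.0]
model = LinearRegression().fit(X, y)

# Registers "<env>_analytics.models.churn_forecast" and points the "production" alias at the new version.
register_model_uc(spark, model, "churn_forecast", input_example=X.head(), alias="production")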

def unwrap_model_impl(pyfunc_model: mlflow.pyfunc.PyFuncModel) ‑> Any
def unwrap_model_impl(pyfunc_model: mlflow.pyfunc.PyFuncModel) -> Any:
    """Extracts the underlying model implementation from an MLflow PyFunc model.

    Args:
        pyfunc_model: An MLflow PyFunc model.

    Returns:
        The underlying model implementation.
    """
    model_attr = [
        k for k in pyfunc_model._model_impl.__dict__.keys() if k.endswith("_model") and not k.startswith("_")
    ][0]
    return getattr(pyfunc_model._model_impl, model_attr)

Extracts the underlying model implementation from an MLflow PyFunc model.

Args

pyfunc_model
An MLflow PyFunc model.

Returns

The underlying model implementation.
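
A minimal usage sketch (the run ID is a placeholder):

import mlflow

from panama.ml.flow.utils import unwrap_model_impl

pyfunc_model = mlflow.pyfunc.load_model("runs:/<run-id>/model")
native_model = unwrap_model_impl(pyfunc_model)  # e.g. the underlying scikit-learn estimator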