Module panama.metrics.spark_metrics

Functions

def accuracy(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False) ‑> pyspark.sql.dataframe.DataFrame
Expand source code
def accuracy(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
) -> DataFrame:
    """
    Scores each row of a classification result as correct or incorrect.

    The target and predicted columns are compared for equality and the boolean
    outcome is cast to integer and stored in a column named "accuracy".

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the column with the true labels.
        predicted_col (str): Name of the column with the predicted labels.
        partition_cols (List[str], optional): Columns forwarded to `_compute_metric_by_partition`.
            When None or empty, no partition step is run. Defaults to None.
        hierarchical (bool, optional): Flag forwarded to `_compute_metric_by_partition`. Defaults to False.

    Returns:
        DataFrame: Without `partition_cols`, the input dataframe plus the row-level "accuracy" column.
            With `partition_cols`, the output of `_compute_metric_by_partition` after dropping the
            row-level "accuracy" column.
    """
    row_metric = "accuracy"
    is_correct = F.col(target_col) == F.col(predicted_col)
    scored = sdf.withColumn(row_metric, is_correct.cast("integer"))

    # Nothing to aggregate: hand back the row-level score as is.
    if not partition_cols:
        return scored

    aggregated = _compute_metric_by_partition(scored, partition_cols, row_metric, hierarchical)
    return aggregated.drop(row_metric)

Computes accuracy metric for classification models. Accuracy is a measure of the proportion of correct predictions.

Args

sdf : DataFrame
Input dataframe containing the target and predicted columns.
target_col : str
Name of the target column in the dataframe.
predicted_col : str
Name of the predicted column in the dataframe.
partition_cols : List[str], optional
List of column names to partition by. Defaults to None.
hierarchical : bool, optional
Whether to compute metric hierarchically. Defaults to False.
target_normalization_col : str, optional
Name of the column to normalize the target column by. Defaults to None
pred_normalization_col : str, optional
Name of the column to normalize the predicted column by. Defaults to None.

Returns

DataFrame
Input dataframe with the computed accuracy metric added as a new column.
def mae(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame
Expand source code
def mae(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """
    Computes the absolute error term of the mean absolute error (MAE) metric.

    Each row receives `abs(target - predicted)` in a column named "mae".

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns forwarded to `_compute_metric_by_partition`.
            When None or empty, no partition step is run. Defaults to None.
        hierarchical (bool, optional): Flag forwarded to `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the predicted column. Defaults to None.
        normalization_mode (str, optional): Mode forwarded to `_normalize_col`. Defaults to None.

    Note:
        Normalization runs only when BOTH normalization columns are given; supplying
        just one of them leaves the inputs untouched.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized) dataframe plus the row-level "mae"
            column. With `partition_cols`, the output of `_compute_metric_by_partition` after dropping
            the row-level "mae" column.
    """
    row_metric = "mae"

    normalize = target_normalization_col and pred_normalization_col
    if normalize:
        # _normalize_col hands back the dataframe and the column name to use from here on.
        sdf, target_col = _normalize_col(sdf, target_col, target_normalization_col, normalization_mode)  # type: ignore
        sdf, predicted_col = _normalize_col(sdf, predicted_col, pred_normalization_col, normalization_mode)  # type: ignore

    residual = F.col(target_col) - F.col(predicted_col)
    scored = sdf.withColumn(row_metric, F.abs(residual))

    if not partition_cols:
        return scored

    aggregated = _compute_metric_by_partition(scored, partition_cols, row_metric, hierarchical)
    return aggregated.drop(row_metric)

Computes mean absolute error (MAE) metric for regression models. MAE is a measure of the absolute error between the predicted and target values.

Args

sdf : DataFrame
Input dataframe containing the target and predicted columns.
target_col : str
Name of the target column in the dataframe.
predicted_col : str
Name of the predicted column in the dataframe.
partition_cols : List[str], optional
List of column names to partition by. Defaults to None.
hierarchical : bool, optional
Whether to compute metric hierarchically. Defaults to False.
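target_normalization_col : str, optional
Name of the column to normalize the target column by. Defaults to None.
pred_normalization_col : str, optional
Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode : str, optional
Normalization mode forwarded to the normalization step; only used when both normalization columns are given. Defaults to None.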

Returns

DataFrame
Input dataframe with the computed MAE metric added as a new column.
def mape(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame
Expand source code
def mape(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """
    Computes mean absolute percentage error (MAPE) metric for regression models.
    MAPE is a measure of the percentage error between the predicted and target values.

    Each row receives `abs((target - predicted) / target)` in a column named "mape".
    The value is a ratio (it is not multiplied by 100). Rows whose target is zero get
    NULL, because the percentage error is undefined there.

    Args:
        sdf (DataFrame): Input dataframe containing the target and predicted columns.
        target_col (str): Name of the target column in the dataframe.
        predicted_col (str): Name of the predicted column in the dataframe.
        partition_cols (List[str], optional): List of column names forwarded to
            `_compute_metric_by_partition`. When None or empty, no partition step is run. Defaults to None.
        hierarchical (bool, optional): Flag forwarded to `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Name of the column to normalize the target column by. Defaults to None.
        pred_normalization_col (str, optional): Name of the column to normalize the predicted column by. Defaults to None.
        normalization_mode (str, optional): Mode forwarded to `_normalize_col`. Defaults to None.

    Note:
        Normalization runs only when BOTH normalization columns are given.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized) dataframe plus the row-level "mape"
            column. With `partition_cols`, the output of `_compute_metric_by_partition` after dropping
            the row-level "mape" column.
    """
    metric_name = "mape"
    if target_normalization_col and pred_normalization_col:
        sdf, target_col = _normalize_col(sdf, target_col, target_normalization_col, normalization_mode)  # type: ignore
        sdf, predicted_col = _normalize_col(sdf, predicted_col, pred_normalization_col, normalization_mode)  # type: ignore

    target = F.col(target_col)
    predicted = F.col(predicted_col)

    # Guard the division explicitly. With ANSI mode off, Spark already yields NULL
    # for a zero divisor, so results are unchanged; with ANSI mode on, the unguarded
    # division would raise a divide-by-zero error. `when` without `otherwise` yields
    # NULL for rows that do not match (zero or NULL target).
    sdf = sdf.withColumn(
        metric_name,
        F.when(target != 0, F.abs((target - predicted) / target)),
    )

    if partition_cols:
        sdf = _compute_metric_by_partition(sdf, partition_cols, metric_name, hierarchical)
        sdf = sdf.drop(metric_name)
    return sdf

Computes mean absolute percentage error (MAPE) metric for regression models. MAPE is a measure of the percentage error between the predicted and target values.

Args

sdf : DataFrame
Input dataframe containing the target and predicted columns.
target_col : str
Name of the target column in the dataframe.
predicted_col : str
Name of the predicted column in the dataframe.
partition_cols : List[str], optional
List of column names to partition by. Defaults to None.
hierarchical : bool, optional
Whether to compute metric hierarchically. Defaults to False.
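target_normalization_col : str, optional
Name of the column to normalize the target column by. Defaults to None.
pred_normalization_col : str, optional
Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode : str, optional
Normalization mode forwarded to the normalization step; only used when both normalization columns are given. Defaults to None.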

Returns

DataFrame
Input dataframe with the computed MAPE metric added as a new column.
def mean_error(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame
Expand source code
def mean_error(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """
    Computes the signed error term of the mean error metric.

    Each row receives `target - predicted` in a column named "mean_error", so a
    positive value means the prediction is below the target.

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns forwarded to `_compute_metric_by_partition`.
            When None or empty, no partition step is run. Defaults to None.
        hierarchical (bool, optional): Flag forwarded to `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the predicted column. Defaults to None.
        normalization_mode (str, optional): Mode forwarded to `_normalize_col`. Defaults to None.

    Note:
        Normalization runs only when BOTH normalization columns are given.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized) dataframe plus the row-level
            "mean_error" column. With `partition_cols`, the output of `_compute_metric_by_partition`
            after dropping the row-level "mean_error" column.
    """
    row_metric = "mean_error"

    both_normalizers_given = target_normalization_col and pred_normalization_col
    if both_normalizers_given:
        sdf, target_col = _normalize_col(sdf, target_col, target_normalization_col, normalization_mode)  # type: ignore
        sdf, predicted_col = _normalize_col(sdf, predicted_col, pred_normalization_col, normalization_mode)  # type: ignore

    signed_error = F.col(target_col) - F.col(predicted_col)
    scored = sdf.withColumn(row_metric, signed_error)

    if not partition_cols:
        return scored

    aggregated = _compute_metric_by_partition(scored, partition_cols, row_metric, hierarchical)
    return aggregated.drop(row_metric)

Computes mean error metric for regression models. Mean_error is a measure of the signed error between the predicted and target values.

Args

sdf : DataFrame
Input dataframe containing the target and predicted columns.
target_col : str
Name of the target column in the dataframe.
predicted_col : str
Name of the predicted column in the dataframe.
partition_cols : List[str], optional
List of column names to partition by. Defaults to None.
hierarchical : bool, optional
Whether to compute metric hierarchically. Defaults to False.
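target_normalization_col : str, optional
Name of the column to normalize the target column by. Defaults to None.
pred_normalization_col : str, optional
Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode : str, optional
Normalization mode forwarded to the normalization step; only used when both normalization columns are given. Defaults to None.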

Returns

DataFrame
Input dataframe with the computed mean_error metric added as a new column.
def mse(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame
Expand source code
def mse(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """
    Computes the squared error term of the mean squared error (MSE) metric.

    Each row receives `(target - predicted) ** 2` in a column named "mse".

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns forwarded to `_compute_metric_by_partition`.
            When None or empty, no partition step is run. Defaults to None.
        hierarchical (bool, optional): Flag forwarded to `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the predicted column. Defaults to None.
        normalization_mode (str, optional): Mode forwarded to `_normalize_col`. Defaults to None.

    Note:
        Normalization runs only when BOTH normalization columns are given.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized) dataframe plus the row-level "mse"
            column. With `partition_cols`, the output of `_compute_metric_by_partition` after dropping
            the row-level "mse" column.
    """
    row_metric = "mse"

    if target_normalization_col and pred_normalization_col:
        # Target first, then prediction; each call returns the dataframe and the column to use.
        sdf, target_col = _normalize_col(sdf, target_col, target_normalization_col, normalization_mode)  # type: ignore
        sdf, predicted_col = _normalize_col(sdf, predicted_col, pred_normalization_col, normalization_mode)  # type: ignore

    residual = F.col(target_col) - F.col(predicted_col)
    squared_error = F.pow(residual, F.lit(2))
    scored = sdf.withColumn(row_metric, squared_error)

    if not partition_cols:
        return scored

    aggregated = _compute_metric_by_partition(scored, partition_cols, row_metric, hierarchical)
    return aggregated.drop(row_metric)

Computes mean squared error (MSE) metric for regression models. MSE is a measure of the squared error between the predicted and target values.

Args

sdf : DataFrame
Input dataframe containing the target and predicted columns.
target_col : str
Name of the target column in the dataframe.
predicted_col : str
Name of the predicted column in the dataframe.
partition_cols : List[str], optional
List of column names to partition by. Defaults to None.
hierarchical : bool, optional
Whether to compute metric hierarchically. Defaults to False.
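target_normalization_col : str, optional
Name of the column to normalize the target column by. Defaults to None.
pred_normalization_col : str, optional
Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode : str, optional
Normalization mode forwarded to the normalization step; only used when both normalization columns are given. Defaults to None.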

Returns

DataFrame
Input dataframe with the computed MSE metric added as a new column.
def rmse(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame
Expand source code
def rmse(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """
    Computes the row-level term used for the root mean squared error (RMSE) metric.

    Each row receives `sqrt((target - predicted) ** 2)` in a column named "rmse".

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns forwarded to `_compute_metric_by_partition`.
            When None or empty, no partition step is run. Defaults to None.
        hierarchical (bool, optional): Flag forwarded to `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the predicted column. Defaults to None.
        normalization_mode (str, optional): Mode forwarded to `_normalize_col`. Defaults to None.

    Note:
        Normalization runs only when BOTH normalization columns are given.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized) dataframe plus the row-level "rmse"
            column. With `partition_cols`, the output of `_compute_metric_by_partition` after dropping
            the row-level "rmse" column.
    """
    row_metric = "rmse"

    if target_normalization_col and pred_normalization_col:
        sdf, target_col = _normalize_col(sdf, target_col, target_normalization_col, normalization_mode)  # type: ignore
        sdf, predicted_col = _normalize_col(sdf, predicted_col, pred_normalization_col, normalization_mode)  # type: ignore

    residual = F.col(target_col) - F.col(predicted_col)
    # NOTE(review): the square root is taken per row, before any aggregation, so this
    # term is numerically the absolute error. If _compute_metric_by_partition averages
    # the column, the aggregate would be MAE rather than sqrt(mean(squared error)) -
    # confirm against that helper.
    root_of_square = F.sqrt(F.pow(residual, F.lit(2)))
    scored = sdf.withColumn(row_metric, root_of_square)

    if not partition_cols:
        return scored

    aggregated = _compute_metric_by_partition(scored, partition_cols, row_metric, hierarchical)
    return aggregated.drop(row_metric)

Computes root mean squared error (RMSE) metric for regression models. RMSE is a measure of the squared error between the predicted and target values.

Args

sdf : DataFrame
Input dataframe containing the target and predicted columns.
target_col : str
Name of the target column in the dataframe.
predicted_col : str
Name of the predicted column in the dataframe.
partition_cols : List[str], optional
List of column names to partition by. Defaults to None.
hierarchical : bool, optional
Whether to compute metric hierarchically. Defaults to False.
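target_normalization_col : str, optional
Name of the column to normalize the target column by. Defaults to None.
pred_normalization_col : str, optional
Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode : str, optional
Normalization mode forwarded to the normalization step; only used when both normalization columns are given. Defaults to None.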

Returns

DataFrame
Input dataframe with the computed RMSE metric added as a new column.
def smape(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame
Expand source code
def smape(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """
    Computes symmetric mean absolute percentage error (SMAPE) metric for regression models.
    SMAPE is a measure of the percentage error between the predicted and target values.

    Each row receives `abs(target - predicted) / ((abs(target) + abs(predicted)) / 2)`
    in a column named "smape". The value is a ratio (it is not multiplied by 100).
    Rows where target and prediction are both zero get 0.

    Args:
        sdf (DataFrame): Input dataframe containing the target and predicted columns.
        target_col (str): Name of the target column in the dataframe.
        predicted_col (str): Name of the predicted column in the dataframe.
        partition_cols (List[str], optional): List of column names forwarded to
            `_compute_metric_by_partition`. When None or empty, no partition step is run. Defaults to None.
        hierarchical (bool, optional): Flag forwarded to `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Name of the column to normalize the target column by. Defaults to None.
        pred_normalization_col (str, optional): Name of the column to normalize the predicted column by. Defaults to None.
        normalization_mode (str, optional): Mode forwarded to `_normalize_col`. Defaults to None.

    Note:
        Normalization runs only when BOTH normalization columns are given.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized) dataframe plus the row-level "smape"
            column. With `partition_cols`, the output of `_compute_metric_by_partition` after dropping
            the row-level "smape" column.
    """
    metric_name = "smape"
    if target_normalization_col and pred_normalization_col:
        sdf, target_col = _normalize_col(sdf, target_col, target_normalization_col, normalization_mode)  # type: ignore
        sdf, predicted_col = _normalize_col(sdf, predicted_col, pred_normalization_col, normalization_mode)  # type: ignore

    target = F.col(target_col)
    predicted = F.col(predicted_col)
    magnitude_sum = F.abs(target) + F.abs(predicted)

    # The zero guard must test the actual denominator (|target| + |predicted|).
    # Testing the signed sum instead would score opposite-signed pairs such as
    # (5, -5) as 0, although their symmetric error is the maximum value 2.
    sdf = sdf.withColumn(
        metric_name,
        F.when(magnitude_sum == 0, 0).otherwise(F.abs(target - predicted) / (magnitude_sum / 2)),
    )

    if partition_cols:
        sdf = _compute_metric_by_partition(sdf, partition_cols, metric_name, hierarchical)
        sdf = sdf.drop(metric_name)
    return sdf

Computes symmetric mean absolute percentage error (SMAPE) metric for regression models. SMAPE is a measure of the percentage error between the predicted and target values.

Args

sdf : DataFrame
Input dataframe containing the target and predicted columns.
target_col : str
Name of the target column in the dataframe.
predicted_col : str
Name of the predicted column in the dataframe.
partition_cols : List[str], optional
List of column names to partition by. Defaults to None.
hierarchical : bool, optional
Whether to compute metric hierarchically. Defaults to False.
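target_normalization_col : str, optional
Name of the column to normalize the target column by. Defaults to None.
pred_normalization_col : str, optional
Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode : str, optional
Normalization mode forwarded to the normalization step; only used when both normalization columns are given. Defaults to None.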

Returns

DataFrame
Input dataframe with the computed SMAPE metric added as a new column.