Module panama.metrics.spark_metrics
Functions
def accuracy(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False) ‑> pyspark.sql.dataframe.DataFrame-
Expand source code
def accuracy(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
) -> DataFrame:
    """Flag correct predictions row by row for a classification model.

    A column named "accuracy" is added to the dataframe. It is the boolean
    comparison ``target == predicted`` cast to integer, so a matching row
    holds 1 and a mismatching row holds 0.

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the column with the true labels.
        predicted_col (str): Name of the column with the predicted labels.
        partition_cols (List[str], optional): Columns handed to
            `_compute_metric_by_partition`. When empty or None no per-partition
            computation takes place. Defaults to None.
        hierarchical (bool, optional): Forwarded unchanged to
            `_compute_metric_by_partition`. Defaults to False.

    Returns:
        DataFrame: Without `partition_cols`, the input dataframe plus the
        row-level "accuracy" column. With `partition_cols`, whatever
        `_compute_metric_by_partition` returns, minus the row-level
        "accuracy" column.
    """
    metric_col = "accuracy"

    # Boolean equality cast to integer: 1 for a hit, 0 for a miss.
    is_correct = (F.col(target_col) == F.col(predicted_col)).cast("integer")
    scored = sdf.withColumn(metric_col, is_correct)

    if not partition_cols:
        return scored

    by_partition = _compute_metric_by_partition(scored, partition_cols, metric_col, hierarchical)
    # The row-level flag is only an intermediate once partition results exist.
    return by_partition.drop(metric_col)
Computes accuracy metric for classification models. Accuracy is a measure of the proportion of correct predictions.
Args
sdf
:DataFrame
- Input dataframe containing the target and predicted columns.
target_col
:str
- Name of the target column in the dataframe.
predicted_col
:str
- Name of the predicted column in the dataframe.
partition_cols
:List[str]
, optional- List of column names to partition by. Defaults to None.
hierarchical
:bool
, optional- Whether to compute metric hierarchically. Defaults to False.
Note
- accuracy takes no normalization arguments: target_normalization_col and pred_normalization_col are not part of its signature, which ends at hierarchical.
Returns
DataFrame
- Input dataframe with the computed accuracy metric added as a new column.
def mae(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame-
Expand source code
def mae(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """Compute the row-level absolute error used for mean absolute error (MAE).

    A column named "mae" is added holding ``|target - predicted|`` for every
    row.

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns handed to
            `_compute_metric_by_partition`. When empty or None no per-partition
            computation takes place. Defaults to None.
        hierarchical (bool, optional): Forwarded unchanged to
            `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the
            target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the
            predicted column. Defaults to None.
        normalization_mode (str, optional): Forwarded unchanged to
            `_normalize_col`; its accepted values are defined by that helper.
            Defaults to None.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized)
        dataframe plus the row-level "mae" column. With `partition_cols`,
        whatever `_compute_metric_by_partition` returns, minus the row-level
        "mae" column.
    """
    metric_col = "mae"
    actual_name, forecast_name = target_col, predicted_col

    # Normalization is all-or-nothing: it runs only when BOTH normalization
    # columns are supplied; a single one is ignored.
    if target_normalization_col and pred_normalization_col:
        # The helper hands back a dataframe and a column name that replace the inputs.
        sdf, actual_name = _normalize_col(sdf, actual_name, target_normalization_col, normalization_mode)  # type: ignore
        sdf, forecast_name = _normalize_col(sdf, forecast_name, pred_normalization_col, normalization_mode)  # type: ignore

    absolute_error = F.abs(F.col(actual_name) - F.col(forecast_name))
    scored = sdf.withColumn(metric_col, absolute_error)

    if not partition_cols:
        return scored

    by_partition = _compute_metric_by_partition(scored, partition_cols, metric_col, hierarchical)
    return by_partition.drop(metric_col)
Computes mean absolute error (MAE) metric for regression models. MAE is a measure of the absolute error between the predicted and target values.
Args
sdf
:DataFrame
- Input dataframe containing the target and predicted columns.
target_col
:str
- Name of the target column in the dataframe.
predicted_col
:str
- Name of the predicted column in the dataframe.
partition_cols
:List[str]
, optional- List of column names to partition by. Defaults to None.
hierarchical
:bool
, optional- Whether to compute metric hierarchically. Defaults to False.
target_normalization_col
:str
, optional- Name of the column to normalize the target column by. Defaults to None
pred_normalization_col
:str
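, optional- Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode
:str
, optional- Normalization mode passed through to the internal column-normalization helper. Defaults to None.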
Returns
DataFrame
- Input dataframe with the computed MAE metric added as a new column.
def mape(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame-
Expand source code
def mape(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """Compute the row-level absolute percentage error used for MAPE.

    A column named "mape" is added holding
    ``|(target - predicted) / target|`` for every row. The value is a ratio;
    it is not multiplied by 100.

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns handed to
            `_compute_metric_by_partition`. When empty or None no per-partition
            computation takes place. Defaults to None.
        hierarchical (bool, optional): Forwarded unchanged to
            `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the
            target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the
            predicted column. Defaults to None.
        normalization_mode (str, optional): Forwarded unchanged to
            `_normalize_col`; its accepted values are defined by that helper.
            Defaults to None.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized)
        dataframe plus the row-level "mape" column. With `partition_cols`,
        whatever `_compute_metric_by_partition` returns, minus the row-level
        "mape" column.
    """
    metric_col = "mape"
    actual_name, forecast_name = target_col, predicted_col

    # Normalization is all-or-nothing: it runs only when BOTH normalization
    # columns are supplied; a single one is ignored.
    if target_normalization_col and pred_normalization_col:
        # The helper hands back a dataframe and a column name that replace the inputs.
        sdf, actual_name = _normalize_col(sdf, actual_name, target_normalization_col, normalization_mode)  # type: ignore
        sdf, forecast_name = _normalize_col(sdf, forecast_name, pred_normalization_col, normalization_mode)  # type: ignore

    actual = F.col(actual_name)
    # NOTE(review): rows whose target is 0 divide by zero; Spark yields null or
    # raises depending on its ANSI setting -- confirm the intended handling.
    relative_error = F.abs((actual - F.col(forecast_name)) / actual)
    scored = sdf.withColumn(metric_col, relative_error)

    if not partition_cols:
        return scored

    by_partition = _compute_metric_by_partition(scored, partition_cols, metric_col, hierarchical)
    return by_partition.drop(metric_col)
Computes mean absolute percentage error (MAPE) metric for regression models. MAPE is a measure of the percentage error between the predicted and target values.
Args
sdf
:DataFrame
- Input dataframe containing the target and predicted columns.
target_col
:str
- Name of the target column in the dataframe.
predicted_col
:str
- Name of the predicted column in the dataframe.
partition_cols
:List[str]
, optional- List of column names to partition by. Defaults to None.
hierarchical
:bool
, optional- Whether to compute metric hierarchically. Defaults to False.
target_normalization_col
:str
, optional- Name of the column to normalize the target column by. Defaults to None
pred_normalization_col
:str
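, optional- Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode
:str
, optional- Normalization mode passed through to the internal column-normalization helper. Defaults to None.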
Returns
DataFrame
- Input dataframe with the computed MAPE metric added as a new column.
def mean_error(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame-
Expand source code
def mean_error(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """Compute the row-level signed error used for the mean error metric.

    A column named "mean_error" is added holding ``target - predicted`` for
    every row, so a positive value means the prediction was below the target.

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns handed to
            `_compute_metric_by_partition`. When empty or None no per-partition
            computation takes place. Defaults to None.
        hierarchical (bool, optional): Forwarded unchanged to
            `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the
            target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the
            predicted column. Defaults to None.
        normalization_mode (str, optional): Forwarded unchanged to
            `_normalize_col`; its accepted values are defined by that helper.
            Defaults to None.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized)
        dataframe plus the row-level "mean_error" column. With
        `partition_cols`, whatever `_compute_metric_by_partition` returns,
        minus the row-level "mean_error" column.
    """
    metric_col = "mean_error"
    actual_name, forecast_name = target_col, predicted_col

    # Normalization is all-or-nothing: it runs only when BOTH normalization
    # columns are supplied; a single one is ignored.
    if target_normalization_col and pred_normalization_col:
        # The helper hands back a dataframe and a column name that replace the inputs.
        sdf, actual_name = _normalize_col(sdf, actual_name, target_normalization_col, normalization_mode)  # type: ignore
        sdf, forecast_name = _normalize_col(sdf, forecast_name, pred_normalization_col, normalization_mode)  # type: ignore

    signed_error = F.col(actual_name) - F.col(forecast_name)
    scored = sdf.withColumn(metric_col, signed_error)

    if not partition_cols:
        return scored

    by_partition = _compute_metric_by_partition(scored, partition_cols, metric_col, hierarchical)
    return by_partition.drop(metric_col)
Computes mean error metric for regression models. Mean_error is a measure of the signed error between the predicted and target values.
Args
sdf
:DataFrame
- Input dataframe containing the target and predicted columns.
target_col
:str
- Name of the target column in the dataframe.
predicted_col
:str
- Name of the predicted column in the dataframe.
partition_cols
:List[str]
, optional- List of column names to partition by. Defaults to None.
hierarchical
:bool
, optional- Whether to compute metric hierarchically. Defaults to False.
target_normalization_col
:str
, optional- Name of the column to normalize the target column by. Defaults to None
pred_normalization_col
:str
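, optional- Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode
:str
, optional- Normalization mode passed through to the internal column-normalization helper. Defaults to None.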
Returns
DataFrame
- Input dataframe with the computed mean_error metric added as a new column.
def mse(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame-
Expand source code
def mse(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """Compute the row-level squared error used for mean squared error (MSE).

    A column named "mse" is added holding ``(target - predicted) ** 2`` for
    every row.

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns handed to
            `_compute_metric_by_partition`. When empty or None no per-partition
            computation takes place. Defaults to None.
        hierarchical (bool, optional): Forwarded unchanged to
            `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the
            target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the
            predicted column. Defaults to None.
        normalization_mode (str, optional): Forwarded unchanged to
            `_normalize_col`; its accepted values are defined by that helper.
            Defaults to None.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized)
        dataframe plus the row-level "mse" column. With `partition_cols`,
        whatever `_compute_metric_by_partition` returns, minus the row-level
        "mse" column.
    """
    metric_col = "mse"
    actual_name, forecast_name = target_col, predicted_col

    # Normalization is all-or-nothing: it runs only when BOTH normalization
    # columns are supplied; a single one is ignored.
    if target_normalization_col and pred_normalization_col:
        # The helper hands back a dataframe and a column name that replace the inputs.
        sdf, actual_name = _normalize_col(sdf, actual_name, target_normalization_col, normalization_mode)  # type: ignore
        sdf, forecast_name = _normalize_col(sdf, forecast_name, pred_normalization_col, normalization_mode)  # type: ignore

    residual = F.col(actual_name) - F.col(forecast_name)
    squared_error = F.pow(residual, F.lit(2))
    scored = sdf.withColumn(metric_col, squared_error)

    if not partition_cols:
        return scored

    by_partition = _compute_metric_by_partition(scored, partition_cols, metric_col, hierarchical)
    return by_partition.drop(metric_col)
Computes mean squared error (MSE) metric for regression models. MSE is a measure of the squared error between the predicted and target values.
Args
sdf
:DataFrame
- Input dataframe containing the target and predicted columns.
target_col
:str
- Name of the target column in the dataframe.
predicted_col
:str
- Name of the predicted column in the dataframe.
partition_cols
:List[str]
, optional- List of column names to partition by. Defaults to None.
hierarchical
:bool
, optional- Whether to compute metric hierarchically. Defaults to False.
target_normalization_col
:str
, optional- Name of the column to normalize the target column by. Defaults to None
pred_normalization_col
:str
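, optional- Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode
:str
, optional- Normalization mode passed through to the internal column-normalization helper. Defaults to None.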
Returns
DataFrame
- Input dataframe with the computed MSE metric added as a new column.
def rmse(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame-
Expand source code
def rmse(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """Compute the row-level term used for root mean squared error (RMSE).

    A column named "rmse" is added holding
    ``sqrt((target - predicted) ** 2)`` for every row.

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns handed to
            `_compute_metric_by_partition`. When empty or None no per-partition
            computation takes place. Defaults to None.
        hierarchical (bool, optional): Forwarded unchanged to
            `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the
            target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the
            predicted column. Defaults to None.
        normalization_mode (str, optional): Forwarded unchanged to
            `_normalize_col`; its accepted values are defined by that helper.
            Defaults to None.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized)
        dataframe plus the row-level "rmse" column. With `partition_cols`,
        whatever `_compute_metric_by_partition` returns, minus the row-level
        "rmse" column.
    """
    metric_col = "rmse"
    actual_name, forecast_name = target_col, predicted_col

    # Normalization is all-or-nothing: it runs only when BOTH normalization
    # columns are supplied; a single one is ignored.
    if target_normalization_col and pred_normalization_col:
        # The helper hands back a dataframe and a column name that replace the inputs.
        sdf, actual_name = _normalize_col(sdf, actual_name, target_normalization_col, normalization_mode)  # type: ignore
        sdf, forecast_name = _normalize_col(sdf, forecast_name, pred_normalization_col, normalization_mode)  # type: ignore

    residual = F.col(actual_name) - F.col(forecast_name)
    # NOTE(review): the square root is taken per row, so this value equals
    # |target - predicted|. A true RMSE needs the root applied after the
    # squared errors are averaged; whether `_compute_metric_by_partition`
    # accounts for that is not visible here -- confirm.
    rooted_square = F.sqrt(F.pow(residual, F.lit(2)))
    scored = sdf.withColumn(metric_col, rooted_square)

    if not partition_cols:
        return scored

    by_partition = _compute_metric_by_partition(scored, partition_cols, metric_col, hierarchical)
    return by_partition.drop(metric_col)
Computes root mean squared error (RMSE) metric for regression models. RMSE is a measure of the squared error between the predicted and target values.
Args
sdf
:DataFrame
- Input dataframe containing the target and predicted columns.
target_col
:str
- Name of the target column in the dataframe.
predicted_col
:str
- Name of the predicted column in the dataframe.
partition_cols
:List[str]
, optional- List of column names to partition by. Defaults to None.
hierarchical
:bool
, optional- Whether to compute metric hierarchically. Defaults to False.
target_normalization_col
:str
, optional- Name of the column to normalize the target column by. Defaults to None
pred_normalization_col
:str
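, optional- Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode
:str
, optional- Normalization mode passed through to the internal column-normalization helper. Defaults to None.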
Returns
DataFrame
- Input dataframe with the computed RMSE metric added as a new column.
def smape(sdf: pyspark.sql.dataframe.DataFrame,
target_col: str,
predicted_col: str,
partition_cols: List[str] | None = None,
hierarchical: bool = False,
target_normalization_col: str | None = None,
pred_normalization_col: str | None = None,
normalization_mode: str | None = None) ‑> pyspark.sql.dataframe.DataFrame-
Expand source code
def smape(
    sdf: DataFrame,
    target_col: str,
    predicted_col: str,
    partition_cols: Optional[List[str]] = None,
    hierarchical: bool = False,
    target_normalization_col: Optional[str] = None,
    pred_normalization_col: Optional[str] = None,
    normalization_mode: Optional[str] = None,
) -> DataFrame:
    """Compute the row-level symmetric absolute percentage error (SMAPE term).

    A column named "smape" is added holding, for every row,
    ``|target - predicted| / ((|target| + |predicted|) / 2)``. Rows where both
    values are 0 get 0 instead of a division by zero. The value is a ratio in
    the range 0 to 2; it is not multiplied by 100.

    Args:
        sdf (DataFrame): Dataframe holding the target and predicted columns.
        target_col (str): Name of the target column.
        predicted_col (str): Name of the predicted column.
        partition_cols (List[str], optional): Columns handed to
            `_compute_metric_by_partition`. When empty or None no per-partition
            computation takes place. Defaults to None.
        hierarchical (bool, optional): Forwarded unchanged to
            `_compute_metric_by_partition`. Defaults to False.
        target_normalization_col (str, optional): Column used to normalize the
            target column. Defaults to None.
        pred_normalization_col (str, optional): Column used to normalize the
            predicted column. Defaults to None.
        normalization_mode (str, optional): Forwarded unchanged to
            `_normalize_col`; its accepted values are defined by that helper.
            Defaults to None.

    Returns:
        DataFrame: Without `partition_cols`, the (possibly normalized)
        dataframe plus the row-level "smape" column. With `partition_cols`,
        whatever `_compute_metric_by_partition` returns, minus the row-level
        "smape" column.
    """
    metric_name = "smape"

    # Normalization is all-or-nothing: it runs only when BOTH normalization
    # columns are supplied; a single one is ignored.
    if target_normalization_col and pred_normalization_col:
        # The helper hands back a dataframe and a column name that replace the inputs.
        sdf, target_col = _normalize_col(sdf, target_col, target_normalization_col, normalization_mode)  # type: ignore
        sdf, predicted_col = _normalize_col(sdf, predicted_col, pred_normalization_col, normalization_mode)  # type: ignore

    actual = F.col(target_col)
    forecast = F.col(predicted_col)
    magnitude_sum = F.abs(actual) + F.abs(forecast)

    # Guard on the real denominator (|target| + |predicted|), not on the signed
    # sum: with target == -predicted != 0 the signed sum is 0 although the
    # denominator is not, and such rows must score 2 (maximal error), not 0.
    sdf = sdf.withColumn(
        metric_name,
        F.when(magnitude_sum == 0, 0).otherwise(F.abs(actual - forecast) / (magnitude_sum / 2)),
    )

    if partition_cols:
        sdf = _compute_metric_by_partition(sdf, partition_cols, metric_name, hierarchical)
        # The row-level value is only an intermediate once partition results exist.
        sdf = sdf.drop(metric_name)

    return sdf
Computes symmetric mean absolute percentage error (SMAPE) metric for regression models. SMAPE is a measure of the percentage error between the predicted and target values.
Args
sdf
:DataFrame
- Input dataframe containing the target and predicted columns.
target_col
:str
- Name of the target column in the dataframe.
predicted_col
:str
- Name of the predicted column in the dataframe.
partition_cols
:List[str]
, optional- List of column names to partition by. Defaults to None.
hierarchical
:bool
, optional- Whether to compute metric hierarchically. Defaults to False.
target_normalization_col
:str
, optional- Name of the column to normalize the target column by. Defaults to None
pred_normalization_col
:str
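, optional- Name of the column to normalize the predicted column by. Defaults to None.
normalization_mode
:str
, optional- Normalization mode passed through to the internal column-normalization helper. Defaults to None.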
Returns
DataFrame
- Input dataframe with the computed SMAPE metric added as a new column.