Module panama.feature_engineering.data_cleaning

Classes

class DataCleaner
Source code
class DataCleaner:
    def __init__(self):
        self.strategies = {
            "mean": self._impute_with_mean,
            "median": self._impute_with_median,
            "mode": self._impute_with_mode,
            "previous_hour": self._impute_with_previous_hour,
            "previous_day": self._impute_with_previous_day,
            "previous_week": self._impute_with_previous_week,
            "previous_month": self._impute_with_previous_month,
            "previous_year": self._impute_with_previous_year,
        }
        super().__init__()

    def detect_outliers_zscore(
        self,
        sdf: DataFrame,
        input_cols: Union[str, List[str]],
        group_cols: List[str] = [],
        threshold: float = 3,
        remove: bool = False,
    ) -> DataFrame:
        """
        Detects outliers in the input column(s) in a Spark DataFrame using the z-score method.

        Args:
            sdf: The Spark DataFrame to detect outliers in.
            input_cols: The name of the column or a list of column names to detect outliers in.
                If a list is provided, outlier detection will be performed on each column in the list.
            group_cols: The names of the columns to group by when detecting outliers. Defaults to an empty list.
            threshold: The z-score threshold for outlier detection. Defaults to 3.
            remove: If True, remove the rows containing the outliers.
                If False, keep the rows containing the outliers and create an additional column for each input column to identify them. Defaults to False.

        Returns:
            The input DataFrame with additional columns indicating whether each row is an outlier or not.
            If remove is True, the DataFrame will have the outliers removed instead.

        Raises:
            TypeError: If input_cols is not a string or a list.
        """
        if isinstance(input_cols, str):
            sdf = self._detect_outliers_zscore(
                sdf=sdf, input_col=input_cols, group_cols=group_cols, threshold=threshold, remove=remove
            )
        elif isinstance(input_cols, list):
            for input_col in input_cols:
                sdf = self._detect_outliers_zscore(
                    sdf=sdf, input_col=input_col, group_cols=group_cols, threshold=threshold, remove=remove
                )
        else:
            raise TypeError(f"{type(input_cols)} not valid for input_cols")
        return sdf

    @staticmethod
    def _detect_outliers_zscore(
        sdf: DataFrame, input_col: str, group_cols: List[str] = [], threshold: float = 3, remove: bool = False
    ) -> DataFrame:
        """
        Detects outliers in the input column in a Spark DataFrame using the z-score method.

        Args:
            sdf: The Spark DataFrame to detect outliers in.
            input_col: The name of the column to detect outliers in.
            group_cols: The names of the columns to group by when detecting outliers. Defaults to an empty list.
            threshold: The z-score threshold for outlier detection. Defaults to 3.
            remove: If True, remove the rows containing the outliers.
                If False, keep the rows containing the outliers and create an additional column for each input column to identify them. Defaults to False.

        Returns:
            Input DataFrame with an additional column indicating whether each row is an outlier or not.
            If remove is True, the DataFrame will have the outliers removed instead.

        Raises:
            ValueError: If input_col is not a column of sdf.
        """
        if input_col in sdf.columns:
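            # Group-level mean and standard deviation, used to compute each value's absolute z-score.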
            w = Window.partitionBy(group_cols)
            sdf = sdf.withColumn("mean", F.mean(F.col(input_col)).over(w)).withColumn(
                "stddev", F.stddev(F.col(input_col)).over(w)
            )
            sdf = sdf.withColumn("zscore".lower(), F.abs((F.col(input_col) - F.col("mean")) / F.col("stddev")))
            sdf = sdf.withColumn(f"is_outlier_{input_col}", F.col("zscore") > threshold)
            if remove:
                sdf = sdf.filter(
                    (~F.col(f"is_outlier_{input_col}")) | (F.col(f"is_outlier_{input_col}").isNull())
                ).drop(f"is_outlier_{input_col}")
            return sdf.drop("mean", "stddev", "zscore")
        else:
            raise ValueError(f"{input_col} is not a column of sdf")

    def detect_outliers_SMA_bands(
        self,
        sdf: DataFrame,
        time_col: str,
        input_cols: Union[str, List[str]],
        group_cols: List[str] = [],
        n_stddev_threshold: int = 2,
        lookback_window: int = 20,
        remove: bool = False,
    ) -> DataFrame:
        """
        Detects outliers in the input column(s) in a Spark DataFrame using the Simple Moving Average (SMA) Bands method.

        Args:
            sdf: The Spark DataFrame to detect outliers in.
            time_col: The name of the column containing the timestamp.
            input_cols: The name of the column or a list of column names to detect outliers in.
                If a list is provided, outlier detection will be performed on each column in the list.
            group_cols: The names of the columns to group by when detecting outliers. Defaults to an empty list.
            n_stddev_threshold: The number of standard deviations from the mean to use as the threshold for outlier detection. Defaults to 2.
            lookback_window: The size of the lookback window for the SMA calculation. Defaults to 20.
            remove: If True, remove the rows containing the outliers.
                If False, keep the rows containing the outliers and create an additional column for each input column to identify them. Defaults to False.

        Returns:
            The input DataFrame with additional columns indicating whether each row is an outlier or not.
            If remove is True, the DataFrame will have the outliers removed instead.

        Raises:
            TypeError: If input_cols is not a string or a list of strings.
        """
        if isinstance(input_cols, str):
            sdf = self._detect_outliers_SMA_bands(
                sdf=sdf,
                time_col=time_col,
                input_col=input_cols,
                group_cols=group_cols,
                n_stddev_threshold=n_stddev_threshold,
                lookback_window=lookback_window,
                remove=remove,
            )
        elif isinstance(input_cols, list):
            for input_col in input_cols:
                sdf = self._detect_outliers_SMA_bands(
                    sdf=sdf,
                    time_col=time_col,
                    input_col=input_col,
                    group_cols=group_cols,
                    n_stddev_threshold=n_stddev_threshold,
                    lookback_window=lookback_window,
                    remove=remove,
                )
        else:
            raise TypeError(f"{type(input_cols)} not valid for input_cols")
        return sdf

    @staticmethod
    def _detect_outliers_SMA_bands(
        sdf: DataFrame,
        time_col: str,
        input_col: str,
        group_cols: List[str] = [],
        n_stddev_threshold: int = 2,
        lookback_window: int = 20,
        remove: bool = False,
    ) -> DataFrame:
        """
        Detects outliers in the input column in a Spark DataFrame using the Simple Moving Average (SMA) Bands method.

        Args:
            sdf: The Spark DataFrame to detect outliers in.
            time_col: The name of the column containing the timestamp.
            input_col: The name of the column to detect outliers in.
            group_cols: The names of the columns to group by when detecting outliers. Defaults to an empty list.
            n_stddev_threshold: The number of standard deviations from the mean to use as the threshold for outlier detection. Defaults to 2.
            lookback_window: The size of the lookback window for the SMA calculation. Defaults to 20.
            remove: If True, remove the rows containing the outliers.
                If False, keep the rows containing the outliers and create an additional column for each input column to identify them. Defaults to False.

        Returns:
            The input DataFrame with additional columns indicating whether each row is an outlier or not.
            If remove is True, the DataFrame will have the outliers removed instead.

        Raises:
            ValueError: If input_col is not a column of sdf.
        """
        if input_col in sdf.columns:
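            # Rank rows within each group by time; this rank defines the trailing lookback window in the self-join below.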
            sdf = sdf.withColumn(
                "ranked_ind",
                F.row_number().over(Window.partitionBy(group_cols).orderBy(time_col)),
            )
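            # Build two aliased copies so each row ("a") can be joined with the rows inside its trailing lookback window ("b").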
            sdf_a = (
                sdf.select(group_cols + [time_col, "ranked_ind"])
                .alias("a")
                .withColumnRenamed("ranked_ind", "ranked_ind_a")
                .withColumnRenamed(time_col, f"{time_col}_a")
            )
            sdf_b = (
                sdf.select(group_cols + [time_col, input_col, "ranked_ind"])
                .alias("b")
                .withColumnRenamed("ranked_ind", "ranked_ind_b")
                .withColumnRenamed(time_col, f"{time_col}_b")
            )
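            # Self-join: for each row "a", keep the "b" rows whose rank lies within the previous lookback_window positions (restricted to the same group when group_cols is given).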
            if group_cols != []:
                stg = sdf_a.join(sdf_b, on=group_cols).filter(
                    (F.col("ranked_ind_a") >= F.col("ranked_ind_b"))
                    & (F.col("ranked_ind_a") < F.col("ranked_ind_b") + lookback_window)
                )
            else:
                stg = sdf_a.join(
                    sdf_b,
                    on=(
                        (sdf_a.ranked_ind_a >= sdf_b.ranked_ind_b)
                        & (sdf_a.ranked_ind_a < sdf_b.ranked_ind_b + lookback_window)
                    ),
                )
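            # Aggregate the windowed rows into the simple moving average (SMA) and its standard deviation for each timestamp.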
            stg = stg.groupby(group_cols + [f"{time_col}_a"]).agg(
                F.round(F.mean(input_col), 4).alias("sma_mean"),
                F.coalesce(F.round(F.stddev(input_col), 4), F.lit(0)).alias("sma_stddev"),
            )
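            # Upper and lower SMA bands at n_stddev_threshold standard deviations around the moving average.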
            stg = stg.withColumn(
                "ub_sma",
                F.round(F.col("sma_mean") + n_stddev_threshold * F.col("sma_stddev"), 4),
            )
            stg = stg.withColumn(
                "lb_sma",
                F.round(F.col("sma_mean") - n_stddev_threshold * F.col("sma_stddev"), 4),
            )
            sdf = sdf.join(
                stg.withColumnRenamed(f"{time_col}_a", time_col),
                on=group_cols + [time_col],
                how="left",
            ).drop("ranked_ind")
            sdf = sdf.withColumn(
                f"is_outlier_{input_col}",
                ((F.col(input_col) < F.col("lb_sma")) | (F.col(input_col) > F.col("ub_sma"))),
            )
            if remove:
                sdf = sdf.filter(
                    (~F.col(f"is_outlier_{input_col}")) | (F.col(f"is_outlier_{input_col}").isNull())
                ).drop(f"is_outlier_{input_col}")
            return sdf.drop("sma_mean", "sma_stddev", "ub_sma", "lb_sma")
        else:
            raise ValueError(f"{input_col} is not a column of sdf")

    @staticmethod
    def _impute_with_mean(sdf: DataFrame, input_col: str, group_cols: List[str]) -> DataFrame:
        """
        Computes the mean of a column grouped by one or more columns, and replaces missing values in the column with the computed mean.

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            input_col: The name of the column to impute missing values in.
            group_cols: The names of the columns to group by.

        Returns:
            A new Spark DataFrame where missing values in the specified column have been replaced with the mean of that column, computed for each group separately.
        """
        w = Window.partitionBy(group_cols)
        return sdf.withColumn(input_col, F.coalesce(F.col(input_col), F.mean(F.col(input_col)).over(w)))

    @staticmethod
    def _impute_with_median(sdf: DataFrame, input_col: str, group_cols: List[str]) -> DataFrame:
        """
        Computes the median of a column grouped by one or more columns, and replaces missing values in the column with the computed median.

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            input_col: The name of the column to impute missing values in.
            group_cols: The names of the columns to group by.

        Returns:
            A new Spark DataFrame where missing values in the specified column have been replaced with the median of that column, computed for each group separately.
        """
        w = Window.partitionBy(group_cols)
        return sdf.withColumn(input_col, F.coalesce(F.col(input_col), F.expr(f"percentile({input_col}, 0.5)").over(w)))

    @staticmethod
    def _impute_with_mode(sdf: DataFrame, input_col: str, group_cols: List[str] = []) -> DataFrame:
        """
        Computes the mode of a column grouped by one or more columns, and replaces missing values in the column with the computed mode.

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            input_col: The name of the column to impute missing values in.
            group_cols: The names of the columns to group by.

        Returns:
            A new Spark DataFrame where missing values in the specified column have been replaced with the mode of that column, computed for each group separately.
        """
        sdf_mode = sdf.groupBy(group_cols + [input_col]).count()
        w = Window.partitionBy(group_cols).orderBy(F.desc("count"))
        sdf_mode = (
            sdf_mode.withColumn("row_number", F.row_number().over(w))
            .filter(F.col("row_number") == 1)
            .withColumnRenamed(input_col, f"{input_col}_mode")
        ).select(group_cols + [f"{input_col}_mode"])
        if group_cols != []:
            sdf = sdf.join(
                sdf_mode,
                on=group_cols,
                how="left",
            )
            return sdf.withColumn(input_col, F.coalesce(F.col(input_col), F.col(f"{input_col}_mode"))).drop(
                f"{input_col}_mode"
            )
        else:
            return sdf.withColumn(input_col, F.coalesce(F.col(input_col), F.lit(sdf_mode.collect()[0][0]))).drop(
                f"{input_col}_mode"
            )

    @staticmethod
    def _impute_with_previous_hour(sdf: DataFrame, time_col: str, input_col: str, group_cols: List[str]) -> DataFrame:
        """
        Imputes missing values in a column by replacing them with the corresponding values from the previous hour, grouped by one or more columns.

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            time_col: The name of the column containing the datetime.
            input_col: The name of the column to impute missing values in.
            group_cols: The names of the columns to group by.

        Returns:
            A new Spark DataFrame where missing values in the specified column have been replaced with the corresponding values from the previous hour,
            computed for each group separately.
        """
        fe = FeatureExtractor()
        sdf = fe.compute_col_lag_safe(
            sdf=sdf, lag_col=time_col, lag_interval="1 hour", value_col=input_col, join_cols=group_cols
        )
        sdf = sdf.withColumn(input_col, F.coalesce(F.col(input_col), F.col(f"{input_col}-1hour"))).drop(
            f"{input_col}-1hour"
        )
        return sdf

    @staticmethod
    def _impute_with_previous_day(sdf: DataFrame, time_col: str, input_col: str, group_cols: List[str]) -> DataFrame:
        """
        Imputes missing values in a column by replacing them with the corresponding values from the previous day, grouped by one or more columns.
        It handles both daily values (column time_col containing dates) and hourly values (column time_col containing datetimes).

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            time_col: The name of the column containing the datetime or date.
            input_col: The name of the column to impute missing values in.
            group_cols: The names of the columns to group by.

        Returns:
            A new Spark DataFrame where missing values in the specified column have been replaced with the corresponding values from the previous day,
            computed for each group separately.
        """
        fe = FeatureExtractor()
        sdf = fe.compute_col_lag_safe(
            sdf=sdf, lag_col=time_col, lag_interval="1 day", value_col=input_col, join_cols=group_cols
        )
        sdf = sdf.withColumn(input_col, F.coalesce(F.col(input_col), F.col(f"{input_col}-1day"))).drop(
            f"{input_col}-1day"
        )
        return sdf

    @staticmethod
    def _impute_with_previous_week(sdf: DataFrame, time_col: str, input_col: str, group_cols: List[str]) -> DataFrame:
        """
        Imputes missing values in a column by replacing them with the corresponding values from the previous week, grouped by one or more columns.
        It handles both daily values (column time_col containing dates) and hourly values (column time_col containing datetimes).

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            time_col: The name of the column containing the datetime or date.
            input_col: The name of the column to impute missing values in.
            group_cols: The names of the columns to group by.

        Returns:
            A new Spark DataFrame where missing values in the specified column have been replaced with the corresponding values from the previous week,
            computed for each group separately.
        """
        fe = FeatureExtractor()
        sdf = fe.compute_col_lag_safe(
            sdf=sdf, lag_col=time_col, lag_interval="1 week", value_col=input_col, join_cols=group_cols
        )
        sdf = sdf.withColumn(input_col, F.coalesce(F.col(input_col), F.col(f"{input_col}-1week"))).drop(
            f"{input_col}-1week"
        )
        return sdf

    @staticmethod
    def _impute_with_previous_month(sdf: DataFrame, time_col: str, input_col: str, group_cols: List[str]) -> DataFrame:
        """
        Imputes missing values in a column by replacing them with the corresponding values from the previous month, grouped by one or more columns.
        It handles both daily values (column time_col containing dates) and hourly values (column time_col containing datetimes).

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            time_col: The name of the column containing the datetime or date.
            input_col: The name of the column to impute missing values in.
            group_cols: The names of the columns to group by.

        Returns:
            A new Spark DataFrame where missing values in the specified column have been replaced with the corresponding values from the previous month,
            computed for each group separately.
        """
        fe = FeatureExtractor()
        sdf = fe.compute_col_lag_safe(
            sdf=sdf, lag_col=time_col, lag_interval="1 month", value_col=input_col, join_cols=group_cols
        )
        sdf = sdf.withColumn(input_col, F.coalesce(F.col(input_col), F.col(f"{input_col}-1month"))).drop(
            f"{input_col}-1month"
        )
        return sdf

    @staticmethod
    def _impute_with_previous_year(sdf: DataFrame, time_col: str, input_col: str, group_cols: List[str]) -> DataFrame:
        """
        Imputes missing values in a column by replacing them with the corresponding values from the previous year, grouped by one or more columns.
        It handles both daily values (column time_col containing dates) and hourly values (column time_col containing datetimes).

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            time_col: The name of the column containing the datetime or date.
            input_col: The name of the column to impute missing values in.
            group_cols: The names of the columns to group by.

        Returns:
            A new Spark DataFrame where missing values in the specified column have been replaced with the corresponding values from the previous year,
            computed for each group separately.
        """
        fe = FeatureExtractor()
        sdf = fe.compute_col_lag_safe(
            sdf=sdf, lag_col=time_col, lag_interval="1 year", value_col=input_col, join_cols=group_cols
        )
        sdf = sdf.withColumn(input_col, F.coalesce(F.col(input_col), F.col(f"{input_col}-1year"))).drop(
            f"{input_col}-1year"
        )
        return sdf

    def impute_missing_values(
        self,
        sdf: DataFrame,
        input_cols: Union[str, List[str]],
        group_cols: List[str] = [],
        strategy: str = "mean",
        time_col: Optional[str] = None,
    ) -> DataFrame:
        """
        Imputes missing values in one or more columns of a Spark DataFrame according to a specified strategy.

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            input_cols: The name of the column or a list of names of columns to impute missing values in.
            group_cols: The names of the columns to group by. Defaults to empty list.
            strategy: The imputation strategy to use. Must be one of the following: "mean", "median", "mode", "previous_hour", "previous_day", "previous_week",
                "previous_month", "previous_year". Defaults to "mean".
            time_col: The name of the column containing the datetime or date. Required if strategy is "previous_hour", "previous_day", "previous_week",
                "previous_month" or "previous_year". Defaults to None.

        Returns:
            A new Spark DataFrame where missing values in the specified column(s) have been replaced according to the specified strategy.

        Raises:
            ValueError: If the specified strategy is not supported.
            TypeError: If the type of input_cols is not valid.
        """
        if strategy not in self.strategies.keys():
            raise ValueError(f"{strategy} not valid for strategy")
        if isinstance(input_cols, str):
            sdf = self._impute_missing_values(
                sdf=sdf, input_col=input_cols, group_cols=group_cols, fn=self.strategies[strategy], time_col=time_col
            )
        elif isinstance(input_cols, list):
            for input_col in input_cols:
                sdf = self._impute_missing_values(
                    sdf=sdf, input_col=input_col, group_cols=group_cols, fn=self.strategies[strategy], time_col=time_col
                )
        else:
            raise TypeError(f"{type(input_cols)} not valid for input_cols")
        return sdf

    def _impute_missing_values(
        self,
        sdf: DataFrame,
        input_col: str,
        group_cols: List[str] = [],
        time_col: Optional[str] = None,
        fn: Optional[Callable] = None,
    ) -> DataFrame:
        """
        Imputes missing values in a column of a Spark DataFrame according to a specified function.

        Args:
            sdf: The Spark DataFrame to impute missing values in.
            input_col: The name of the column to impute missing values in.
            group_cols: The names of the columns to group by. Defaults to empty list.
            time_col: The name of the column containing the datetime or date. Required if the imputation function requires it. Defaults to None.
            fn: The imputation function to use.

        Returns:
            A new Spark DataFrame where missing values in the specified column have been replaced according to the specified function.

        Raises:
            ValueError: If the input column is not a column of sdf.
        """
        if input_col in sdf.columns:
            if time_col:
                return fn(sdf=sdf, input_col=input_col, group_cols=group_cols, time_col=time_col)  # type: ignore
            else:
                return fn(sdf=sdf, input_col=input_col, group_cols=group_cols)  # type: ignore
        else:
            raise ValueError(f"{input_col} is not a column of sdf")

Methods

def detect_outliers_SMA_bands(self, sdf: pyspark.sql.dataframe.DataFrame, time_col: str, input_cols: Union[str, List[str]], group_cols: List[str] = [], n_stddev_threshold: int = 2, lookback_window: int = 20, remove: bool = False) ‑> pyspark.sql.dataframe.DataFrame

Detects outliers in the input column(s) in a Spark DataFrame using the Simple Moving Average (SMA) Bands method.

Args

sdf
The Spark DataFrame to detect outliers in.
time_col
The name of the column containing the timestamp.
input_cols
The name of the column or a list of column names to detect outliers in. If a list is provided, outlier detection will be performed on each column in the list.
group_cols
The names of the columns to group by when detecting outliers. Defaults to an empty list.
n_stddev_threshold
The number of standard deviations from the mean to use as the threshold for outlier detection. Defaults to 2.
lookback_window
The size of the lookback window for the SMA calculation. Defaults to 20.
remove
If True, remove the rows containing the outliers. If False, keep the rows containing the outliers and create an additional column for each input column to identify them. Defaults to False.

Returns

The input DataFrame with additional columns indicating whether each row is an outlier or not. If remove is True, the DataFrame will have the outliers removed instead.

Raises

TypeError
If input_cols is not a string or a list of strings.
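
A minimal usage sketch for this method. The SparkSession setup, the column names (plant_id, timestamp, power_mw) and the sample values are illustrative assumptions, not part of the module:

from pyspark.sql import SparkSession, functions as F

from panama.feature_engineering.data_cleaning import DataCleaner

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(
    [
        ("A", "2023-01-01 00:00:00", 10.0),
        ("A", "2023-01-01 01:00:00", 11.0),
        ("A", "2023-01-01 02:00:00", 500.0),
    ],
    ["plant_id", "timestamp", "power_mw"],
).withColumn("timestamp", F.to_timestamp("timestamp"))

cleaner = DataCleaner()
# Flag values outside +/- 2 moving standard deviations of the trailing 20-row SMA, per plant.
flagged = cleaner.detect_outliers_SMA_bands(
    sdf=sdf,
    time_col="timestamp",
    input_cols="power_mw",
    group_cols=["plant_id"],
    n_stddev_threshold=2,
    lookback_window=20,
)
# The result carries an extra boolean column "is_outlier_power_mw"; pass remove=True to drop flagged rows instead.
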
def detect_outliers_zscore(self, sdf: pyspark.sql.dataframe.DataFrame, input_cols: Union[str, List[str]], group_cols: List[str] = [], threshold: float = 3, remove: bool = False) ‑> pyspark.sql.dataframe.DataFrame

Detects outliers in the input column(s) in a Spark DataFrame using the z-score method.

Args

sdf
The Spark DataFrame to detect outliers in.
input_cols
The name of the column or a list of column names to detect outliers in. If a list is provided, outlier detection will be performed on each column in the list.
group_cols
The names of the columns to group by when detecting outliers. Defaults to an empty list.
threshold
The z-score threshold for outlier detection. Defaults to 3.
remove
If True, remove the rows containing the outliers. If False, keep the rows containing the outliers and create an additional column for each input column to identify them. Defaults to False.

Returns

The input DataFrame with additional columns indicating whether each row is an outlier or not. If remove is True, the DataFrame will have the outliers removed instead.

Raises

TypeError
If input_cols is not a string or a list.
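
A hedged sketch of the z-score variant, reusing the illustrative sdf and column names from the SMA example above:

cleaner = DataCleaner()
# Mark rows whose absolute z-score within each plant exceeds 3 in a boolean column "is_outlier_power_mw".
flagged = cleaner.detect_outliers_zscore(
    sdf=sdf,
    input_cols=["power_mw"],
    group_cols=["plant_id"],
    threshold=3,
)
# With remove=True the flagged rows are dropped and no indicator column is added.
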
def impute_missing_values(self, sdf: pyspark.sql.dataframe.DataFrame, input_cols: Union[str, List[str]], group_cols: List[str] = [], strategy: str = 'mean', time_col: Optional[str] = None) ‑> pyspark.sql.dataframe.DataFrame

Imputes missing values in one or more columns of a Spark DataFrame according to a specified strategy.

Args

sdf
The Spark DataFrame to impute missing values in.
input_cols
The name of the column or a list of names of columns to impute missing values in.
group_cols
The names of the columns to group by. Defaults to empty list.
strategy
The imputation strategy to use. Must be one of the following: "mean", "median", "mode", "previous_hour", "previous_day", "previous_week", "previous_month", "previous_year". Defaults to "mean".
time_col
The name of the column containing the datetime or date. Required if strategy is "previous_hour", "previous_day", "previous_week", "previous_month" or "previous_year". Defaults to None.

Returns

A new Spark DataFrame where missing values in the specified column(s) have been replaced according to the specified strategy.

Raises

ValueError
If the specified strategy is not supported.
TypeError
If the type of input_cols is not valid.
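
A sketch covering one statistical and one time-based strategy, again assuming the illustrative sdf, plant_id, timestamp and power_mw names used in the examples above:

cleaner = DataCleaner()
# Replace nulls in "power_mw" with the per-plant mean.
imputed = cleaner.impute_missing_values(
    sdf=sdf,
    input_cols="power_mw",
    group_cols=["plant_id"],
    strategy="mean",
)
# Time-based strategies require time_col; here each null is filled with the value observed one day earlier.
imputed_lag = cleaner.impute_missing_values(
    sdf=sdf,
    input_cols="power_mw",
    group_cols=["plant_id"],
    strategy="previous_day",
    time_col="timestamp",
)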