Module panama.data_quality.checks

Classes

class Check (df: pyspark.sql.dataframe.DataFrame, col: Union[str, List[str]])

Abstract class for checks.

Attributes: df (DataFrame): spark dataframe to be analyzed. col (str): column of the dataframe to be analyzed.

Expand source code
class Check(ABC):
    """Abstract class for checks.

    Attributes:
    df (DataFrame): spark dataframe to be analyzed.
    col (str): column of the dataframe to be analyzed.
    """

    def __init__(self, df: DataFrame, col: Union[List[str], str]):
        self.df = df
        self.col = col
        self._set_logger()

    def _set_logger(self):
        """Set logger if logging is active."""
        # create logger only if logging is active
        if panama.logging.get_active_log() < 99:
            self.logger = panama.logging.get_logger()

    def _log(self, step_id: str, trace: Union[Exception, str, None] = None, level: int = panama.logging.INFO):
        if hasattr(self, "logger"):
            self.logger.log(msg=step_id, level=level, exc_info=trace) #type: ignore

    @abstractmethod
    def _evaluate():
        raise NotImplementedError()

    @abstractmethod
    def warning():
        raise NotImplementedError()

    @abstractmethod
    def fail():
        raise NotImplementedError()

    @abstractmethod
    def drop():
        raise NotImplementedError()

Ancestors

  • abc.ABC

Subclasses

Methods

def drop()
def fail()
def warning()
class NullCheck (df: pyspark.sql.dataframe.DataFrame, col: str)

Class with methods for checking null values in a column.

Attributes:

df (DataFrame): spark dataframe to be checked. col (str): column of df to be checked. logger (PanamaLogger): logger for events. check_row_count (int): number of null values in column col of df.

Inizialize the class.

Args

df : DataFrame
spark dataframe to be checked.
col : str
column of df to be checked.
Expand source code
class NullCheck(Check):
    """Class with methods for checking null values in a column.

    Attributes:

    df (DataFrame): spark dataframe to be checked.
    col (str): column of df to be checked.
    logger (PanamaLogger): logger for events.
    check_row_count (int): number of null values in column col of df.
    """

    def __init__(self, df: DataFrame, col: str):
        """Inizialize the class.

        Args:
            df (DataFrame): spark dataframe to be checked.
            col (str): column of df to be checked.
        """
        super().__init__(df, col)
        self.check_row_count = self._evaluate()

    def _evaluate(self) -> int:
        """Private method used to count nulls in column col of df.

        Returns:
            int: null number in col of df.
        """
        assert isinstance(self.col, str), TypeError(
            f"col parameter for NullCheck must be a str, {type(self.col)} was found."
        )
        check_row_count = self.df.where(F.col(self.col).isNull()).count()
        return check_row_count

    def warning(self):
        """Method to handle a warning message for null values."""
        if self.check_row_count > 0:
            msg = DataQualityWarning("Column '%s' has %d null values." % (self.col, self.check_row_count))
            log_level = panama.logging.WARN
        else:
            msg = "DataQuality check passed: '%s' column has no null value." % (self.col)
            log_level = panama.logging.INFO
        self._log(step_id="NullCheck", trace=msg, level=log_level)

    def fail(self):
        """Method to handle an error for null values.

        Raises:
            DataQualityException: Column <col> has <check_row_count> null values.
        """
        if self.check_row_count > 0:
            content = f"Column '{self.col}' has {self.check_row_count} null values."
            msg = DataQualityException(content)
            self._log(step_id="NullCheck", trace=msg, level=panama.logging.ERROR)
            raise msg
        else:
            msg = "DataQuality check passed: '%s' column has no null value." % (self.col)
            self._log(step_id="NullCheck", trace=msg, level=panama.logging.INFO)

    def drop(self) -> DataFrame:
        """Method used to drop records of df where column col has null values.

        Returns:
            DataFrame: df without the rows where column col has null values.
        """
        assert isinstance(self.col, str), TypeError(
            f"col parameter for NullCheck must be a str, {type(self.col)} was found."
        )

        if self.check_row_count > 0:
            self.df = self.df.where(F.col(self.col).isNotNull())
            msg = "%d null records on column '%s' were dropped" % (self.check_row_count, self.col)
        else:
            msg = "DataQuality check passed: '%s' column has no null value." % (self.col)
        self._log(step_id="NullCheck", level=panama.logging.INFO, trace=msg)

        return self.df

Ancestors

Methods

def drop(self) ‑> pyspark.sql.dataframe.DataFrame

Method used to drop records of df where column col has null values.

Returns

DataFrame
df without the rows where column col has null values.
def fail(self)

Method to handle an error for null values.

Raises

DataQualityException
Column has null values.
def warning(self)

Method to handle a warning message for null values.

class ThresholdsCheck (df, col: str, condition: str, boundaries: List[Union[int, float]], lower_bound_incl: bool = True, upper_bound_incl: bool = True)

The goal of this check is to check when the given condition evaluates to True and act (fail, warning, drop) when it evaluates to False.

Attributes:

df (DataFrame): spark dataframe to check. col (str): dataframe column to check. condition (str, ['between', 'not between']): kind of condition. Must be either 'between' or 'not between'. boundaries (list): extres for the allowed values. lower_bound_incl (bool, optional): flag for included lower bound. Defaults to True. upper_bound_incl (bool, optional): flag for included upper bound. Defaults to True.

Initialize Thresholds checks.

Args

df : DataFrame
spark dataframe to check.
col : str
dataframe column to check.
condition : str
kind of condition. Must be either 'between' or 'not between'.
boundaries : list
extres for the allowed values.
lower_bound_incl : bool, optional
flag for included lower bound. Defaults to True.
upper_bound_incl : bool, optional
flag for included upper bound. Defaults to True.
Expand source code
class ThresholdsCheck(Check):
    """The goal of this check is to check when the given condition evaluates to True and act (fail, warning, drop) when it evaluates to False.

    Attributes:

    df (DataFrame): spark dataframe to check.
    col (str): dataframe column to check.
    condition (str, ['between', 'not between']): kind of condition. Must be either 'between' or 'not between'.
    boundaries (list): extres for the allowed values.
    lower_bound_incl (bool, optional): flag for included lower bound. Defaults to True.
    upper_bound_incl (bool, optional): flag for included upper bound. Defaults to True.
    """

    def __init__(
        self,
        df,
        col: str,
        condition: str,
        boundaries: List[Union[int, float]],
        lower_bound_incl: bool = True,
        upper_bound_incl: bool = True,
    ):
        """Initialize Thresholds checks.

        Args:
            df (DataFrame):  spark dataframe to check.
            col (str):  dataframe column to check.
            condition (str): kind of condition. Must be either 'between' or 'not between'.
            boundaries (list): extres for the allowed values.
            lower_bound_incl (bool, optional): flag for included lower bound. Defaults to True.
            upper_bound_incl (bool, optional): flag for included upper bound. Defaults to True.
        """

        super().__init__(df, col)

        self.evaluation_string = self._generate_evaluation_string_from_inputs(
            col=col,
            condition=condition,
            boundaries=boundaries,
            lower_bound_incl=lower_bound_incl,
            upper_bound_incl=upper_bound_incl,
        )

        self.check_row_count = self._evaluate()

    @staticmethod
    def validate_or_default_check_config(check_config: dict):
        """Private method to validate a configuration.

        Args:
            check_config (dict): configuration.

        Returns:
            dict: configuration checked.
        """
        # Check boundaries
        ThresholdsCheck._validate_boundaries(check_config)

        # Check condition presence, use default and give warning if not found
        check_config = ThresholdsCheck._validate_condition(check_config)

        # Check lower and upper buondaries inclusivity presence, use defaults and give warning if not found
        check_config = ThresholdsCheck._validate_bound_inclusivity(check_config)

        return check_config

    @staticmethod
    def _validate_boundaries(check_config: Dict):
        """Method used to validate the boundaries of a configuration.

        Args:
            check_config (Dict): configuration to check.

        Raises:
            NameError: missing boundaries from check_config.
            ValueError: boundaries must be an iterable having exactly two elements.
        """
        try:
            boundaries = check_config["boundaries"]
        except:
            raise NameError(
                "check_config not compliant: with 'thresholds' check you need to specify a 'boundaries' key with a 2 element list as value."
            )

        try:
            assert len(boundaries) == 2
        except:
            raise ValueError(
                "check_config not compliant: with 'thresholds' check you need to specify a 'boundaries' key with a 2 element list as value."
            )

    @staticmethod
    def _validate_condition(check_config: dict) -> dict:
        """Private method used to check the condition value of a config. Raises a warning if no 'condition' parameter is in the configuration.

        Args:
            check_config (dict): config to check

        Returns:
            dict: configuration checked
        """
        accepted_condition_list = ["between", "not between"]
        default_condition = accepted_condition_list[0]

        # check if configuration has condition as key
        try:
            condition = check_config["condition"]
        except:
            warning_string = "check_config WARNING: condition parameter not provided, will use default '%s'" % (
                default_condition
            )
            print(warning_string)
            check_config.update({"condition": default_condition})

            return check_config

        # check that the condition value is an accepted one, default is 'between'.
        if condition in accepted_condition_list:
            pass
        else:
            warning_string = (
                "check_config WARNING: %s not accepted as condition with this check type. Using default condition '%s'"
                % (condition, default_condition)
            )
            print(warning_string)
            check_config.update({"condition": default_condition})

        return check_config

    @staticmethod
    def _validate_bound_inclusivity(check_config: dict) -> dict:
        """Private method used to validate the bound inclusivity.

        Args:
            check_config (dict): config to check.

        Raises:
            TypeError: the boundary must be of type bool.

        Returns:
            dict: validated config.
        """

        default_value = True
        for i in ["lower_bound_incl", "upper_bound_incl"]:
            try:
                locals()[i] = check_config[i]
            except:
                warning_string = "check_config WARNING: %s not provided using default %s" % (i, default_value)
                print(warning_string)
                check_config.update({i: default_value})

            if not isinstance(check_config[i], bool):
                raise TypeError("%s must be of type bool" % (locals()[i]))
        return check_config

    @staticmethod
    def _generate_evaluation_string_from_inputs(
        col: str, condition: str, boundaries: list, lower_bound_incl: bool = True, upper_bound_incl: bool = True
    ):
        """Private method used to generate the evaluation string for thresholds.

        Args:
            col (str): column to be checked
            condition (str): condition to validate ('between' or 'not between')
            boundaries (list): minimum and maximum value
            lower_bound_incl (bool, optional): flag to request lower bound inclusive. Defaults to True.
            upper_bound_incl (bool, optional): flag to request upper bound inclusive. Defaults to True.

        Returns:
            str: evaluation string.
        """
        # Set operators
        if condition == "not between":
            lower_bound_operator = ">"
            upper_bound_operator = "<"
            logical_operator = "and"
        elif condition == "between":
            lower_bound_operator = "<"
            upper_bound_operator = ">"
            logical_operator = "or"
        else:
            raise ValueError(f"condition must be either 'between' or 'not between', {condition} was found.")

        # Add equality to operators if requested
        if not lower_bound_incl:
            lower_bound_operator += "="

        if not upper_bound_incl:
            upper_bound_operator += "="

        # get boundaries values
        lower_bound = min(boundaries)
        upper_bound = max(boundaries)

        # Compose evaluation string considering case where inf is provided
        if lower_bound == -float("inf"):
            evaluation_string = "%s %s %s" % (col, upper_bound_operator, upper_bound)
        elif upper_bound == float("inf"):
            evaluation_string = "%s %s %s" % (col, lower_bound_operator, lower_bound)
        else:
            evaluation_string = "%s %s %s %s %s %s %s" % (
                col,
                lower_bound_operator,
                lower_bound,
                logical_operator,
                col,
                upper_bound_operator,
                upper_bound,
            )

        return evaluation_string

    def _evaluate(self) -> int:
        """Method used to evaluate the number of rows not complaiant to check definition.

        Returns:
            int: the number of rows to keep.
        """

        return self.df.where(self.evaluation_string).count()

    def warning(self):
        """Method to handle a warning message for out-of-boundaries values."""
        if self.check_row_count > 0:
            msg = DataQualityWarning(
                "Column '%s' has %d values not compliant with thresholds."
                % (
                    self.col,
                    self.check_row_count,
                )
            )
            log_level = panama.logging.WARN
        else:
            msg = "DataQuality check passed: '%s' column has every value compliant with thresholds." % (self.col)
            log_level = panama.logging.INFO
        self._log(step_id="ThresholdsCheck", trace=msg, level=log_level)

    def fail(self):
        """Method to handle an error for null values.

        Raises:
            DataQualityException: Column <col> has <check_row_count> values not compliant with thresholds.
        """
        if self.check_row_count > 0:
            content = f"Column '{self.col}' has {self.check_row_count} values not compliant with thresholds."
            msg = DataQualityException(content)
            self._log(step_id="ThresholdsCheck", trace=msg, level=panama.logging.ERROR)
            raise msg
        else:
            msg = "DataQuality check passed: '%s' column has every value compliant with thresholds." % (self.col)
            self._log(step_id="ThresholdsCheck", trace=msg, level=panama.logging.INFO)

    def drop(self):
        """Method used to drop records of df where column col has null values.

        Returns:
            DataFrame: df without the rows where column col has values not compliant with thresholds.
        """
        if self.check_row_count > 0:
            self.df = self.df.where(~F.expr(self.evaluation_string))
            msg = "%d non compliant records on column '%s' were dropped" % (self.check_row_count, self.col)
        else:
            msg = "DataQuality check passed: '%s' column has every value compliant with thresholds." % (self.col)
        self._log(step_id="ThresholdsCheck", level=panama.logging.INFO, trace=msg)
        return self.df

Ancestors

Static methods

def validate_or_default_check_config(check_config: dict)

Private method to validate a configuration.

Args

check_config : dict
configuration.

Returns

dict
configuration checked.

Methods

def drop(self)

Method used to drop records of df where column col has null values.

Returns

DataFrame
df without the rows where column col has values not compliant with thresholds.
def fail(self)

Method to handle an error for null values.

Raises

DataQualityException
Column has values not compliant with thresholds.
def warning(self)

Method to handle a warning message for out-of-boundaries values.

class UniqueCheck (df: pyspark.sql.dataframe.DataFrame, col: List[str])

Class for checking unique values in a subset of dataframe columns.

Attributes: duplicates_df_aggr (int): rows having duplicates on the selected columns. duplicates_df_row_count (int): number of rows having duplicates on the selected columns.

Initialize unique check class.

Args

df : DataFrame
spark dataframe to check.
col : list
list of columns to check.
Expand source code
class UniqueCheck(Check):
    """Class for checking unique values in a subset of dataframe columns.

    Attributes:
    duplicates_df_aggr (int): rows having duplicates on the selected columns.
    duplicates_df_row_count (int): number of rows having duplicates on the selected columns.
    """

    def __init__(self, df: DataFrame, col: List[str]):
        """Initialize unique check class.

        Args:
            df (DataFrame): spark dataframe to check.
            col (list): list of columns to check.
        """
        super().__init__(df, col)

        self.duplicates_df_aggr, self.duplicates_df_row_count = self._evaluate()

    def _evaluate(self) -> Tuple:
        """Private method used to evaluate the duplicates for the selected cols and their quantity.

        Returns:
            List: dataframe of duplicate rows, duplicate row count
        """
        duplicates_df_aggr = self.df.groupBy(self.col).count().where(F.col("count") > 1)
        duplicates_df_row_count = duplicates_df_aggr.count()
        return duplicates_df_aggr, duplicates_df_row_count

    def warning(self):
        """Method to handle a warning message for duplicate values."""
        if self.duplicates_df_row_count > 0:
            content = f"Columns '{self.col}' have {self.duplicates_df_row_count} non unique values."
            msg = DataQualityWarning(content)
            log_level = panama.logging.WARN

        else:
            msg = "DataQuality check passed: '%s' columns have no duplicates value." % (self.col)
            log_level = panama.logging.INFO
        self._log(step_id="UniqueCheck", level=log_level, trace=msg)

    def fail(self):
        """Method to handle an error for null values.

        Raises:
            DataQualityException: Column <col> has <duplicate_row_count> duplicate values.
        """
        if self.duplicates_df_row_count > 0:
            content = f"Columns '{self.col}' have {self.duplicates_df_row_count} non unique values."
            msg = DataQualityException(content)
            print("Values with duplicates:")
            self.duplicates_df_aggr.select(self.col).show()
            self._log(step_id="UniqueCheck", trace=msg, level=panama.logging.ERROR)
            raise msg

        else:
            msg = "DataQuality check passed: '%s' columns have no duplicates value." % (self.col)
            self._log(step_id="UniqueCheck", trace=msg, level=panama.logging.INFO)

    def drop(self):
        """Method used to drop records of df where column col has null values.

        Returns:
            DataFrame: df without the rows where column col has duplicate values.
        """
        assert isinstance(self.col, list), TypeError(
            f"col parameter for UniqueCheck must be a list, {type(self.col)} was found."
        )

        if self.duplicates_df_row_count > 0:
            self.df = self.df.dropDuplicates(subset=self.col)
            msg = "%d null records on column '%s' were dropped" % (self.duplicates_df_row_count, self.col)
        else:
            msg = "DataQuality check passed: '%s' column has no duplicates value." % (self.col)
        self._log(step_id="UniqueCheck", trace=msg, level=panama.logging.INFO)

        return self.df

Ancestors

Methods

def drop(self)

Method used to drop records of df where column col has null values.

Returns

DataFrame
df without the rows where column col has duplicate values.
def fail(self)

Method to handle an error for null values.

Raises

DataQualityException
Column has duplicate values.
def warning(self)

Method to handle a warning message for duplicate values.