Module panama.data_quality.checks
Classes
class Check (df: pyspark.sql.dataframe.DataFrame, col: Union[str, List[str]])
-
Abstract class for checks.
Attributes:
- df (DataFrame): spark dataframe to be analyzed.
- col (str): column of the dataframe to be analyzed.
Expand source code
class Check(ABC):
    """Abstract class for checks.

    Attributes:
        df (DataFrame): spark dataframe to be analyzed.
        col (str): column of the dataframe to be analyzed.
    """

    def __init__(self, df: DataFrame, col: Union[List[str], str]):
        self.df = df
        self.col = col
        self._set_logger()

    def _set_logger(self):
        """Set logger if logging is active."""
        # create logger only if logging is active
        if panama.logging.get_active_log() < 99:
            self.logger = panama.logging.get_logger()

    def _log(self, step_id: str, trace: Union[Exception, str, None] = None, level: int = panama.logging.INFO):
        if hasattr(self, "logger"):
            self.logger.log(msg=step_id, level=level, exc_info=trace)  # type: ignore

    @abstractmethod
    def _evaluate(self):
        raise NotImplementedError()

    @abstractmethod
    def warning(self):
        raise NotImplementedError()

    @abstractmethod
    def fail(self):
        raise NotImplementedError()

    @abstractmethod
    def drop(self):
        raise NotImplementedError()
Ancestors
- abc.ABC
Subclasses
- NullCheck
- ThresholdsCheck
- UniqueCheck
Methods
def drop(self)
def fail(self)
def warning(self)
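Concrete checks subclass Check, compute a row count of offending records in _evaluate, and implement the warning/fail/drop handlers. The sketch below is illustrative only and not part of the module: the EmptyStringCheck name, the error type, and the column are invented; it assumes pyspark and panama.logging are importable as in the source above.

from typing import List, Union

import pyspark.sql.functions as F
from pyspark.sql import DataFrame

import panama.logging
from panama.data_quality.checks import Check


class EmptyStringCheck(Check):  # hypothetical example, not part of the module
    """Counts empty strings in a column and reacts like the built-in checks."""

    def __init__(self, df: DataFrame, col: str):
        super().__init__(df, col)
        self.check_row_count = self._evaluate()

    def _evaluate(self) -> int:
        # count rows where the column is an empty string
        return self.df.where(F.col(self.col) == "").count()

    def warning(self):
        level = panama.logging.WARN if self.check_row_count > 0 else panama.logging.INFO
        self._log(step_id="EmptyStringCheck", trace=f"{self.check_row_count} empty strings", level=level)

    def fail(self):
        if self.check_row_count > 0:
            raise ValueError(f"Column '{self.col}' has {self.check_row_count} empty strings.")

    def drop(self) -> DataFrame:
        # keep only rows where the column is not an empty string
        self.df = self.df.where(F.col(self.col) != "")
        return self.df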
class NullCheck (df: pyspark.sql.dataframe.DataFrame, col: str)
-
Class with methods for checking null values in a column.
Attributes:
- df (DataFrame): spark dataframe to be checked.
- col (str): column of df to be checked.
- logger (PanamaLogger): logger for events.
- check_row_count (int): number of null values in column col of df.
Initialize the class.
Args
df : DataFrame
- spark dataframe to be checked.
col : str
- column of df to be checked.
Expand source code
class NullCheck(Check):
    """Class with methods for checking null values in a column.

    Attributes:
        df (DataFrame): spark dataframe to be checked.
        col (str): column of df to be checked.
        logger (PanamaLogger): logger for events.
        check_row_count (int): number of null values in column col of df.
    """

    def __init__(self, df: DataFrame, col: str):
        """Initialize the class.

        Args:
            df (DataFrame): spark dataframe to be checked.
            col (str): column of df to be checked.
        """
        super().__init__(df, col)
        self.check_row_count = self._evaluate()

    def _evaluate(self) -> int:
        """Private method used to count nulls in column col of df.

        Returns:
            int: number of null values in column col of df.
        """
        assert isinstance(self.col, str), TypeError(
            f"col parameter for NullCheck must be a str, {type(self.col)} was found."
        )
        check_row_count = self.df.where(F.col(self.col).isNull()).count()
        return check_row_count

    def warning(self):
        """Method to handle a warning message for null values."""
        if self.check_row_count > 0:
            msg = DataQualityWarning("Column '%s' has %d null values." % (self.col, self.check_row_count))
            log_level = panama.logging.WARN
        else:
            msg = "DataQuality check passed: '%s' column has no null value." % (self.col)
            log_level = panama.logging.INFO
        self._log(step_id="NullCheck", trace=msg, level=log_level)

    def fail(self):
        """Method to handle an error for null values.

        Raises:
            DataQualityException: Column <col> has <check_row_count> null values.
        """
        if self.check_row_count > 0:
            content = f"Column '{self.col}' has {self.check_row_count} null values."
            msg = DataQualityException(content)
            self._log(step_id="NullCheck", trace=msg, level=panama.logging.ERROR)
            raise msg
        else:
            msg = "DataQuality check passed: '%s' column has no null value." % (self.col)
            self._log(step_id="NullCheck", trace=msg, level=panama.logging.INFO)

    def drop(self) -> DataFrame:
        """Method used to drop records of df where column col has null values.

        Returns:
            DataFrame: df without the rows where column col has null values.
        """
        assert isinstance(self.col, str), TypeError(
            f"col parameter for NullCheck must be a str, {type(self.col)} was found."
        )
        if self.check_row_count > 0:
            self.df = self.df.where(F.col(self.col).isNotNull())
            msg = "%d null records on column '%s' were dropped" % (self.check_row_count, self.col)
        else:
            msg = "DataQuality check passed: '%s' column has no null value." % (self.col)
        self._log(step_id="NullCheck", level=panama.logging.INFO, trace=msg)
        return self.df
Ancestors
- Check
- abc.ABC
Methods
def drop(self) ‑> pyspark.sql.dataframe.DataFrame
-
Method used to drop records of df where column col has null values.
Returns
DataFrame
- df without the rows where column col has null values.
def fail(self)
-
Method to handle an error for null values.
Raises
DataQualityException
- Column <col> has <check_row_count> null values.
def warning(self)
-
Method to handle a warning message for null values.
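A minimal usage sketch for NullCheck (illustrative only): the dataframe, column name, and values are invented, and an active SparkSession named spark is assumed.

from panama.data_quality.checks import NullCheck

# toy dataframe with one null in the 'price' column (illustrative data)
df = spark.createDataFrame([(1, 10.0), (2, None), (3, 7.5)], ["id", "price"])

check = NullCheck(df, col="price")
print(check.check_row_count)   # -> 1

check.warning()                # logs a DataQualityWarning at WARN level (when panama logging is active)
clean_df = check.drop()        # removes the null row and returns the dataframe
# check.fail()                 # would raise DataQualityException instead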
class ThresholdsCheck (df, col: str, condition: str, boundaries: List[Union[int, float]], lower_bound_incl: bool = True, upper_bound_incl: bool = True)
-
The goal of this check is to verify that the given condition evaluates to True for every row and to act (fail, warning, drop) on the rows where it evaluates to False.
Attributes:
- df (DataFrame): spark dataframe to check.
- col (str): dataframe column to check.
- condition (str, ['between', 'not between']): kind of condition. Must be either 'between' or 'not between'.
- boundaries (list): extremes for the allowed values.
- lower_bound_incl (bool, optional): flag for included lower bound. Defaults to True.
- upper_bound_incl (bool, optional): flag for included upper bound. Defaults to True.
Initialize Thresholds checks.
Args
df : DataFrame
- spark dataframe to check.
col : str
- dataframe column to check.
condition : str
- kind of condition. Must be either 'between' or 'not between'.
boundaries : list
- extremes for the allowed values (see the sketch after this list for a one-sided example).
lower_bound_incl : bool, optional
- flag for included lower bound. Defaults to True.
upper_bound_incl : bool, optional
- flag for included upper bound. Defaults to True.
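As a hedged illustration of the boundaries argument (based on the source shown below), passing float('inf') or -float('inf') as one extreme yields a one-sided threshold; the dataframe and column name here are invented.

from panama.data_quality.checks import ThresholdsCheck

# Illustrative only: per the source below, an infinite boundary produces a
# one-sided condition, here effectively "price must be >= 0".
check = ThresholdsCheck(
    df,                              # any Spark DataFrame with a 'price' column (example name)
    col="price",
    condition="between",
    boundaries=[0, float("inf")],
    lower_bound_incl=True,
)
print(check.evaluation_string)       # -> "price < 0"  (the rows counted as non-compliant)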
Expand source code
class ThresholdsCheck(Check):
    """The goal of this check is to verify that the given condition evaluates to True for every row
    and to act (fail, warning, drop) on the rows where it evaluates to False.

    Attributes:
        df (DataFrame): spark dataframe to check.
        col (str): dataframe column to check.
        condition (str, ['between', 'not between']): kind of condition. Must be either 'between' or 'not between'.
        boundaries (list): extremes for the allowed values.
        lower_bound_incl (bool, optional): flag for included lower bound. Defaults to True.
        upper_bound_incl (bool, optional): flag for included upper bound. Defaults to True.
    """

    def __init__(
        self,
        df,
        col: str,
        condition: str,
        boundaries: List[Union[int, float]],
        lower_bound_incl: bool = True,
        upper_bound_incl: bool = True,
    ):
        """Initialize Thresholds checks.

        Args:
            df (DataFrame): spark dataframe to check.
            col (str): dataframe column to check.
            condition (str): kind of condition. Must be either 'between' or 'not between'.
            boundaries (list): extremes for the allowed values.
            lower_bound_incl (bool, optional): flag for included lower bound. Defaults to True.
            upper_bound_incl (bool, optional): flag for included upper bound. Defaults to True.
        """
        super().__init__(df, col)
        self.evaluation_string = self._generate_evaluation_string_from_inputs(
            col=col,
            condition=condition,
            boundaries=boundaries,
            lower_bound_incl=lower_bound_incl,
            upper_bound_incl=upper_bound_incl,
        )
        self.check_row_count = self._evaluate()

    @staticmethod
    def validate_or_default_check_config(check_config: dict):
        """Private method to validate a configuration.

        Args:
            check_config (dict): configuration.

        Returns:
            dict: configuration checked.
        """
        # Check boundaries
        ThresholdsCheck._validate_boundaries(check_config)
        # Check condition presence, use default and give warning if not found
        check_config = ThresholdsCheck._validate_condition(check_config)
        # Check lower and upper boundaries inclusivity presence, use defaults and give warning if not found
        check_config = ThresholdsCheck._validate_bound_inclusivity(check_config)
        return check_config

    @staticmethod
    def _validate_boundaries(check_config: Dict):
        """Method used to validate the boundaries of a configuration.

        Args:
            check_config (Dict): configuration to check.

        Raises:
            NameError: missing boundaries from check_config.
            ValueError: boundaries must be an iterable having exactly two elements.
        """
        try:
            boundaries = check_config["boundaries"]
        except:
            raise NameError(
                "check_config not compliant: with 'thresholds' check you need to specify a 'boundaries' key with a 2 element list as value."
            )
        try:
            assert len(boundaries) == 2
        except:
            raise ValueError(
                "check_config not compliant: with 'thresholds' check you need to specify a 'boundaries' key with a 2 element list as value."
            )

    @staticmethod
    def _validate_condition(check_config: dict) -> dict:
        """Private method used to check the condition value of a config.
        Raises a warning if no 'condition' parameter is in the configuration.

        Args:
            check_config (dict): config to check

        Returns:
            dict: configuration checked
        """
        accepted_condition_list = ["between", "not between"]
        default_condition = accepted_condition_list[0]
        # check if configuration has condition as key
        try:
            condition = check_config["condition"]
        except:
            warning_string = "check_config WARNING: condition parameter not provided, will use default '%s'" % (
                default_condition
            )
            print(warning_string)
            check_config.update({"condition": default_condition})
            return check_config
        # check that the condition value is an accepted one, default is 'between'.
        if condition in accepted_condition_list:
            pass
        else:
            warning_string = (
                "check_config WARNING: %s not accepted as condition with this check type. Using default condition '%s'"
                % (condition, default_condition)
            )
            print(warning_string)
            check_config.update({"condition": default_condition})
        return check_config

    @staticmethod
    def _validate_bound_inclusivity(check_config: dict) -> dict:
        """Private method used to validate the bound inclusivity.

        Args:
            check_config (dict): config to check.

        Raises:
            TypeError: the boundary must be of type bool.

        Returns:
            dict: validated config.
        """
        default_value = True
        for i in ["lower_bound_incl", "upper_bound_incl"]:
            try:
                locals()[i] = check_config[i]
            except:
                warning_string = "check_config WARNING: %s not provided using default %s" % (i, default_value)
                print(warning_string)
                check_config.update({i: default_value})
            if not isinstance(check_config[i], bool):
                raise TypeError("%s must be of type bool" % (locals()[i]))
        return check_config

    @staticmethod
    def _generate_evaluation_string_from_inputs(
        col: str, condition: str, boundaries: list, lower_bound_incl: bool = True, upper_bound_incl: bool = True
    ):
        """Private method used to generate the evaluation string for thresholds.

        Args:
            col (str): column to be checked
            condition (str): condition to validate ('between' or 'not between')
            boundaries (list): minimum and maximum value
            lower_bound_incl (bool, optional): flag to request lower bound inclusive. Defaults to True.
            upper_bound_incl (bool, optional): flag to request upper bound inclusive. Defaults to True.

        Returns:
            str: evaluation string.
        """
        # Set operators
        if condition == "not between":
            lower_bound_operator = ">"
            upper_bound_operator = "<"
            logical_operator = "and"
        elif condition == "between":
            lower_bound_operator = "<"
            upper_bound_operator = ">"
            logical_operator = "or"
        else:
            raise ValueError(f"condition must be either 'between' or 'not between', {condition} was found.")
        # Add equality to operators if requested
        if not lower_bound_incl:
            lower_bound_operator += "="
        if not upper_bound_incl:
            upper_bound_operator += "="
        # get boundaries values
        lower_bound = min(boundaries)
        upper_bound = max(boundaries)
        # Compose evaluation string considering case where inf is provided
        if lower_bound == -float("inf"):
            evaluation_string = "%s %s %s" % (col, upper_bound_operator, upper_bound)
        elif upper_bound == float("inf"):
            evaluation_string = "%s %s %s" % (col, lower_bound_operator, lower_bound)
        else:
            evaluation_string = "%s %s %s %s %s %s %s" % (
                col,
                lower_bound_operator,
                lower_bound,
                logical_operator,
                col,
                upper_bound_operator,
                upper_bound,
            )
        return evaluation_string

    def _evaluate(self) -> int:
        """Method used to evaluate the number of rows not compliant with the check definition.

        Returns:
            int: the number of non-compliant rows.
        """
        return self.df.where(self.evaluation_string).count()

    def warning(self):
        """Method to handle a warning message for out-of-boundaries values."""
        if self.check_row_count > 0:
            msg = DataQualityWarning(
                "Column '%s' has %d values not compliant with thresholds."
                % (
                    self.col,
                    self.check_row_count,
                )
            )
            log_level = panama.logging.WARN
        else:
            msg = "DataQuality check passed: '%s' column has every value compliant with thresholds." % (self.col)
            log_level = panama.logging.INFO
        self._log(step_id="ThresholdsCheck", trace=msg, level=log_level)

    def fail(self):
        """Method to handle an error for out-of-boundaries values.

        Raises:
            DataQualityException: Column <col> has <check_row_count> values not compliant with thresholds.
        """
        if self.check_row_count > 0:
            content = f"Column '{self.col}' has {self.check_row_count} values not compliant with thresholds."
            msg = DataQualityException(content)
            self._log(step_id="ThresholdsCheck", trace=msg, level=panama.logging.ERROR)
            raise msg
        else:
            msg = "DataQuality check passed: '%s' column has every value compliant with thresholds." % (self.col)
            self._log(step_id="ThresholdsCheck", trace=msg, level=panama.logging.INFO)

    def drop(self):
        """Method used to drop records of df where column col has values not compliant with thresholds.

        Returns:
            DataFrame: df without the rows where column col has values not compliant with thresholds.
        """
        if self.check_row_count > 0:
            self.df = self.df.where(~F.expr(self.evaluation_string))
            msg = "%d non compliant records on column '%s' were dropped" % (self.check_row_count, self.col)
        else:
            msg = "DataQuality check passed: '%s' column has every value compliant with thresholds." % (self.col)
        self._log(step_id="ThresholdsCheck", level=panama.logging.INFO, trace=msg)
        return self.df
Ancestors
- Check
- abc.ABC
Static methods
def validate_or_default_check_config(check_config: dict)
-
Private method to validate a configuration.
Args
check_config : dict
- configuration.
Returns
dict
- configuration checked.
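An illustrative use of validate_or_default_check_config: the configuration dict shown is invented, while the defaults and printed warnings follow the private validators documented in the source above.

from panama.data_quality.checks import ThresholdsCheck

# Hypothetical configuration: only 'boundaries' is mandatory; missing keys are
# defaulted ('condition' -> 'between', both *_incl flags -> True) with a printed warning.
check_config = {"boundaries": [0, 100]}
check_config = ThresholdsCheck.validate_or_default_check_config(check_config)
print(check_config)
# {'boundaries': [0, 100], 'condition': 'between', 'lower_bound_incl': True, 'upper_bound_incl': True}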
Methods
def drop(self)
-
Method used to drop records of df where column col has values not compliant with thresholds.
Returns
DataFrame
- df without the rows where column col has values not compliant with thresholds.
def fail(self)
-
Method to handle an error for out-of-boundaries values.
Raises
DataQualityException
- Column <col> has <check_row_count> values not compliant with thresholds.
def warning(self)
-
Method to handle a warning message for out-of-boundaries values.
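A usage sketch for ThresholdsCheck as a whole (illustrative: data and column name invented, active SparkSession named spark assumed).

from panama.data_quality.checks import ThresholdsCheck

# Illustrative data: 'temperature' must lie between 0 and 100 (bounds included).
df = spark.createDataFrame([(1, 20.0), (2, -5.0), (3, 250.0)], ["id", "temperature"])

check = ThresholdsCheck(df, col="temperature", condition="between", boundaries=[0, 100])
print(check.check_row_count)   # -> 2 rows outside the allowed range

check.warning()                # logs a DataQualityWarning at WARN level (when panama logging is active)
clean_df = check.drop()        # keeps only the compliant rows
# check.fail()                 # would raise DataQualityException instead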
class UniqueCheck (df: pyspark.sql.dataframe.DataFrame, col: List[str])
-
Class for checking unique values in a subset of dataframe columns.
Attributes:
- duplicates_df_aggr (DataFrame): rows having duplicates on the selected columns.
- duplicates_df_row_count (int): number of rows having duplicates on the selected columns.
Initialize unique check class.
Args
df : DataFrame
- spark dataframe to check.
col : list
- list of columns to check.
Expand source code
class UniqueCheck(Check):
    """Class for checking unique values in a subset of dataframe columns.

    Attributes:
        duplicates_df_aggr (DataFrame): rows having duplicates on the selected columns.
        duplicates_df_row_count (int): number of rows having duplicates on the selected columns.
    """

    def __init__(self, df: DataFrame, col: List[str]):
        """Initialize unique check class.

        Args:
            df (DataFrame): spark dataframe to check.
            col (list): list of columns to check.
        """
        super().__init__(df, col)
        self.duplicates_df_aggr, self.duplicates_df_row_count = self._evaluate()

    def _evaluate(self) -> Tuple:
        """Private method used to evaluate the duplicates for the selected cols and their quantity.

        Returns:
            Tuple: dataframe of duplicate rows, duplicate row count
        """
        duplicates_df_aggr = self.df.groupBy(self.col).count().where(F.col("count") > 1)
        duplicates_df_row_count = duplicates_df_aggr.count()
        return duplicates_df_aggr, duplicates_df_row_count

    def warning(self):
        """Method to handle a warning message for duplicate values."""
        if self.duplicates_df_row_count > 0:
            content = f"Columns '{self.col}' have {self.duplicates_df_row_count} non unique values."
            msg = DataQualityWarning(content)
            log_level = panama.logging.WARN
        else:
            msg = "DataQuality check passed: '%s' columns have no duplicates value." % (self.col)
            log_level = panama.logging.INFO
        self._log(step_id="UniqueCheck", level=log_level, trace=msg)

    def fail(self):
        """Method to handle an error for duplicate values.

        Raises:
            DataQualityException: Column <col> has <duplicate_row_count> duplicate values.
        """
        if self.duplicates_df_row_count > 0:
            content = f"Columns '{self.col}' have {self.duplicates_df_row_count} non unique values."
            msg = DataQualityException(content)
            print("Values with duplicates:")
            self.duplicates_df_aggr.select(self.col).show()
            self._log(step_id="UniqueCheck", trace=msg, level=panama.logging.ERROR)
            raise msg
        else:
            msg = "DataQuality check passed: '%s' columns have no duplicates value." % (self.col)
            self._log(step_id="UniqueCheck", trace=msg, level=panama.logging.INFO)

    def drop(self):
        """Method used to drop records of df where the selected columns have duplicate values.

        Returns:
            DataFrame: df without the rows where column col has duplicate values.
        """
        assert isinstance(self.col, list), TypeError(
            f"col parameter for UniqueCheck must be a list, {type(self.col)} was found."
        )
        if self.duplicates_df_row_count > 0:
            self.df = self.df.dropDuplicates(subset=self.col)
            msg = "%d duplicate records on columns '%s' were dropped" % (self.duplicates_df_row_count, self.col)
        else:
            msg = "DataQuality check passed: '%s' column has no duplicates value." % (self.col)
        self._log(step_id="UniqueCheck", trace=msg, level=panama.logging.INFO)
        return self.df
Ancestors
- Check
- abc.ABC
Methods
def drop(self)
-
Method used to drop records of df where the selected columns have duplicate values.
Returns
DataFrame
- df without the rows where column col has duplicate values.
def fail(self)
-
Method to handle an error for duplicate values.
Raises
DataQualityException
- Column <col> has <duplicate_row_count> duplicate values.
def warning(self)
-
Method to handle a warning message for duplicate values.
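A usage sketch for UniqueCheck (illustrative: data and column names invented, active SparkSession named spark assumed). Note that duplicates_df_row_count counts the key combinations that appear more than once, as computed in _evaluate.

from panama.data_quality.checks import UniqueCheck

# Illustrative data with a duplicated (country, year) combination.
df = spark.createDataFrame(
    [("IT", 2023, 1.0), ("IT", 2023, 2.0), ("FR", 2023, 3.0)],
    ["country", "year", "value"],
)

check = UniqueCheck(df, col=["country", "year"])
print(check.duplicates_df_row_count)   # -> 1 duplicated key combination

check.warning()                        # logs a DataQualityWarning at WARN level (when panama logging is active)
deduped_df = check.drop()              # keeps one row per (country, year)
# check.fail()                         # would print the duplicated keys and raise DataQualityException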