Module panama.data_quality.data_quality
Classes
class DataQuality (spark: pyspark.sql.session.SparkSession, df: pyspark.sql.dataframe.DataFrame)
-
Expand source code
class DataQuality:
    """Build and run a pipeline of data-quality checks on a Spark DataFrame.

    Checks are queued in ``self.pipeline`` as configuration dicts (through
    ``add_check`` or ``set_pipeline``) and executed in order by ``run``. A check
    whose action is ``'drop'`` replaces ``self.df`` with the dataframe returned
    by the check, so every later check operates on the reduced dataframe.
    """

    def __init__(self, spark: SparkSession, df: DataFrame):
        """Store the Spark session and the dataframe to check.

        Args:
            spark (SparkSession): Spark session, kept as ``self.spark``.
            df (DataFrame): dataframe the checks will run against.
        """
        self.spark = spark
        self.df = df
        # Accepted values for the check_type / action arguments of add_check
        self.check_type_list = ["null", "unique", "thresholds"]
        self.action_list = ["warning", "fail", "drop"]
        # Queue of check configurations, run in insertion order by run()
        self.pipeline = []

    @staticmethod
    def _validate_single_column_input(col: Union[str, list], check_type: str):
        """Check that ``col`` names exactly one column, given as a string.

        Used by the check types that operate on a single column
        ('null' and 'thresholds').

        Args:
            col (Union[str, list]): the ``col`` argument passed to add_check.
            check_type (str): requested check type, used in the error messages.

        Raises:
            TypeError: If a list of columns is passed.
            TypeError: If a single column is passed but it is not a string.
        """
        if isinstance(col, list):
            raise TypeError("With %s check_type please use one column at a time in columns parameter." % (check_type))
        if not isinstance(col, str):
            raise TypeError(
                "With %s check_type, all columns arguments must be passed as a string value." % (check_type)
            )

    @staticmethod
    def _convert_col_input_into_list_type(col: Union[str, list]) -> list:
        """Return ``col`` as a list of column names.

        A single column name is wrapped in a one-element list; a list is
        returned unchanged.

        Args:
            col (Union[str, list]): the ``col`` argument passed to add_check.

        Raises:
            TypeError: If ``col`` is neither a string nor a list.

        Returns:
            list: list of column names.
        """
        if isinstance(col, list):
            return col
        if isinstance(col, str):
            # Wrap the name: ``list(col)`` would split it into single characters.
            return [col]
        raise TypeError("With 'unique' check_type, 'col' argument must be passed as a string or a list")

    def _validate_check_type_input(self, check_type: str):
        """Check that the requested check type is one of ``self.check_type_list``.

        Args:
            check_type (str): the ``check_type`` argument passed to add_check.

        Raises:
            ValueError: If the check type is not accepted; the message lists the
                available check types.
        """
        if check_type not in self.check_type_list:
            value_error_string = "'%s' value not accepted for check_type argument. Please use: %s" % (
                check_type,
                self.check_type_list,
            )
            raise ValueError(value_error_string)

    def _validate_action_input(self, action: str):
        """Check that the requested action is one of ``self.action_list``.

        Args:
            action (str): the ``action`` argument passed to add_check.

        Raises:
            ValueError: If the action is not accepted; the message lists the
                available actions.
        """
        if action not in self.action_list:
            value_error_string = "'%s' value not accepted for action argument. Please use: %s" % (
                action,
                self.action_list,
            )
            raise ValueError(value_error_string)

    def _add_null_check(self, col: Union[str, list], action: str):
        """Queue a null check on a single column.

        Args:
            col (Union[str, list]): name of the column to check (must be a single string).
            action (str): action to perform: 'drop', 'warning' or 'fail'.
        """
        # A null check works on one column at a time
        self._validate_single_column_input(col=col, check_type="null")
        check = {"col": col, "check_type": "null", "action": action}
        self.pipeline.append(check)

    def _add_unique_check(self, col: Union[str, list], action: str):
        """Queue a uniqueness check on one column or a combination of columns.

        Args:
            col (Union[str, list]): column name or list of column names to check.
            action (str): action to perform: 'drop', 'warning' or 'fail'.
        """
        # Normalise a single column name to a one-element list
        col = self._convert_col_input_into_list_type(col)
        check = {"col": col, "check_type": "unique", "action": action}
        self.pipeline.append(check)

    def _add_thresholds_check(self, col: Union[str, list], action: str, check_config: Optional[dict] = None):
        """Queue a thresholds check on a single column.

        Args:
            col (Union[str, list]): name of the column to check (must be a single string).
            action (str): action to perform: 'drop', 'warning' or 'fail'.
            check_config (dict, optional): configuration of the check; its entries are
                validated / completed by ThresholdsCheck.validate_or_default_check_config
                and merged into the queued check. Defaults to an empty dict.
        """
        self._validate_single_column_input(col=col, check_type="thresholds")
        # Pass a fresh copy so that, if the validator fills defaults in place,
        # neither the caller's dict nor a shared default object is modified.
        check_config = ThresholdsCheck.validate_or_default_check_config(
            dict(check_config) if check_config is not None else {}
        )
        check = {"col": col, "check_type": "thresholds", "action": action}
        check.update(check_config)
        self.pipeline.append(check)

    def add_check(self, col: Union[str, list], check_type: str, action: str, check_config: Optional[dict] = None):
        """Validate the arguments and queue a check in the pipeline.

        Args:
            col (Union[str, list]): column or list of columns whose values are to be checked.
                'null' and 'thresholds' checks accept a single column name only.
            check_type (str): kind of check: 'null', 'unique' or 'thresholds'.
            action (str): kind of action to perform: 'drop', 'warning' or 'fail'.
            check_config (dict, optional): configuration for a thresholds check.
                Unused if check_type != 'thresholds'. Defaults to an empty dict.

        Raises:
            ValueError: If check_type or action is not accepted.
            TypeError: If col does not have the form required by check_type.
        """
        self._validate_check_type_input(check_type=check_type)
        self._validate_action_input(action=action)

        if check_type == "null":
            self._add_null_check(col=col, action=action)
        elif check_type == "unique":
            self._add_unique_check(col=col, action=action)
        elif check_type == "thresholds":
            self._add_thresholds_check(col=col, action=action, check_config=check_config)

    def set_pipeline(self, pipeline_config: List[Dict]):
        """Replace the current pipeline with the given check configurations.

        The sequence is copied, so later add_check calls do not modify the
        caller's list.

        Args:
            pipeline_config (List[Dict]): check configurations; each dict needs at
                least 'col', 'check_type' and 'action' entries (enforced at run time).
        """
        self.pipeline = list(pipeline_config)

    def _run_single_check(self, check_config: Dict) -> Optional[DataFrame]:
        """Run one check described by a configuration dictionary.

        Args:
            check_config (Dict): configuration of the check, with 'col', 'check_type'
                and 'action' entries; thresholds checks may also carry 'condition',
                'boundaries', 'upper_bound_incl' and 'lower_bound_incl'.

        Raises:
            ValueError: If 'col' is missing, or 'check_type' / 'action' is not accepted.

        Returns:
            Optional[DataFrame]: the dataframe with dropped rows when the action is
                'drop' (also stored in ``self.df``); None otherwise.
        """
        col = check_config.get("col")
        if col is None:
            raise ValueError("Missing col entry from check_config")
        check_type = check_config.get("check_type")
        action = check_config.get("action")

        # Configs from set_pipeline bypass add_check, so validate here: an unknown
        # check_type used to end in UnboundLocalError and an unknown action was ignored.
        self._validate_check_type_input(check_type=check_type)
        self._validate_action_input(action=action)

        if check_type == "null":
            check = NullCheck(df=self.df, col=col)
        elif check_type == "unique":
            check = UniqueCheck(df=self.df, col=col)
        else:  # "thresholds"
            kw_names = ("condition", "boundaries", "upper_bound_incl", "lower_bound_incl")
            kwargs = {k: v for k, v in check_config.items() if k in kw_names}
            check = ThresholdsCheck(df=self.df, col=col, **kwargs)

        if action == "drop":
            self.df = check.drop()  # type: ignore
            return self.df
        if action == "warning":
            check.warning()  # type: ignore
        else:  # "fail"
            check.fail()  # type: ignore
        return None

    def run(self, mode: Optional[str] = None) -> None:
        """Run every queued check, in order, against the dataframe.

        If the mode parameter is used, the dataframe is cached or persisted
        before the check pipeline runs.

        Args:
            mode (str, optional): caching operation to perform: 'cache' or 'persist'.
        """
        cache_or_persist_sdf(sdf=self.df, mode=mode)
        for check in self.pipeline:
            self._run_single_check(check_config=check)

    def return_df(self) -> DataFrame:
        """Get the dataframe being checked.

        Returns:
            DataFrame: the dataframe to check, or the already-checked dataframe
                (without the rows removed by 'drop' checks that have run).
        """
        return self.df

    def clean_pipeline(self):
        """Remove every queued check from the pipeline."""
        print("Dropping every check in the current pipeline")
        self.pipeline = []
Methods
def add_check(self, col: Union[str, list], check_type: str, action: str, check_config: dict = {})
-
General method used to add a check.
Args
col
:Union[str, list]
- column or list of columns whose values are to be checked.
check_type
:str
- kind of check. Can be either 'null', 'unique' or 'thresholds'.
action
:str
- kind of action to perform. Can be either 'drop', 'warning' or 'fail'.
check_config
:dict
, optional- Dictionary holding the configuration for a thresholds check. Unused if check_type != 'thresholds'. Defaults to dict().
def clean_pipeline(self)
def return_df(self) ‑> pyspark.sql.dataframe.DataFrame
-
Get the dataframe to check.
Returns
DataFrame
- the dataframe to check, or the already-checked dataframe (without the rows removed by any 'drop' checks that have run).
def run(self, mode: Optional[str] = None) ‑> None
-
Method used to run all the checks for the dataframe. If the mode parameter is used, the table is cached or persisted before the check pipeline runs.
Args
mode
:str
- what caching operation to perform. Accepted values are 'cache' and 'persist'.
def set_pipeline(self, pipeline_config: List[Dict])
-
Method to set the pipeline from a list of dicts, replacing any checks already queued.
Args
pipeline_config
:List[Dict]
- configuration for a pipeline.