Module panama.data_quality.data_quality

Classes

class DataQuality (spark: pyspark.sql.session.SparkSession, df: pyspark.sql.dataframe.DataFrame)
Expand source code
class DataQuality:
    """Queue and run data-quality checks against a Spark DataFrame.

    Checks are registered one at a time with ``add_check`` (or loaded wholesale
    with ``set_pipeline``) as configuration dicts, and executed in order by
    ``run``. A check whose action is ``'drop'`` replaces ``self.df`` with the
    dataframe returned by the check, so later checks see the filtered data.

    Args:
        spark (SparkSession): Spark session, stored on the instance.
        df (DataFrame): dataframe to check.
    """

    def __init__(self, spark: SparkSession, df: DataFrame):
        self.spark = spark
        self.df = df

        # Accepted values for the check_type / action arguments of add_check.
        self.check_type_list = ["null", "unique", "thresholds"]
        self.action_list = ["warning", "fail", "drop"]

        # Ordered queue of check configuration dicts to run.
        self.pipeline = []

    @staticmethod
    def _validate_single_column_input(col: Union[str, list], check_type: str):
        """Ensure that exactly one column, given as a string, was passed.

        Args:
            col (Union[str, list]): the ``col`` argument passed to add_check.
            check_type (str): check type, used only in the error message.

        Raises:
            TypeError: If a list of columns is passed.
            TypeError: If the single column is not a string.
        """
        if isinstance(col, list):
            raise TypeError(f"With {check_type} check_type please use one column at a time in columns parameter.")
        if not isinstance(col, str):
            raise TypeError(f"With {check_type} check_type, all columns arguments must be passed as a string value.")

    @staticmethod
    def _convert_col_input_into_list_type(col: Union[str, list], check_type: str = "unique") -> list:
        """Normalize the ``col`` argument to a list of column names.

        Args:
            col (Union[str, list]): a single column name or a list of column names.
            check_type (str, optional): check type, used only in the error message.
                Defaults to "unique".

        Raises:
            TypeError: If ``col`` is neither a string nor a list.

        Returns:
            list: a new list containing the column name(s).
        """
        if isinstance(col, list):
            # Copy so later mutation of the caller's list doesn't alter the stored check.
            return list(col)
        if isinstance(col, str):
            # Wrap instead of list(col): list("abc") would split the name into characters.
            return [col]
        raise TypeError(f"With '{check_type}' check_type, 'col' argument must be passed as a string or a list")

    def _validate_check_type_input(self, check_type: str):
        """Ensure ``check_type`` is one of ``self.check_type_list``.

        Args:
            check_type (str): requested check type.

        Raises:
            ValueError: If the check type is not accepted; the message lists the accepted values.
        """
        if check_type not in self.check_type_list:
            raise ValueError(
                f"'{check_type}' value not accepted for check_type argument. Please use: {self.check_type_list}"
            )

    def _validate_action_input(self, action: str):
        """Ensure ``action`` is one of ``self.action_list``.

        Args:
            action (str): requested action.

        Raises:
            ValueError: If the action is not accepted; the message lists the accepted values.
        """
        if action not in self.action_list:
            raise ValueError(f"'{action}' value not accepted for action argument. Please use: {self.action_list}")

    def _add_null_check(self, col: Union[str, list], action: str):
        """Append a null check on a single column to the pipeline.

        Args:
            col (Union[str, list]): column whose values are to check; must be a single string.
            action (str): action to perform. Can be either 'drop', 'warning' or 'fail'.

        Raises:
            TypeError: If ``col`` is not a single string.
        """
        self._validate_single_column_input(col=col, check_type="null")
        self.pipeline.append({"col": col, "check_type": "null", "action": action})

    def _add_unique_check(self, col: Union[str, list], action: str):
        """Append a uniqueness check to the pipeline.

        Args:
            col (Union[str, list]): column or list of columns whose combined values must be unique.
                A single string is stored as a one-element list.
            action (str): action to perform. Can be either 'drop', 'warning' or 'fail'.

        Raises:
            TypeError: If ``col`` is neither a string nor a list.
        """
        # Keep the normalized value: the original discarded it and stored the raw input.
        col = self._convert_col_input_into_list_type(col, check_type="unique")
        self.pipeline.append({"col": col, "check_type": "unique", "action": action})

    def _add_thresholds_check(self, col: Union[str, list], action: str, check_config: Optional[dict] = None):
        """Append a thresholds check on a single column to the pipeline.

        Args:
            col (Union[str, list]): column whose values are to check; must be a single string.
            action (str): action to perform. Can be either 'drop', 'warning' or 'fail'.
            check_config (dict, optional): configuration of the check, validated and completed
                by ``ThresholdsCheck.validate_or_default_check_config``. Defaults to None
                (treated as an empty dict).

        Raises:
            TypeError: If ``col`` is not a single string.
        """
        self._validate_single_column_input(col=col, check_type="thresholds")

        # Copy so neither a shared default nor the caller's dict can be mutated downstream.
        check_config = dict(check_config) if check_config is not None else {}
        check_config = ThresholdsCheck.validate_or_default_check_config(check_config)

        check = {"col": col, "check_type": "thresholds", "action": action}
        check.update(check_config)
        self.pipeline.append(check)

    def add_check(self, col: Union[str, list], check_type: str, action: str, check_config: Optional[dict] = None):
        """Validate and append a check to the pipeline.

        Args:
            col (Union[str, list]): column or list of columns whose values are to check.
                 'null' and 'thresholds' accept a single string only.
            check_type (str): kind of check. Can be either 'null', 'unique' or 'thresholds'.
            action (str): kind of action to perform. Can be either 'drop', 'warning' or 'fail'.
            check_config (dict, optional): configuration for a thresholds check. Unused if
                check_type != 'thresholds'. Defaults to None (treated as an empty dict).

        Raises:
            ValueError: If ``check_type`` or ``action`` is not accepted.
            TypeError: If ``col`` has the wrong form for the requested check type.
        """
        self._validate_check_type_input(check_type=check_type)
        self._validate_action_input(action=action)

        if check_type == "null":
            self._add_null_check(col=col, action=action)
        elif check_type == "unique":
            self._add_unique_check(col=col, action=action)
        elif check_type == "thresholds":
            self._add_thresholds_check(col=col, action=action, check_config=check_config)

    def set_pipeline(self, pipeline_config: List[Dict]):
        """Replace the current pipeline with the given check configurations.

        Args:
            pipeline_config (List[Dict]): check configuration dicts. Any iterable is accepted;
                it is materialized into a new list so ``add_check`` can still append to it.
        """
        self.pipeline = list(pipeline_config)

    def _run_single_check(self, check_config: Dict) -> Optional[DataFrame]:
        """Run one check described by a configuration dict.

        Args:
            check_config (Dict): dict with at least 'col', 'check_type' and 'action' keys.
                Thresholds checks may also carry 'condition', 'boundaries',
                'upper_bound_incl' and 'lower_bound_incl'.

        Raises:
            ValueError: If 'col' is missing, or 'check_type'/'action' is not accepted.
            TypeError: If a unique check's 'col' is neither a string nor a list.

        Returns:
            Optional[DataFrame]: the dataframe after dropping rows when action is 'drop',
            otherwise None.
        """
        col = check_config.get("col")
        if col is None:
            raise ValueError("Missing col entry from check_config")

        check_type = check_config.get("check_type")
        action = check_config.get("action")

        # Configs loaded via set_pipeline bypass add_check, so validate them here too;
        # previously an unknown check_type left `check` unbound (UnboundLocalError) and an
        # unknown or missing action was silently ignored.
        self._validate_check_type_input(check_type=check_type)
        self._validate_action_input(action=action)

        if check_type == "null":
            check = NullCheck(df=self.df, col=col)
        elif check_type == "unique":
            # Same normalization add_check applies, so both entry points behave alike.
            check = UniqueCheck(df=self.df, col=self._convert_col_input_into_list_type(col, check_type="unique"))
        else:  # "thresholds" (guaranteed by the validation above)
            kw_list = ("condition", "boundaries", "upper_bound_incl", "lower_bound_incl")
            kwargs = {k: v for k, v in check_config.items() if k in kw_list}
            check = ThresholdsCheck(df=self.df, col=col, **kwargs)

        if action == "drop":
            # Later checks in the pipeline operate on the filtered dataframe.
            self.df = check.drop()
            return self.df
        if action == "warning":
            check.warning()
        else:  # "fail" (guaranteed by the validation above)
            check.fail()
        return None

    def run(self, mode: Optional[str] = None) -> None:
        """Run every check in the pipeline, in order.

        ``self.df`` is first passed to ``cache_or_persist_sdf`` with ``mode``.

        Args:
            mode (str, optional): caching operation to perform before running the checks.
                Accepted values are 'cache' and 'persist'.
        """
        cache_or_persist_sdf(sdf=self.df, mode=mode)
        for check in self.pipeline:
            self._run_single_check(check_config=check)

    def return_df(self) -> DataFrame:
        """Get the current dataframe.

        Returns:
            DataFrame: the dataframe to check, or the filtered one if 'drop' checks have run.
        """
        return self.df

    def clean_pipeline(self):
        """Remove every queued check, printing a notice first."""
        print("Dropping every check in the current pipeline")
        self.pipeline = []

Methods

def add_check(self, col: Union[str, list], check_type: str, action: str, check_config: dict = {})

General method used to add a check.

Args

col : Union[str, list]
column or list of columns whose values are to check.
check_type : str
kind of check. Can be either 'null', 'unique' or 'thresholds'.
action : str
kind of action to perform. Can be either 'drop', 'warning' or 'fail'.
check_config : dict, optional
Dictionary holding the configuration for a thresholds check. Unused if check_type != 'thresholds'. Defaults to dict().
def clean_pipeline(self)
def return_df(self) ‑> pyspark.sql.dataframe.DataFrame

Get the dataframe to check.

Returns

DataFrame
the dataframe to check, or the filtered dataframe if 'drop' checks have already run.
def run(self, mode: Optional[str] = None) ‑> None

Method used to run all the checks for the dataframe. If the mode parameter is used, the table will be cached or persisted before running the checks pipeline.

Args

mode : str
what caching operation to perform. Accepted values are 'cache' and 'persist'.
def set_pipeline(self, pipeline_config: List[Dict])

Method to replace the current pipeline with a list of check-configuration dicts.

Args

pipeline_config : List[Dict]
configuration for a pipeline.