Module panama.io

Sub-modules

panama.io.feature_store
panama.io.io

Classes

class DatabricksFeatureStore (spark: pyspark.sql.session.SparkSession, config: Optional[Dict] = None)

A feature store implementation that uses the Databricks Feature Store API.

Initializes a new instance of the DatabricksFeatureStore class and connects to the FeatureStoreClient.

Expand source code
class DatabricksFeatureStore(PanamaFeatureStore, FeatureStoreClient, IOInterface):
    """
    A feature store implementation that uses the Databricks Feature Store API.
    """

    def __init__(self, spark: SparkSession, config: Optional[Dict] = None) -> None:
        """
        Initializes a new instance of the DatabricksFeatureStore class and connects to the
        FeatureStoreClient.

        Args:
            spark (SparkSession): Spark session, stored on the instance as ``self.spark``.
            config (Optional, Dict): Configuration forwarded unchanged to ``PanamaFeatureStore.__init__``. Default to None.
        """
        # The session is stored first: other methods of this class reach Spark through ``self.spark``.
        self.spark = spark
        # Client returned by ``connect``; used for every feature store call made through ``self.client``.
        self.client = self.connect()
        # Only the PanamaFeatureStore initializer is invoked explicitly here.
        PanamaFeatureStore.__init__(self, config)

    def connect(self) -> FeatureStoreClient:
        """
        Build the client used to talk to the Databricks Feature Store.

        Returns:
            FeatureStoreClient: A newly constructed client (no arguments are passed to it).
        """
        return FeatureStoreClient()

    def create_feature_table(
        self,
        table_name: str,
        primary_keys: List[str],
        description: str,
        df: Optional[DataFrame] = None,
        schema: Optional[StructType] = None,
        partition_columns: Optional[List[str]] = None,
    ) -> Union[str, None]:
        """
        Create a feature table in feature store with the given name and primary keys, unless it already exists.

        Uses the provided schema or the inferred schema of the provided df. If df is provided, this data is handed
        to the feature store client together with the table definition.
        If a table with the same name is already listed in the target schema nothing is created: a warning is
        printed and logged, and None is returned.

        Args:
            table_name (str): Name of the table to create. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            primary_keys (List[str]): A list of primary key columns.
            description (str): A description of the feature table.
            df (Optional, pyspark.sql.DataFrame): Table used to infer schema if not given.
            schema (Optional, StructType): Feature table schema.
            partition_columns (Optional, List[str]): A list of partition columns. Default to None.

        Returns:
            Union[str, None]: A confirmation message containing the created table name, or None if the table already exists.

        Raises:
            ValueError: If the resolved table name is not of the form "<catalog>.<schema>.<table>".
        """

        # Let the helper resolve the name to use for writing.
        table_name = self._assign_env_to_string(s=table_name, purpose="w")

        # Split the fully qualified name; fail with a clear message instead of an AttributeError on a failed regex.
        name_parts = table_name.split(".")
        if len(name_parts) != 3 or not all(name_parts):
            raise ValueError(f'Invalid table name "{table_name}": expected format "<catalog>.<schema>.<table>".')
        catalog_name, schema_name, table_name_isolated = name_parts

        # Qualify the schema with its catalog so that the existence check does not depend on
        # whichever catalog happens to be the current one in the Spark session.
        matching_tables = self.spark.sql(f"SHOW TABLES IN {catalog_name}.{schema_name}").where(
            F.col("tableName") == table_name_isolated
        )

        if matching_tables.count() > 0:
            trace = f"{table_name} already exists, function will interrupt and do nothing, use write_feature_table to add data to an existing table."
            print(trace)
            logger = panama.logging.get_logger()
            logger.warning(__name__, exc_info=Warning(trace))
            return None

        # Create feature table
        self.client.create_table(
            df=df,
            name=table_name,
            primary_keys=primary_keys,
            schema=schema,
            partition_columns=partition_columns,
            description=description,
        )
        return f"Created Feature Store table at {table_name}"

    def write_feature_table(self, df: DataFrame, table_name: str, mode: str = "overwrite"):
        """
        Push a DataFrame into a feature table of the Databricks Feature Store.

        Args:
            df (pyspark.sql.DataFrame): The data to be written to the feature table.
            table_name (str): Name of the table to write to. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            mode (str): The mode to use when writing data to the feature table (default is 'overwrite', other option is 'merge').
        """
        # Resolve the name to use for writing, then delegate the write to the feature store client.
        target_table = self._assign_env_to_string(s=table_name, purpose="w")
        self.client.write_table(name=target_table, df=df, mode=mode)

    def read_feature_table(self, table_name: str) -> DataFrame:
        """
        Load the content of a feature table from the Databricks Feature Store.

        Args:
            table_name (str): Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.

        Returns:
            pyspark.sql.DataFrame: The data from the feature table.
        """
        # Resolve the name to use for reading, then delegate the read to the feature store client.
        source_table = self._assign_env_to_string(s=table_name, purpose="r")
        return self.client.read_table(name=source_table)

    def _get_table(self, name: str, purpose: str) -> FeatureTable:
        """
        Get a feature table's metadata.

        Args:
            name (str): A feature table name of the form <catalog>.<schema>.<table_name>, for example dev_analytics.feature_store.user_features.
            purpose (str): string indicating if the name will be used to read or to write the table. Allowed values are 'r' and 'w' only.

        Returns:
            (databricks.feature_store.entities.feature_table.FeatureTable): Value class describing one feature table
        """
        # Resolve the name according to the requested purpose.
        table_name = self._assign_env_to_string(s=name, purpose=purpose)

        # Reuse the client created at construction time (as every other method of the class does)
        # instead of instantiating a new FeatureStoreClient on each call.
        return self.client.get_table(name=table_name)

    def _unpack_feature_lookup_dict(self, lookup_dict: Dict) -> Tuple[List[FeatureLookup], List[PanamaFeatureLookup]]:
        """Unpack input dict and create two separated feature lists, a FeatureLookup list and a PanamaFeatureLookup list.

        A table whose schema name ends with `_fs` is treated as a feature store table (FeatureLookup);
        every other table is treated as a standard table (PanamaFeatureLookup).

        Args:
            lookup_dict (Dict): input dict keyed by table name ("<catalog>.<schema>.<table>"); each value is a dict
                holding the optional "lookup_key" and "feature_names" entries.

        Returns:
            Tuple[List[FeatureLookup], List[PanamaFeatureLookup]]: a list of FeatureLookup objects and a list of PanamaFeatureLookup
                to be used as input for feature_lookup method.

        Raises:
            ValueError: If a table name does not contain a schema part.
        """

        feature_lookup = []
        table_lookup = []

        for lookup, lookup_config in lookup_dict.items():
            name_parts = lookup.split(".")
            if len(name_parts) < 2:
                raise ValueError(f'Invalid table name "{lookup}": expected format "<catalog>.<schema>.<table>".')

            # Both kinds of lookup read the same two entries from their configuration.
            lookup_key = lookup_config.get("lookup_key")  # type: ignore
            feature_names = lookup_config.get("feature_names")  # type: ignore

            if name_parts[1].endswith("_fs"):  # FeatureLookup case
                # If no lookup key was given, fall back to the feature table primary keys.
                if lookup_key is None:
                    lookup_key = self._get_table(name=lookup, purpose="r").primary_keys

                feature_lookup.append(
                    FeatureLookup(
                        table_name=self._assign_env_to_string(lookup, "r"),
                        lookup_key=lookup_key,
                        feature_names=feature_names,
                    )
                )
            else:  # panama lookup case
                table_lookup.append(
                    PanamaFeatureLookup(
                        table_name=lookup,  # no prefix needed here: the io layer resolves the name when reading.
                        lookup_key=lookup_key,
                        feature_names=feature_names,
                    )
                )

        return feature_lookup, table_lookup

    def _add_feature_to_sdf(
        self,
        df: DataFrame,
        feature_lookups: List[PanamaFeatureLookup],
        exclude_columns: Optional[List[str]] = None,
    ) -> DataFrame:
        """Left-join onto the input DataFrame the features described by each PanamaFeatureLookup.

        Args:
            df (pyspark.DataFrame): input dataframe.
            feature_lookups (List(PanamaFeatureLookup)): List of features to join into the DataFrame.
            exclude_columns (List(str), optional): Names of the columns to drop from the enriched dataframe. Default to None.

        Returns:
            DataFrame: input dataframe plus desired features.
        """

        reader = IOAdls(spark=self.spark)
        enriched = df

        for lookup in feature_lookups:
            # Read the source table, keeping only the join keys and the requested features.
            source_table = reader.read_table(table_name=lookup.table_name)
            wanted_columns = lookup.lookup_key + lookup.feature_names  # type: ignore
            selected = source_table.select(wanted_columns)

            # Left join so that every row of the input dataframe is preserved.
            enriched = enriched.join(selected, how="left", on=lookup.lookup_key)

        # Nothing to drop: return the enriched dataframe as it is.
        if exclude_columns is None:
            return enriched

        return enriched.drop(*exclude_columns)

    def feature_lookup(
        self,
        df: DataFrame,
        lookup_dict: Dict,
        label: Optional[str],
        exclude_columns: Optional[List[str]] = None,
    ) -> TrainingSet:
        """Add features from feature store (and from standard tables) to given DataFrame.

        Structure of lookup_dict must be:
        {
            "catalog.schema.table_name" (str - feature table):
                {
                    "lookup_key": (list - ordered list of columns in df that match feature table primary keys),
                    "feature_names": (list - features in feature table that we want to add to df)
                }
        }

        Args:
            df (DataFrame): input dataframe to enrich.
            lookup_dict (Dict): dictionary describing every feature lookup (structure described above).
            label (str): Names of column(s) in DataFrame that contain training set labels. To create a training set without a label field, i.e. for unsupervised training set, specify label = None
            exclude_columns (List(str), optional): Names of the columns to drop from the TrainingSet DataFrame. Default to None (no column dropped).

        Returns:
            TrainingSet: Object of type TrainingSet (databricks.feature_store.training_set.TrainingSet)
        """
        # Work on a private list: a shared mutable default must never be handed to downstream code.
        columns_to_exclude = list(exclude_columns) if exclude_columns is not None else []

        # Separate feature store lookup from standard table lookup
        fs_lookups, table_lookups = self._unpack_feature_lookup_dict(lookup_dict)

        # First perform the standard tables' lookup that enriches df.
        # Excluded columns are deliberately NOT dropped at this stage: they may be lookup keys still
        # needed by the feature store lookups below; create_training_set drops them at the end.
        if table_lookups:
            df = self._add_feature_to_sdf(df=df, feature_lookups=table_lookups)

        # Then perform feature table's FeatureLookup creating final training set
        training_set = self.client.create_training_set(
            df=df, feature_lookups=fs_lookups, label=label, exclude_columns=columns_to_exclude
        )

        return training_set

Ancestors

Methods

def connect(self) ‑> databricks.feature_store.client.FeatureStoreClient
def create_feature_table(self, table_name: str, primary_keys: List[str], description: str, df: Optional[pyspark.sql.dataframe.DataFrame] = None, schema: Optional[pyspark.sql.types.StructType] = None, partition_columns: Optional[List[str]] = None) ‑> Optional[str]

Create and return a feature table in feature store with the given name and primary keys.

The returned feature table has the given name and primary keys. Uses the provided schema or the inferred schema of the provided df. If df is provided, this data will be saved in a Delta table.

Args

table_name : str
Name of the table to create. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
primary_keys : List[str]
A list of primary key columns.
description : str
A description of the feature table.
df : Optional, pyspark.sql.DataFrame
Table used to infer schema if not given.
schema : Optional, StructType
Feature table schema.
partition_columns : Optional, List[str]
A list of partition columns. Default to None.

Returns

Union[str, None]
A confirmation message containing the name of the created feature table, or None when the table already exists and nothing is created.
def feature_lookup(self, df: pyspark.sql.dataframe.DataFrame, lookup_dict: Dict, label: str, exclude_columns: List[str] = []) ‑> databricks.feature_store.training_set.TrainingSet

Add features from feature store to given DataFrame.

Structure of lookup_dict must be: { "catalog.schema.table_name" (str - feature table): { "lookup_key": (list - ordered list of columns in df that match feature table primary keys), "feature_names": (list - features in feature table that we want to add to df) } }

Args

df : DataFrame
input dataframe to enrich.
lookup_dict : Dict
dictionary describing every feature lookup (structure described above).
label : str
Names of column(s) in DataFrame that contain training set labels. To create a training set without a label field, i.e. for unsupervised training set, specify label = None

exclude_columns : List(str)
Names of the columns to drop from the TrainingSet DataFrame

Returns

TrainingSet
Object of type TrainingSet (databricks.feature_store.training_set.TrainingSet)
def read_feature_table(self, table_name: str) ‑> pyspark.sql.dataframe.DataFrame

Reads data from a feature table in the Databricks Feature Store.

Args

table_name : str
Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
features : Optional[List[str]]
A list of feature names to be read (default is None, which reads all features).

Returns

pyspark.sql.DataFrame
The data from the feature table.
def write_feature_table(self, df: pyspark.sql.dataframe.DataFrame, table_name: str, mode: str = 'overwrite')

Writes data to a feature table in the Databricks Feature Store.

Args

df : pyspark.sql.DataFrame
The data to be written to the feature table.
table_name : str
Name of the table to write to. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
mode : str
The mode to use when writing data to the feature table (default is 'overwrite', other option is 'merge').
class IOAdls (spark)

Extends the IO interface to SQL data structure.

Expand source code
class IOAdls(IOInterface):
    """
    Extends the IO interface to SQL data structure.
    """

    def __init__(self, spark) -> None:
        """
        Initialize the object by delegating to the parent class initializer.

        Args:
            spark: Spark session, forwarded unchanged to ``super().__init__``.
        """
        super().__init__(spark)

    @staticmethod
    def _generate_absolute_path(path: str, storage_account: str, container: str, source_type: str) -> str:
        """Generate the url that points to the required path inside the data lake.

        Args:
            path (str): File or table location
            storage_account (str): Azure storage account.
            container (str): Name of the container where to find files or tables.
            source_type (str): value extracted from connections.json indicating type of source (adls, blob, etc..)

        Returns:
            str: the final url to read data from.
        """
        if source_type == "adls":
            return f"abfss://{container}@{storage_account}.dfs.core.windows.net/{path}"
        elif source_type == "blob":
            return f"wasb://{container}@{storage_account}.blob.core.windows.net/{path}"
        else:
            raise ValueError("Accepted source_type in connections.json are 'adls', 'blob' in IOAdls")

    @staticmethod
    def _get_storage_account_from_name(adls_name: str):
        """Look up the storage account of a named connection in the connection config json.

        Args:
            adls_name (str): Name of datalake used to get connection config.

        Returns:
            (str): Azure storage account.
        """
        connections = IOInterface._get_connection_config_json()
        # The account name is nested under the "credentials" entry of the named connection.
        credentials = connections.get(adls_name).get("credentials")  # type: ignore
        return credentials.get("storage_account")  # type: ignore

    @staticmethod
    def _get_source_type_from_name(adls_name: str):
        """Look up the source type of a named connection in the connection config json.

        Args:
            adls_name (str): Name of datalake used to get connection config.

        Returns:
            (str): source_type value.
        """
        connections = IOInterface._get_connection_config_json()
        # The source type is stored under the "type" entry of the named connection.
        return connections.get(adls_name).get("type")  # type: ignore

    @staticmethod
    def generate_connection_string(adls_name: str, container: str, path: str = "") -> str:
        """Build the url of a path inside the data lake, starting from the connection name.

        Args:
            adls_name (str): Name of datalake used to get connection config.
            container (str): Name of the container where to find files or tables.
            path (str): File or table location

        Returns:
            str: the final url to read data from."""

        # The storage account and the source type both come from the connection config of adls_name.
        return IOAdls._generate_absolute_path(
            path=path,
            storage_account=IOAdls._get_storage_account_from_name(adls_name),
            container=container,
            source_type=IOAdls._get_source_type_from_name(adls_name),
        )

    ############################### READING

    @panama.logging.log_execution(blocking=True)
    def read_file(
        self,
        adls_name: str,
        container: str,
        file_path: str,
        file_format: str = "delta",
        extra_options: Optional[dict] = None,
        create_view: Optional[bool] = False,
    ) -> DataFrame:
        """Load one or more files from a path inside a storage container.

        Args:
            adls_name (str): Name of datalake (or blob) used to get connection config.
            container (str): Name of the container where to find files.
            file_path (str): Path where to find file (or files). Use folder to read multiple files, or specific file path with extension.
            file_format (str, optional): Format of the file to read. Defaults to "delta".
            extra_options (Optional[dict], optional): Additional options to pass to the spark read command as a dictionary. Defaults to None.
            create_view (bool, optional): if True, also create a temp view from the loaded data. Defaults to False.

        Returns:
            DataFrame: containing the data from the specified path.
        """
        # Resolve the connection details, then the absolute location of the file(s).
        account = self._get_storage_account_from_name(adls_name)
        kind = self._get_source_type_from_name(adls_name)
        location = self._generate_absolute_path(
            path=file_path, storage_account=account, container=container, source_type=kind
        )

        # Configure the reader: format first, then the optional extra options.
        reader = self.spark.read.format(file_format)
        if extra_options:
            reader = reader.options(**extra_options)

        loaded = reader.load(location)

        # The view is created from the relative path given in input, not from the absolute url.
        if create_view:
            self._create_sdf_temp_view(sdf=loaded, table_name_path=file_path)

        return loaded

    @panama.logging.log_execution(blocking=True)
    def read_table(
        self,
        table_name: str,
        max_datetime: Union[str, None] = None,
        create_view: Optional[bool] = False,
        force_read_to_env: Optional[bool] = False,
    ) -> DataFrame:
        """Query a unity catalog table, optionally as it was at a given point in time.

        ATTENTION: momentary fix will let the user read dev catalog from dev environment (instead of test catalog from dev environment).

        Args:
            table_name (str): Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            max_datetime (str, optional): filter out datetime newer (inclusive) to datetime input. Format must be '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'. Defaults to None.
            create_view (bool, optional): if True, also create a temp view from the query result. Defaults to False.
            force_read_to_env (Optional, bool): if True, force read on same env of execution. This will only works on '_analytics' catalogs. Default to False.

        Returns:
            DataFrame: containing the data from the specified table.
        """

        # When a limit is given, find the table version to time travel to (done on the name as received).
        snapshot_datetime = (
            self.get_latest_delta_datetime(
                table_name=table_name,
                max_datetime=max_datetime,
                force_read_to_env=force_read_to_env,
            )
            if max_datetime
            else None
        )

        # Resolve the name to use for reading.
        env_table_name = self._assign_env_to_string(table_name, purpose="r", force_read_to_env=force_read_to_env)

        statement = f"SELECT * FROM {env_table_name}"
        if snapshot_datetime:
            # Add time travel sql condition.
            statement += f' TIMESTAMP AS OF "{snapshot_datetime}"'

        result = self.spark.sql(statement)

        # The view is created from the resolved table name.
        if create_view:
            self._create_sdf_temp_view(sdf=result, table_name_path=env_table_name)  # type: ignore

        return result

    def get_latest_delta_datetime(
        self,
        table_name: str,
        max_datetime: Union[str, None] = None,
        force_read_to_env: Optional[bool] = False,
    ) -> datetime.datetime:
        """
        Get latest timestamp when the delta table was written.
        If max_datetime is prior to the oldest table timestamp, a warning is emitted and the oldest timestamp is returned.
        If max_datetime is not provided (None or empty string), the most recent timestamp is returned.

        Args:
            table_name (str): Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            max_datetime (str, optional): upper limit (inclusive) for the returned timestamp. Format must be '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'. Defaults to None.
            force_read_to_env (Optional, bool): if True, force read on same env of execution. This will only works on '_analytics' catalogs. Default to False.

        Returns:
            datetime.datetime: latest time when the delta table was written.
        """

        # Parse the limit up front so that an invalid format fails before any query is run.
        # A single variable drives both "was a limit given?" checks, so that an empty string
        # behaves like None instead of leaving the filter undefined.
        date_filter = timestamp_str_to_datetime(max_datetime) if max_datetime else None

        # Resolve the name to use for reading.
        table_name = self._assign_env_to_string(table_name, "r", force_read_to_env=force_read_to_env)

        # Query delta table's history looking for timestamps (most recent first)
        query = (
            f"select timestamp as last_ts from (select * from (describe history {table_name})) order by timestamp desc"
        )
        timestamp_list = [row["last_ts"] for row in self.spark.sql(query).collect()]

        # Return latest if max_datetime was not provided.
        if date_filter is None:
            print(f"No max_date passed for {table_name} extraction, returning last available timestamp.")
            return timestamp_list[0]

        # The list is sorted from newest to oldest: the first timestamp within the limit is the answer.
        for written_at in timestamp_list:
            if written_at <= date_filter:
                return written_at

        # Every timestamp is newer than the limit: fall back to the oldest one.
        warnings.warn("No timestamp prior to max_date/max_datetime, returning the oldest timestamp")
        return timestamp_list[-1]

    def read_most_recent_file(
        self,
        container: str,
        adls_name: str,
        file_name: str,
        file_format: str,
        look_folder: str = "",
        max_date: Union[str, None] = None,
        max_iterations: int = 100,
        extra_options: Optional[dict] = None,
        create_view: Optional[bool] = False,
    ) -> DataFrame:
        """Find the most recent copy of a file and read it. The directory must have the structure:
            look_folder:
                - "2023-01-01"
                    - filename.csv
                - "2023-02-01"
                    - filename.csv
                - "2023-03-01"
                    - filename.csv

        Args:
            container (str): Name of the container where to find and read file.
            adls_name (str): Name of datalake used to get connection config.
            file_name (str): Name of the file that we want to get.
            file_format (str): Extension of the file we are looking for.
            look_folder (str, Optional): Main directory containing the data that needs to be selected. Default is root.
            max_date (str, optional): launch date, that works as an upper limit (inclusive) for the latest copies of the file. Format must be %Y-%m-%d. Defaults to None.
            max_iterations (int, optional): limit the number of searching tentatives. Defaults to 100.
            extra_options (Optional[dict], optional): Additional options to pass to the spark read command as a dictionary. Defaults to None.
            create_view (bool, optional): if True, also create a temp view from the loaded data. Defaults to False.

        Returns:
            DataFrame: spark dataframe containing data inside most recent version of file required.
        """

        # Step 1: locate the most recent dated subfolder that contains the file.
        latest_file_path = self.get_path_most_recent_file(
            adls_name=adls_name,
            container=container,
            look_folder=look_folder,
            file_name=file_name,
            file_format=file_format,
            max_date=max_date,
            max_iterations=max_iterations,
        )

        # Step 2: read it with the same connection and format.
        return self.read_file(
            adls_name=adls_name,
            container=container,
            file_path=latest_file_path,
            file_format=file_format,
            extra_options=extra_options,
            create_view=create_view,
        )

    @panama.logging.log_execution(blocking=True)
    def get_path_most_recent_file(
        self,
        container: str,
        adls_name: str,
        file_name: str,
        file_format: str,
        look_folder: str = "",
        max_date: Union[str, None] = None,
        max_iterations: int = 10,
    ):
        """Get the most recent file path from a directory with the structure:
            look_folder:
                - "2023-01-01"
                    - filename.csv
                - "2023-02-01"
                    - filename.csv
                - "2023-03-01"
                    - filename.csv

        Dated subfolders are inspected from the most recent to the oldest and the search stops at the
        first one containing the file.

        Args:
            container (str): Name of the container where to find files.
            adls_name (str): Name of datalake used to get connection config.
            file_name (str): Name of the file that we want to get (no extension).
            file_format (str): Extension of the file we are looking for.
            look_folder (str, Optional): Main directory containing the data that needs to be selected. Default is root.
            max_date (str, optional): launch date, that works as an upper limit (inclusive) for the latest copies of the file. Format must be %Y-%m-%d. Defaults to None.
            max_iterations (int, optional): maximum number of dated subfolders to inspect. Defaults to 10.

        Returns:
            str: path of the file found, built by joining look_folder, the dated subfolder and the file name.

        Raises:
            ValueError: if file_name contains an extension, or if max_date is not of format %Y-%m-%d.
            FileNotFoundError: if no dated subfolder is available, or if none of them contains the file.
            IndexError: if the file was not found within the first max_iterations subfolders and older ones were left unchecked.
        """

        # The extension must come from file_format only.
        if "." in file_name:
            raise ValueError("Please provide file_name input without the extension. Use the file_format parameter.")
        file_name_complete = ".".join([file_name, file_format])

        # Build absolute adls path of look_folder
        storage_account = self._get_storage_account_from_name(adls_name)
        source_type = self._get_source_type_from_name(adls_name)

        look_folder_path = self._generate_absolute_path(
            path=look_folder, storage_account=storage_account, container=container, source_type=source_type
        )

        # Get list of paths in look_folder
        dir_list = dbutils_fs_ls_names(look_folder_path, self.spark)

        # Select only folders whose name starts with a %Y-%m-%d date...
        date_format = re.compile(r"\d{4}-\d{2}-\d{2}")
        date_dir_list = [name for name in dir_list if date_format.match(name)]
        # ... filter by date if requested
        if max_date:
            if not isinstance(max_date, str) or date_format.fullmatch(max_date) is None:
                raise ValueError("max_date must be of format %Y-%m-%d")
            # The comparison below relies on listed folder names carrying a trailing "/".
            date_dir_list = [name for name in date_dir_list if name <= f"{max_date}/"]
        # ... and sort, most recent first
        date_dir_list = sorted(date_dir_list, reverse=True)

        if not date_dir_list:
            raise FileNotFoundError(f"{look_folder} does not contain any folder with valid date format (%Y-%m-%d)")
        print(f"{len(date_dir_list)} subfolder found, searching for file starting from more recent")

        # Inspect at most max_iterations subfolders, stopping at the first one that contains the file.
        for date_dir in date_dir_list[: max(max_iterations, 0)]:
            folder_path = look_folder_path + "/" + date_dir
            if file_name_complete in dbutils_fs_ls_names(path=folder_path, spark=self.spark):
                print("File found in folder:", str(date_dir)[0:-1])
                return "/".join([look_folder, date_dir, file_name_complete])

        # Some subfolders were left unchecked because of the iteration limit.
        if len(date_dir_list) > max_iterations:
            raise IndexError(
                "File not found! Check the file name (without the extension), the file_format or the folder path or use a higher max_iterations"
            )

        # Every dated subfolder was inspected without success.
        raise FileNotFoundError(
            "File not found! Check the file name (without the extension), the file_format or the folder path"
        )

    ############################## WRITING

    @staticmethod
    def _df_repartitioning(sdf, mode: str, partitions: int) -> DataFrame:
        """Perform either repartioning or coalesce over input spark dataframe.

        Args:
            sdf (DataFrame): spark dataframe, main subject of the function.
            mode (str): string to control which operation to perform. Accepted values are 'repartition' and 'coalesce'.
            partitions (int): number of partition to obtain. Object of the selected method.

        Raises:
            KeyError: when mode is not 'repartition' or 'coalesce'.

        Returns:
            DataFrame: repartitioned spark dataframe.
        """
        if mode == "repartition":
            return sdf.repartition(partitions)
        elif mode == "coalesce":
            return sdf.coalesce(partitions)
        else:
            raise KeyError("Repartitioning modality unknown. Please use 'repartition' or 'coalesce'")

    @panama.logging.log_execution(blocking=True)  # type: ignore
    def write_file(
        self,
        data: DataFrame,
        adls_name: str,
        container: str,
        file_path: str,
        mode: str = "append",
        file_format: str = "delta",
        repartitioning: Optional[list] = None,
        partition_by: Optional[list] = None,
        extra_options: Optional[dict] = None,
        save_as_table: bool = False,
    ) -> None:
        """Write data into a path of the datalake.
        Optionally provide "repartitioning", a list like [str, int], to repartition the dataframe before writing.
        Accepted repartitioning modalities are "repartition" and "coalesce".

        Args:
            data (DataFrame): spark dataframe containing data.
            adls_name (str): Name of datalake used to get connection config.
            container (str): Name of the container where to write files.
            file_path (str): Path where to write file. Provide path containing a table. If empty, last part of path will be the name of the new table.
            mode (str, optional): spark write mode. Defaults to "append".
            file_format (str, optional): Format of the file to write. Defaults to "delta".
            repartitioning (Optional[list], optional): List [modality, number of partitions] indicating how to perform pre-writing repartitioning.
                A non-empty list of any other length is ignored with a warning. Defaults to None.
            partition_by (Optional[list], optional): column(s) to use as writing partitions. Defaults to None.
            extra_options (Optional[dict], optional): Additional options to pass to the spark write command as a dictionary. Defaults to None.
            save_as_table (bool, optional): if True, the write command will be SaveAsTable, else it will use save. Defaults to False.

        Raises:
            KeyError: when the repartitioning modality is not 'repartition' or 'coalesce'.
        """
        # Compose datalake file path from given table info
        storage_account = self._get_storage_account_from_name(adls_name)
        source_type = self._get_source_type_from_name(adls_name)

        file_path = self._generate_absolute_path(
            path=file_path, storage_account=storage_account, container=container, source_type=source_type
        )

        # If dataframe repartitioning instructions were given, execute them accordingly
        if repartitioning:
            if len(repartitioning) == 2:
                data = self._df_repartitioning(sdf=data, mode=repartitioning[0], partitions=repartitioning[1])
            else:
                # Keep writing (as before), but make the ignored instruction visible to the caller.
                warnings.warn(
                    "repartitioning must be a list like [modality, number of partitions]; "
                    f"received {len(repartitioning)} element(s), no repartitioning performed."
                )

        # Init data writer
        data_writer = data.write

        # If partition_by is provided then add partitionBy step to write pipeline
        if partition_by is not None:
            data_writer = data_writer.partitionBy(partition_by)

        # Add extra options if any was provided
        if extra_options:
            data_writer = data_writer.options(**extra_options)

        # Define write format and modality
        data_writer = data_writer.format(file_format).mode(mode)

        # Final save statement
        if save_as_table:
            # NOTE(review): saveAsTable receives the absolute storage url here, while Spark documents
            # its argument as a table name - confirm this is the intended behaviour.
            data_writer.saveAsTable(file_path)
        else:
            data_writer.save(file_path)
        print(f"Table written in {file_path}")

Ancestors

Static methods

def generate_connection_string(adls_name: str, container: str, path: str = '') ‑> str

Generate the url that points to the required path inside the data lake.

Args

container : str
Name of the container where to find files or tables.
path : str
File or table location

Returns

str
the final url to read data from.

Methods

def get_latest_delta_datetime(self, table_name: str, max_datetime: Optional[str] = None, force_read_to_env: Optional[bool] = False) ‑> datetime.datetime

Get latest timestamp when the delta table was written. If max_date or max_datetime input is prior to oldest table timestamp return the latter.

Args

table_name : str
Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
max_datetime : str, optional
filter out datetime newer (inclusive) to datetime input. Format must be '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'. Defaults to None.
force_read_to_env : Optional, bool
if True, force the read on the same environment as the execution. This only works on '_analytics' catalogs. Defaults to False.

Returns

datetime.datetime
latest time when the delta table was written.
def get_path_most_recent_file(self, container: str, adls_name: str, file_name: str, file_format: str, look_folder: str = '', max_date: Optional[str] = None, max_iterations: int = 10)

Get the most recent file path from a directory with the structure: look_folder: - "2023-01-01" - filename.csv - "2023-02-01" - filename.csv - "2023-03-01" - filename.csv

Args

container : str
Name of the container where to find files.
adls_name : str
Name of datalake used to get connection config.
file_name : str
Name of the file that we want to get (no extension).
file_format : str
Extension of the file we are looking for.
look_folder : str, Optional
Main directory containing the data that needs to be selected. Default is root.
max_date : str, optional
launch date, that works as an upper limit (inclusive) for the latest copies of the file. Format must be %Y-%m-%d. Defaults to None.
max_iterations : int, optional
maximum number of search attempts. Defaults to 10.
def read_file(self, adls_name: str, container: str, file_path: str, file_format: str = 'delta', extra_options: Optional[dict] = None, create_view: Optional[bool] = False) ‑> pyspark.sql.dataframe.DataFrame

Reads a table directly from an ADLS instance's path location.

Args

adls_name : str
Name of datalake (or blob) used to get connection config.
container : str
Name of the container where to find files.
file_path : str
Path where to find file (or files). Use folder to read multiple files, or specific file path with extension.
file_format : str, optional
Format of the file to read. Defaults to "delta".
extra_options : Optional[dict], optional
Additional options to pass to the spark read command as a dictionary. Defaults to None.
create_view : bool, optional
return temp view containing entire table content. Defaults to False.

Returns

DataFrame
containing the data from the specified table.
def read_most_recent_file(self, container: str, adls_name: str, file_name: str, file_format: str, look_folder: str = '', max_date: Optional[str] = None, max_iterations: int = 100, extra_options: Optional[dict] = None, create_view: Optional[bool] = False) ‑> pyspark.sql.dataframe.DataFrame

Get the most recent file path from a directory with the structure: look_folder: - "2023-01-01" - filename.csv - "2023-02-01" - filename.csv - "2023-03-01" - filename.csv Then read the file and return the spark dataframe containing the data.

Args

container : str
Name of the container where to find and read file.
adls_name : str
Name of datalake used to get connection config.
file_name : str
Name of the file that we want to get.
file_format : str
Extension of the file we are looking for.
look_folder : str, Optional
Main directory containing the data that needs to be selected. Default is root.
max_date : str, optional
launch date, that works as an upper limit (inclusive) for the latest copies of the file. Format must be %Y-%m-%d. Defaults to None.
max_iterations : int, optional
maximum number of search attempts. Defaults to 100.
extra_options : Optional[dict], optional
Additional options to pass to the spark read command as a dictionary. Defaults to None.
create_view : bool, optional
return temp view containing entire table content. Defaults to False.

Returns

DataFrame
spark dataframe containing data inside most recent version of file required.
def read_table(self, table_name: str, max_datetime: Optional[str] = None, create_view: Optional[bool] = False, force_read_to_env: Optional[bool] = False) ‑> pyspark.sql.dataframe.DataFrame

Reads a table from unity catalog.

ATTENTION: momentary fix will let the user read dev catalog from dev environment (instead of test catalog from dev environment).

Args

table_name : str
Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
max_datetime : str, optional
filter out datetime newer (inclusive) to datetime input. Format must be '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'. Defaults to None.
table_format : str, optional
Format of the table to read (e.g., 'delta'). Defaults to "delta".
create_view : bool, optional
return temp view containing entire table content. Defaults to False.
force_read_to_env : Optional, bool
if True, force the read on the same environment as the execution. This only works on '_analytics' catalogs. Defaults to False.

Returns

DataFrame
containing the data from the specified table.
def write_file(self, data: pyspark.sql.dataframe.DataFrame, adls_name: str, container: str, file_path: str, mode: str = 'append', file_format: str = 'delta', repartitioning: Optional[list] = None, partition_by: Optional[list] = None, extra_options: Optional[dict] = None, save_as_table: bool = False) ‑> None

Write data into a datalake table. Optionally provide "repartitioning", a list like [str, int], to repartition the dataframe before writing. Accepted repartitioning modes are "repartition" and "coalesce".

Args

data : DataFrame
spark dataframe containing data.
adls_name : str
Name of datalake used to get connection config.
container : str
Name of the container where to write files.
file_path : str
Path where to write file. Provide path containing a table. If empty, last part of path will be the name of the new table.
mode : str, optional
spark write mode. Defaults to "append".
file_format : str, optional
Format of the file to write. Defaults to "delta".
repartitioning : Optional[list], optional
List indicating how to perform pre-writing repartitioning. Defaults to None.
partition_by : Optional[list], optional
column(s) to use as writing partitions. Defaults to None.
extra_options : Optional[dict], optional
Additional options to pass to the spark write command as a dictionary. Defaults to None.
save_as_table : bool, optional
if True, the write command will be SaveAsTable, else it will use save. Defaults to False.
class IOSql (spark)

Extends the IO interface to SQL data structure.

Expand source code
class IOSql(IOInterface):
    """
    Extends the IO interface to SQL data structure.

    Reads tables or query results from SQLServer and Oracle databases through the Spark JDBC data source.
    """

    def __init__(self, spark):
        super().__init__(spark)

    def _get_connection_properties(self, database_name: str) -> dict:
        """Returns the connection properties for the specified database.

        The configuration entry for `database_name` is looked up in the connection config json; the password
        is resolved through `get_secret_from_keyvault` using the secret key and scope stored in the credentials.

        Args:
            database_name (str): Name of the database.

        Raises:
            ValueError: Raise error if the database is not sqlserver or oracle.

        Returns:
            dict: A dict containing connection configuration (url, user, password, driver and fetch size).
        """

        # Get database connection config from config json
        database_config = super()._get_connection_config_json().get(database_name)

        database_credentials = database_config.get("credentials")  # type: ignore

        # Get password from KeyVault using relative secret
        pswd = get_secret_from_keyvault(
            key=database_credentials.get("password"), spark=self.spark, scope=database_credentials.get("scope")
        )

        database_type = database_config.get("type")  # type: ignore

        # Generate url depending on database type and add driver if necessary
        if database_type == "sqlserver":
            url = f"jdbc:sqlserver://{database_credentials.get('host')};databaseName={database_credentials.get('database')}"
            driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
            connection_config = {
                "url": url,
                "user": database_credentials.get("username"),
                "password": pswd,
                "driver": driver,
                "fetchSize": 1000,
            }
        elif database_type == "oracle":
            url = f"jdbc:oracle:thin:@{database_credentials.get('host')}:{database_credentials.get('port')}/{database_credentials.get('service_name')}"
            driver = "oracle.jdbc.driver.OracleDriver"
            connection_config = {
                "url": url,
                "user": database_credentials.get("username"),
                "password": pswd,
                "driver": driver,
                "oracle.jdbc.timezoneAsRegion": False,
                "fetchSize": 1000,
            }
        else:
            raise ValueError(f"Unsupported database type: {database_type}")

        return connection_config

    @staticmethod
    def _convert_all_decimal_columns_to_double(sdf):
        """Casts every column whose dtype starts with "decimal" to double.

        Args:
            sdf: Spark DataFrame to convert.

        Returns:
            The DataFrame with all decimal columns cast to double; other columns are left untouched.
        """
        decimal_cols = [name for name, dtype in sdf.dtypes if dtype.startswith("decimal")]

        for col_name in decimal_cols:
            sdf = sdf.withColumn(col_name, F.col(col_name).cast("double"))
        return sdf

    @staticmethod
    def _extract_table_name_from_query(query: Optional[str]) -> str:
        """Returns the first table name that follows a FROM keyword in the given query.

        Args:
            query (Optional[str]): SQL query text.

        Raises:
            ValueError: Raise error if no "FROM <name>" can be found in the query.

        Returns:
            str: The table identifier (word characters and dots) following the first FROM.
        """
        # Case-insensitive, and no trailing whitespace required so a query ending with the table name still matches
        match = re.search(r"\bfrom\s+([\w.]+)", query or "", flags=re.IGNORECASE)
        if match is None:
            raise ValueError("Cannot infer a table name from the query to create the temp view.")
        return match.group(1)

    @panama.logging.log_execution(blocking=True)
    def read_table(
        self,
        database_name: str,
        table_name: Optional[str] = None,
        query: Optional[str] = None,
        extra_options: Optional[dict] = None,
        create_view: Optional[bool] = False,
    ) -> DataFrame:
        """Reads data from a table in an SQLServer or Oracle database.

        Args:
            database_name (str): Name of the database to read from.
            table_name (Optional[str], optional): optional when input 'query' parameter, name of the table to read from. Format of table_name is "schema.table". Defaults to None.
            query (Optional[str], optional): optional when input 'table_name' parameter, query to execute instead of reading the whole table. Defaults to None.
            extra_options (Optional[dict], optional): Additional options to pass to the JDBC connector as a dictionary. They override the default connection options with the same key. Defaults to None.
            create_view (bool, optional): if True, also create a temp view from the result. When only 'query' is given, the view is based on the first table name found after FROM in the query. Defaults to False.

        Raises:
            ValueError: Raised if both query and table_name are provided at the same time, or if create_view is
                requested and no table name can be inferred from the query.

        Returns:
            DataFrame: A Spark DataFrame containing the data from the specified table or query.
        """

        # Check if user provided only one argument between table_name and query
        if (table_name is not None) and (query is not None):
            raise ValueError("Cannot specify both table_name and query.")

        connection_config = self._get_connection_properties(database_name)

        if table_name:
            connection_config.update({"dbtable": table_name})
        elif query:
            connection_config.update({"query": query})

        # User-supplied options are applied last so they take precedence over the defaults
        if extra_options:
            connection_config.update(extra_options)

        sdf = self.spark.read.format("jdbc").options(**connection_config).load()

        # Convert all decimal columns to doubleType standard
        sdf = self._convert_all_decimal_columns_to_double(sdf)

        if create_view:
            if table_name is None:
                # re.search returns a Match object: extract the captured name so a string is passed on
                table_name = self._extract_table_name_from_query(query)
            self._create_sdf_temp_view(sdf=sdf, table_name_path=table_name)

        return sdf

Ancestors

Methods

def read_table(self, database_name: str, table_name: Optional[str] = None, query: Optional[str] = None, extra_options: Optional[dict] = None, create_view: Optional[bool] = False) ‑> pyspark.sql.dataframe.DataFrame

Reads data from a table in an SQLServer or Oracle database.

Args

database_name : str
Name of the database to read from.
table_name : Optional[str], optional
optional when input 'query' parameter, name of the table to read from. Format of table_name is "schema.table". Defaults to None.
query : Optional[str], optional
optional when input 'table_name' parameter, query to execute instead of reading the whole table. Defaults to None.
extra_options : Optional[dict], optional
Additional options to pass to the JDBC connector as a dictionary. Defaults to None.
create_view : bool, optional
return temp view containing entire table content. Defaults to False.

Raises

ValueError
Raised if the user provides both query and table_name at the same time.

Returns

DataFrame
A Spark DataFrame containing the data from the specified table or query.