Module panama.utils.spark_utils

Functions

def dbutils_fs_ls(path: str, spark: pyspark.sql.session.SparkSession) ‑> list

Thin wrapper around the dbutils file-listing command. Gets the dbutils object through the SparkUtils class and returns the complete resulting list.

Args

path : str
Path to list files from.
spark : SparkSession
Spark session used to handle computation.

Returns

list
The complete result of the file-listing command.
def dbutils_fs_ls_names(path: str, spark: pyspark.sql.session.SparkSession) ‑> list

Run the dbutils_fs_ls function and then return a list containing only file names.

Args

path : str
Path to list files from.
spark : SparkSession
Spark session used to handle computation.

Returns

list
List of file names.
def dbutils_fs_ls_paths(path: str, spark: pyspark.sql.session.SparkSession) ‑> list

Run the dbutils_fs_ls function and then return a list containing only the absolute file paths.

Args

path : str
Path to list files from.
spark : SparkSession
Spark session used to handle computation.

Returns

list
List of file paths.
def get_connection_config_json() ‑> dict

Load the JSON file containing the connection configurations of all data sources, parse it into a dict, and return it.

Returns

dict
Dict with all connection configurations.
def get_db_utils(spark)

Retrieves a DBUtils object based on the runtime environment.

Args

spark : SparkSession
The SparkSession object.

Returns

DBUtils
The DBUtils object.
def get_env_from_cluster_tag(spark: pyspark.sql.session.SparkSession) ‑> str

Uses the active cluster ID to fetch the cluster's custom tags, then retrieves the env tag. Also checks that env is one of dev, test, or prod.

Args

spark : SparkSession
The SparkSession object

Returns

str
The env tag of the current cluster, representing the active workspace.
def get_secret_from_keyvault(spark: pyspark.sql.session.SparkSession, key: str, scope=None) ‑> str

Get the content of a secret from Key Vault

Args

spark : SparkSession
Spark session used to handle computation.
key : str
Reference to specific Key Vault secret.
scope : optional
Reference to specific Key Vault. Defaults to "aa-kv-secret" (default logic defined in function).

Returns

str
Redacted (not visible) content of secret.
def get_spark_session() ‑> pyspark.sql.session.SparkSession

Retrieves a SparkSession object based on the runtime environment.

Returns

SparkSession
The SparkSession object.
def runtime() ‑> str

Determine the runtime environment based on the presence of the "DATABRICKS_RUNTIME_VERSION" environment variable.

Returns

str
The runtime environment, which can be either "databricks" or "local".

Classes

class SparkUtils (spark: Optional[pyspark.sql.session.SparkSession] = None)

Generate a basic Spark session if one is not provided.

Args

spark : Optional[SparkSession], optional
spark session to manipulate. Defaults to None.
Expand source code
class SparkUtils:
    """Hold a Spark session plus its dbutils object and configure storage access on it.

    Args:
        spark (Optional[SparkSession], optional): spark session to manipulate. When None,
            one is obtained through ``get_spark_session()``. Defaults to None.
    """

    def __init__(self, spark: Optional[SparkSession] = None) -> None:
        """Generate a basic Spark session if one is not provided, then fetch dbutils for it.

        Args:
            spark (Optional[SparkSession], optional): spark session to manipulate. Defaults to None.
        """
        self.spark = get_spark_session() if spark is None else spark

        # dbutils is resolved from the session actually in use (provided or freshly created).
        self.dbutils = get_db_utils(self.spark)

    def get_dbutils(self):
        """Return the dbutils object.

        Returns:
            DBUtils: dbutils.
        """
        return self.dbutils

    def get_spark_session(self) -> SparkSession:
        """Return the spark session.

        Returns:
            SparkSession: Spark session used to handle computation.
        """
        return self.spark

    def configure_all_data_sources_to_spark_session(self) -> SparkSession:
        """Set all the Spark conf needed to connect to the configured data storages.

        Reads the connection configuration returned by ``get_connection_config_json()``.
        For every connection whose ``type`` is ``"adls"`` or ``"blob"``, it fetches the
        ``client_secret`` from Key Vault (using the connection's ``scope``) and applies the
        matching storage configuration. Connections of any other type are skipped.

        Returns:
            SparkSession: spark session with the complete configuration applied.
        """
        connection_config = get_connection_config_json()

        for conn in connection_config.values():
            conn_type = conn.get("type")

            # Only ADLS Gen2 and Blob storage are supported; other types are left untouched.
            if conn_type not in ("adls", "blob"):
                continue

            conn_credentials = conn.get("credentials")

            # Both supported storage types authenticate with a secret kept in Key Vault.
            service_credential = get_secret_from_keyvault(
                spark=self.spark,
                key=conn_credentials.get("client_secret"),
                scope=conn_credentials.get("scope"),
            )

            if conn_type == "adls":
                self.spark = self.set_adls_conf(
                    storage_account=conn_credentials.get("storage_account"),
                    application_id=conn_credentials.get("application_id"),
                    service_credential=service_credential,
                    directory_id=conn_credentials.get("directory_id"),
                )
            else:
                self.spark = self.set_blob_conf(
                    storage_account=conn_credentials.get("storage_account"),
                    service_credential=service_credential,
                )

        return self.spark

    def set_adls_conf(
        self,
        storage_account: str,
        application_id: str,
        service_credential: str,
        directory_id: str,
    ) -> SparkSession:
        """Set the Spark configurations needed to connect to ADLS Gen2 without mounting.

        Configures OAuth client-credentials authentication for
        ``<storage_account>.dfs.core.windows.net``.

        Args:
            storage_account (str): Azure storage account name.
            application_id (str): application (client) id used for OAuth authentication.
            service_credential (str): client secret used for OAuth authentication; it is
                also set as the account key for the same endpoint.
            directory_id (str): Azure directory (tenant) id used to build the token endpoint.

        Returns:
            SparkSession: spark session with the configuration set.
        """
        endpoint = f"{storage_account}.dfs.core.windows.net"

        # Configuration for read_table (OAuth client credentials)
        self.spark.conf.set(f"fs.azure.account.auth.type.{endpoint}", "OAuth")
        self.spark.conf.set(
            f"fs.azure.account.oauth.provider.type.{endpoint}",
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        )
        self.spark.conf.set(f"fs.azure.account.oauth2.client.id.{endpoint}", f"{application_id}")
        self.spark.conf.set(f"fs.azure.account.oauth2.client.secret.{endpoint}", service_credential)
        self.spark.conf.set(
            f"fs.azure.account.oauth2.client.endpoint.{endpoint}",
            f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
        )

        # Configuration for read_file.
        # NOTE(review): this stores the OAuth client secret under the *account key* setting;
        # confirm this is intended and not a leftover from key-based auth.
        self.spark.conf.set(f"fs.azure.account.key.{endpoint}", service_credential)
        return self.spark

    def set_blob_conf(
        self,
        storage_account: str,
        service_credential: str,
    ) -> SparkSession:
        """Set the Spark configuration needed to connect to Blob storage without mounting.

        Args:
            storage_account (str): Azure storage account name.
            service_credential (str): account key for
                ``<storage_account>.blob.core.windows.net``.

        Returns:
            SparkSession: spark session with the configuration set.
        """
        # Configuration for read_file (account-key authentication)
        self.spark.conf.set(f"fs.azure.account.key.{storage_account}.blob.core.windows.net", service_credential)
        return self.spark

Methods

def configure_all_data_sources_to_spark_session(self) ‑> pyspark.sql.session.SparkSession

Set all the conf necessary to data storages connection and return spark session.

Returns

SparkSession: spark session with the complete configuration applied.

def get_dbutils(self)

Return dbutils object.

Returns

DBUtils
dbutils.
def get_spark_session(self) ‑> pyspark.sql.session.SparkSession

Return spark session

Returns

SparkSession
Spark session used to handle computation.
def set_adls_conf(self, storage_account: str, application_id: str, service_credential: str, directory_id: str) ‑> pyspark.sql.session.SparkSession

Set the Spark configurations needed to connect to ADLS Gen2 storage without mounting.

Args

storage_account : str
Azure storage account name.
application_id : str
Application (client) ID used for OAuth authentication.
service_credential : str
Client secret used for OAuth authentication (also set as the account key).
directory_id : str
Azure directory (tenant) ID.

Returns

SparkSession: spark session with the configuration set.

def set_blob_conf(self, storage_account: str, service_credential: str) ‑> pyspark.sql.session.SparkSession

Set the Spark configuration needed to connect to Blob storage without mounting.

Args

storage_account : str
Azure storage account name.
service_credential : str
Storage account access key for Blob storage.

Returns

SparkSession: spark session with the configuration set.