Module panama.utils

Sub-modules

panama.utils.context_manager
panama.utils.mock_utils
panama.utils.nbutils
panama.utils.scala_utils
panama.utils.spark_df
panama.utils.spark_utils
panama.utils.utils

Functions

def dbutils_fs_ls(path: str, spark: pyspark.sql.session.SparkSession) ‑> list
def dbutils_fs_ls(path: str, spark: SparkSession) -> list:
    """Function that just wrap the list file command from dbutils.
    Get dbutils object with SparkUtils class method and return the comeplete resulting list.

    Args:
        path (str): Path to list files from.
        spark (SparkSession): Spark session used to handle computation.

    Returns:
        list: The result of the list file command
    """
    dbutils = get_db_utils(spark)
    return dbutils.fs.ls(path)

Wrap the list file command from dbutils. Get the dbutils object via get_db_utils and return the complete resulting list.

Args

path : str
Path to list files from.
spark : SparkSession
Spark session used to handle computation.

Returns

list
The result of the list file command
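A minimal usage sketch; the directory path is hypothetical, and outside Databricks the call goes through panama.dbutils_mock:

from panama.utils import dbutils_fs_ls, get_spark_session

spark = get_spark_session()

# Each entry is a FileInfo-style object exposing .name, .path and .size.
entries = dbutils_fs_ls("/mnt/landing/", spark)  # hypothetical directory
for entry in entries:
    print(entry.name, entry.size)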
def dbutils_fs_ls_names(path: str, spark: pyspark.sql.session.SparkSession) ‑> list
def dbutils_fs_ls_names(path: str, spark: SparkSession) -> list:
    """Run the dbutils_fs_ls function and then return a list containing only file names.

    Args:
        path (str): Path to list files from.
        spark (SparkSession): Spark session used to handle computation.

    Returns:
        list: List of file names.
    """
    dir_list = dbutils_fs_ls(path, spark)
    return [i.name for i in dir_list]

Run the dbutils_fs_ls function and then return a list containing only file names.

Args

path : str
Path to list files from.
spark : SparkSession
Spark session used to handle computation.

Returns

list
List of file names.
def dbutils_fs_ls_paths(path: str, spark: pyspark.sql.session.SparkSession) ‑> list
def dbutils_fs_ls_paths(path: str, spark: SparkSession) -> list:
    """Run the dbutils_fs_ls function and then return a list containing only file absolute paths.

    Args:
        path (str): Path to list files from.
        spark (SparkSession): Spark session used to handle computation.

    Returns:
        list: List of file paths.
    """
    dir_list = dbutils_fs_ls(path, spark)
    return [i.path for i in dir_list]

Run the dbutils_fs_ls function and then return a list containing only file absolute paths.

Args

path : str
Path to list files from.
spark : SparkSession
Spark session used to handle computation.

Returns

list
List of file paths.
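The two variants differ only in which FileInfo attribute they keep; a short sketch with a hypothetical directory:

from panama.utils import dbutils_fs_ls_names, dbutils_fs_ls_paths, get_spark_session

spark = get_spark_session()

names = dbutils_fs_ls_names("/mnt/landing/raw/", spark)  # e.g. ["a.csv", "b.csv"]
paths = dbutils_fs_ls_paths("/mnt/landing/raw/", spark)  # e.g. ["dbfs:/mnt/landing/raw/a.csv", ...]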
def get_connection_config_json() ‑> dict
def get_connection_config_json() -> dict:
    """Get json file containing all data sources connection configurations.
    Read it as a dict and return it

    Returns:
        dict: Dict with all connection configurations.
    """
    connections_config_path = "/dbfs/pythonPackages/Panama/config/connections.json"

    if runtime() == "local":
        connections_config_path = connections_config_path[1:]

    if not os.path.exists(connections_config_path):
        warnings.warn("MISSING connections.json file, an empty dictionary is returned.")
        return dict()

    with open(connections_config_path, "r") as json_file:
        connection_config = json.load(json_file)

    return connection_config

Read the JSON file containing all data source connection configurations and return it as a dict.

Returns

dict
Dict with all connection configurations.
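The structure of connections.json is not documented here; judging from how configure_all_data_sources_to_spark_session consumes it, an illustrative (not authoritative) layout looks like this, expressed as the equivalent Python dict:

# Illustrative only: all names and ids below are placeholders.
example_connection_config = {
    "my_adls_source": {
        "type": "adls",
        "credentials": {
            "storage_account": "mystorageaccount",
            "application_id": "<azure-ad-application-id>",
            "client_secret": "<key-vault-secret-name>",
            "scope": "aa-kv-secret",
            "directory_id": "<azure-tenant-id>",
        },
    },
    "my_blob_source": {
        "type": "blob",
        "credentials": {
            "storage_account": "myblobaccount",
            "client_secret": "<key-vault-secret-name>",
            "scope": "aa-kv-secret",
        },
    },
}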
def get_db_utils(spark)
def get_db_utils(spark):
    """
    Retrieves a DBUtils object based on the runtime environment.

    Args:
        spark (SparkSession): The SparkSession object.

    Returns:
        DBUtils: The DBUtils object.
    """
    # see https://learn.microsoft.com/en-gb/azure/databricks/dev-tools/databricks-connect#access-dbutils
    if runtime() == "databricks":
        if spark.conf.get("spark.databricks.service.client.enabled") == "true":
            from pyspark.dbutils import DBUtils  # type: ignore

            return DBUtils(spark)
        else:
            import IPython

            return IPython.get_ipython().user_ns["dbutils"]  # type: ignore
    else:
        from panama.dbutils_mock import DBUtils

        return DBUtils(spark)

Retrieves a DBUtils object based on the runtime environment.

Args

spark : SparkSession
The SparkSession object.

Returns

DBUtils
The DBUtils object.
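This is the entry point the other helpers rely on; a minimal sketch:

from panama.utils import get_db_utils, get_spark_session

spark = get_spark_session()
dbutils = get_db_utils(spark)

# On Databricks this is the real DBUtils; locally it is panama.dbutils_mock.DBUtils.
dbutils.fs.ls("/")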
def get_env_from_cluster_tag(spark: pyspark.sql.session.SparkSession) ‑> str
def get_env_from_cluster_tag(spark: SparkSession) -> str:
    """
    Use the active cluster id to look up its custom tags, then retrieve the `env` tag.
    Also check that env is one of `dev`, `test` or `prod`.

    Args:
        spark (SparkSession): The SparkSession object

    Returns:
        str: The `env` tag of the current cluster, representing the active workspace.
    """
    env_allowed_values = ["dev", "test", "prod"]

    # Get cluster ID from spark conf
    cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

    # Init Databricks workspace client
    w = WorkspaceClient()

    # Read cluster's `env` tag
    env = w.clusters.get(cluster_id=cluster_id).custom_tags.get("env").lower()  # type: ignore

    # Check env validity
    if env not in env_allowed_values:
        raise ValueError(
            f"{env} is not a valid env tag. Please switch cluster or fix tag on the current one. Use one from {env_allowed_values}"
        )
    else:
        return env

Use the active cluster id to look up its custom tags, then retrieve the env tag. Also check that env is one of dev, test or prod.

Args

spark : SparkSession
The SparkSession object

Returns

str
The env tag of the current cluster, representing the active workspace.
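A usage sketch, assuming the cluster carries a valid env custom tag (otherwise a ValueError is raised); the catalog name below is a hypothetical use of the result:

from panama.utils import get_env_from_cluster_tag, get_spark_session

spark = get_spark_session()

env = get_env_from_cluster_tag(spark)  # "dev", "test" or "prod"
catalog_name = f"panama_{env}"         # hypothetical downstream use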
def get_secret_from_keyvault(spark: pyspark.sql.session.SparkSession, key: str, scope=None) ‑> str
def get_secret_from_keyvault(spark: SparkSession, key: str, scope=None) -> str:
    """Get the content of a secret from Key Vault

    Args:
        spark (SparkSession): Spark session used to handle computation.
        key (str): Reference to specific Key Vault secret.
        scope (optional): Reference to specific Key Vault. Defaults to "aa-kv-secret" (default logic defined in function).

    Returns:
        str: Redacted (not visible) content of secret.
    """
    # Define default scope
    if scope is None:
        scope = "aa-kv-secret"
    dbutils = get_db_utils(spark)
    return dbutils.secrets.get(scope, key)  # type: ignore

Get the content of a secret from Key Vault

Args

spark : SparkSession
Spark session used to handle computation.
key : str
Reference to specific Key Vault secret.
scope : optional
Reference to specific Key Vault. Defaults to "aa-kv-secret" (default logic defined in function).

Returns

str
Redacted (not visible) content of secret.
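For example, to read a hypothetical secret from the default scope:

from panama.utils import get_secret_from_keyvault, get_spark_session

spark = get_spark_session()

# "my-service-password" is a hypothetical secret name; scope defaults to "aa-kv-secret".
password = get_secret_from_keyvault(spark, key="my-service-password")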
def get_spark_session() ‑> pyspark.sql.session.SparkSession
def get_spark_session() -> SparkSession:
    """
    Retrieves a SparkSession object based on the runtime environment.

    Returns:
        SparkSession: The SparkSession object.
    """
    if runtime() == "databricks":
        return SparkSession.builder.getOrCreate()  # type: ignore
    else:
        # see https://docs.delta.io/latest/quick-start.html#set-up-apache-spark-with-delta-lake&language-python
        builder = (
            SparkSession.builder.appName("MyApp")  # type: ignore
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.executor.memory", "1g")
            .config("spark.driver.memory", "4g")
            .enableHiveSupport()
        )
        return configure_spark_with_delta_pip(builder).getOrCreate()

Retrieves a SparkSession object based on the runtime environment.

Returns

SparkSession
The SparkSession object.
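Typical use is a single call; on Databricks it returns the active session, locally it builds a Delta-enabled one:

from panama.utils import get_spark_session

spark = get_spark_session()
spark.sql("SELECT 1 AS ok").show()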
def runtime() ‑> str
def runtime() -> str:
    """
    Determine the runtime environment based on the presence of the "DATABRICKS_RUNTIME_VERSION" environment variable.

    Returns:
        str: The runtime environment, which can be either "databricks" or "local".
    """
    if "DATABRICKS_RUNTIME_VERSION" in os.environ:
        return "databricks"
    else:
        return "local"

Determine the runtime environment based on the presence of the "DATABRICKS_RUNTIME_VERSION" environment variable.

Returns

str
The runtime environment, which can be either "databricks" or "local".
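A short sketch of branching on the detected runtime:

from panama.utils import runtime

# "databricks" when DATABRICKS_RUNTIME_VERSION is set, otherwise "local".
if runtime() == "local":
    print("Running outside Databricks; dbutils calls are served by the mock.")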
def time_converter(unix_timestamp) ‑> time.struct_time
def time_converter(unix_timestamp) -> struct_time:
    """Function used to convert a unix_timestamp to a struct_time.

    Args:
        unix_timestamp (float): unix timestamp.

    Returns:
        struct_time: time object with datetime information.
    """
    return datetime.astimezone(datetime.fromtimestamp(unix_timestamp), tz=timezone("Europe/Rome")).timetuple()

Function used to convert a unix_timestamp to a struct_time.

Args

unix_timestamp : float
unix timestamp.

Returns

struct_time
time object with datetime information.
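For example, with an arbitrary unix timestamp:

import time
from panama.utils import time_converter

t = time_converter(1700000000.0)              # arbitrary unix timestamp
print(time.strftime("%Y-%m-%d %H:%M:%S", t))  # rendered in Europe/Rome local time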
def timestamp_str_to_datetime(timestamp: str) ‑> datetime.datetime
def timestamp_str_to_datetime(timestamp: str) -> datetime:
    """Convert a timestamp string to a datetime. Allowed formats are '%Y-%m-%d' and '%Y-%m-%d %H:%M:%S'.
    If the first format is found, the time is assumed to be 23:59:59.

    Args:
        timestamp (str): timestamp to be converted.

    Returns:
        datetime: timestamp as a datetime object
    """
    date_pattern = r"\d{4}-\d{2}-\d{2}$"
    if re.match(date_pattern, timestamp.strip()):
        timestamp = " ".join([timestamp, "23:59:59"])
    return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")

Convert a timestamp string to a datetime. Allowed formats are '%Y-%m-%d' and '%Y-%m-%d %H:%M:%S'. If the first format is found, the time is assumed to be 23:59:59.

Args

timestamp : str
timestamp to be converted.

Returns

datetime
timestamp as a datetime object
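Both accepted formats in action:

from panama.utils import timestamp_str_to_datetime

timestamp_str_to_datetime("2024-01-31")           # -> datetime(2024, 1, 31, 23, 59, 59)
timestamp_str_to_datetime("2024-01-31 08:30:00")  # -> datetime(2024, 1, 31, 8, 30)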

Classes

class SparkUtils (spark: pyspark.sql.session.SparkSession | None = None)
class SparkUtils:
    def __init__(self, spark: Optional[SparkSession] = None) -> None:
        """Generate a basic spark session if not provided.

        Args:
            spark (Optional[SparkSession], optional): spark session to manipulate. Defaults to None.
        """
        if spark is None:
            self.spark = get_spark_session()
        else:
            self.spark = spark

        # Set dbutils
        self.dbutils = get_db_utils(self.spark)

    def get_dbutils(self):
        """Return dbutils object.

        Returns:
            DBUtils: dbutils.
        """
        return self.dbutils

    def get_spark_session(self) -> SparkSession:
        """Return spark session

        Returns:
            SparkSession: Spark session used to handle computation.
        """
        return self.spark

    def configure_all_data_sources_to_spark_session(self) -> SparkSession:
        """
        Set all the configurations necessary to connect to the data storages and return the spark session.

        Returns:
            SparkSession: fully configured spark session.
        """

        # Get connection config json
        connection_config = get_connection_config_json()

        # Set spark configuration for every connection, different modality for every storage type
        for conn_name in connection_config:
            conn = connection_config.get(conn_name)
            conn_credentials = conn.get("credentials")  # type: ignore
            conn_type = conn.get("type")  # type: ignore

            # Adls Gen2 configuration
            if conn_type == "adls":
                # Get secret from key vault
                service_credential = get_secret_from_keyvault(
                    spark=self.spark,
                    key=conn_credentials.get("client_secret"),
                    scope=conn_credentials.get("scope"),
                )

                # Set spark configuration
                self.spark = self.set_adls_conf(
                    storage_account=conn_credentials.get("storage_account"),
                    application_id=conn_credentials.get("application_id"),
                    service_credential=service_credential,
                    directory_id=conn_credentials.get("directory_id"),
                )

            elif conn_type == "blob":
                # Get secret from key vault
                service_credential = get_secret_from_keyvault(
                    spark=self.spark,
                    key=conn_credentials.get("client_secret"),
                    scope=conn_credentials.get("scope"),
                )

                # Set spark configuration
                self.spark = self.set_blob_conf(
                    storage_account=conn_credentials.get("storage_account"), service_credential=service_credential
                )

        return self.spark

    def set_adls_conf(
        self,
        storage_account: str,
        application_id: str,
        service_credential: str,
        directory_id: str,
    ) -> SparkSession:
        """Set spark configurations necessary to set connection to all adls Gen2 without mounting

        Args:
            spark (SparkSession)
            storage_account (str): azure storage account name
            application_id (str): azure adls name
            service_credential (str): adls connection key
            directory_id (str): azure directory id

        Returns:
            spark (SparkSession): spark session with configuration set
        """

        # Configuration for read_table
        self.spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
        self.spark.conf.set(
            f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        )
        self.spark.conf.set(
            f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", f"{application_id}"
        )
        self.spark.conf.set(
            f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", service_credential
        )
        self.spark.conf.set(
            f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
            f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
        )
        # Configuration for read_file
        self.spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", service_credential)
        return self.spark

    def set_blob_conf(
        self,
        storage_account: str,
        service_credential: str,
    ) -> SparkSession:
        """Set spark configurations necessary to set connection to blob storage without mounting

        Args:
            spark (SparkSession)
            storage_account (str): azure storage account name
            service_credential (str): blob connection key

        Returns:
            spark (SparkSession): spark session with configuration set
        """

        # Configuration for read_file
        self.spark.conf.set(f"fs.azure.account.key.{storage_account}.blob.core.windows.net", service_credential)
        return self.spark

Generate a basic spark session if not provided.

Args

spark : Optional[SparkSession], optional
spark session to manipulate. Defaults to None.

Methods

def configure_all_data_sources_to_spark_session(self) ‑> pyspark.sql.session.SparkSession
def configure_all_data_sources_to_spark_session(self) -> SparkSession:
    """
    Set all the configurations necessary to connect to the data storages and return the spark session.

    Returns:
        SparkSession: fully configured spark session.
    """

    # Get connection config json
    connection_config = get_connection_config_json()

    # Set spark configuration for every connection, different modality for every storage type
    for conn_name in connection_config:
        conn = connection_config.get(conn_name)
        conn_credentials = conn.get("credentials")  # type: ignore
        conn_type = conn.get("type")  # type: ignore

        # Adls Gen2 configuration
        if conn_type == "adls":
            # Get secret from key vault
            service_credential = get_secret_from_keyvault(
                spark=self.spark,
                key=conn_credentials.get("client_secret"),
                scope=conn_credentials.get("scope"),
            )

            # Set spark configuration
            self.spark = self.set_adls_conf(
                storage_account=conn_credentials.get("storage_account"),
                application_id=conn_credentials.get("application_id"),
                service_credential=service_credential,
                directory_id=conn_credentials.get("directory_id"),
            )

        elif conn_type == "blob":
            # Get secret from key vault
            service_credential = get_secret_from_keyvault(
                spark=self.spark,
                key=conn_credentials.get("client_secret"),
                scope=conn_credentials.get("scope"),
            )

            # Set spark configuration
            self.spark = self.set_blob_conf(
                storage_account=conn_credentials.get("storage_account"), service_credential=service_credential
            )

    return self.spark

Set all the configurations necessary to connect to the data storages and return the spark session.

Returns

SparkSession
Fully configured spark session.
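A sketch of end-to-end use, assuming a valid connections.json is in place; the storage path in the final read is hypothetical:

from panama.utils import SparkUtils

su = SparkUtils()  # builds or reuses a spark session
spark = su.configure_all_data_sources_to_spark_session()

# The session now carries the OAuth / account-key settings for every configured
# adls and blob connection, so direct reads against those accounts work, e.g.:
df = spark.read.format("delta").load(
    "abfss://container@mystorageaccount.dfs.core.windows.net/some/table"
)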

def get_dbutils(self)
def get_dbutils(self):
    """Return dbutils object.

    Returns:
        DBUtils: dbutils.
    """
    return self.dbutils

Return dbutils object.

Returns

DBUtils
dbutils.
def get_spark_session(self) ‑> pyspark.sql.session.SparkSession
def get_spark_session(self) -> SparkSession:
    """Return spark session

    Returns:
        SparkSession: Spark session used to handle computation.
    """
    return self.spark

Return spark session

Returns

SparkSession
Spark session used to handle computation.
def set_adls_conf(self,
storage_account: str,
application_id: str,
service_credential: str,
directory_id: str) ‑> pyspark.sql.session.SparkSession
def set_adls_conf(
    self,
    storage_account: str,
    application_id: str,
    service_credential: str,
    directory_id: str,
) -> SparkSession:
    """Set spark configurations necessary to set connection to all adls Gen2 without mounting

    Args:
        spark (SparkSession)
        storage_account (str): azure storage account name
        application_id (str): azure adls name
        service_credential (str): adls connection key
        directory_id (str): azure directory id

    Returns:
        spark (SparkSession): spark session with configuration set
    """

    # Configuration for read_table
    self.spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
    self.spark.conf.set(
        f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    )
    self.spark.conf.set(
        f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", f"{application_id}"
    )
    self.spark.conf.set(
        f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", service_credential
    )
    self.spark.conf.set(
        f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
        f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
    )
    # Configuration for read_file
    self.spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", service_credential)
    return self.spark

Set the spark configurations necessary to connect to ADLS Gen2 storage accounts without mounting.

Args

storage_account : str
Azure storage account name.
application_id : str
Azure AD application (client) id.
service_credential : str
Client secret used for the ADLS connection.
directory_id : str
Azure directory (tenant) id.

Returns

SparkSession
Spark session with the configuration set.
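Direct use with hypothetical values (normally these come from connections.json and Key Vault):

from panama.utils import SparkUtils, get_secret_from_keyvault

su = SparkUtils()
secret = get_secret_from_keyvault(su.get_spark_session(), key="<key-vault-secret-name>")
spark = su.set_adls_conf(
    storage_account="mystorageaccount",           # hypothetical
    application_id="<azure-ad-application-id>",   # hypothetical
    service_credential=secret,
    directory_id="<azure-tenant-id>",             # hypothetical
)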

def set_blob_conf(self, storage_account: str, service_credential: str) ‑> pyspark.sql.session.SparkSession
def set_blob_conf(
    self,
    storage_account: str,
    service_credential: str,
) -> SparkSession:
    """Set spark configurations necessary to set connection to blob storage without mounting

    Args:
        spark (SparkSession)
        storage_account (str): azure storage account name
        service_credential (str): blob connection key

    Returns:
        spark (SparkSession): spark session with configuration set
    """

    # Configuration for read_file
    self.spark.conf.set(f"fs.azure.account.key.{storage_account}.blob.core.windows.net", service_credential)
    return self.spark

Set the spark configurations necessary to connect to Azure Blob storage without mounting.

Args

storage_account : str
Azure storage account name.
service_credential : str
Blob storage account access key.

Returns

SparkSession
Spark session with the configuration set.
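And the blob counterpart, again with hypothetical values:

from panama.utils import SparkUtils

su = SparkUtils()
spark = su.set_blob_conf(
    storage_account="myblobaccount",                     # hypothetical
    service_credential="<storage-account-access-key>",  # normally fetched via get_secret_from_keyvault
)
# Files can then be read with wasbs:// URIs against myblobaccount.blob.core.windows.net.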