Module panama.utils.spark_utils
Functions
def dbutils_fs_ls(path: str, spark: pyspark.sql.session.SparkSession) ‑> list
-
Function that just wraps the list file command from dbutils. Gets the dbutils object with the SparkUtils class method and returns the complete resulting list.
Args
path
:str
- Path to list files from.
spark
:SparkSession
- Spark session used to handle computation.
Returns
list
- The result of the list file command
def dbutils_fs_ls_names(path: str, spark: pyspark.sql.session.SparkSession) ‑> list
-
Run the dbutils_fs_ls function and then return a list containing only file names.
Args
path
:str
- Path to list files from.
spark
:SparkSession
- Spark session used to handle computation.
Returns
list
- List of file names.
def dbutils_fs_ls_paths(path: str, spark: pyspark.sql.session.SparkSession) ‑> list
-
Run the dbutils_fs_ls function and then return a list containing only file absolute paths.
Args
path
:str
- Path to list files from.
spark
:SparkSession
- Spark session used to handle computation.
Returns
list
- List of file paths.
def get_connection_config_json() ‑> dict
-
Get the json file containing the connection configurations of all data sources, read it as a dict and return it.
Returns
dict
- Dict with all connection configurations.
def get_db_utils(spark)
-
Retrieves a DBUtils object based on the runtime environment.
Args
spark
:SparkSession
- The SparkSession object.
Returns
DBUtils
- The DBUtils object.
def get_env_from_cluster_tag(spark: pyspark.sql.session.SparkSession) ‑> str
-
Uses the active cluster id to ask for its custom tags, then retrieves the `env` tag. Also checks that `env` is one of `dev`, `test` or `prod`.
Args
spark
:SparkSession
- The SparkSession object
Returns
str
- The `env` tag of the current cluster, representing the active workspace.
def get_secret_from_keyvault(spark: pyspark.sql.session.SparkSession, key: str, scope=None) ‑> str
-
Get the content of a secret from Key Vault
Args
spark
:SparkSession
- Spark session used to handle computation.
key
:str
- Reference to specific Key Vault secret.
scope
:optional
- Reference to specific Key Vault. Defaults to "aa-kv-secret" (default logic defined in function).
Returns
str
- Redacted (not visible) content of secret.
def get_spark_session() ‑> pyspark.sql.session.SparkSession
-
Retrieves a SparkSession object based on the runtime environment.
Returns
SparkSession
- The SparkSession object.
def runtime() ‑> str
-
Determine the runtime environment based on the presence of the "DATABRICKS_RUNTIME_VERSION" environment variable.
Returns
str
- The runtime environment, which can be either "databricks" or "local".
Classes
class SparkUtils (spark: Optional[pyspark.sql.session.SparkSession] = None)
-
Generate a basic spark session if not provided.
Args
spark
:Optional[SparkSession]
- Spark session to manipulate. Optional; defaults to None.
Expand source code
class SparkUtils:
    """Hold a spark session and its dbutils object, and set storage connection confs on the session."""

    def __init__(self, spark: Optional[SparkSession] = None) -> None:
        """Keep the given spark session, or get one when none is provided.

        Args:
            spark (Optional[SparkSession], optional): spark session to manipulate. Defaults to None.
        """
        self.spark = get_spark_session() if spark is None else spark

        # dbutils is resolved once, from whichever session is now held
        self.dbutils = get_db_utils(self.spark)

    def get_dbutils(self):
        """Return dbutils object.

        Returns:
            DBUtils: the dbutils object resolved at construction time.
        """
        return self.dbutils

    def get_spark_session(self) -> SparkSession:
        """Return spark session.

        Returns:
            SparkSession: the spark session held by this instance.
        """
        return self.spark

    def configure_all_data_sources_to_spark_session(self) -> SparkSession:
        """Apply the storage confs of every configured connection and return the spark session.

        Each entry of the connection config is handled according to its "type":
        "adls" entries go through set_adls_conf and "blob" entries through
        set_blob_conf. Entries of any other type are left untouched.

        Returns:
            SparkSession: the spark session held by this instance, after configuration.
        """
        for connection in get_connection_config_json().values():
            credentials = connection.get("credentials")  # type: ignore
            storage_type = connection.get("type")  # type: ignore

            # Only these two storage types have a configuration routine
            if storage_type not in ("adls", "blob"):
                continue

            # Both storage types read their secret from key vault the same way
            secret = get_secret_from_keyvault(
                spark=self.spark,
                key=credentials.get("client_secret"),
                scope=credentials.get("scope"),
            )
            account = credentials.get("storage_account")

            if storage_type == "adls":
                # Adls Gen2 configuration
                self.spark = self.set_adls_conf(
                    storage_account=account,
                    application_id=credentials.get("application_id"),
                    service_credential=secret,
                    directory_id=credentials.get("directory_id"),
                )
            else:
                # Blob storage configuration
                self.spark = self.set_blob_conf(
                    storage_account=account,
                    service_credential=secret,
                )

        return self.spark

    def set_adls_conf(
        self,
        storage_account: str,
        application_id: str,
        service_credential: str,
        directory_id: str,
    ) -> SparkSession:
        """Set the spark confs used to connect to an adls Gen2 storage account without mounting.

        Args:
            storage_account (str): azure storage account name.
            application_id (str): value set as the OAuth2 client id.
            service_credential (str): value set as both the OAuth2 client secret and the account key.
            directory_id (str): azure directory id, used to build the OAuth2 token endpoint.

        Returns:
            SparkSession: the spark session held by this instance, with the confs set.
        """
        host = f"{storage_account}.dfs.core.windows.net"
        settings = (
            # Configuration for read_table
            (f"fs.azure.account.auth.type.{host}", "OAuth"),
            (
                f"fs.azure.account.oauth.provider.type.{host}",
                "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
            ),
            (f"fs.azure.account.oauth2.client.id.{host}", f"{application_id}"),
            (f"fs.azure.account.oauth2.client.secret.{host}", service_credential),
            (
                f"fs.azure.account.oauth2.client.endpoint.{host}",
                f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
            ),
            # Configuration for read_file
            (f"fs.azure.account.key.{host}", service_credential),
        )
        for option, value in settings:
            self.spark.conf.set(option, value)
        return self.spark

    def set_blob_conf(
        self,
        storage_account: str,
        service_credential: str,
    ) -> SparkSession:
        """Set the spark conf used to connect to a blob storage account without mounting.

        Args:
            storage_account (str): azure storage account name.
            service_credential (str): value set as the account key.

        Returns:
            SparkSession: the spark session held by this instance, with the conf set.
        """
        # Configuration for read_file
        option = f"fs.azure.account.key.{storage_account}.blob.core.windows.net"
        self.spark.conf.set(option, service_credential)
        return self.spark
Methods
def configure_all_data_sources_to_spark_session(self) ‑> pyspark.sql.session.SparkSession
-
Set all the conf necessary to data storages connection and return spark session.
Returns
(SparkSession): complete configuration of spark session
def get_dbutils(self)
-
Return dbutils object.
Returns
DBUtils
- dbutils.
def get_spark_session(self) ‑> pyspark.sql.session.SparkSession
-
Return spark session
Returns
SparkSession
- Spark session used to handle computation.
def set_adls_conf(self, storage_account: str, application_id: str, service_credential: str, directory_id: str) ‑> pyspark.sql.session.SparkSession
-
Set spark configurations necessary to set connection to all adls Gen2 without mounting
Args
- spark (SparkSession): not passed as an argument; the spark session held by the instance is the one configured.
storage_account
:str
- azure storage account name
application_id
:str
- azure application id, set as the OAuth2 client id
service_credential
:str
- adls connection key
directory_id
:str
- azure directory id
Returns
spark (SparkSession): spark session with configuration set
def set_blob_conf(self, storage_account: str, service_credential: str) ‑> pyspark.sql.session.SparkSession
-
Set spark configurations necessary to set connection to blob storage without mounting
Args
- spark (SparkSession): not passed as an argument; the spark session held by the instance is the one configured.
storage_account
:str
- azure storage account name
service_credential
:str
- blob connection key
Returns
spark (SparkSession): spark session with configuration set