Module panama.utils.context_manager
Functions
def get_context_manager() ‑> ContextManager
-
Function to get the current context manager. An error is raised if no context manager has been set.
Raises
RuntimeError
- Context Manager not set. Please set a context manager before performing any operation.
Returns
ContextManager
- the current context manager.
def set_context_manager(context_manager)
-
Function used to set the new context manager.
Args
context_manager
:ContextManager
- the new context manager
def setup_workflow_sys_path(spark: SparkSession, project: str) ‑> None
-
Set up sys.path for the workflow; otherwise the project's modules would not be found. Run this function right before importing anything from the project repo.
Args
spark
:SparkSession
- The SparkSession object.
project
:str
- The name of the project.
Returns
None
Classes
class ContextManager (spark: SparkSession, project: str, custom_settings: Union[Dict[str, Any], None] = None)
-
A class that manages the context settings for a Spark session.
Args
spark
:SparkSession
- The Spark session.
custom_settings
:Union[Dict[str, Any], None]
, optional - Custom settings to override the default settings. Defaults to None.
Expand source code
class ContextManager:
    """
    A class that manages the context settings for a Spark session.

    On construction the Spark session is passed through
    ``SparkUtils(...).configure_all_data_sources_to_spark_session()``, the
    workflow sys.path is set up for the project, every setting is stored as an
    instance attribute, and the instance is registered via ``set_context_manager``.

    Args:
        spark (SparkSession): The Spark session.
        project (str): The name of the project.
        custom_settings (Union[Dict[str, Any], None], optional): Custom settings used
            instead of the notebook bindings. Defaults to None.
    """

    def __init__(self, spark: SparkSession, project: str, custom_settings: Union[Dict[str, Any], None] = None):
        self.spark = self._setup_spark(spark)
        self.project = project
        # sys.path is prepared before the settings are loaded, same order as before.
        self._setup_sys_path(project)
        self._get_base_settings(custom_settings)
        self._set_context_manager()

    def _set_context_manager(self):
        """Register this instance as the current context manager."""
        set_context_manager(self)

    @staticmethod
    def _setup_spark(spark: SparkSession) -> SparkSession:
        """
        Set up the Spark session.

        Args:
            spark (SparkSession): The Spark session.

        Returns:
            SparkSession: The configured Spark session.
        """
        return SparkUtils(spark).configure_all_data_sources_to_spark_session()

    def _setup_sys_path(self, project: str):
        """
        Set up sys.path for the given project using the instance's Spark session.

        Args:
            project (str): The name of the project.
        """
        setup_workflow_sys_path(self.spark, project)

    def _get_additional_params(self):
        """
        Hook to customize the context manager for each project.

        A class inheriting from ContextManager must implement this method to
        compute the project's custom params.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def _read_settings(self) -> Dict:
        """
        Read the settings from the current notebook bindings.

        Returns:
            Dict[str, Any]: The settings.
        """
        dbutils = get_db_utils(self.spark)
        return dbutils.notebook.entry_point.getCurrentBindings()

    def _get_base_settings(self, settings: Union[Dict[str, Any], None] = None) -> None:
        """
        Load the base settings and expose each one as an instance attribute.

        Args:
            settings (Union[Dict[str, Any], None], optional): Settings to use instead of
                the notebook bindings. Defaults to None.
        """
        # A falsy value (None or an empty dict) falls back to the notebook bindings.
        if not settings:
            settings = self._read_settings()
        for key, value in settings.items():
            setattr(self, key, value)

    def add_setting(self, param_name: str, param_value: Any) -> None:
        """
        Add a custom setting.

        The value is stored as an instance attribute and the instance is
        registered again as the current context manager.

        Args:
            param_name (str): The name of the setting.
            param_value (Any): The value of the setting.
        """
        setattr(self, param_name, param_value)
        self._set_context_manager()

    @staticmethod
    def _is_date(param_value):
        """
        Check if a parameter value is a valid date.

        Args:
            param_value: The parameter value to check.

        Returns:
            bool: True if the parameter value is a valid date, False otherwise.
        """
        try:
            parse(param_value, fuzzy=False)
        except (ValueError, TypeError, OverflowError):
            # The parser signals non-string input with TypeError and out-of-range
            # values with OverflowError; neither is a valid date, so report False
            # instead of letting the exception escape.
            return False
        return True
Methods
def add_setting(self, param_name: str, param_value: Any) ‑> None
-
Add a custom setting.
Args
param_name
:str
- The name of the setting.
param_value
:Any
- The value of the setting.