Module panama.utils.context_manager

Functions

def get_context_manager() ‑> ContextManager

Function to get the current context manager. An error is raised if context manager is not set.

Raises

RuntimeError
Context Manager not set. Please set a context manager before performing any operation.

Returns

ContextManager
the current context manager.
def set_context_manager(context_manager)

Function used to set the new context manager.

Args

context_manager : ContextManager
the new context manager
def setup_workflow_sys_path(spark: SparkSession, project: str) ‑> None

Set up the sys.path for the workflow otherwise module wouldn't be found. Run this function right before importing anything from project repo

Args

spark : SparkSession
The SparkSession object.
project : str
The name of the project.

Returns

None

Classes

class ContextManager (spark: SparkSession, project: str, custom_settings: Union[Dict[str, Any], None] = None)

A class that manages the context settings for a Spark session.

Args

spark : SparkSession
The Spark session.
custom_settings : Union[Dict[str, Any], None], optional
Custom settings to override the default settings. Defaults to None.
Expand source code
class ContextManager:
    """
    A class that manages the context settings for a Spark session.

    Args:
        spark (SparkSession): The Spark session.
        custom_settings (Union[Dict[str, Any], None], optional): Custom settings to override the default settings. Defaults to None.
    """

    def __init__(self, spark: SparkSession, project: str, custom_settings: Union[Dict[str, Any], None] = None):
        self.spark = self._setup_spark(spark)
        self.project = project
        self._setup_sys_path(project)
        self._get_base_settings(custom_settings)
        self._set_context_manager()

    def _set_context_manager(self):
        set_context_manager(self)

    @staticmethod
    def _setup_spark(spark: SparkSession) -> SparkSession:
        """
        Set up the Spark session.

        Args:
            spark (SparkSession): The Spark session.

        Returns:
            SparkSession: The configured Spark session.
        """
        return SparkUtils(spark).configure_all_data_sources_to_spark_session()

    def _setup_sys_path(self, project: str):
        setup_workflow_sys_path(self.spark, project)

    def _get_additional_params(self):
        "This method allows to customize the context manager for each project. A class inheriting from ContextManager needs to be implemented and must implement this method to comput project custom params"
        raise NotImplementedError

    def _read_settings(self) -> Dict:
        """
        Read the settings from the current notebook bindings.

        Returns:
            Dict[str, Any]: The settings.
        """
        dbutils = get_db_utils(self.spark)
        return dbutils.notebook.entry_point.getCurrentBindings()

    def _get_base_settings(self, settings: Union[Dict[str, Any], None] = None) -> None:
        """
        Get the base settings.

        Args:
            settings (Union[Dict[str, Any], None], optional): Settings to override the notebook bindings. Defaults to None.
        """
        if not settings:
            settings = self._read_settings()
        for k, v in settings.items():
            setattr(self, k, v)

    def add_setting(self, param_name: str, param_value: Any) -> None:
        """
        Add a custom setting.

        Args:
            param_name (str): The name of the setting.
            param_value (Any): The value of the setting.
        """
        setattr(self, param_name, param_value)
        self._set_context_manager()

    @staticmethod
    def _is_date(param_value):
        """
        Check if a parameter value is a valid date.

        Args:
            param_value: The parameter value to check.

        Returns:
            bool: True if the parameter value is a valid date, False otherwise.
        """
        try:
            parse(param_value, fuzzy=False)
            return True
        except ValueError:
            return False

Methods

def add_setting(self, param_name: str, param_value: Any) ‑> None

Add a custom setting.

Args

param_name : str
The name of the setting.
param_value : Any
The value of the setting.