Module panama.utils.context_manager

Functions

def get_context_manager() ‑> ContextManager
Expand source code
def get_context_manager() -> ContextManager:
    """Return the context manager that is currently registered for this module.

    The module-level ``_CONTEXT_MANAGER`` variable is read once; when it holds
    a value, that value is handed back to the caller.

    Returns:
        ContextManager: the current context manager.

    Raises:
        RuntimeError: Context Manager not set. Please set a context manager before performing any operation.
    """
    # Take a single snapshot of the global so the check and the return
    # operate on the same object.
    active = _CONTEXT_MANAGER
    if active is not None:
        return active
    raise RuntimeError("Context Manager not set. Please set a context manager before performing any operation.")

Return the current context manager. A RuntimeError is raised if no context manager has been set.

Raises

RuntimeError
Context Manager not set. Please set a context manager before performing any operation.

Returns

ContextManager
the current context manager.
def set_context_manager(context_manager)
Expand source code
def set_context_manager(context_manager: "ContextManager") -> None:
    """Function used to set the new context manager.

    The given object is stored in the module-level ``_CONTEXT_MANAGER``
    variable, replacing whatever was stored there before. No validation is
    performed on the value.

    Args:
        context_manager (ContextManager): the new context manager

    Returns:
        None
    """
    # Rebind the module-level variable rather than creating a local one.
    global _CONTEXT_MANAGER
    _CONTEXT_MANAGER = context_manager

Function used to set the new context manager.

Args

context_manager : ContextManager
the new context manager
def setup_workflow_sys_path(spark: SparkSession, project: str) ‑> None
Expand source code
def setup_workflow_sys_path(spark: SparkSession, project: str) -> None:
    """Set up the sys.path for the workflow, otherwise project modules would not be found.

    Run this function right before importing anything from the project repo.

    Nothing is changed unless the runtime is "databricks", the notebook path
    can be read, and that path is not located under a top-level "repos"
    folder. In that case the shared workspace folder of the project
    (``/Workspace/Shared/<PROJECT>``, with a ``/TEST_AREA`` suffix when the
    notebook path contains "TEST_AREA") is appended to ``sys.path``. The
    folder is appended only once, so repeated calls do not grow ``sys.path``.

    Args:
        spark (SparkSession): The SparkSession object.
        project (str): The name of the project.

    Returns:
        None
    """
    dbutils = get_db_utils(spark)
    if runtime() != "databricks":
        return

    # Handle the weird case of notebookPath() being empty when launching
    # from Visual Studio Code: reading it fails, so there is nothing to do.
    # Only ``Exception`` is caught so that KeyboardInterrupt / SystemExit
    # still propagate.
    try:
        nb_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()  # type: ignore
    except Exception:
        return
    if nb_path is None:
        return

    # The first path segment (after the leading separator) tells whether the
    # notebook lives in a repo; if it does, sys.path is left untouched.
    first_segment = nb_path[1:].split("/", 1)[0].lower()
    if first_segment == "repos":
        return

    # The notebook is in the workspace: pick the shared folder, using the
    # TEST_AREA sub-folder when the notebook itself is in a test area.
    shared_path = f"/Workspace/Shared/{project.upper()}"
    if "TEST_AREA" in nb_path:
        shared_path = f"{shared_path}/TEST_AREA"

    # Avoid piling up duplicate entries when the function is called again.
    if shared_path not in sys.path:
        sys.path.append(shared_path)

Set up sys.path for the workflow; otherwise the project's modules would not be found. Run this function right before importing anything from the project repo.

Args

spark : SparkSession
The SparkSession object.
project : str
The name of the project.

Returns

None

Classes

class ContextManager (spark: SparkSession,
project: str,
custom_settings: Union[Dict[str, Any], None] = None)
Expand source code
class ContextManager:
    """
    A class that manages the context settings for a Spark session.

    On construction the Spark session is passed through ``SparkUtils``, the
    workflow ``sys.path`` is set up for the project, the settings are attached
    to the instance as attributes, and the instance is registered as the
    current context manager.

    Args:
        spark (SparkSession): The Spark session.
        project (str): The name of the project.
        custom_settings (Union[Dict[str, Any], None], optional): Custom settings to override the default settings. Defaults to None.
    """

    def __init__(self, spark: SparkSession, project: str, custom_settings: Union[Dict[str, Any], None] = None):
        # self.spark must be assigned first: _setup_sys_path and
        # _read_settings both read it.
        self.spark = self._setup_spark(spark)
        self.project = project
        self._setup_sys_path(project)
        self._get_base_settings(custom_settings)
        self._set_context_manager()

    def _set_context_manager(self) -> None:
        """Register this instance as the current context manager."""
        set_context_manager(self)

    @staticmethod
    def _setup_spark(spark: SparkSession) -> SparkSession:
        """
        Set up the Spark session.

        Args:
            spark (SparkSession): The Spark session.

        Returns:
            SparkSession: The configured Spark session.
        """
        return SparkUtils(spark).configure_all_data_sources_to_spark_session()

    def _setup_sys_path(self, project: str) -> None:
        """
        Set up ``sys.path`` for the given project using the instance's Spark session.

        Args:
            project (str): The name of the project.
        """
        setup_workflow_sys_path(self.spark, project)

    def _get_additional_params(self):
        """
        Compute the project's custom params.

        This method allows customizing the context manager for each project.
        A class inheriting from ContextManager needs to be implemented and must
        implement this method to compute the project's custom params.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def _read_settings(self) -> Dict:
        """
        Read the settings from the current notebook bindings.

        Returns:
            Dict[str, Any]: The settings.
        """
        dbutils = get_db_utils(self.spark)
        return dbutils.notebook.entry_point.getCurrentBindings()

    def _get_base_settings(self, settings: Union[Dict[str, Any], None] = None) -> None:
        """
        Get the base settings and attach each one to the instance as an attribute.

        Args:
            settings (Union[Dict[str, Any], None], optional): Settings to override the notebook bindings. Defaults to None.
        """
        # Any falsy value (None, but also an empty dict) falls back to the
        # notebook bindings.
        if not settings:
            settings = self._read_settings()
        for name, value in settings.items():
            setattr(self, name, value)

    def add_setting(self, param_name: str, param_value: Any) -> None:
        """
        Add a custom setting.

        The setting is stored as an attribute of the instance, which is then
        registered again as the current context manager.

        Args:
            param_name (str): The name of the setting.
            param_value (Any): The value of the setting.
        """
        setattr(self, param_name, param_value)
        self._set_context_manager()

    @staticmethod
    def _is_date(param_value) -> bool:
        """
        Check if a parameter value is a valid date.

        Args:
            param_value: The parameter value to check.

        Returns:
            bool: True if the parameter value is a valid date, False otherwise.
        """
        # NOTE(review): assumes ``parse`` is ``dateutil.parser.parse`` - confirm
        # against the file's imports. That parser raises TypeError for
        # non-string input and OverflowError for out-of-range values, besides
        # ValueError for unparsable strings; all of them mean "not a date".
        try:
            parse(param_value, fuzzy=False)
        except (ValueError, TypeError, OverflowError):
            return False
        return True

A class that manages the context settings for a Spark session.

Args

spark : SparkSession
The Spark session.
project : str
The name of the project.
custom_settings : Union[Dict[str, Any], None], optional
Custom settings to override the default settings. Defaults to None.

Methods

def add_setting(self, param_name: str, param_value: Any) ‑> None
Expand source code
def add_setting(self, param_name: str, param_value: Any) -> None:
    """Attach a custom setting to this context manager.

    The value is stored as an attribute named ``param_name`` on the instance,
    after which the instance's ``_set_context_manager`` method is invoked.

    Args:
        param_name (str): The name of the setting.
        param_value (Any): The value of the setting.

    Returns:
        None
    """
    # Expose the setting as a regular attribute of the instance.
    setattr(self, param_name, param_value)
    # Re-run the registration step now that the instance carries the new setting.
    self._set_context_manager()

Add a custom setting.

Args

param_name : str
The name of the setting.
param_value : Any
The value of the setting.