Module panama.logging.config_run

Classes

class ConfigRun (context_manager: ContextManager)

Class used to initialize the data of a job run.

Attributes

dbutils_data : dict
data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
spark : SparkSession
current spark session.

Initialize the ConfigRun object. dbutils_data is generated.

Args

context_manager : ContextManager
context manager of the current run. Its spark attribute is used as the current spark session.
Expand source code
class ConfigRun:
    """Class used to initialize the data of a job run.

    Values are looked up in the "tags" of the notebook context obtained through dbutils;
    when dbutils cannot be queried, fallbacks based on the running script are used.

    Attributes:
        context_manager (ContextManager): object received at construction time.
        spark (SparkSession): current spark session, taken from ``context_manager.spark``.
        dbutils: object returned by ``get_db_utils(spark)``.
        dbutils_data (dict): data extracted from dbutils. If dbutils cannot be read, an empty dictionary is generated.
    """

    def __init__(self, context_manager: ContextManager):
        """Initialize the ConfigRun object. dbutils_data is generated.

        Args:
            context_manager (ContextManager): context manager of the current run; its ``spark``
                attribute is used as the current spark session.
        """
        self.context_manager = context_manager
        self.spark = self.context_manager.spark
        self._set_dbutils_data()

    def _set_default(self, with_timestamp: bool = False) -> str:
        """Build the fallback identifier used when dbutils_data does not hold the requested key.

        The base name of ``sys.argv[0]`` is tried first, then ``__file__``, and finally a random uuid4.

        Args:
            with_timestamp (bool, optional): if set, the current Europe/Rome time, formatted as
                ``%Y_%m_%d__%H_%M_%S``, is appended after an ``@``. Defaults to False.

        Returns:
            str: the fallback identifier.
        """
        # `except Exception` keeps the best-effort fallback chain without swallowing
        # KeyboardInterrupt / SystemExit as a bare `except:` would.
        try:
            default = os.path.basename(sys.argv[0])
        except Exception:
            try:
                default = __file__
            except Exception:
                default = str(uuid.uuid4())

        if with_timestamp:
            ts = datetime.now().astimezone(timezone("Europe/Rome")).strftime("%Y_%m_%d__%H_%M_%S")
            default = f"{default}@{ts}"

        return default

    def _set_dbutils_data(self):
        """Method used to try to set the dbutils data. If it fails no error is raised, and dbutils_data are set as empty."""

        self.dbutils = get_db_utils(self.spark)

        try:
            dbutils_data = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()  # type: ignore
            self.dbutils_data = json.loads(dbutils_data)["tags"]
        except Exception:
            # Deliberately best-effort: any failure while reading the context leaves an empty dict.
            print("No dbutils found. Cannot import data from dbutils.")
            self.dbutils_data = dict()

    def get_from_dbutils(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from dbutils_data.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of dbutils_data for the required key.
        """
        return self.dbutils_data.get(key, default)

    def get_from_databricks_conf(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from spark configuration.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of spark configuration for the required key.
        """
        return self.spark.conf.get(key, default)

    def get_job_id(self) -> Union[str, None]:
        """Method used to get the job_id from the dbutils_data.

        If no "jobId" is found, the fallback built by ``_set_default`` is returned
        (script name, else module file, else a uuid4).

        Returns:
            Union[str, None]: value of jobId, or the fallback identifier.
        """
        default = self._set_default()
        return self.get_from_dbutils("jobId", default)

    def get_job_run_id(self) -> Union[str, None]:
        """Method used to get the job_run_id from the dbutils_data.

        If no "multitaskParentRunId" is found, the fallback built by ``_set_default`` with a
        timestamp suffix is returned.

        Returns:
            Union[str, None]: value of multitaskParentRunId, or the timestamped fallback identifier.
        """
        default = self._set_default(with_timestamp=True)
        return self.get_from_dbutils("multitaskParentRunId", default)

    def get_task_id(self) -> Union[str, None]:
        """Method used to get the task_id from the dbutils_data.

        If no "runId" is found, the fallback built by ``_set_default`` is returned
        (script name, else module file, else a uuid4).

        Returns:
            Union[str, None]: value of runId, or the fallback identifier.
        """
        default = self._set_default()
        return self.get_from_dbutils("runId", default)

    def _get_workspace_url_root(self, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to get the workspace url root.

        "browserHostName" is looked up in dbutils_data first; if that yields None, the spark
        configuration key "spark.databricks.workspaceUrl" is used.

        Args:
            default (Union[str, None], optional): default value if no value is found. Defaults to None.

        Returns:
            Union[str, None]: root for the notebook link.
        """
        workspace_url_root = self.get_from_dbutils("browserHostName", default)
        if workspace_url_root is None:
            workspace_url_root = self.get_from_databricks_conf("spark.databricks.workspaceUrl", default)
        return workspace_url_root

    def get_workspace(self) -> Union[str, None]:
        """Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.

        Returns:
            Union[str, None]: value of org_id
        """
        return self.get_from_dbutils("orgId")

    def get_job_name(self) -> Union[str, None]:
        """Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.

        Returns:
            Union[str, None]: value of job_name
        """
        return self.get_from_dbutils("jobName")

    def get_task_name(self) -> str:
        """Method used to get the current task_name.

        "taskName" from dbutils_data is returned when present. Otherwise the name is derived from
        the notebook path (or from ``sys.argv[0]`` if the notebook path cannot be read): its base
        name, truncated at the first dot.

        Returns:
            str: value of the taskName
        """

        try:
            path = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()  # type: ignore
        except Exception:
            path = sys.argv[0]
        default = os.path.basename(path).split(".")[0]
        return self.get_from_dbutils("taskName", default)  # type: ignore

    def get_widgets(self) -> Mapping[str, object]:
        """Method used to get the widgets content.

        Returns:
            Mapping[str, object]: instance attributes of the context manager, without the "spark" entry.
        """
        # remove the spark attribute
        return {k: v for k, v in vars(self.context_manager).items() if k != "spark"}

    def get_url(self) -> str:
        """Method used to generate an url to a notebook.

        The job_id and task_id attributes are used when already set (e.g. by generate_config);
        otherwise they are computed with get_job_id / get_task_id, so the method can be called
        on its own.

        Returns:
            str: generated url. If no url is generated, the string 'No job run associated' is returned.
        """
        url_root = self._get_workspace_url_root()
        workspace = self.get_workspace()
        if url_root is None:
            return "No job run associated"

        # Fall back to a fresh lookup instead of raising AttributeError before generate_config().
        job_id = self.job_id if hasattr(self, "job_id") else self.get_job_id()
        task_id = self.task_id if hasattr(self, "task_id") else self.get_task_id()
        return f"https://{url_root}?o={workspace}#job/{job_id}/run/{task_id}"

    def generate_config(self):
        """Method used to generate a default job run configuration.

        The following attributes are set:

            job_id Union[str, None]
            job_run_id Union[str, None]
            task_id Union[str, None]
            job_name Union[str, None]
            task_name str
            url str
            widgets Mapping[str, object]
            defaults dict with the keys "job_id", "job_run_id" and "task_id"

        Returns:
            ConfigRun: the instance itself, to allow chaining.
        """
        self.job_id = self.get_job_id()
        self.job_run_id = self.get_job_run_id()
        self.task_id = self.get_task_id()

        self.job_name = self.get_job_name()
        self.task_name = self.get_task_name()

        self.url = self.get_url()
        self.widgets = self.get_widgets()

        self.defaults = {"job_id": self.job_id, "job_run_id": self.job_run_id, "task_id": self.task_id}

        return self

Methods

def generate_config(self)

Method used to generate a default job run configuration.

The following attributes are set:

job_id str
job_run_id str
task_id str
job_name Union[str, None]
task_name str
url str
widgets Mapping[str, object]
defaults Dict[str, str]
def get_from_databricks_conf(self, key: str, default: Optional[str] = None) ‑> Optional[str]

Method used to extract a value from spark configuration.

Args

key : str
name of the parameter to extract.
default : Union[str, None], optional
default value if no parameter is found. Defaults to None.

Returns

Union[str, None]
content of spark configuration for the required key.
def get_from_dbutils(self, key: str, default: Optional[str] = None) ‑> Optional[str]

Method used to extract a value from dbutils_data.

Args

key : str
name of the parameter to extract.
default : Union[str, None], optional
default value if no parameter is found. Defaults to None.

Returns

Union[str, None]
content of dbutils_data for the required key.
def get_job_id(self) ‑> Optional[str]

Method used to get the job_id from the dbutils_data. If no job_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.

Returns

Union[str, None]
value of jobId
def get_job_name(self) ‑> Optional[str]

Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.

Returns

Union[str, None]
value of job_name
def get_job_run_id(self) ‑> Optional[str]

Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.

Returns

Union[str, None]
value of multitaskParentRunId
def get_task_id(self) ‑> Optional[str]

Method used to get the task_id from the dbutils_data. If no task_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.

Returns

Union[str, None]
value of runId
def get_task_name(self) ‑> str

Method used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised, otherwise the notebook path.

Returns

str
value of the taskName
def get_url(self) ‑> str

Method used to generate an url to a notebook.

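Note

This method takes no arguments. The url embeds the job_id and task_id of the instance (the attributes set by generate_config), i.e. the id of the current job and the id of the current task.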

Returns

str
generated url. If no url is generated, the string 'No job run associated' is returned.
def get_widgets(self) ‑> Mapping[str, object]

Method used to get the widgets content.

Returns

Mapping[str, object]
attributes of the context manager, with the spark attribute removed.
def get_workspace(self) ‑> Optional[str]

Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.

Returns

Union[str, None]
value of org_id