Module panama.logging.config_run
Classes
class ConfigRun (context_manager: ContextManager)
-
Class used to initialize the data of a job run.
Attributes
dbutils_data
:dict
- data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
spark
:SparkSession
- current spark session.
Initialize the ConfigRun object. dbutils_data is generated.
Args
context_manager
:ContextManager
- context manager of the current run. Its spark attribute is used as the current spark session.
Expand source code
class ConfigRun:
    """Class used to initialize the data of a job run.

    Attributes:
        context_manager (ContextManager): context manager the object was built from.
        spark (SparkSession): current spark session, read from the context manager.
        dbutils: object returned by get_db_utils for the current spark session.
        dbutils_data (dict): data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
    """

    def __init__(self, context_manager: ContextManager):
        """Initialize the ConfigRun object. dbutils_data is generated.

        Args:
            context_manager (ContextManager): context manager of the current run. Its spark attribute
                is used as the current spark session.
        """
        self.context_manager = context_manager
        self.spark = self.context_manager.spark
        self._set_dbutils_data()

    def _set_default(self, with_timestamp: bool = False):
        """Method used to build the fallback identifier used when dbutils_data has no value.

        The basename of sys.argv[0] is tried first, then the path of the current module file,
        and finally a random uuid4.

        Args:
            with_timestamp (bool, optional): if True, the current Europe/Rome time is appended to
                the identifier after an '@'. Defaults to False.

        Returns:
            str: the fallback identifier.
        """
        # `except Exception` (instead of a bare except) keeps the best-effort fallback chain
        # without swallowing KeyboardInterrupt or SystemExit.
        try:
            default = os.path.basename(sys.argv[0])
        except Exception:
            try:
                default = __file__
            except Exception:
                default = str(uuid.uuid4())
        if with_timestamp is True:
            ts = datetime.now().astimezone(tz=timezone("Europe/Rome")).strftime("%Y_%m_%d__%H_%M_%S")
            default = "@".join([default, ts])
        return default

    def _set_dbutils_data(self):
        """Method used to try to set the dbutils data.

        If reading the notebook context fails no error is raised, and dbutils_data are set as empty.
        """
        self.dbutils = get_db_utils(self.spark)
        try:
            dbutils_data = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()  # type: ignore
            self.dbutils_data = json.loads(dbutils_data)["tags"]
        except Exception:
            # Deliberate best-effort: any failure while reading or parsing the context
            # leaves the object usable with no dbutils data.
            print("No dbutils found. Cannot import data from dbutils.")
            self.dbutils_data = dict()

    def get_from_dbutils(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from dbutils_data.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of dbutils_data for the required key.
        """
        return self.dbutils_data.get(key, default)

    def get_from_databricks_conf(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from spark configuration.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of spark configuration for the required key.
        """
        return self.spark.conf.get(key, default)

    def get_job_id(self) -> Union[str, None]:
        """Method used to get the job_id from the dbutils_data.

        If no jobId is found, the fallback identifier built by _set_default is returned.

        Returns:
            Union[str, None]: value of jobId
        """
        default = self._set_default()
        return self.get_from_dbutils("jobId", default)

    def get_job_run_id(self) -> Union[str, None]:
        """Method used to get the job_run_id from the dbutils_data.

        If no multitaskParentRunId is found, the fallback identifier built by _set_default
        (with a timestamp appended) is returned.

        Returns:
            Union[str, None]: value of multitaskParentRunId
        """
        default = self._set_default(with_timestamp=True)
        return self.get_from_dbutils("multitaskParentRunId", default)

    def get_task_id(self) -> Union[str, None]:
        """Method used to get the task_id from the dbutils_data.

        If no runId is found, the fallback identifier built by _set_default is returned.

        Returns:
            Union[str, None]: value of runId
        """
        default = self._set_default()
        return self.get_from_dbutils("runId", default)

    def _get_workspace_url_root(self, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to get the workspace url root.

        The browserHostName entry of dbutils_data is tried first; if it is None, the
        spark.databricks.workspaceUrl spark configuration is used.

        Args:
            default (Union[str, None], optional): default value if no value is found. Defaults to None.

        Returns:
            Union[str, None]: root for the notebook link.
        """
        workspace_url_root = self.get_from_dbutils("browserHostName", default)
        if workspace_url_root is None:
            workspace_url_root = self.get_from_databricks_conf("spark.databricks.workspaceUrl", default)
        return workspace_url_root

    def get_workspace(self) -> Union[str, None]:
        """Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.

        Returns:
            Union[str, None]: value of org_id
        """
        return self.get_from_dbutils("orgId")

    def get_job_name(self) -> Union[str, None]:
        """Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.

        Returns:
            Union[str, None]: value of job_name
        """
        return self.get_from_dbutils("jobName")

    def get_task_name(self) -> str:
        """Method used to get the current task_name.

        The taskName entry of dbutils_data is returned when present. Otherwise the default is the
        basename, up to the first '.', of the notebook path, or of sys.argv[0] if the notebook
        path cannot be read.

        Returns:
            str: value of the taskName
        """
        try:
            path = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()  # type: ignore
        except Exception:
            # No notebook context available: fall back to the running script.
            path = sys.argv[0]
        default = os.path.basename(path).split(".")[0]
        return self.get_from_dbutils("taskName", default)  # type: ignore

    def get_widgets(self) -> Mapping[str, object]:
        """Method used to get the widgets content.

        Returns:
            Mapping[str, object]: attributes of the context manager, with the spark attribute removed.
        """
        widgets = self.context_manager.__dict__
        # remove the spark attribute
        return {k: v for k, v in widgets.items() if k != "spark"}

    def get_url(self) -> str:
        """Method used to generate an url to a notebook.

        The job_id and task_id attributes set by generate_config are used when present;
        otherwise they are computed with get_job_id and get_task_id, so this method can
        also be called before generate_config.

        Returns:
            str: generated url. If no url is generated, the string 'No job run associated' is returned.
        """
        url_root = self._get_workspace_url_root()
        workspace = self.get_workspace()
        if url_root is None:
            return "No job run associated"
        # job_id / task_id only exist after generate_config; do not fail without them.
        job_id = self.job_id if hasattr(self, "job_id") else self.get_job_id()
        task_id = self.task_id if hasattr(self, "task_id") else self.get_task_id()
        return f"https://{url_root}?o={workspace}#job/{job_id}/run/{task_id}"

    def generate_config(self):
        """Method used to generate a default job run configuration.

        The following attributes are set:
            job_id      Union[str, None]
            job_run_id  Union[str, None]
            task_id     Union[str, None]
            job_name    Union[str, None]
            task_name   str
            url         str
            widgets     Mapping[str, object]
            defaults    dict with the keys job_id, job_run_id and task_id

        Returns:
            ConfigRun: the object itself, to allow chained calls.
        """
        self.job_id = self.get_job_id()
        self.job_run_id = self.get_job_run_id()
        self.task_id = self.get_task_id()
        self.job_name = self.get_job_name()
        self.task_name = self.get_task_name()
        self.url = self.get_url()
        self.widgets = self.get_widgets()
        self.defaults = {"job_id": self.job_id, "job_run_id": self.job_run_id, "task_id": self.task_id}
        return self
Methods
def generate_config(self)
-
Method used to generate a default job run configuration.
The following attributes are set:
job_id str job_run_id str task_id str job_name Union[str, None] task_name Union[str, None] url str widgets str defaults Dict[str, str]
def get_from_databricks_conf(self, key: str, default: Optional[str] = None) ‑> Optional[str]
-
Method used to extract a value from spark configuration.
Args
key
:str
- name of the parameter to extract.
default
:Union[str, None]
, optional- default value if no parameter is found. Defaults to None.
Returns
Union[str, None]
- content of spark configuration for the required key.
def get_from_dbutils(self, key: str, default: Optional[str] = None) ‑> Optional[str]
-
Method used to extract a value from dbutils_data.
Args
key
:str
- name of the parameter to extract.
default
:Union[str, None]
, optional- default value if no parameter is found. Defaults to None.
Returns
Union[str, None]
- content of dbutils_data for the required key.
def get_job_id(self) ‑> Optional[str]
-
Method used to get the job_id from the dbutils_data. if no job_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.
Returns
Union[str, None]
- value of jobId
def get_job_name(self) ‑> Optional[str]
-
Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.
Returns
Union[str, None]
- value of job_name
def get_job_run_id(self) ‑> Optional[str]
-
Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.
Returns
Union[str, None]
- value of multitaskParentRunId
def get_task_id(self) ‑> Optional[str]
-
Method used to get the task_id from the dbutils_data. If no task_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.
Returns
Union[str, None]
- value of runId
def get_task_name(self) ‑> str
-
Method used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised, otherwise the notebook path.
Returns
str
- value of the taskName
def get_url(self) ‑> str
-
Method used to generate an url to a notebook.
Args
job_id
:str
- id of the current job.
task_id
:str
- id of the current task
Returns
str
- generated url. If no url is generated, the string 'No job run associated' is returned.
def get_widgets(self) ‑> Mapping[str, object]
-
Method used to get the widgets content.
Returns
Mapping[str, object]
- attributes of the context manager, with the spark attribute removed.
def get_workspace(self) ‑> Optional[str]
-
Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.
Returns
Union[str, None]
- value of org_id