Module panama.logging
Sub-modules
panama.logging.config_run
panama.logging.decorators
panama.logging.delta_handler
panama.logging.file_handler
panama.logging.logger
panama.logging.messages
Functions
def get_adls_name() ‑> str
-
Get the ADLS name.
Returns
str
- current log ADLS name.
def get_container() ‑> str
-
Get the container name.
Returns
str
- current log container name.
def get_delta_folder() ‑> str
-
Get the delta folder.
Returns
str
- current delta log folder.
def get_json_folder() ‑> str
-
Get the json folder.
Returns
str
- current json log folder.
def get_logger(context_manager: Optional[ContextManager] = None, name: Optional[str] = None, level: int = 20, delta_path: Optional[str] = None, json_path: Optional[str] = None, json_remote_path: Optional[str] = None) ‑> PanamaAdapter
-
Main function used to get a panama logger.
Args
context_manager
:Union[ContextManager, None], optional
- ContextManager holding the contextual information. If None, a default context manager is created, assuming a spark session is active. Defaults to None.
name
:Union[str, None], optional
- logger name. If None, the root logger is used. Defaults to None.
level
:int, optional
- log level. Defaults to logging.INFO.
delta_path
:Union[str, None], optional
- path to the delta folder. If None, a default value is generated. Defaults to None.
json_path
:Union[str, None], optional
- path to the json folder. If None, a default value is generated. Defaults to None.
json_remote_path
:Union[str, None], optional
- remote path to which json logs are moved. If None, a default value is generated. Defaults to None.
Raises
ValueError
- raised if no context_manager is passed and no spark session is active.
Returns
PanamaAdapter
- panama logger with the required handlers.
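Example
A minimal usage sketch (not from the source): it assumes an active spark session, so the default ContextManager can be built, and that the package is imported as panama_log, the alias the docstrings on this page use.

    from panama import logging as panama_log

    # get a logger named "my_task"; delta/json paths fall back to generated defaults
    logger = panama_log.get_logger(name="my_task", level=panama_log.INFO)
    logger.log_step("setup")  # PanamaAdapter method, see the class section below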
def log_execution(logger_name: Optional[str] = None, level: int = 20, blocking: bool = True)
-
Decorator used to retrieve a logger by name and log the execution of the decorated function.
Args
logger_name
:Union[str, None], optional
- name of the logger to retrieve. If None, the root logger is used. Defaults to None.
level
:int, optional
- level of the log step. Defaults to panama_log.INFO.
blocking
:bool, optional
- flag indicating whether, on error, the task must be closed and the error re-raised. Defaults to True.
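Example
A minimal sketch of the decorator (not from the source), assuming a logger named "my_task" was previously created with get_logger:

    from panama import logging as panama_log

    @panama_log.log_execution(logger_name="my_task", level=panama_log.INFO, blocking=True)
    def transform(df):
        # the decorator logs the start, end and any error of this function
        return df.dropna()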
def set_adls_name(adls_name: str) ‑> None
-
Set the ADLS name.
Args
adls_name
:str
- ADLS name.
def set_container(container: str) ‑> None
-
Set the container name.
Args
container
:str
- container name.
def set_delta_folder(folder: str) ‑> None
-
Set the delta folder name.
Args
folder
:str
- delta folder name.
def set_json_folder(folder: str) ‑> None
-
Set the json folder name.
Args
folder
:str
- json folder name.
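Example
A minimal sketch of the module-level storage configuration (not from the source). The account, container and folder names are illustrative; the assumption is that these setters are called before get_logger so that the generated default paths pick them up.

    from panama import logging as panama_log

    panama_log.set_adls_name("mydatalake")   # hypothetical ADLS account name
    panama_log.set_container("logs")         # hypothetical container name
    panama_log.set_delta_folder("delta_logs")
    panama_log.set_json_folder("json_logs")

    assert panama_log.get_adls_name() == "mydatalake"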
Classes
class ConfigRun (context_manager: ContextManager)
-
Class used to initialize the data of a job run.
Attributes
dbutils_data
:dict
- data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
spark
:SparkSession
- current spark session.
Initialize the ConfigRun object. dbutils_data is generated.
Args
context_manager
:ContextManager
- ContextManager holding the current spark session and contextual information.
Expand source code
class ConfigRun:
    """Class used to initialize the data of a job run.

    Attributes:
        dbutils_data (dict): data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
        spark (SparkSession): current spark session.
    """

    def __init__(self, context_manager: ContextManager):
        """Initialize the ConfigRun object. dbutils_data is generated.

        Args:
            context_manager (ContextManager): ContextManager holding the current spark session and contextual information.
        """
        self.context_manager = context_manager
        self.spark = self.context_manager.spark
        self._set_dbutils_data()

    def _set_default(self, with_timestamp: bool = False):
        try:
            default = os.path.basename(sys.argv[0])
        except:
            try:
                default = __file__
            except:
                default = str(uuid.uuid4())
        if with_timestamp is True:
            ts = datetime.astimezone(datetime.now(), tz=timezone("Europe/Rome")).strftime("%Y_%m_%d__%H_%M_%S")
            default = "@".join([default, ts])
        return default

    def _set_dbutils_data(self):
        """Method used to try to set the dbutils data. If it fails no error is raised, and dbutils_data is set as empty."""
        self.dbutils = get_db_utils(self.spark)
        try:
            dbutils_data = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()  # type: ignore
            self.dbutils_data = json.loads(dbutils_data)["tags"]
        except:  # (py4j.protocol.Py4JJavaError, NameError, ValueError) as e:
            print("No dbutils found. Cannot import data from dbutils.")
            self.dbutils_data = dict()

    def get_from_dbutils(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from dbutils_data.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of dbutils_data for the required key.
        """
        return self.dbutils_data.get(key, default)

    def get_from_databricks_conf(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from spark configuration.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of spark configuration for the required key.
        """
        return self.spark.conf.get(key, default)

    def get_job_id(self) -> Union[str, None]:
        """Method used to get the job_id from the dbutils_data. If no job_id is found, returns the current
        notebook name if dbutils is available, otherwise the current file name. If the current file name is
        also unavailable, a uuid4 is generated.

        Returns:
            Union[str, None]: value of jobId.
        """
        default = self._set_default()
        return self.get_from_dbutils("jobId", default)

    def get_job_run_id(self) -> Union[str, None]:
        """Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns the
        current notebook name if dbutils is available, otherwise the current file name. If the current file
        name is also unavailable, a uuid4 is generated.

        Returns:
            Union[str, None]: value of multitaskParentRunId.
        """
        default = self._set_default(with_timestamp=True)
        return self.get_from_dbutils("multitaskParentRunId", default)

    def get_task_id(self) -> Union[str, None]:
        """Method used to get the task_id from the dbutils_data. If no task_id is found, returns the current
        notebook name if dbutils is available, otherwise the current file name. If the current file name is
        also unavailable, a uuid4 is generated.

        Returns:
            Union[str, None]: value of runId.
        """
        default = self._set_default()
        return self.get_from_dbutils("runId", default)

    def _get_workspace_url_root(self, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to get the workspace url root.

        Args:
            default (Union[str, None], optional): default value if no value is found. Defaults to None.

        Returns:
            Union[str, None]: root for the notebook link.
        """
        workspace_url_root = self.get_from_dbutils("browserHostName", default)
        if workspace_url_root is None:
            workspace_url_root = self.get_from_databricks_conf("spark.databricks.workspaceUrl", default)
        return workspace_url_root

    def get_workspace(self) -> Union[str, None]:
        """Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.

        Returns:
            Union[str, None]: value of org_id.
        """
        return self.get_from_dbutils("orgId")

    def get_job_name(self) -> Union[str, None]:
        """Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.

        Returns:
            Union[str, None]: value of job_name.
        """
        return self.get_from_dbutils("jobName")

    def get_task_name(self) -> str:
        """Method used to get the current task_name from notebook info. Returns sys.argv[0] as default
        if an error is raised, otherwise the notebook path.

        Returns:
            str: value of the taskName.
        """
        try:
            path = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()  # type: ignore
        except:
            path = sys.argv[0]
        default = os.path.basename(path).split(".")[0]
        return self.get_from_dbutils("taskName", default)  # type: ignore

    def get_widgets(self) -> Mapping[str, object]:
        """Method used to get the widgets content.

        Returns:
            Mapping[str, object]: the context manager's attributes, with the spark attribute removed.
        """
        widgets = self.context_manager.__dict__
        # remove the spark attribute
        widgets = {k: v for k, v in widgets.items() if k != "spark"}
        return widgets

    def get_url(self) -> str:
        """Method used to generate a url to a notebook.

        Returns:
            str: generated url. If no url is generated, the string 'No job run associated' is returned.
        """
        url_root = self._get_workspace_url_root()
        workspace = self.get_workspace()
        if url_root is not None:
            url = f"https://{url_root}?o={workspace}#job/{self.job_id}/run/{self.task_id}"
        else:
            url = "No job run associated"
        return url

    def generate_config(self):
        """Method used to generate a default job run configuration. The following attributes are set:

        job_id (str), job_run_id (str), task_id (str), job_name (Union[str, None]),
        task_name (Union[str, None]), url (str), widgets (Mapping[str, object]), defaults (Dict[str, str]).
        """
        self.job_id = self.get_job_id()
        self.job_run_id = self.get_job_run_id()
        self.task_id = self.get_task_id()
        self.job_name = self.get_job_name()
        self.task_name = self.get_task_name()
        self.url = self.get_url()
        self.widgets = self.get_widgets()
        self.defaults = {"job_id": self.job_id, "job_run_id": self.job_run_id, "task_id": self.task_id}
        return self
Methods
def generate_config(self)
-
Method used to generate a default job run configuration.
The following attributes are set:
job_id : str
job_run_id : str
task_id : str
job_name : Union[str, None]
task_name : Union[str, None]
url : str
widgets : Mapping[str, object]
defaults : Dict[str, str]
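Example
A minimal sketch (not from the source), assuming a ContextManager instance is available; generate_config returns self, so construction and configuration can be chained:

    config = ConfigRun(context_manager).generate_config()
    print(config.job_id, config.task_id, config.url)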
def get_from_databricks_conf(self, key: str, default: Optional[str] = None) ‑> Optional[str]
-
Method used to extract a value from spark configuration.
Args
key
:str
- name of the parameter to extract.
default
:Union[str, None]
, optional- default value if no parameter is found. Defaults to None.
Returns
Union[str, None]
- content of spark configuration for the required key.
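Example
A one-line sketch (not from the source); the configuration key is the one the source above uses to resolve the workspace url root:

    url_root = config.get_from_databricks_conf("spark.databricks.workspaceUrl")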
def get_from_dbutils(self, key: str, default: Optional[str] = None) ‑> Optional[str]
-
Method used to extract a value from dbutils_data.
Args
key
:str
- name of the parameter to extract.
default
:Union[str, None]
, optional- default value if no parameter is found. Defaults to None.
Returns
Union[str, None]
- content of dbutils_data for the required key.
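Example
A one-line sketch (not from the source); "jobId" is one of the dbutils tags the source above reads, and the default value is illustrative:

    job_id = config.get_from_dbutils("jobId", default="local_run")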
def get_job_id(self) ‑> Optional[str]
-
Method used to get the job_id from the dbutils_data. If no job_id is found, returns the current notebook name if dbutils is available, otherwise the current file name. If the current file name is also unavailable, a uuid4 is generated.
Returns
Union[str, None]
- value of jobId
def get_job_name(self) ‑> Optional[str]
-
Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.
Returns
Union[str, None]
- value of job_name
def get_job_run_id(self) ‑> Optional[str]
-
Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns the current notebook name if dbutils is available, otherwise the current file name. If the current file name is also unavailable, a uuid4 is generated.
Returns
Union[str, None]
- value of multitaskParentRunId
def get_task_id(self) ‑> Optional[str]
-
Method used to get the task_id from the dbutils_data. If no task_id is found, returns the current notebook name if dbutils is available, otherwise the current file name. If the current file name is also unavailable, a uuid4 is generated.
Returns
Union[str, None]
- value of runId
def get_task_name(self) ‑> str
-
Method used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised, otherwise the notebook path.
Returns
str
- value of the taskName
def get_url(self) ‑> str
-
Method used to generate a url to a notebook.
Returns
str
- generated url. If no url is generated, the string 'No job run associated' is returned.
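As the source above shows, a successful call returns a link of the form https://{url_root}?o={workspace}#job/{job_id}/run/{task_id}.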
def get_widgets(self) ‑> Mapping[str, object]
-
Method used to get the widgets content.
Returns
Mapping[str, object]
- the context manager's attributes, with the spark attribute removed.
def get_workspace(self) ‑> Optional[str]
-
Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.
Returns
Union[str, None]
- value of org_id
class PanamaAdapter (logger: logging.Logger, config_run: ConfigRun)
-
Panama adapter to add contextual information and create the methods start_task, end_task and log_step.
Initialize adapter.
Args
logger
:logging.Logger
- underlying logger object.
config_run
:ConfigRun
- config run, with default data.
Expand source code
class PanamaAdapter(logging.LoggerAdapter):
    """Panama adapter to add contextual information and create the methods start_task, end_task and log_step."""

    def __init__(self, logger: logging.Logger, config_run: ConfigRun):
        """Initialize adapter.

        Args:
            logger (logging.Logger): underlying logger object.
            config_run (ConfigRun): config run, with default data.
        """
        self.config_run = config_run
        super().__init__(logger=logger, extra=config_run.widgets)

    def process(self, msg, kwargs):
        """Add the required information to extra. Deals with the trace parameter.

        Args:
            msg (str): logged message content.
            kwargs (dict): log call arguments.

        Returns:
            (str, dict): msg and kwargs handled.
        """
        forbidden = ["exc_info", "stack_info", "logger"]
        extra = {k: str(v) for k, v in kwargs.items() if k not in forbidden}
        # add trace kwarg to extra
        if isinstance(kwargs.get("exc_info"), str):
            extra["trace"] = kwargs["exc_info"]
        kwargs["extra"] = extra
        # remove trace from kwargs, as it has been added to extras
        kwargs.pop("trace", None)
        return msg, kwargs

    def move_json_to_remote(self):
        """Method used to move logs to remote storage."""
        # get task name from config_run
        task_name = self.config_run.task_name
        # get logger_dict from manager
        logger_dict = self.logger.manager.loggerDict.items()
        # keep panama loggers related to the task
        loggers = [v for k, v in logger_dict if k.startswith(f"PANAMA_{task_name}")]
        # get file handlers for every logger and stash the file
        for l in loggers:
            for h in l.logger.handlers:
                if isinstance(h, PanamaFileHandler):
                    h.move_to_remote()

    def start_task(self, **kwargs):
        """Method used to open a task record."""
        self.log(level=99, msg="RUNNING", extra=kwargs, stack_info=False)

    def end_task(self, trace: Union[Exception, None] = None, move_file: bool = True):
        """Method used to close a task record."""
        if trace is None:
            self.log(level=100, msg="OK")
        else:
            self.log(level=100, msg="KO", exc_info=trace)
        # wait a bit if some files have not been closed
        time.sleep(10)
        if move_file is True:
            # move task json to remote
            self.move_json_to_remote()

    def log_step(self, step_id: str, level: int = logging.INFO, trace: Union[Exception, str, None] = None, **extras):
        """Method used to log a step."""
        self.log(level=level, msg=step_id, extra=extras, exc_info=trace)  # type: ignore
Ancestors
- logging.LoggerAdapter
Methods
def end_task(self, trace: Optional[Exception] = None, move_file: bool = True)
-
Method used to close a task record.
def log_step(self, step_id: str, level: int = 20, trace: Union[Exception, str, None] = None, **extras)
-
Method used to log a step.
def move_json_to_remote(self)
-
Method used to move logs to remote storage.
def process(self, msg, kwargs)
-
Add the required information to extra. Deals with the trace parameter.
Args
msg
:str
- logged message content.
kwargs
:dict
- log call arguments.
Returns
(str, dict): msg and kwargs handled.
def start_task(self, **kwargs)
-
Method used to open a task record.
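Example
A minimal sketch of the task lifecycle (not from the source), assuming logger was obtained via get_logger. As the source above shows, start_task logs "RUNNING" at level 99 and end_task logs "OK"/"KO" at level 100:

    logger.start_task()
    try:
        logger.log_step("read_input")
        logger.log_step("write_output")
        logger.end_task()          # logs "OK" and moves the json logs to remote
    except Exception as e:
        logger.end_task(trace=e)   # logs "KO" with the exception trace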