Module panama.logging

Sub-modules

panama.logging.config_run
panama.logging.decorators
panama.logging.delta_handler
panama.logging.file_handler
panama.logging.logger
panama.logging.messages

Functions

def get_adls_name() ‑> str

Get the ADLS (Azure Data Lake Storage) name.

Returns

str
current log ADLS name.
def get_container() ‑> str

Get the container name.

Returns

str
current log container name.
def get_delta_folder() ‑> str

Get the delta folder.

Returns

str
current delta log folder.
def get_json_folder() ‑> str

Get the json folder.

Returns

str
current json log folder.
def get_logger(context_manager: Optional[ContextManager] = None, name: Optional[str] = None, level: int = 20, delta_path: Optional[str] = None, json_path: Optional[str] = None, json_remote_path: Optional[str] = None) ‑> PanamaAdapter

Main function used to get a panama logger.

Args

context_manager : Union[ContextManager, None], optional
ContextManager holding the contextual information. If None
a default context manager is created, assuming a spark session is active. Defaults to None.
name : Union[str, None], optional
logger name. If None, the root logger is used. Defaults to None.
level : int, optional
log level. Defaults to logging.INFO.
delta_path : Union[str, None], optional
path to delta folder. If None a default value is generated. Defaults to None.
json_path : Union[str, None], optional
path to json folder. If None a default value is generated. Defaults to None.
json_remote_path : Union[str, None], optional
remote path the json logs are moved to. If None a default value is generated. Defaults to None.

Raises

ValueError
if no context_manager is passed and no spark session is active, a ValueError is raised.

Returns

PanamaAdapter
panama logger with the required handlers.
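For illustration, a minimal usage sketch (the logger name is a placeholder; with no context_manager an active spark session is assumed):

    import logging

    import panama.logging as panama_log

    # Sketch: with an active spark session a default ContextManager is built
    # internally, and the delta/json log paths are generated automatically.
    logger = panama_log.get_logger(name="my_etl_task", level=logging.INFO)
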
def log_execution(logger_name: Optional[str] = None, level: int = 20, blocking: bool = True)

Decorator used to retrieve a logger by name and log the execution of the decorated function.

Args

logger_name : Union[str, None], optional
name of the logger to retrieve. If None, the root logger is used. Defaults to None.
level : int, optional
level of the log step. Defaults to panama_log.INFO.
blocking : bool, optional
flag indicating whether, on failure, the task is closed and the error re-raised. Defaults to True.
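A hedged sketch of decorating a function (the function body and logger name are placeholders):

    from panama.logging import log_execution

    # Sketch: log_execution retrieves the logger named "my_etl_task" and logs
    # the decorated function's execution; with blocking=True a failure closes
    # the task and re-raises the error.
    @log_execution(logger_name="my_etl_task", level=20, blocking=True)
    def compute_aggregates():
        ...
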
def set_adls_name(adls_name: str) ‑> None

Set the ADLS name.

Args

adls_name : str
ADLS name.
def set_container(container: str) ‑> None

Set the container name.

Args

container : str
container name.
def set_delta_folder(folder: str) ‑> None

Set the delta folder name.

Args

folder : str
delta folder name.
def set_json_folder(folder: str) ‑> None

Set the json folder name.

Args

folder : str
json folder name.
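Together these setters and getters configure where logs are written. A minimal sketch (all values are placeholders; it assumes each getter returns the value stored by the matching setter):

    import panama.logging as panama_log

    # Sketch: point the module at a storage account, container and the
    # folders where the delta and json logs are written (placeholder values).
    panama_log.set_adls_name("mystorageaccount")
    panama_log.set_container("logs")
    panama_log.set_delta_folder("delta_logs")
    panama_log.set_json_folder("json_logs")

    assert panama_log.get_container() == "logs"
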

Classes

class ConfigRun (context_manager: ContextManager)

Class used to initialize the data of a job run.

Attributes

dbutils_data : dict
data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
spark : SparkSession
current spark session.

Initialize the ConfigRun object. dbutils_data is generated.

Args

context_manager : ContextManager
ContextManager holding the current spark session and contextual information.
Source code
class ConfigRun:
    """Class used to initialize the data of a job run.

    Attributes:
        dbutils_data (dict): data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
        spark (SparkSession): current spark session.
    """

    def __init__(self, context_manager: ContextManager):
        """Initialize the ConfigRun object. dbutils_data is generated.

        Args:
            context_manager (ContextManager): ContextManager holding the current spark session and contextual information.
        """
        self.context_manager = context_manager
        self.spark = self.context_manager.spark
        self._set_dbutils_data()

    def _set_default(self, with_timestamp: bool = False):
        # Build a default identifier: prefer the script name, then the
        # module file, falling back to a random uuid4.
        try:
            default = os.path.basename(sys.argv[0])
        except Exception:
            try:
                default = __file__
            except NameError:
                default = str(uuid.uuid4())

        if with_timestamp is True:
            ts = datetime.astimezone(datetime.now(), tz=timezone("Europe/Rome")).strftime("%Y_%m_%d__%H_%M_%S")
            default = "@".join([default, ts])

        return default

    def _set_dbutils_data(self):
        """Method used to try to set the dbutils data. If it fails no error is raised, and dbutils_data are set as empty."""

        self.dbutils = get_db_utils(self.spark)

        try:
            dbutils_data = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()  # type: ignore
            self.dbutils_data = json.loads(dbutils_data)["tags"]
        except Exception:  # e.g. py4j.protocol.Py4JJavaError, NameError, ValueError
            print("No dbutils found. Cannot import data from dbutils.")
            self.dbutils_data = dict()

    def get_from_dbutils(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from dbutils_data.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of dbutils_data for the required key.
        """
        return self.dbutils_data.get(key, default)

    def get_from_databricks_conf(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from spark configuration.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of spark configuration for the required key.
        """
        return self.spark.conf.get(key, default)

    def get_job_id(self) -> Union[str, None]:
        """Method used to get the job_id from the dbutils_data.  if no job_id is found, returns current notebook name if dbutils is available, otherwise current file name.
        If also current file name is not available, a uuid4 is generated.

        Returns:
            Union[str, None]: value of jobId
        """
        default = self._set_default()
        return self.get_from_dbutils("jobId", default)

    def get_job_run_id(self) -> Union[str, None]:
        """Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name.
        If also current file name is not available, a uuid4 is generated.

        Returns:
            Union[str, None]: value of jobId
        """
        default = self._set_default(with_timestamp=True)
        return self.get_from_dbutils("multitaskParentRunId", default)

    def get_task_id(self) -> Union[str, None]:
        """Method used to get the task_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name.
        If also current file name is not available, a uuid4 is generated.

        Returns:
            Union[str, None]: value of jobId
        """
        default = self._set_default()
        return self.get_from_dbutils("runId", default)

    def _get_workspace_url_root(self, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to get the workspace url root.

        Args:
            default (Union[str, None], optional): default value if no value is found. Defaults to None.

        Returns:
            Union[str, None]: root for the notebook link.
        """
        workspace_url_root = self.get_from_dbutils("browserHostName", default)
        if workspace_url_root is None:
            workspace_url_root = self.get_from_databricks_conf("spark.databricks.workspaceUrl", default)
        return workspace_url_root

    def get_workspace(self) -> Union[str, None]:
        """Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.

        Returns:
            Union[str, None]: value of org_id
        """
        return self.get_from_dbutils("orgId")

    def get_job_name(self) -> Union[str, None]:
        """Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.

        Returns:
            Union[str, None]: value of job_name
        """
        return self.get_from_dbutils("jobName")

    def get_task_name(self) -> str:
        """Method used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised, otherwise the notebook path.

        Returns:
            str: value of the taskName
        """

        try:
            path = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()  # type: ignore
        except:
            path = sys.argv[0]
        default = os.path.basename(path).split(".")[0]
        return self.get_from_dbutils("taskName", default)  # type: ignore

    def get_widgets(self) -> Mapping[str, object]:
        """Method used to get the widgets content.

        Returns:
            Mapping[str, object]: dictionary of the context manager attributes, excluding the spark session.
        """
        widgets = self.context_manager.__dict__
        # remove the spark attribute
        widgets = {k: v for k, v in widgets.items() if k != "spark"}
        return widgets

    def get_url(self) -> str:
        """Method used to generate an url to a notebook.

        Args:
            job_id (str): id of the current job.
            task_id (str): id of the current task

        Returns:
            str: generated url. Id no url is generated, the string 'No job run associated' is returned.
        """
        url_root = self._get_workspace_url_root()
        workspace = self.get_workspace()
        if url_root is not None:
            url = f"https://{url_root}?o={workspace}#job/{self.job_id}/run/{self.task_id}"
        else:
            url = "No job run associated"
        return url

    def generate_config(self):
        """Method used to generate a default job run configuration.

        The following attributes are set:

            job_id (str)
            job_run_id (str)
            task_id (str)
            job_name (Union[str, None])
            task_name (str)
            url (str)
            widgets (Mapping[str, object])
            defaults (Dict[str, str])
        """
        self.job_id = self.get_job_id()
        self.job_run_id = self.get_job_run_id()
        self.task_id = self.get_task_id()

        self.job_name = self.get_job_name()
        self.task_name = self.get_task_name()

        self.url = self.get_url()
        self.widgets = self.get_widgets()

        self.defaults = {"job_id": self.job_id, "job_run_id": self.job_run_id, "task_id": self.task_id}

        return self
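As a usage sketch (obtaining the ContextManager instance is assumed):

    # Sketch: build a ConfigRun from an existing ContextManager and populate
    # the default job run attributes (ids, names, url, widgets, defaults).
    config = ConfigRun(context_manager).generate_config()
    print(config.job_id, config.task_name)
    print(config.url)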

Methods

def generate_config(self)

Method used to generate a default job run configuration.

The following attributes are set:

job_id (str)
job_run_id (str)
task_id (str)
job_name (Union[str, None])
task_name (str)
url (str)
widgets (Mapping[str, object])
defaults (Dict[str, str])
def get_from_databricks_conf(self, key: str, default: Optional[str] = None) ‑> Optional[str]

Method used to extract a value from spark configuration.

Args

key : str
name of the parameter to extract.
default : Union[str, None], optional
default value if no parameter is found. Defaults to None.

Returns

Union[str, None]
content of spark configuration for the required key.
def get_from_dbutils(self, key: str, default: Optional[str] = None) ‑> Optional[str]

Method used to extract a value from dbutils_data.

Args

key : str
name of the parameter to extract.
default : Union[str, None], optional
default value if no parameter is found. Defaults to None.

Returns

Union[str, None]
content of dbutils_data for the required key.
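For example, given a ConfigRun instance config as sketched above (the fallback value is a placeholder):

    # Sketch: read the Databricks jobId tag, falling back to a placeholder
    # when running outside a job context.
    job_id = config.get_from_dbutils("jobId", default="local-run")
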
def get_job_id(self) ‑> Optional[str]

Method used to get the job_id from the dbutils_data. If no job_id is found, returns the current notebook name if dbutils is available, otherwise the current file name. If the current file name is also not available, a uuid4 is generated.

Returns

Union[str, None]
value of the job_id.
def get_job_name(self) ‑> Optional[str]

Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.

Returns

Union[str, None]
value of job_name
def get_job_run_id(self) ‑> Optional[str]

Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns the current notebook name if dbutils is available, otherwise the current file name. If the current file name is also not available, a uuid4 is generated.

Returns

Union[str, None]
value of the job_run_id.
def get_task_id(self) ‑> Optional[str]

Method used to get the task_id from the dbutils_data. If no task_id is found, returns the current notebook name if dbutils is available, otherwise the current file name. If the current file name is also not available, a uuid4 is generated.

Returns

Union[str, None]
value of the task_id.
def get_task_name(self) ‑> str

Method used to get the current task_name from the notebook info. If no taskName is found in dbutils_data, defaults to the basename of the notebook path, or of sys.argv[0] when the notebook path is not available.

Returns

str
value of the taskName
def get_url(self) ‑> str

Method used to generate a url to the current job run, from the job_id and task_id attributes.

Returns

str
generated url. If no url is generated, the string 'No job run associated' is returned.
def get_widgets(self) ‑> Mapping[str, object]

Method used to get the widgets content.

Returns

Mapping[str, object]
dictionary of the context manager attributes, excluding the spark session.
def get_workspace(self) ‑> Optional[str]

Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.

Returns

Union[str, None]
value of org_id
class PanamaAdapter (logger: logging.Logger, config_run: ConfigRun)

Panama adapter to add contextual information and create the methods start_task, end_task and log_step.

Initialize adapter.

Args

logger : logging.Logger
underlying logger object.
config_run : ConfigRun
config run, with default data.
Source code
class PanamaAdapter(logging.LoggerAdapter):
    """Panama adapter to add contextual information and create the methods start_task, end_task and log_step."""

    def __init__(self, logger: logging.Logger, config_run: ConfigRun):
        """Initialize adapter.

        Args:
            logger (logging.Logger): underlying logger object.
            config_run (ConfigRun): config run, with default data.
        """
        self.config_run = config_run
        super().__init__(logger=logger, extra=config_run.widgets)

    def process(self, msg, kwargs):
        """Add to extra the required information. Deals with trace parameter.

        Args:
            msg (str): logged message content.
            kwargs (dict): log call arguments.

        Returns:
            (str, dict): the processed msg and kwargs.
        """
        forbidden = ["exc_info", "stack_info", "logger"]
        extra = {k: str(v) for k, v in kwargs.items() if k not in forbidden}
        # if exc_info was passed as a plain string, store it in extra as the trace
        if isinstance(kwargs.get("exc_info"), str):
            extra["trace"] = kwargs["exc_info"]
        kwargs["extra"] = extra
        # remove trace from kwargs, as it has been added to extras
        kwargs.pop("trace", None)
        return msg, kwargs

    def move_json_to_remote(self):
        """Method used to move logs to remote storage."""
        # get task name from config_run
        task_name = self.config_run.task_name
        # get logger_dict from manager
        logger_dict = self.logger.manager.loggerDict.items()
        # keep panama loggers related to the task
        loggers = [v for k, v in logger_dict if k.startswith(f"PANAMA_{task_name}")]
        # get file handlers for every logger and stash the file
        for l in loggers:
            for h in l.logger.handlers:
                if isinstance(h, PanamaFileHandler):
                    h.move_to_remote()

    def start_task(self, **kwargs):
        """Method used to open a task record."""
        self.log(level=99, msg="RUNNING", extra=kwargs, stack_info=False)

    def end_task(self, trace: Union[Exception, None] = None, move_file: bool = True):
        """Method used to close a task record."""
        if trace is None:
            self.log(level=100, msg="OK")
        else:
            self.log(level=100, msg="KO", exc_info=trace)
        # wait a bit if some files have not been closed
        time.sleep(10)
        if move_file is True:
            # move task json to remote
            self.move_json_to_remote()

    def log_step(self, step_id: str, level: int = logging.INFO, trace: Union[Exception, str, None] = None, **extras):
        """Method used to log a step."""
        self.log(level=level, msg=step_id, extra=extras, exc_info=trace)  # type: ignore

Ancestors

  • logging.LoggerAdapter

Methods

def end_task(self, trace: Optional[Exception] = None, move_file: bool = True)

Method used to close a task record.

def log_step(self, step_id: str, level: int = 20, trace: Union[Exception, str, None] = None, **extras)

Method used to log a step.

def move_json_to_remote(self)

Method used to move logs to remote storage.

def process(self, msg, kwargs)

Add to extra the required information. Deals with trace parameter.

Args

msg : str
logged message content.
kwargs : dict
log call arguments.

Returns

(str, dict): the processed msg and kwargs.

def start_task(self, **kwargs)

Method used to open a task record.
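
Putting the adapter methods together, a hedged end-to-end sketch of a task lifecycle (logger name, step id and extras are illustrative):

    import panama.logging as panama_log

    logger = panama_log.get_logger(name="my_etl_task")

    logger.start_task(run_type="daily")
    try:
        logger.log_step("load_source", level=20, rows="1000")
        logger.end_task()  # logs "OK" and moves the json logs to remote storage
    except Exception as e:
        logger.end_task(trace=e)  # logs "KO" with the exception trace
        raise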