Module panama.logging.file_handler

Functions

def make_file_handler(config_run: ConfigRun, path: str, remote_path: str, level: int = 20) ‑> PanamaFileHandler

Function used to generate the file handler needed by a default panama logger.

Args

config_run : ConfigRun
config run whose job_id, job_run_id and task_id are attached as defaults to every log record.
path : str
local path to a folder for json logs.
remote_path : str
path where the json file will be sent at the end of a task.
level : int, optional
default logging level. Defaults to logging.INFO.

Returns

PanamaFileHandler
file handler for panama logger.
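
For example, a minimal usage sketch, assuming make_file_handler wires the fileFormatter and fileFilter onto the returned handler and that config_run is an already-built ConfigRun (the remote storage path is purely illustrative):

import logging

from panama.logging.file_handler import make_file_handler

# config_run: an already-built ConfigRun carrying job_id, job_run_id and task_id
handler = make_file_handler(
    config_run=config_run,
    path="logs/my_pipeline",  # local folder for the json log file
    remote_path="abfss://logs@mystorage.dfs.core.windows.net/my_pipeline",  # illustrative
    level=logging.INFO,
)

logger = logging.getLogger("my_task")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# the log message becomes the "step_id" field of the json record; a dict passed
# as extra={"extra": {...}} ends up in the "extras" field (see fileFormatter below)
logger.info("load_raw_data", extra={"extra": {"rows": 1234}})

# at the end of the task, ship the json file to remote storage
handler.move_to_remote()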

Classes

class PanamaFileHandler (path: str, task_name: str, remote_path: str, level: int = 20)

Handler for file logs.

Initialize the json handler

Args

path : str
path to the local folder where the json log file is created.
task_name : str
name of the task, used to build the name of the json log file.
remote_path : str
the path to the remote adls where the json will be sent at the end of a task.
level : int, optional
minimum log level. Defaults to logging.INFO.
Expand source code
class PanamaFileHandler(logging.Handler):

    """Handler for file logs."""

    def __init__(self, path: str, task_name: str, remote_path: str, level: int = logging.INFO):
        """Initialize the json handler

        Args:
            path (str): path to the local folder where the json log file is created.
            task_name (str): name of the task, used to build the name of the json log file.
            remote_path (str): the path to the remote adls where the json will be sent at the end of a task.
            level (int, optional): minimum log level. Defaults to logging.INFO.
        """
        super().__init__(level=level)
        self.path = path
        self.remote_path = remote_path
        self.task_name = task_name
        self.start_time = datetime.astimezone(datetime.now(), tz=timezone("Europe/Rome"))
        self.file_name = f"{self.task_name.replace('_','')}_{self.start_time.strftime('%Y%m%d%H%M%S')}.json"
        self.set_local_path()
        self.file_pointer = None

    def _open_local_connection(self):
        """Method used to open connection in a local path"""
        self.file_pointer = open(self.local_path, "w+")

    def set_local_path(self, path_prefix=None):
        if path_prefix is None:
            if runtime() == "databricks":
                path_prefix = "/dbfs"
            else:
                path_prefix = os.getcwd()
        local_folder = os.path.join(path_prefix, self.path)
        if not os.path.exists(local_folder):
            os.makedirs(local_folder)
        # note: the file is not yet created.
        # the empty file is created when _open_local_connection is called
        self.local_path = os.path.join(local_folder, self.file_name)

    def emit(self, record: logging.LogRecord):
        """Method used to write logs to the target local json file."""
        if self.file_pointer is None:
            self._open_local_connection()

        # format record
        log_entry = self.format(record=record)
        self.file_pointer.write(log_entry + "\n")  # type: ignore
        self.file_pointer.flush()  # type: ignore
        os.fsync(self.file_pointer.fileno())  # type: ignore

    def move_to_remote(self):
        """Method used to move logs to remote storage."""
        spark = SparkSession.getActiveSession()
        dbutils = get_db_utils(spark)
        if self.file_pointer is not None:
            self.file_pointer.close()
            try:
                # handle non-databricks environment
                if runtime() == "databricks":
                    source = f"file:{self.local_path}"
                else:
                    source = f"{self.local_path}"
                dbutils.fs.mkdirs(self.remote_path)
                dbutils.fs.mv(source, self.remote_path, recurse=True)
            except Exception as e:
                # a missing local file is tolerated; any other error is reported and re-raised
                if "java.io.FileNotFoundException" not in str(e):
                    print(self.local_path, "could not be moved to remote storage.")
                    raise e
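
For orientation, a minimal sketch (the remote path is purely illustrative) of constructing the handler directly and inspecting the names it derives:

from panama.logging.file_handler import PanamaFileHandler

handler = PanamaFileHandler(
    path="logs/my_pipeline",
    task_name="load_raw_data",
    remote_path="abfss://logs@mystorage.dfs.core.windows.net/my_pipeline",  # illustrative
)

# the file name strips underscores from the task name and appends a
# Europe/Rome timestamp, e.g. "loadrawdata_20240131120000.json"
print(handler.file_name)

# outside Databricks the folder is resolved under the current working directory,
# on Databricks under /dbfs; the file itself is only created on the first emit
print(handler.local_path)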

Ancestors

  • logging.Handler
  • logging.Filterer

Methods

def emit(self, record: logging.LogRecord)

Method used to write logs to the target local json file.

def move_to_remote(self)

Method used to move logs to remote storage.

def set_local_path(self, path_prefix=None)

Method used to resolve the local path of the json file. If no path_prefix is given, "/dbfs" is used on Databricks and the current working directory otherwise.

class fileFilter (config_run: ConfigRun)

Filter instances are used to perform arbitrary filtering of LogRecords.

Loggers and Handlers can optionally use Filter instances to filter records as desired. The base filter class only allows events which are below a certain point in the logger hierarchy. For example, a filter initialized with "A.B" will allow events logged by loggers "A.B", "A.B.C", "A.B.C.D", "A.B.D" etc. but not "A.BB", "B.A.B" etc. If initialized with the empty string, all events are passed.

Initialize the filter.

Initialize with the config run whose job_id, job_run_id and task_id are attached to every record that passes through the filter.

Expand source code
class fileFilter(logging.Filter):
    def __init__(self, config_run: ConfigRun):
        self.config_run = config_run

    def _set_defaults(self, record):
        record.job_id = self.config_run.job_id
        record.job_run_id = self.config_run.job_run_id
        record.task_id = self.config_run.task_id

    def filter(self, record):
        self._set_defaults(record)
        return record.levelno < 99

Ancestors

  • logging.Filter

Methods

def filter(self, record)

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.
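
For illustration, a small sketch (config_run is assumed to be an already-built ConfigRun) of what the filter does to a record:

import logging

from panama.logging.file_handler import fileFilter

# config_run is assumed to be an already-built ConfigRun
log_filter = fileFilter(config_run)

record = logging.LogRecord(
    name="my_task", level=logging.INFO, pathname=__file__, lineno=1,
    msg="load_raw_data", args=None, exc_info=None,
)
log_filter.filter(record)  # True: INFO (20) is below 99
print(record.job_id, record.job_run_id, record.task_id)  # defaults copied from config_run

terminal = logging.LogRecord(
    name="my_task", level=99, pathname=__file__, lineno=1,
    msg="end of task", args=None, exc_info=None,
)
log_filter.filter(terminal)  # False: records at level 99 or above are dropped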

class fileFormatter

Formatter instances are used to convert a LogRecord to text.

Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, "%(message)s", "{message}", or "${message}", is used.

The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user's message and arguments are pre- formatted into a LogRecord's message attribute. Currently, the useful attributes in a LogRecord are described by:

  • %(name)s: Name of the logger (logging channel)
  • %(levelno)s: Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL)
  • %(levelname)s: Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
  • %(pathname)s: Full pathname of the source file where the logging call was issued (if available)
  • %(filename)s: Filename portion of pathname
  • %(module)s: Module (name portion of filename)
  • %(lineno)d: Source line number where the logging call was issued (if available)
  • %(funcName)s: Function name
  • %(created)f: Time when the LogRecord was created (time.time() return value)
  • %(asctime)s: Textual time when the LogRecord was created
  • %(msecs)d: Millisecond portion of the creation time
  • %(relativeCreated)d: Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time)
  • %(thread)d: Thread ID (if available)
  • %(threadName)s: Thread name (if available)
  • %(process)d: Process ID (if available)
  • %(message)s: The result of record.getMessage(), computed just as the record is emitted

Initialize the formatter with the panama defaults: the format string "%(asctime)s - %(levelname)s - %(message)s" and the date format "%Y-%m-%d %H:%M:%S". The constructor takes no arguments.

Expand source code
class fileFormatter(logging.Formatter):
    def __init__(self):
        super().__init__(fmt="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")

    def format(self, record):
        """Function to format the record as required by panama.

        Args:
            record (LogRecord): input record from logger.

        Returns:
            str: string to write to file.
        """
        super().format(record)

        # get trace if present
        if hasattr(record, "trace"):
            trace_content = record.trace  # type: ignore
        elif record.exc_text is not None:
            trace_content = record.exc_text
        else:
            trace_content = None

        # get extra content
        extra_content = getattr(record, "extra", dict())
        if len(extra_content) > 0:
            extra_content = json.dumps(extra_content)
        else:
            extra_content = None

        # populate message content
        msg = {
            "asctime": record.asctime,
            "levelname": record.levelname,
            "job_id": record.job_id,  # type: ignore
            "job_run_id": record.job_run_id,  # type: ignore
            "task_id": record.task_id,  # type: ignore
            "step_id": record.message,
        }

        # if trace is not none, add trace
        if trace_content is not None:
            msg["trace"] = trace_content
        # if extra content is not none, add extra content
        if extra_content is not None:
            msg["extras"] = extra_content

        return json.dumps(msg)

Ancestors

  • logging.Formatter

Methods

def format(self, record)

Function to format the record as required by panama.

Args

record : LogRecord
input record from logger.

Returns

str
string to write to file.
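
As an illustration, a sketch of the json line the formatter produces for a plain INFO record (the ConfigRun fields are normally attached by fileFilter; here they are set by hand, and the timestamp in the output comment is made up):

import logging

from panama.logging.file_handler import fileFormatter

formatter = fileFormatter()

record = logging.LogRecord(
    name="my_task", level=logging.INFO, pathname=__file__, lineno=1,
    msg="load_raw_data", args=None, exc_info=None,
)
# these attributes are normally attached by fileFilter
record.job_id = "123"
record.job_run_id = "456"
record.task_id = "789"

print(formatter.format(record))
# {"asctime": "2024-01-31 12:00:00", "levelname": "INFO", "job_id": "123",
#  "job_run_id": "456", "task_id": "789", "step_id": "load_raw_data"}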