Module panama.logging.delta_handler

Functions

def make_delta_handler(config_run: ConfigRun, delta_folder: str, level: int) -> deltaHandler

Create the deltaHandler that writes log records to the delta table at delta_folder.

Args

config_run : ConfigRun
config run for the handler.
delta_folder : str
path to delta table.
level : int
default logging level.

Returns

deltaHandler
handler for delta logs.
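The handler is driven by two custom levels that deltaFormatter recognizes: 99 marks a start record and 100 an end record. The stdlib-only sketch below shows how a caller might emit them. It assumes a handler created at level 99; the level names JOB_START/JOB_END, the status strings, and the in-memory ListHandler are hypothetical stand-ins, because the real handler returned by make_delta_handler needs a Spark session and an existing delta table.

```python
import logging

# Hypothetical names for the two custom levels used by the delta logger.
JOB_START, JOB_END = 99, 100
logging.addLevelName(JOB_START, "JOB_START")
logging.addLevelName(JOB_END, "JOB_END")


class ListHandler(logging.Handler):
    """Hypothetical stand-in for deltaHandler: keeps records in memory."""

    def __init__(self, level: int) -> None:
        super().__init__(level=level)
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


logger = logging.getLogger("delta_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the demo away from the root logger
handler = ListHandler(level=JOB_START)
logger.addHandler(handler)

logger.info("ordinary message")   # below 99: rejected by the handler level
logger.log(JOB_START, "RUNNING")   # hypothetical status; would insert a row
logger.log(JOB_END, "SUCCEEDED")   # hypothetical status; would update that row

print([(r.levelname, r.getMessage()) for r in handler.records])
# [('JOB_START', 'RUNNING'), ('JOB_END', 'SUCCEEDED')]
```

Ordinary INFO/WARNING traffic can therefore share the same logger without ever reaching the delta table.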

Classes

class deltaFilter (config_run: ConfigRun)

Filter for delta logging. Attaches the run metadata from config_run (job, task and URL identifiers) to every record and passes to the handler only records with level 99 or above.

Initialize the filter with the ConfigRun whose job, task and URL identifiers are copied onto every record.

class deltaFilter(logging.Filter):
    """Filter for delta logging. Sets defaults and passes to handler only records with level 99 or above."""

    def __init__(self, config_run: ConfigRun):
        super().__init__()  # initialize logging.Filter state (name="", nlen=0)
        self.config_run = config_run

    def _set_defaults(self, record):
        # Copy the run metadata onto the record so deltaFormatter can read it.
        record.job_id = self.config_run.job_id
        record.job_run_id = self.config_run.job_run_id
        record.task_id = self.config_run.task_id
        record.job_name = self.config_run.job_name
        record.task_name = self.config_run.task_name
        record.url = self.config_run.url

    def filter(self, record):
        self._set_defaults(record)
        # Only levels >= 99 pass: 99 marks a start record, 100 an end record.
        return record.levelno >= 99

Ancestors

  • logging.Filter

Methods

def filter(self, record)

Attach the run metadata from config_run to the record in place, then return True only if the record's level is 99 or above. The metadata is attached even when the record is rejected.
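The filter's two-step logic (stamp, then threshold) can be reproduced with the standard library alone. In this sketch, SimpleNamespace is a hypothetical stand-in for ConfigRun carrying only the attributes deltaFilter reads, and StampingFilter is an illustrative re-creation, not the module's class.

```python
import logging
from types import SimpleNamespace

# Hypothetical stand-in for ConfigRun: only the attributes deltaFilter reads.
config_run = SimpleNamespace(
    job_id="123",
    job_run_id="456",
    task_id="789",
    job_name="nightly_load",
    task_name="ingest",
    url="https://example.invalid/run/456",
)

RUN_ATTRS = ("job_id", "job_run_id", "task_id", "job_name", "task_name", "url")


class StampingFilter(logging.Filter):
    """Illustrative re-creation of deltaFilter's logic."""

    def __init__(self, config_run) -> None:
        super().__init__()
        self.config_run = config_run

    def filter(self, record: logging.LogRecord) -> bool:
        # Stamp the run metadata first, as deltaFilter does...
        for attr in RUN_ATTRS:
            setattr(record, attr, getattr(self.config_run, attr))
        # ...then keep only the custom levels 99 and above.
        return record.levelno >= 99


flt = StampingFilter(config_run)
info = logging.LogRecord("demo", logging.INFO, "demo.py", 1, "hello", None, None)
start = logging.LogRecord("demo", 99, "demo.py", 2, "RUNNING", None, None)

print(flt.filter(info), flt.filter(start))  # False True
print(info.job_id)  # 123, stamped even though the record was rejected
```

Because the stamping happens before the level check, every record that reaches the filter is modified, which is visible to any other handler sharing the same record object.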

class deltaFormatter

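Formatter for delta logging. Unlike the base logging.Formatter, it converts a LogRecord into a one-element list holding a dict keyed by the deltaHandler.schema columns, rather than into text.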

Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, "%(message)s", "{message}", or "${message}", is used.

The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes, e.g. the default value mentioned above makes use of the fact that the user's message and arguments are pre-formatted into a LogRecord's message attribute. Currently, the useful attributes in a LogRecord are described by:

  • %(name)s: Name of the logger (logging channel)
  • %(levelno)s: Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL)
  • %(levelname)s: Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
  • %(pathname)s: Full pathname of the source file where the logging call was issued (if available)
  • %(filename)s: Filename portion of pathname
  • %(module)s: Module (name portion of filename)
  • %(lineno)d: Source line number where the logging call was issued (if available)
  • %(funcName)s: Function name
  • %(created)f: Time when the LogRecord was created (time.time() return value)
  • %(asctime)s: Textual time when the LogRecord was created
  • %(msecs)d: Millisecond portion of the creation time
  • %(relativeCreated)d: Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time)
  • %(thread)d: Thread ID (if available)
  • %(threadName)s: Thread name (if available)
  • %(process)d: Process ID (if available)
  • %(message)s: The result of record.getMessage(), computed just as the record is emitted

Initialize the formatter.

deltaFormatter takes no arguments: it initializes the base logging.Formatter with fmt="%(message)s - %(asctime)s" and datefmt="%Y-%m-%d %H:%M:%S". Including %(asctime)s in fmt is what makes the base format() set record.asctime, which deltaFormatter.format() then parses back into a datetime using the same datefmt. Because this datefmt has no fractional seconds, the stored start and end timestamps have one-second resolution.

class deltaFormatter(logging.Formatter):
    def __init__(self):
        # %(asctime)s must appear in fmt so that Formatter.format() sets record.asctime.
        super().__init__(fmt="%(message)s - %(asctime)s", datefmt="%Y-%m-%d %H:%M:%S")

    def format(self, record):
        # Called for its side effects: sets record.asctime and, when exc_info is present, record.exc_text.
        super().format(record)
        msg = {
            "job_id": record.job_id,  # type: ignore
            "job_run_id": record.job_run_id,  # type: ignore
            "task_id": record.task_id,  # type: ignore
            "job_name": record.job_name,  # type: ignore
            "task_name": record.task_name,  # type: ignore
            "params": getattr(record, "extra", None),  # type: ignore
            "url": record.url,  # type: ignore
            "status": record.msg,
            "trace": record.exc_text,
        }
        # Level 99 marks a start record, level 100 an end record.
        if record.levelno == 99:
            msg["start"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
        elif record.levelno == 100:
            msg["end"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
        # A list of row dicts (not a str), ready for spark.createDataFrame.
        return [msg]

Ancestors

  • logging.Formatter

Methods

def format(self, record)

Format the specified record as a delta log row.

Calls the base Formatter.format() for its side effects (setting record.asctime and, when exception info is present, record.exc_text), then builds a dict with the job, task, params and url fields, status (from record.msg) and trace (from record.exc_text). Level-99 records additionally get a start timestamp and level-100 records an end timestamp, both parsed from record.asctime. Returns a one-element list containing that dict.
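The core trick, rendering %(asctime)s with a fixed datefmt and parsing it back with datetime.strptime, can be checked in isolation. The DictFormatter below is a trimmed, hypothetical re-creation that keeps only the status, trace, start and end keys; the real formatter also copies the job, task, params and url fields set by deltaFilter.

```python
import logging
from datetime import datetime

DATEFMT = "%Y-%m-%d %H:%M:%S"  # same datefmt as deltaFormatter


class DictFormatter(logging.Formatter):
    """Trimmed sketch of deltaFormatter: returns [dict] instead of a str."""

    def __init__(self) -> None:
        # fmt must reference %(asctime)s, otherwise format() never sets record.asctime.
        super().__init__(fmt="%(message)s - %(asctime)s", datefmt=DATEFMT)

    def format(self, record: logging.LogRecord) -> list:  # type: ignore[override]
        super().format(record)  # populates record.asctime (and exc_text if exc_info is set)
        row = {"status": record.msg, "trace": record.exc_text}
        # Parse the second-resolution text timestamp back into a datetime.
        stamp = datetime.strptime(record.asctime, DATEFMT)
        if record.levelno == 99:
            row["start"] = stamp
        elif record.levelno == 100:
            row["end"] = stamp
        return [row]


fmt = DictFormatter()
rec = logging.LogRecord("demo", 99, "demo.py", 1, "RUNNING", None, None)
[row] = fmt.format(rec)
print(sorted(row))  # ['start', 'status', 'trace']
```

Returning a list rather than a string breaks the usual Formatter contract, so this formatter is only usable with a handler, such as deltaHandler, that expects row dicts.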

class deltaHandler (spark: pyspark.sql.session.SparkSession, path: str, level: int, delta_table_keys: Iterable[str])

Logging handler that writes records to a delta table.

On construction it sets the handler level and opens the delta table at path with DeltaTable.forPath. Each emitted record is formatted with the handler's formatter, which must produce deltaFormatter-style output (a list of row dicts), turned into a one-row DataFrame with the class schema, and merged into the table on delta_table_keys: when a row with the same keys exists, its end, status and trace columns are updated; otherwise the record is inserted as a new row.

class deltaHandler(logging.Handler):
    schema = T.StructType(
        [
            T.StructField("job_id", T.StringType(), False),
            T.StructField("job_run_id", T.StringType(), False),
            T.StructField("task_id", T.StringType(), False),
            T.StructField("job_name", T.StringType(), True),
            T.StructField("task_name", T.StringType(), True),
            T.StructField("start", T.TimestampType(), True),
            T.StructField("end", T.TimestampType(), True),
            T.StructField("status", T.StringType(), True),
            T.StructField("params", T.StringType(), True),
            T.StructField("url", T.StringType(), True),
            T.StructField("trace", T.StringType(), True),
        ]
    )

    def __init__(self, spark: SparkSession, path: str, level: int, delta_table_keys: Iterable[str]):
        super().__init__(level=level)
        self.spark = spark
        self.path = path
        self.table = DeltaTable.forPath(spark, path)
        self.delta_table_keys = delta_table_keys

    def emit(self, record: logging.LogRecord) -> None:
        """Emit function for delta table.

        Args:
            record (logging.LogRecord): record to be written.
        """
        try:
            msg = self.format(record=record)
            msg_sdf = self.spark.createDataFrame(msg, schema=self.schema)  # type: ignore
            merge_condition = " and ".join([f"source.`{i}` = target.`{i}`" for i in self.delta_table_keys])

            self.table.alias("target").merge(
                source=msg_sdf.alias("source"), condition=F.expr(merge_condition)
            ).whenMatchedUpdate(
                set={
                    "end": F.col("source.end"),
                    "status": F.col("source.status"),
                    "trace": F.col("source.trace"),
                }
            ).whenNotMatchedInsert(
                values={col: F.col(f"source.`{col}`") for col in msg_sdf.columns}
            ).execute()
        except Exception:
            self.handleError(record)

        self.close()

Ancestors

  • logging.Handler
  • logging.Filterer

Class variables

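var schema

Spark StructType of the delta log table: job_id, job_run_id and task_id (non-nullable strings), job_name and task_name, start and end timestamps, and status, params, url and trace strings.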

Methods

def emit(self, record: logging.LogRecord) -> None

Emit function for delta table.

Args

record : logging.LogRecord
record to be written.
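The merge in emit can be approximated in plain Python to see why a start record followed by an end record yields a single row. This is an in-memory analogue, not Delta Lake code: the list of dicts stands in for the table, upsert mimics whenMatchedUpdate/whenNotMatchedInsert, and the key tuple passed as delta_table_keys is a hypothetical choice. merge_condition builds the predicate string exactly as emit does.

```python
from typing import Iterable

UPDATE_COLS = ("end", "status", "trace")  # columns emit updates on a match


def merge_condition(keys: Iterable[str]) -> str:
    """Build the join predicate the same way deltaHandler.emit does."""
    return " and ".join(f"source.`{k}` = target.`{k}`" for k in keys)


def upsert(table: list, row: dict, keys: Iterable[str]) -> None:
    """In-memory analogue of whenMatchedUpdate / whenNotMatchedInsert."""
    key_list = list(keys)
    for target in table:
        if all(target.get(k) == row.get(k) for k in key_list):
            # Matched: overwrite only end/status/trace, so start is preserved.
            for col in UPDATE_COLS:
                target[col] = row.get(col)
            return
    table.append(dict(row))  # not matched: insert the full row


table_keys = ("job_id", "job_run_id", "task_id")  # hypothetical delta_table_keys
print(merge_condition(table_keys))
# source.`job_id` = target.`job_id` and source.`job_run_id` = target.`job_run_id` and source.`task_id` = target.`task_id`

table = []
ids = {"job_id": "1", "job_run_id": "2", "task_id": "3"}
upsert(table, {**ids, "start": "t0", "end": None, "status": "RUNNING", "trace": None}, table_keys)
upsert(table, {**ids, "start": None, "end": "t1", "status": "SUCCEEDED", "trace": None}, table_keys)
print(table[0]["start"], table[0]["end"], table[0]["status"])  # t0 t1 SUCCEEDED
```

One difference from the real merge: SQL `=` never matches NULL keys, whereas Python `==` treats two None values as equal. The schema marks job_id, job_run_id and task_id as non-nullable, so this matters only if other columns are used as keys.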