Module panama.logging.delta_handler

Functions

def make_delta_handler(config_run: ConfigRun, delta_folder: str, level: int) -> deltaHandler
def make_delta_handler(config_run: ConfigRun, delta_folder: str, level: int) -> deltaHandler:
    """Function used to create the required delta handler.

    Args:
        config_run (ConfigRun): config run for the handler.
        delta_folder (str): path to delta table.
        level (int): default logging level.

    Returns:
        deltaHandler: handler for delta logs.
    """
    delta_filter = deltaFilter(config_run)
    delta_formatter = deltaFormatter()
    delta_formatter.converter = time_converter
    delta_handler = deltaHandler(
        spark=config_run.spark,
        path=delta_folder,
        delta_table_keys=config_run.defaults.keys(),
        level=level,
    )
    delta_handler.setFormatter(delta_formatter)
    delta_handler.addFilter(delta_filter)

    return delta_handler

Create the delta handler used to write run logs to a delta table.

Args

config_run : ConfigRun
config run for the handler.
delta_folder : str
path to delta table.
level : int
default logging level.

Returns

deltaHandler
handler for delta logs.
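The handler is driven by two custom log levels: the filter passes only records with `levelno >= 99`, and the formatter maps level 99 to a `start` timestamp and level 100 to an `end` timestamp. A stdlib-only sketch of that convention, with hypothetical level names and a list-collecting stand-in for `deltaHandler` (no Spark or `ConfigRun` required):

```python
import logging

# Hypothetical names for the two levels the delta pipeline uses: the filter
# passes records with levelno >= 99; the formatter treats 99 as the start of
# a run and 100 as the end.
START, END = 99, 100
logging.addLevelName(START, "START")
logging.addLevelName(END, "END")

logger = logging.getLogger("panama.demo")
logger.setLevel(START)

records = []

class ListHandler(logging.Handler):
    """Stand-in for deltaHandler: collects records instead of merging to Delta."""
    def emit(self, record):
        records.append((record.levelno, record.getMessage()))

logger.addHandler(ListHandler(level=START))

logger.info("ignored")        # below level 99, never reaches the handler
logger.log(START, "RUNNING")  # start-of-run marker
logger.log(END, "SUCCESS")    # end-of-run marker
```

In the real pipeline the handler created by `make_delta_handler` takes the place of `ListHandler`, and each of these records becomes one row of the merge described under `deltaHandler.emit`.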

Classes

class deltaFilter (config_run: ConfigRun)
class deltaFilter(logging.Filter):
    """Filter for delta logging. Sets run defaults on each record and passes to the handler only records with level 99 or higher."""

    def __init__(self, config_run: ConfigRun):
        super().__init__()
        self.config_run = config_run

    def _set_defaults(self, record):
        record.job_id = self.config_run.job_id
        record.job_run_id = self.config_run.job_run_id
        record.task_id = self.config_run.task_id
        record.job_name = self.config_run.job_name
        record.task_name = self.config_run.task_name
        record.url = self.config_run.url

    def filter(self, record):
        self._set_defaults(record)
        return record.levelno >= 99

Filter for delta logging. Sets run defaults on each record and passes to the handler only records with level 99 or higher.


Ancestors

  • logging.Filter

Methods

def filter(self, record)
def filter(self, record):
    self._set_defaults(record)
    return record.levelno >= 99

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.
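A stdlib-only sketch of the same pattern, with a hypothetical `SimpleNamespace` stub in place of `ConfigRun`, showing that the filter both mutates the record in place and gates on level:

```python
import logging
from types import SimpleNamespace

# Hypothetical stand-in for ConfigRun -- only the fields the filter stamps.
config_run = SimpleNamespace(
    job_id="j1", job_run_id="r1", task_id="t1",
    job_name="job", task_name="task", url="http://example",
)

class DemoFilter(logging.Filter):
    """Mirrors deltaFilter: stamp run metadata on the record, then gate on level."""
    def __init__(self, config_run):
        super().__init__()
        self.config_run = config_run

    def filter(self, record):
        record.job_id = self.config_run.job_id  # mutate the record in place
        record.url = self.config_run.url
        return record.levelno >= 99             # only levels 99/100 reach the handler

flt = DemoFilter(config_run)
low = logging.LogRecord("demo", logging.INFO, __file__, 0, "info msg", None, None)
high = logging.LogRecord("demo", 99, __file__, 0, "RUNNING", None, None)

passed_low, passed_high = flt.filter(low), flt.filter(high)
```

Note that the rejected INFO record is still stamped with the run metadata: `_set_defaults` runs before the level check, so mutation happens regardless of the outcome.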

class deltaFormatter
class deltaFormatter(logging.Formatter):
    def __init__(self):
        super().__init__(fmt="%(message)s - %(asctime)s", datefmt="%Y-%m-%d %H:%M:%S")

    def format(self, record):
        super().format(record)
        msg = {
            "job_id": record.job_id,  # type: ignore
            "job_run_id": record.job_run_id,  # type: ignore
            "task_id": record.task_id,  # type: ignore
            "job_name": record.job_name,  # type: ignore
            "task_name": record.task_name,  # type: ignore
            "params": getattr(record, "extra", None),  # type: ignore
            "url": record.url,  # type: ignore
            "status": record.msg,
            "trace": record.exc_text,
        }
        if record.levelno == 99:
            msg["start"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
        elif record.levelno == 100:
            msg["end"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
        return [msg]

Formatter for delta logging. format() returns a single-element list containing a dict keyed by the delta-table columns: the run metadata stamped by deltaFilter, the record message as status, and the exception text as trace. The formatted timestamp is parsed back into a datetime and stored as start for level-99 records or end for level-100 records.

Ancestors

  • logging.Formatter

Methods

def format(self, record)
def format(self, record):
    super().format(record)
    msg = {
        "job_id": record.job_id,  # type: ignore
        "job_run_id": record.job_run_id,  # type: ignore
        "task_id": record.task_id,  # type: ignore
        "job_name": record.job_name,  # type: ignore
        "task_name": record.task_name,  # type: ignore
        "params": getattr(record, "extra", None),  # type: ignore
        "url": record.url,  # type: ignore
        "status": record.msg,
        "trace": record.exc_text,
    }
    if record.levelno == 99:
        msg["start"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
    elif record.levelno == 100:
        msg["end"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
    return [msg]

Format the specified record as a single-element list containing a dict that matches the delta-table schema. The base class format() is called first for its side effects (populating asctime and, when exception info is present, exc_text); its string result is discarded.
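The start/end timestamps are recovered by parsing asctime back with the same datefmt. A minimal round-trip sketch using only the stdlib, with the default time converter rather than the module's time_converter:

```python
import logging
from datetime import datetime

datefmt = "%Y-%m-%d %H:%M:%S"
# Same fmt/datefmt as deltaFormatter, but using the stdlib default time
# converter instead of the module's time_converter.
formatter = logging.Formatter(fmt="%(message)s - %(asctime)s", datefmt=datefmt)

record = logging.LogRecord("demo", 99, __file__, 0, "RUNNING", None, None)
formatter.format(record)  # side effect: populates record.asctime

# The same parse deltaFormatter.format performs for a level-99 record:
start = datetime.strptime(record.asctime, datefmt)
```

Because datefmt carries no sub-second field, the round-trip is lossless at second precision, which is what lets the formatter store a proper TimestampType value in the schema.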

class deltaHandler (spark: pyspark.sql.session.SparkSession, path: str, level: int, delta_table_keys: Iterable[str])
class deltaHandler(logging.Handler):
    schema = T.StructType(
        [
            T.StructField("job_id", T.StringType(), False),
            T.StructField("job_run_id", T.StringType(), False),
            T.StructField("task_id", T.StringType(), False),
            T.StructField("job_name", T.StringType(), True),
            T.StructField("task_name", T.StringType(), True),
            T.StructField("start", T.TimestampType(), True),
            T.StructField("end", T.TimestampType(), True),
            T.StructField("status", T.StringType(), True),
            T.StructField("params", T.StringType(), True),
            T.StructField("url", T.StringType(), True),
            T.StructField("trace", T.StringType(), True),
        ]
    )

    def __init__(self, spark: SparkSession, path: str, level: int, delta_table_keys: Iterable[str]):
        super().__init__(level=level)
        self.spark = spark
        self.path = path
        self.table = DeltaTable.forPath(spark, path)
        self.delta_table_keys = delta_table_keys

    def emit(self, record: logging.LogRecord) -> None:
        """Emit function for delta table.

        Args:
            record (DataFrame): record to be written.
        """
        try:
            msg = self.format(record=record)
            msg_sdf = self.spark.createDataFrame(msg, schema=self.schema)  # type: ignore
            merge_condition = " and ".join([f"source.`{i}` = target.`{i}`" for i in self.delta_table_keys])

            self.table.alias("target").merge(
                source=msg_sdf.alias("source"), condition=F.expr(merge_condition)
            ).whenMatchedUpdate(
                set={
                    "end": F.col("source.end"),
                    "status": F.col("source.status"),
                    "trace": F.col("source.trace"),
                }
            ).whenNotMatchedInsert(
                values={col: F.col(f"source.`{col}`") for col in msg_sdf.columns}
            ).execute()
        except Exception:
            self.handleError(record)

        self.close()

Handler for delta logging. Each formatted record is written to the Delta table at path via a merge keyed on delta_table_keys: when a matching row exists, its end, status, and trace columns are updated; otherwise the full row is inserted. The handler closes itself after each emit.

Ancestors

  • logging.Handler
  • logging.Filterer

Class variables

var schema

Methods

def emit(self, record: logging.LogRecord) ‑> None
def emit(self, record: logging.LogRecord) -> None:
    """Emit function for delta table.

    Args:
        record (DataFrame): record to be written.
    """
    try:
        msg = self.format(record=record)
        msg_sdf = self.spark.createDataFrame(msg, schema=self.schema)  # type: ignore
        merge_condition = " and ".join([f"source.`{i}` = target.`{i}`" for i in self.delta_table_keys])

        self.table.alias("target").merge(
            source=msg_sdf.alias("source"), condition=F.expr(merge_condition)
        ).whenMatchedUpdate(
            set={
                "end": F.col("source.end"),
                "status": F.col("source.status"),
                "trace": F.col("source.trace"),
            }
        ).whenNotMatchedInsert(
            values={col: F.col(f"source.`{col}`") for col in msg_sdf.columns}
        ).execute()
    except Exception:
        self.handleError(record)

    self.close()

Write the record to the delta table via a merge (upsert).

Args

record : logging.LogRecord
record to be written.
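The merge condition emit builds is a plain string of backtick-quoted equality clauses joined with `and`. A sketch of that construction, assuming hypothetical keys `job_id`, `job_run_id`, `task_id` (in the module they come from `config_run.defaults.keys()`):

```python
# Hypothetical keys -- the module takes them from config_run.defaults.keys().
delta_table_keys = ["job_id", "job_run_id", "task_id"]

# Same construction emit() uses for the MERGE ON clause; backticks quote
# column names on both the source (new record) and target (Delta table) side.
merge_condition = " and ".join(
    f"source.`{k}` = target.`{k}`" for k in delta_table_keys
)
```

With this condition, the level-99 "start" record inserts a new row, and the later level-100 "end" record with the same key values matches it and updates only end, status, and trace.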