Module panama.logging.delta_handler
Functions
def make_delta_handler(config_run: ConfigRun, delta_folder: str, level: int) ‑> deltaHandler
-
Create and return the deltaHandler used for writing run logs to a delta table.
Args
config_run : ConfigRun
- run configuration for the handler.
delta_folder : str
- path to the delta table.
level : int
- default logging level.
Returns
deltaHandler
- handler for delta logs.
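A minimal usage sketch, assuming a ConfigRun instance is already available (its construction is not documented here) and that the delta table folder is a hypothetical path; level 99 is chosen because the attached filter only passes records at level 99 or higher:

import logging

from panama.logging.delta_handler import make_delta_handler

# config_run: an existing ConfigRun instance (construction not shown here).
handler = make_delta_handler(
    config_run=config_run,
    delta_folder="/mnt/logs/job_runs",  # hypothetical delta table location
    level=99,
)

logger = logging.getLogger("panama.jobs")
logger.addHandler(handler)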
Classes
class deltaFilter (config_run: ConfigRun)
-
Filter for delta logging. Sets default record attributes from the run configuration and passes to the handler only records with level 99 or higher.
Initialize a filter.
Initialize with the ConfigRun whose run metadata (job_id, job_run_id, task_id, job_name, task_name, url) is copied onto every record before the level check.
Expand source code
class deltaFilter(logging.Filter):
    """Filter for delta logging. Sets defaults and passes to handler only records with level 99 or higher."""

    def __init__(self, config_run: ConfigRun):
        self.config_run = config_run

    def _set_defaults(self, record):
        record.job_id = self.config_run.job_id
        record.job_run_id = self.config_run.job_run_id
        record.task_id = self.config_run.task_id
        record.job_name = self.config_run.job_name
        record.task_name = self.config_run.task_name
        record.url = self.config_run.url

    def filter(self, record):
        self._set_defaults(record)
        return record.levelno >= 99
Ancestors
- logging.Filter
Methods
def filter(self, record)
-
Determine if the specified record is to be logged.
Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.
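A small sketch of the threshold behaviour, assuming the custom levels 99 and 100 are used for run start/end events (as in deltaFormatter); config_run stands in for an existing ConfigRun instance:

import logging

from panama.logging.delta_handler import deltaFilter

# config_run: a ConfigRun instance; only the attributes read by _set_defaults matter here.
flt = deltaFilter(config_run)

start_record = logging.LogRecord("panama", 99, __file__, 0, "STARTED", None, None)
debug_record = logging.LogRecord("panama", logging.DEBUG, __file__, 0, "noise", None, None)

assert flt.filter(start_record) is True   # level 99 passes and gains job_id, task_id, ...
assert flt.filter(debug_record) is False  # anything below 99 is dropped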
class deltaFormatter
-
Formatter instances are used to convert a LogRecord to text.
Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, "%(message)s", "{message}", or "${message}", is used.
The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user's message and arguments are pre-formatted into a LogRecord's message attribute. Currently, the useful attributes in a LogRecord are described by:
%(name)s - Name of the logger (logging channel)
%(levelno)s - Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL)
%(levelname)s - Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
%(pathname)s - Full pathname of the source file where the logging call was issued (if available)
%(filename)s - Filename portion of pathname
%(module)s - Module (name portion of filename)
%(lineno)d - Source line number where the logging call was issued (if available)
%(funcName)s - Function name
%(created)f - Time when the LogRecord was created (time.time() return value)
%(asctime)s - Textual time when the LogRecord was created
%(msecs)d - Millisecond portion of the creation time
%(relativeCreated)d - Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time)
%(thread)d - Thread ID (if available)
%(threadName)s - Thread name (if available)
%(process)d - Process ID (if available)
%(message)s - The result of record.getMessage(), computed just as the record is emitted
Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.
Use a style parameter of '%', '{' or '$' to specify that you want to use one of %-formatting, str.format ({}) formatting or string.Template formatting in your format string.
Changed in version 3.2: Added the style parameter.
Expand source code
class deltaFormatter(logging.Formatter):
    def __init__(self):
        super().__init__(fmt="%(message)s - %(asctime)s", datefmt="%Y-%m-%d %H:%M:%S")

    def format(self, record):
        super().format(record)
        msg = {
            "job_id": record.job_id,  # type: ignore
            "job_run_id": record.job_run_id,  # type: ignore
            "task_id": record.task_id,  # type: ignore
            "job_name": record.job_name,  # type: ignore
            "task_name": record.task_name,  # type: ignore
            "params": getattr(record, "extra", None),  # type: ignore
            "url": record.url,  # type: ignore
            "status": record.msg,
            "trace": record.exc_text,
        }
        if record.levelno == 99:
            msg["start"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
        elif record.levelno == 100:
            msg["end"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
        return [msg]
Ancestors
- logging.Formatter
Methods
def format(self, record)
-
Format the specified record as text.
The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
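Note that, unlike the base class, format here returns a single-element list of dicts rather than a string, so the handler can pass it directly to spark.createDataFrame. A rough sketch of the output for a level-99 record (values are illustrative, not real output):

from panama.logging.delta_handler import deltaFormatter

formatter = deltaFormatter()
row = formatter.format(start_record)  # start_record as in the deltaFilter sketch above
# row is roughly:
# [{"job_id": "...", "job_run_id": "...", "task_id": "...", "job_name": "...",
#   "task_name": "...", "params": None, "url": "...", "status": "STARTED",
#   "trace": None, "start": datetime(2024, 1, 1, 12, 0, 0)}]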
class deltaHandler (spark: pyspark.sql.session.SparkSession, path: str, level: int, delta_table_keys: Iterable[str])
-
Handler instances dispatch logging events to specific destinations.
The base handler class. Acts as a placeholder which defines the Handler interface. Handlers can optionally use Formatter instances to format records as desired. By default, no formatter is specified; in this case, the 'raw' message as determined by record.message is logged.
Initializes the instance with the SparkSession, the path of the target delta table (loaded via DeltaTable.forPath), the handler level, and the key columns used in the merge condition.
Expand source code
class deltaHandler(logging.Handler):
    schema = T.StructType(
        [
            T.StructField("job_id", T.StringType(), False),
            T.StructField("job_run_id", T.StringType(), False),
            T.StructField("task_id", T.StringType(), False),
            T.StructField("job_name", T.StringType(), True),
            T.StructField("task_name", T.StringType(), True),
            T.StructField("start", T.TimestampType(), True),
            T.StructField("end", T.TimestampType(), True),
            T.StructField("status", T.StringType(), True),
            T.StructField("params", T.StringType(), True),
            T.StructField("url", T.StringType(), True),
            T.StructField("trace", T.StringType(), True),
        ]
    )

    def __init__(self, spark: SparkSession, path: str, level: int, delta_table_keys: Iterable[str]):
        super().__init__(level=level)
        self.spark = spark
        self.path = path
        self.table = DeltaTable.forPath(spark, path)
        self.delta_table_keys = delta_table_keys

    def emit(self, record: logging.LogRecord) -> None:
        """Emit function for delta table.

        Args:
            record (logging.LogRecord): record to be written.
        """
        try:
            msg = self.format(record=record)
            msg_sdf = self.spark.createDataFrame(msg, schema=self.schema)  # type: ignore
            merge_condition = " and ".join([f"source.`{i}` = target.`{i}`" for i in self.delta_table_keys])
            self.table.alias("target").merge(
                source=msg_sdf.alias("source"), condition=F.expr(merge_condition)
            ).whenMatchedUpdate(
                set={
                    "end": F.col("source.end"),
                    "status": F.col("source.status"),
                    "trace": F.col("source.trace"),
                }
            ).whenNotMatchedInsert(
                values={col: F.col(f"source.`{col}`") for col in msg_sdf.columns}
            ).execute()
        except Exception:
            self.handleError(record)
            self.close()
Ancestors
- logging.Handler
- logging.Filterer
Class variables
var schema
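schema is the Spark schema of one log row. Because __init__ loads the table with DeltaTable.forPath, the delta table must already exist at path. A hedged sketch for creating it once, using a hypothetical location and a default SparkSession:

from pyspark.sql import SparkSession

from panama.logging.delta_handler import deltaHandler

spark = SparkSession.builder.getOrCreate()
log_path = "/mnt/logs/job_runs"  # hypothetical delta table location

# Create an empty delta table matching deltaHandler.schema if it does not exist yet.
(
    spark.createDataFrame([], schema=deltaHandler.schema)
    .write.format("delta")
    .mode("ignore")
    .save(log_path)
)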
Methods
def emit(self, record: logging.LogRecord) ‑> None
-
Emit function for delta table.
Args
record : logging.LogRecord
- record to be written.
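A wiring sketch tying the pieces together. make_delta_handler presumably assembles something similar; the table path, merge keys, config_run and run_task are assumptions used only for illustration:

import logging

from panama.logging.delta_handler import deltaFilter, deltaFormatter, deltaHandler

handler = deltaHandler(
    spark=spark,                                            # existing SparkSession
    path="/mnt/logs/job_runs",                              # hypothetical table location
    level=99,
    delta_table_keys=["job_id", "job_run_id", "task_id"],   # assumed merge keys
)
handler.setFormatter(deltaFormatter())
handler.addFilter(deltaFilter(config_run))                  # config_run: existing ConfigRun

logger = logging.getLogger("panama.jobs")
logger.addHandler(handler)

logger.log(99, "RUNNING")                     # inserts a new row with a start timestamp
try:
    run_task()                                # hypothetical task
    logger.log(100, "SUCCEEDED")              # merge updates end/status on the same keys
except Exception:
    logger.log(100, "FAILED", exc_info=True)  # trace column is filled from exc_text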