Module panama.logging.delta_handler
Functions
def make_delta_handler(config_run: ConfigRun, delta_folder: str, level: int) -> deltaHandler

```python
def make_delta_handler(config_run: ConfigRun, delta_folder: str, level: int) -> deltaHandler:
    """Function used to create the required delta handler.

    Args:
        config_run (ConfigRun): config run for the handler.
        delta_folder (str): path to delta table.
        level (int): default logging level.

    Returns:
        deltaHandler: handler for delta logs.
    """
    delta_filter = deltaFilter(config_run)
    delta_formatter = deltaFormatter()
    delta_formatter.converter = time_converter
    delta_handler = deltaHandler(
        spark=config_run.spark,
        path=delta_folder,
        delta_table_keys=config_run.defaults.keys(),
        level=level,
    )
    delta_handler.setFormatter(delta_formatter)
    delta_handler.addFilter(delta_filter)
    return delta_handler
```
Creates the delta handler required for delta logging: a deltaHandler with a deltaFormatter (using time_converter) and a deltaFilter already attached.
Args
- config_run (ConfigRun): config run for the handler.
- delta_folder (str): path to the delta table.
- level (int): default logging level.

Returns
- deltaHandler: handler for delta logs.
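A minimal usage sketch, assuming an existing ConfigRun instance and a Delta table already present at delta_folder (deltaHandler calls DeltaTable.forPath on it; see the bootstrap sketch under deltaHandler below); the logger name and path are illustrative:

```python
import logging

from panama.logging.delta_handler import make_delta_handler

# Assumptions: `config_run` is an existing ConfigRun and the Delta table at
# `delta_folder` was created beforehand with deltaHandler.schema.
delta_folder = "/mnt/logs/job_runs"  # illustrative path

logger = logging.getLogger("panama.jobs")
handler = make_delta_handler(config_run, delta_folder, level=99)
logger.addHandler(handler)

# The attached deltaFilter drops everything below level 99.
logger.log(99, "RUNNING")   # start record: inserts a row with `start` set
logger.log(100, "SUCCESS")  # end record: updates `end` and `status` on that row
```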
Classes
class deltaFilter(config_run: ConfigRun)

```python
class deltaFilter(logging.Filter):
    """Filter for delta logging.

    Sets defaults on each record and passes to the handler only records
    with level 99 or higher."""

    def __init__(self, config_run: ConfigRun):
        super().__init__()
        self.config_run = config_run

    def _set_defaults(self, record):
        record.job_id = self.config_run.job_id
        record.job_run_id = self.config_run.job_run_id
        record.task_id = self.config_run.task_id
        record.job_name = self.config_run.job_name
        record.task_name = self.config_run.task_name
        record.url = self.config_run.url

    def filter(self, record):
        self._set_defaults(record)
        return record.levelno >= 99
```
Filter for delta logging. Stamps the run's default attributes onto each record and passes to the handler only records with level 99 or higher.
Initialized with the ConfigRun whose identifiers (job_id, job_run_id, task_id, job_name, task_name, url) are copied onto every record before filtering.
Ancestors
- logging.Filter
Methods
def filter(self, record)
```python
def filter(self, record):
    self._set_defaults(record)
    return record.levelno >= 99
```
Determine if the specified record is to be logged.
Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.
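In isolation, the gating looks like this hedged sketch (`config_run` stands in for a real ConfigRun): ordinary records are rejected, and anything that passes has already been stamped with the run identifiers.

```python
import logging

from panama.logging.delta_handler import deltaFilter

delta_filter = deltaFilter(config_run)  # `config_run`: an existing ConfigRun

info = logging.LogRecord("panama", logging.INFO, __file__, 0, "msg", None, None)
start = logging.LogRecord("panama", 99, __file__, 0, "RUNNING", None, None)

delta_filter.filter(info)   # False: INFO (20) < 99, record is dropped
delta_filter.filter(start)  # True: job_id, task_name, url, ... are now set on `start`
```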
class deltaFormatter
```python
class deltaFormatter(logging.Formatter):
    def __init__(self):
        super().__init__(fmt="%(message)s - %(asctime)s", datefmt="%Y-%m-%d %H:%M:%S")

    def format(self, record):
        super().format(record)
        msg = {
            "job_id": record.job_id,  # type: ignore
            "job_run_id": record.job_run_id,  # type: ignore
            "task_id": record.task_id,  # type: ignore
            "job_name": record.job_name,  # type: ignore
            "task_name": record.task_name,  # type: ignore
            "params": getattr(record, "extra", None),  # type: ignore
            "url": record.url,  # type: ignore
            "status": record.msg,
            "trace": record.exc_text,
        }
        if record.levelno == 99:
            msg["start"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
        elif record.levelno == 100:
            msg["end"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
        return [msg]
```
Formatter instances are used to convert a LogRecord to text.
Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, "%(message)s", "{message}", or "${message}", is used.
The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user's message and arguments are pre-formatted into a LogRecord's message attribute. Currently, the useful attributes in a LogRecord are described by:
- %(name)s: Name of the logger (logging channel)
- %(levelno)s: Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- %(levelname)s: Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
- %(pathname)s: Full pathname of the source file where the logging call was issued (if available)
- %(filename)s: Filename portion of pathname
- %(module)s: Module (name portion of filename)
- %(lineno)d: Source line number where the logging call was issued (if available)
- %(funcName)s: Function name
- %(created)f: Time when the LogRecord was created (time.time() return value)
- %(asctime)s: Textual time when the LogRecord was created
- %(msecs)d: Millisecond portion of the creation time
- %(relativeCreated)d: Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time)
- %(thread)d: Thread ID (if available)
- %(threadName)s: Thread name (if available)
- %(process)d: Process ID (if available)
- %(message)s: The result of record.getMessage(), computed just as the record is emitted
Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.
Use a style parameter of '%', '{' or '$' to specify that you want to use one of %-formatting, str.format ({}) formatting or string.Template formatting in your format string.

Changed in version 3.2: Added the style parameter.

Ancestors
- logging.Formatter
Methods
def format(self, record)
```python
def format(self, record):
    super().format(record)
    msg = {
        "job_id": record.job_id,  # type: ignore
        "job_run_id": record.job_run_id,  # type: ignore
        "task_id": record.task_id,  # type: ignore
        "job_name": record.job_name,  # type: ignore
        "task_name": record.task_name,  # type: ignore
        "params": getattr(record, "extra", None),  # type: ignore
        "url": record.url,  # type: ignore
        "status": record.msg,
        "trace": record.exc_text,
    }
    if record.levelno == 99:
        msg["start"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
    elif record.levelno == 100:
        msg["end"] = datetime.strptime(record.asctime, self.datefmt)  # type: ignore
    return [msg]
```
Format the specified record.
The record's attribute dictionary is used as the operand to a string formatting operation. Before formatting the dictionary, a couple of preparatory steps are carried out: the message attribute of the record is computed using LogRecord.getMessage(); if the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time; if there is exception information, it is formatted using formatException() and stored on the record as exc_text. Note that unlike the base class, this override does not return a string: it returns a one-element list containing a dict of the record's delta-table fields.
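Concretely, the returned list-of-dict is shaped for spark.createDataFrame in deltaHandler.emit. A hedged sketch with hand-stamped attributes (normally deltaFilter sets them; values are illustrative):

```python
import logging

from panama.logging.delta_handler import deltaFormatter

formatter = deltaFormatter()
rec = logging.LogRecord("panama", 99, __file__, 0, "RUNNING", None, None)
for attr in ("job_id", "job_run_id", "task_id", "job_name", "task_name", "url"):
    setattr(rec, attr, "demo")  # deltaFilter would normally set these

rows = formatter.format(rec)
# rows == [{"job_id": "demo", ..., "status": "RUNNING",
#           "start": datetime(...), "params": None, "trace": None}]
```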
class deltaHandler(spark: pyspark.sql.session.SparkSession, path: str, level: int, delta_table_keys: Iterable[str])

```python
class deltaHandler(logging.Handler):
    schema = T.StructType(
        [
            T.StructField("job_id", T.StringType(), False),
            T.StructField("job_run_id", T.StringType(), False),
            T.StructField("task_id", T.StringType(), False),
            T.StructField("job_name", T.StringType(), True),
            T.StructField("task_name", T.StringType(), True),
            T.StructField("start", T.TimestampType(), True),
            T.StructField("end", T.TimestampType(), True),
            T.StructField("status", T.StringType(), True),
            T.StructField("params", T.StringType(), True),
            T.StructField("url", T.StringType(), True),
            T.StructField("trace", T.StringType(), True),
        ]
    )

    def __init__(self, spark: SparkSession, path: str, level: int, delta_table_keys: Iterable[str]):
        super().__init__(level=level)
        self.spark = spark
        self.path = path
        self.table = DeltaTable.forPath(spark, path)
        self.delta_table_keys = delta_table_keys

    def emit(self, record: logging.LogRecord) -> None:
        """Emit function for delta table.

        Args:
            record (logging.LogRecord): record to be written.
        """
        try:
            msg = self.format(record=record)
            msg_sdf = self.spark.createDataFrame(msg, schema=self.schema)  # type: ignore
            merge_condition = " and ".join([f"source.`{i}` = target.`{i}`" for i in self.delta_table_keys])
            self.table.alias("target").merge(
                source=msg_sdf.alias("source"), condition=F.expr(merge_condition)
            ).whenMatchedUpdate(
                set={
                    "end": F.col("source.end"),
                    "status": F.col("source.status"),
                    "trace": F.col("source.trace"),
                }
            ).whenNotMatchedInsert(
                values={col: F.col(f"source.`{col}`") for col in msg_sdf.columns}
            ).execute()
        except Exception:
            self.handleError(record)
            self.close()
```
Handler instances dispatch logging events to specific destinations.
The base handler class. Acts as a placeholder which defines the Handler interface. Handlers can optionally use Formatter instances to format records as desired. By default, no formatter is specified; in this case, the 'raw' message as determined by record.message is logged.
Initializes the instance - basically setting the formatter to None and the filter list to empty.
Ancestors
- logging.Handler
- logging.Filterer
Class variables
var schema : the Spark StructType of the delta log table: non-nullable key columns (job_id, job_run_id, task_id) plus job/task names, start/end timestamps, status, params, url, and trace.
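Since __init__ calls DeltaTable.forPath, the table must exist before the handler is constructed. One hedged way to bootstrap it is to save an empty DataFrame with this schema (the path is illustrative and `spark` is assumed to be an active SparkSession):

```python
from panama.logging.delta_handler import deltaHandler

path = "/mnt/logs/job_runs"  # illustrative path; `spark`: active SparkSession
(
    spark.createDataFrame([], schema=deltaHandler.schema)
    .write.format("delta")
    .mode("ignore")  # leaves an existing table untouched
    .save(path)
)
```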
Methods
def emit(self, record: logging.LogRecord) -> None

```python
def emit(self, record: logging.LogRecord) -> None:
    """Emit function for delta table.

    Args:
        record (logging.LogRecord): record to be written.
    """
    try:
        msg = self.format(record=record)
        msg_sdf = self.spark.createDataFrame(msg, schema=self.schema)  # type: ignore
        merge_condition = " and ".join([f"source.`{i}` = target.`{i}`" for i in self.delta_table_keys])
        self.table.alias("target").merge(
            source=msg_sdf.alias("source"), condition=F.expr(merge_condition)
        ).whenMatchedUpdate(
            set={
                "end": F.col("source.end"),
                "status": F.col("source.status"),
                "trace": F.col("source.trace"),
            }
        ).whenNotMatchedInsert(
            values={col: F.col(f"source.`{col}`") for col in msg_sdf.columns}
        ).execute()
    except Exception:
        self.handleError(record)
        self.close()
```
Writes the formatted record to the Delta table as a merge: rows for new runs are inserted, and existing rows (matched on delta_table_keys) have their end, status, and trace columns updated.

Args
- record (logging.LogRecord): record to be written.
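End to end, the merge yields one row per run: the level-99 record hits whenNotMatchedInsert, and the level-100 record carrying the same key columns hits whenMatchedUpdate, touching only end, status, and trace. A hedged sketch (`handler` is the deltaHandler built earlier and run_task is a hypothetical task body):

```python
import logging

logger = logging.getLogger("panama.jobs")
logger.addHandler(handler)  # the deltaHandler built earlier (assumption)

logger.log(99, "RUNNING")  # insert: new row with `start` populated
try:
    run_task()  # hypothetical task body
    logger.log(100, "SUCCESS")  # update: `end` and `status` on the same row
except Exception:
    logger.log(100, "FAILED", exc_info=True)  # update: `trace` gets the traceback
```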