Module panama.logging.file_handler
Functions
def make_file_handler(config_run: ConfigRun, path: str, remote_path: str, level: int = 20) ‑> PanamaFileHandler
-
Function used to generate the file handler needed by a default panama logger.
Args
config_run
:ConfigRun
- config run used to pass defaults to record.
path
:str
- local path to a folder for json logs.
remote_path
:str
- path where the json file will be sent at the end of a task.
level
:int
, optional- default logging level. Defaults to logging.INFO.
Returns
PanamaFileHandler
- file handler for panama logger.
Classes
class PanamaFileHandler (path: str, task_name: str, remote_path: str, level: int = 20)
-
Handler for file logs.
Initialize the json handler
Args
path
:str
- path to the folder
- file_name (str) : name of the file where logging is written.
remote_path
:str
- the path to the remote adls where the json will be sent at the end of a task.
level
:int
, optional- minimum log level. Defaults to logging.INFO.
Expand source code
class PanamaFileHandler(logging.Handler): """Handler for file logs.""" def __init__(self, path: str, task_name: str, remote_path: str, level: int = logging.INFO): """Initialize the json handler Args: path (str): path to the folder file_name (str) : name of the file where logging is written. remote_path (str): the path to the remote adls where the json will be sent at the end of a task. level (int, optional): minimum log level. Defaults to logging.INFO. """ super().__init__(level=level) self.path = path self.remote_path = remote_path self.task_name = task_name self.start_time = datetime.astimezone(datetime.now(), tz=timezone("Europe/Rome")) self.file_name = f"{self.task_name.replace('_','')}_{self.start_time.strftime('%Y%m%d%H%M%S')}.json" self.set_local_path() self.file_pointer = None def _open_local_connection(self): """Method used to open connection in a local path""" self.file_pointer = open(self.local_path, "w+") def set_local_path(self, path_prefix=None): if path_prefix is None: if runtime() == "databricks": path_prefix = "/dbfs" else: path_prefix = os.getcwd() local_folder = os.path.join(path_prefix, self.path) if not os.path.exists(local_folder): os.makedirs(local_folder) # note: the file is not yet created. # the empty file is created when _open_local_connection is called self.local_path = os.path.join(local_folder, self.file_name) def emit(self, record: logging.LogRecord): """Method used to write logs to the target local json file.""" if self.file_pointer is None: self._open_local_connection() # format record log_entry = self.format(record=record) self.file_pointer.write(log_entry + "\n") # type: ignore self.file_pointer.flush() # type: ignore os.fsync(self.file_pointer.fileno()) # type: ignore def move_to_remote(self): """Method used to move logs to remote storage.""" spark = SparkSession.getActiveSession() dbutils = get_db_utils(spark) if self.file_pointer is not None: self.file_pointer.close() try: # handle non-databricks environment if runtime() == "databricks": source = f"file:{self.local_path}" else: source = f"{self.local_path}" dbutils.fs.mkdirs(self.remote_path) dbutils.fs.mv(source, self.remote_path, recurse=True) except Exception as e: if "java.io.FileNotFoundException" not in str(e): print(self.local_path, "not found.") raise e
Ancestors
- logging.Handler
- logging.Filterer
Methods
def emit(self, record: logging.LogRecord)
-
Method used to write logs to the target local json file.
def move_to_remote(self)
-
Method used to move logs to remote storage.
def set_local_path(self, path_prefix=None)
class fileFilter (config_run: ConfigRun)
-
Filter instances are used to perform arbitrary filtering of LogRecords.
Loggers and Handlers can optionally use Filter instances to filter records as desired. The base filter class only allows events which are below a certain point in the logger hierarchy. For example, a filter initialized with "A.B" will allow events logged by loggers "A.B", "A.B.C", "A.B.C.D", "A.B.D" etc. but not "A.BB", "B.A.B" etc. If initialized with the empty string, all events are passed.
Initialize a filter.
Initialize with the name of the logger which, together with its children, will have its events allowed through the filter. If no name is specified, allow every event.
Expand source code
class fileFilter(logging.Filter): def __init__(self, config_run: ConfigRun): self.config_run = config_run def _set_defaults(self, record): record.job_id = self.config_run.job_id record.job_run_id = self.config_run.job_run_id record.task_id = self.config_run.task_id def filter(self, record): self._set_defaults(record) return record.levelno < 99
Ancestors
- logging.Filter
Methods
def filter(self, record)
-
Determine if the specified record is to be logged.
Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.
class fileFormatter
-
Formatter instances are used to convert a LogRecord to text.
Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, "%(message)s", "{message}", or "${message}", is used.
The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user's message and arguments are pre- formatted into a LogRecord's message attribute. Currently, the useful attributes in a LogRecord are described by:
%(name)s Name of the logger (logging channel) %(levelno)s Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL) %(levelname)s Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL") %(pathname)s Full pathname of the source file where the logging call was issued (if available) %(filename)s Filename portion of pathname %(module)s Module (name portion of filename) %(lineno)d Source line number where the logging call was issued (if available) %(funcName)s Function name %(created)f Time when the LogRecord was created (time.time() return value) %(asctime)s Textual time when the LogRecord was created %(msecs)d Millisecond portion of the creation time %(relativeCreated)d Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time) %(thread)d Thread ID (if available) %(threadName)s Thread name (if available) %(process)d Process ID (if available) %(message)s The result of record.getMessage(), computed just as the record is emitted
Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.
Use a style parameter of '%', '{' or '$' to specify that you want to use one of %-formatting, :meth:
str.format
({}
) formatting or :class:string.Template
formatting in your format string.Changed in version: 3.2
Added the
style
parameter.Expand source code
class fileFormatter(logging.Formatter): def __init__(self): super().__init__(fmt="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S") def format(self, record): """Function to format the record as required by panama. Args: record (LogRecord): input record from logger. Returns: str: string to write to file. """ super().format(record) # get trace if present if hasattr(record, "trace"): trace_content = record.trace # type: ignore elif record.exc_text is not None: trace_content = record.exc_text else: trace_content = None # get extra content extra_content = getattr(record, "extra", dict()) if len(extra_content) > 0: extra_content = json.dumps(extra_content) else: extra_content = None # populate message content msg = { "asctime": record.asctime, "levelname": record.levelname, "job_id": record.job_id, # type: ignore "job_run_id": record.job_run_id, # type: ignore "task_id": record.task_id, # type: ignore "step_id": record.message, } # if trace is not none, add trace if trace_content is not None: msg["trace"] = trace_content # if extra content is not none, add extra content if extra_content is not None: msg["extras"] = extra_content return json.dumps(msg)
Ancestors
- logging.Formatter
Methods
def format(self, record)
-
Function to format the record as required by panama.
Args
record
:LogRecord
- input record from logger.
Returns
str
- string to write to file.