Module panama.logging
Sub-modules
- panama.logging.config_run
- panama.logging.decorators
- panama.logging.delta_handler
- panama.logging.file_handler
- panama.logging.logger
- panama.logging.magic
- panama.logging.messages
Functions
- def get_adls_name() ‑> str
- 
Expand source codedef get_adls_name() -> str: """Get the adls name. Returns: str: current log adls name. """ return _ADLS_NAMEGet the adls name. Returns- str
- current log adls name.
 
- def get_container() ‑> str
- 
Expand source codedef get_container() -> str: """Get the container name. Returns: str: current log container name. """ return _CONTAINERGet the container name. Returns- str
- current log container name.
 
- def get_delta_folder() ‑> str
- 
Expand source codedef get_delta_folder() -> str: """Get the delta folder. Returns: str: current delta log folder. """ return _DELTA_FOLDERGet the delta folder. Returns- str
- current delta log folder.
 
- def get_json_folder() ‑> str
- 
Expand source codedef get_json_folder() -> str: """Get the json folder. Returns: str: current json log folder. """ return _JSON_FOLDERGet the json folder. Returns- str
- current json log folder.
 
- def get_logger(context_manager: ContextManager | None = None,
 name: str | None = None,
 level: int = 20,
 delta_path: str | None = None,
 json_path: str | None = None,
 json_remote_path: str | None = None) ‑> PanamaAdapter
- 
Expand source codedef get_logger( context_manager: Union[ContextManager, None] = None, name: Union[str, None] = None, level: int = logging.INFO, delta_path: Union[str, None] = None, json_path: Union[str, None] = None, json_remote_path: Union[str, None] = None, ) -> PanamaAdapter: """Main function used to get a panama logger. Args: context_manager (Union[ContextManager, None], optional): ContextManager holding the contextual information. If None a default context manager is created, assuming a spark session is active. Defaults to None. name (Union[str, None], optional): logger name. If None, the root logger is used. Defaults to None. level (int, optional): log level. Defaults to logging.INFO. delta_path (Union[str, None], optional): path to delta folder. If None a default value is generated. Defaults to None. json_path (Union[str, None], optional): path to json folder. If None a default value is generated. Defaults to None. Raises: ValueError: if no context_manager is passed and no spark session is active, a ValueError is raised. Returns: PanamaAdapter: panama logger with the required handlers. """ # if no context manager is passed, try to generate one. 
if context_manager is None: context_manager = get_context_manager() # generate config run config_run = ConfigRun(context_manager=context_manager).generate_config() # type: ignore # use a default name for low-level logger if name is None: name = "panama_logger" panama_logger_name = f"PANAMA_{config_run.task_name}_{name}" # if the panama logger already exists, do not generate another one logger = logging.root.manager.loggerDict.get(panama_logger_name) if isinstance(logger, PanamaAdapter): return logger # get connection elements names adls_name = get_adls_name() container = get_container() # create delta path if delta_path is None: delta_path = get_delta_folder() delta_path = _make_path(spark=context_manager.spark, adls_name=adls_name, container=container, path=delta_path) # type: ignore # generate remote json file folder start_date = datetime.astimezone(datetime.now(), tz=timezone("Europe/Rome")).date().strftime("%Y-%m-%d") # create json path if json_path is None: json_path = get_json_folder() # create remote json path if json_remote_path is None: remote_json_folder = f"{json_path}/{start_date}" json_remote_path = _make_path( spark=context_manager.spark, adls_name=adls_name, container=container, path=remote_json_folder # type: ignore ) else: json_remote_path = os.path.join(json_remote_path, start_date) # create basic logger and set level logger = logging.getLogger(name=name) logger.setLevel(level) # generate handlers delta_handler = make_delta_handler(config_run=config_run, delta_folder=delta_path, level=level) json_handler = make_file_handler(config_run=config_run, path=json_path, level=level, remote_path=json_remote_path) # add handlers to logger logger.addHandler(delta_handler) logger.addHandler(json_handler) # wrap logger with Panama Adapter panama_logger = PanamaAdapter(logger=logger, config_run=config_run) # add the panama adapter to the panama logger dict logger.manager.loggerDict[panama_logger_name] = panama_logger # type: ignore return panama_loggerMain 
function used to get a panama logger. Args- context_manager:- Union[ContextManager, None], optional
- ContextManager holding the contextual information. If None
- a default context manager is created, assuming a spark session is active. Defaults to None.
- name:- Union[str, None], optional
- logger name. If None, the default name "panama_logger" is used. Defaults to None.
- level:- int, optional
- log level. Defaults to logging.INFO.
- delta_path:- Union[str, None], optional
- path to delta folder. If None a default value is generated. Defaults to None.
- json_path:- Union[str, None], optional
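- path to json folder. If None a default value is generated. Defaults to None.
- json_remote_path:- Union[str, None], optional
- remote path for the json logs. If None, it is generated from json_path and the current date (Europe/Rome timezone); otherwise the current date is appended to the given path. Defaults to None.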
 Raises- ValueError
- if no context_manager is passed and no spark session is active, a ValueError is raised.
 Returns- PanamaAdapter
- panama logger with the required handlers.
 
- def log_execution(logger_name: str | None = None, level: int = 20, blocking: bool = True)
- 
Expand source codedef log_execution(logger_name: Union[str, None] = None, level: int = logging.INFO, blocking: bool = True): """Decorator function, used to retrieve the logger by logger name and log the operations of a function. Args: logger_name (Union[str, None], optional): name of the logger to retrieve. If None, the root logger is used. Defaults to None. level (int, optional): level of the log step. Defaults to panama_log.INFO. blocking (bool, optional): flag that indicates if the task must be closed and the error raised. Defaults to True. """ def wrapper(func: Callable) -> Callable: @functools.wraps(func) def inner(*args: Any, **kwargs: Any) -> Any: # get status of the logging active_log = panama.logging.get_active_log() # if level logging is not activated, do not log stuff if active_log >= 99: return func(*args, **kwargs) # get the current context manager cm = get_context_manager() # get the current logger logger = panama.logging.get_logger(name=logger_name, context_manager=cm) # generate step name as the func name step_id = func.__name__ # format logging args and kwargs extra = _keep_default_type_args_kwargs(*args, **kwargs) # try-catch for logging: warnings are handled try: result = func(*args, **kwargs) logger.log(level=level, msg=step_id, extra=extra) return result except Exception as e: if isinstance(e, Warning): logger.warning(step_id, extra=extra, exc_info=e) else: logger.error(step_id, extra, exc_info=e) if blocking is True: raise e return inner return wrapperDecorator function, used to retrieve the logger by logger name and log the operations of a function. Args- logger_name:- Union[str, None], optional
- name of the logger to retrieve. If None, the default name "panama_logger" is used. Defaults to None.
- level:- int, optional
- level of the log step. Defaults to logging.INFO.
- blocking:- bool, optional
- flag that indicates whether a caught exception is re-raised after being logged. If False, the error is only logged and the decorated call returns None. Defaults to True.
 
- def set_adls_name(adls_name: str) ‑> None
- 
Expand source codedef set_adls_name(adls_name: str) -> None: """Set the adls name. Args: adls (str): adls name. """ global _ADLS_NAME _ADLS_NAME = adls_nameSet the adls name. Args- adls:- str
- adls name.
 
- def set_container(container: str) ‑> None
- 
Expand source codedef set_container(container: str) -> None: """Set the container name. Args: container (str): container name. """ global _CONTAINER _CONTAINER = containerSet the container name. Args- container:- str
- container name.
 
- def set_delta_folder(folder: str) ‑> None
- 
Expand source codedef set_delta_folder(folder: str) -> None: """Set the delta folder name. Args: folder (str): delta folder name. """ global _DELTA_FOLDER _DELTA_FOLDER = folderSet the delta folder name. Args- folder:- str
- delta folder name.
 
- def set_json_folder(folder: str) ‑> None
- 
Expand source codedef set_json_folder(folder: str) -> None: """Set the json folder name. Args: folder (str): json folder name. """ global _JSON_FOLDER _JSON_FOLDER = folderSet the json folder name. Args- folder:- str
- json folder name.
 
Classes
- class ConfigRun (context_manager: ContextManager)
- 
Expand source codeclass ConfigRun: """Class used to initialize the data of a job run. Attributes: dbutils_data (dict): data extracted from dbutils. If dbutils is missing, an empty dictionary is generated. spark (SparkSession): current spark session. """ def __init__(self, context_manager: ContextManager): """Initialize the ConfigRun object. dbutils_data is generated. Args: spark (Union[SparkSession, None]): current spark session. If None, the spark session is automatically fetched. Default is None. """ self.context_manager = context_manager self.spark = self.context_manager.spark self._set_dbutils_data() def _set_default(self, with_timestamp: bool = False): try: default = os.path.basename(sys.argv[0]) except: try: default = __file__ except: default = str(uuid.uuid4()) if with_timestamp is True: ts = datetime.astimezone(datetime.now(), tz=timezone("Europe/Rome")).strftime("%Y_%m_%d__%H_%M_%S") default = "@".join([default, ts]) return default def _set_dbutils_data(self): """Method used to try to set the dbutils data. If it fails no error is raised, and dbutils_data are set as empty.""" self.dbutils = get_db_utils(self.spark) try: dbutils_data = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson() # type: ignore self.dbutils_data = json.loads(dbutils_data)["tags"] except: # (py4j.protocol.Py4JJavaError, NameError, ValueError) as e: print("No dbutils found. Cannot import data from dbutils.") self.dbutils_data = dict() def get_from_dbutils(self, key: str, default: Union[str, None] = None) -> Union[str, None]: """Method used to extract a value from dbutils_data. Args: key (str): name of the parameter to extract. default (Union[str, None], optional): default value if no parameter is found. Defaults to None. Returns: Union[str, None]: content of dbutils_data for the required key. 
""" return self.dbutils_data.get(key, default) def get_from_databricks_conf(self, key: str, default: Union[str, None] = None) -> Union[str, None]: """Method used to extract a value from spark configuration. Args: key (str): name of the parameter to extract. default (Union[str, None], optional): default value if no parameter is found. Defaults to None. Returns: Union[str, None]: content of spark configuration for the required key. """ return self.spark.conf.get(key, default) def get_job_id(self) -> Union[str, None]: """Method used to get the job_id from the dbutils_data. if no job_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated. Returns: Union[str, None]: value of jobId """ default = self._set_default() return self.get_from_dbutils("jobId", default) def get_job_run_id(self) -> Union[str, None]: """Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated. Returns: Union[str, None]: value of jobId """ default = self._set_default(with_timestamp=True) return self.get_from_dbutils("multitaskParentRunId", default) def get_task_id(self) -> Union[str, None]: """Method used to get the task_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated. Returns: Union[str, None]: value of jobId """ default = self._set_default() return self.get_from_dbutils("runId", default) def _get_workspace_url_root(self, default: Union[str, None] = None) -> Union[str, None]: """Method used to get the workspace url root. Args: default (Union[str, None], optional): default value if no value is found.. Defaults to None. 
Returns: Union[str, None]: root for the notebook link. """ workspace_url_root = self.get_from_dbutils("browserHostName", default) if workspace_url_root is None: workspace_url_root = self.get_from_databricks_conf("spark.databricks.workspaceUrl", default) return workspace_url_root def get_workspace(self) -> Union[str, None]: """Method used to get the org_id from the dbutils_data. Returns None if no org_id is found. Returns: Union[str, None]: value of org_id """ return self.get_from_dbutils("orgId") def get_job_name(self) -> Union[str, None]: """Method used to get the current job_name from dbutils_data. Returns None if no jobName is found. Returns: Union[str, None]: value of job_name """ return self.get_from_dbutils("jobName") def get_task_name(self) -> str: """Method used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised, otherwise the notebook path. Returns: str: value of the taskName """ try: path = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get() # type: ignore except: path = sys.argv[0] default = os.path.basename(path).split(".")[0] return self.get_from_dbutils("taskName", default) # type: ignore def get_widgets(self) -> Mapping[str, object]: """Method used to get the widgets content. Returns: Union[str, None]: string of the widgets, repr of a dictionary. """ widgets = self.context_manager.__dict__ # remove the spark attribute widgets = {k: v for k, v in widgets.items() if k != "spark"} return widgets def get_url(self) -> str: """Method used to generate an url to a notebook. Args: job_id (str): id of the current job. task_id (str): id of the current task Returns: str: generated url. Id no url is generated, the string 'No job run associated' is returned. 
""" url_root = self._get_workspace_url_root() workspace = self.get_workspace() if url_root is not None: url = f"https://{url_root}?o={workspace}#job/{self.job_id}/run/{self.task_id}" else: url = "No job run associated" return url def generate_config(self): """Method used to generate a default job run configuration. The following attributes are set: job_id str job_run_id str task_id str job_name Union[str, None] task_name Union[str, None] url str widgets str defaults Dict[str, str] """ self.job_id = self.get_job_id() self.job_run_id = self.get_job_run_id() self.task_id = self.get_task_id() self.job_name = self.get_job_name() self.task_name = self.get_task_name() self.url = self.get_url() self.widgets = self.get_widgets() self.defaults = {"job_id": self.job_id, "job_run_id": self.job_run_id, "task_id": self.task_id} return selfClass used to initialize the data of a job run. Attributes- dbutils_data:- dict
- data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
- spark:- SparkSession
- current spark session.
 Initialize the ConfigRun object. dbutils_data is generated. Args- context_manager:- ContextManager
- context manager holding the contextual information. Its spark session is stored on the instance as the spark attribute.
 Methods- def generate_config(self)
- 
Expand source codedef generate_config(self): """Method used to generate a default job run configuration. The following attributes are set: job_id str job_run_id str task_id str job_name Union[str, None] task_name Union[str, None] url str widgets str defaults Dict[str, str] """ self.job_id = self.get_job_id() self.job_run_id = self.get_job_run_id() self.task_id = self.get_task_id() self.job_name = self.get_job_name() self.task_name = self.get_task_name() self.url = self.get_url() self.widgets = self.get_widgets() self.defaults = {"job_id": self.job_id, "job_run_id": self.job_run_id, "task_id": self.task_id} return selfMethod used to generate a default job run configuration. The following attributes are set: job_id str job_run_id str task_id str job_name Union[str, None] task_name Union[str, None] url str widgets str defaults Dict[str, str]
- def get_from_databricks_conf(self, key: str, default: str | None = None) ‑> str | None
- 
Expand source codedef get_from_databricks_conf(self, key: str, default: Union[str, None] = None) -> Union[str, None]: """Method used to extract a value from spark configuration. Args: key (str): name of the parameter to extract. default (Union[str, None], optional): default value if no parameter is found. Defaults to None. Returns: Union[str, None]: content of spark configuration for the required key. """ return self.spark.conf.get(key, default)Method used to extract a value from spark configuration. Args- key:- str
- name of the parameter to extract.
- default:- Union[str, None], optional
- default value if no parameter is found. Defaults to None.
 Returns- Union[str, None]
- content of spark configuration for the required key.
 
- def get_from_dbutils(self, key: str, default: str | None = None) ‑> str | None
- 
Expand source codedef get_from_dbutils(self, key: str, default: Union[str, None] = None) -> Union[str, None]: """Method used to extract a value from dbutils_data. Args: key (str): name of the parameter to extract. default (Union[str, None], optional): default value if no parameter is found. Defaults to None. Returns: Union[str, None]: content of dbutils_data for the required key. """ return self.dbutils_data.get(key, default)Method used to extract a value from dbutils_data. Args- key:- str
- name of the parameter to extract.
- default:- Union[str, None], optional
- default value if no parameter is found. Defaults to None.
 Returns- Union[str, None]
- content of dbutils_data for the required key.
 
- def get_job_id(self) ‑> str | None
- 
Expand source codedef get_job_id(self) -> Union[str, None]: """Method used to get the job_id from the dbutils_data. if no job_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated. Returns: Union[str, None]: value of jobId """ default = self._set_default() return self.get_from_dbutils("jobId", default)Method used to get the job_id from the dbutils_data. if no job_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated. Returns- Union[str, None]
- value of jobId
 
- def get_job_name(self) ‑> str | None
- 
Expand source codedef get_job_name(self) -> Union[str, None]: """Method used to get the current job_name from dbutils_data. Returns None if no jobName is found. Returns: Union[str, None]: value of job_name """ return self.get_from_dbutils("jobName")Method used to get the current job_name from dbutils_data. Returns None if no jobName is found. Returns- Union[str, None]
- value of job_name
 
- def get_job_run_id(self) ‑> str | None
- 
Expand source codedef get_job_run_id(self) -> Union[str, None]: """Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated. Returns: Union[str, None]: value of jobId """ default = self._set_default(with_timestamp=True) return self.get_from_dbutils("multitaskParentRunId", default)Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated. Returns- Union[str, None]
- value of multitaskParentRunId
 
- def get_task_id(self) ‑> str | None
- 
Expand source codedef get_task_id(self) -> Union[str, None]: """Method used to get the task_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated. Returns: Union[str, None]: value of jobId """ default = self._set_default() return self.get_from_dbutils("runId", default)Method used to get the task_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated. Returns- Union[str, None]
- value of runId
 
- def get_task_name(self) ‑> str
- 
Expand source codedef get_task_name(self) -> str: """Method used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised, otherwise the notebook path. Returns: str: value of the taskName """ try: path = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get() # type: ignore except: path = sys.argv[0] default = os.path.basename(path).split(".")[0] return self.get_from_dbutils("taskName", default) # type: ignoreMethod used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised, otherwise the notebook path. Returns- str
- value of the taskName
 
- def get_url(self) ‑> str
- 
Expand source codedef get_url(self) -> str: """Method used to generate an url to a notebook. Args: job_id (str): id of the current job. task_id (str): id of the current task Returns: str: generated url. Id no url is generated, the string 'No job run associated' is returned. """ url_root = self._get_workspace_url_root() workspace = self.get_workspace() if url_root is not None: url = f"https://{url_root}?o={workspace}#job/{self.job_id}/run/{self.task_id}" else: url = "No job run associated" return urlMethod used to generate an url to a notebook. Args- job_id:- str
- id of the current job.
- task_id:- str
- id of the current task
 Returns- str
- generated url. If no url is generated, the string 'No job run associated' is returned.
 
- def get_widgets(self) ‑> Mapping[str, object]
- 
Expand source codedef get_widgets(self) -> Mapping[str, object]: """Method used to get the widgets content. Returns: Union[str, None]: string of the widgets, repr of a dictionary. """ widgets = self.context_manager.__dict__ # remove the spark attribute widgets = {k: v for k, v in widgets.items() if k != "spark"} return widgetsMethod used to get the widgets content. Returns- Union[str, None]
- dictionary of the context manager's attributes, with the spark attribute removed.
 
- def get_workspace(self) ‑> str | None
- 
Expand source codedef get_workspace(self) -> Union[str, None]: """Method used to get the org_id from the dbutils_data. Returns None if no org_id is found. Returns: Union[str, None]: value of org_id """ return self.get_from_dbutils("orgId")Method used to get the org_id from the dbutils_data. Returns None if no org_id is found. Returns- Union[str, None]
- value of org_id
 
 
- class PanamaAdapter (logger: logging.Logger,
 config_run: ConfigRun)
- 
Expand source codeclass PanamaAdapter(logging.LoggerAdapter): """Panama adapter to add contextual information and create the methods start_task, end_task and log_step.""" def __init__(self, logger: logging.Logger, config_run: ConfigRun): """Initialize adapter. Args: logger (logging.Logger): underlying logger object. config_run (ConfigRun): config run, with default data. """ self.config_run = config_run super().__init__(logger=logger, extra=config_run.widgets) def process(self, msg, kwargs): """Add to extra the required information. Deals with trace parameter. Args: msg (str): logged message content. kwargs (dict): log call arguments. Returns: (str, dict): msg and kwargs handled. """ forbidden = ["exc_info", "stack_info", "logger"] extra = {k: str(v) for k, v in kwargs.items() if k not in forbidden} # add trace kwarg to extra if isinstance(kwargs.get("exc_info"), str): extra["trace"] = kwargs["exc_info"] kwargs["extra"] = extra # remove trace from kwargs, as it has been added to extras kwargs.pop("trace", None) return msg, kwargs def move_json_to_remote(self): """Method used to move logs to remote storage.""" # get task name from config_run task_name = self.config_run.task_name # get logger_dict from manager logger_dict = self.logger.manager.loggerDict.items() # keep panama loggers related to the task loggers = [v for k, v in logger_dict if k.startswith(f"PANAMA_{task_name}")] # get file handlers for every logger and stash the file for l in loggers: for h in l.logger.handlers: if isinstance(h, PanamaFileHandler): h.move_to_remote() def start_task(self, **kwargs): """Method used to open a task record.""" self.log(level=99, msg="RUNNING", extra=kwargs, stack_info=False) def end_task(self, trace: Union[Exception, None] = None, move_file: bool = True): """Method used to close a task record.""" if trace is None: self.log(level=100, msg="OK") else: self.log(level=100, msg="KO", exc_info=trace) # wait a bit if some files have not been closed time.sleep(10) if move_file is 
True: # move task json to remote self.move_json_to_remote() def log_step(self, step_id: str, level: int = logging.INFO, trace: Union[Exception, str, None] = None, **extras): """Method used to log a step.""" self.log(level=level, msg=step_id, extra=extras, exc_info=trace) # type: ignorePanama adapter to add contextual information and create the methods start_task, end_task and log_step. Initialize adapter. Args- logger:- logging.Logger
- underlying logger object.
- config_run:- ConfigRun
- config run, with default data.
 Ancestors- logging.LoggerAdapter
 Methods- def end_task(self, trace: Exception | None = None, move_file: bool = True)
- 
Expand source codedef end_task(self, trace: Union[Exception, None] = None, move_file: bool = True): """Method used to close a task record.""" if trace is None: self.log(level=100, msg="OK") else: self.log(level=100, msg="KO", exc_info=trace) # wait a bit if some files have not been closed time.sleep(10) if move_file is True: # move task json to remote self.move_json_to_remote()Method used to close a task record. 
- def log_step(self, step_id: str, level: int = 20, trace: Exception | str | None = None, **extras)
- 
Expand source codedef log_step(self, step_id: str, level: int = logging.INFO, trace: Union[Exception, str, None] = None, **extras): """Method used to log a step.""" self.log(level=level, msg=step_id, extra=extras, exc_info=trace) # type: ignoreMethod used to log a step. 
- def move_json_to_remote(self)
- 
Expand source codedef move_json_to_remote(self): """Method used to move logs to remote storage.""" # get task name from config_run task_name = self.config_run.task_name # get logger_dict from manager logger_dict = self.logger.manager.loggerDict.items() # keep panama loggers related to the task loggers = [v for k, v in logger_dict if k.startswith(f"PANAMA_{task_name}")] # get file handlers for every logger and stash the file for l in loggers: for h in l.logger.handlers: if isinstance(h, PanamaFileHandler): h.move_to_remote()Method used to move logs to remote storage. 
- def process(self, msg, kwargs)
- 
Expand source codedef process(self, msg, kwargs): """Add to extra the required information. Deals with trace parameter. Args: msg (str): logged message content. kwargs (dict): log call arguments. Returns: (str, dict): msg and kwargs handled. """ forbidden = ["exc_info", "stack_info", "logger"] extra = {k: str(v) for k, v in kwargs.items() if k not in forbidden} # add trace kwarg to extra if isinstance(kwargs.get("exc_info"), str): extra["trace"] = kwargs["exc_info"] kwargs["extra"] = extra # remove trace from kwargs, as it has been added to extras kwargs.pop("trace", None) return msg, kwargsAdd to extra the required information. Deals with trace parameter. Args- msg:- str
- logged message content.
- kwargs:- dict
- log call arguments.
 Returns- (str, dict): msg and kwargs handled. 
- def start_task(self, **kwargs)
- 
Expand source codedef start_task(self, **kwargs): """Method used to open a task record.""" self.log(level=99, msg="RUNNING", extra=kwargs, stack_info=False)Method used to open a task record.