Module panama.logging
Sub-modules
panama.logging.config_run
panama.logging.decorators
panama.logging.delta_handler
panama.logging.file_handler
panama.logging.logger
panama.logging.magic
panama.logging.messages
Functions
def get_adls_name() ‑> str
    def get_adls_name() -> str:
        """Get the adls name.

        Returns:
            str: current log adls name.
        """
        return _ADLS_NAME
Get the adls name.
Returns
str
- current log adls name.
def get_container() ‑> str
    def get_container() -> str:
        """Get the container name.

        Returns:
            str: current log container name.
        """
        return _CONTAINER
Get the container name.
Returns
str
- current log container name.
def get_delta_folder() ‑> str
    def get_delta_folder() -> str:
        """Get the delta folder.

        Returns:
            str: current delta log folder.
        """
        return _DELTA_FOLDER
Get the delta folder.
Returns
str
- current delta log folder.
def get_json_folder() ‑> str
    def get_json_folder() -> str:
        """Get the json folder.

        Returns:
            str: current json log folder.
        """
        return _JSON_FOLDER
Get the json folder.
Returns
str
- current json log folder.
def get_logger(context_manager: ContextManager | None = None,
name: str | None = None,
level: int = 20,
delta_path: str | None = None,
json_path: str | None = None,
json_remote_path: str | None = None) ‑> PanamaAdapter
    def get_logger(
        context_manager: Union[ContextManager, None] = None,
        name: Union[str, None] = None,
        level: int = logging.INFO,
        delta_path: Union[str, None] = None,
        json_path: Union[str, None] = None,
        json_remote_path: Union[str, None] = None,
    ) -> PanamaAdapter:
        """Main function used to get a panama logger.

        Args:
            context_manager (Union[ContextManager, None], optional): ContextManager holding the contextual
                information. If None, a default context manager is created, assuming a spark session is
                active. Defaults to None.
            name (Union[str, None], optional): logger name. If None, the root logger is used. Defaults to None.
            level (int, optional): log level. Defaults to logging.INFO.
            delta_path (Union[str, None], optional): path to delta folder. If None a default value is
                generated. Defaults to None.
            json_path (Union[str, None], optional): path to json folder. If None a default value is
                generated. Defaults to None.
            json_remote_path (Union[str, None], optional): path to the remote json folder. If None a
                default value is generated from json_path and the current date. Defaults to None.

        Raises:
            ValueError: if no context_manager is passed and no spark session is active, a ValueError is raised.

        Returns:
            PanamaAdapter: panama logger with the required handlers.
        """
        # if no context manager is passed, try to generate one.
        if context_manager is None:
            context_manager = get_context_manager()
        # generate config run
        config_run = ConfigRun(context_manager=context_manager).generate_config()  # type: ignore
        # use a default name for low-level logger
        if name is None:
            name = "panama_logger"
        panama_logger_name = f"PANAMA_{config_run.task_name}_{name}"
        # if the panama logger already exists, do not generate another one
        logger = logging.root.manager.loggerDict.get(panama_logger_name)
        if isinstance(logger, PanamaAdapter):
            return logger
        # get connection elements names
        adls_name = get_adls_name()
        container = get_container()
        # create delta path
        if delta_path is None:
            delta_path = get_delta_folder()
        delta_path = _make_path(spark=context_manager.spark, adls_name=adls_name, container=container, path=delta_path)  # type: ignore
        # generate remote json file folder
        start_date = datetime.astimezone(datetime.now(), tz=timezone("Europe/Rome")).date().strftime("%Y-%m-%d")
        # create json path
        if json_path is None:
            json_path = get_json_folder()
        # create remote json path
        if json_remote_path is None:
            remote_json_folder = f"{json_path}/{start_date}"
            json_remote_path = _make_path(
                spark=context_manager.spark, adls_name=adls_name, container=container, path=remote_json_folder  # type: ignore
            )
        else:
            json_remote_path = os.path.join(json_remote_path, start_date)
        # create basic logger and set level
        logger = logging.getLogger(name=name)
        logger.setLevel(level)
        # generate handlers
        delta_handler = make_delta_handler(config_run=config_run, delta_folder=delta_path, level=level)
        json_handler = make_file_handler(config_run=config_run, path=json_path, level=level, remote_path=json_remote_path)
        # add handlers to logger
        logger.addHandler(delta_handler)
        logger.addHandler(json_handler)
        # wrap logger with Panama Adapter
        panama_logger = PanamaAdapter(logger=logger, config_run=config_run)
        # add the panama adapter to the panama logger dict
        logger.manager.loggerDict[panama_logger_name] = panama_logger  # type: ignore
        return panama_logger
Main function used to get a panama logger.
Args
context_manager : Union[ContextManager, None], optional
- ContextManager holding the contextual information. If None, a default context manager is created, assuming a spark session is active. Defaults to None.
name : Union[str, None], optional
- logger name. If None, the root logger is used. Defaults to None.
level : int, optional
- log level. Defaults to logging.INFO.
delta_path : Union[str, None], optional
- path to delta folder. If None a default value is generated. Defaults to None.
json_path : Union[str, None], optional
- path to json folder. If None a default value is generated. Defaults to None.
json_remote_path : Union[str, None], optional
- path to the remote json folder. If None a default value is generated from json_path and the current date. Defaults to None.
Raises
ValueError
- if no context_manager is passed and no spark session is active, a ValueError is raised.
Returns
PanamaAdapter
- panama logger with the required handlers.
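A minimal usage sketch, assuming an active Spark session (e.g. on Databricks); the storage account and container names are hypothetical placeholders, not library defaults:

    import logging
    import panama.logging as plog

    plog.set_adls_name("myadls")    # hypothetical storage account
    plog.set_container("logs")      # hypothetical container

    # get_logger builds a ConfigRun from the context manager, attaches the
    # delta and json handlers, and caches the adapter under
    # f"PANAMA_{task_name}_{name}", so repeated calls return the same instance.
    logger = plog.get_logger(name="etl", level=logging.INFO)
    logger.info("pipeline started")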
def log_execution(logger_name: str | None = None, level: int = 20, blocking: bool = True)
    def log_execution(logger_name: Union[str, None] = None, level: int = logging.INFO, blocking: bool = True):
        """Decorator function, used to retrieve the logger by logger name and log the operations of a function.

        Args:
            logger_name (Union[str, None], optional): name of the logger to retrieve. If None, the root
                logger is used. Defaults to None.
            level (int, optional): level of the log step. Defaults to logging.INFO.
            blocking (bool, optional): flag that indicates if the task must be closed and the error raised.
                Defaults to True.
        """

        def wrapper(func: Callable) -> Callable:
            @functools.wraps(func)
            def inner(*args: Any, **kwargs: Any) -> Any:
                # get status of the logging
                active_log = panama.logging.get_active_log()
                # if level logging is not activated, do not log stuff
                if active_log >= 99:
                    return func(*args, **kwargs)
                # get the current context manager
                cm = get_context_manager()
                # get the current logger
                logger = panama.logging.get_logger(name=logger_name, context_manager=cm)
                # generate step name as the func name
                step_id = func.__name__
                # format logging args and kwargs
                extra = _keep_default_type_args_kwargs(*args, **kwargs)
                # try-catch for logging: warnings are handled
                try:
                    result = func(*args, **kwargs)
                    logger.log(level=level, msg=step_id, extra=extra)
                    return result
                except Exception as e:
                    if isinstance(e, Warning):
                        logger.warning(step_id, extra=extra, exc_info=e)
                    else:
                        logger.error(step_id, extra=extra, exc_info=e)
                    if blocking is True:
                        raise e

            return inner

        return wrapper
Decorator function, used to retrieve the logger by logger name and log the operations of a function.
Args
logger_name : Union[str, None], optional
- name of the logger to retrieve. If None, the root logger is used. Defaults to None.
level : int, optional
- level of the log step. Defaults to logging.INFO.
blocking : bool, optional
- flag that indicates if the task must be closed and the error raised. Defaults to True.
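A sketch of the decorator in use (function and logger names are hypothetical); on success the call is logged at the given level, on failure an error record is written and, when blocking is True, the exception is re-raised:

    import logging
    import panama.logging as plog

    @plog.log_execution(logger_name="etl", level=logging.INFO, blocking=True)
    def load_table(path: str):
        ...  # the step id logged is the wrapped function's __name__, i.e. "load_table"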
def set_adls_name(adls_name: str) ‑> None
    def set_adls_name(adls_name: str) -> None:
        """Set the adls name.

        Args:
            adls_name (str): adls name.
        """
        global _ADLS_NAME
        _ADLS_NAME = adls_name
Set the adls name.
Args
adls_name : str
- adls name.
def set_container(container: str) ‑> None
    def set_container(container: str) -> None:
        """Set the container name.

        Args:
            container (str): container name.
        """
        global _CONTAINER
        _CONTAINER = container
Set the container name.
Args
container : str
- container name.
def set_delta_folder(folder: str) ‑> None
    def set_delta_folder(folder: str) -> None:
        """Set the delta folder name.

        Args:
            folder (str): delta folder name.
        """
        global _DELTA_FOLDER
        _DELTA_FOLDER = folder
Set the delta folder name.
Args
folder : str
- delta folder name.
def set_json_folder(folder: str) ‑> None
    def set_json_folder(folder: str) -> None:
        """Set the json folder name.

        Args:
            folder (str): json folder name.
        """
        global _JSON_FOLDER
        _JSON_FOLDER = folder
Set the json folder name.
Args
folder : str
- json folder name.
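These set_*/get_* pairs read and write module-level globals that get_logger uses to build the default storage paths; a quick round-trip sketch (all values are placeholders):

    import panama.logging as plog

    plog.set_adls_name("myadls")
    plog.set_container("logs")
    plog.set_delta_folder("panama/delta_logs")
    plog.set_json_folder("panama/json_logs")

    assert plog.get_adls_name() == "myadls"
    assert plog.get_container() == "logs"
    assert plog.get_delta_folder() == "panama/delta_logs"
    assert plog.get_json_folder() == "panama/json_logs"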
Classes
class ConfigRun (context_manager: ContextManager)
    class ConfigRun:
        """Class used to initialize the data of a job run.

        Attributes:
            dbutils_data (dict): data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
            spark (SparkSession): current spark session.
        """

        def __init__(self, context_manager: ContextManager):
            """Initialize the ConfigRun object. dbutils_data is generated.

            Args:
                context_manager (ContextManager): ContextManager holding the contextual information and the current spark session.
            """
            self.context_manager = context_manager
            self.spark = self.context_manager.spark
            self._set_dbutils_data()

        def _set_default(self, with_timestamp: bool = False):
            try:
                default = os.path.basename(sys.argv[0])
            except:
                try:
                    default = __file__
                except:
                    default = str(uuid.uuid4())
            if with_timestamp is True:
                ts = datetime.astimezone(datetime.now(), tz=timezone("Europe/Rome")).strftime("%Y_%m_%d__%H_%M_%S")
                default = "@".join([default, ts])
            return default

        def _set_dbutils_data(self):
            """Method used to try to set the dbutils data. If it fails no error is raised, and dbutils_data are set as empty."""
            self.dbutils = get_db_utils(self.spark)
            try:
                dbutils_data = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()  # type: ignore
                self.dbutils_data = json.loads(dbutils_data)["tags"]
            except:  # (py4j.protocol.Py4JJavaError, NameError, ValueError) as e:
                print("No dbutils found. Cannot import data from dbutils.")
                self.dbutils_data = dict()

        def get_from_dbutils(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
            """Method used to extract a value from dbutils_data.

            Args:
                key (str): name of the parameter to extract.
                default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

            Returns:
                Union[str, None]: content of dbutils_data for the required key.
            """
            return self.dbutils_data.get(key, default)

        def get_from_databricks_conf(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
            """Method used to extract a value from spark configuration.

            Args:
                key (str): name of the parameter to extract.
                default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

            Returns:
                Union[str, None]: content of spark configuration for the required key.
            """
            return self.spark.conf.get(key, default)

        def get_job_id(self) -> Union[str, None]:
            """Method used to get the job_id from the dbutils_data. If no job_id is found, returns current notebook name
            if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.

            Returns:
                Union[str, None]: value of jobId
            """
            default = self._set_default()
            return self.get_from_dbutils("jobId", default)

        def get_job_run_id(self) -> Union[str, None]:
            """Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns current notebook name
            if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.

            Returns:
                Union[str, None]: value of multitaskParentRunId
            """
            default = self._set_default(with_timestamp=True)
            return self.get_from_dbutils("multitaskParentRunId", default)

        def get_task_id(self) -> Union[str, None]:
            """Method used to get the task_id from the dbutils_data. If no task_id is found, returns current notebook name
            if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.

            Returns:
                Union[str, None]: value of runId
            """
            default = self._set_default()
            return self.get_from_dbutils("runId", default)

        def _get_workspace_url_root(self, default: Union[str, None] = None) -> Union[str, None]:
            """Method used to get the workspace url root.

            Args:
                default (Union[str, None], optional): default value if no value is found. Defaults to None.

            Returns:
                Union[str, None]: root for the notebook link.
            """
            workspace_url_root = self.get_from_dbutils("browserHostName", default)
            if workspace_url_root is None:
                workspace_url_root = self.get_from_databricks_conf("spark.databricks.workspaceUrl", default)
            return workspace_url_root

        def get_workspace(self) -> Union[str, None]:
            """Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.

            Returns:
                Union[str, None]: value of org_id
            """
            return self.get_from_dbutils("orgId")

        def get_job_name(self) -> Union[str, None]:
            """Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.

            Returns:
                Union[str, None]: value of job_name
            """
            return self.get_from_dbutils("jobName")

        def get_task_name(self) -> str:
            """Method used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised,
            otherwise the notebook path.

            Returns:
                str: value of the taskName
            """
            try:
                path = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()  # type: ignore
            except:
                path = sys.argv[0]
            default = os.path.basename(path).split(".")[0]
            return self.get_from_dbutils("taskName", default)  # type: ignore

        def get_widgets(self) -> Mapping[str, object]:
            """Method used to get the widgets content.

            Returns:
                Mapping[str, object]: dictionary of the context manager attributes, excluding spark.
            """
            widgets = self.context_manager.__dict__
            # remove the spark attribute
            widgets = {k: v for k, v in widgets.items() if k != "spark"}
            return widgets

        def get_url(self) -> str:
            """Method used to generate a url to the current notebook run, built from the workspace url root,
            the workspace org id, self.job_id and self.task_id.

            Returns:
                str: generated url. If no url can be generated, the string 'No job run associated' is returned.
            """
            url_root = self._get_workspace_url_root()
            workspace = self.get_workspace()
            if url_root is not None:
                url = f"https://{url_root}?o={workspace}#job/{self.job_id}/run/{self.task_id}"
            else:
                url = "No job run associated"
            return url

        def generate_config(self):
            """Method used to generate a default job run configuration. The following attributes are set:

            job_id          str
            job_run_id      str
            task_id         str
            job_name        Union[str, None]
            task_name       str
            url             str
            widgets         Mapping[str, object]
            defaults        Dict[str, str]
            """
            self.job_id = self.get_job_id()
            self.job_run_id = self.get_job_run_id()
            self.task_id = self.get_task_id()
            self.job_name = self.get_job_name()
            self.task_name = self.get_task_name()
            self.url = self.get_url()
            self.widgets = self.get_widgets()
            self.defaults = {"job_id": self.job_id, "job_run_id": self.job_run_id, "task_id": self.task_id}
            return self
Class used to initialize the data of a job run.
Attributes
dbutils_data : dict
- data extracted from dbutils. If dbutils is missing, an empty dictionary is generated.
spark : SparkSession
- current spark session.
Initialize the ConfigRun object. dbutils_data is generated.
Args
context_manager : ContextManager
- ContextManager holding the contextual information and the current spark session.
Methods
def generate_config(self)
    def generate_config(self):
        """Method used to generate a default job run configuration. The following attributes are set:

        job_id          str
        job_run_id      str
        task_id         str
        job_name        Union[str, None]
        task_name       str
        url             str
        widgets         Mapping[str, object]
        defaults        Dict[str, str]
        """
        self.job_id = self.get_job_id()
        self.job_run_id = self.get_job_run_id()
        self.task_id = self.get_task_id()
        self.job_name = self.get_job_name()
        self.task_name = self.get_task_name()
        self.url = self.get_url()
        self.widgets = self.get_widgets()
        self.defaults = {"job_id": self.job_id, "job_run_id": self.job_run_id, "task_id": self.task_id}
        return self
Method used to generate a default job run configuration.
The following attributes are set:
job_id : str
job_run_id : str
task_id : str
job_name : Union[str, None]
task_name : str
url : str
widgets : Mapping[str, object]
defaults : Dict[str, str]
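A sketch of building a ConfigRun by hand (it is normally created inside get_logger; get_context_manager is the same helper used in get_logger's source, and an active spark session is assumed). Outside Databricks, the ids fall back to the script name or a uuid4:

    cm = get_context_manager()
    config_run = ConfigRun(context_manager=cm).generate_config()
    print(config_run.job_id, config_run.task_name)
    print(config_run.url)  # job run link, or 'No job run associated'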
def get_from_databricks_conf(self, key: str, default: str | None = None) ‑> str | None
    def get_from_databricks_conf(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from spark configuration.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of spark configuration for the required key.
        """
        return self.spark.conf.get(key, default)
Method used to extract a value from spark configuration.
Args
key : str
- name of the parameter to extract.
default : Union[str, None], optional
- default value if no parameter is found. Defaults to None.
Returns
Union[str, None]
- content of spark configuration for the required key.
def get_from_dbutils(self, key: str, default: str | None = None) ‑> str | None
    def get_from_dbutils(self, key: str, default: Union[str, None] = None) -> Union[str, None]:
        """Method used to extract a value from dbutils_data.

        Args:
            key (str): name of the parameter to extract.
            default (Union[str, None], optional): default value if no parameter is found. Defaults to None.

        Returns:
            Union[str, None]: content of dbutils_data for the required key.
        """
        return self.dbutils_data.get(key, default)
Method used to extract a value from dbutils_data.
Args
key : str
- name of the parameter to extract.
default : Union[str, None], optional
- default value if no parameter is found. Defaults to None.
Returns
Union[str, None]
- content of dbutils_data for the required key.
def get_job_id(self) ‑> str | None
    def get_job_id(self) -> Union[str, None]:
        """Method used to get the job_id from the dbutils_data. If no job_id is found, returns current notebook name
        if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.

        Returns:
            Union[str, None]: value of jobId
        """
        default = self._set_default()
        return self.get_from_dbutils("jobId", default)
Method used to get the job_id from the dbutils_data. If no job_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.
Returns
Union[str, None]
- value of jobId
def get_job_name(self) ‑> str | None
    def get_job_name(self) -> Union[str, None]:
        """Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.

        Returns:
            Union[str, None]: value of job_name
        """
        return self.get_from_dbutils("jobName")
Method used to get the current job_name from dbutils_data. Returns None if no jobName is found.
Returns
Union[str, None]
- value of job_name
def get_job_run_id(self) ‑> str | None
    def get_job_run_id(self) -> Union[str, None]:
        """Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns current notebook name
        if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.

        Returns:
            Union[str, None]: value of multitaskParentRunId
        """
        default = self._set_default(with_timestamp=True)
        return self.get_from_dbutils("multitaskParentRunId", default)
Method used to get the job_run_id from the dbutils_data. If no job_run_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.
Returns
Union[str, None]
- value of multitaskParentRunId
def get_task_id(self) ‑> str | None
    def get_task_id(self) -> Union[str, None]:
        """Method used to get the task_id from the dbutils_data. If no task_id is found, returns current notebook name
        if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.

        Returns:
            Union[str, None]: value of runId
        """
        default = self._set_default()
        return self.get_from_dbutils("runId", default)
Method used to get the task_id from the dbutils_data. If no task_id is found, returns current notebook name if dbutils is available, otherwise current file name. If also current file name is not available, a uuid4 is generated.
Returns
Union[str, None]
- value of runId
def get_task_name(self) ‑> str
    def get_task_name(self) -> str:
        """Method used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised,
        otherwise the notebook path.

        Returns:
            str: value of the taskName
        """
        try:
            path = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()  # type: ignore
        except:
            path = sys.argv[0]
        default = os.path.basename(path).split(".")[0]
        return self.get_from_dbutils("taskName", default)  # type: ignore
Method used to get the current task_name from notebook info. Returns sys.argv[0] as default if an error is raised, otherwise the notebook path.
Returns
str
- value of the taskName
def get_url(self) ‑> str
    def get_url(self) -> str:
        """Method used to generate a url to the current notebook run, built from the workspace url root,
        the workspace org id, self.job_id and self.task_id.

        Returns:
            str: generated url. If no url can be generated, the string 'No job run associated' is returned.
        """
        url_root = self._get_workspace_url_root()
        workspace = self.get_workspace()
        if url_root is not None:
            url = f"https://{url_root}?o={workspace}#job/{self.job_id}/run/{self.task_id}"
        else:
            url = "No job run associated"
        return url
Method used to generate a url to the current notebook run, built from the workspace url root, the workspace org id, self.job_id and self.task_id.
Returns
str
- generated url. If no url can be generated, the string 'No job run associated' is returned.
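For illustration, the shape of the generated link (all values hypothetical):

    # Illustrative shape only, with hypothetical values:
    # https://{url_root}?o={workspace}#job/{job_id}/run/{task_id}
    # e.g. https://adb-123.azuredatabricks.net?o=123456#job/42/run/7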
def get_widgets(self) ‑> Mapping[str, object]
    def get_widgets(self) -> Mapping[str, object]:
        """Method used to get the widgets content.

        Returns:
            Mapping[str, object]: dictionary of the context manager attributes, excluding spark.
        """
        widgets = self.context_manager.__dict__
        # remove the spark attribute
        widgets = {k: v for k, v in widgets.items() if k != "spark"}
        return widgets
Method used to get the widgets content.
Returns
Mapping[str, object]
- dictionary of the context manager attributes, excluding spark.
def get_workspace(self) ‑> str | None
    def get_workspace(self) -> Union[str, None]:
        """Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.

        Returns:
            Union[str, None]: value of org_id
        """
        return self.get_from_dbutils("orgId")
Method used to get the org_id from the dbutils_data. Returns None if no org_id is found.
Returns
Union[str, None]
- value of org_id
class PanamaAdapter (logger: logging.Logger,
config_run: ConfigRun)
    class PanamaAdapter(logging.LoggerAdapter):
        """Panama adapter to add contextual information and create the methods start_task, end_task and log_step."""

        def __init__(self, logger: logging.Logger, config_run: ConfigRun):
            """Initialize adapter.

            Args:
                logger (logging.Logger): underlying logger object.
                config_run (ConfigRun): config run, with default data.
            """
            self.config_run = config_run
            super().__init__(logger=logger, extra=config_run.widgets)

        def process(self, msg, kwargs):
            """Add to extra the required information. Deals with trace parameter.

            Args:
                msg (str): logged message content.
                kwargs (dict): log call arguments.

            Returns:
                (str, dict): msg and kwargs handled.
            """
            forbidden = ["exc_info", "stack_info", "logger"]
            extra = {k: str(v) for k, v in kwargs.items() if k not in forbidden}
            # add trace kwarg to extra
            if isinstance(kwargs.get("exc_info"), str):
                extra["trace"] = kwargs["exc_info"]
            kwargs["extra"] = extra
            # remove trace from kwargs, as it has been added to extras
            kwargs.pop("trace", None)
            return msg, kwargs

        def move_json_to_remote(self):
            """Method used to move logs to remote storage."""
            # get task name from config_run
            task_name = self.config_run.task_name
            # get logger_dict from manager
            logger_dict = self.logger.manager.loggerDict.items()
            # keep panama loggers related to the task
            loggers = [v for k, v in logger_dict if k.startswith(f"PANAMA_{task_name}")]
            # get file handlers for every logger and stash the file
            for l in loggers:
                for h in l.logger.handlers:
                    if isinstance(h, PanamaFileHandler):
                        h.move_to_remote()

        def start_task(self, **kwargs):
            """Method used to open a task record."""
            self.log(level=99, msg="RUNNING", extra=kwargs, stack_info=False)

        def end_task(self, trace: Union[Exception, None] = None, move_file: bool = True):
            """Method used to close a task record."""
            if trace is None:
                self.log(level=100, msg="OK")
            else:
                self.log(level=100, msg="KO", exc_info=trace)
            # wait a bit if some files have not been closed
            time.sleep(10)
            if move_file is True:
                # move task json to remote
                self.move_json_to_remote()

        def log_step(self, step_id: str, level: int = logging.INFO, trace: Union[Exception, str, None] = None, **extras):
            """Method used to log a step."""
            self.log(level=level, msg=step_id, extra=extras, exc_info=trace)  # type: ignore
Panama adapter to add contextual information and create the methods start_task, end_task and log_step.
Initialize adapter.
Args
logger : logging.Logger
- underlying logger object.
config_run : ConfigRun
- config run, with default data.
Ancestors
- logging.LoggerAdapter
Methods
def end_task(self, trace: Exception | None = None, move_file: bool = True)
    def end_task(self, trace: Union[Exception, None] = None, move_file: bool = True):
        """Method used to close a task record."""
        if trace is None:
            self.log(level=100, msg="OK")
        else:
            self.log(level=100, msg="KO", exc_info=trace)
        # wait a bit if some files have not been closed
        time.sleep(10)
        if move_file is True:
            # move task json to remote
            self.move_json_to_remote()
Method used to close a task record.
def log_step(self, step_id: str, level: int = 20, trace: Exception | str | None = None, **extras)
    def log_step(self, step_id: str, level: int = logging.INFO, trace: Union[Exception, str, None] = None, **extras):
        """Method used to log a step."""
        self.log(level=level, msg=step_id, extra=extras, exc_info=trace)  # type: ignore
Method used to log a step.
def move_json_to_remote(self)
    def move_json_to_remote(self):
        """Method used to move logs to remote storage."""
        # get task name from config_run
        task_name = self.config_run.task_name
        # get logger_dict from manager
        logger_dict = self.logger.manager.loggerDict.items()
        # keep panama loggers related to the task
        loggers = [v for k, v in logger_dict if k.startswith(f"PANAMA_{task_name}")]
        # get file handlers for every logger and stash the file
        for l in loggers:
            for h in l.logger.handlers:
                if isinstance(h, PanamaFileHandler):
                    h.move_to_remote()
Method used to move logs to remote storage.
def process(self, msg, kwargs)
    def process(self, msg, kwargs):
        """Add to extra the required information. Deals with trace parameter.

        Args:
            msg (str): logged message content.
            kwargs (dict): log call arguments.

        Returns:
            (str, dict): msg and kwargs handled.
        """
        forbidden = ["exc_info", "stack_info", "logger"]
        extra = {k: str(v) for k, v in kwargs.items() if k not in forbidden}
        # add trace kwarg to extra
        if isinstance(kwargs.get("exc_info"), str):
            extra["trace"] = kwargs["exc_info"]
        kwargs["extra"] = extra
        # remove trace from kwargs, as it has been added to extras
        kwargs.pop("trace", None)
        return msg, kwargs
Add to extra the required information. Deals with trace parameter.
Args
msg : str
- logged message content.
kwargs : dict
- log call arguments.
Returns
(str, dict)
- msg and kwargs handled.
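A small sketch of the transformation, calling process directly (logger name and values are hypothetical):

    import panama.logging as plog

    adapter = plog.get_logger(name="demo")  # a PanamaAdapter
    msg, kwargs = adapter.process("my_step", {"rows": 5, "exc_info": "boom"})
    # msg == "my_step"
    # "exc_info" is excluded from extra, but string traces are copied in as "trace":
    # kwargs["extra"] == {"rows": "5", "trace": "boom"}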
def start_task(self, **kwargs)
    def start_task(self, **kwargs):
        """Method used to open a task record."""
        self.log(level=99, msg="RUNNING", extra=kwargs, stack_info=False)
Method used to open a task record.
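Putting the adapter methods together, a sketch of a typical task lifecycle (logger and step names are illustrative):

    import panama.logging as plog

    logger = plog.get_logger(name="etl")
    logger.start_task()                         # writes "RUNNING" at level 99
    try:
        logger.log_step("load_data", rows=123)  # extras travel as the record's extra payload
        logger.end_task()                       # writes "OK" at level 100, then moves json logs to remote
    except Exception as e:
        logger.end_task(trace=e)                # writes "KO" with the traceback
        raise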