Module panama.logging.messages
Functions
def create_message(table_name: str, status: Literal['FINISHED', 'FAILED']) -> str
def create_message(table_name: str, status: Literal["FINISHED", "FAILED"]) -> str:
    """Create the message from table and status.

    Args:
        table_name (str): name of the table to log.
        status (Literal["FINISHED", "FAILED"]): status of the table.

    Raises:
        NameError: FORBIDDEN NAME for table. Analytics table names cannot end with any value from "_entity", "_application", "_slv".

    Returns:
        str: message to send to the topic.
    """
    if any([table_name.endswith(i) for i in _FORBIDDEN_ENDNAMES]):
        raise NameError(
            f"FORBIDDEN NAME for table. Analytics table names cannot end with any value from {_FORBIDDEN_ENDNAMES}."
        )
    return '{"child_name": "' + table_name + '", "status": "' + status + '"}'
Create the message from table and status.
Args
table_name
:str
- name of the table to log.
status
:Literal["FINISHED", "FAILED"]
- status of the table.
Raises
NameError
- FORBIDDEN NAME for table. Analytics table names cannot end with any value from "_entity", "_application", "_slv".
Returns
str
- message to send to the topic.
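As a rough illustration of the message format and the name check, the sketch below re-implements the documented behavior in standalone form. `build_message` and `FORBIDDEN_ENDNAMES` are stand-ins: the real list lives in the module's private `_FORBIDDEN_ENDNAMES`, and its contents are assumed here from the docstring. The table names are invented.

```python
import json

# Assumed from the docstring; the module keeps the real values in _FORBIDDEN_ENDNAMES.
FORBIDDEN_ENDNAMES = ["_entity", "_application", "_slv"]


def build_message(table_name: str, status: str) -> str:
    """Stand-in mirroring create_message as documented above."""
    if any(table_name.endswith(suffix) for suffix in FORBIDDEN_ENDNAMES):
        raise NameError(f"FORBIDDEN NAME for table: {table_name}")
    return '{"child_name": "' + table_name + '", "status": "' + status + '"}'


# A valid name yields a JSON object that a subscriber can parse.
message = build_message("sales_daily", "FINISHED")
payload = json.loads(message)
print(payload["child_name"], payload["status"])

# A name ending in a forbidden suffix is rejected before anything is sent.
try:
    build_message("customer_slv", "FAILED")
except NameError as exc:
    rejected = str(exc)
    print(rejected)
```

Because the message is assembled by string concatenation, a table name containing a double quote or a backslash would not produce valid JSON.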
def create_servicebus_client() -> azure.servicebus._servicebus_client.ServiceBusClient
def create_servicebus_client() -> ServiceBusClient:
    """Function used to create a service bus client.

    Returns:
        ServiceBusClient: client used to send messages.
    """
    servicebus_connection_str = get_connection_string_service_bus()
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    return servicebus_client
Function used to create a service bus client.
Returns
ServiceBusClient
- client used to send messages.
def get_connection_string_service_bus() -> str
def get_connection_string_service_bus() -> str:
    """Function used to recover information for service bus connection from the connections.json file and a keyvault.

    Returns:
        str: connection string for the servicebus.
    """
    conn_string = "Endpoint=sb://{account_name}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={account_key}"
    connection_config = get_connection_config_json()
    spark = get_spark_session()
    account_name = connection_config["servicebus"]["credentials"]["account_name"]
    account_key_secret = connection_config["servicebus"]["credentials"]["account_key"]
    scope = connection_config["servicebus"]["credentials"].get("scope")
    account_key = get_secret_from_keyvault(spark=spark, key=account_key_secret, scope=scope)
    conn_string = conn_string.format(account_name=account_name, account_key=account_key)
    return conn_string
Function used to recover information for service bus connection from the connections.json file and a keyvault.
Returns
str
- connection string for the servicebus.
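The lookup chain can be followed without Spark or a key vault. The following is a minimal sketch, assuming a `connections.json` fragment shaped the way the source code reads it. `fake_secret_lookup`, the account name, the secret name, the scope, and the key value are all hypothetical placeholders.

```python
# Connection string template, as in the source above.
TEMPLATE = (
    "Endpoint=sb://{account_name}.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey={account_key}"
)

# Hypothetical connections.json content: "account_key" holds the NAME of a
# secret, not the key itself; "scope" is optional.
connection_config = {
    "servicebus": {
        "credentials": {
            "account_name": "my-namespace",
            "account_key": "servicebus-key-secret",
            "scope": "my-scope",
        }
    }
}


def fake_secret_lookup(key: str, scope=None) -> str:
    """Hypothetical stand-in for get_secret_from_keyvault (no Spark needed)."""
    secrets = {("my-scope", "servicebus-key-secret"): "s3cr3t"}
    return secrets[(scope, key)]


credentials = connection_config["servicebus"]["credentials"]
account_key = fake_secret_lookup(
    key=credentials["account_key"], scope=credentials.get("scope")
)
conn_string = TEMPLATE.format(
    account_name=credentials["account_name"], account_key=account_key
)
print(conn_string)
```

Note that the template always uses the `RootManageSharedAccessKey` policy, so the secret stored in the key vault is expected to be the key for that policy.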
def send_message(table_name: str, status: Literal['FINISHED', 'FAILED'], topic_name: str = 'dependency-topic') -> None
def send_message(table_name: str, status: Literal["FINISHED", "FAILED"], topic_name: str = "dependency-topic") -> None:
    """Function used to send messages to the service bus. The service bus used can be found in connections.json file.

    Args:
        table_name (str): name of the table to log.
        status (Literal["FINISHED", "FAILED"]): status of the table.
        topic_name (str, optional): name of the topic where data will be sent. Defaults to "dependency-topic".
    """
    servicebus_client = create_servicebus_client()
    with servicebus_client:
        topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)
        with topic_sender:
            msg = create_message(table_name=table_name, status=status)
            message = ServiceBusMessage(msg)
            topic_sender.send_messages(message)
Function used to send messages to the service bus. The service bus used can be found in connections.json file.
Args
table_name
:str
- name of the table to log.
status
:Literal["FINISHED", "FAILED"]
- status of the table.
topic_name
:str, optional
- name of the topic where data will be sent. Defaults to "dependency-topic".
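One way a status reporter like this might be used is to wrap a job so that `FINISHED` is sent on success and `FAILED` on error. The sketch below shows one possible pattern and is not part of the module: `run_and_report` is a hypothetical helper, and `send` is any callable with the same signature as `send_message`. A recording stub is used so the example runs without Azure.

```python
from typing import Callable, List, Tuple

# Collected (table_name, status) pairs, in the order they were reported.
sent: List[Tuple[str, str]] = []


def recording_send(table_name: str, status: str, topic_name: str = "dependency-topic") -> None:
    """Stub with the same signature as send_message; records instead of sending."""
    sent.append((table_name, status))


def run_and_report(table_name: str, job: Callable[[], None], send: Callable[..., None]) -> None:
    """Hypothetical helper: run job, then report its outcome for table_name."""
    try:
        job()
    except Exception:
        # Report the failure, then let the original error propagate.
        send(table_name=table_name, status="FAILED")
        raise
    send(table_name=table_name, status="FINISHED")


def failing_job() -> None:
    raise ValueError("load failed")


run_and_report("sales_daily", lambda: None, send=recording_send)

try:
    run_and_report("orders_daily", failing_job, send=recording_send)
except ValueError:
    pass

print(sent)
```

In real use, `send_message` from `panama.logging.messages` would be passed as `send` in place of the stub.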