Module panama.logging.messages

Functions

def create_message(table_name: str, status: Literal['FINISHED', 'FAILED']) ‑> str
Expand source code
def create_message(table_name: str, status: Literal["FINISHED", "FAILED"]) -> str:
    """Create the JSON message describing the status of a table.

    Args:
        table_name (str): name of the table to log.
        status (Literal["FINISHED", "FAILED"]): status of the table.

    Raises:
        NameError: FORBIDDEN NAME for table. Raised when ``table_name`` ends with
            any of the suffixes listed in ``_FORBIDDEN_ENDNAMES``.

    Returns:
        str: message to send to the topic, a JSON object with the keys
            "child_name" and "status".
    """
    # Imported locally so the block stays self-contained.
    import json

    if any(table_name.endswith(suffix) for suffix in _FORBIDDEN_ENDNAMES):
        raise NameError(
            f"FORBIDDEN NAME for table. Analytics table names cannot end with any value from {_FORBIDDEN_ENDNAMES}."
        )

    # json.dumps escapes quotes, backslashes and control characters, so the
    # payload is always valid JSON. With the default separators and
    # ensure_ascii=False the output for ordinary names is identical to the
    # previous hand-concatenated string.
    return json.dumps({"child_name": table_name, "status": status}, ensure_ascii=False)

Create the message from table and status.

Args

table_name : str
name of the table to log.

status : Literal["FINISHED", "FAILED"]
status of the table.

Raises

NameError
FORBIDDEN NAME for table. Analytics table names cannot end with any value from "_entity", "_application", "_slv".

Returns

str
message to send to the topic.
def create_servicebus_client() ‑> azure.servicebus._servicebus_client.ServiceBusClient
Expand source code
def create_servicebus_client() -> ServiceBusClient:
    """Build a service bus client from the configured connection string.

    Returns:
        ServiceBusClient: client used to send messages.
    """
    # The connection string is resolved by get_connection_string_service_bus().
    return ServiceBusClient.from_connection_string(conn_str=get_connection_string_service_bus())

Function used to create a service bus client.

Returns

ServiceBusClient
client used to send messages.
def get_connection_string_service_bus() ‑> str
Expand source code
def get_connection_string_service_bus() -> str:
    """Assemble the service bus connection string from the connections.json file and a keyvault secret.

    Returns:
        str: connection string for the servicebus.
    """
    template = "Endpoint=sb://{account_name}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={account_key}"

    config = get_connection_config_json()
    session = get_spark_session()

    # All servicebus settings live under the same "credentials" entry.
    credentials = config["servicebus"]["credentials"]
    name = credentials["account_name"]
    # The configuration holds the secret's key, not the secret itself; the
    # actual value is fetched through get_secret_from_keyvault.
    secret_key = credentials["account_key"]
    secret_scope = credentials.get("scope")

    return template.format(
        account_name=name,
        account_key=get_secret_from_keyvault(spark=session, key=secret_key, scope=secret_scope),
    )

Function used to recover information for service bus connection from the connections.json file and a keyvault.

Returns

str
connection string for the servicebus.
def send_message(table_name: str,
status: Literal['FINISHED', 'FAILED'],
topic_name: str = 'dependency-topic') ‑> None
Expand source code
def send_message(table_name: str, status: Literal["FINISHED", "FAILED"], topic_name: str = "dependency-topic") -> None:
    """Function used to send messages to the service bus. The service bus used can be found in connections.json file.

    Args:
        table_name (str): name of the table to log.
        status (Literal["FINISHED", "FAILED"]): status of the table.
        topic_name (str, optional): name of the topic where data will be sent. Defaults to "dependency-topic".

    Raises:
        NameError: propagated from create_message when the table name is forbidden.
    """
    # Build (and thereby validate) the payload first, so a forbidden table
    # name fails before any client is created or sender is opened.
    payload = create_message(table_name=table_name, status=status)

    servicebus_client = create_servicebus_client()
    with servicebus_client:
        topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)
        with topic_sender:
            topic_sender.send_messages(ServiceBusMessage(payload))

Function used to send messages to the service bus. The service bus used can be found in connections.json file.

Args

table_name : str
name of the table to log.
status : Literal["FINISHED", "FAILED"]
status of the table.
topic_name : str, optional
name of the topic where data will be sent. Defaults to "dependency-topic".