Module panama.utils.scala_utils

Functions

def get_last_run_timestamp(spark: pyspark.sql.session.SparkSession,
run_table_name: str,
launch_reference: str) ‑> str
def get_last_run_timestamp(spark: SparkSession, run_table_name: str, launch_reference: str) -> str:
    """
    Get the last run timestamp

    This function takes a run table name and a launch reference and returns the last loaddate_ prior to the launch_reference.

    Args:
        spark (SparkSession): current spark session.
        run_table_name (str): The name of the run table (it must have loaddate_ and launch_reference columns).
        launch_reference (str): The launch reference used to filter the run table.

    Returns:
        str: the last run timestamp.
    """
    run_table = spark.table(run_table_name)
    run_date = get_col_as_list(
        run_table.filter(F.to_date("launch_reference") < launch_reference).select(
            F.max("loaddate_").alias("loaddate_")
        ),
        colname="loaddate_",
    )[0]
    return run_date

Get the last run timestamp

This function takes a run table name and a launch reference and returns the last loaddate_ prior to the launch_reference.

Args

spark : SparkSession
current spark session.
run_table_name : str
The name of the run table (it must have loaddate_ and launch_reference columns).
launch_reference : str
The launch reference used to filter the run table.

Returns

str
the last run timestamp.
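A minimal usage sketch; the table name and launch reference below are hypothetical, and the run table must expose loaddate_ and launch_reference columns:

from pyspark.sql import SparkSession

from panama.utils.scala_utils import get_last_run_timestamp

spark = SparkSession.builder.getOrCreate()

# Hypothetical run table; it must contain loaddate_ and launch_reference columns.
last_ts = get_last_run_timestamp(
    spark=spark,
    run_table_name="analytics.runs_log",
    launch_reference="2024-01-31",
)
print(last_ts)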
def get_last_valid_type2_snapshot(spark: pyspark.sql.session.SparkSession,
data_view: str,
runs_view: str,
launch_reference: str,
scenario: str) ‑> pyspark.sql.dataframe.DataFrame
def get_last_valid_type2_snapshot(
    spark: SparkSession, data_view: str, runs_view: str, launch_reference: str, scenario: str
) -> DataFrame:
    """
    Get the last valid Type 2 snapshot DataFrame.

    This function performs a SQL query using the Spark session to select the last valid Type 2 table data snapshot
    from the data_view table. The snapshot is filtered based on the launch_reference and the state of the
    corresponding run in the runs_view table. The result is returned as a DataFrame.

    Args:
        spark (SparkSession): current spark session.
        data_view (str): The name of the data temp view table.
        runs_view (str): The name of the runs temp view table.
        launch_reference (str): The launch reference used for filtering the data view and runs view.
        scenario (str): The scenario used to filter the data view.

    Returns:
        DataFrame: The last valid Type 2 snapshot DataFrame.
    """
    query = f"""select *
        from {data_view}
        where (
            select max(run_id)
            from {runs_view}
            where launch_reference <= '{launch_reference}'
            and state == 'FINISHED'
        ) between start_run and coalesce(end_run, 1e15)
        and scenario='{scenario}'
    """

    return spark.sql(query)

Get the last valid Type 2 snapshot DataFrame.

This function performs a SQL query using the Spark session to select the last valid Type 2 table data snapshot from the data_view table. The snapshot is filtered based on the launch_reference and the state of the corresponding run in the runs_view table. The result is returned as a DataFrame.

Args

spark : SparkSession
current spark session.
data_view : str
The name of the data temp view table.
runs_view : str
The name of the runs temp view table.
launch_reference : str
The launch reference used for filtering the data view and runs view.
scenario : str
The scenario used to filter the data view.

Returns

DataFrame
The last valid Type 2 snapshot DataFrame.
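A usage sketch with hypothetical view names and scenario value; both temp views must already be registered in the current Spark session:

from pyspark.sql import SparkSession

from panama.utils.scala_utils import get_last_valid_type2_snapshot

spark = SparkSession.builder.getOrCreate()

# Hypothetical view names and scenario value.
snapshot_sdf = get_last_valid_type2_snapshot(
    spark=spark,
    data_view="type2_data_view",
    runs_view="runs_view",
    launch_reference="2024-01-31",
    scenario="baseline",
)
snapshot_sdf.show()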
def handle_catalog(spark: pyspark.sql.session.SparkSession)
def handle_catalog(spark: SparkSession):
    """Function used to add the catalog name to the spark configurations. The environment will be extracted from the cluster tag.

    The following information are added to the spark session:

    * catalog_name

    Args:
        spark (SparkSession): current spark session.
    """
    catalog = f"{get_env_from_cluster_tag(spark)}_analytics"
    spark.conf.set("catalog_name", catalog)

Function used to add the catalog name to the spark configuration. The environment is extracted from the cluster tag.

The following information is added to the spark session:

  • catalog_name

Args

spark : SparkSession
current spark session.
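A minimal sketch, assuming the code runs on a cluster whose environment tag can be resolved by get_env_from_cluster_tag:

from pyspark.sql import SparkSession

from panama.utils.scala_utils import handle_catalog

spark = SparkSession.builder.getOrCreate()

# Derives the environment from the cluster tag and exposes "<env>_analytics"
# to the scala side through the session configuration.
handle_catalog(spark)
print(spark.conf.get("catalog_name"))  # e.g. "dev_analytics"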
def handle_dataframe(spark: pyspark.sql.session.SparkSession,
sdf: pyspark.sql.dataframe.DataFrame,
view_name: str,
select_cols: List[str] | None = None,
round: bool = False)
def handle_dataframe(
    spark: SparkSession, sdf: DataFrame, view_name: str, select_cols: Optional[List[str]] = None, round: bool = False
):
    """Function used to handle the passage of information regarding a dataframe to scala.
    It saves a spark dataframe as a global temp view, selecting and rounding columns if required.
    A loaddate_ column is added to the dataframe with the current timestamp.

    The following information are added to the spark configuration:

    * view_name

    Args:
        spark (SparkSession): current spark session.
        sdf (DataFrame): dataframe to save.
        view_name (str): name of the view holding dataframe data.
        select_cols (Optional[List[str]]): columns to keep. If not provided, all columns are kept. Defaults to None.
        round (bool, optional): whether to round double columns or not. Defaults to False.
    """
    if select_cols is not None:
        sdf = sdf.select(*select_cols)
    if round is True:
        round_cols = [F.round(c[0], 6).alias(c[0]) if c[1] == "double" else F.col(c[0]) for c in sdf.dtypes]
        sdf = sdf.select(*round_cols)

    sdf = sdf.withColumn("loaddate_", F.lit(datetime.now(tz=timezone("Europe/Rome"))))

    sdf.createOrReplaceGlobalTempView(view_name)
    spark.conf.set("view_name", view_name)

Function used to handle passing a dataframe's information to scala. It saves a spark dataframe as a global temp view, selecting and rounding columns if required. A loaddate_ column is added to the dataframe with the current timestamp.

The following information is added to the spark configuration:

  • view_name

Args

spark : SparkSession
current spark session.
sdf : DataFrame
dataframe to save.
view_name : str
name of the view holding dataframe data.
select_cols : Optional[List[str]]
columns to keep. If not provided, all columns are kept. Defaults to None.
round : bool, optional
whether to round double columns or not. Defaults to False.
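A usage sketch with a hypothetical dataframe and view name; because round=True, double columns are rounded to 6 decimals before the view is created:

from pyspark.sql import SparkSession

from panama.utils.scala_utils import handle_dataframe

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame([(1, 0.123456789), (2, 9.87654321)], ["id", "value"])

handle_dataframe(
    spark=spark,
    sdf=sdf,
    view_name="panama_handover_view",
    select_cols=["id", "value"],
    round=True,
)

# Scala reads the view name from the configuration and the data from the global temp view.
print(spark.conf.get("view_name"))
spark.table("global_temp.panama_handover_view").show()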
def handle_dataframe_destination(spark: pyspark.sql.session.SparkSession,
adls_name: str,
container: str,
schema: str,
table_name: str,
schema_suffix: Literal['_of', '_oi', ''] | None = None)
def handle_dataframe_destination(
    spark: SparkSession,
    adls_name: str,
    container: str,
    schema: str,
    table_name: str,
    schema_suffix: Optional[Literal["_of", "_oi", ""]] = None,
):
    """Function used to handle a dataframe destination. The following information will be added to the spark configuration:

    * schema
    * table_name
    * schema_suffix
    * destination_path
    * path_prefix

    Args:
        spark (SparkSession): current spark session.
        adls_name (str): data lake storage name where data will be written.
        container (str): container where data will be written.
        schema (str): schema where data will be written.
        table_name (str): target table name.
        schema_suffix (Optional[Literal["_of", "_oi", ""]], optional): schema suffix. Defaults to None.
    """
    if schema_suffix is None:
        warnings.warn("Missing schema_suffix, an empty string will be placed. Please check if that is correct")
        schema_suffix = ""

    destination_path = IOAdls(spark).generate_connection_string(
        adls_name=adls_name, container=container, path=f"{schema}/{table_name}"
    )
    path_prefix = IOAdls(spark).generate_connection_string(adls_name=adls_name, container=container, path=f"{schema}")

    spark.conf.set("schema", schema)
    spark.conf.set("table_name", table_name)
    spark.conf.set("schema_suffix", schema_suffix)
    spark.conf.set("destination_path", destination_path)
    spark.conf.set("path_prefix", path_prefix)

Function used to handle a dataframe destination. The following information will be added to the spark configuration:

  • schema
  • table_name
  • schema_suffix
  • destination_path
  • path_prefix

Args

spark : SparkSession
current spark session.
adls_name : str
data lake storage name where data will be written.
container : str
container where data will be written.
schema : str
schema where data will be written.
table_name : str
target table name.
schema_suffix : Optional[Literal["_of", "_oi", ""]], optional
schema suffix. Defaults to None.

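A usage sketch with hypothetical storage names; IOAdls must be able to resolve the given data lake from the current environment:

from pyspark.sql import SparkSession

from panama.utils.scala_utils import handle_dataframe_destination

spark = SparkSession.builder.getOrCreate()

# All names below are hypothetical placeholders.
handle_dataframe_destination(
    spark=spark,
    adls_name="myadls",
    container="curated",
    schema="sales",
    table_name="orders_type2",
    schema_suffix="_of",
)

# The scala side reads these keys back from the session configuration.
for key in ("schema", "table_name", "schema_suffix", "destination_path", "path_prefix"):
    print(key, "=", spark.conf.get(key))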
def handle_launch_reference(spark: pyspark.sql.session.SparkSession, launch_reference: str | None = None)
def handle_launch_reference(spark: SparkSession, launch_reference: Optional[str] = None):
    """Function used to add the launch reference, if provided, to the spark configuration.

    Args:
        spark (SparkSession): current spark session.
        launch_reference (Optional[str], optional): launch reference. Defaults to None.
    """
    if launch_reference is not None:
        spark.conf.set("launch_reference", launch_reference)

Function used to add the launch reference, if provided, to the spark configuration.

Args

spark : SparkSession
current spark session.
launch_reference : Optional[str], optional
launch reference. Defaults to None.
def handle_other_conf(spark: pyspark.sql.session.SparkSession,
other_conf: Dict[str, Any] | None = None)
def handle_other_conf(spark: SparkSession, other_conf: Optional[Dict[str, Any]] = None):
    """Function used to handle any optional spark configuration, passed as a dictionary.

    Args:
        spark (SparkSession): current spark session
        other_conf (Optional[Dict[str, Any]], optional): other spark configuration values as a dictionary. Defaults to None.
    """
    if other_conf is not None:
        for k, v in other_conf.items():
            spark.conf.set(k, v)

Function used to handle any optional spark configuration, passed as a dictionary.

Args

spark : SparkSession
current spark session
other_conf : Optional[Dict[str, Any]], optional
other spark configuration values as a dictionary. Defaults to None.
def handle_scala_run_id(spark: pyspark.sql.session.SparkSession, scala_run_id: str | None = None)
def handle_scala_run_id(spark: SparkSession, scala_run_id: Optional[str] = None):
    """Function used to add the scala_run_id, if provided, to the spark configuration.

    Args:
        spark (SparkSession): current spark session
        scala_run_id (Optional[str], optional): scala run id, previously extracted. Defaults to None.
    """
    if scala_run_id is not None:
        spark.conf.set("scala_run_id", scala_run_id)

Function used to add the scala_run_id, if provided, to the spark configuration.

Args

spark : SparkSession
current spark session
scala_run_id : Optional[str], optional
scala run id, previously extracted. Defaults to None.
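handle_launch_reference, handle_other_conf and handle_scala_run_id are simple setters that do nothing when their optional argument is None, so they can be called unconditionally. A short sketch with hypothetical values:

from pyspark.sql import SparkSession

from panama.utils.scala_utils import (
    handle_launch_reference,
    handle_other_conf,
    handle_scala_run_id,
)

spark = SparkSession.builder.getOrCreate()

handle_launch_reference(spark, launch_reference="2024-01-31")
handle_scala_run_id(spark, scala_run_id="run-0042")
handle_other_conf(spark, other_conf={"write_mode": "overwrite"})  # hypothetical key

print(spark.conf.get("launch_reference"))  # "2024-01-31"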
def handover_write_to_scala(spark: pyspark.sql.session.SparkSession,
sdf: pyspark.sql.dataframe.DataFrame | None = None,
view_name: str | None = None,
select_cols: List[str] | None = None,
round: bool = False,
table_name: str | None = None,
adls_name: str | None = None,
container: str | None = None,
schema: str | None = None,
schema_suffix: Literal['_of', '_oi', ''] | None = None,
launch_reference: str | None = None,
scala_run_id: str | None = None,
other_conf: Dict[str, Any] | None = None) ‑> None
def handover_write_to_scala(
    spark: SparkSession,
    sdf: Optional[DataFrame] = None,
    view_name: Optional[str] = None,
    select_cols: Optional[List[str]] = None,
    round: bool = False,
    table_name: Optional[str] = None,
    adls_name: Optional[str] = None,
    container: Optional[str] = None,
    schema: Optional[str] = None,
    schema_suffix: Optional[Literal["_of", "_oi", ""]] = None,
    launch_reference: Optional[str] = None,
    scala_run_id: Optional[str] = None,
    other_conf: Optional[Dict[str, Any]] = None,
) -> None:
    """Master method used to handle any value that should be handled in scala. Check the required parameters for every step.
    Parameters will be saved as spark configurations.

    If a dataframe is passed, the view_name is expected, as the dataframe will be saved as a global temporary view.

    If the dataframe is to be saved, table_name is expected.
    If a table_name is passed, it is MANDATORY to also set adls_name, container and schema. schema_suffix may be omitted, in which case it
    defaults to an empty string and a warning is raised.

    launch_reference, scala_run_id and other_conf are optional, and if missing will be ignored.

    Args:
        spark (SparkSession): current spark session.
        sdf (Optional[DataFrame], optional): dataframe to save as a temporary view. Defaults to None.
        view_name (Optional[str], optional): view name for the dataframe; unused if sdf is not provided. Defaults to None.
        select_cols (Optional[List[str]], optional): columns to keep from sdf; unused if sdf is not provided. Defaults to None.
        round (bool, optional): whether to round double columns in sdf; unused if sdf is not provided. Defaults to False.
        table_name (Optional[str], optional): name of the table where data in sdf will be saved. Defaults to None.
        adls_name (Optional[str], optional): name of the datalake holding the container, schema and table where data in sdf will be written;
                                             unused if table_name is not provided. Defaults to None.
        container (Optional[str], optional): name of the container holding schema and table where data in sdf will be written;
                                             unused if table_name is not provided. Defaults to None.
        schema (Optional[str], optional): schema holding the table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
        schema_suffix (Optional[Literal["_of", "_oi", ""]], optional): schema suffix; unused if table_name is not provided. Defaults to None.
        launch_reference (Optional[str], optional): current launch reference. Defaults to None.
        scala_run_id (Optional[str], optional): run id for the scala writing. Defaults to None.
        other_conf (Optional[Dict[str, Any]], optional): optional configurations. Defaults to None.

    Raises:
        ValueError: if an sdf is passed, a view_name must be provided.
        ValueError: if table_name is provided, also adls_name, container and schema must be provided.
    """

    if sdf is not None:
        if view_name is None:
            raise ValueError("if an sdf is passed, also a view_name must be provided, None was found.")
        handle_dataframe(spark=spark, sdf=sdf, view_name=view_name, select_cols=select_cols, round=round)

    if table_name is None:
        warnings.warn("No table_name have been passed: no paths will be set in spark configuration.")
    elif (adls_name is None) or (container is None) or (schema is None):
        raise ValueError(
            "Missing adls_name, container or schema: these are mandatory to handle the writing with scala."
        )
    else:
        handle_dataframe_destination(
            spark=spark,
            adls_name=adls_name,
            container=container,
            schema_suffix=schema_suffix,
            schema=schema,
            table_name=table_name,
        )

    handle_catalog(spark=spark)
    handle_launch_reference(spark=spark, launch_reference=launch_reference)
    handle_scala_run_id(spark=spark, scala_run_id=scala_run_id)
    handle_other_conf(spark=spark, other_conf=other_conf)

Master method used to handle every value that should be passed to scala. Check the required parameters for every step. Parameters will be saved as spark configurations.

If a dataframe is passed, the view_name is expected, as the dataframe will be saved as a global temporary view.

If the dataframe is to be saved, table_name is expected. If a table_name is passed, it is MANDATORY to also set adls_name, container and schema. schema_suffix may be omitted, in which case it defaults to an empty string and a warning is raised.

launch_reference, scala_run_id and other_conf are optional, and if missing will be ignored.

Args

spark : SparkSession
current spark session.
sdf : Optional[DataFrame], optional
dataframe to save as a temporary view. Defaults to None.
view_name : Optional[str], optional
view name for the dataframe; unused if sdf is not provided. Defaults to None.
select_cols : Optional[List[str]], optional
columns to keep from sdf; unused if sdf is not provided. Defaults to None.
round : bool, optional
whether to round double columns in sdf; unused if sdf is not provided. Defaults to False.
table_name : Optional[str], optional
name of the table where data in sdf will be saved. Defaults to None.
adls_name : Optional[str], optional
name of the datalake holding the container, schema and table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
container : Optional[str], optional
name of the container holding schema and table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
schema : Optional[str], optional
schema holding the table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
schema_suffix (Optional[Literal["_of", "_oi", ""]], optional): schema suffix; unused if table_name is not provided. Defaults to None.
launch_reference : Optional[str], optional
current launch reference. Defaults to None.
scala_run_id : Optional[str], optional
run id for the scala writing. Defaults to None.
other_conf : Optional[Dict[str, Any]], optional
optional configurations. Defaults to None.

Raises

ValueError
if an sdf is passed, a view_name must be provided.
ValueError
if table_name is provided, also adls_name, container and schema must be provided.
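An end-to-end usage sketch of the master method, with hypothetical data and destination names; the cluster tag and the IOAdls resolution must be available in the environment, since handle_catalog and handle_dataframe_destination are invoked internally:

from pyspark.sql import SparkSession

from panama.utils.scala_utils import handover_write_to_scala

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# view_name is required because sdf is passed; adls_name, container and schema
# are required because table_name is passed. All names are hypothetical.
handover_write_to_scala(
    spark=spark,
    sdf=sdf,
    view_name="panama_handover_view",
    table_name="orders_type2",
    adls_name="myadls",
    container="curated",
    schema="sales",
    schema_suffix="_of",
    launch_reference="2024-01-31",
    scala_run_id="run-0042",
    other_conf={"write_mode": "overwrite"},  # hypothetical key
)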