Module panama.utils.scala_utils

Functions

def get_last_run_timestamp(spark: pyspark.sql.session.SparkSession, run_table_name: str, launch_reference: str) ‑> str

Get the last run timestamp.

This function takes a run table name and a launch reference and returns the last loaddate_ prior to the launch_reference.

Args

spark : SparkSession
current spark session.
run_table_name : str
The name of the run table (it must have loaddate_ and launch_reference columns).
launch_reference : str
The launch reference used for filtering the run table.

Returns

str
The last run timestamp.
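
Example (a minimal sketch; the table name and dates below are illustrative and assume a run table exposing loaddate_ and launch_reference columns):

    from pyspark.sql import SparkSession
    from panama.utils.scala_utils import get_last_run_timestamp

    spark = SparkSession.builder.getOrCreate()

    # "ops.scala_runs" is an illustrative table name, not part of the library.
    last_loaddate = get_last_run_timestamp(
        spark,
        run_table_name="ops.scala_runs",
        launch_reference="2024-01-31",
    )
    print(last_loaddate)  # the loaddate_ of the last run before 2024-01-31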
def get_last_valid_type2_snapshot(spark: pyspark.sql.session.SparkSession, data_view: str, runs_view: str, launch_reference: str, scenario: str) ‑> pyspark.sql.dataframe.DataFrame

Get the last valid Type 2 snapshot DataFrame.

This function runs a SQL query through the Spark session to select the last valid Type 2 snapshot from the data_view table. The snapshot is filtered based on the launch_reference and on the state of the corresponding run in the runs_view table. The result is returned as a DataFrame.

Args

spark : SparkSession
current spark session.
data_view : str
The name of the data temp view table.
runs_view : str
The name of the runs temp view table.
launch_reference : str
The launch reference used for filtering the data view and runs view.

Returns

DataFrame
The last valid Type 2 snapshot DataFrame.
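
Example (a sketch assuming the two temp views are already registered in the session; view names, dates and the scenario value are illustrative):

    from panama.utils.scala_utils import get_last_valid_type2_snapshot

    snapshot_sdf = get_last_valid_type2_snapshot(
        spark,
        data_view="contracts_data_view",   # illustrative temp view name
        runs_view="contracts_runs_view",   # illustrative temp view name
        launch_reference="2024-01-31",
        scenario="BASE",                   # illustrative scenario value
    )
    snapshot_sdf.show(5)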
def handle_catalog(spark: pyspark.sql.session.SparkSession)

Function used to add the catalog name to the spark configuration. The environment is extracted from the cluster tag.

The following information is added to the spark configuration:

  • catalog_name

Args

spark : SparkSession
current spark session.
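
Example (a sketch; it assumes the cluster tag is available so the environment can be resolved, and that the value is stored under the key listed above):

    from panama.utils.scala_utils import handle_catalog

    handle_catalog(spark)
    # Assumption: the catalog name is readable back under the "catalog_name" key.
    print(spark.conf.get("catalog_name"))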
def handle_dataframe(spark: pyspark.sql.session.SparkSession, sdf: pyspark.sql.dataframe.DataFrame, view_name: str, select_cols: Optional[List[str]] = None, round: bool = False)

Function used to hand dataframe information over to Scala. It saves a spark dataframe as a global temp view, selecting and rounding columns if required. A loaddate_ column holding the current timestamp is added to the dataframe.

The following information is added to the spark configuration:

  • view_name

Args

spark : SparkSession
current spark session.
sdf : DataFrame
dataframe to save.
view_name : str
name of the view holding dataframe data.
select_cols : Optional[List[str]]
columns to keep. If not provided, all columns are kept. Defaults to None.
round : bool, optional
whether to round double columns or not. Defaults to False.
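
Example (a sketch with an illustrative dataframe and view name):

    from panama.utils.scala_utils import handle_dataframe

    sdf = spark.createDataFrame([(1, 2.345), (2, 6.789)], ["id", "value"])

    # Register the dataframe as a global temp view for the Scala side,
    # keeping both columns and rounding the double column.
    handle_dataframe(
        spark,
        sdf=sdf,
        view_name="my_output_view",   # illustrative view name
        select_cols=["id", "value"],
        round=True,
    )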
def handle_dataframe_destination(spark: pyspark.sql.session.SparkSession, adls_name: str, container: str, schema: str, table_name: str, schema_suffix: Optional[Literal['_of', '_oi', '']] = None)

Function used to handle a dataframe destination. The following information will be added to the spark configuration:

  • schema
  • table_name
  • schema_suffix
  • destination_path
  • path_prefix

Args

spark : SparkSession
current spark session.
adls_name : str
data lake storage name where data will be written.
container : str
container where data will be written.
schema : str
schema where data will be written.
table_name : str
target table name.

schema_suffix : Optional[Literal['_of', '_oi', '']], optional
schema suffix. Defaults to None.
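
Example (a sketch; the storage account, container, schema and table names are illustrative):

    from panama.utils.scala_utils import handle_dataframe_destination

    handle_dataframe_destination(
        spark,
        adls_name="mydatalake",        # illustrative storage account name
        container="curated",           # illustrative container
        schema="finance",              # illustrative schema
        table_name="contracts_type2",  # illustrative table name
        schema_suffix="_of",           # one of the allowed literals
    )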

def handle_launch_reference(spark: pyspark.sql.session.SparkSession, launch_reference: Optional[str] = None)

Function used to add the launch reference, if provided, to the spark configuration.

Args

spark : SparkSession
current spark session.
launch_reference : Optional[str], optional
launch reference. Defaults to None.
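
Example (a sketch with an illustrative launch reference; if None is passed, nothing is added):

    from panama.utils.scala_utils import handle_launch_reference

    handle_launch_reference(spark, launch_reference="2024-01-31")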
def handle_other_conf(spark: pyspark.sql.session.SparkSession, other_conf: Optional[Dict[str, Any]] = None)

Function used to handle any optional spark configuration, passed as a dictionary.

Args

spark : SparkSession
current spark session.
other_conf : Optional[Dict[str, Any]], optional
other spark configuration values as a dictionary. Defaults to None.
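
Example (a sketch; the keys and values are illustrative extra settings passed through to the spark configuration):

    from panama.utils.scala_utils import handle_other_conf

    handle_other_conf(spark, other_conf={"custom_flag": "true", "batch_size": "500"})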
def handle_scala_run_id(spark: pyspark.sql.session.SparkSession, scala_run_id: Optional[str] = None)

Function used to add the scala_run_id, if provided, to the spark configuration.

Args

spark : SparkSession
current spark session.
scala_run_id : Optional[str], optional
scala run id, previously extracted. Defaults to None.
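
Example (a sketch with an illustrative run id, e.g. one extracted earlier in the pipeline):

    from panama.utils.scala_utils import handle_scala_run_id

    handle_scala_run_id(spark, scala_run_id="run-20240131-001")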
def handover_write_to_scala(spark: pyspark.sql.session.SparkSession, sdf: Optional[pyspark.sql.dataframe.DataFrame] = None, view_name: Optional[str] = None, select_cols: Optional[List[str]] = None, round: bool = False, table_name: Optional[str] = None, adls_name: Optional[str] = None, container: Optional[str] = None, schema: Optional[str] = None, schema_suffix: Optional[Literal['_of', '_oi', '']] = None, launch_reference: Optional[str] = None, scala_run_id: Optional[str] = None, other_conf: Optional[Dict[str, Any]] = None) ‑> None

Master method used to handle any value that should be handed over to Scala. Check the required parameters for every step. Parameters will be saved as spark configurations.

If a dataframe is passed, the view_name is expected, as the dataframe will be saved as a global temporary view.

If the dataframe is to be saved, table_name is expected. If a table_name is passed, it is MANDATORY to also set adls_name, container and schema. schema_suffix may be omitted, but it is handled together with those configurations.

launch_reference, scala_run_id and other_conf are optional, and if missing will be ignored.

Args

spark : SparkSession
current spark session.
sdf : Optional[DataFrame], optional
dataframe to save as a temporary view. Defaults to None.
view_name : Optional[str], optional
view name for the dataframe; unused if sdf is not provided. Defaults to None.
select_cols : Optional[List[str]], optional
columns to keep from sdf; unused if sdf is not provided. Defaults to None.
round : bool, optional
whether to round double columns in sdf; unused if sdf is not provided. Defaults to False.
table_name : Optional[str], optional
name of the table where data in sdf will be saved. Defaults to None.
adls_name : Optional[str], optional
name of the datalake holding the container, schema and table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
container : Optional[str], optional
name of the container holding schema and table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
schema : Optional[str], optional
schema holding the table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
schema_suffix : Optional[Literal['_of', '_oi', '']], optional
schema suffix; unused if table_name is not provided. Defaults to None.
launch_reference : Optional[str], optional
current launch reference. Defaults to None.
scala_run_id : Optional[str], optional
run id for the scala writing. Defaults to None.
other_conf : Optional[Dict[str, Any]], optional
optional configurations. Defaults to None.

Raises

ValueError
if sdf is passed but view_name is not provided.
ValueError
if table_name is provided but adls_name, container or schema is missing.
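
Example (a sketch that exercises the full handover; the view and table names, storage coordinates, dates and ids are illustrative):

    from panama.utils.scala_utils import handover_write_to_scala

    handover_write_to_scala(
        spark,
        sdf=sdf,
        view_name="my_output_view",       # required because sdf is passed
        select_cols=["id", "value"],
        round=True,
        table_name="contracts_type2",     # triggers destination handling
        adls_name="mydatalake",           # mandatory together with table_name
        container="curated",              # mandatory together with table_name
        schema="finance",                 # mandatory together with table_name
        schema_suffix="_of",
        launch_reference="2024-01-31",
        scala_run_id="run-20240131-001",
        other_conf={"custom_flag": "true"},
    )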