Module panama.utils.scala_utils
Functions
def get_last_run_timestamp(spark: pyspark.sql.session.SparkSession, run_table_name: str, launch_reference: str) ‑> str
Get the last run date.
This function takes a run table name and a launch reference and returns the last loaddate_ prior to the launch_reference.
Args
    spark (SparkSession): current spark session.
    run_table_name (str): The name of the run table (it must have loaddate_ and launch_reference columns).
    launch_reference (str): The launch reference used for filtering the data view and runs view.
Returns
    str: The last run timestamp.
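A minimal usage sketch, assuming an active SparkSession named spark; the run table name and launch reference below are hypothetical:

    from panama.utils.scala_utils import get_last_run_timestamp

    # Hypothetical run table and launch reference, for illustration only.
    last_run = get_last_run_timestamp(
        spark,
        run_table_name="runs_table",
        launch_reference="2023-01-31",
    )
    print(last_run)  # last loaddate_ prior to the launch reference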
def get_last_valid_type2_snapshot(spark: pyspark.sql.session.SparkSession, data_view: str, runs_view: str, launch_reference: str, scenario: str) ‑> pyspark.sql.dataframe.DataFrame
Get the last valid Type 2 snapshot DataFrame.
This function performs a SQL query using the Spark session to select the last valid Type 2 table data snapshot from the data_view table. The snapshot is filtered based on the launch_reference and the state of the corresponding run in the runs_view table. The result is returned as a DataFrame.
Args
    spark (SparkSession): current spark session.
    data_view (str): The name of the data temp view table.
    runs_view (str): The name of the runs temp view table.
    launch_reference (str): The launch reference used for filtering the data view and runs view.
Returns
    DataFrame: The last valid Type 2 snapshot DataFrame.
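A minimal usage sketch, assuming an active SparkSession named spark; the view names, launch reference and scenario value are hypothetical:

    from panama.utils.scala_utils import get_last_valid_type2_snapshot

    # Hypothetical temp views, launch reference and scenario, for illustration only.
    snapshot_sdf = get_last_valid_type2_snapshot(
        spark,
        data_view="data_temp_view",
        runs_view="runs_temp_view",
        launch_reference="2023-01-31",
        scenario="base",
    )
    snapshot_sdf.show()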
def handle_catalog(spark: pyspark.sql.session.SparkSession)
Function used to add the catalog name to the spark configuration. The environment is extracted from the cluster tag.
The following information is added to the spark session:
- catalog_name
Args
    spark (SparkSession): current spark session.
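A minimal usage sketch, assuming an active SparkSession named spark on a cluster carrying the expected environment tag:

    from panama.utils.scala_utils import handle_catalog

    handle_catalog(spark)
    # The exact configuration key is an assumption; the docs only state that
    # catalog_name is added to the spark session.
    print(spark.conf.get("catalog_name"))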
def handle_dataframe(spark: pyspark.sql.session.SparkSession, sdf: pyspark.sql.dataframe.DataFrame, view_name: str, select_cols: Optional[List[str]] = None, round: bool = False)
Function used to handle the passage of information about a dataframe to Scala. It saves a spark dataframe as a global temp view, selecting and rounding columns if required. A loaddate_ column with the current timestamp is added to the dataframe.
The following information is added to the spark configuration:
- view_name
Args
    spark (SparkSession): current spark session.
    sdf (DataFrame): dataframe to save.
    view_name (str): name of the view holding the dataframe data.
    select_cols (Optional[List[str]], optional): columns to keep. If not provided, all columns are kept. Defaults to None.
    round (bool, optional): whether to round double columns or not. Defaults to False.
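A minimal usage sketch, assuming an active SparkSession named spark; the dataframe and view name are hypothetical:

    from panama.utils.scala_utils import handle_dataframe

    # Hypothetical dataframe and view name, for illustration only.
    sdf = spark.createDataFrame([(1, 2.345), (2, 6.789)], ["id", "value"])
    handle_dataframe(
        spark,
        sdf=sdf,
        view_name="my_global_view",
        select_cols=["id", "value"],
        round=True,
    )
    # The dataframe is now registered as a global temp view with a loaddate_ column.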
def handle_dataframe_destination(spark: pyspark.sql.session.SparkSession, adls_name: str, container: str, schema: str, table_name: str, schema_suffix: Optional[Literal['_of', '_oi', '']] = None)
Function used to handle a dataframe destination. The following information will be added to the spark configuration:
- schema
- table_name
- schema_suffix
- destination_path
- path_prefix
Args
    spark (SparkSession): current spark session.
    adls_name (str): data lake storage name where data will be written.
    container (str): container where data will be written.
    schema (str): schema where data will be written.
    table_name (str): target table name.
    schema_suffix (Optional[Literal["_of", "_oi", ""]], optional): schema suffix. Defaults to None.
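A minimal usage sketch, assuming an active SparkSession named spark; the storage account, container, schema and table names are hypothetical:

    from panama.utils.scala_utils import handle_dataframe_destination

    # Hypothetical destination values, for illustration only.
    handle_dataframe_destination(
        spark,
        adls_name="mydatalake",
        container="curated",
        schema="sales",
        table_name="orders",
        schema_suffix="_of",
    )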
def handle_launch_reference(spark: pyspark.sql.session.SparkSession, launch_reference: Optional[str] = None)
Function used to add the launch reference, if provided, to the spark configuration.
Args
    spark (SparkSession): current spark session.
    launch_reference (Optional[str], optional): launch reference. Defaults to None.
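A minimal usage sketch, assuming an active SparkSession named spark; the launch reference is hypothetical:

    from panama.utils.scala_utils import handle_launch_reference

    handle_launch_reference(spark, launch_reference="2023-01-31")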
def handle_other_conf(spark: pyspark.sql.session.SparkSession, other_conf: Optional[Dict[str, Any]] = None)
Function used to handle any optional spark configuration, passed as a dictionary.
Args
    spark (SparkSession): current spark session.
    other_conf (Optional[Dict[str, Any]], optional): other spark configuration values as a dictionary. Defaults to None.
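A minimal usage sketch, assuming an active SparkSession named spark; the configuration keys and values are hypothetical:

    from panama.utils.scala_utils import handle_other_conf

    handle_other_conf(spark, other_conf={"custom_flag": "true", "batch_size": 1000})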
def handle_scala_run_id(spark: pyspark.sql.session.SparkSession, scala_run_id: Optional[str] = None)
Function used to add the scala_run_id, if provided, to the spark configuration.
Args
    spark (SparkSession): current spark session.
    scala_run_id (Optional[str], optional): scala run id, previously extracted. Defaults to None.
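A minimal usage sketch, assuming an active SparkSession named spark; the run id is hypothetical:

    from panama.utils.scala_utils import handle_scala_run_id

    handle_scala_run_id(spark, scala_run_id="run_20230131_001")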
def handover_write_to_scala(spark: pyspark.sql.session.SparkSession, sdf: Optional[pyspark.sql.dataframe.DataFrame] = None, view_name: Optional[str] = None, select_cols: Optional[List[str]] = None, round: bool = False, table_name: Optional[str] = None, adls_name: Optional[str] = None, container: Optional[str] = None, schema: Optional[str] = None, schema_suffix: Optional[Literal['_of', '_oi', '']] = None, launch_reference: Optional[str] = None, scala_run_id: Optional[str] = None, other_conf: Optional[Dict[str, Any]] = None) ‑> None
Master method used to handle any value that must be handed over to Scala. Check the required parameters for every step. Parameters are saved as spark configurations.
If a dataframe is passed, view_name is expected, as the dataframe will be saved as a global temporary view.
If the dataframe is to be saved, table_name is expected. If a table_name is passed, adls_name, container and schema MUST also be set. schema_suffix may be omitted, but it is handled together with those configurations.
launch_reference, scala_run_id and other_conf are optional and, if missing, will be ignored.
Args
    spark (SparkSession): current spark session.
    sdf (Optional[DataFrame], optional): dataframe to save as a temporary view. Defaults to None.
    view_name (Optional[str], optional): view name for the dataframe; unused if sdf is not provided. Defaults to None.
    select_cols (Optional[List[str]], optional): columns to keep from sdf; unused if sdf is not provided. Defaults to None.
    round (bool, optional): whether to round double columns in sdf; unused if sdf is not provided. Defaults to False.
    table_name (Optional[str], optional): name of the table where data in sdf will be saved. Defaults to None.
    adls_name (Optional[str], optional): name of the data lake holding the container, schema and table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
    container (Optional[str], optional): name of the container holding the schema and table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
    schema (Optional[str], optional): schema holding the table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
    schema_suffix (Optional[Literal["_of", "_oi", ""]], optional): schema suffix; unused if table_name is not provided. Defaults to None.
    launch_reference (Optional[str], optional): current launch reference. Defaults to None.
    scala_run_id (Optional[str], optional): run id for the Scala write. Defaults to None.
    other_conf (Optional[Dict[str, Any]], optional): optional configurations. Defaults to None.
Raises
    ValueError: if sdf is passed without a view_name.
    ValueError: if table_name is provided without adls_name, container and schema.
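A minimal end-to-end sketch combining the steps above, assuming an active SparkSession named spark; the dataframe, view name and destination values are all hypothetical:

    from panama.utils.scala_utils import handover_write_to_scala

    # Hypothetical dataframe and destination values, for illustration only.
    sdf = spark.createDataFrame([(1, 2.345), (2, 6.789)], ["id", "value"])
    handover_write_to_scala(
        spark,
        sdf=sdf,
        view_name="my_global_view",
        select_cols=["id", "value"],
        round=True,
        table_name="orders",
        adls_name="mydatalake",
        container="curated",
        schema="sales",
        schema_suffix="_of",
        launch_reference="2023-01-31",
        scala_run_id="run_20230131_001",
        other_conf={"custom_flag": "true"},
    )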