Module panama.utils.scala_utils
Functions
def get_last_run_timestamp(spark: pyspark.sql.session.SparkSession,
                           run_table_name: str,
                           launch_reference: str) -> str
def get_last_run_timestamp(spark: SparkSession, run_table_name: str, launch_reference: str) -> str:
    """
    Get the last run date

    This function take a run table name and a launch reference and return the last loaddate_
    previous to the launch_reference.

    Args:
        run_table_name (str): The name of the run table (it must have loaddate_ and launch_reference columns).
        launch_reference (str): The launch reference used for filtering the data view and runs view.

    Returns:
        str: the last run timestamp.
    """
    run_table = spark.table(run_table_name)
    run_date = get_col_as_list(
        run_table.filter(F.to_date("launch_reference") < launch_reference).select(
            F.max("loaddate_").alias("loaddate_")
        ),
        colname="loaddate_",
    )[0]
    return run_date
Get the last run date.

This function takes a run table name and a launch reference and returns the last loaddate_ prior to the launch_reference.

Args
    run_table_name (str): The name of the run table (it must have loaddate_ and launch_reference columns).
    launch_reference (str): The launch reference used for filtering the data view and runs view.

Returns
    str: The last run timestamp.
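A minimal usage sketch; the table name and launch reference below are hypothetical, and the run table is assumed to expose loaddate_ and launch_reference columns:

from pyspark.sql import SparkSession
from panama.utils.scala_utils import get_last_run_timestamp

spark = SparkSession.builder.getOrCreate()

# "analytics.model_runs" is a placeholder run table name.
last_ts = get_last_run_timestamp(
    spark=spark,
    run_table_name="analytics.model_runs",
    launch_reference="2024-01-31",
)
print(last_ts)  # most recent loaddate_ recorded before the launch reference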
def get_last_valid_type2_snapshot(spark: pyspark.sql.session.SparkSession,
                                  data_view: str,
                                  runs_view: str,
                                  launch_reference: str,
                                  scenario: str) -> pyspark.sql.dataframe.DataFrame
def get_last_valid_type2_snapshot(
    spark: SparkSession, data_view: str, runs_view: str, launch_reference: str, scenario: str
) -> DataFrame:
    """
    Get the last valid Type 2 snapshot DataFrame.

    This function performs a SQL query using the Spark session to select the last valid Type 2 table data
    snapshot from the data_view table. The snapshot is filtered based on the launch_reference and the state
    of the corresponding run in the runs_view table. The result is returned as a DataFrame.

    Args:
        data_view (str): The name of the data temp view table.
        runs_view (str): The name of the runs temp view table.
        launch_reference (str): The launch reference used for filtering the data view and runs view.

    Returns:
        DataFrame: The last valid Type 2 snapshot DataFrame.
    """
    query = f"""select * from {data_view}
        where (
            select max(run_id) from {runs_view}
            where launch_reference <= '{launch_reference}' and state == 'FINISHED'
        ) between start_run and coalesce(end_run, 1e15)
        and scenario='{scenario}'
    """
    return spark.sql(query)
Get the last valid Type 2 snapshot DataFrame.

This function runs a SQL query through the Spark session to select the last valid Type 2 snapshot from the data_view table. The snapshot is filtered based on the launch_reference, the scenario and the state of the corresponding run in the runs_view table. The result is returned as a DataFrame.

Args
    data_view (str): The name of the data temp view table.
    runs_view (str): The name of the runs temp view table.
    launch_reference (str): The launch reference used for filtering the data view and runs view.
    scenario (str): The scenario used for filtering the data view.

Returns
    DataFrame: The last valid Type 2 snapshot DataFrame.
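A usage sketch, assuming two hypothetical temp views: data_history (the Type 2 data, with start_run, end_run and scenario columns) and runs (with run_id, launch_reference and state columns):

from pyspark.sql import SparkSession
from panama.utils.scala_utils import get_last_valid_type2_snapshot

spark = SparkSession.builder.getOrCreate()

# View names, launch reference and scenario are placeholders.
snapshot_sdf = get_last_valid_type2_snapshot(
    spark=spark,
    data_view="data_history",
    runs_view="runs",
    launch_reference="2024-01-31",
    scenario="baseline",
)
snapshot_sdf.show()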
def handle_catalog(spark: pyspark.sql.session.SparkSession)
def handle_catalog(spark: SparkSession):
    """Function used to add the catalog name to the spark configurations.
    The environment will be extracted from the cluster tag.

    The following information are added to the spark session:

    * catalog_name

    Args:
        spark (SparkSession): current spark session.
    """
    catalog = f"{get_env_from_cluster_tag(spark)}_analytics"
    spark.conf.set("catalog_name", catalog)
Function used to add the catalog name to the spark configuration. The environment is extracted from the cluster tag.

The following information is added to the spark session:

- catalog_name

Args
    spark (SparkSession): current spark session.
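A usage sketch; it assumes the code runs on a cluster whose tag exposes the environment (as expected by get_env_from_cluster_tag), e.g. a properly tagged Databricks cluster:

from pyspark.sql import SparkSession
from panama.utils.scala_utils import handle_catalog

spark = SparkSession.builder.getOrCreate()

# Derives "<env>_analytics" from the cluster tag and stores it
# under the catalog_name configuration key.
handle_catalog(spark)
print(spark.conf.get("catalog_name"))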
def handle_dataframe(spark: pyspark.sql.session.SparkSession,
                     sdf: pyspark.sql.dataframe.DataFrame,
                     view_name: str,
                     select_cols: List[str] | None = None,
                     round: bool = False)
def handle_dataframe(
    spark: SparkSession, sdf: DataFrame, view_name: str, select_cols: Optional[List[str]] = None, round: bool = False
):
    """Function used to handle the passage of information regarding a dataframe to scala.
    It saves a spark dataframe as a global temp view, selecting and rounding columns if required.
    A loaddate_ column is added to the dataframe with the current timestamp.

    The following information are added to the spark configuration:

    * view_name

    Args:
        spark (SparkSession): current spark session.
        sdf (DataFrame): dataframe to save.
        view_name (str): name of the view holding dataframe data.
        select_cols (Optional[List[str]]): columns to keep. If not provided, all columns are kept. Defaults to None.
        round (bool, optional): whether to round double columns or not. Defaults to False.
    """
    if select_cols is not None:
        sdf = sdf.select(*select_cols)
    if round is True:
        round_cols = [F.round(c[0], 6).alias(c[0]) if c[1] == "double" else F.col(c[0]) for c in sdf.dtypes]
        sdf = sdf.select(*round_cols)
    sdf = sdf.withColumn("loaddate_", F.lit(datetime.now(tz=timezone("Europe/Rome"))))
    sdf.createOrReplaceGlobalTempView(view_name)
    spark.conf.set("view_name", view_name)
Function used to hand off information about a dataframe to scala. It saves a spark dataframe as a global temp view, selecting and rounding columns if required. A loaddate_ column with the current timestamp is added to the dataframe.

The following information is added to the spark configuration:

- view_name

Args
    spark (SparkSession): current spark session.
    sdf (DataFrame): dataframe to save.
    view_name (str): name of the view holding the dataframe data.
    select_cols (Optional[List[str]], optional): columns to keep. If not provided, all columns are kept. Defaults to None.
    round (bool, optional): whether to round double columns or not. Defaults to False.
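A minimal sketch with a toy dataframe (column names and values are illustrative only):

from pyspark.sql import SparkSession
from panama.utils.scala_utils import handle_dataframe

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame([(1, 0.123456789), (2, 9.87654321)], ["id", "value"])

# Keeps the listed columns, rounds double columns to 6 decimals,
# adds a loaddate_ timestamp column and registers a global temp view.
handle_dataframe(
    spark=spark,
    sdf=sdf,
    view_name="my_view",
    select_cols=["id", "value"],
    round=True,
)

print(spark.conf.get("view_name"))          # "my_view"
spark.table("global_temp.my_view").show()   # includes the loaddate_ column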
def handle_dataframe_destination(spark: pyspark.sql.session.SparkSession,
                                 adls_name: str,
                                 container: str,
                                 schema: str,
                                 table_name: str,
                                 schema_suffix: Literal['_of', '_oi', ''] | None = None)
def handle_dataframe_destination(
    spark: SparkSession,
    adls_name: str,
    container: str,
    schema: str,
    table_name: str,
    schema_suffix: Optional[Literal["_of", "_oi", ""]] = None,
):
    """Function used to handle a dataframe destination.

    The following information will be added to the spark configuration:

    * schema
    * table_name
    * schema_suffix
    * destination_path
    * path_prefix

    Args:
        spark (SparkSession): current spark session.
        adls_name (str): data lake storage name where data will be written.
        container (str): container where data will be written.
        schema (str): schema where data will be written.
        table_name (str): target table name.
        schema_suffix (Optional[Literal["_of", "_oi", ""]], optional): schema suffix. Defaults to None.
    """
    if schema_suffix is None:
        warnings.warn("Missing schema_suffix, an empty string will be placed. Please check if that is correct")
        schema_suffix = ""
    destination_path = IOAdls(spark).generate_connection_string(
        adls_name=adls_name, container=container, path=f"{schema}/{table_name}"
    )
    path_prefix = IOAdls(spark).generate_connection_string(adls_name=adls_name, container=container, path=f"{schema}")
    spark.conf.set("schema", schema)
    spark.conf.set("table_name", table_name)
    spark.conf.set("schema_suffix", schema_suffix)
    spark.conf.set("destination_path", destination_path)
    spark.conf.set("path_prefix", path_prefix)
Function used to handle a dataframe destination.

The following information will be added to the spark configuration:

- schema
- table_name
- schema_suffix
- destination_path
- path_prefix

Args
    spark (SparkSession): current spark session.
    adls_name (str): data lake storage name where data will be written.
    container (str): container where data will be written.
    schema (str): schema where data will be written.
    table_name (str): target table name.
    schema_suffix (Optional[Literal["_of", "_oi", ""]], optional): schema suffix. Defaults to None.
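A usage sketch; the data lake, container, schema and table names below are placeholders, and building the connection strings assumes IOAdls can resolve the configured storage:

from pyspark.sql import SparkSession
from panama.utils.scala_utils import handle_dataframe_destination

spark = SparkSession.builder.getOrCreate()

handle_dataframe_destination(
    spark=spark,
    adls_name="mydatalake",   # placeholder ADLS name
    container="curated",      # placeholder container
    schema="sales",
    table_name="orders",
    schema_suffix="_of",      # omit to get a warning and an empty suffix
)

print(spark.conf.get("destination_path"))  # connection string for sales/orders
print(spark.conf.get("path_prefix"))       # connection string for sales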
def handle_launch_reference(spark: pyspark.sql.session.SparkSession, launch_reference: str | None = None)
def handle_launch_reference(spark: SparkSession, launch_reference: Optional[str] = None):
    """Function used to add the launch reference, if provided, to the spark configuration.

    Args:
        spark (SparkSession): current spark session.
        launch_reference (Optional[str], optional): launch reference. Defaults to None.
    """
    if launch_reference is not None:
        spark.conf.set("launch_reference", launch_reference)
Function used to add the launch reference, if provided, to the spark configuration.

Args
    spark (SparkSession): current spark session.
    launch_reference (Optional[str], optional): launch reference. Defaults to None.
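A short sketch (the launch reference value is illustrative):

from pyspark.sql import SparkSession
from panama.utils.scala_utils import handle_launch_reference

spark = SparkSession.builder.getOrCreate()

# Stored only because a non-None value is passed.
handle_launch_reference(spark, launch_reference="2024-01-31")
print(spark.conf.get("launch_reference"))  # "2024-01-31"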
def handle_other_conf(spark: pyspark.sql.session.SparkSession,
                      other_conf: Dict[str, Any] | None = None)
def handle_other_conf(spark: SparkSession, other_conf: Optional[Dict[str, Any]] = None):
    """Function used to handle any optional spark configuration, passed as a dictionary.

    Args:
        spark (SparkSession): current spark session
        other_conf (Optional[Dict[str, Any]], optional): other spark configuration values as a dictionary.
            Defaults to None.
    """
    if other_conf is not None:
        for k, v in other_conf.items():
            spark.conf.set(k, v)
Function used to handle any optional spark configuration, passed as a dictionary.

Args
    spark (SparkSession): current spark session.
    other_conf (Optional[Dict[str, Any]], optional): other spark configuration values as a dictionary. Defaults to None.
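A short sketch; the configuration keys below are illustrative only:

from pyspark.sql import SparkSession
from panama.utils.scala_utils import handle_other_conf

spark = SparkSession.builder.getOrCreate()

# Each key/value pair is copied into the spark configuration as-is.
handle_other_conf(spark, other_conf={"run_mode": "full", "debug": "true"})
print(spark.conf.get("run_mode"))  # "full"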
def handle_scala_run_id(spark: pyspark.sql.session.SparkSession, scala_run_id: str | None = None)
def handle_scala_run_id(spark: SparkSession, scala_run_id: Optional[str] = None):
    """Function used to add the scala_run_id, if provided, to the spark configuration.

    Args:
        spark (SparkSession): current spark session
        scala_run_id (Optional[str], optional): scala run id, previously extracted. Defaults to None.
    """
    if scala_run_id is not None:
        spark.conf.set("scala_run_id", scala_run_id)
Function used to add the scala_run_id, if provided, to the spark configuration.

Args
    spark (SparkSession): current spark session.
    scala_run_id (Optional[str], optional): scala run id, previously extracted. Defaults to None.
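A short sketch (the run id value is illustrative):

from pyspark.sql import SparkSession
from panama.utils.scala_utils import handle_scala_run_id

spark = SparkSession.builder.getOrCreate()

# Stored under scala_run_id only because a non-None value is passed.
handle_scala_run_id(spark, scala_run_id="20240131T120000")
print(spark.conf.get("scala_run_id"))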
def handover_write_to_scala(spark: pyspark.sql.session.SparkSession,
                            sdf: pyspark.sql.dataframe.DataFrame | None = None,
                            view_name: str | None = None,
                            select_cols: List[str] | None = None,
                            round: bool = False,
                            table_name: str | None = None,
                            adls_name: str | None = None,
                            container: str | None = None,
                            schema: str | None = None,
                            schema_suffix: Literal['_of', '_oi', ''] | None = None,
                            launch_reference: str | None = None,
                            scala_run_id: str | None = None,
                            other_conf: Dict[str, Any] | None = None) -> None
def handover_write_to_scala(
    spark: SparkSession,
    sdf: Optional[DataFrame] = None,
    view_name: Optional[str] = None,
    select_cols: Optional[List[str]] = None,
    round: bool = False,
    table_name: Optional[str] = None,
    adls_name: Optional[str] = None,
    container: Optional[str] = None,
    schema: Optional[str] = None,
    schema_suffix: Optional[Literal["_of", "_oi", ""]] = None,
    launch_reference: Optional[str] = None,
    scala_run_id: Optional[str] = None,
    other_conf: Optional[Dict[str, Any]] = None,
) -> None:
    """Master method used to handle any value that should be handled in scala.
    Check the required parameters for every step. Parameters will be saved as spark configurations.

    If a dataframe is passed, the view_name is expected, as the dataframe will be saved as a global temporary view.

    If the dataframe is to be saved, table_name is expected. If a table_name is passed, it is MANDATORY to set
    adls_name, container and schema. schema_suffix can be avoided, but will be handled with those configurations.

    launch_reference, scala_run_id and other_conf are optional, and if missing will be ignored.

    Args:
        spark (SparkSession): current spark session.
        sdf (Optional[DataFrame], optional): dataframe to save as a temporary view. Defaults to None.
        view_name (Optional[str], optional): view name for the dataframe; unused if sdf is not provided.
            Defaults to None.
        select_cols (Optional[List[str]], optional): columns to keep from sdf; unused if sdf is not provided.
            Defaults to None.
        round (bool, optional): whether to round double columns in sdf; unused if sdf is not provided.
            Defaults to False.
        table_name (Optional[str], optional): name of the table where data in sdf will be saved. Defaults to None.
        adls_name (Optional[str], optional): name of the datalake holding the container, schema and table where
            data in sdf will be written; unused if table_name is not provided. Defaults to None.
        container (Optional[str], optional): name of the container holding schema and table where data in sdf
            will be written; unused if table_name is not provided. Defaults to None.
        schema (Optional[str], optional): schema holding the table where data in sdf will be written;
            unused if table_name is not provided. Defaults to None.
        schema_suffix (Optional[Literal["_of", "_oi", ""]], optional): schema suffix; unused if table_name is
            not provided. Defaults to None.
        launch_reference (Optional[str], optional): current launch reference. Defaults to None.
        scala_run_id (Optional[str], optional): run id for the scala writing. Defaults to None.
        other_conf (Optional[Dict[str, Any]], optional): optional configurations. Defaults to None.

    Raises:
        ValueError: if an sdf is passed, a view_name must be provided.
        ValueError: if table_name is provided, also adls_name, container and schema must be provided.
    """
    if sdf is not None:
        if view_name is None:
            raise ValueError("if an sdf is passed, also a view_name must be provided, None was found.")
        handle_dataframe(spark=spark, sdf=sdf, view_name=view_name, select_cols=select_cols, round=round)
    if table_name is None:
        warnings.warn("No table_name have been passed: no paths will be set in spark configuration.")
    elif (adls_name is None) or (container is None) or (schema is None):
        raise ValueError(
            "Missing adls_name, container or schema: these are mandatory to handle the writing with scala."
        )
    else:
        handle_dataframe_destination(
            spark=spark,
            adls_name=adls_name,
            container=container,
            schema_suffix=schema_suffix,
            schema=schema,
            table_name=table_name,
        )
    handle_catalog(spark=spark)
    handle_launch_reference(spark=spark, launch_reference=launch_reference)
    handle_scala_run_id(spark=spark, scala_run_id=scala_run_id)
    handle_other_conf(spark=spark, other_conf=other_conf)
Master method used to handle any value that should be handed over to scala. Check the required parameters for every step. Parameters will be saved as spark configurations.

If a dataframe is passed, view_name is expected, as the dataframe will be saved as a global temporary view.

If the dataframe is to be saved, table_name is expected. If a table_name is passed, it is MANDATORY to also set adls_name, container and schema. schema_suffix can be omitted, but it will be handled together with those configurations.

launch_reference, scala_run_id and other_conf are optional and, if missing, will be ignored.

Args
    spark (SparkSession): current spark session.
    sdf (Optional[DataFrame], optional): dataframe to save as a temporary view. Defaults to None.
    view_name (Optional[str], optional): view name for the dataframe; unused if sdf is not provided. Defaults to None.
    select_cols (Optional[List[str]], optional): columns to keep from sdf; unused if sdf is not provided. Defaults to None.
    round (bool, optional): whether to round double columns in sdf; unused if sdf is not provided. Defaults to False.
    table_name (Optional[str], optional): name of the table where data in sdf will be saved. Defaults to None.
    adls_name (Optional[str], optional): name of the data lake holding the container, schema and table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
    container (Optional[str], optional): name of the container holding the schema and table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
    schema (Optional[str], optional): schema holding the table where data in sdf will be written; unused if table_name is not provided. Defaults to None.
    schema_suffix (Optional[Literal["_of", "_oi", ""]], optional): schema suffix; unused if table_name is not provided. Defaults to None.
    launch_reference (Optional[str], optional): current launch reference. Defaults to None.
    scala_run_id (Optional[str], optional): run id for the scala writing. Defaults to None.
    other_conf (Optional[Dict[str, Any]], optional): optional configurations. Defaults to None.

Raises
    ValueError: if an sdf is passed, a view_name must also be provided.
    ValueError: if table_name is provided, adls_name, container and schema must also be provided.
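An end-to-end sketch; the dataframe, view, table and storage names are all placeholders, and the call assumes an environment where the cluster tag and ADLS connection strings can be resolved:

from pyspark.sql import SparkSession
from panama.utils.scala_utils import handover_write_to_scala

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

handover_write_to_scala(
    spark=spark,
    sdf=sdf,
    view_name="orders_view",         # required because sdf is passed
    table_name="orders",
    adls_name="mydatalake",          # adls_name, container and schema are
    container="curated",             # mandatory once table_name is set
    schema="sales",
    schema_suffix="_of",
    launch_reference="2024-01-31",
    scala_run_id="20240131T120000",
    other_conf={"run_mode": "full"},
)

# All the values above are now available to scala via the spark configuration
# and the global temp view orders_view.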