Module panama.utils.spark_df

Functions

def add_col_date(sdf: pyspark.sql.dataframe.DataFrame, granularity: str, date_col: str = 'date', output_col: Optional[str] = None) ‑> pyspark.sql.dataframe.DataFrame

Adds a new date column to the DataFrame, truncated to the specified granularity.

Args

sdf : DataFrame
DataFrame containing the date column to truncate.
granularity : str
a valid granularity to truncate the date column to.
date_col : str, optional
name of the date column to transform. Defaults to 'date'.
output_col : Union[str, None], optional
name of the new date column created. Defaults to None.

Returns

DataFrame
DataFrame with the new date column added.
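
Example (a minimal sketch; assumes an active SparkSession named spark, and uses 'month' as an illustrative granularity value):

    from pyspark.sql import functions as F
    from panama.utils.spark_df import add_col_date

    sdf = spark.createDataFrame([("2024-03-15",), ("2024-03-20",)], ["date"])
    sdf = sdf.withColumn("date", F.to_date("date"))
    # Add a 'date_month' column holding each date truncated to its month.
    monthly = add_col_date(sdf, granularity="month", date_col="date", output_col="date_month")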

def add_linear_combination(sdf: pyspark.sql.dataframe.DataFrame, coefficients: List, colnames: List, output_col: str) ‑> pyspark.sql.dataframe.DataFrame

Add a column to a Spark DataFrame computing the element-wise linear combination of a set of columns:

    output_col = sum(i=1..n) coefficients_i * colnames_i                      if n = len(coefficients) = len(colnames)
    output_col = sum(i=1..n) coefficients_i * colnames_i + coefficients_(n+1) if n = len(coefficients) - 1 = len(colnames)

Args

sdf : DataFrame
dataframe to be processed.
coefficients : List
list of coefficients; each element can be either a string indicating a valid column name in sdf or a number used as a constant coefficient for all rows of sdf.
colnames : List
list of valid column names in sdf.
output_col : str
name of the output column containing the linear combination.

Returns

the original sdf with output_col added.
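
Example (a minimal sketch; assumes an active SparkSession named spark):

    from panama.utils.spark_df import add_linear_combination

    sdf = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ["a", "b"])
    # combo = 2*a + 0.5*b
    sdf = add_linear_combination(sdf, coefficients=[2, 0.5], colnames=["a", "b"], output_col="combo")
    # combo2 = 2*a + 0.5*b + 1 (the extra trailing coefficient acts as an intercept)
    sdf = add_linear_combination(sdf, coefficients=[2, 0.5, 1], colnames=["a", "b"], output_col="combo2")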

def cache_or_persist_sdf(sdf: pyspark.sql.dataframe.DataFrame, mode: Optional[str] = None) ‑> None

Perform a persist or cache step and then a count action on a given Spark DataFrame, so that the data is materialized immediately.

Args

sdf : DataFrame
dataframe to cache or persist.
mode : str, optional
which caching operation to perform. Accepted values are 'cache' and 'persist'.
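
Example (a minimal sketch; sdf can be any Spark DataFrame):

    from panama.utils.spark_df import cache_or_persist_sdf

    # Mark the dataframe for persistence and materialize it immediately via count().
    cache_or_persist_sdf(sdf, mode="persist")
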
def format_aggregated_columns(sdf: pyspark.sql.dataframe.DataFrame) ‑> pyspark.sql.dataframe.DataFrame

Format aggregated columns. Runs through all the columns, renaming those matching the aggregation pattern: namely, "aggr_func(col_name)" is renamed to "col_name".

Args

sdf : DataFrame
dataframe whose columns are to be renamed.

Returns

DataFrame
DataFrame with renamed columns.
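
Example (a minimal sketch; assumes an active SparkSession named spark):

    from pyspark.sql import functions as F
    from panama.utils.spark_df import format_aggregated_columns

    sdf = spark.createDataFrame([("a", 1, 5), ("a", 2, 7)], ["key", "value", "ts"])
    agg = sdf.groupBy("key").agg(F.sum("value"), F.max("ts"))
    # Renames 'sum(value)' to 'value' and 'max(ts)' to 'ts'.
    agg = format_aggregated_columns(agg)
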
def from_list_to_str(list_to_convert: list) ‑> Union[str, List[str]]

Returns the list's single element as a string if the list contains exactly one element; otherwise returns the original list.

Args

list_to_convert : list
the list to convert to a string.

Returns

Union[str, List[str]]
the resulting string, or the original list if it has more than one element.
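
Example (a minimal sketch):

    from panama.utils.spark_df import from_list_to_str

    from_list_to_str(["a"])       # returns "a"
    from_list_to_str(["a", "b"])  # returns ["a", "b"]
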
def get_col_as_list(sdf: pyspark.sql.dataframe.DataFrame, colname: str, distinct: bool = False, drop_null: bool = False) ‑> List[Any]

Return a column's values as a Python list.

Args

sdf
dataframe to be processed.
colname
name of the column to be extracted.
distinct
if True, return only distinct values; otherwise return all values.
drop_null
if True, drop null values before returning the result.

Returns

list containing column values.
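
Example (a minimal sketch; assumes an active SparkSession named spark):

    from panama.utils.spark_df import get_col_as_list

    sdf = spark.createDataFrame([("IT",), ("FR",), ("IT",), (None,)], ["country"])
    # Returns ['IT', 'FR'] (order not guaranteed).
    countries = get_col_as_list(sdf, "country", distinct=True, drop_null=True)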

def get_composite_conv_factor(udm_from: Union[str, List[str]], udm_to: Union[str, List[str]], conv_fact: pyspark.sql.dataframe.DataFrame) ‑> list

Returns the conversion factors, also handling composite units of measure (e.g. EUR/MWh).

Args

udm_from : Union[str, List[str]]
the ordered starting units of measure.
udm_to : Union[str, List[str]]
the ordered desired units of measure.
conv_fact : DataFrame
DataFrame with conversion factors.

Returns

list
the final conversion factors.
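
Example (a sketch only; the expected schema of the conversion-factor DataFrame is defined elsewhere in the library, and the sample units are illustrative):

    from panama.utils.spark_df import get_composite_conv_factor

    # conv_fact: a DataFrame of conversion factors, loaded elsewhere.
    factors = get_composite_conv_factor(udm_from=["EUR/MWh"], udm_to=["EUR/GWh"], conv_fact=conv_fact)
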
def get_join_expr(first_key: list, second_key: list, interval_join: bool = False, first_alias='x', second_alias='y') ‑> pyspark.sql.column.Column

Returns the join expression given two key lists.

Args

first_key : list
list with key column names of first DataFrame.
second_key : list
list with key column names of second DataFrame.
interval_join : bool
whether the join expression must contain an interval condition such as 'date between start and end'.
first_alias : str
alias used for the first DataFrame in the join expression. Defaults to 'x'.
second_alias : str
alias used for the second DataFrame in the join expression. Defaults to 'y'.

Returns

Column
the final join expression.
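
Example (a sketch; it assumes the returned expression references the default DataFrame aliases 'x' and 'y'):

    from panama.utils.spark_df import get_join_expr

    expr = get_join_expr(first_key=["id", "date"], second_key=["id", "date"])
    joined = sdf_x.alias("x").join(sdf_y.alias("y"), on=expr, how="inner")
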
def get_single_conv_factor(udm_from: Union[str, Sequence[Optional[str]], None], udm_to: Union[str, Sequence[Optional[str]], None], conv_fact: pyspark.sql.dataframe.DataFrame) ‑> list

Returns the conversion factors for single (non-composite) units of measure.

Args

udm_from : Union[str, List[Union[str, None]], None]
the ordered starting units of measure.
udm_to : Union[str, List[Union[str, None]], None]
the ordered desired units of measure.
conv_fact : DataFrame
DataFrame with conversion factors.

Returns

list
the final conversion factors.
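
Example (a sketch only; as above, the conversion-factor DataFrame is assumed to be loaded elsewhere):

    from panama.utils.spark_df import get_single_conv_factor

    factors = get_single_conv_factor(udm_from=["MWh"], udm_to=["GWh"], conv_fact=conv_fact)
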
def join_coalesce(sdf_x: pyspark.sql.dataframe.DataFrame, sdf_y: pyspark.sql.dataframe.DataFrame, on: Union[str, list, pyspark.sql.column.Column], x_cols: List, y_cols: Optional[List] = None, how: str = 'fullouter', aliases: Optional[List] = None, is_priority_left: bool = True, drop_original_cols: bool = True) ‑> pyspark.sql.dataframe.DataFrame

Merge two dataframes and then coalesce one or more pairs of columns (coming from both dataframes) using a specific priority rule; by default, columns in the left dataframe have priority. The user can also define the merge behaviour.

Args

sdf_x : DataFrame
one of the two spark dataframes to join (treated as the left table in the join operation); its columns have the default priority.
sdf_y : DataFrame
one of the two spark dataframes to join (treated as the right table in the join operation).
on : str, list, Column
a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
how : str
default fullouter. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
x_cols : List
ordered list of columns to coalesce from the left dataframe. Order must be the same as "y_cols".
y_cols : Union[List, None]
ordered list of columns to coalesce from the right dataframe. Order must be the same as "x_cols". If empty, the names are assumed to be the same as "x_cols".
aliases : Union[List, None]
ordered list of new names to give to the coalesced columns. If empty, the new columns keep the names of the original columns in the left dataframe.

is_priority_left : bool, optional
indicates whether the columns in the left dataframe have priority. Defaults to True.
drop_original_cols : bool, optional
indicates whether the original columns must be dropped after the coalesce step. Defaults to True.

Returns

DataFrame
new dataframe resulting from the combined join and coalesce operations.
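
Example (a minimal sketch; assumes an active SparkSession named spark):

    from panama.utils.spark_df import join_coalesce

    sdf_x = spark.createDataFrame([(1, 10.0), (2, None)], ["id", "price"])
    sdf_y = spark.createDataFrame([(2, 20.0), (3, 30.0)], ["id", "price"])
    # Full outer join on 'id'; the coalesced 'price' prefers values from sdf_x.
    merged = join_coalesce(sdf_x, sdf_y, on="id", x_cols=["price"], y_cols=["price"], how="fullouter")
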
def join_coalesce_common_cols(sdf_x: pyspark.sql.dataframe.DataFrame, sdf_y: pyspark.sql.dataframe.DataFrame, on: Union[str, list, pyspark.sql.column.Column], how: str = 'inner') ‑> pyspark.sql.dataframe.DataFrame

Join two dataframes and remove duplicated columns by coalescing the columns that share the same name in sdf_x and sdf_y.

Args

sdf_x : DataFrame
one of the two spark dataframes to join (will be treated as left table in the join operation) and will have priority in coalesce.
sdf_y : DataFrame
one of the two spark dataframes to join (will be treated as right table in the join operation).
on : str, list, Column
a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
how : str
default inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.

Returns

DataFrame
new dataframe resulting from the combined join and coalesce operations.
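
Example (a minimal sketch; assumes an active SparkSession named spark):

    from panama.utils.spark_df import join_coalesce_common_cols

    sdf_x = spark.createDataFrame([(1, 10.0)], ["id", "price"])
    sdf_y = spark.createDataFrame([(1, None), (2, 20.0)], ["id", "price"])
    # Inner join on 'id'; the shared 'price' column is coalesced, preferring sdf_x.
    merged = join_coalesce_common_cols(sdf_x, sdf_y, on="id", how="inner")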