Module panama.utils.spark_df
Functions
def add_col_date(sdf: pyspark.sql.dataframe.DataFrame, granularity: str, date_col: str = 'date', output_col: Optional[str] = None) ‑> pyspark.sql.dataframe.DataFrame
Adds a new column to the DataFrame with the specified granularity.
Args
sdf:DataFrame- DataFrame containing the date column to truncate.
granularity:str- a valid granularity.
date_col:str, optional- the date column name to transform.
output_col:Union[str, None], optional- the column name for the new date column created. Defaults to None.
Returns
DataFrame- DataFrame with the new date column.
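A minimal usage sketch (the 'month' granularity value is an assumption; the docstring only requires "a valid granularity"):

    from pyspark.sql import SparkSession
    from panama.utils.spark_df import add_col_date

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([("2024-03-15",)], ["date"]).selectExpr("to_date(date) AS date")
    # Truncate each date to the (assumed) 'month' granularity into a new column
    result = add_col_date(sdf, granularity="month", date_col="date", output_col="date_month")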
def add_linear_combination(sdf: pyspark.sql.dataframe.DataFrame, coefficients: List, colnames: List, output_col: str) ‑> pyspark.sql.dataframe.DataFrame
Add a column to a spark dataframe computing the element-wise linear combination of a set of columns:
sum(i=1…n)(coefficients_i * colnames_i), if n = len(coefficients) = len(colnames);
sum(i=1…n)(coefficients_i * colnames_i) + coefficients_(n+1), if n = len(coefficients) - 1 = len(colnames) (the extra trailing coefficient acts as a constant intercept).
Args
sdf- dataframe to be processed
coefficients- list of coefficients; each element can be either a string naming a valid column in sdf or a number used as a constant coefficient for all sdf rows.
colnames- list of valid column names in sdf.
output_col- name of the output column containing the linear combination.
Returns
the original sdf with output_col added.
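A sketch of both forms, with and without the trailing intercept coefficient:

    from pyspark.sql import SparkSession
    from panama.utils.spark_df import add_linear_combination

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ["a", "b"])
    # 0.5*a + 2*b for every row (len(coefficients) == len(colnames))
    sdf = add_linear_combination(sdf, coefficients=[0.5, 2], colnames=["a", "b"], output_col="combo")
    # 0.5*a + 2*b + 1: the extra trailing coefficient acts as a constant intercept
    sdf = add_linear_combination(sdf, coefficients=[0.5, 2, 1], colnames=["a", "b"], output_col="combo_intercept")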
def cache_or_persist_sdf(sdf: pyspark.sql.dataframe.DataFrame, mode: Optional[str] = None) ‑> None
Perform a persist or cache step followed by a count action on a given spark dataframe.
Args
sdf:DataFrame- dataframe to cache or persist.
mode:str, optional- which caching operation to perform. Accepted values are 'cache' and 'persist'. Defaults to None.
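A minimal sketch; the count action makes the otherwise lazy caching step eager:

    from pyspark.sql import SparkSession
    from panama.utils.spark_df import cache_or_persist_sdf

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.range(10)
    # Eagerly materialize sdf: cache (or persist) followed by a count action
    cache_or_persist_sdf(sdf, mode="cache")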
def format_aggregated_columns(sdf: pyspark.sql.dataframe.DataFrame) ‑> pyspark.sql.dataframe.DataFrame
Function to format aggregated columns. Runs through all the columns, renaming those matching the pattern "aggr_func(col_name)" to "col_name".
Args
sdf:DataFrame- dataframe whose columns are to be renamed.
Returns
DataFrame- DataFrame with renamed columns.
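A sketch of the typical use after a groupBy aggregation:

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    from panama.utils.spark_df import format_aggregated_columns

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([("a", 1), ("a", 2)], ["key", "value"])
    aggregated = sdf.groupBy("key").agg(F.sum("value"))  # produces a column named "sum(value)"
    renamed = format_aggregated_columns(aggregated)      # "sum(value)" is renamed to "value"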
def from_list_to_str(list_to_convert: list) ‑> Union[str, List[str]]
Returns the single element as a string if the list contains exactly one element; otherwise returns the original list.
Args
list_to_convert:list- the list to convert to a string.
Returns
Union[str, List[str]]- the single string, or the original list if it contains more than one element.
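For example:

    from panama.utils.spark_df import from_list_to_str

    from_list_to_str(["only"])    # -> "only"
    from_list_to_str(["a", "b"])  # -> ["a", "b"] (unchanged)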
def get_col_as_list(sdf: pyspark.sql.dataframe.DataFrame, colname: str, distinct: bool = False, drop_null: bool = False) ‑> List[Any]
Return column values as a python list.
Args
sdf- dataframe to be processed.
colname- name of the column to be extracted.
distinct- if True, return only distinct values; otherwise return all values.
drop_null- if True, drop null values before returning the result.
Returns
list containing column values.
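A minimal sketch combining both flags:

    from pyspark.sql import SparkSession
    from panama.utils.spark_df import get_col_as_list

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([(1,), (1,), (None,)], "v INT")
    # Distinct values only, with nulls dropped: expected [1]
    values = get_col_as_list(sdf, "v", distinct=True, drop_null=True)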
def get_composite_conv_factor(udm_from: Union[str, List[str]], udm_to: Union[str, List[str]], conv_fact: pyspark.sql.dataframe.DataFrame) ‑> list
Returns the conversion factor, also for composite units of measure (e.g. EUR/MWh).
Args
udm_from:Union[str, List[str]]- the ordered starting units of measure.
udm_to:Union[str, List[str]]- the ordered desired units of measure.
conv_fact:DataFrame- DataFrame with conversion factors.
Returns
list- the final conversion factors.
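A sketch under assumptions: the layout of the conversion-factor table below (columns udm_from, udm_to, factor) is hypothetical and stands in for whatever schema panama actually uses:

    from pyspark.sql import SparkSession
    from panama.utils.spark_df import get_composite_conv_factor

    spark = SparkSession.builder.getOrCreate()
    # Hypothetical conversion-factor table; the real schema is defined by panama
    conv_fact = spark.createDataFrame([("MWh", "kWh", 1000.0)], ["udm_from", "udm_to", "factor"])
    # Composite units of measure, e.g. EUR/MWh -> EUR/kWh
    factors = get_composite_conv_factor("EUR/MWh", "EUR/kWh", conv_fact)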
def get_join_expr(first_key: list, second_key: list, interval_join: bool = False, first_alias='x', second_alias='y') ‑> pyspark.sql.column.Column
Returns the join expression given two key lists.
Args
first_key:list- list with key column names of the first DataFrame.
second_key:list- list with key column names of the second DataFrame.
interval_join:bool- whether the join expression must contain an interval condition like 'date between start and end'.
first_alias:str- alias used to qualify columns of the first DataFrame. Defaults to 'x'.
second_alias:str- alias used to qualify columns of the second DataFrame. Defaults to 'y'.
Returns
Column- the final join expression.
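A sketch of the default equi-join case; the DataFrame aliases below match the defaults of first_alias and second_alias:

    from pyspark.sql import SparkSession
    from panama.utils.spark_df import get_join_expr

    spark = SparkSession.builder.getOrCreate()
    sdf_a = spark.createDataFrame([(1, "a")], ["id", "val_a"])
    sdf_b = spark.createDataFrame([(1, "b")], ["id", "val_b"])
    # Equi-join expression on "id", qualified with the default aliases 'x' and 'y'
    expr = get_join_expr(first_key=["id"], second_key=["id"])
    joined = sdf_a.alias("x").join(sdf_b.alias("y"), on=expr, how="inner")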
def get_single_conv_factor(udm_from: Union[str, Sequence[Optional[str]], None], udm_to: Union[str, Sequence[Optional[str]], None], conv_fact: pyspark.sql.dataframe.DataFrame) ‑> list
Returns the conversion factor for single (non-composite) units of measure.
Args
udm_from:Union[str, List[Union[str, None]], None]- the ordered starting units of measure.
udm_to:Union[str, List[Union[str, None]], None]- the ordered desired units of measure.
conv_fact:DataFrame- DataFrame with conversion factors.
Returns
list- the final conversion factors.
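Same assumptions as above on the conversion-factor table layout (the column names are hypothetical), with single units this time:

    from pyspark.sql import SparkSession
    from panama.utils.spark_df import get_single_conv_factor

    spark = SparkSession.builder.getOrCreate()
    # Hypothetical conversion-factor table; the real schema is defined by panama
    conv_fact = spark.createDataFrame([("MWh", "kWh", 1000.0)], ["udm_from", "udm_to", "factor"])
    factors = get_single_conv_factor("MWh", "kWh", conv_fact)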
def join_coalesce(sdf_x: pyspark.sql.dataframe.DataFrame, sdf_y: pyspark.sql.dataframe.DataFrame, on: Union[str, list, pyspark.sql.column.Column], x_cols: List, y_cols: Optional[List] = None, how: str = 'fullouter', aliases: Optional[List] = None, is_priority_left: bool = True, drop_original_cols: bool = True) ‑> pyspark.sql.dataframe.DataFrame
Merge two dataframes and then coalesce one or more pairs of columns (coming from both dataframes) using a priority rule; by default, columns from the left dataframe take priority. The user can also control the join behaviour.
Args
sdf_x:DataFrame- one of the two spark dataframes to join; treated as the left table in the join operation, its columns have priority by default.
sdf_y:DataFrame- one of the two spark dataframes to join; treated as the right table in the join operation.
on:str, list, Column- a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
how:str- default fullouter. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
x_cols:List- ordered list of columns to coalesce from the left dataframe. Order must match "y_cols".
y_cols:Union[List, None]- ordered list of columns to coalesce from the right dataframe. Order must match "x_cols". If empty, the names are assumed to be the same as in x_cols.
aliases:Union[List, None]- ordered list of new names for the coalesced columns. If empty, the new columns keep the names of the original columns in the left dataframe.
is_priority_left:bool, optional- indicates whether the columns of the left dataframe have priority. Defaults to True.
drop_original_cols:bool, optional- indicates whether the original columns must be dropped after the coalesce step. Defaults to True.
Returns
DataFrame- new dataframe resulting from the combined join and coalesce operations.
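A minimal sketch: full-outer join on "id", keeping a single "price" column whose values come from sdf_x where available and from sdf_y otherwise:

    from pyspark.sql import SparkSession
    from panama.utils.spark_df import join_coalesce

    spark = SparkSession.builder.getOrCreate()
    sdf_x = spark.createDataFrame([(1, 10.0), (2, None)], ["id", "price"])
    sdf_y = spark.createDataFrame([(2, 20.0), (3, 30.0)], ["id", "price"])
    # Coalesced "price": 10.0 for id 1, 20.0 for id 2 (left is null), 30.0 for id 3
    merged = join_coalesce(sdf_x, sdf_y, on="id", x_cols=["price"])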
def join_coalesce_common_cols(sdf_x: pyspark.sql.dataframe.DataFrame, sdf_y: pyspark.sql.dataframe.DataFrame, on: Union[str, list, pyspark.sql.column.Column], how: str = 'inner') ‑> pyspark.sql.dataframe.DataFrame
Join two dataframes and remove duplicated columns by coalescing the columns that share the same name in sdf_x and sdf_y.
Args
sdf_x:DataFrame- one of the two spark dataframes to join (will be treated as left table in the join operation) and will have priority in coalesce.
sdf_y:DataFrame- one of the two spark dataframes to join (will be treated as right table in the join operation).
on:str, list, Column- a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
how:str- default inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
Returns
DataFrame- new dataframe resulting from the combined join and coalesce operations.
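A minimal sketch; the shared "tag" column is coalesced with sdf_x taking priority:

    from pyspark.sql import SparkSession
    from panama.utils.spark_df import join_coalesce_common_cols

    spark = SparkSession.builder.getOrCreate()
    sdf_x = spark.createDataFrame([(1, None), (2, "left")], "id INT, tag STRING")
    sdf_y = spark.createDataFrame([(1, "right"), (2, "ignored")], "id INT, tag STRING")
    # One "tag" column survives: "right" for id 1 (left is null), "left" for id 2
    out = join_coalesce_common_cols(sdf_x, sdf_y, on="id", how="inner")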