Module panama.utils.spark_df
Functions
def add_col_date(sdf: pyspark.sql.dataframe.DataFrame, granularity: str, date_col: str = 'date', output_col: Optional[str] = None) ‑> pyspark.sql.dataframe.DataFrame
Adds a new date column to the DataFrame, truncated to the specified granularity.
Args
sdf : DataFrame - DataFrame containing the date column to truncate.
granularity : str - a valid granularity.
date_col : str, optional - name of the date column to transform. Defaults to 'date'.
output_col : Union[str, None], optional - name of the new date column. Defaults to None.
Returns
DataFrame - DataFrame with the new date column.
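The truncation behaviour can be sketched in plain Python; the granularity names below ('year', 'month', 'day') are illustrative assumptions, and the real function operates column-wise on the Spark DataFrame:

```python
from datetime import date

def truncate_date(d: date, granularity: str) -> date:
    # Plain-Python sketch of the per-value truncation that add_col_date
    # applies to a whole column (granularity names are assumptions).
    if granularity == "year":
        return d.replace(month=1, day=1)
    if granularity == "month":
        return d.replace(day=1)
    if granularity == "day":
        return d
    raise ValueError(f"unsupported granularity: {granularity}")

print(truncate_date(date(2024, 5, 17), "month"))  # 2024-05-01
```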
def add_linear_combination(sdf: pyspark.sql.dataframe.DataFrame, coefficients: List, colnames: List, output_col: str) ‑> pyspark.sql.dataframe.DataFrame
Adds a column to a Spark DataFrame computing the element-wise linear combination of a set of columns:
sum(i=1…n) coefficients_i * colnames_i, if n = len(coefficients) = len(colnames);
sum(i=1…n) coefficients_i * colnames_i + coefficients_(n+1), if n = len(coefficients) - 1 = len(colnames).
Args
sdf : DataFrame - DataFrame to be processed.
coefficients : List - list of coefficients; each can be either a string naming a valid column in sdf or a number used as the coefficient for all rows of sdf.
colnames : List - list of valid column names in sdf.
output_col : str - name of the output column containing the linear combination.
Returns
DataFrame - the original sdf with output_col added.
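The two cases above can be illustrated row-wise in plain Python (the real function applies this per row over the Spark DataFrame):

```python
def linear_combination(row: dict, coefficients: list, colnames: list) -> float:
    # Each coefficient is either a number or the name of another column in the row.
    resolve = lambda c: row[c] if isinstance(c, str) else c
    total = sum(resolve(c) * row[name] for c, name in zip(coefficients, colnames))
    if len(coefficients) == len(colnames) + 1:
        # The extra trailing coefficient acts as an intercept term.
        total += resolve(coefficients[-1])
    return total

row = {"a": 2.0, "b": 3.0, "w": 0.5}
print(linear_combination(row, [0.5, "w"], ["a", "b"]))        # 0.5*2 + 0.5*3 = 2.5
print(linear_combination(row, [1.0, 1.0, 10.0], ["a", "b"]))  # 2 + 3 + 10 = 15.0
```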
def cache_or_persist_sdf(sdf: pyspark.sql.dataframe.DataFrame, mode: Optional[str] = None) ‑> None
Performs a cache or persist step, followed by a count action, on the given Spark DataFrame.
Args
sdf : DataFrame - DataFrame to cache or persist.
mode : str, optional - which caching operation to perform. Accepted values are 'cache' and 'persist'.
def format_aggregated_columns(sdf: pyspark.sql.dataframe.DataFrame) ‑> pyspark.sql.dataframe.DataFrame
Formats aggregated columns. Runs through all the columns, renaming those matching a pattern: namely, "aggr_func(col_name)" is renamed to "col_name".
Args
sdf : DataFrame - DataFrame whose columns are to be renamed.
Returns
DataFrame - DataFrame with renamed columns.
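The renaming rule can be sketched with a regular expression; the exact pattern the library uses is an assumption, but the documented behaviour is:

```python
import re

def strip_aggr(colname: str) -> str:
    # "sum(price)" -> "price"; names that don't match the
    # "aggr_func(col_name)" pattern pass through unchanged.
    m = re.fullmatch(r"\w+\((\w+)\)", colname)
    return m.group(1) if m else colname

cols = ["sum(price)", "avg(volume)", "date"]
print([strip_aggr(c) for c in cols])  # ['price', 'volume', 'date']
```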
def from_list_to_str(list_to_convert: list) ‑> Union[str, List[str]]
Returns the single element as a string if the list contains exactly one element; otherwise returns the original list.
Args
list_to_convert : list - the list to convert to a string.
Returns
Union[str, List[str]] - the resulting string, or the original list if it has more than one element.
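A minimal sketch of the documented behaviour (not the library's implementation):

```python
from typing import List, Union

def from_list_to_str_sketch(list_to_convert: list) -> Union[str, List[str]]:
    # Collapse a single-element list to its only element; leave longer lists as-is.
    return list_to_convert[0] if len(list_to_convert) == 1 else list_to_convert

print(from_list_to_str_sketch(["EUR"]))         # 'EUR'
print(from_list_to_str_sketch(["EUR", "USD"]))  # ['EUR', 'USD']
```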
def get_col_as_list(sdf: pyspark.sql.dataframe.DataFrame, colname: str, distinct: bool = False, drop_null: bool = False) ‑> List[Any]
Returns a column's values as a Python list.
Args
sdf : DataFrame - DataFrame to be processed.
colname : str - name of the column to be extracted.
distinct : bool - if True, return only distinct values; otherwise return all values.
drop_null : bool - if True, drop null values before returning the result.
Returns
List[Any] - list containing the column's values.
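The effect of the two flags can be illustrated on plain Python rows (the real function collects from a Spark DataFrame):

```python
rows = [{"udm": "EUR"}, {"udm": "MWh"}, {"udm": "EUR"}, {"udm": None}]

def col_as_list(rows, colname, distinct=False, drop_null=False):
    # Plain-Python sketch of the documented flags.
    values = [r[colname] for r in rows]
    if drop_null:
        values = [v for v in values if v is not None]
    if distinct:
        values = list(dict.fromkeys(values))  # dedupe, keeping first-seen order
    return values

print(col_as_list(rows, "udm", distinct=True, drop_null=True))  # ['EUR', 'MWh']
```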
def get_composite_conv_factor(udm_from: Union[str, List[str]], udm_to: Union[str, List[str]], conv_fact: pyspark.sql.dataframe.DataFrame) ‑> list
Returns the conversion factors, supporting composite units of measure (e.g. EUR/MWh).
Args
udm_from : Union[str, List[str]] - the ordered starting units of measure.
udm_to : Union[str, List[str]] - the ordered desired units of measure.
conv_fact : DataFrame - DataFrame with conversion factors.
Returns
list - the final conversion factors.
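One plausible reading of the composite-unit rule, sketched in plain Python: split the unit on '/' and take the ratio of the numerator and denominator factors. Both the split-and-divide rule and the factor table below are assumptions for illustration; the real function looks factors up in the conv_fact DataFrame.

```python
# Hypothetical single-unit factors: (from_unit, to_unit) -> multiplier.
FACTORS = {("MWh", "kWh"): 1000.0, ("EUR", "EUR"): 1.0}

def composite_factor(udm_from: str, udm_to: str) -> float:
    # Assumption: for composite units like 'EUR/MWh', the overall factor
    # is (numerator factor) / (denominator factor).
    num_from, den_from = udm_from.split("/")
    num_to, den_to = udm_to.split("/")
    return FACTORS[(num_from, num_to)] / FACTORS[(den_from, den_to)]

print(composite_factor("EUR/MWh", "EUR/kWh"))  # 1.0 / 1000.0 = 0.001
```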
def get_join_expr(first_key: list, second_key: list, interval_join: bool = False, first_alias='x', second_alias='y') ‑> pyspark.sql.column.Column
Returns the join expression given two key lists.
Args
first_key : list - list with key column names of the first DataFrame.
second_key : list - list with key column names of the second DataFrame.
interval_join : bool - whether the join expression must contain an interval condition like 'date between start and end'.
first_alias : str - alias used for the first DataFrame in the expression. Defaults to 'x'.
second_alias : str - alias used for the second DataFrame in the expression. Defaults to 'y'.
Returns
Column - the final join expression.
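The shape of the expression can be sketched as a plain condition string; the real function returns a pyspark Column, and the interval column names ('date', 'start', 'end') are assumptions based on the example in the description:

```python
def join_expr_sketch(first_key, second_key, interval_join=False,
                     first_alias="x", second_alias="y"):
    # Pair up the two key lists into equality clauses.
    clauses = [f"{first_alias}.{a} = {second_alias}.{b}"
               for a, b in zip(first_key, second_key)]
    if interval_join:
        # Assumed interval convention from the docstring's example.
        clauses.append(f"{first_alias}.date BETWEEN "
                       f"{second_alias}.start AND {second_alias}.end")
    return " AND ".join(clauses)

print(join_expr_sketch(["id"], ["id"], interval_join=True))
# x.id = y.id AND x.date BETWEEN y.start AND y.end
```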
def get_single_conv_factor(udm_from: Union[str, Sequence[Optional[str]], ForwardRef(None)], udm_to: Union[str, Sequence[Optional[str]], ForwardRef(None)], conv_fact: pyspark.sql.dataframe.DataFrame) ‑> list
Returns the conversion factors for single (non-composite) units of measure.
Args
udm_from : Union[str, List[Union[str, None]], None] - the ordered starting units of measure.
udm_to : Union[str, List[Union[str, None]], None] - the ordered desired units of measure.
conv_fact : DataFrame - DataFrame with conversion factors.
Returns
list - the final conversion factors.
def join_coalesce(sdf_x: pyspark.sql.dataframe.DataFrame, sdf_y: pyspark.sql.dataframe.DataFrame, on: Union[str, list, pyspark.sql.column.Column], x_cols: List, y_cols: Optional[List] = None, how: str = 'fullouter', aliases: Optional[List] = None, is_priority_left: bool = True, drop_original_cols: bool = True) ‑> pyspark.sql.dataframe.DataFrame
Merges two DataFrames, then coalesces one or more pairs of columns (one from each DataFrame) using a priority rule; by default, columns from the left DataFrame take priority. The user can also define the merge behaviour.
Args
sdf_x : DataFrame - one of the two Spark DataFrames to join (treated as the left table in the join operation); its columns have the default priority.
sdf_y : DataFrame - one of the two Spark DataFrames to join (treated as the right table in the join operation).
on : str, list, Column - a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings naming the join column(s), the column(s) must exist on both sides, and an equi-join is performed.
x_cols : List - ordered list of columns to coalesce from the left DataFrame. Order must match "y_cols".
y_cols : Union[List, None] - ordered list of columns to coalesce from the right DataFrame. Order must match "x_cols". If empty, the names are assumed to be the same as x_cols.
how : str - default fullouter. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
aliases : Union[List, None] - ordered list of new names to give to the coalesced columns. If empty, the new columns keep the names of the original columns in the left DataFrame.
is_priority_left : bool, optional - whether the columns in the left DataFrame have priority. Defaults to True.
drop_original_cols : bool, optional - whether the original columns must be dropped after the coalesce step. Defaults to True.
Returns
DataFrame - new DataFrame resulting from the combined merge and coalesce operations.
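The priority rule applied to each pair of columns can be sketched per value in plain Python (the real function applies this column-wise after the Spark join):

```python
def coalesce_pair(x_val, y_val, is_priority_left=True):
    # Coalesce rule for one matched pair after the join: return the
    # first non-null value, checking the priority side first.
    first, second = (x_val, y_val) if is_priority_left else (y_val, x_val)
    return first if first is not None else second

print(coalesce_pair(None, 5.0))                         # 5.0
print(coalesce_pair(3.0, 5.0))                          # 3.0 (left priority)
print(coalesce_pair(3.0, 5.0, is_priority_left=False))  # 5.0
```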
def join_coalesce_common_cols(sdf_x: pyspark.sql.dataframe.DataFrame, sdf_y: pyspark.sql.dataframe.DataFrame, on: Union[str, list, pyspark.sql.column.Column], how: str = 'inner') ‑> pyspark.sql.dataframe.DataFrame
Joins two DataFrames and removes duplicated columns by coalescing the columns that share the same name in sdf_x and sdf_y.
Args
sdf_x : DataFrame - one of the two Spark DataFrames to join (treated as the left table in the join operation); its columns have priority in the coalesce.
sdf_y : DataFrame - one of the two Spark DataFrames to join (treated as the right table in the join operation).
on : str, list, Column - a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings naming the join column(s), the column(s) must exist on both sides, and an equi-join is performed.
how : str - default inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
Returns
DataFrame - new DataFrame resulting from the combined join and coalesce operations.