Module panama.utils.spark_df
Functions
def add_col_date(sdf: pyspark.sql.dataframe.DataFrame,
granularity: str,
date_col: str = 'date',
output_col: str | None = None) -> pyspark.sql.dataframe.DataFrame
def add_col_date(
    sdf: DataFrame, granularity: str, date_col: str = "date", output_col: Union[str, None] = None
) -> DataFrame:
    """Adds a new column to the DataFrame with the date truncated to the specified granularity.

    Args:
        sdf (DataFrame): DataFrame containing the date column to truncate.
        granularity (str): a valid granularity.
        date_col (str, optional): the name of the date column to transform. Defaults to "date".
        output_col (Union[str, None], optional): the name of the new date column. Defaults to None.

    Returns:
        DataFrame: DataFrame with the new date column.
    """
    if output_col is None:
        output_col = granularity
    new_sdf = sdf.withColumn(output_col, F.date_trunc(granularity, F.col(date_col)))
    if granularity in ["year", "quarter", "month", "day"]:
        new_sdf = new_sdf.withColumn(output_col, F.to_date(F.col(output_col)))
    return new_sdf
Adds a new column to the DataFrame with the date truncated to the specified granularity.

Args
    sdf : DataFrame
        DataFrame containing the date column to truncate.
    granularity : str
        A valid granularity (e.g. "year", "quarter", "month", "day", "hour").
    date_col : str, optional
        The name of the date column to transform. Defaults to "date".
    output_col : Union[str, None], optional
        The name of the new date column. Defaults to None, in which case the granularity is used as the column name.

Returns
    DataFrame
        DataFrame with the new date column.
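Example (an illustrative sketch, not part of the module; the SparkSession setup and sample data are assumptions for demonstration):

from pyspark.sql import SparkSession
from panama.utils.spark_df import add_col_date
import datetime

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(datetime.date(2023, 5, 17), 10.0)], ["date", "value"])

# With output_col omitted, the new column is named after the granularity ("month").
sdf_month = add_col_date(sdf, granularity="month")
# 2023-05-17 is truncated to 2023-05-01 in the "month" column.
sdf_month.show()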
def add_linear_combination(sdf: pyspark.sql.dataframe.DataFrame,
coefficients: List,
colnames: List,
output_col: str) -> pyspark.sql.dataframe.DataFrame
def add_linear_combination(sdf: DataFrame, coefficients: List, colnames: List, output_col: str) -> DataFrame:
    """
    Add a column to a spark dataframe computing the element-wise linear combination of a set of columns:
        sum(i=1...n)(coefficients_i * colnames_i)                       if n = len(coefficients) = len(colnames)
        sum(i=1...n)(coefficients_i * colnames_i) + coefficients_(n+1)  if n = len(coefficients) - 1 = len(colnames)

    Args:
        sdf: dataframe to be processed.
        coefficients: list of coefficients; each can be either a string naming a valid column in sdf or a number
            applied as a constant coefficient to all sdf rows.
        colnames: list of valid column names in sdf.
        output_col: name of the output column containing the linear combination.

    Returns:
        the original sdf with output_col added.
    """
    # Check if a constant term is present
    constant_term_coeff = 0
    if len(coefficients) != len(colnames):
        if len(coefficients) == len(colnames) + 1:
            constant_term_coeff = coefficients[-1]
        else:
            warnings.warn("Anomalous lengths in coefficients and colnames inputs: exceeding elements will be ignored")

    # Manage the constant term
    constant_term = F.col(constant_term_coeff) if isinstance(constant_term_coeff, str) else constant_term_coeff

    # Manage the other factors
    zip_coeff_col = zip(colnames, coefficients)
    factors = sum(
        (F.col(colnam) * (F.col(coeff) if isinstance(coeff, str) else coeff)) for colnam, coeff in zip_coeff_col
    )

    # Compute the linear combination
    sdf = sdf.withColumn(output_col, factors + constant_term)  # type: ignore
    return sdf
Add a column to a spark dataframe computing the element-wise linear combination of a set of columns:

    sum(i=1...n)(coefficients_i * colnames_i)                       if n = len(coefficients) = len(colnames)
    sum(i=1...n)(coefficients_i * colnames_i) + coefficients_(n+1)  if n = len(coefficients) - 1 = len(colnames)

Args
    sdf
        Dataframe to be processed.
    coefficients
        List of coefficients; each can be either a string naming a valid column in sdf or a number applied as a constant coefficient to all sdf rows.
    colnames
        List of valid column names in sdf.
    output_col
        Name of the output column containing the linear combination.

Returns
    The original sdf with output_col added.
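Example (an illustrative sketch; session setup and sample values are assumptions). Coefficients may also be strings naming columns of sdf; numbers are used here for simplicity:

from pyspark.sql import SparkSession
from panama.utils.spark_df import add_linear_combination

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ["a", "b"])

# combo = 2*a + 0.5*b + 10; the extra trailing coefficient is treated as the constant term.
sdf = add_linear_combination(sdf, coefficients=[2, 0.5, 10], colnames=["a", "b"], output_col="combo")
# Rows: (1.0, 2.0) -> 13.0 and (3.0, 4.0) -> 18.0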
def cache_or_persist_sdf(sdf: pyspark.sql.dataframe.DataFrame, mode: str | None = None) -> None
def cache_or_persist_sdf(sdf: DataFrame, mode: Optional[str] = None) -> None:
    """Perform a persist or cache step followed by a count action on a given spark dataframe.

    Args:
        sdf (DataFrame): dataframe to cache or persist.
        mode (str, optional): which caching operation to perform. Accepted values are 'cache' and 'persist';
            None does nothing.
    """
    if mode == "cache":
        sdf.cache().count()
    elif mode == "persist":
        sdf.persist().count()
    elif mode is None:
        pass
    else:
        raise ValueError(f"cache_strategy must be one between 'cache' and 'persist', found '{mode}' instead.")
Perform a persist or cache step followed by a count action on a given spark dataframe.

Args
    sdf : DataFrame
        Dataframe to cache or persist.
    mode : str, optional
        Which caching operation to perform. Accepted values are 'cache' and 'persist'; None does nothing.
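Example (an illustrative sketch; session setup and data are assumptions):

from pyspark.sql import SparkSession
from panama.utils.spark_df import cache_or_persist_sdf

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(10)

cache_or_persist_sdf(sdf, mode="cache")   # caches sdf and triggers a count to materialise it
cache_or_persist_sdf(sdf, mode=None)      # does nothing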
def format_aggregated_columns(sdf: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame
def format_aggregated_columns(sdf: DataFrame) -> DataFrame:
    """Function to format aggregated columns.

    Runs through all the columns, renaming those matching a pattern: namely, "aggr_func(col_name)" is renamed to
    "col_name".

    Args:
        sdf (DataFrame): dataframe whose columns are to be renamed.

    Returns:
        DataFrame: DataFrame with renamed columns.
    """
    cols = sdf.columns
    pattern = re.compile("(?<=\()\w*(?=\))")  # type: ignore
    output = []
    for c in cols:
        search = re.search(pattern, c)
        if search is not None:
            out_c = F.col(c).alias(search.group(0))
        else:
            out_c = F.col(c)
        output.append(out_c)
    return sdf.select(*output)
Function to format aggregated columns. Runs through all the columns, renaming those matching a pattern: namely, "aggr_func(col_name)" is renamed to "col_name".

Args
    sdf : DataFrame
        Dataframe whose columns are to be renamed.

Returns
    DataFrame
        DataFrame with renamed columns.
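Example (an illustrative sketch; session setup and sample data are assumptions):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from panama.utils.spark_df import format_aggregated_columns

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("a", 1.0), ("a", 2.0), ("b", 3.0)], ["key", "value"])

agg = sdf.groupBy("key").agg(F.sum("value"))   # produces a column named "sum(value)"
clean = format_aggregated_columns(agg)         # renames "sum(value)" back to "value"; "key" is unchanged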
def from_list_to_str(list_to_convert: list) -> str | List[str]
def from_list_to_str(list_to_convert: list) -> Union[str, List[str]]:
    """Returns a string if the list has only one element, otherwise returns the original list.

    Args:
        list_to_convert (list): the list to convert to a string.

    Returns:
        Union[str, List[str]]: the resulting string, or the original list if it has more than one element.
    """
    if len(list_to_convert) == 1:
        return str(list_to_convert[0])
    else:
        return list_to_convert
Returns a string if the list has only one element, otherwise returns the original list.

Args
    list_to_convert : list
        The list to convert to a string.

Returns
    Union[str, List[str]]
        The resulting string, or the original list if it has more than one element.
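Example (an illustrative sketch; the sample values are arbitrary):

from panama.utils.spark_df import from_list_to_str

from_list_to_str(["EUR"])          # -> "EUR"
from_list_to_str(["EUR", "USD"])   # -> ["EUR", "USD"]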
def get_col_as_list(sdf: pyspark.sql.dataframe.DataFrame,
colname: str,
distinct: bool = False,
drop_null: bool = False) -> List[Any]
def get_col_as_list(sdf: DataFrame, colname: str, distinct: bool = False, drop_null: bool = False) -> List[Any]:
    """Return column values as a python list.

    Args:
        sdf: dataframe to be processed.
        colname: name of the column to be extracted.
        distinct: if True, return only distinct values.
        drop_null: if True, drop nulls before returning the result.

    Returns:
        list containing the column values.
    """
    sdf = sdf.select(colname)
    if distinct:
        sdf = sdf.distinct()
    if drop_null:
        sdf = sdf.dropna()
    return sdf.toPandas()[colname].tolist()
Return column values as a python list.

Args
    sdf
        Dataframe to be processed.
    colname
        Name of the column to be extracted.
    distinct
        If True, return only distinct values.
    drop_null
        If True, drop nulls before returning the result.

Returns
    List containing the column values.
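Example (an illustrative sketch; session setup and sample data are assumptions):

from pyspark.sql import SparkSession
from panama.utils.spark_df import get_col_as_list

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("a",), ("a",), (None,)], ["key"])

get_col_as_list(sdf, "key")                                  # e.g. ["a", "a", None]
get_col_as_list(sdf, "key", distinct=True, drop_null=True)   # -> ["a"]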
def get_composite_conv_factor(udm_from: str | List[str],
udm_to: str | List[str],
conv_fact: pyspark.sql.dataframe.DataFrame) -> list
def get_composite_conv_factor(
    udm_from: Union[str, List[str]], udm_to: Union[str, List[str]], conv_fact: DataFrame
) -> list:
    """Returns the conversion factors, supporting composite units of measure (e.g. EUR/MWh).

    Args:
        udm_from (Union[str, List[str]]): the ordered starting units of measure.
        udm_to (Union[str, List[str]]): the ordered desired units of measure.
        conv_fact (DataFrame): DataFrame with conversion factors.

    Returns:
        list: the final conversion factors.
    """
    # Check on input
    if udm_from is None or udm_to is None:
        raise Exception("Missing unit of measures in input.")
    udm_from = [udm_from] if isinstance(udm_from, str) else udm_from
    udm_to = [udm_to] if isinstance(udm_to, str) else udm_to

    # Split numerators and denominators
    from_num = [num.split("/")[0] for num in udm_from]
    from_den = [den.split("/")[1] if "/" in den else None for den in udm_from]
    to_num = [num.split("/")[0] for num in udm_to]
    to_den = [den.split("/")[1] if "/" in den else None for den in udm_to]

    # Conversion
    conv_fact_num = get_single_conv_factor(from_num, to_num, conv_fact)
    conv_fact_den = get_single_conv_factor(from_den, to_den, conv_fact)
    factors = [value_num / value_den for value_num, value_den in zip(conv_fact_num, conv_fact_den)]
    return factors
Returns the conversion factors, supporting composite units of measure (e.g. EUR/MWh).

Args
    udm_from : Union[str, List[str]]
        The ordered starting units of measure.
    udm_to : Union[str, List[str]]
        The ordered desired units of measure.
    conv_fact : DataFrame
        DataFrame with conversion factors.

Returns
    list
        The final conversion factors.
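Example (an illustrative sketch; the conversion table, its column values and the session setup are assumptions made up for demonstration):

from pyspark.sql import SparkSession
from panama.utils.spark_df import get_composite_conv_factor

spark = SparkSession.builder.getOrCreate()
# Toy conversion table with the columns the function expects: udm_from, udm_to, factor.
conv_fact = spark.createDataFrame(
    [("EUR", "USD", 1.25), ("MWh", "kWh", 1000.0)],
    ["udm_from", "udm_to", "factor"],
)

# EUR/MWh -> USD/kWh: numerator factor divided by denominator factor, 1.25 / 1000.0
get_composite_conv_factor("EUR/MWh", "USD/kWh", conv_fact)   # -> [0.00125]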
def get_join_expr(first_key: list,
second_key: list,
interval_join: bool = False,
first_alias='x',
second_alias='y') -> pyspark.sql.column.Column
def get_join_expr(
    first_key: list, second_key: list, interval_join: bool = False, first_alias="x", second_alias="y"
) -> Column:
    """
    Returns the join expression given two key lists.

    Args:
        first_key (list): list with key column names of the first DataFrame.
        second_key (list): list with key column names of the second DataFrame.
        interval_join (bool): flag indicating whether the join must contain an expression like
            'date between start and end'.
        first_alias (str): alias of the first DataFrame. Defaults to "x".
        second_alias (str): alias of the second DataFrame. Defaults to "y".

    Returns:
        Column: the final join expression.
    """
    if (len(first_key) == 0) and (len(second_key) == 0):
        return F.expr("1=1")

    first_key = [first_alias + "." + fk for fk in first_key]
    second_key = [second_alias + "." + sk for sk in second_key]

    key_list = [first_key, second_key]
    sorted_key_list = sorted(key_list, key=len)

    join_expr_interval = ""
    if interval_join:
        if len(first_key) == len(second_key):
            join_expr_interval = (
                "not ("
                + "(" + sorted_key_list[0][0] + ">" + sorted_key_list[1][1] + ")"
                + " or "
                + "(" + sorted_key_list[1][0] + ">" + sorted_key_list[0][1] + ")"
                + ")"
            )
            sorted_key_list = [sorted_key_list[0][2:], sorted_key_list[1][2:]]
        else:
            join_expr_interval = (
                sorted_key_list[0][0] + " between " + sorted_key_list[1][0] + " and " + sorted_key_list[1][1]
            )
            sorted_key_list = [sorted_key_list[0][1:], sorted_key_list[1][2:]]

    join_expr = " and ".join([i[0] + "=" + i[1] for i in zip(sorted_key_list[0], sorted_key_list[1])])

    if len(join_expr) == 0 or len(join_expr_interval) == 0:
        output_join_expr = " and ".join([join_expr + join_expr_interval])
    else:
        output_join_expr = " and ".join([join_expr, join_expr_interval])

    return F.expr(output_join_expr)
Returns the join expression given two key lists.

Args
    first_key : list
        List with key column names of the first DataFrame.
    second_key : list
        List with key column names of the second DataFrame.
    interval_join : bool
        Flag indicating whether the join must contain an expression like 'date between start and end'.
    first_alias : str
        Alias of the first DataFrame. Defaults to "x".
    second_alias : str
        Alias of the second DataFrame. Defaults to "y".

Returns
    Column
        The final join expression.
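Example (an illustrative sketch; the key names and the session setup are assumptions):

from pyspark.sql import SparkSession
from panama.utils.spark_df import get_join_expr

spark = SparkSession.builder.getOrCreate()

# Plain equi-join on two keys: "x.id=y.id and x.date=y.date"
expr = get_join_expr(["id", "date"], ["id", "date"])

# Interval join with keys of different lengths: "x.date between y.start and y.end"
interval_expr = get_join_expr(["date"], ["start", "end"], interval_join=True)

# The expressions are meant to be used with dataframes aliased accordingly, e.g.
# sdf_x.alias("x").join(sdf_y.alias("y"), on=expr, how="inner")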
def get_single_conv_factor(udm_from: str | Sequence[str | None] | None,
udm_to: str | Sequence[str | None] | None,
conv_fact: pyspark.sql.dataframe.DataFrame) -> list
def get_single_conv_factor(
    udm_from: Union[str, Sequence[Union[str, None]], None],
    udm_to: Union[str, Sequence[Union[str, None]], None],
    conv_fact: DataFrame,
) -> list:
    """Returns the conversion factors for single (non-composite) units of measure.

    Args:
        udm_from (Union[str, Sequence[Union[str, None]], None]): the ordered starting units of measure.
        udm_to (Union[str, Sequence[Union[str, None]], None]): the ordered desired units of measure.
        conv_fact (DataFrame): DataFrame with conversion factors.

    Returns:
        list: the final conversion factors.
    """
    if isinstance(udm_from, str) or udm_from is None:
        udm_from = [udm_from]
    if isinstance(udm_to, str) or udm_to is None:
        udm_to = [udm_to]

    fact_output = []
    for from_value, to_value in zip(udm_from, udm_to):
        if (from_value is None and to_value is None) or (from_value == to_value):
            fact_output.append(1)
        elif from_value is None or to_value is None:
            fact_output.append(None)
        else:
            factors = (
                conv_fact.filter((F.col("udm_from") == from_value) & (F.col("udm_to") == to_value))
                .select("factor")
                .rdd.flatMap(lambda x: x)
                .collect()
            )
            fact_output.append(factors[0])
    return fact_output
Returns the conversion factors for single (non-composite) units of measure.

Args
    udm_from : Union[str, Sequence[Union[str, None]], None]
        The ordered starting units of measure.
    udm_to : Union[str, Sequence[Union[str, None]], None]
        The ordered desired units of measure.
    conv_fact : DataFrame
        DataFrame with conversion factors.

Returns
    list
        The final conversion factors.
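Example (an illustrative sketch; the conversion table and the session setup are assumptions):

from pyspark.sql import SparkSession
from panama.utils.spark_df import get_single_conv_factor

spark = SparkSession.builder.getOrCreate()
conv_fact = spark.createDataFrame([("MWh", "kWh", 1000.0)], ["udm_from", "udm_to", "factor"])

# Identical units yield 1; other pairs are looked up in conv_fact.
get_single_conv_factor(["MWh", "EUR"], ["kWh", "EUR"], conv_fact)   # -> [1000.0, 1]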
def join_coalesce(sdf_x: pyspark.sql.dataframe.DataFrame,
sdf_y: pyspark.sql.dataframe.DataFrame,
on: str | list | pyspark.sql.column.Column,
x_cols: List,
y_cols: List | None = None,
how: str = 'fullouter',
aliases: List | None = None,
is_priority_left: bool = True,
drop_original_cols: bool = True) -> pyspark.sql.dataframe.DataFrame
def join_coalesce(
    sdf_x: DataFrame,
    sdf_y: DataFrame,
    on: Union[str, list, Column],
    x_cols: List,
    y_cols: Union[List, None] = None,
    how: str = "fullouter",
    aliases: Union[List, None] = None,
    is_priority_left: bool = True,
    drop_original_cols: bool = True,
) -> DataFrame:
    """Merge two dataframes and then coalesce one or more pairs of columns (coming from both dataframes) using a
    specific priority rule; by default columns in the left dataframe have priority. The user can also define the
    merge behaviour.

    Args:
        sdf_x (DataFrame): one of the two spark dataframes to join (treated as the left table in the join
            operation); its columns have priority by default.
        sdf_y (DataFrame): one of the two spark dataframes to join (treated as the right table in the join
            operation).
        on (str, list, Column): a string for the join column name, a list of column names, a join expression
            (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join
            column(s), the column(s) must exist on both sides, and this performs an equi-join.
        how (str): join type, default "fullouter". Must be one of: inner, cross, outer, full, fullouter, full_outer,
            left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti
            and left_anti.
        x_cols (List): ordered list of columns to coalesce from the left dataframe. Order must be the same as
            "y_cols".
        y_cols (Union[List, None]): ordered list of columns to coalesce from the right dataframe. Order must be the
            same as "x_cols". If empty, the names are assumed to be the same as x_cols.
        aliases (Union[List, None]): ordered list of new names to give to the coalesced columns. If empty, the new
            columns keep the names of the original columns in the left dataframe.
        is_priority_left (bool, optional): indicates if the columns in the left dataframe have priority. Default is
            True.
        drop_original_cols (bool, optional): indicates if the original columns need to be dropped after the coalesce
            step. Default is True.

    Returns:
        DataFrame: new dataframe resulting from the combined merge and coalesce operations.
    """
    # Alias both dataframes
    sdf_x = sdf_x.alias("x")
    sdf_y = sdf_y.alias("y")

    # Merge the two dataframes
    final = sdf_x.join(sdf_y, on=on, how=how)

    # Check that all inputs have the same length
    if y_cols is None:
        y_cols = x_cols
    if len(x_cols) == len(y_cols):
        if aliases is None or len(aliases) == len(x_cols):
            pass  # all checks ok
        else:
            raise ValueError("If providing aliases, it must have same length as x_cols and y_cols")
    else:
        raise ValueError("x_cols and y_cols must contain the same number of cols")

    # Create the aliases list from x_cols if not provided
    if aliases is None:
        aliases = x_cols

    # Determine priority
    if is_priority_left:
        priority_cols = ["x." + c for c in x_cols]
        secondary_cols = ["y." + c for c in y_cols]
    else:
        priority_cols = ["y." + c for c in y_cols]
        secondary_cols = ["x." + c for c in x_cols]

    # Iterate over column pairs and perform all coalesce operations
    for i in range(0, len(x_cols), 1):
        final = final.select("*", F.coalesce(final[priority_cols[i]], final[secondary_cols[i]]).alias(aliases[i]))
        # If needed, drop the original columns
        if drop_original_cols:
            final = final.drop(F.col(priority_cols[i]))
            final = final.drop(F.col(secondary_cols[i]))

    return final
Merge two dataframes and then coalesce one or more pairs of columns (coming from both dataframes) using a specific priority rule; by default columns in the left dataframe have priority. The user can also define the merge behaviour.

Args
    sdf_x : DataFrame
        One of the two spark dataframes to join (treated as the left table in the join operation); its columns have priority by default.
    sdf_y : DataFrame
        One of the two spark dataframes to join (treated as the right table in the join operation).
    on : str, list, Column
        A string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
    how : str
        Join type, default "fullouter". Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
    x_cols : List
        Ordered list of columns to coalesce from the left dataframe. Order must be the same as "y_cols".
    y_cols : Union[List, None]
        Ordered list of columns to coalesce from the right dataframe. Order must be the same as "x_cols". If empty, the names are assumed to be the same as x_cols.
    aliases : Union[List, None]
        Ordered list of new names to give to the coalesced columns. If empty, the new columns keep the names of the original columns in the left dataframe.
    is_priority_left : bool, optional
        Indicates if the columns in the left dataframe have priority. Default is True.
    drop_original_cols : bool, optional
        Indicates if the original columns need to be dropped after the coalesce step. Default is True.

Returns
    DataFrame
        New dataframe resulting from the combined merge and coalesce operations.
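Example (an illustrative sketch; the sample dataframes, column names and session setup are assumptions):

from pyspark.sql import SparkSession
from panama.utils.spark_df import join_coalesce

spark = SparkSession.builder.getOrCreate()
sdf_x = spark.createDataFrame([(1, "a"), (2, None)], ["id", "val"])
sdf_y = spark.createDataFrame([(1, "b"), (2, "c"), (3, "d")], ["id", "val"])

# Full outer join on "id"; the coalesced "val_out" column prefers values from sdf_x.
out = join_coalesce(sdf_x, sdf_y, on="id", x_cols=["val"], aliases=["val_out"])
# id=1 -> "a" (left wins), id=2 -> "c" (left is null), id=3 -> "d" (only right)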
def join_coalesce_common_cols(sdf_x: pyspark.sql.dataframe.DataFrame,
sdf_y: pyspark.sql.dataframe.DataFrame,
on: str | list | pyspark.sql.column.Column,
how: str = 'inner') -> pyspark.sql.dataframe.DataFrame
def join_coalesce_common_cols(
    sdf_x: DataFrame, sdf_y: DataFrame, on: Union[str, list, Column], how: str = "inner"
) -> DataFrame:
    """Join two dataframes and remove duplicated columns by coalescing the columns with the same name in sdf_x and
    sdf_y.

    Args:
        sdf_x (DataFrame): one of the two spark dataframes to join (treated as the left table in the join
            operation); it has priority in the coalesce.
        sdf_y (DataFrame): one of the two spark dataframes to join (treated as the right table in the join
            operation).
        on (str, list, Column): a string for the join column name, a list of column names, a join expression
            (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join
            column(s), the column(s) must exist on both sides, and this performs an equi-join.
        how (str): join type, default "inner". Must be one of: inner, cross, outer, full, fullouter, full_outer,
            left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti
            and left_anti.

    Returns:
        DataFrame: new dataframe resulting from the combined merge and coalesce operations.
    """
    col_x = set(sdf_x.columns)
    col_y = set(sdf_y.columns)
    common_cols = list(col_x & col_y)
    sdf_join = join_coalesce(sdf_x, sdf_y, on=on, x_cols=common_cols, how=how)
    return sdf_join
Join two dataframes and remove duplicated columns by coalescing the columns with the same name in sdf_x and sdf_y.

Args
    sdf_x : DataFrame
        One of the two spark dataframes to join (treated as the left table in the join operation); it has priority in the coalesce.
    sdf_y : DataFrame
        One of the two spark dataframes to join (treated as the right table in the join operation).
    on : str, list, Column
        A string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
    how : str
        Join type, default "inner". Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.

Returns
    DataFrame
        New dataframe resulting from the combined merge and coalesce operations.
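Example (an illustrative sketch; the sample dataframes, column names and session setup are assumptions, and an expression-based join is used so both copies of the shared columns are available for coalescing):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from panama.utils.spark_df import join_coalesce_common_cols

spark = SparkSession.builder.getOrCreate()
sdf_x = spark.createDataFrame([(1, "a", 10.0)], ["id", "val", "price"])
sdf_y = spark.createDataFrame([(1, "b", 20.0), (2, "c", 30.0)], ["id", "val", "qty"])

# "id" and "val" exist in both dataframes and are coalesced (sdf_x wins on conflicts);
# "price" and "qty" are kept as-is.
out = join_coalesce_common_cols(sdf_x, sdf_y, on=F.expr("x.id = y.id"), how="left")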