Module panama.utils.spark_df

Functions

def add_col_date(sdf: pyspark.sql.dataframe.DataFrame,
granularity: str,
date_col: str = 'date',
output_col: str | None = None) ‑> pyspark.sql.dataframe.DataFrame
def add_col_date(
    sdf: DataFrame, granularity: str, date_col: str = "date", output_col: Union[str, None] = None
) -> DataFrame:
    """Adds a new column to the DataFrame with the specified granularity.

    Args:
        sdf (DataFrame): input DataFrame containing the date column to truncate.
        granularity (str): a granularity accepted by F.date_trunc (e.g. 'year', 'quarter', 'month', 'week', 'day', 'hour').
        date_col (str, optional): name of the date column to transform. Defaults to "date".
        output_col (Union[str, None], optional): name of the new date column. Defaults to None, in which case the granularity is used as the column name.

    Returns:
        DataFrame: DataFrame with the new date column.
    """
    if output_col is None:
        output_col = granularity
    new_sdf = sdf.withColumn(output_col, F.date_trunc(granularity, F.col(date_col)))

    if granularity in ["year", "quarter", "month", "day"]:
        new_sdf = new_sdf.withColumn(output_col, F.to_date(F.col(output_col)))

    return new_sdf

Adds a new column to the DataFrame with the specified granularity.

Args

sdf : DataFrame
input DataFrame containing the date column to truncate.
granularity : str
a granularity accepted by F.date_trunc (e.g. 'year', 'quarter', 'month', 'week', 'day', 'hour').
date_col : str, optional
name of the date column to transform. Defaults to 'date'.
output_col : Union[str, None], optional
name of the new date column. Defaults to None (the granularity is used as the name).

Returns

DataFrame
DataFrame with the new date column.
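
A minimal usage sketch (the SparkSession and sample data below are illustrative, not part of the module):

import datetime
from pyspark.sql import SparkSession
from panama.utils.spark_df import add_col_date

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(datetime.date(2024, 3, 15),)], ["date"])
monthly = add_col_date(sdf, granularity="month", output_col="month")
# the 'month' column holds 2024-03-01 as a DateType value (date_trunc followed by to_date)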

def add_linear_combination(sdf: pyspark.sql.dataframe.DataFrame,
coefficients: List,
colnames: List,
output_col: str) ‑> pyspark.sql.dataframe.DataFrame
def add_linear_combination(sdf: DataFrame, coefficients: List, colnames: List, output_col: str) -> DataFrame:
    """
    Add a column to a spark dataframe computing the element-wise linear combination of two set columns:
    sum(i=1...n)(coefficients_i*colnames_i) if n = len(coefficients) = len(colnames)
    sum(i=1...n)(coefficients_i*colnames_i) + coefficients_n+1 if n = len(coefficients) - 1 = len(colnames)

    Args:
        sdf: dataframe to be processed
        coefficient: List of coefficient, they can be both a string indicating a valid column name in sdf or a number indicating the coefficient to be consider for all sdf rows
        colnames: List of valid column_names in sdf
        output_col: name of the output_col containing the linear combination

    Returns:
        the original sdf with output_col added.

    """
    # Check if constant_term is present
    constant_term_coeff = 0
    if len(coefficients) != len(colnames):
        if len(coefficients) == len(colnames) + 1:
            constant_term_coeff = coefficients[-1]
        else:
            warnings.warn("Anomalous lengths in coefficients and colnames inputs: exceeding elements will be ignored")

    # Manage constant_term
    constant_term = F.col(constant_term_coeff) if isinstance(constant_term_coeff, str) else constant_term_coeff

    # Manage other factors
    zip_coeff_col = zip(colnames, coefficients)
    factors = sum(
        (F.col(colnam) * (F.col(coeff) if isinstance(coeff, str) else coeff)) for colnam, coeff in zip_coeff_col
    )

    # Compute linear combination
    sdf = sdf.withColumn(output_col, factors + constant_term)  # type: ignore

    return sdf

Add a column to a spark dataframe computing the element-wise (row by row) linear combination of a set of columns: sum(i=1…n)(coefficients_i * colnames_i) if n = len(coefficients) = len(colnames); sum(i=1…n)(coefficients_i * colnames_i) + coefficients_(n+1) if n = len(coefficients) - 1 = len(colnames).

Args

sdf
dataframe to be processed.
coefficients
list of coefficients; each element can be either a string indicating a valid column name in sdf or a number applied to all rows of sdf.
colnames
list of valid column names in sdf.
output_col
name of the output column containing the linear combination.

Returns

the original sdf with output_col added.
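
A usage sketch (the SparkSession and sample data are illustrative); a trailing extra coefficient is treated as the constant term:

from pyspark.sql import SparkSession
from panama.utils.spark_df import add_linear_combination

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1.0, 2.0)], ["a", "b"])
# computes 3*a + 0.5*b + 10; a string coefficient would instead be read as a column name
out = add_linear_combination(sdf, coefficients=[3, 0.5, 10], colnames=["a", "b"], output_col="combo")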

def cache_or_persist_sdf(sdf: pyspark.sql.dataframe.DataFrame, mode: str | None = None) ‑> None
def cache_or_persist_sdf(sdf: DataFrame, mode: Optional[str] = None) -> None:
    """Perform a persist or cache step and then an action count on a given spark dataframe.

    Args:
        sdf (DataFrame): dataframe to cache or persist.
        mode (Optional[str]): caching operation to perform. Accepted values are 'cache' and 'persist'; None performs no caching. Defaults to None.
    """
    if mode == "cache":
        sdf.cache().count()
    elif mode == "persist":
        sdf.persist().count()
    elif mode is None:
        pass
    else:
        raise ValueError(f"cache_strategy must be one between 'cache' and 'persist', found '{mode}' instead.")

Perform a persist or cache step and then an action count on a given spark dataframe.

Args

sdf : DataFrame
dataframe to cache or persist.
mode : Optional[str]
caching operation to perform. Accepted values are 'cache' and 'persist'; None performs no caching. Defaults to None.
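
A usage sketch, assuming sdf is an existing Spark DataFrame that is reused by several downstream transformations:

from panama.utils.spark_df import cache_or_persist_sdf

cache_or_persist_sdf(sdf, mode="cache")    # cache() followed by a count() action
cache_or_persist_sdf(sdf, mode="persist")  # persist() followed by a count() action
cache_or_persist_sdf(sdf, mode=None)       # no-op
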
def format_aggregated_columns(sdf: pyspark.sql.dataframe.DataFrame) ‑> pyspark.sql.dataframe.DataFrame
def format_aggregated_columns(sdf: DataFrame) -> DataFrame:
    """Function to format aggregated columns. Runs through all the columns, renaming those matching a pattern:
    namely, "aggr_func(col_name)" is renamed with "col_name".

    Args:
        sdf (DataFrame): dataframe whose columns are to be renamed.

    Returns:
        DataFrame: DataFrame with renamed columns.
    """
    cols = sdf.columns
    pattern = re.compile(r"(?<=\()\w*(?=\))")
    output = []
    for c in cols:
        search = re.search(pattern, c)
        if search is not None:
            out_c = F.col(c).alias(search.group(0))
        else:
            out_c = F.col(c)
        output.append(out_c)
    return sdf.select(*output)

Format aggregated columns: runs through all the columns, renaming those matching the default aggregation pattern, i.e. "aggr_func(col_name)" is renamed to "col_name".

Args

sdf : DataFrame
dataframe whose columns are to be renamed.

Returns

DataFrame
DataFrame with renamed columns.
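
A usage sketch (SparkSession and sample data are illustrative): the default names produced by groupBy().agg() are stripped back to the bare column names.

from pyspark.sql import SparkSession, functions as F
from panama.utils.spark_df import format_aggregated_columns

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("a", 1.0, 2.0)], ["key", "amount", "price"])
agg = sdf.groupBy("key").agg(F.sum("amount"), F.avg("price"))  # columns: key, sum(amount), avg(price)
clean = format_aggregated_columns(agg)                         # columns: key, amount, price
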
def from_list_to_str(list_to_convert: list) ‑> str | List[str]
def from_list_to_str(list_to_convert: list) -> Union[str, List[str]]:
    """Returns a string if it is a list with only one element, otherwise returns the original list.

    Args:
        list_to_convert (list): the list to convert in string.

    Returns:
        Union[str, List[str]]: the final string or the original list if it has more elements.
    """
    if len(list_to_convert) == 1:
        return str(list_to_convert[0])
    else:
        return list_to_convert

Returns the single element as a string if the list has only one element, otherwise returns the original list.

Args

list_to_convert : list
the list to convert to a string.

Returns

Union[str, List[str]]
the element as a string, or the original list if it has more than one element.
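
For example:

from panama.utils.spark_df import from_list_to_str

from_list_to_str(["month"])           # -> "month"
from_list_to_str(["month", "year"])   # -> ["month", "year"]
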
def get_col_as_list(sdf: pyspark.sql.dataframe.DataFrame,
colname: str,
distinct: bool = False,
drop_null: bool = False) ‑> List[Any]
def get_col_as_list(sdf: DataFrame, colname: str, distinct: bool = False, drop_null: bool = False) -> List[Any]:
    """Return columns values as python list.

    Args:
        sdf: dataframe to be processed.
        colname: name of the column to be extracted.
        distinct: if True return all or only distinct values.
        drop_null: if True drops null before returning result.
    Returns:
        list containing column values.

    """
    sdf = sdf.select(colname)
    if distinct:
        sdf = sdf.distinct()
    if drop_null:
        sdf = sdf.dropna()
    return sdf.toPandas()[colname].tolist()

Return column values as a python list.

Args

sdf
dataframe to be processed.
colname
name of the column to be extracted.
distinct
if True, return only distinct values.
drop_null
if True, drop nulls before returning the result.

Returns

list containing column values.
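
A usage sketch, assuming sdf is an existing Spark DataFrame with a 'country' column (illustrative names):

from panama.utils.spark_df import get_col_as_list

countries = get_col_as_list(sdf, colname="country", distinct=True, drop_null=True)
# e.g. ['IT', 'FR', 'DE']; values are collected to the driver via toPandas()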

def get_composite_conv_factor(udm_from: str | List[str],
udm_to: str | List[str],
conv_fact: pyspark.sql.dataframe.DataFrame) ‑> list
def get_composite_conv_factor(
    udm_from: Union[str, List[str]], udm_to: Union[str, List[str]], conv_fact: DataFrame
) -> list:
    """Returns the conversion factor given also composite unit of measures (ex. EUR/MWh).

    Args:
        udm_from (Union[str, List[str]]): the ordered starting unit of measures.
        udm_to (Union[str, List[str]]): the ordered desired unit of measures.
        conv_fact (DataFrame): DataFrame with conversion factors.

    Returns:
        list: the final conversion factors.
    """
    # Check on input
    if udm_from is None or udm_to is None:
        raise Exception("Missing unit of measures in input.")
    udm_from = [udm_from] if isinstance(udm_from, str) else udm_from
    udm_to = [udm_to] if isinstance(udm_to, str) else udm_to

    # Split
    from_num = [num.split("/")[0] for num in udm_from]
    from_den = [den.split("/")[1] if "/" in den else None for den in udm_from]
    to_num = [num.split("/")[0] for num in udm_to]
    to_den = [den.split("/")[1] if "/" in den else None for den in udm_to]

    # Conversion
    conv_fact_num = get_single_conv_factor(from_num, to_num, conv_fact)
    conv_fact_den = get_single_conv_factor(from_den, to_den, conv_fact)
    factors = [value_num / value_den for value_num, value_den in zip(conv_fact_num, conv_fact_den)]

    return factors

Returns the conversion factors, supporting composite units of measure (e.g. EUR/MWh).

Args

udm_from : Union[str, List[str]]
the ordered starting units of measure.
udm_to : Union[str, List[str]]
the ordered target units of measure.
conv_fact : DataFrame
DataFrame with conversion factors.

Returns

list
the final conversion factors.
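
A usage sketch (the conversion table and its values are illustrative); numerator and denominator are converted separately and the ratio of the two factors is returned:

from pyspark.sql import SparkSession
from panama.utils.spark_df import get_composite_conv_factor

spark = SparkSession.builder.getOrCreate()
conv_fact = spark.createDataFrame([("MWh", "GWh", 0.001)], ["udm_from", "udm_to", "factor"])
get_composite_conv_factor("EUR/MWh", "EUR/GWh", conv_fact)  # -> [1000.0] (1 / 0.001)
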
def get_join_expr(first_key: list,
second_key: list,
interval_join: bool = False,
first_alias='x',
second_alias='y') ‑> pyspark.sql.column.Column
def get_join_expr(
    first_key: list, second_key: list, interval_join: bool = False, first_alias="x", second_alias="y"
) -> Column:
    """
    Returns the join expression given two key lists.

    Args:
        first_key (list): list with key column names of the first DataFrame.
        second_key (list): list with key column names of the second DataFrame.
        interval_join (bool, optional): whether the join expression must contain an interval condition such as 'date between start and end'; in that case the interval column(s) must come first in each key list. Defaults to False.
        first_alias (str, optional): alias used to qualify the first DataFrame's columns. Defaults to 'x'.
        second_alias (str, optional): alias used to qualify the second DataFrame's columns. Defaults to 'y'.

    Returns:
        Column: the final join expression.
    """
    if (len(first_key) == 0) and (len(second_key) == 0):
        return F.expr("1=1")

    first_key = [first_alias + "." + fk for fk in first_key]
    second_key = [second_alias + "." + sk for sk in second_key]
    key_list = [first_key, second_key]

    sorted_key_list = sorted(key_list, key=len)

    join_expr_interval = ""
    if interval_join:
        if len(first_key) == len(second_key):
            join_expr_interval = (
                "not (" + "(" + sorted_key_list[0][0] + ">" + sorted_key_list[1][1] + ")" + " or "
                "(" + sorted_key_list[1][0] + ">" + sorted_key_list[0][1] + ")" + ")"
            )
            sorted_key_list = [sorted_key_list[0][2:], sorted_key_list[1][2:]]

        else:
            join_expr_interval = (
                sorted_key_list[0][0] + " between " + sorted_key_list[1][0] + " and " + sorted_key_list[1][1]
            )
            sorted_key_list = [sorted_key_list[0][1:], sorted_key_list[1][2:]]

    join_expr = " and ".join([i[0] + "=" + i[1] for i in zip(sorted_key_list[0], sorted_key_list[1])])
    if len(join_expr) == 0 or len(join_expr_interval) == 0:
        output_join_expr = " and ".join([join_expr + join_expr_interval])
    else:
        output_join_expr = " and ".join([join_expr, join_expr_interval])

    return F.expr(output_join_expr)

Returns the join expression given two key lists.

Args

first_key : list
list with key column names of the first DataFrame.
second_key : list
list with key column names of the second DataFrame.
interval_join : bool, optional
whether the join expression must contain an interval condition such as 'date between start and end'; in that case the interval column(s) must come first in each key list. Defaults to False.
first_alias : str, optional
alias used to qualify the first DataFrame's columns. Defaults to 'x'.
second_alias : str, optional
alias used to qualify the second DataFrame's columns. Defaults to 'y'.

Returns

Column
the final join expression.
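
A usage sketch with illustrative column names and pre-existing dataframes sdf_x and sdf_y. Per the source above, the interval column(s) must come first in each key list, followed by the equality keys, and the default aliases 'x' and 'y' must match the aliases given to the joined dataframes:

from panama.utils.spark_df import get_join_expr

expr = get_join_expr(["date", "id"], ["start_date", "end_date", "id"], interval_join=True)
# -> x.id=y.id and x.date between y.start_date and y.end_date
joined = sdf_x.alias("x").join(sdf_y.alias("y"), on=expr, how="left")
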
def get_single_conv_factor(udm_from: str | Sequence[str | None] | None,
udm_to: str | Sequence[str | None] | None,
conv_fact: pyspark.sql.dataframe.DataFrame) ‑> list
def get_single_conv_factor(
    udm_from: Union[str, Sequence[Union[str, None]], None],
    udm_to: Union[str, Sequence[Union[str, None]], None],
    conv_fact: DataFrame,
) -> list:
    """Returns the conversion factor given single unit of measures.

    Args:
        udm_from (Union[str, List[Union[str, None]], None]): the ordered starting unit of measures.
        udm_to (Union[str, List[Union[str, None]], None]): the ordered desired unit of measures.
        conv_fact (DataFrame): DataFrame with conversion factors.

    Returns:
        list: the final conversion factors.
    """
    if isinstance(udm_from, str) or udm_from is None:
        udm_from = [udm_from]
    if isinstance(udm_to, str) or udm_to is None:
        udm_to = [udm_to]
    fact_output = []
    for from_value, to_value in zip(udm_from, udm_to):
        if (from_value is None and to_value is None) or (from_value == to_value):
            fact_output.append(1)
        elif from_value is None or to_value is None:
            fact_output.append(None)
        else:
            factors = (
                conv_fact.filter((F.col("udm_from") == from_value) & (F.col("udm_to") == to_value))
                .select("factor")
                .rdd.flatMap(lambda x: x)
                .collect()
            )
            fact_output.append(factors[0])

    return fact_output

Returns the conversion factors for single (non-composite) units of measure.

Args

udm_from : Union[str, Sequence[Union[str, None]], None]
the ordered starting units of measure.
udm_to : Union[str, Sequence[Union[str, None]], None]
the ordered target units of measure.
conv_fact : DataFrame
DataFrame with conversion factors.

Returns

list
the final conversion factors.
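
A usage sketch (illustrative conversion table); identical or both-None units map to 1, a None on one side only maps to None, and anything else is looked up in conv_fact:

from pyspark.sql import SparkSession
from panama.utils.spark_df import get_single_conv_factor

spark = SparkSession.builder.getOrCreate()
conv_fact = spark.createDataFrame([("GWh", "MWh", 1000.0)], ["udm_from", "udm_to", "factor"])
get_single_conv_factor(["GWh", "EUR"], ["MWh", "EUR"], conv_fact)  # -> [1000.0, 1]
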
def join_coalesce(sdf_x: pyspark.sql.dataframe.DataFrame,
sdf_y: pyspark.sql.dataframe.DataFrame,
on: str | list | pyspark.sql.column.Column,
x_cols: List,
y_cols: List | None = None,
how: str = 'fullouter',
aliases: List | None = None,
is_priority_left: bool = True,
drop_original_cols: bool = True) ‑> pyspark.sql.dataframe.DataFrame
def join_coalesce(
    sdf_x: DataFrame,
    sdf_y: DataFrame,
    on: Union[str, list, Column],
    x_cols: List,
    y_cols: Union[List, None] = None,
    how: str = "fullouter",
    aliases: Union[List, None] = None,
    is_priority_left: bool = True,
    drop_original_cols: bool = True,
) -> DataFrame:
    """Merge two dataframes and then coalesce on or more pair of columns (coming from both dataframes) using a specific priority rule, by default columns in the left dataframe will have priority.
    User can also define merge behaviour.

    Args:
        left (DataFrame): one of the two spark dataframes to join (will be treated as left table in the join operation) and will have the default priority.
        right (DataFrame): one of the two spark dataframes to join (will be treated as right table in the join operation).
        on (str, list, Column): a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
        how (str): default fullouter. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
        x_cols (List): ordered list of columns to coalesce from left dataframe. Order must be the same as "y_cols"
        y_cols (Union[List, None]): ordered list of columns to coalesce from left dataframe. Order must be the same as "x_cols". If empty the names will be considered the same as x_cols.
        aliases (Union[List, None]): ordered list of new name to give to coalesced columns. If empty the new columns will be called as the original columns in the left dataframe.
        is_priority_left (bool (Optional)): indicates if the columns in the left dataframe have priority. Default is True.
        drop_original_cols (bool (Optional)): indicates if the originale columns need to be dropped after coalesce step. Default is True.

    Returns:
        DataFrame: new dataframe is the result of the combined merge and coalesce operations.
    """

    # Alias both dataframes
    sdf_x = sdf_x.alias("x")
    sdf_y = sdf_y.alias("y")

    # Merge the two dataframes
    final = sdf_x.join(sdf_y, on=on, how=how)

    # Check that all inputs have the same length
    if y_cols is None:
        y_cols = x_cols

    if len(x_cols) == len(y_cols):
        if aliases is None or len(aliases) == len(x_cols):
            pass  # all checks ok
        else:
            raise ValueError("If providing aliases, it must have the same length as x_cols and y_cols")
    else:
        raise ValueError("x_cols and y_cols must contain the same number of cols")

    # Create the aliases list from x_cols if not provided, otherwise use the provided aliases
    if aliases is None:
        aliases = x_cols

    # Determine priority
    if is_priority_left:
        priority_cols = ["x." + c for c in x_cols]
        secondary_cols = ["y." + c for c in y_cols]
    else:
        priority_cols = ["y." + c for c in y_cols]
        secondary_cols = ["x." + c for c in x_cols]

    # Iterate over columns pairs and perform all coalesce operations
    for i in range(0, len(x_cols), 1):
        final = final.select("*", F.coalesce(final[priority_cols[i]], final[secondary_cols[i]]).alias(aliases[i]))

        # If needed, drop the original columns
        if drop_original_cols:
            final = final.drop(F.col(priority_cols[i]))
            final = final.drop(F.col(secondary_cols[i]))

    return final

Merge two dataframes and then coalesce one or more pairs of columns (coming from both dataframes) using a priority rule; by default, columns from the left dataframe have priority. The join behaviour can also be configured.

Args

sdf_x : DataFrame
one of the two spark dataframes to join (treated as the left table in the join operation); its columns have priority by default.
sdf_y : DataFrame
one of the two spark dataframes to join (treated as the right table in the join operation).
on : str, list, Column
a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
how : str
default fullouter. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
x_cols : List
ordered list of columns to coalesce from left dataframe. Order must be the same as "y_cols"
y_cols : Union[List, None]
ordered list of columns to coalesce from the right dataframe. Order must be the same as "x_cols". If None, the names are assumed to be the same as x_cols.
aliases : Union[List, None]
ordered list of new names to give to the coalesced columns. If None, the new columns take the names of the original columns in the left dataframe.
is_priority_left : bool, optional
whether the columns from the left dataframe have priority. Defaults to True.
drop_original_cols : bool, optional
whether the original columns are dropped after the coalesce step. Defaults to True.

Returns

DataFrame
new dataframe is the result of the combined merge and coalesce operations.
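
A usage sketch, assuming sdf_x and sdf_y are existing Spark DataFrames that share an 'id' key (all column names are illustrative):

from panama.utils.spark_df import join_coalesce

merged = join_coalesce(
    sdf_x,
    sdf_y,
    on="id",
    x_cols=["price"],          # column taken from sdf_x when not null
    y_cols=["price_backup"],   # fallback column taken from sdf_y
    aliases=["price_final"],   # name of the coalesced output column
    how="fullouter",
)
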
def join_coalesce_common_cols(sdf_x: pyspark.sql.dataframe.DataFrame,
sdf_y: pyspark.sql.dataframe.DataFrame,
on: str | list | pyspark.sql.column.Column,
how: str = 'inner') ‑> pyspark.sql.dataframe.DataFrame
def join_coalesce_common_cols(
    sdf_x: DataFrame, sdf_y: DataFrame, on: Union[str, list, Column], how: str = "inner"
) -> DataFrame:
    """Join two dataframes and remove duplicated columns by coalescing the columns with the same name in sdf_x and sdf_y

    Args:
        sdf_x (DataFrame): one of the two spark dataframes to join (will be treated as left table in the join operation) and will have priority in coalesce.
        sdf_y (DataFrame): one of the two spark dataframes to join (will be treated as right table in the join operation).
        on (str, list): a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
        how (str): defaults to "inner". Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.

    Returns:
        DataFrame: new dataframe is the result of the combined merge and coalesce operations.
    """
    col_x = set(sdf_x.columns)
    col_y = set(sdf_y.columns)
    common_cols = list(col_x & col_y)
    sdf_join = join_coalesce(sdf_x, sdf_y, on=on, x_cols=common_cols, how=how)

    return sdf_join

Join two dataframes and remove duplicated columns by coalescing the columns with the same name in sdf_x and sdf_y.

Args

sdf_x : DataFrame
one of the two spark dataframes to join (will be treated as left table in the join operation) and will have priority in coalesce.
sdf_y : DataFrame
one of the two spark dataframes to join (will be treated as right table in the join operation).
on : str, list
a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
how : str
defaults to inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.

Returns

DataFrame
new dataframe is the result of the combined merge and coalesce operations.
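
A usage sketch, assuming sdf_x and sdf_y are existing Spark DataFrames that share some column names (illustrative):

from panama.utils.spark_df import join_coalesce_common_cols

# every column name present in both dataframes (including the join key) is coalesced, with sdf_x taking priority
joined = join_coalesce_common_cols(sdf_x, sdf_y, on="id", how="left")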