

class TimeSeriesDataFrame (sdf: DataFrame, granularity: _GRANULARITY_TYPING, date_cols: Union[str, List[str]], value_cols: Union[str, List[str]], key_cols: Union[str, List[str], None] = None, load: _LOAD_TYPING = None, serie_type: _SERIE_TYPE_TYPING = None, udm: Union[str, List[str], None] = None)
Expand source code
class TimeSeriesDataFrame(PanamaDataFrame):
    granularity_dic = {"interval": 0, "year": 1, "quarter": 2, "month": 3, "day": 4, "hour": 5}
    agg_type_dic = {"volume": "sum", "price": "mean", "amount": "sum"}
    load_dic = {"F": ["F1", "F2", "F3"], "PO": ["PL", "OP"], "PO2": ["PL2", "OP2"]}

    def __init__(
        sdf: DataFrame,
        granularity: _GRANULARITY_TYPING,
        date_cols: Union[str, List[str]],
        value_cols: Union[str, List[str]],
        key_cols: Union[str, List[str], None] = None,
        load: _LOAD_TYPING = None,
        serie_type: _SERIE_TYPE_TYPING = None,
        udm: Union[str, List[str], None] = None,
        super().__init__(sdf=sdf, key_cols=key_cols, date_cols=date_cols)
        self.add_columns(key="value_cols", value=value_cols)
        _check_columns_in_df(sdf, value_cols=value_cols)
        self.add_columns(key="serie_type", value=list_value_append(serie_type, drop_null=False))
        self.add_columns(key="udm", value=list_value_append(udm, drop_null=False))

        self.granularity = cast(_GRANULARITY_TYPING, granularity)

        load = cast(_LOAD_TYPING, load)
        self.load = list_value_append(load)


    def _set_lists_len(self) -> None:
        """Broadcast single-element ``serie_type`` and ``udm`` lists to one entry per value column.

        Each list is first passed to ``check_list_len`` together with the value columns.
        When that check returns ``True`` and the list holds exactly one element, the element
        is repeated ``len(value_cols)`` times and stored back through ``add_columns``.
        """
        reference_cols = self.get_columns_by_name("value")
        type_list = self.get_columns_by_name("serie_type")
        udm_list = self.get_columns_by_name("udm")

        # NOTE(review): check_list_len is defined elsewhere; presumably it validates the length
        # of `to_check` against `reference` and returns True when a broadcast is needed -- confirm.
        needs_fill = check_list_len(reference=reference_cols, to_check=type_list)
        # remaining case: a single serie_type shared by several value columns
        if needs_fill is True and len(type_list) == 1:
            self.add_columns(key="serie_type", value=type_list * len(reference_cols))

        needs_fill = check_list_len(reference=reference_cols, to_check=udm_list, to_check_label="udm")
        # remaining case: a single udm shared by several value columns
        if needs_fill is True and len(udm_list) == 1:
            self.add_columns(key="udm", value=udm_list * len(reference_cols))

    def _set_load_col(self) -> None:
        """Function used to set the load col. Wrapper of add_columns, checks for validity of the column.

            ValueError: load column {load_col} not in sdf.columns. Available values are {self.sdf.columns}.
            ValueError: No consistency between load expected values {allowed_values} and load column {load_col} content.
        load_cols = []
        for load in self.load:
            load_col = f"load_{load.lower()}"
            if load_col not in self.sdf.columns:
                raise ValueError(f"load column {load_col} not in sdf.columns. Available values are {self.sdf.columns}.")

            allowed_values = self.load_dic[load]
            filtered_df = self.sdf.filter(~F.col(load_col).isin(allowed_values))

            if not filtered_df.isEmpty():
                raise ValueError(
                    f"No consistency between load expected values {allowed_values} and load column {load_col} content."

        self.add_columns(key="load_col", value=load_cols)
        self.update_columns(key="temporal_cols", value=load_cols)

    def __getattr__(self, attr: str):
        df_dict = DataFrame.__dict__
        if attr in df_dict:
            sdf_attr = getattr(self.sdf, attr)
            if callable(sdf_attr):

                def method_wrapper(*args, **kwargs):
                    method_result = sdf_attr(*args, **kwargs)
                    if isinstance(method_result, DataFrame):
                        value_cols = self.get_columns_by_name("value")
                        serie_type = self.get_columns_by_name("serie_type")
                        udm = self.get_columns_by_name("udm")
                        _check_columns_to_keep(sdf=method_result, method=attr, values=value_cols, raises=False)

                        value_indexes = [value_cols.index(i) for i in value_cols if i in method_result.columns]
                        new_value_cols = [value_cols[i] for i in value_indexes]
                        new_serie_type = [serie_type[i] for i in value_indexes]
                        new_udm = [udm[i] for i in value_indexes]

                        return TimeSeriesDataFrame(
                            granularity=self.granularity,  # type: ignore
                            serie_type=new_serie_type,  # type: ignore
                        return method_result

                return method_wrapper
                return sdf_attr
            raise AttributeError(f"TimeSeriesDataFrame object has no attribute '{attr}'.")

    @staticmethod
    def order_by_granularity(tsdf_list: List[TimeSeriesDataFrame], reverse: bool = False) -> List[TimeSeriesDataFrame]:
        """Sorts a list of TimeSeriesDataFrame objects by their granularity attribute in ascending or descending order.

        The sort key of each object is the rank found in ``TimeSeriesDataFrame.granularity_dic``
        for its ``granularity`` attribute. The method takes no ``self`` and is called on the
        class itself, hence the ``@staticmethod`` decorator.

        Args:
            tsdf_list: list of TimeSeriesDataFrame.
            reverse: ordering in ascending (False) or descending order (True). Defaults to False.

        Returns:
            List[TimeSeriesDataFrame]: a new list holding the same objects in sorted order.
        """
        ranks = TimeSeriesDataFrame.granularity_dic
        # NOTE(review): a granularity missing from granularity_dic yields a None key, which
        # cannot be compared with the integer ranks of the other elements.
        return sorted(tsdf_list, key=lambda x: ranks.get(x.granularity), reverse=reverse)  # type: ignore

    def automatic_join(
        self, sdf_y: Union[PanamaDataFrame, TimeSeriesDataFrame], how: str = "inner"
    ) -> TimeSeriesDataFrame:
        """Joins two TimeSeriesDataFrame objects based on their common keys and temporal information.

            sdf_y (Union[PanamaDataFrame, TimeSeriesDataFrame]): a TimeSeriesDataFrame or PanamaDataFrame.
            how (str, optional): the join type (see DataFrame join method). Defaults to "inner".

            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.

        # Find minimum granularity and add auxiliary column for temporal join
        if isinstance(sdf_y, TimeSeriesDataFrame):
            lower_g, higher_g = TimeSeriesDataFrame.order_by_granularity([self, sdf_y])
            max_granularity = higher_g.granularity
            is_interval_join = (self.granularity == "interval") | (sdf_y.granularity == "interval")
            if is_interval_join is False:
                higher_g.add_col_date(granularity=lower_g.granularity, output_col=lower_g.get_columns_by_name("date"))
            final_load = self.load if self.load is not None else sdf_y.load

            max_granularity = self.granularity
            final_load = self.load

        # Join using Panama DataFrame method
        join_psdf = super().automatic_join(sdf_y, how)
        join_ts = TimeSeriesDataFrame(
            granularity=max_granularity,  # type: ignore
            serie_type=self.get_columns_by_name("serie_type"),  # type: ignore

        if isinstance(sdf_y, TimeSeriesDataFrame):
            join_ts = join_ts.add_time_series(

        return join_ts

    def display(self):
        """Print a one-line summary (granularity and load) and then display the wrapped DataFrame."""
        summary = f"Time series with granularity {self.granularity} and load {self.load}"
        print(summary)
        # delegate the actual rendering to the underlying DataFrame
        self.sdf.display()  # type: ignore

    def select_value_cols_udm_serie_type(self, serie_name: Union[str, List[str]]) -> tuple:
        """Returns position of selected serie_name in self.value_cols and the corresponding serie_type and udm.

        Args:
            serie_name (Union[str, List[str]]): the selected value_cols.

        Returns:
            tuple: ``(pos, serie_type, udm)``: the positions of the selected names among the
                value columns, followed by the serie_type and udm entries found at those positions.

        Raises:
            ValueError: if a requested name is not among the value columns (raised by the
                ``index`` lookup, assuming the value columns are held in a list).
        """
        serie_name = list_value_append(serie_name)
        serie_value_cols = self.get_columns_by_name("value")
        serie_type_tot = self.get_columns_by_name("serie_type")
        udm_tot = self.get_columns_by_name("udm")

        # serie_type and udm are read positionally, using each name's position among value columns
        pos = [serie_value_cols.index(s) for s in serie_name]
        serie_type = [serie_type_tot[p] for p in pos]
        udm = [udm_tot[p] for p in pos]

        return pos, serie_type, udm

    def add_time_series(
        col_to_add: Union[str, List[str]],
        serie_type: Union[str, List[str]],
        udm: Union[str, List[str]],
        col: Union[F.Column, List[F.Column], None] = None,
    ) -> TimeSeriesDataFrame:
        """Adds time_series to TimeSeriesDataFrame.

            col_to_add (Union[str, List[str]]): names of the column to add.
            col (Union[F.Column, List[F.Column]]): column expression to get new desired values.
            serie_type (Union[str, List[str]]): serie_type of new columns.
            udm (Union[str, List[str]]): unit of measure of new columns.

            TimeSeriesDataFrame: The final TimeSeriesDataFrame with new added columns.
        # Get new attributes
        tmp_value_cols = list_value_append(self.get_columns_by_name("value"), col_to_add)
        tmp_serie_type = list_value_append(self.get_columns_by_name("serie_type"), serie_type, drop_null=False)
        tmp_udm = list_value_append(self.get_columns_by_name("udm"), udm, drop_null=False)
        col_to_add = list_value_append(col_to_add)
        num_col_to_add = len(col_to_add)

        # Add columns
        tmp_sdf = self.sdf
        if col is not None:
            col = list_value_append(col)
            for i in range(num_col_to_add):
                tmp_sdf = tmp_sdf.withColumn(col_to_add[i], col[i])

        return TimeSeriesDataFrame(
            granularity=self.granularity,  # type: ignore

    def drop(self, *args) -> TimeSeriesDataFrame:
        """Drops given columns from the sdf of a TimeSeriesDataFrame object together with corresponding class attributes.

            col_to_drop (Union[str, List[str]]): list o string with the name of the column to drop.

            TimeSeriesDataFrame: the dropped TimeSeriesDataFrame.

        col_to_drop = list(args)
        value_cols = serie_col = self.get_columns_by_name("value")
        serie_type = serie_type_col = self.get_columns_by_name("serie_type")
        udm = udm_list = self.get_columns_by_name("udm")
        tmp_sdf = self.sdf
        temporal_cols = self.get_columns_by_name("temporal")

        key_cols = list_value_append(self.get_columns_by_name("key"))

        for drop_col in col_to_drop:
            # drop attributes in TimeSeriesDataFrame if possible
            if (drop_col in temporal_cols) or (drop_col in key_cols):
                raise ValueError("Cannot drop temporal cols (date or load) or key_cols in TimeSeriesDataFrame")
            elif drop_col in serie_col:
                if len(serie_col) == 1:
                    raise ValueError("Cannot drop value_cols in TimeSeriesDataFrame when it is only one")
                    pos_attribute_values = serie_col.index(drop_col)
                    value_cols = serie_col
                    serie_type = serie_type_col
                    udm = udm_list
            # drop column in sdf
            tmp_sdf = tmp_sdf.drop(drop_col)

        return TimeSeriesDataFrame(
            granularity=self.granularity,  # type: ignore
            serie_type=from_list_to_str(serie_type),  # type: ignore

    def get_aggr_fun(self) -> Union[str, List[Any]]:
        """Get the aggregation function corresponding to the serie_type of a TimeSeriesDataFrame.

        Every serie_type is looked up in ``TimeSeriesDataFrame.agg_type_dic``; a serie_type
        without an entry there yields ``None`` in the returned list.

        Returns:
            Union[str, List[str]]: the list with the aggregation function, one item per serie_type.
        """
        return [TimeSeriesDataFrame.agg_type_dic.get(k) for k in self.get_columns_by_name("serie_type")]

    def get_serie_type_dictionary(self) -> dict:
        """Get the dictionary that map each value_cols to the corresponding aggregation function.

        Returns:
            dict: the corresponding dictionary, ``{value_col: aggregation_function}``. Value
                columns and aggregation functions are paired by position; ``zip`` stops at
                the shorter of the two sequences.
        """
        aggr_functions = self.get_aggr_fun()
        value_cols = self.get_columns_by_name("value")
        return dict(zip(value_cols, aggr_functions))

    def aggregate(
        granularity: Literal["day", "hour", "month", "quarter", "year"],
        load: Union[_LOAD_TYPING, bool] = True,
        partition_cols: Union[List[str], None] = None,
        value_cols: Union[List[str], None] = None,
        aggr_functions: Union[List[str], None] = None,
        optional_aggr: Union[Dict[str, str], None] = None,
    ) -> TimeSeriesDataFrame:
        """Method for aggregating based on granularity and partition columns.

        NOTE: 'interval' granularity is not yet supported.

            granularity (Literal[&quot;day&quot;, &quot;hour&quot;, &quot;month&quot;, &quot;quarter&quot;, &quot;year&quot;]): output granularity.
            load (Union[ List[Literal[&quot;F&quot;, &quot;PO&quot;, &quot;PO2&quot;, &quot;f&quot;, &quot;po&quot;, &quot;po2&quot;]],
                Literal[&quot;F&quot;, &quot;PO&quot;, &quot;PO2&quot;, &quot;f&quot;, &quot;po&quot;, &quot;po2&quot;], bool ], optional): load types to use.
                    If False no load is used for aggregation. If True all load columns are used. If False no load is used. Defaults to True.
            partition_cols (Union[List[str], None], optional): list of other columns to use as partitions. If None, key_cols are used. Defaults to None.
            value_cols (Union[List[str], None], optional): columns to aggregate. If None, all the value columns are aggregated. Defaults to None.
            aggr_functions (Union[List[str], None], optional): functions used to aggregate the data. A single function per value column is allowed.
                If None, default functions are used. Defaults to None.
            optional_aggr (Union[Dict[str, str], None], optional): optional columns to aggregate. A dict {column<str>: aggr_function<str>} is expected.
                If None, no other columns are aggregated. Defaults to None.

            ValueError: if a non supported granularity is passed.

            TimeSeriesDataFrame: aggregated time series dataframe.

        supported_granularities = ("day", "hour", "month", "quarter", "year")

        # get self special columns lists
        current_key_cols = self.get_columns_by_name("key")
        current_value_cols = self.get_columns_by_name("value")
        current_serie_types = self.get_columns_by_name("serie_type")
        current_udm = self.get_columns_by_name("udm")
        current_load_col = self.get_columns_by_name("load")
        value_type_mapping = {k: v for k, v in zip(current_value_cols, current_serie_types)}
        value_udm_mapping = {k: v for k, v in zip(current_value_cols, current_udm)}

        # check granularity
        if granularity not in supported_granularities:
            raise ValueError(
                f"granularity {granularity} not yet supported. Supported granularities are {supported_granularities}."

        # if no partition is passed, key_columns are used as partition columns
        if partition_cols is None:
            partition_cols = current_key_cols

        # create granularity column: this will be the output date_col

        # check if load is required
        if load is None or load is False:
            output_load = None
        elif load is True:
            output_load = self.load
        elif isinstance(load, str):
            output_load = load
        elif isinstance(load, list):
            output_load = load
            load_cols = [f"load_{i.lower()}" for i in load]

        # get the value_cols
        if value_cols is None:
            value_cols = current_value_cols

        # get the aggregation functions
        value_type_dict = self.get_serie_type_dictionary()
        if aggr_functions is None:
            aggr_functions = []
            # if no aggregation is provided, use the default
            for v in value_cols:

        # if a single aggregation function is passed, use that function for all value cols
        if len(aggr_functions) == 1 and len(value_cols) > 1:
            aggr_functions = aggr_functions * len(value_cols)

        # generate aggregation dict
        aggr_dict = {col: f for col, f in list(zip(value_cols, aggr_functions))}

        # if extra aggregations are provided
        if optional_aggr is None:
            optional_aggr = dict()
        # add those aggregations to the others
        aggr_dict = {**aggr_dict, **optional_aggr}

        # if by a weird situation some columns are doubled in partition cols, deduplicate
        partition_cols = list(set(partition_cols))

        output_sdf = self.sdf.groupBy(*partition_cols).agg(aggr_dict)
        # generate output columns
        output_sdf = format_aggregated_columns(output_sdf)
        # get the output value_cols
        output_value_cols = [i for i in aggr_dict.keys() if i in current_value_cols]
        # create the key cols from the partition cols, without the load
        output_key_cols = [i for i in partition_cols if i not in (granularity, current_load_col)]
        # create the serie type from the value_cols
        output_serie_type = [value_type_mapping[i] for i in output_value_cols]
        # create the output udm
        output_udm = [value_udm_mapping[i] for i in output_value_cols]

        return TimeSeriesDataFrame(
            sdf=output_sdf,  # type: ignore
            serie_type=output_serie_type,  # type: ignore

    def expand_flat(
        calendar: DataFrame,
        granularity: str,
        serie_name: Union[List, List[str], None] = None,
        load_calendar_col: Union[List[str], str, None] = None,
    ) -> TimeSeriesDataFrame:
        """Expands a given profile based on a calendar column.

            calendar (DataFrame): the calendar DataFrame.
            granularity (str): a valid granularity desired.
            serie_name (Union[List, List[str]], optional): the value_col to be expanded.
                If None the method will be applied on all value_cols of the TimeSeriesDataFrame. Default is None.
            load_calendar_col (str, optional): the name of load column in calendar DataFrame to be used in case of self without load. Default is None.

            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.

        # Identify the load col
        load_col = self.get_columns_by_name("load")
        if load_calendar_col is None:
            load_calendar_col = self.get_columns_by_name("load")
            load_calendar_col = list_value_append(load_calendar_col)

        # generate output load columns
        if len(load_calendar_col) > 0:
            output_load_col = [i.split("_")[1].upper() for i in load_calendar_col]
            output_load_col = []

        # Adjust calendar DataFrame and create flat profile
        calendar = add_col_date(sdf=calendar, granularity=granularity, date_col="date", output_col=granularity)
        calendar =*load_calendar_col, granularity)
        if granularity != "hour":
            calendar = calendar.distinct()
        calendar = calendar.withColumn("one_col", F.lit(1))
        calendar_ts = TimeSeriesDataFrame(
            granularity=granularity,  # type: ignore
            load=output_load_col,  # type: ignore

        return self.expand_with_profile(calendar_ts, serie_name=serie_name)

    def expand_with_profile(
        profile: TimeSeriesDataFrame,
        standardize: bool = True,
        serie_name: Union[List, List[str], None] = None,
        profile_name: Union[List, List[str], None] = None,
    ) -> TimeSeriesDataFrame:
        """Reprofiles the value_cols of a TimeSeriesDataFrame given a specific profile.

            profile (TimeSeriesDataFrame): the TimeSeriesDataFrame with the profile to be used.
            standardize (bool): flag to choose if standardize value cols. Default is True.
            serie_name (Union[List, List[str]], optional): the self.value_cols to be profiled. If None the method will be applied to all value_cols of TimeSeriesDataFrame.
            profile_name (Union[List, List[str]], optional): the profile.value_cols to use for profiling. If None the method will be applied to all value_cols of TimeSeriesDataFrame.

            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        if serie_name is None:
            serie_name = self.get_columns_by_name("value")
            serie_name = list_value_append(serie_name)
        if profile_name is None:
            profile_value_cols = profile.get_columns_by_name("value")
            profile_value_cols = list_value_append(profile_name)

        # Identify load column
        load_col = self.get_columns_by_name("load")
        # Join
        output = self.automatic_join(profile)

        # Standardize the given profile for the key (load, id, date) and reprofile
        key_prof = list_value_append(self.get_columns_by_name("key"), load_col, self.get_columns_by_name("date"))
        w_key_prof = Window.partitionBy(key_prof)
        keys_agg = self.get_aggr_fun()

        # Check on lengths
        len_serie_to_profile = len(serie_name)
        if len(profile_value_cols) == 1 and len_serie_to_profile > 1:
            print("The same profile is used to reprofile all time series.")
            profile_value_cols = profile_value_cols * len_serie_to_profile

        for i in range(len(serie_name)):
            if standardize:
                output = output.withColumn(
                    / getattr(F, keys_agg[i])(F.col(profile_value_cols[i])).over(w_key_prof),
            output = output.withColumn(serie_name[i], F.col(serie_name[i]) * F.col(profile_value_cols[i]))

        output_std = output.drop(*profile.get_columns_by_name("value"))

        return output_std

    def total_amount(
        unitary_amount: TimeSeriesDataFrame,
        col_to_add: Union[str, List[str]],
        volume_name: Union[str, List[str], None] = None,
        price_name: Union[str, List[str], None] = None,
    ) -> TimeSeriesDataFrame:
        """Computes a total amount given self TimeSeriesDataFrame of volumes and unitary_amount TimeSeriesDataFrame of unitary prices.

            unitary_amount (TimeSeriesDataFrame): TimeSeriesDataFrame with unitary amount.
            col_to_add (Union[str, List[str]]): names of new columns.
            volume_name (Union[str, List[str]], optional): names of volume columns to use. Defaults to None.
            price_name (Union[str, List[str]], optional): names of unitary price columns to use. Defaults to None.

            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        volume_udm = self.get_columns_by_name("udm")
        serie_type = list_value_append("amount")
        unitary_price_udm = unitary_amount.get_columns_by_name("udm")

        # Get lists of input
        col_to_add = list_value_append(col_to_add)
        if volume_name is None:
            volume_cols = self.get_columns_by_name("value")
            volume_cols = list_value_append(volume_name)
            select_attr = self.select_value_cols_udm_serie_type(volume_name)
            volume_udm = select_attr[2]
        if price_name is None:
            price_cols = unitary_amount.get_columns_by_name("value")
            price_cols = list_value_append(price_name)
            select_attr = unitary_amount.select_value_cols_udm_serie_type(price_name)
            unitary_price_udm = select_attr[2]

        # Check on input
        len_add_col = len(col_to_add)
        serie_type = serie_type * len_add_col
        if len_add_col > 1:
            if len(volume_cols) == 1:
                print("The same quantity is used in the total amount computation.")
                volume_cols = volume_cols * len_add_col
                volume_udm = volume_udm * len_add_col
            if len(price_cols) == 1:
                print("The same unitary price is used in the total amount computation.")
                price_cols = price_cols * len_add_col
                unitary_price_udm = unitary_price_udm * len_add_col
        col = [F.col(volume) * F.col(price) for volume, price in zip(volume_cols, price_cols)]
        final_udm = [udm.split("/")[0] for udm in unitary_price_udm]

        # Check on input lengths and on udm
        if len_add_col != len(col):
            raise ValueError(f"Input parameters {col_to_add} and {col} are not of equal length.")
        if len_add_col != len(serie_type):
            raise ValueError(f"Input parameters {col_to_add} and {serie_type} are not of equal length.")
        if len_add_col != len(final_udm):
            raise ValueError(f"Input parameters {col_to_add} and {final_udm} are not of equal length.")

        price_volume_udm = [udm.split("/")[-1] for udm in unitary_price_udm]
        if price_volume_udm != volume_udm:
            raise ValueError("Different unit of measures, a conversion is needed.")

        output = self.automatic_join(unitary_amount)
        output = output.add_time_series(col_to_add=col_to_add, serie_type=serie_type, udm=final_udm, col=col)
        return output

    def conv_intermediate_pcs(
        udm_from: Union[str, List[str]],
        udm_to: Union[str, List[str]],
        pcs: PanamaDataFrame,
        value_col_conv: Union[str, List[str]],
        conv_fact: DataFrame,
        drop_pcs: bool = True,
    ) -> TimeSeriesDataFrame:
        """Calculates the intermediate pcs conversion from Smc to GJ or viceversa.

            udm_from (Union[str, List[str]]): the starting unit of measures.
            udm_to (Union[str, List[str]]): the final unit of measures.
            pcs (PanamaDataFrame): PanamaDataFrame with pcs column.
            value_col_conv (Union[str, List[str]]): name of the values to convert.
            conv_fact (DataFrame): the conversion factor DataFrame with udm_from, udm_to and factor columns.
            drop_pcs (bool, optional): indicates if the pcs time series needs to be dropped. Defaults to True.

            TimeSeriesDataFrame: TimeSeriesDataFrame with pcs conversion factor for each value_col_conv.
        output = self.automatic_join(sdf_y=pcs, how="left")

        # Convert null pcs into default
        default_pcs = get_single_conv_factor("Smc", "GJ", conv_fact)[0]
        output = output.withColumn("pcs", F.coalesce(F.col("pcs"), F.lit(default_pcs)))

        # Calculate pcs intermediate conversion factor
        for i in range(len(udm_to)):
            if "Smc" in udm_from[i]:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", 1 * F.col("pcs"))
                udm_from[i] = udm_from[i].replace("Smc", "GJ")  # type: ignore
            elif "Smc" in udm_to[i]:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", 1 / F.col("pcs"))
                udm_to[i] = udm_to[i].replace("Smc", "GJ")  # type: ignore
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", F.lit(1))
            if "/" in udm_from[i] and "/" in udm_to[i]:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", 1 / F.col(value_col_conv[i] + "_pcs_fact"))

        # Drop pcs time series
        if drop_pcs:
            output = output.drop("pcs")

        return output

    def conv_udm(
        final_udm: Union[str, List[str]],
        conv_fact: DataFrame,
        serie_name: Union[str, List[str], None] = None,
        pcs: Union[PanamaDataFrame, None] = None,
        drop_pcs: bool = True,
    ) -> TimeSeriesDataFrame:
        """Converts unit of measure of a TimeSeriesDataFrame.

            final_udm (Union[str, List[str]]): the desired unit of measure.
            conv_fact (DataFrame): the conversion factor DataFrame with udm_from, udm_to and factor columns.
            serie_name (Union[str, List[str]], optional): the self.value_cols to be converted. If None the method will
                be applied to all value_cols of TimeSeriesDataFrame. Defaults to None.
            pcs (PanamaDataFrame, optional): PanamaDataFrame with pcs factors. Defaults to None.
            drop_pcs (bool, optional): indicates if the pcs time series needs to be dropped in the conv_intermediate_pcs step. Defaults to True.

            TimeSeriesDataFrame: the converted TimeSeriesDataFrame.
        # Get lists
        value_cols_list = self.get_columns_by_name("value")
        final_udm = list_value_append(final_udm)
        original_udm = self.get_columns_by_name("udm")
        serie_name_conv = value_cols_list if serie_name is None else list_value_append(serie_name)

        # Check on input
        len_serie_name, len_final_udm = len(serie_name_conv), len(final_udm)
        if len_serie_name != len_final_udm:
            if len(final_udm) == 1:
                print("All the time series will be converted to the same final unit of measure.")
                final_udm = final_udm * len(serie_name_conv)
                raise ValueError(
                    f"The given final_udm {final_udm} has not same length of the series to convert {serie_name_conv}."

        # Select serie_udm used for conversion and udm_to_return returned with the final TimeSeriesDataFrame
        if serie_name is None:
            serie_udm = original_udm
            udm_to_return = from_list_to_str(final_udm.copy())
            select_attr = self.select_value_cols_udm_serie_type(serie_name_conv)
            pos_udm = select_attr[0]
            serie_udm = select_attr[2]
            udm_to_return = self.get_columns_by_name("udm").copy()  # type: ignore
            for pos in pos_udm:
                udm_to_return[pos] = from_list_to_str(final_udm[pos_udm.index(pos)])

        # PCS intermediate conversion
        if pcs is not None:
            output = self.conv_intermediate_pcs(serie_udm, final_udm, pcs, serie_name_conv, conv_fact, drop_pcs)
            output_sdf = output.sdf
            output = self
            output_sdf = self.sdf

        # Traditional final conversion
        conv_factors = get_composite_conv_factor(serie_udm, final_udm, conv_fact)
        for i in range(len_serie_name):
            pcs_col_name = serie_name_conv[i] + "_pcs_fact"
            if pcs_col_name in output_sdf.columns:
                output_sdf = output_sdf.withColumn(
                    serie_name_conv[i], F.col(serie_name_conv[i]) * conv_factors[i] * F.col(pcs_col_name)
                output_sdf = output_sdf.withColumn(serie_name_conv[i], F.col(serie_name_conv[i]) * conv_factors[i])

        return TimeSeriesDataFrame(
            granularity=output.granularity,  # type: ignore
            load=output.load,  # type: ignore
            serie_type=output.get_columns_by_name("serie_type"),  # type: ignore


Class variables

var agg_type_dic
var granularity_dic
var load_dic

Static methods

def order_by_granularity(tsdf_list: List[TimeSeriesDataFrame], reverse: bool = False) ‑> List[TimeSeriesDataFrame]

Sorts a list of TimeSeriesDataFrame objects by their granularity attribute in ascending or descending order.

Args

tsdf_list : List[TimeSeriesDataFrame]
list of TimeSeriesDataFrame.
reverse : bool
ordering in ascending (False) or descending order (True). Defaults to False.

Returns

List[TimeSeriesDataFrame]
the ordered original list.

Methods

def add_time_series(self, col_to_add: Union[str, List[str]], serie_type: Union[str, List[str]], udm: Union[str, List[str]], col: Union[F.Column, List[F.Column], None] = None) ‑> TimeSeriesDataFrame

Adds time_series to TimeSeriesDataFrame.

Args

col_to_add : Union[str, List[str]]
names of the columns to add.
col : Union[F.Column, List[F.Column], None], optional
column expression to get new desired values. Defaults to None.
serie_type : Union[str, List[str]]
serie_type of new columns.
udm : Union[str, List[str]]
unit of measure of new columns.

Returns

TimeSeriesDataFrame
The final TimeSeriesDataFrame with new added columns.

def aggregate(self, granularity: "Literal['day', 'hour', 'month', 'quarter', 'year']", load: Union[_LOAD_TYPING, bool] = True, partition_cols: Union[List[str], None] = None, value_cols: Union[List[str], None] = None, aggr_functions: Union[List[str], None] = None, optional_aggr: Union[Dict[str, str], None] = None)

Method for aggregating based on granularity and partition columns.

NOTE: 'interval' granularity is not yet supported.


granularity (Literal["day", "hour", "month", "quarter", "year"]): output granularity.
load (Union[ List[Literal["F", "PO", "PO2", "f", "po", "po2"]],
Literal["F", "PO", "PO2", "f", "po", "po2"], bool ], optional): load types to use.
If True all load columns are used. If False no load is used for aggregation. Defaults to True.
partition_cols : Union[List[str], None], optional
list of other columns to use as partitions. If None, key_cols are used. Defaults to None.
value_cols : Union[List[str], None], optional
columns to aggregate. If None, all the value columns are aggregated. Defaults to None.
aggr_functions : Union[List[str], None], optional
functions used to aggregate the data. A single function per value column is allowed. If None, default functions are used. Defaults to None.
optional_aggr : Union[Dict[str, str], None], optional
optional columns to aggregate. A dict {column: aggr_function} is expected. If None, no other columns are aggregated. Defaults to None.


if an unsupported granularity is passed.


aggregated time series dataframe.
def automatic_join(self, sdf_y: Union[PanamaDataFrame, TimeSeriesDataFrame], how: str = 'inner') ‑> TimeSeriesDataFrame

Joins two TimeSeriesDataFrame objects based on their common keys and temporal information.


sdf_y : Union[PanamaDataFrame, TimeSeriesDataFrame]
a TimeSeriesDataFrame or PanamaDataFrame.
how : str, optional
the join type (see DataFrame join method). Defaults to "inner".


the resulting TimeSeriesDataFrame.
def conv_intermediate_pcs(self, udm_from: Union[str, List[str]], udm_to: Union[str, List[str]], pcs: PanamaDataFrame, value_col_conv: Union[str, List[str]], conv_fact: DataFrame, drop_pcs: bool = True) ‑> TimeSeriesDataFrame

Calculates the intermediate pcs conversion from Smc to GJ or vice versa.


udm_from : Union[str, List[str]]
the starting unit of measures.
udm_to : Union[str, List[str]]
the final unit of measures.
pcs : PanamaDataFrame
PanamaDataFrame with pcs column.
value_col_conv : Union[str, List[str]]
name of the values to convert.
conv_fact : DataFrame
the conversion factor DataFrame with udm_from, udm_to and factor columns.
drop_pcs : bool, optional
indicates if the pcs time series needs to be dropped. Defaults to True.


TimeSeriesDataFrame with pcs conversion factor for each value_col_conv.
def conv_udm(self, final_udm: Union[str, List[str]], conv_fact: DataFrame, serie_name: Union[str, List[str], None] = None, pcs: Union[PanamaDataFrame, None] = None, drop_pcs: bool = True) ‑> TimeSeriesDataFrame

Converts unit of measure of a TimeSeriesDataFrame.


final_udm : Union[str, List[str]]
the desired unit of measure.
conv_fact : DataFrame
the conversion factor DataFrame with udm_from, udm_to and factor columns.
serie_name : Union[str, List[str]], optional
the self.value_cols to be converted. If None the method will be applied to all value_cols of TimeSeriesDataFrame. Defaults to None.
pcs : PanamaDataFrame, optional
PanamaDataFrame with pcs factors. Defaults to None.
drop_pcs : bool, optional
indicates if the pcs time series needs to be dropped in the conv_intermediate_pcs step. Defaults to True.


the converted TimeSeriesDataFrame.
def display(self)

Prints the granularity and load type of the time series and displays the DataFrame

def drop(self, *args) ‑> TimeSeriesDataFrame

Drops given columns from the sdf of a TimeSeriesDataFrame object together with corresponding class attributes.


col_to_drop : Union[str, List[str]]
list or string with the name(s) of the column(s) to drop.


the dropped TimeSeriesDataFrame.
def expand_flat(self, calendar: DataFrame, granularity: str, serie_name: Union[List, List[str], None] = None, load_calendar_col: Union[List[str], str, None] = None) ‑> TimeSeriesDataFrame

Expands a given profile based on a calendar column.


calendar : DataFrame
the calendar DataFrame.
granularity : str
a valid granularity desired.
serie_name : Union[List, List[str]], optional
the value_col to be expanded. If None the method will be applied on all value_cols of the TimeSeriesDataFrame. Default is None.
load_calendar_col : Union[List[str], str], optional
the name of the load column(s) in the calendar DataFrame, to be used when self has no load. Default is None.


the resulting TimeSeriesDataFrame.
def expand_with_profile(self, profile: TimeSeriesDataFrame, standardize: bool = True, serie_name: Union[List, List[str], None] = None, profile_name: Union[List, List[str], None] = None) ‑> TimeSeriesDataFrame

Reprofiles the value_cols of a TimeSeriesDataFrame given a specific profile.


profile : TimeSeriesDataFrame
the TimeSeriesDataFrame with the profile to be used.
standardize : bool
flag to choose whether to standardize the value cols. Default is True.
serie_name : Union[List, List[str]], optional
the self.value_cols to be profiled. If None the method will be applied to all value_cols of TimeSeriesDataFrame.
profile_name : Union[List, List[str]], optional
the profile.value_cols to use for profiling. If None the method will be applied to all value_cols of TimeSeriesDataFrame.


the resulting TimeSeriesDataFrame.
def get_aggr_fun(self) ‑> Union[str, List[Any]]

Get the aggregation function corresponding to the serie_type of a TimeSeriesDataFrame.


Union[str, List[str]]
the list with the aggregation function.
def get_serie_type_dictionary(self) ‑> dict

Get the dictionary that maps each of the value_cols to the corresponding aggregation function.


the corresponding dictionary
def select_value_cols_udm_serie_type(self, serie_name: Union[str, List[str]]) ‑> tuple

Returns position of selected serie_name in self.value_cols and the corresponding serie_type and udm.


serie_name : Union[str, List[str]]
the selected value_cols.


tuple with position, serie_type and udm.
def total_amount(self, unitary_amount: TimeSeriesDataFrame, col_to_add: Union[str, List[str]], volume_name: Union[str, List[str], None] = None, price_name: Union[str, List[str], None] = None) ‑> TimeSeriesDataFrame

Computes a total amount given self TimeSeriesDataFrame of volumes and unitary_amount TimeSeriesDataFrame of unitary prices.


unitary_amount : TimeSeriesDataFrame
TimeSeriesDataFrame with unitary amount.
col_to_add : Union[str, List[str]]
names of new columns.
volume_name : Union[str, List[str]], optional
names of volume columns to use. Defaults to None.
price_name : Union[str, List[str]], optional
names of unitary price columns to use. Defaults to None.


the resulting TimeSeriesDataFrame.

Inherited members