Module panama.analytics.time_series_data_frame

Classes

class TimeSeriesDataFrame (sdf: DataFrame, granularity: _GRANULARITY_TYPING, date_cols: Union[str, List[str]], value_cols: Union[str, List[str]], key_cols: Union[str, List[str], None] = None, load: _LOAD_TYPING = None, serie_type: _SERIE_TYPE_TYPING = None, udm: Union[str, List[str], None] = None)
class TimeSeriesDataFrame(PanamaDataFrame):
    granularity_dic = {"interval": 0, "year": 1, "quarter": 2, "month": 3, "day": 4, "hour": 5}
    agg_type_dic = {"volume": "sum", "price": "mean", "amount": "sum"}
    load_dic = {"F": ["F1", "F2", "F3"], "PO": ["PL", "OP"], "PO2": ["PL2", "OP2"]}

    def __init__(
        self,
        sdf: DataFrame,
        granularity: _GRANULARITY_TYPING,
        date_cols: Union[str, List[str]],
        value_cols: Union[str, List[str]],
        key_cols: Union[str, List[str], None] = None,
        load: _LOAD_TYPING = None,
        serie_type: _SERIE_TYPE_TYPING = None,
        udm: Union[str, List[str], None] = None,
    ):
        super().__init__(sdf=sdf, key_cols=key_cols, date_cols=date_cols)
        self.add_columns(key="value_cols", value=value_cols)
        _check_columns_in_df(sdf, value_cols=value_cols)
        self.add_columns(key="serie_type", value=list_value_append(serie_type, drop_null=False))
        self.add_columns(key="udm", value=list_value_append(udm, drop_null=False))

        _check_granularity(granularity)
        self.granularity = cast(_GRANULARITY_TYPING, granularity)

        _check_load(load)
        load = cast(_LOAD_TYPING, load)
        self.load = list_value_append(load)

        self._set_load_col()
        self._set_lists_len()

    def _set_lists_len(self) -> None:
        """Function used to set the list length for the serie_type and the udm_type"""
        value_cols = self.get_columns_by_name("value")
        serie_type = self.get_columns_by_name("serie_type")
        udm = self.get_columns_by_name("udm")
        check = check_list_len(reference=value_cols, to_check=serie_type)
        # we are left with the case of n_serie_type < n_value_cols and n_serie_type == 1
        if check is True:
            if len(serie_type) == 1:
                self.add_columns(key="serie_type", value=serie_type * len(value_cols))

        check = check_list_len(reference=value_cols, to_check=udm, to_check_label="udm")
        # we are left with the case of n_udm < n_value_cols and n_udm == 1
        if check is True:
            if len(udm) == 1:
                self.add_columns(key="udm", value=udm * len(value_cols))

    def _set_load_col(self) -> None:
        """Function used to set the load col. Wrapper of add_columns, checks for validity of the column.

        Raises:
            ValueError: load column {load_col} not in sdf.columns. Available values are {self.sdf.columns}.
            ValueError: No consistency between load expected values {allowed_values} and load column {load_col} content.
        """
        load_cols = []
        for load in self.load:
            load_col = f"load_{load.lower()}"
            load_cols.append(load_col)
            if load_col not in self.sdf.columns:
                raise ValueError(f"load column {load_col} not in sdf.columns. Available values are {self.sdf.columns}.")

            allowed_values = self.load_dic[load]
            filtered_df = self.sdf.filter(~F.col(load_col).isin(allowed_values))

            if not filtered_df.isEmpty():
                raise ValueError(
                    f"No consistency between load expected values {allowed_values} and load column {load_col} content."
                )

        self.add_columns(key="load_col", value=load_cols)
        self.update_columns(key="temporal_cols", value=load_cols)

    def __getattr__(self, attr: str):
        df_dict = DataFrame.__dict__
        if attr in df_dict:
            sdf_attr = getattr(self.sdf, attr)
            if callable(sdf_attr):

                @functools.wraps(sdf_attr)
                def method_wrapper(*args, **kwargs):
                    method_result = sdf_attr(*args, **kwargs)
                    if isinstance(method_result, DataFrame):
                        value_cols = self.get_columns_by_name("value")
                        serie_type = self.get_columns_by_name("serie_type")
                        udm = self.get_columns_by_name("udm")
                        _check_columns_to_keep(sdf=method_result, method=attr, values=value_cols, raises=False)

                        value_indexes = [value_cols.index(i) for i in value_cols if i in method_result.columns]
                        new_value_cols = [value_cols[i] for i in value_indexes]
                        new_serie_type = [serie_type[i] for i in value_indexes]
                        new_udm = [udm[i] for i in value_indexes]

                        return TimeSeriesDataFrame(
                            sdf=method_result,
                            granularity=self.granularity,  # type: ignore
                            date_cols=self.get_columns_by_name("date"),
                            key_cols=self.get_columns_by_name("key"),
                            load=self.load,
                            serie_type=new_serie_type,  # type: ignore
                            udm=new_udm,
                            value_cols=new_value_cols,
                        )
                    else:
                        return method_result

                return method_wrapper
            else:
                return sdf_attr
        else:
            raise AttributeError(f"TimeSeriesDataFrame object has no attribute '{attr}'.")

    @staticmethod
    def order_by_granularity(tsdf_list: List[TimeSeriesDataFrame], reverse: bool = False) -> List[TimeSeriesDataFrame]:
        """Sorts a list of TimeSeriesDataFrame objects by their granularity attribute in ascending or descending order.

        Args:
            tsdf_list: list of TimeSeriesDataFrame.
            reverse: ordering in ascending (False) or descending order (True). Defaults to False.

        Returns:
            List[TimeSeriesDataFrame]: the ordered original list.
        """
        return sorted(tsdf_list, key=lambda x: TimeSeriesDataFrame.granularity_dic.get(x.granularity), reverse=reverse)  # type: ignore

    def automatic_join(
        self, sdf_y: Union[PanamaDataFrame, TimeSeriesDataFrame], how: str = "inner"
    ) -> TimeSeriesDataFrame:
        """Joins two TimeSeriesDataFrame objects based on their common keys and temporal information.

        Args:
            sdf_y (Union[PanamaDataFrame, TimeSeriesDataFrame]): a TimeSeriesDataFrame or PanamaDataFrame.
            how (str, optional): the join type (see DataFrame join method). Defaults to "inner".

        Returns:
            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        """

        # Find minimum granularity and add auxiliary column for temporal join
        if isinstance(sdf_y, TimeSeriesDataFrame):
            lower_g, higher_g = TimeSeriesDataFrame.order_by_granularity([self, sdf_y])
            max_granularity = higher_g.granularity
            is_interval_join = (self.granularity == "interval") | (sdf_y.granularity == "interval")
            if is_interval_join is False:
                higher_g.add_col_date(granularity=lower_g.granularity, output_col=lower_g.get_columns_by_name("date"))
            final_load = self.load if self.load is not None else sdf_y.load

        else:
            max_granularity = self.granularity
            final_load = self.load

        # Join using Panama DataFrame method
        join_psdf = super().automatic_join(sdf_y, how)
        join_ts = TimeSeriesDataFrame(
            sdf=join_psdf.sdf,
            granularity=max_granularity,  # type: ignore
            date_cols=join_psdf.get_columns_by_name("date"),
            value_cols=self.get_columns_by_name("value"),
            key_cols=join_psdf.get_columns_by_name("key"),
            load=final_load,
            serie_type=self.get_columns_by_name("serie_type"),  # type: ignore
            udm=self.get_columns_by_name("udm"),
        )

        if isinstance(sdf_y, TimeSeriesDataFrame):
            join_ts = join_ts.add_time_series(
                col_to_add=sdf_y.get_columns_by_name("value"),
                serie_type=sdf_y.get_columns_by_name("serie_type"),
                udm=sdf_y.get_columns_by_name("udm"),
            )

        return join_ts

    def display(self):
        """Prints the granularity and load type of the time series and displays the DataFrame"""
        print(f"Time series with granularity {self.granularity} and load {self.load}")
        self.sdf.display()  # type: ignore

    def select_value_cols_udm_serie_type(self, serie_name: Union[str, List[str]]) -> tuple:
        """Returns position of selected serie_name in self.value_cols and the corresponding serie_type and udm.

        Args:
            serie_name (Union[str, List[str]]): the selected value_cols.

        Returns:
            tuple: tuple with position, serie_type and udm.
        """
        serie_name = list_value_append(serie_name)
        serie_value_cols = self.get_columns_by_name("value")
        serie_type_tot = self.get_columns_by_name("serie_type")
        udm_tot = self.get_columns_by_name("udm")
        pos = [serie_value_cols.index(s) for s in serie_name]
        serie_type = [serie_type_tot[p] for p in pos]
        udm = [udm_tot[p] for p in pos]

        return pos, serie_type, udm

    def add_time_series(
        self,
        col_to_add: Union[str, List[str]],
        serie_type: Union[str, List[str]],
        udm: Union[str, List[str]],
        col: Union[F.Column, List[F.Column], None] = None,
    ) -> TimeSeriesDataFrame:
        """Adds time_series to TimeSeriesDataFrame.

        Args:
            col_to_add (Union[str, List[str]]): names of the columns to add.
            serie_type (Union[str, List[str]]): serie_type of the new columns.
            udm (Union[str, List[str]]): unit of measure of the new columns.
            col (Union[F.Column, List[F.Column]], optional): column expressions producing the new values. Defaults to None.

        Returns:
            TimeSeriesDataFrame: The final TimeSeriesDataFrame with new added columns.
        """
        # Get new attributes
        tmp_value_cols = list_value_append(self.get_columns_by_name("value"), col_to_add)
        tmp_serie_type = list_value_append(self.get_columns_by_name("serie_type"), serie_type, drop_null=False)
        tmp_udm = list_value_append(self.get_columns_by_name("udm"), udm, drop_null=False)
        col_to_add = list_value_append(col_to_add)
        num_col_to_add = len(col_to_add)

        # Add columns
        tmp_sdf = self.sdf
        if col is not None:
            col = list_value_append(col)
            for i in range(num_col_to_add):
                tmp_sdf = tmp_sdf.withColumn(col_to_add[i], col[i])

        return TimeSeriesDataFrame(
            sdf=tmp_sdf,
            granularity=self.granularity,  # type: ignore
            date_cols=self.get_columns_by_name("date"),
            value_cols=tmp_value_cols,
            key_cols=self.get_columns_by_name("key"),
            load=self.load,
            serie_type=tmp_serie_type,
            udm=tmp_udm,
        )

    def drop(self, *args) -> TimeSeriesDataFrame:
        """Drops given columns from the sdf of a TimeSeriesDataFrame object together with corresponding class attributes.

        Args:
            *args (str): names of the columns to drop.

        Returns:
            TimeSeriesDataFrame: the dropped TimeSeriesDataFrame.
        """

        col_to_drop = list(args)
        # copy the attribute lists so the original TimeSeriesDataFrame is not mutated
        value_cols = list(self.get_columns_by_name("value"))
        serie_type = list(self.get_columns_by_name("serie_type"))
        udm = list(self.get_columns_by_name("udm"))
        tmp_sdf = self.sdf
        temporal_cols = self.get_columns_by_name("temporal")

        key_cols = list_value_append(self.get_columns_by_name("key"))

        for drop_col in col_to_drop:
            # drop attributes in TimeSeriesDataFrame if possible
            if (drop_col in temporal_cols) or (drop_col in key_cols):
                raise ValueError("Cannot drop temporal cols (date or load) or key_cols in TimeSeriesDataFrame")
            elif drop_col in value_cols:
                if len(value_cols) == 1:
                    raise ValueError("Cannot drop value_cols in TimeSeriesDataFrame when it is the only one")
                else:
                    pos_attribute_values = value_cols.index(drop_col)
                    value_cols.pop(pos_attribute_values)
                    serie_type.pop(pos_attribute_values)
                    udm.pop(pos_attribute_values)
            # drop column in sdf
            tmp_sdf = tmp_sdf.drop(drop_col)

        return TimeSeriesDataFrame(
            sdf=tmp_sdf,
            granularity=self.granularity,  # type: ignore
            date_cols=self.get_columns_by_name("date"),
            value_cols=from_list_to_str(value_cols),
            key_cols=self.get_columns_by_name("key"),
            load=self.load,
            serie_type=from_list_to_str(serie_type),  # type: ignore
            udm=from_list_to_str(udm),
        )

    def get_aggr_fun(self) -> Union[str, List[Any]]:
        """Get the aggregation function corresponding to the serie_type of a TimeSeriesDataFrame.

        Returns:
            Union[str, List[Any]]: the list with the aggregation functions.
        """
        keys_agg = [TimeSeriesDataFrame.agg_type_dic.get(k, None) for k in self.get_columns_by_name("serie_type")]

        return keys_agg

    def get_serie_type_dictionary(self) -> dict:
        """Get the dictionary that map each value_cols to the corresponding aggregation function.

        Returns:
            dict: the corresponding dictionary.
        """
        # Define key and values of the dictionary
        keys_agg = self.get_aggr_fun()
        values_agg = self.get_columns_by_name("value")

        # Create the dictionary
        key_value_pairs = zip(values_agg, keys_agg)
        dictionary = dict(key_value_pairs)

        return dictionary

    def aggregate(
        self,
        granularity: Literal["day", "hour", "month", "quarter", "year"],
        load: Union[_LOAD_TYPING, bool] = True,
        partition_cols: Union[List[str], None] = None,
        value_cols: Union[List[str], None] = None,
        aggr_functions: Union[List[str], None] = None,
        optional_aggr: Union[Dict[str, str], None] = None,
    ) -> TimeSeriesDataFrame:
        """Method for aggregating based on granularity and partition columns.

        NOTE: 'interval' granularity is not yet supported.

        Args:
            granularity (Literal[&quot;day&quot;, &quot;hour&quot;, &quot;month&quot;, &quot;quarter&quot;, &quot;year&quot;]): output granularity.
            load (Union[ List[Literal[&quot;F&quot;, &quot;PO&quot;, &quot;PO2&quot;, &quot;f&quot;, &quot;po&quot;, &quot;po2&quot;]],
                Literal[&quot;F&quot;, &quot;PO&quot;, &quot;PO2&quot;, &quot;f&quot;, &quot;po&quot;, &quot;po2&quot;], bool ], optional): load types to use.
                    If False no load is used for aggregation. If True all load columns are used. If False no load is used. Defaults to True.
            partition_cols (Union[List[str], None], optional): list of other columns to use as partitions. If None, key_cols are used. Defaults to None.
            value_cols (Union[List[str], None], optional): columns to aggregate. If None, all the value columns are aggregated. Defaults to None.
            aggr_functions (Union[List[str], None], optional): functions used to aggregate the data. A single function per value column is allowed.
                If None, default functions are used. Defaults to None.
            optional_aggr (Union[Dict[str, str], None], optional): optional columns to aggregate. A dict mapping column name (str) to aggr_function (str) is expected.
                If None, no other columns are aggregated. Defaults to None.

        Raises:
            ValueError: if an unsupported granularity is passed.

        Returns:
            TimeSeriesDataFrame: aggregated time series dataframe.
        """

        supported_granularities = ("day", "hour", "month", "quarter", "year")

        # get self special columns lists
        current_key_cols = self.get_columns_by_name("key")
        current_value_cols = self.get_columns_by_name("value")
        current_serie_types = self.get_columns_by_name("serie_type")
        current_udm = self.get_columns_by_name("udm")
        current_load_col = self.get_columns_by_name("load")
        value_type_mapping = {k: v for k, v in zip(current_value_cols, current_serie_types)}
        value_udm_mapping = {k: v for k, v in zip(current_value_cols, current_udm)}

        # check granularity
        if granularity not in supported_granularities:
            raise ValueError(
                f"granularity {granularity} not yet supported. Supported granularities are {supported_granularities}."
            )

        # if no partition is passed, key_columns are used as partition columns
        if partition_cols is None:
            partition_cols = current_key_cols

        # create granularity column: this will be the output date_col
        self.add_col_date(granularity=granularity)
        partition_cols.append(granularity)

        # check if load is required
        if load is None or load is False:
            output_load = None
        elif load is True:
            output_load = self.load
            partition_cols.extend(current_load_col)
        elif isinstance(load, str):
            output_load = load
            partition_cols.append(f"load_{load.lower()}")
        elif isinstance(load, list):
            output_load = load
            load_cols = [f"load_{i.lower()}" for i in load]
            partition_cols.extend(load_cols)

        # get the value_cols
        if value_cols is None:
            value_cols = current_value_cols

        # get the aggregation functions
        value_type_dict = self.get_serie_type_dictionary()
        if aggr_functions is None:
            aggr_functions = []
            # if no aggregation is provided, use the default
            for v in value_cols:
                aggr_functions.append(value_type_dict[v])

        # if a single aggregation function is passed, use that function for all value cols
        if len(aggr_functions) == 1 and len(value_cols) > 1:
            aggr_functions = aggr_functions * len(value_cols)

        # generate aggregation dict
        aggr_dict = {col: f for col, f in list(zip(value_cols, aggr_functions))}

        # if extra aggregations are provided
        if optional_aggr is None:
            optional_aggr = dict()
        # add those aggregations to the others
        aggr_dict = {**aggr_dict, **optional_aggr}

        # deduplicate partition cols in case any column was added twice
        partition_cols = list(set(partition_cols))

        output_sdf = self.sdf.groupBy(*partition_cols).agg(aggr_dict)
        # generate output columns
        output_sdf = format_aggregated_columns(output_sdf)
        # get the output value_cols
        output_value_cols = [i for i in aggr_dict.keys() if i in current_value_cols]
        # create the key cols from the partition cols, without the load
        output_key_cols = [i for i in partition_cols if i != granularity and i not in current_load_col]
        # create the serie type from the value_cols
        output_serie_type = [value_type_mapping[i] for i in output_value_cols]
        # create the output udm
        output_udm = [value_udm_mapping[i] for i in output_value_cols]

        return TimeSeriesDataFrame(
            sdf=output_sdf,  # type: ignore
            granularity=granularity,
            date_cols=granularity,
            value_cols=output_value_cols,
            key_cols=output_key_cols,
            load=output_load,
            serie_type=output_serie_type,  # type: ignore
            udm=output_udm,
        )

    def expand_flat(
        self,
        calendar: DataFrame,
        granularity: str,
        serie_name: Union[List, List[str], None] = None,
        load_calendar_col: Union[List[str], str, None] = None,
    ) -> TimeSeriesDataFrame:
        """Expands a given profile based on a calendar column.

        Args:
            calendar (DataFrame): the calendar DataFrame.
            granularity (str): a valid granularity desired.
            serie_name (Union[List, List[str]], optional): the value_col to be expanded.
                If None the method will be applied on all value_cols of the TimeSeriesDataFrame. Default is None.
            load_calendar_col (Union[str, List[str]], optional): the name of the load column in the calendar DataFrame, used when self has no load. Default is None.

        Returns:
            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        """

        # Identify the load col
        load_col = self.get_columns_by_name("load")
        if load_calendar_col is None:
            load_calendar_col = self.get_columns_by_name("load")
        else:
            load_calendar_col = list_value_append(load_calendar_col)

        # generate output load columns
        if len(load_calendar_col) > 0:
            output_load_col = [i.split("_")[1].upper() for i in load_calendar_col]
        else:
            output_load_col = []

        # Adjust calendar DataFrame and create flat profile
        calendar = add_col_date(sdf=calendar, granularity=granularity, date_col="date", output_col=granularity)
        calendar = calendar.select(*load_calendar_col, granularity)
        if granularity != "hour":
            calendar = calendar.distinct()
        calendar = calendar.withColumn("one_col", F.lit(1))
        calendar_ts = TimeSeriesDataFrame(
            sdf=calendar,
            granularity=granularity,  # type: ignore
            date_cols=granularity,
            value_cols="one_col",
            load=output_load_col,  # type: ignore
        )

        return self.expand_with_profile(calendar_ts, serie_name=serie_name)

    def expand_with_profile(
        self,
        profile: TimeSeriesDataFrame,
        standardize: bool = True,
        serie_name: Union[List, List[str], None] = None,
        profile_name: Union[List, List[str], None] = None,
    ) -> TimeSeriesDataFrame:
        """Reprofiles the value_cols of a TimeSeriesDataFrame given a specific profile.

        Args:
            profile (TimeSeriesDataFrame): the TimeSeriesDataFrame with the profile to be used.
            standardize (bool): flag choosing whether to standardize the profile value cols. Default is True.
            serie_name (Union[List, List[str]], optional): the self.value_cols to be profiled. If None the method will be applied to all value_cols of TimeSeriesDataFrame.
            profile_name (Union[List, List[str]], optional): the profile.value_cols to use for profiling. If None all value_cols of the profile are used.

        Returns:
            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        """
        if serie_name is None:
            serie_name = self.get_columns_by_name("value")
        else:
            serie_name = list_value_append(serie_name)
        if profile_name is None:
            profile_value_cols = profile.get_columns_by_name("value")
        else:
            profile_value_cols = list_value_append(profile_name)

        # Identify load column
        load_col = self.get_columns_by_name("load")
        # Join
        output = self.automatic_join(profile)

        # Standardize the given profile for the key (load, id, date) and reprofile
        key_prof = list_value_append(self.get_columns_by_name("key"), load_col, self.get_columns_by_name("date"))
        w_key_prof = Window.partitionBy(key_prof)
        keys_agg = self.get_aggr_fun()

        # Check on lengths
        len_serie_to_profile = len(serie_name)
        if len(profile_value_cols) == 1 and len_serie_to_profile > 1:
            print("The same profile is used to reprofile all time series.")
            profile_value_cols = profile_value_cols * len_serie_to_profile

        for i in range(len(serie_name)):
            if standardize:
                output = output.withColumn(
                    profile_value_cols[i],
                    F.col(profile_value_cols[i])
                    / getattr(F, keys_agg[i])(F.col(profile_value_cols[i])).over(w_key_prof),
                )
            output = output.withColumn(serie_name[i], F.col(serie_name[i]) * F.col(profile_value_cols[i]))

        output_std = output.drop(*profile.get_columns_by_name("value"))

        return output_std

    def total_amount(
        self,
        unitary_amount: TimeSeriesDataFrame,
        col_to_add: Union[str, List[str]],
        volume_name: Union[str, List[str], None] = None,
        price_name: Union[str, List[str], None] = None,
    ) -> TimeSeriesDataFrame:
        """Computes a total amount given self TimeSeriesDataFrame of volumes and unitary_amount TimeSeriesDataFrame of unitary prices.

        Args:
            unitary_amount (TimeSeriesDataFrame): TimeSeriesDataFrame with unitary amount.
            col_to_add (Union[str, List[str]]): names of new columns.
            volume_name (Union[str, List[str]], optional): names of volume columns to use. Defaults to None.
            price_name (Union[str, List[str]], optional): names of unitary price columns to use. Defaults to None.

        Returns:
            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        """
        volume_udm = self.get_columns_by_name("udm")
        serie_type = list_value_append("amount")
        unitary_price_udm = unitary_amount.get_columns_by_name("udm")

        # Get lists of input
        col_to_add = list_value_append(col_to_add)
        if volume_name is None:
            volume_cols = self.get_columns_by_name("value")
        else:
            volume_cols = list_value_append(volume_name)
            select_attr = self.select_value_cols_udm_serie_type(volume_name)
            volume_udm = select_attr[2]
        if price_name is None:
            price_cols = unitary_amount.get_columns_by_name("value")
        else:
            price_cols = list_value_append(price_name)
            select_attr = unitary_amount.select_value_cols_udm_serie_type(price_name)
            unitary_price_udm = select_attr[2]

        # Check on input
        len_add_col = len(col_to_add)
        serie_type = serie_type * len_add_col
        if len_add_col > 1:
            if len(volume_cols) == 1:
                print("The same quantity is used in the total amount computation.")
                volume_cols = volume_cols * len_add_col
                volume_udm = volume_udm * len_add_col
            if len(price_cols) == 1:
                print("The same unitary price is used in the total amount computation.")
                price_cols = price_cols * len_add_col
                unitary_price_udm = unitary_price_udm * len_add_col
        col = [F.col(volume) * F.col(price) for volume, price in zip(volume_cols, price_cols)]
        final_udm = [udm.split("/")[0] for udm in unitary_price_udm]

        # Check on input lengths and on udm
        if len_add_col != len(col):
            raise ValueError(f"Input parameters {col_to_add} and {col} are not of equal length.")
        if len_add_col != len(serie_type):
            raise ValueError(f"Input parameters {col_to_add} and {serie_type} are not of equal length.")
        if len_add_col != len(final_udm):
            raise ValueError(f"Input parameters {col_to_add} and {final_udm} are not of equal length.")

        price_volume_udm = [udm.split("/")[-1] for udm in unitary_price_udm]
        if price_volume_udm != volume_udm:
            raise ValueError("Different unit of measures, a conversion is needed.")

        output = self.automatic_join(unitary_amount)
        output = output.add_time_series(col_to_add=col_to_add, serie_type=serie_type, udm=final_udm, col=col)
        return output

    def conv_intermediate_pcs(
        self,
        udm_from: Union[str, List[str]],
        udm_to: Union[str, List[str]],
        pcs: PanamaDataFrame,
        value_col_conv: Union[str, List[str]],
        conv_fact: DataFrame,
        drop_pcs: bool = True,
    ) -> TimeSeriesDataFrame:
        """Calculates the intermediate pcs conversion from Smc to GJ or viceversa.

        Args:
            udm_from (Union[str, List[str]]): the starting unit of measures.
            udm_to (Union[str, List[str]]): the final unit of measures.
            pcs (PanamaDataFrame): PanamaDataFrame with pcs column.
            value_col_conv (Union[str, List[str]]): name of the values to convert.
            conv_fact (DataFrame): the conversion factor DataFrame with udm_from, udm_to and factor columns.
            drop_pcs (bool, optional): indicates if the pcs time series needs to be dropped. Defaults to True.

        Returns:
            TimeSeriesDataFrame: TimeSeriesDataFrame with pcs conversion factor for each value_col_conv.
        """
        output = self.automatic_join(sdf_y=pcs, how="left")

        # Convert null pcs into default
        default_pcs = get_single_conv_factor("Smc", "GJ", conv_fact)[0]
        output = output.withColumn("pcs", F.coalesce(F.col("pcs"), F.lit(default_pcs)))

        # Calculate pcs intermediate conversion factor
        for i in range(len(udm_to)):
            if "Smc" in udm_from[i]:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", 1 * F.col("pcs"))
                udm_from[i] = udm_from[i].replace("Smc", "GJ")  # type: ignore
            elif "Smc" in udm_to[i]:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", 1 / F.col("pcs"))
                udm_to[i] = udm_to[i].replace("Smc", "GJ")  # type: ignore
            else:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", F.lit(1))
            if "/" in udm_from[i] and "/" in udm_to[i]:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", 1 / F.col(value_col_conv[i] + "_pcs_fact"))

        # Drop pcs time series
        if drop_pcs:
            output = output.drop("pcs")

        return output

    def conv_udm(
        self,
        final_udm: Union[str, List[str]],
        conv_fact: DataFrame,
        serie_name: Union[str, List[str], None] = None,
        pcs: Union[PanamaDataFrame, None] = None,
        drop_pcs: bool = True,
    ) -> TimeSeriesDataFrame:
        """Converts unit of measure of a TimeSeriesDataFrame.

        Args:
            final_udm (Union[str, List[str]]): the desired unit of measure.
            conv_fact (DataFrame): the conversion factor DataFrame with udm_from, udm_to and factor columns.
            serie_name (Union[str, List[str]], optional): the self.value_cols to be converted. If None the method will
                be applied to all value_cols of TimeSeriesDataFrame. Defaults to None.
            pcs (PanamaDataFrame, optional): PanamaDataFrame with pcs factors. Defaults to None.
            drop_pcs (bool, optional): indicates if the pcs time series needs to be dropped in the conv_intermediate_pcs step. Defaults to True.

        Returns:
            TimeSeriesDataFrame: the converted TimeSeriesDataFrame.
        """
        # Get lists
        value_cols_list = self.get_columns_by_name("value")
        final_udm = list_value_append(final_udm)
        original_udm = self.get_columns_by_name("udm")
        serie_name_conv = value_cols_list if serie_name is None else list_value_append(serie_name)

        # Check on input
        len_serie_name, len_final_udm = len(serie_name_conv), len(final_udm)
        if len_serie_name != len_final_udm:
            if len(final_udm) == 1:
                print("All the time series will be converted to the same final unit of measure.")
                final_udm = final_udm * len(serie_name_conv)
            else:
                raise ValueError(
                    f"The given final_udm {final_udm} has not same length of the series to convert {serie_name_conv}."
                )

        # Select serie_udm used for conversion and udm_to_return returned with the final TimeSeriesDataFrame
        if serie_name is None:
            serie_udm = original_udm
            udm_to_return = from_list_to_str(final_udm.copy())
        else:
            select_attr = self.select_value_cols_udm_serie_type(serie_name_conv)
            pos_udm = select_attr[0]
            serie_udm = select_attr[2]
            udm_to_return = self.get_columns_by_name("udm").copy()  # type: ignore
            for pos in pos_udm:
                udm_to_return[pos] = from_list_to_str(final_udm[pos_udm.index(pos)])

        # PCS intermediate conversion
        if pcs is not None:
            output = self.conv_intermediate_pcs(serie_udm, final_udm, pcs, serie_name_conv, conv_fact, drop_pcs)
            output_sdf = output.sdf
        else:
            output = self
            output_sdf = self.sdf

        # Traditional final conversion
        conv_factors = get_composite_conv_factor(serie_udm, final_udm, conv_fact)
        for i in range(len_serie_name):
            pcs_col_name = serie_name_conv[i] + "_pcs_fact"
            if pcs_col_name in output_sdf.columns:
                output_sdf = output_sdf.withColumn(
                    serie_name_conv[i], F.col(serie_name_conv[i]) * conv_factors[i] * F.col(pcs_col_name)
                ).drop(pcs_col_name)
            else:
                output_sdf = output_sdf.withColumn(serie_name_conv[i], F.col(serie_name_conv[i]) * conv_factors[i])

        return TimeSeriesDataFrame(
            sdf=output_sdf,
            granularity=output.granularity,  # type: ignore
            date_cols=output.get_columns_by_name("date"),
            value_cols=output.get_columns_by_name("value"),
            key_cols=output.get_columns_by_name("key"),
            load=output.load,  # type: ignore
            serie_type=output.get_columns_by_name("serie_type"),  # type: ignore
            udm=udm_to_return,
        )

Ancestors

PanamaDataFrame

Class variables

var agg_type_dic
var granularity_dic
var load_dic
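
As a usage illustration, a minimal constructor sketch follows. It is hedged: it assumes a running SparkSession, and the table, column names (pod, day, volume) and udm value are illustrative, not part of the API.

    from pyspark.sql import SparkSession
    from panama.analytics.time_series_data_frame import TimeSeriesDataFrame

    spark = SparkSession.builder.getOrCreate()

    # Toy daily series: a key column, a date column and a value column.
    sdf = spark.createDataFrame(
        [("POD1", "2024-01-01", 10.0), ("POD1", "2024-01-02", 12.5)],
        ["pod", "day", "volume"],
    )

    tsdf = TimeSeriesDataFrame(
        sdf=sdf,
        granularity="day",
        date_cols="day",
        value_cols="volume",
        key_cols="pod",
        serie_type="volume",  # "volume" aggregates with "sum" (see agg_type_dic)
        udm="Smc",
    )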

Static methods

def order_by_granularity(tsdf_list: List[TimeSeriesDataFrame], reverse: bool = False) ‑> List[TimeSeriesDataFrame]

Sorts a list of TimeSeriesDataFrame objects by their granularity attribute in ascending or descending order.

Args

tsdf_list
list of TimeSeriesDataFrame.
reverse
ordering in ascending (False) or descending order (True). Defaults to False.

Returns

List[TimeSeriesDataFrame]
the ordered original list.
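
For instance, a short sketch reusing the daily tsdf from the constructor example above, together with an assumed second, monthly series tsdf_month built the same way:

    # granularity_dic ranks interval < year < quarter < month < day < hour,
    # so ascending order puts the coarser (monthly) series first.
    ordered = TimeSeriesDataFrame.order_by_granularity([tsdf, tsdf_month])
    print([t.granularity for t in ordered])  # ["month", "day"]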

Methods

def add_time_series(self, col_to_add: Union[str, List[str]], serie_type: Union[str, List[str]], udm: Union[str, List[str]], col: Union[F.Column, List[F.Column], None] = None) ‑> TimeSeriesDataFrame

Adds time_series to TimeSeriesDataFrame.

Args

col_to_add : Union[str, List[str]]
names of the columns to add.
serie_type : Union[str, List[str]]
serie_type of the new columns.
udm : Union[str, List[str]]
unit of measure of the new columns.
col : Union[F.Column, List[F.Column]], optional
column expressions producing the new values. Defaults to None.

Returns

TimeSeriesDataFrame
The final TimeSeriesDataFrame with new added columns.
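
A hedged sketch, reusing tsdf from the constructor example; the new column name volume_x2 is illustrative:

    import pyspark.sql.functions as F

    # Add a derived series: the expression in `col` supplies the values,
    # while serie_type and udm describe the new series.
    tsdf2 = tsdf.add_time_series(
        col_to_add="volume_x2",
        serie_type="volume",
        udm="Smc",
        col=F.col("volume") * 2,
    )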
def aggregate(self, granularity: "Literal['day', 'hour', 'month', 'quarter', 'year']", load: Union[_LOAD_TYPING, bool] = True, partition_cols: Union[List[str], None] = None, value_cols: Union[List[str], None] = None, aggr_functions: Union[List[str], None] = None, optional_aggr: Union[Dict[str, str], None] = None) ‑> TimeSeriesDataFrame

Method for aggregating based on granularity and partition columns.

NOTE: 'interval' granularity is not yet supported.

Args

granularity : Literal["day", "hour", "month", "quarter", "year"]
output granularity.
load : Union[List[Literal["F", "PO", "PO2", "f", "po", "po2"]], Literal["F", "PO", "PO2", "f", "po", "po2"], bool], optional
load types to use. If True, all load columns are used; if False, no load is used for aggregation. Defaults to True.
partition_cols : Union[List[str], None], optional
list of other columns to use as partitions. If None, key_cols are used. Defaults to None.
value_cols : Union[List[str], None], optional
columns to aggregate. If None, all the value columns are aggregated. Defaults to None.
aggr_functions : Union[List[str], None], optional
functions used to aggregate the data. A single function per value column is allowed. If None, default functions are used. Defaults to None.
optional_aggr : Union[Dict[str, str], None], optional
optional columns to aggregate. A dict {column: aggr_function} is expected. If None, no other columns are aggregated. Defaults to None.

Raises

ValueError
if an unsupported granularity is passed.

Returns

TimeSeriesDataFrame
aggregated time series dataframe.
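
For example, a sketch on the load-free tsdf from the constructor example (hence load=False):

    # Roll the daily series up to monthly granularity; "volume" series default
    # to the "sum" aggregation via agg_type_dic, so no aggr_functions are needed.
    monthly = tsdf.aggregate(granularity="month", load=False)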
def automatic_join(self, sdf_y: Union[PanamaDataFrame, TimeSeriesDataFrame], how: str = 'inner') ‑> TimeSeriesDataFrame

Joins two TimeSeriesDataFrame objects based on their common keys and temporal information.

Args

sdf_y : Union[PanamaDataFrame, TimeSeriesDataFrame]
a TimeSeriesDataFrame or PanamaDataFrame.
how : str, optional
the join type (see DataFrame join method). Defaults to "inner".

Returns

TimeSeriesDataFrame
the resulting TimeSeriesDataFrame.
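
A sketch, assuming a second TimeSeriesDataFrame price_tsdf sharing the pod key and a compatible granularity:

    # Keys and dates are matched automatically; the value_cols of price_tsdf are
    # carried into the joined result via add_time_series.
    joined = tsdf.automatic_join(price_tsdf, how="inner")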
def conv_intermediate_pcs(self, udm_from: Union[str, List[str]], udm_to: Union[str, List[str]], pcs: PanamaDataFrame, value_col_conv: Union[str, List[str]], conv_fact: DataFrame, drop_pcs: bool = True) ‑> TimeSeriesDataFrame

Calculates the intermediate pcs conversion from Smc to GJ or vice versa.

Args

udm_from : Union[str, List[str]]
the starting unit of measures.
udm_to : Union[str, List[str]]
the final unit of measures.
pcs : PanamaDataFrame
PanamaDataFrame with pcs column.
value_col_conv : Union[str, List[str]]
name of the values to convert.
conv_fact : DataFrame
the conversion factor DataFrame with udm_from, udm_to and factor columns.
drop_pcs : bool, optional
indicates if the pcs time series needs to be dropped. Defaults to True.

Returns

TimeSeriesDataFrame
TimeSeriesDataFrame with pcs conversion factor for each value_col_conv.
def conv_udm(self, final_udm: Union[str, List[str]], conv_fact: DataFrame, serie_name: Union[str, List[str], None] = None, pcs: Union[PanamaDataFrame, None] = None, drop_pcs: bool = True) ‑> TimeSeriesDataFrame

Converts unit of measure of a TimeSeriesDataFrame.

Args

final_udm : Union[str, List[str]]
the desired unit of measure.
conv_fact : DataFrame
the conversion factor DataFrame with udm_from, udm_to and factor columns.
serie_name : Union[str, List[str]], optional
the self.value_cols to be converted. If None the method will be applied to all value_cols of TimeSeriesDataFrame. Defaults to None.
pcs : PanamaDataFrame, optional
PanamaDataFrame with pcs factors. Defaults to None.
drop_pcs : bool, optional
indicates if the pcs time series needs to be dropped in the conv_intermediate_pcs step. Defaults to True.

Returns

TimeSeriesDataFrame
the converted TimeSeriesDataFrame.
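
A sketch converting the tsdf from the constructor example from Smc to GJ; the factor value is illustrative only:

    # conv_fact must expose udm_from, udm_to and factor columns (per the docstring).
    conv_fact = spark.createDataFrame(
        [("Smc", "GJ", 0.03852)], ["udm_from", "udm_to", "factor"]
    )
    converted = tsdf.conv_udm(final_udm="GJ", conv_fact=conv_fact)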
def display(self)

Prints the granularity and load type of the time series and displays the DataFrame

def drop(self, *args) ‑> TimeSeriesDataFrame

Drops given columns from the sdf of a TimeSeriesDataFrame object together with corresponding class attributes.

Args

*args : str
names of the columns to drop.

Returns

TimeSeriesDataFrame
the dropped TimeSeriesDataFrame.
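
For instance, on the tsdf2 built in the add_time_series sketch above:

    # Dropping a value column also removes its serie_type and udm entries;
    # temporal (date/load) and key columns cannot be dropped.
    slimmed = tsdf2.drop("volume_x2")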
def expand_flat(self, calendar: DataFrame, granularity: str, serie_name: Union[List, List[str], None] = None, load_calendar_col: Union[List[str], str, None] = None) ‑> TimeSeriesDataFrame

Expands a given profile based on a calendar column.

Args

calendar : DataFrame
the calendar DataFrame.
granularity : str
a valid granularity desired.
serie_name : Union[List, List[str]], optional
the value_col to be expanded. If None the method will be applied on all value_cols of the TimeSeriesDataFrame. Default is None.
load_calendar_col : Union[str, List[str]], optional
the name of the load column in the calendar DataFrame, used when self has no load. Default is None.

Returns

TimeSeriesDataFrame
the resulting TimeSeriesDataFrame.
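
A sketch expanding the monthly series from the aggregate example back to daily values; per the source, the calendar DataFrame must carry a column named date:

    calendar = spark.createDataFrame(
        [("2024-01-01",), ("2024-01-02",)], ["date"]
    )
    # Each monthly value is spread flat over the days of that month.
    daily_again = monthly.expand_flat(calendar=calendar, granularity="day")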
def expand_with_profile(self, profile: TimeSeriesDataFrame, standardize: bool = True, serie_name: Union[List, List[str], None] = None, profile_name: Union[List, List[str], None] = None) ‑> TimeSeriesDataFrame

Reprofiles the value_cols of a TimeSeriesDataFrame given a specific profile.

Args

profile : TimeSeriesDataFrame
the TimeSeriesDataFrame with the profile to be used.
standardize : bool
flag choosing whether to standardize the profile value cols. Default is True.
serie_name : Union[List, List[str]], optional
the self.value_cols to be profiled. If None the method will be applied to all value_cols of TimeSeriesDataFrame.
profile_name : Union[List, List[str]], optional
the profile.value_cols to use for profiling. If None all value_cols of the profile are used.

Returns

TimeSeriesDataFrame
the resulting TimeSeriesDataFrame.
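
A sketch, assuming a finer-granularity TimeSeriesDataFrame profile_ts whose value column holds the profile weights:

    # With standardize=True the profile is first normalized over (key, load, date),
    # then each series is multiplied by the normalized profile.
    reprofiled = monthly.expand_with_profile(profile=profile_ts)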
def get_aggr_fun(self) ‑> Union[str, List[Any]]

Get the aggregation function corresponding to the serie_type of a TimeSeriesDataFrame.

Returns

Union[str, List[Any]]
the list with the aggregation functions.
def get_serie_type_dictionary(self) ‑> dict

Get the dictionary that maps each value_col to the corresponding aggregation function.

Returns

dict
the corresponding dictionary.
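
On the tsdf from the constructor example, both helpers simply reflect agg_type_dic:

    tsdf.get_aggr_fun()               # ["sum"]
    tsdf.get_serie_type_dictionary()  # {"volume": "sum"}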
def select_value_cols_udm_serie_type(self, serie_name: Union[str, List[str]]) ‑> tuple

Returns position of selected serie_name in self.value_cols and the corresponding serie_type and udm.

Args

serie_name : Union[str, List[str]]
the selected value_cols.

Returns

tuple
tuple with position, serie_type and udm.
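
For example, on the tsdf from the constructor sketch:

    pos, serie_type, udm = tsdf.select_value_cols_udm_serie_type("volume")
    # pos == [0], serie_type == ["volume"], udm == ["Smc"]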
def total_amount(self, unitary_amount: TimeSeriesDataFrame, col_to_add: Union[str, List[str]], volume_name: Union[str, List[str], None] = None, price_name: Union[str, List[str], None] = None) ‑> TimeSeriesDataFrame

Computes a total amount given self TimeSeriesDataFrame of volumes and unitary_amount TimeSeriesDataFrame of unitary prices.

Args

unitary_amount : TimeSeriesDataFrame
TimeSeriesDataFrame with unitary amount.
col_to_add : Union[str, List[str]]
names of new columns.
volume_name : Union[str, List[str]], optional
names of volume columns to use. Defaults to None.
price_name : Union[str, List[str]], optional
names of unitary price columns to use. Defaults to None.

Returns

TimeSeriesDataFrame
the resulting TimeSeriesDataFrame.
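
A sketch, assuming price_tsdf is a unitary-price TimeSeriesDataFrame with udm "EUR/Smc" on the same keys; the output column name total_cost is illustrative:

    # Volumes (Smc) times unitary prices (EUR/Smc) give an amount in EUR; the method
    # checks that the price denominator matches the volume udm before multiplying.
    amount = tsdf.total_amount(
        unitary_amount=price_tsdf,
        col_to_add="total_cost",
    )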

Inherited members