Module panama.analytics.time_series_data_frame
Classes
class TimeSeriesDataFrame (sdf: DataFrame, granularity: _GRANULARITY_TYPING, date_cols: Union[str, List[str]], value_cols: Union[str, List[str]], key_cols: Union[str, List[str], None] = None, load: _LOAD_TYPING = None, serie_type: _SERIE_TYPE_TYPING = None, udm: Union[str, List[str], None] = None)
-
Expand source code
```python
class TimeSeriesDataFrame(PanamaDataFrame):
    granularity_dic = {"interval": 0, "year": 1, "quarter": 2, "month": 3, "day": 4, "hour": 5}
    agg_type_dic = {"volume": "sum", "price": "mean", "amount": "sum"}
    load_dic = {"F": ["F1", "F2", "F3"], "PO": ["PL", "OP"], "PO2": ["PL2", "OP2"]}

    def __init__(
        self,
        sdf: DataFrame,
        granularity: _GRANULARITY_TYPING,
        date_cols: Union[str, List[str]],
        value_cols: Union[str, List[str]],
        key_cols: Union[str, List[str], None] = None,
        load: _LOAD_TYPING = None,
        serie_type: _SERIE_TYPE_TYPING = None,
        udm: Union[str, List[str], None] = None,
    ):
        super().__init__(sdf=sdf, key_cols=key_cols, date_cols=date_cols)
        self.add_columns(key="value_cols", value=value_cols)
        _check_columns_in_df(sdf, value_cols=value_cols)
        self.add_columns(key="serie_type", value=list_value_append(serie_type, drop_null=False))
        self.add_columns(key="udm", value=list_value_append(udm, drop_null=False))
        _check_granularity(granularity)
        self.granularity = cast(_GRANULARITY_TYPING, granularity)
        _check_load(load)
        load = cast(_LOAD_TYPING, load)
        self.load = list_value_append(load)
        self._set_load_col()
        self._set_lists_len()

    def _set_lists_len(self) -> None:
        """Function used to set the list length for the serie_type and the udm_type"""
        value_cols = self.get_columns_by_name("value")
        serie_type = self.get_columns_by_name("serie_type")
        udm = self.get_columns_by_name("udm")
        check = check_list_len(reference=value_cols, to_check=serie_type)
        # we are left with the case of n_serie_type < n_value_cols and n_serie_type == 1
        if check is True:
            if len(serie_type) == 1:
                self.add_columns(key="serie_type", value=serie_type * len(value_cols))
        check = check_list_len(reference=value_cols, to_check=udm, to_check_label="udm")
        # we are left with the case of n_udm < n_value_cols and n_udm == 1
        if check is True:
            if len(udm) == 1:
                self.add_columns(key="udm", value=udm * len(value_cols))

    def _set_load_col(self) -> None:
        """Function used to set the load col. Wrapper of add_columns, checks for validity of the column.

        Raises:
            ValueError: load column {load_col} not in sdf.columns. Available values are {self.sdf.columns}.
            ValueError: No consistency between load expected values {allowed_values} and load column {load_col} content.
        """
        load_cols = []
        for load in self.load:
            load_col = f"load_{load.lower()}"
            load_cols.append(load_col)
            if load_col not in self.sdf.columns:
                raise ValueError(f"load column {load_col} not in sdf.columns. Available values are {self.sdf.columns}.")
            allowed_values = self.load_dic[load]
            filtered_df = self.sdf.filter(~F.col(load_col).isin(allowed_values))
            if not filtered_df.isEmpty():
                raise ValueError(
                    f"No consistency between load expected values {allowed_values} and load column {load_col} content."
                )
        self.add_columns(key="load_col", value=load_cols)
        self.update_columns(key="temporal_cols", value=load_cols)

    def __getattr__(self, attr: str):
        df_dict = DataFrame.__dict__
        if attr in df_dict:
            sdf_attr = getattr(self.sdf, attr)
            if callable(sdf_attr):

                @functools.wraps(sdf_attr)
                def method_wrapper(*args, **kwargs):
                    method_result = sdf_attr(*args, **kwargs)
                    if isinstance(method_result, DataFrame):
                        value_cols = self.get_columns_by_name("value")
                        serie_type = self.get_columns_by_name("serie_type")
                        udm = self.get_columns_by_name("udm")
                        _check_columns_to_keep(sdf=method_result, method=attr, values=value_cols, raises=False)
                        value_indexes = [value_cols.index(i) for i in value_cols if i in method_result.columns]
                        new_value_cols = [value_cols[i] for i in value_indexes]
                        new_serie_type = [serie_type[i] for i in value_indexes]
                        new_udm = [udm[i] for i in value_indexes]
                        return TimeSeriesDataFrame(
                            sdf=method_result,
                            granularity=self.granularity,  # type: ignore
                            date_cols=self.get_columns_by_name("date"),
                            key_cols=self.get_columns_by_name("key"),
                            load=self.load,
                            serie_type=new_serie_type,  # type: ignore
                            udm=new_udm,
                            value_cols=new_value_cols,
                        )
                    else:
                        return method_result

                return method_wrapper
            else:
                return sdf_attr
        else:
            raise AttributeError(f"TimeSeriesDataFrame object has no attribute '{attr}'.")

    @staticmethod
    def order_by_granularity(tsdf_list: List[TimeSeriesDataFrame], reverse: bool = False) -> List[TimeSeriesDataFrame]:
        """Sorts a list of TimeSeriesDataFrame objects by their granularity attribute in ascending or descending order.

        Args:
            tsdf_list: list of TimeSeriesDataFrame.
            reverse: ordering in ascending (False) or descending (True) order. Defaults to False.

        Returns:
            List[TimeSeriesDataFrame]: the ordered original list.
        """
        return sorted(tsdf_list, key=lambda x: TimeSeriesDataFrame.granularity_dic.get(x.granularity), reverse=reverse)  # type: ignore

    def automatic_join(
        self, sdf_y: Union[PanamaDataFrame, TimeSeriesDataFrame], how: str = "inner"
    ) -> TimeSeriesDataFrame:
        """Joins two TimeSeriesDataFrame objects based on their common keys and temporal information.

        Args:
            sdf_y (Union[PanamaDataFrame, TimeSeriesDataFrame]): a TimeSeriesDataFrame or PanamaDataFrame.
            how (str, optional): the join type (see DataFrame join method). Defaults to "inner".

        Returns:
            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        """
        # Find minimum granularity and add auxiliary column for temporal join
        if isinstance(sdf_y, TimeSeriesDataFrame):
            lower_g, higher_g = TimeSeriesDataFrame.order_by_granularity([self, sdf_y])
            max_granularity = higher_g.granularity
            is_interval_join = (self.granularity == "interval") | (sdf_y.granularity == "interval")
            if is_interval_join is False:
                higher_g.add_col_date(granularity=lower_g.granularity, output_col=lower_g.get_columns_by_name("date"))
            final_load = self.load if self.load is not None else sdf_y.load
        else:
            max_granularity = self.granularity
            final_load = self.load
        # Join using Panama DataFrame method
        join_psdf = super().automatic_join(sdf_y, how)
        join_ts = TimeSeriesDataFrame(
            sdf=join_psdf.sdf,
            granularity=max_granularity,  # type: ignore
            date_cols=join_psdf.get_columns_by_name("date"),
            value_cols=self.get_columns_by_name("value"),
            key_cols=join_psdf.get_columns_by_name("key"),
            load=final_load,
            serie_type=self.get_columns_by_name("serie_type"),  # type: ignore
            udm=self.get_columns_by_name("udm"),
        )
        if isinstance(sdf_y, TimeSeriesDataFrame):
            join_ts = join_ts.add_time_series(
                col_to_add=sdf_y.get_columns_by_name("value"),
                serie_type=sdf_y.get_columns_by_name("serie_type"),
                udm=sdf_y.get_columns_by_name("udm"),
            )
        return join_ts

    def display(self):
        """Prints the granularity and load type of the time series and displays the DataFrame."""
        print(f"Time series with granularity {self.granularity} and load {self.load}")
        self.sdf.display()  # type: ignore

    def select_value_cols_udm_serie_type(self, serie_name: Union[str, List[str]]) -> tuple:
        """Returns position of selected serie_name in self.value_cols and the corresponding serie_type and udm.

        Args:
            serie_name (Union[str, List[str]]): the selected value_cols.

        Returns:
            tuple: tuple with position, serie_type and udm.
        """
        serie_name = list_value_append(serie_name)
        serie_value_cols = self.get_columns_by_name("value")
        serie_type_tot = self.get_columns_by_name("serie_type")
        udm_tot = self.get_columns_by_name("udm")
        pos = [serie_value_cols.index(s) for s in serie_name]
        serie_type = [serie_type_tot[p] for p in pos]
        udm = [udm_tot[p] for p in pos]
        return pos, serie_type, udm

    def add_time_series(
        self,
        col_to_add: Union[str, List[str]],
        serie_type: Union[str, List[str]],
        udm: Union[str, List[str]],
        col: Union[F.Column, List[F.Column], None] = None,
    ) -> TimeSeriesDataFrame:
        """Adds time_series to TimeSeriesDataFrame.

        Args:
            col_to_add (Union[str, List[str]]): names of the columns to add.
            col (Union[F.Column, List[F.Column]], optional): column expressions used to compute the new values.
            serie_type (Union[str, List[str]]): serie_type of new columns.
            udm (Union[str, List[str]]): unit of measure of new columns.

        Returns:
            TimeSeriesDataFrame: The final TimeSeriesDataFrame with new added columns.
        """
        # Get new attributes
        tmp_value_cols = list_value_append(self.get_columns_by_name("value"), col_to_add)
        tmp_serie_type = list_value_append(self.get_columns_by_name("serie_type"), serie_type, drop_null=False)
        tmp_udm = list_value_append(self.get_columns_by_name("udm"), udm, drop_null=False)
        col_to_add = list_value_append(col_to_add)
        num_col_to_add = len(col_to_add)
        # Add columns
        tmp_sdf = self.sdf
        if col is not None:
            col = list_value_append(col)
            for i in range(num_col_to_add):
                tmp_sdf = tmp_sdf.withColumn(col_to_add[i], col[i])
        return TimeSeriesDataFrame(
            sdf=tmp_sdf,
            granularity=self.granularity,  # type: ignore
            date_cols=self.get_columns_by_name("date"),
            value_cols=tmp_value_cols,
            key_cols=self.get_columns_by_name("key"),
            load=self.load,
            serie_type=tmp_serie_type,
            udm=tmp_udm,
        )

    def drop(self, *args) -> TimeSeriesDataFrame:
        """Drops given columns from the sdf of a TimeSeriesDataFrame object together with corresponding class attributes.

        Args:
            col_to_drop (Union[str, List[str]]): list or string with the names of the columns to drop.

        Returns:
            TimeSeriesDataFrame: the dropped TimeSeriesDataFrame.
        """
        col_to_drop = list(args)
        value_cols = serie_col = self.get_columns_by_name("value")
        serie_type = serie_type_col = self.get_columns_by_name("serie_type")
        udm = udm_list = self.get_columns_by_name("udm")
        tmp_sdf = self.sdf
        temporal_cols = self.get_columns_by_name("temporal")
        key_cols = list_value_append(self.get_columns_by_name("key"))
        for drop_col in col_to_drop:
            # drop attributes in TimeSeriesDataFrame if possible
            if (drop_col in temporal_cols) or (drop_col in key_cols):
                raise ValueError("Cannot drop temporal cols (date or load) or key_cols in TimeSeriesDataFrame")
            elif drop_col in serie_col:
                if len(serie_col) == 1:
                    raise ValueError("Cannot drop value_cols in TimeSeriesDataFrame when it is only one")
                else:
                    pos_attribute_values = serie_col.index(drop_col)
                    serie_col.pop(pos_attribute_values)
                    value_cols = serie_col
                    serie_type_col.pop(pos_attribute_values)
                    serie_type = serie_type_col
                    udm_list.pop(pos_attribute_values)
                    udm = udm_list
            # drop column in sdf
            tmp_sdf = tmp_sdf.drop(drop_col)
        return TimeSeriesDataFrame(
            sdf=tmp_sdf,
            granularity=self.granularity,  # type: ignore
            date_cols=self.get_columns_by_name("date"),
            value_cols=from_list_to_str(value_cols),
            key_cols=self.get_columns_by_name("key"),
            load=self.load,
            serie_type=from_list_to_str(serie_type),  # type: ignore
            udm=from_list_to_str(udm),
        )

    def get_aggr_fun(self) -> Union[str, List[Any]]:
        """Get the aggregation function corresponding to the serie_type of a TimeSeriesDataFrame.

        Returns:
            Union[str, List[str]]: the list with the aggregation functions.
        """
        keys_agg = [TimeSeriesDataFrame.agg_type_dic.get(k, None) for k in self.get_columns_by_name("serie_type")]
        return keys_agg

    def get_serie_type_dictionary(self) -> dict:
        """Get the dictionary that maps each value_col to the corresponding aggregation function.

        Returns:
            dict: the corresponding dictionary.
        """
        # Define key and values of the dictionary
        keys_agg = self.get_aggr_fun()
        values_agg = self.get_columns_by_name("value")
        # Create the dictionary
        key_value_pairs = zip(values_agg, keys_agg)
        dictionary = dict(key_value_pairs)
        return dictionary

    def aggregate(
        self,
        granularity: Literal["day", "hour", "month", "quarter", "year"],
        load: Union[_LOAD_TYPING, bool] = True,
        partition_cols: Union[List[str], None] = None,
        value_cols: Union[List[str], None] = None,
        aggr_functions: Union[List[str], None] = None,
        optional_aggr: Union[Dict[str, str], None] = None,
    ) -> TimeSeriesDataFrame:
        """Method for aggregating based on granularity and partition columns.

        NOTE: 'interval' granularity is not yet supported.

        Args:
            granularity (Literal["day", "hour", "month", "quarter", "year"]): output granularity.
            load (Union[List[Literal["F", "PO", "PO2", "f", "po", "po2"]], Literal["F", "PO", "PO2", "f", "po", "po2"], bool], optional):
                load types to use. If True all load columns are used; if False no load is used for aggregation.
                Defaults to True.
            partition_cols (Union[List[str], None], optional): list of other columns to use as partitions.
                If None, key_cols are used. Defaults to None.
            value_cols (Union[List[str], None], optional): columns to aggregate. If None, all the value columns
                are aggregated. Defaults to None.
            aggr_functions (Union[List[str], None], optional): functions used to aggregate the data. A single
                function per value column is allowed. If None, default functions are used. Defaults to None.
            optional_aggr (Union[Dict[str, str], None], optional): optional columns to aggregate. A dict
                {column<str>: aggr_function<str>} is expected. If None, no other columns are aggregated.
                Defaults to None.

        Raises:
            ValueError: if a non supported granularity is passed.

        Returns:
            TimeSeriesDataFrame: aggregated time series dataframe.
        """
        supported_granularities = ("day", "hour", "month", "quarter", "year")
        # get self special columns lists
        current_key_cols = self.get_columns_by_name("key")
        current_value_cols = self.get_columns_by_name("value")
        current_serie_types = self.get_columns_by_name("serie_type")
        current_udm = self.get_columns_by_name("udm")
        current_load_col = self.get_columns_by_name("load")
        value_type_mapping = {k: v for k, v in zip(current_value_cols, current_serie_types)}
        value_udm_mapping = {k: v for k, v in zip(current_value_cols, current_udm)}
        # check granularity
        if granularity not in supported_granularities:
            raise ValueError(
                f"granularity {granularity} not yet supported. Supported granularities are {supported_granularities}."
            )
        # if no partition is passed, key_columns are used as partition columns
        if partition_cols is None:
            partition_cols = current_key_cols
        # create granularity column: this will be the output date_col
        self.add_col_date(granularity=granularity)
        partition_cols.append(granularity)
        # check if load is required
        if load is None or load is False:
            output_load = None
        elif load is True:
            output_load = self.load
            partition_cols.extend(current_load_col)
        elif isinstance(load, str):
            output_load = load
            partition_cols.append(f"load_{load.lower()}")
        elif isinstance(load, list):
            output_load = load
            load_cols = [f"load_{i.lower()}" for i in load]
            partition_cols.extend(load_cols)
        # get the value_cols
        if value_cols is None:
            value_cols = current_value_cols
        # get the aggregation functions
        value_type_dict = self.get_serie_type_dictionary()
        if aggr_functions is None:
            aggr_functions = []
            # if no aggregation is provided, use the default
            for v in value_cols:
                aggr_functions.append(value_type_dict[v])
        # if a single aggregation function is passed, use that function for all value cols
        if len(aggr_functions) == 1 and len(value_cols) > 1:
            aggr_functions = aggr_functions * len(value_cols)
        # generate aggregation dict
        aggr_dict = {col: f for col, f in list(zip(value_cols, aggr_functions))}
        # if extra aggregations are provided
        if optional_aggr is None:
            optional_aggr = dict()
        # add those aggregations to the others
        aggr_dict = {**aggr_dict, **optional_aggr}
        # if by a weird situation some columns are doubled in partition cols, deduplicate
        partition_cols = list(set(partition_cols))
        output_sdf = self.sdf.groupBy(*partition_cols).agg(aggr_dict)
        # generate output columns
        output_sdf = format_aggregated_columns(output_sdf)
        # get the output value_cols
        output_value_cols = [i for i in aggr_dict.keys() if i in current_value_cols]
        # create the key cols from the partition cols, without the load
        output_key_cols = [i for i in partition_cols if i != granularity and i not in current_load_col]
        # create the serie type from the value_cols
        output_serie_type = [value_type_mapping[i] for i in output_value_cols]
        # create the output udm
        output_udm = [value_udm_mapping[i] for i in output_value_cols]
        return TimeSeriesDataFrame(
            sdf=output_sdf,  # type: ignore
            granularity=granularity,
            date_cols=granularity,
            value_cols=output_value_cols,
            key_cols=output_key_cols,
            load=output_load,
            serie_type=output_serie_type,  # type: ignore
            udm=output_udm,
        )

    def expand_flat(
        self,
        calendar: DataFrame,
        granularity: str,
        serie_name: Union[List, List[str], None] = None,
        load_calendar_col: Union[List[str], str, None] = None,
    ) -> TimeSeriesDataFrame:
        """Expands a given profile based on a calendar column.

        Args:
            calendar (DataFrame): the calendar DataFrame.
            granularity (str): the desired output granularity.
            serie_name (Union[List, List[str]], optional): the value_col to be expanded. If None the method
                will be applied on all value_cols of the TimeSeriesDataFrame. Default is None.
            load_calendar_col (Union[List[str], str], optional): the name of the load column in the calendar
                DataFrame, used when self has no load. Default is None.

        Returns:
            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        """
        # Identify the load col
        load_col = self.get_columns_by_name("load")
        if load_calendar_col is None:
            load_calendar_col = self.get_columns_by_name("load")
        else:
            load_calendar_col = list_value_append(load_calendar_col)
        # generate output load columns
        if len(load_calendar_col) > 0:
            output_load_col = [i.split("_")[1].upper() for i in load_calendar_col]
        else:
            output_load_col = []
        # Adjust calendar DataFrame and create flat profile
        calendar = add_col_date(sdf=calendar, granularity=granularity, date_col="date", output_col=granularity)
        calendar = calendar.select(*load_calendar_col, granularity)
        if granularity != "hour":
            calendar = calendar.distinct()
        calendar = calendar.withColumn("one_col", F.lit(1))
        calendar_ts = TimeSeriesDataFrame(
            sdf=calendar,
            granularity=granularity,  # type: ignore
            date_cols=granularity,
            value_cols="one_col",
            load=output_load_col,  # type: ignore
        )
        return self.expand_with_profile(calendar_ts, serie_name=serie_name)

    def expand_with_profile(
        self,
        profile: TimeSeriesDataFrame,
        standardize: bool = True,
        serie_name: Union[List, List[str], None] = None,
        profile_name: Union[List, List[str], None] = None,
    ) -> TimeSeriesDataFrame:
        """Reprofiles the value_cols of a TimeSeriesDataFrame given a specific profile.

        Args:
            profile (TimeSeriesDataFrame): the TimeSeriesDataFrame with the profile to be used.
            standardize (bool): flag to choose whether to standardize the value cols. Default is True.
            serie_name (Union[List, List[str]], optional): the self.value_cols to be profiled. If None the
                method will be applied to all value_cols of the TimeSeriesDataFrame.
            profile_name (Union[List, List[str]], optional): the profile.value_cols to use for profiling.
                If None the method will be applied to all value_cols of the TimeSeriesDataFrame.

        Returns:
            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        """
        if serie_name is None:
            serie_name = self.get_columns_by_name("value")
        else:
            serie_name = list_value_append(serie_name)
        if profile_name is None:
            profile_value_cols = profile.get_columns_by_name("value")
        else:
            profile_value_cols = list_value_append(profile_name)
        # Identify load column
        load_col = self.get_columns_by_name("load")
        # Join
        output = self.automatic_join(profile)
        # Standardize the given profile for the key (load, id, date) and reprofile
        key_prof = list_value_append(self.get_columns_by_name("key"), load_col, self.get_columns_by_name("date"))
        w_key_prof = Window.partitionBy(key_prof)
        keys_agg = self.get_aggr_fun()
        # Check on lengths
        len_serie_to_profile = len(serie_name)
        if len(profile_value_cols) == 1 and len_serie_to_profile > 1:
            print("The same profile is used to reprofile all time series.")
            profile_value_cols = profile_value_cols * len_serie_to_profile
        for i in range(len(serie_name)):
            if standardize:
                output = output.withColumn(
                    profile_value_cols[i],
                    F.col(profile_value_cols[i])
                    / getattr(F, keys_agg[i])(F.col(profile_value_cols[i])).over(w_key_prof),
                )
            output = output.withColumn(serie_name[i], F.col(serie_name[i]) * F.col(profile_value_cols[i]))
        output_std = output.drop(*profile.get_columns_by_name("value"))
        return output_std

    def total_amount(
        self,
        unitary_amount: TimeSeriesDataFrame,
        col_to_add: Union[str, List[str]],
        volume_name: Union[str, List[str], None] = None,
        price_name: Union[str, List[str], None] = None,
    ) -> TimeSeriesDataFrame:
        """Computes a total amount given a self TimeSeriesDataFrame of volumes and a unitary_amount
        TimeSeriesDataFrame of unitary prices.

        Args:
            unitary_amount (TimeSeriesDataFrame): TimeSeriesDataFrame with unitary amount.
            col_to_add (Union[str, List[str]]): names of new columns.
            volume_name (Union[str, List[str]], optional): names of volume columns to use. Defaults to None.
            price_name (Union[str, List[str]], optional): names of unitary price columns to use. Defaults to None.

        Returns:
            TimeSeriesDataFrame: the resulting TimeSeriesDataFrame.
        """
        volume_udm = self.get_columns_by_name("udm")
        serie_type = list_value_append("amount")
        unitary_price_udm = unitary_amount.get_columns_by_name("udm")
        # Get lists of input
        col_to_add = list_value_append(col_to_add)
        if volume_name is None:
            volume_cols = self.get_columns_by_name("value")
        else:
            volume_cols = list_value_append(volume_name)
            select_attr = self.select_value_cols_udm_serie_type(volume_name)
            volume_udm = select_attr[2]
        if price_name is None:
            price_cols = unitary_amount.get_columns_by_name("value")
        else:
            price_cols = list_value_append(price_name)
            select_attr = unitary_amount.select_value_cols_udm_serie_type(price_name)
            unitary_price_udm = select_attr[2]
        # Check on input
        len_add_col = len(col_to_add)
        serie_type = serie_type * len_add_col
        if len_add_col > 1:
            if len(volume_cols) == 1:
                print("The same quantity is used in the total amount computation.")
                volume_cols = volume_cols * len_add_col
                volume_udm = volume_udm * len_add_col
            if len(price_cols) == 1:
                print("The same unitary price is used in the total amount computation.")
                price_cols = price_cols * len_add_col
                unitary_price_udm = unitary_price_udm * len_add_col
        col = [F.col(volume) * F.col(price) for volume, price in zip(volume_cols, price_cols)]
        final_udm = [udm.split("/")[0] for udm in unitary_price_udm]
        # Check on input lengths and on udm
        if len_add_col != len(col):
            raise ValueError(f"Input parameters {col_to_add} and {col} are not of equal length.")
        if len_add_col != len(serie_type):
            raise ValueError(f"Input parameters {col_to_add} and {serie_type} are not of equal length.")
        if len_add_col != len(final_udm):
            raise ValueError(f"Input parameters {col_to_add} and {final_udm} are not of equal length.")
        price_volume_udm = [udm.split("/")[-1] for udm in unitary_price_udm]
        if price_volume_udm != volume_udm:
            raise ValueError("Different unit of measures, a conversion is needed.")
        output = self.automatic_join(unitary_amount)
        output = output.add_time_series(col_to_add=col_to_add, serie_type=serie_type, udm=final_udm, col=col)
        return output

    def conv_intermediate_pcs(
        self,
        udm_from: Union[str, List[str]],
        udm_to: Union[str, List[str]],
        pcs: PanamaDataFrame,
        value_col_conv: Union[str, List[str]],
        conv_fact: DataFrame,
        drop_pcs: bool = True,
    ) -> TimeSeriesDataFrame:
        """Calculates the intermediate pcs conversion from Smc to GJ or vice versa.

        Args:
            udm_from (Union[str, List[str]]): the starting unit of measures.
            udm_to (Union[str, List[str]]): the final unit of measures.
            pcs (PanamaDataFrame): PanamaDataFrame with pcs column.
            value_col_conv (Union[str, List[str]]): name of the values to convert.
            conv_fact (DataFrame): the conversion factor DataFrame with udm_from, udm_to and factor columns.
            drop_pcs (bool, optional): indicates if the pcs time series needs to be dropped. Defaults to True.

        Returns:
            TimeSeriesDataFrame: TimeSeriesDataFrame with pcs conversion factor for each value_col_conv.
        """
        output = self.automatic_join(sdf_y=pcs, how="left")
        # Convert null pcs into default
        default_pcs = get_single_conv_factor("Smc", "GJ", conv_fact)[0]
        output = output.withColumn("pcs", F.coalesce(F.col("pcs"), F.lit(default_pcs)))
        # Calculate pcs intermediate conversion factor
        for i in range(len(udm_to)):
            if "Smc" in udm_from[i]:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", 1 * F.col("pcs"))
                udm_from[i] = udm_from[i].replace("Smc", "GJ")  # type: ignore
            elif "Smc" in udm_to[i]:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", 1 / F.col("pcs"))
                udm_to[i] = udm_to[i].replace("Smc", "GJ")  # type: ignore
            else:
                output = output.withColumn(value_col_conv[i] + "_pcs_fact", F.lit(1))
            if "/" in udm_from[i] and "/" in udm_to[i]:
                output = output.withColumn(
                    value_col_conv[i] + "_pcs_fact", 1 / F.col(value_col_conv[i] + "_pcs_fact")
                )
        # Drop pcs time series
        if drop_pcs:
            output = output.drop("pcs")
        return output

    def conv_udm(
        self,
        final_udm: Union[str, List[str]],
        conv_fact: DataFrame,
        serie_name: Union[str, List[str], None] = None,
        pcs: Union[PanamaDataFrame, None] = None,
        drop_pcs: bool = True,
    ) -> TimeSeriesDataFrame:
        """Converts unit of measure of a TimeSeriesDataFrame.

        Args:
            final_udm (Union[str, List[str]]): the desired unit of measure.
            conv_fact (DataFrame): the conversion factor DataFrame with udm_from, udm_to and factor columns.
            serie_name (Union[str, List[str]], optional): the self.value_cols to be converted. If None the
                method will be applied to all value_cols of the TimeSeriesDataFrame. Defaults to None.
            pcs (PanamaDataFrame, optional): PanamaDataFrame with pcs factors. Defaults to None.
            drop_pcs (bool, optional): indicates if the pcs time series needs to be dropped in the
                conv_intermediate_pcs step. Defaults to True.

        Returns:
            TimeSeriesDataFrame: the converted TimeSeriesDataFrame.
        """
        # Get lists
        value_cols_list = self.get_columns_by_name("value")
        final_udm = list_value_append(final_udm)
        original_udm = self.get_columns_by_name("udm")
        serie_name_conv = value_cols_list if serie_name is None else list_value_append(serie_name)
        # Check on input
        len_serie_name, len_final_udm = len(serie_name_conv), len(final_udm)
        if len_serie_name != len_final_udm:
            if len(final_udm) == 1:
                print("All the time series will be converted to the same final unit of measure.")
                final_udm = final_udm * len(serie_name_conv)
            else:
                raise ValueError(
                    f"The given final_udm {final_udm} has not same length of the series to convert {serie_name_conv}."
                )
        # Select serie_udm used for conversion and udm_to_return returned with the final TimeSeriesDataFrame
        if serie_name is None:
            serie_udm = original_udm
            udm_to_return = from_list_to_str(final_udm.copy())
        else:
            select_attr = self.select_value_cols_udm_serie_type(serie_name_conv)
            pos_udm = select_attr[0]
            serie_udm = select_attr[2]
            udm_to_return = self.get_columns_by_name("udm").copy()  # type: ignore
            for pos in pos_udm:
                udm_to_return[pos] = from_list_to_str(final_udm[pos_udm.index(pos)])
        # PCS intermediate conversion
        if pcs is not None:
            output = self.conv_intermediate_pcs(serie_udm, final_udm, pcs, serie_name_conv, conv_fact, drop_pcs)
            output_sdf = output.sdf
        else:
            output = self
            output_sdf = self.sdf
        # Traditional final conversion
        conv_factors = get_composite_conv_factor(serie_udm, final_udm, conv_fact)
        for i in range(len_serie_name):
            pcs_col_name = serie_name_conv[i] + "_pcs_fact"
            if pcs_col_name in output_sdf.columns:
                output_sdf = output_sdf.withColumn(
                    serie_name_conv[i], F.col(serie_name_conv[i]) * conv_factors[i] * F.col(pcs_col_name)
                ).drop(pcs_col_name)
            else:
                output_sdf = output_sdf.withColumn(serie_name_conv[i], F.col(serie_name_conv[i]) * conv_factors[i])
        return TimeSeriesDataFrame(
            sdf=output_sdf,
            granularity=output.granularity,  # type: ignore
            date_cols=output.get_columns_by_name("date"),
            value_cols=output.get_columns_by_name("value"),
            key_cols=output.get_columns_by_name("key"),
            load=output.load,  # type: ignore
            serie_type=output.get_columns_by_name("serie_type"),  # type: ignore
            udm=udm_to_return,
        )
```
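For orientation, a minimal construction sketch; the Spark session, column names and data are hypothetical, while the serie_type and udm values follow the class dictionaries above. Later sketches in this page reuse this hypothetical tsdf.

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical hourly gas consumption: one key column, one date column, one value column.
sdf = spark.createDataFrame(
    [("pod_1", "2024-01-01 00:00:00", 1.5), ("pod_1", "2024-01-01 01:00:00", 2.0)],
    ["pod_id", "date", "consumption"],
).withColumn("date", F.to_timestamp("date"))

tsdf = TimeSeriesDataFrame(
    sdf=sdf,
    granularity="hour",
    date_cols="date",
    value_cols="consumption",
    key_cols="pod_id",
    serie_type="volume",  # aggregated with "sum" per agg_type_dic
    udm="Smc",
)
```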
Ancestors
- PanamaDataFrame
Class variables
var agg_type_dic
- Maps each serie_type to its default aggregation function: {"volume": "sum", "price": "mean", "amount": "sum"}.
var granularity_dic
- Maps each granularity to its ordering rank: {"interval": 0, "year": 1, "quarter": 2, "month": 3, "day": 4, "hour": 5}.
var load_dic
- Maps each load type to the values allowed in its load column: {"F": ["F1", "F2", "F3"], "PO": ["PL", "OP"], "PO2": ["PL2", "OP2"]}.
Static methods
def order_by_granularity(tsdf_list: List[TimeSeriesDataFrame], reverse: bool = False) ‑> List[TimeSeriesDataFrame]
-
Sorts a list of TimeSeriesDataFrame objects by their granularity attribute in ascending or descending order.
Args
tsdf_list
- list of TimeSeriesDataFrame.
reverse
- ordering in ascending (False) or descending (True) order. Defaults to False.
Returns
List[TimeSeriesDataFrame]
- the ordered original list.
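A minimal sketch, assuming two hypothetical series at hourly and monthly granularity:

```python
# hourly_ts and monthly_ts are hypothetical TimeSeriesDataFrame objects with
# granularity "hour" (rank 5 in granularity_dic) and "month" (rank 3).
ordered = TimeSeriesDataFrame.order_by_granularity([hourly_ts, monthly_ts])
# Ascending rank puts the coarser series first:
# ordered[0].granularity == "month", ordered[1].granularity == "hour"
```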
Methods
def add_time_series(self, col_to_add: Union[str, List[str]], serie_type: Union[str, List[str]], udm: Union[str, List[str]], col: Union[F.Column, List[F.Column], None] = None) ‑> TimeSeriesDataFrame
-
Adds time_series to TimeSeriesDataFrame.
Args
col_to_add
:Union[str, List[str]]
- names of the columns to add.
col
:Union[F.Column, List[F.Column]], optional
- column expressions used to compute the new values. Defaults to None.
serie_type
:Union[str, List[str]]
- serie_type of new columns.
udm
:Union[str, List[str]]
- unit of measure of new columns.
Returns
TimeSeriesDataFrame
- The final TimeSeriesDataFrame with new added columns.
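A minimal sketch, reusing the hypothetical tsdf from the construction sketch above:

```python
import pyspark.sql.functions as F

# Derive a new value column from an existing one; the new name, serie_type and
# udm are appended to the corresponding attribute lists.
tsdf2 = tsdf.add_time_series(
    col_to_add="consumption_double",  # hypothetical new column name
    serie_type="volume",
    udm="Smc",
    col=F.col("consumption") * 2,  # expression that populates the new column
)
```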
def aggregate(self, granularity: "Literal['day', 'hour', 'month', 'quarter', 'year']", load: Union[_LOAD_TYPING, bool] = True, partition_cols: Union[List[str], None] = None, value_cols: Union[List[str], None] = None, aggr_functions: Union[List[str], None] = None, optional_aggr: Union[Dict[str, str], None] = None) ‑> TimeSeriesDataFrame
-
Method for aggregating based on granularity and partition columns.
NOTE: 'interval' granularity is not yet supported.
Args
granularity
:Literal["day", "hour", "month", "quarter", "year"]
- output granularity.
load
:Union[List[Literal["F", "PO", "PO2", "f", "po", "po2"]], Literal["F", "PO", "PO2", "f", "po", "po2"], bool], optional
- load types to use. If True all load columns are used; if False no load is used for aggregation. Defaults to True.
partition_cols
:Union[List[str], None], optional
- list of other columns to use as partitions. If None, key_cols are used. Defaults to None.
value_cols
:Union[List[str], None], optional
- columns to aggregate. If None, all the value columns are aggregated. Defaults to None.
aggr_functions
:Union[List[str], None], optional
- functions used to aggregate the data. A single function per value column is allowed. If None, default functions are used. Defaults to None.
optional_aggr
:Union[Dict[str, str], None], optional
- optional columns to aggregate. A dict {column<str>: aggr_function<str>} is expected. If None, no other columns are aggregated. Defaults to None.
Raises
ValueError
- if a non supported granularity is passed.
Returns
TimeSeriesDataFrame
- aggregated time series dataframe.
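A minimal sketch, reusing the hypothetical hourly tsdf from the construction sketch (load=False because that series carries no load columns):

```python
# Default behaviour: partition by key_cols, aggregate "consumption" with "sum"
# because its serie_type is "volume" (see agg_type_dic).
daily = tsdf.aggregate(granularity="day", load=False)

# Explicit aggregation function plus an extra non-value column ("temperature"
# is hypothetical and would have to exist in the underlying sdf):
daily_avg = tsdf.aggregate(
    granularity="day",
    load=False,
    aggr_functions=["mean"],
    optional_aggr={"temperature": "mean"},
)
```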
def automatic_join(self, sdf_y: Union[PanamaDataFrame, TimeSeriesDataFrame], how: str = 'inner') ‑> TimeSeriesDataFrame
-
Joins two TimeSeriesDataFrame objects based on their common keys and temporal information.
Args
sdf_y
:Union[PanamaDataFrame, TimeSeriesDataFrame]
- a TimeSeriesDataFrame or PanamaDataFrame.
how
:str, optional
- the join type (see DataFrame join method). Defaults to "inner".
Returns
TimeSeriesDataFrame
- the resulting TimeSeriesDataFrame.
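A minimal sketch, joining the hypothetical hourly tsdf with a hypothetical monthly price series:

```python
# price_ts: hypothetical TimeSeriesDataFrame with granularity "month" and
# value_cols ["price"]. The finer series gains an auxiliary month column so the
# two can be matched on common keys and dates; the result keeps tsdf's value
# columns and gains price_ts's via add_time_series.
joined = tsdf.automatic_join(price_ts, how="inner")
```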
def conv_intermediate_pcs(self, udm_from: Union[str, List[str]], udm_to: Union[str, List[str]], pcs: PanamaDataFrame, value_col_conv: Union[str, List[str]], conv_fact: DataFrame, drop_pcs: bool = True) ‑> TimeSeriesDataFrame
-
Calculates the intermediate pcs conversion from Smc to GJ or vice versa.
Args
udm_from
:Union[str, List[str]]
- the starting unit of measures.
udm_to
:Union[str, List[str]]
- the final unit of measures.
pcs
:PanamaDataFrame
- PanamaDataFrame with pcs column.
value_col_conv
:Union[str, List[str]]
- name of the values to convert.
conv_fact
:DataFrame
- the conversion factor DataFrame with udm_from, udm_to and factor columns.
drop_pcs
:bool, optional
- indicates if the pcs time series needs to be dropped. Defaults to True.
Returns
TimeSeriesDataFrame
- TimeSeriesDataFrame with pcs conversion factor for each value_col_conv.
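A minimal sketch (normally invoked through conv_udm); pcs_df and conv_fact are hypothetical inputs with the columns described above:

```python
with_fact = tsdf.conv_intermediate_pcs(
    udm_from=["Smc"],               # "Smc" on the from-side: the factor is pcs itself
    udm_to=["GJ"],
    pcs=pcs_df,                     # PanamaDataFrame exposing a "pcs" column
    value_col_conv=["consumption"],
    conv_fact=conv_fact,            # used here only for the default pcs fallback
)
# with_fact carries a "consumption_pcs_fact" column holding the pcs factor
```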
def conv_udm(self, final_udm: Union[str, List[str]], conv_fact: DataFrame, serie_name: Union[str, List[str], None] = None, pcs: Union[PanamaDataFrame, None] = None, drop_pcs: bool = True) ‑> TimeSeriesDataFrame
-
Converts unit of measure of a TimeSeriesDataFrame.
Args
final_udm
:Union[str, List[str]]
- the desired unit of measure.
conv_fact
:DataFrame
- the conversion factor DataFrame with udm_from, udm_to and factor columns.
serie_name
:Union[str, List[str]], optional
- the self.value_cols to be converted. If None the method will be applied to all value_cols of the TimeSeriesDataFrame. Defaults to None.
pcs
:PanamaDataFrame, optional
- PanamaDataFrame with pcs factors. Defaults to None.
drop_pcs
:bool, optional
- indicates if the pcs time series needs to be dropped in the conv_intermediate_pcs step. Defaults to True.
Returns
TimeSeriesDataFrame
- the converted TimeSeriesDataFrame.
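A minimal sketch, converting the hypothetical tsdf (udm "Smc") to "GJ"; conv_fact and pcs_df are hypothetical inputs as above:

```python
converted = tsdf.conv_udm(
    final_udm="GJ",
    conv_fact=conv_fact,  # DataFrame with udm_from, udm_to and factor columns
    pcs=pcs_df,           # triggers the intermediate Smc -> GJ pcs step
)
```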
def display(self)
-
Prints the granularity and load type of the time series and displays the DataFrame.
def drop(self, *args) ‑> TimeSeriesDataFrame
-
Drops given columns from the sdf of a TimeSeriesDataFrame object together with corresponding class attributes.
Args
col_to_drop
:Union[str, List[str]]
- list or string with the names of the columns to drop.
Returns
TimeSeriesDataFrame
- the dropped TimeSeriesDataFrame.
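A minimal sketch, reusing the hypothetical tsdf2 from the add_time_series sketch (value_cols ["consumption", "consumption_double"]):

```python
# Dropping a value column also removes the matching serie_type and udm entries.
reduced = tsdf2.drop("consumption_double")

# Date, load and key columns are protected:
# tsdf2.drop("pod_id")  # would raise ValueError
```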
def expand_flat(self, calendar: DataFrame, granularity: str, serie_name: Union[List, List[str], None] = None, load_calendar_col: Union[List[str], str, None] = None) ‑> TimeSeriesDataFrame
-
Expands a given profile based on a calendar column.
Args
calendar
:DataFrame
- the calendar DataFrame.
granularity
:str
- the desired output granularity.
serie_name
:Union[List, List[str]], optional
- the value_col to be expanded. If None the method will be applied on all value_cols of the TimeSeriesDataFrame. Default is None.
load_calendar_col
:Union[List[str], str], optional
- the name of the load column in the calendar DataFrame, used when self has no load. Default is None.
Returns
TimeSeriesDataFrame
- the resulting TimeSeriesDataFrame.
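A minimal sketch, assuming a hypothetical monthly_ts volume series and a calendar_df with a "date" column:

```python
# The calendar is reduced to a constant profile ("one_col" = 1) and passed to
# expand_with_profile, so a monthly "volume" value is spread evenly over the
# hours of each month.
hourly = monthly_ts.expand_flat(calendar=calendar_df, granularity="hour")
```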
def expand_with_profile(self, profile: TimeSeriesDataFrame, standardize: bool = True, serie_name: Union[List, List[str], None] = None, profile_name: Union[List, List[str], None] = None) ‑> TimeSeriesDataFrame
-
Reprofiles the value_cols of a TimeSeriesDataFrame given a specific profile.
Args
profile
:TimeSeriesDataFrame
- the TimeSeriesDataFrame with the profile to be used.
standardize
:bool
- flag to choose whether to standardize the value cols. Default is True.
serie_name
:Union[List, List[str]], optional
- the self.value_cols to be profiled. If None the method will be applied to all value_cols of the TimeSeriesDataFrame.
profile_name
:Union[List, List[str]], optional
- the profile.value_cols to use for profiling. If None the method will be applied to all value_cols of the TimeSeriesDataFrame.
Returns
TimeSeriesDataFrame
- the resulting TimeSeriesDataFrame.
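A minimal sketch, assuming hypothetical monthly_ts and hourly_profile_ts series:

```python
# With standardize=True the profile column is first normalised over each
# (key, load, date) window using the aggregation function of the target series
# ("sum" for a "volume" serie_type), so the monthly totals are preserved.
hourly = monthly_ts.expand_with_profile(
    profile=hourly_profile_ts,     # TimeSeriesDataFrame with granularity "hour"
    serie_name="consumption",      # hypothetical self value column
    profile_name="profile_value",  # hypothetical profile value column
)
```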
def get_aggr_fun(self) ‑> Union[str, List[Any]]
-
Get the aggregation function corresponding to the serie_type of a TimeSeriesDataFrame.
Returns
Union[str, List[str]]
- the list with the aggregation functions.
def get_serie_type_dictionary(self) ‑> dict
-
Get the dictionary that maps each value_col to the corresponding aggregation function.
Returns
dict
- the corresponding dictionary.
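A minimal sketch, reusing the hypothetical tsdf from the construction sketch (one value column with serie_type "volume"):

```python
tsdf.get_aggr_fun()               # ["sum"] -- agg_type_dic["volume"]
tsdf.get_serie_type_dictionary()  # {"consumption": "sum"}
```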
def select_value_cols_udm_serie_type(self, serie_name: Union[str, List[str]]) ‑> tuple
-
Returns position of selected serie_name in self.value_cols and the corresponding serie_type and udm.
Args
serie_name
:Union[str, List[str]]
- the selected value_cols.
Returns
tuple
- tuple with position, serie_type and udm.
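A minimal sketch, reusing the hypothetical tsdf2 from the add_time_series sketch:

```python
pos, serie_type, udm = tsdf2.select_value_cols_udm_serie_type("consumption_double")
# pos == [1], serie_type == ["volume"], udm == ["Smc"]
```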
def total_amount(self, unitary_amount: TimeSeriesDataFrame, col_to_add: Union[str, List[str]], volume_name: Union[str, List[str], None] = None, price_name: Union[str, List[str], None] = None) ‑> TimeSeriesDataFrame
-
Computes a total amount given a self TimeSeriesDataFrame of volumes and a unitary_amount TimeSeriesDataFrame of unitary prices.
Args
unitary_amount
:TimeSeriesDataFrame
- TimeSeriesDataFrame with unitary amount.
col_to_add
:Union[str, List[str]]
- names of new columns.
volume_name
:Union[str, List[str]], optional
- names of volume columns to use. Defaults to None.
price_name
:Union[str, List[str]], optional
- names of unitary price columns to use. Defaults to None.
Returns
TimeSeriesDataFrame
- the resulting TimeSeriesDataFrame.
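A minimal sketch, assuming the hypothetical tsdf of volumes (udm "Smc") and a price_ts of unitary prices with udm "EUR/Smc":

```python
# The price udm denominator must match the volume udm; the new series gets
# serie_type "amount" and udm "EUR" (the price udm numerator).
amounts = tsdf.total_amount(
    unitary_amount=price_ts,
    col_to_add="total_cost",  # hypothetical output column name
)
```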
Inherited members