Module panama.io.feature_store
Classes
class DatabricksFeatureStore (spark: pyspark.sql.session.SparkSession, config: Optional[Dict] = None)
-
A feature store implementation that uses the Databricks Feature Store API.
Initializes a new instance of the DatabricksFeatureStore class and connects to the FeatureStoreClient.
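A minimal instantiation sketch; the SparkSession variable spark is an assumption, and the optional config dict's contents depend on your project setup:

from panama.io.feature_store import DatabricksFeatureStore

# Assumes an active SparkSession named `spark`; config is optional (keys hypothetical).
fs = DatabricksFeatureStore(spark=spark, config=None)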
Expand source code
class DatabricksFeatureStore(PanamaFeatureStore, FeatureStoreClient, IOInterface):
    """
    A feature store implementation that uses the Databricks Feature Store API.
    """

    def __init__(self, spark: SparkSession, config: Optional[Dict] = None):
        """
        Initializes a new instance of the DatabricksFeatureStore class and connects to the FeatureStoreClient.
        """
        self.spark = spark
        self.client = self.connect()
        PanamaFeatureStore.__init__(self, config)

    def connect(self) -> FeatureStoreClient:
        return FeatureStoreClient()

    def create_feature_table(
        self,
        table_name: str,
        primary_keys: List[str],
        description: str,
        df: Optional[DataFrame] = None,
        schema: Optional[StructType] = None,
        partition_columns: Optional[List[str]] = None,
    ) -> Union[str, None]:
        """
        Create and return a feature table in the feature store with the given name and primary keys.

        The returned feature table has the given name and primary keys. Uses the provided schema or the
        inferred schema of the provided df. If df is provided, this data will be saved in a Delta table.

        Args:
            table_name (str): Name of the table to create. Format must be "<catalog>.<schema>.<table>".
                Catalog must be indicated without env prefix.
            primary_keys (List[str]): A list of primary key columns.
            description (str): A description of the feature table.
            df (Optional, pyspark.sql.DataFrame): DataFrame used to infer the schema if schema is not given.
            schema (Optional, StructType): Feature table schema.
            partition_columns (Optional, List[str]): A list of partition columns. Defaults to None.

        Returns:
            Union[str, None]: A confirmation message if the table was created, None if it already exists.
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=table_name, purpose="w")

        schema_name_isolated = re.search(pattern=r"\.(\w*?)\.", string=table_name).group(1)  # type: ignore
        table_name_isolated = re.search(pattern=r"\.([^\.]+)$", string=table_name).group(1)  # type: ignore

        if (
            self.spark.sql(f"SHOW TABLES IN {schema_name_isolated}")
            .where(F.col("tableName") == table_name_isolated)
            .count()
            > 0
        ):
            trace = f"{table_name} already exists, function will interrupt and do nothing, use write_feature_table to add data to an existing table."
            print(trace)
            logger = panama.logging.get_logger()
            logger.warning(__name__, exc_info=Warning(trace))
            return
        else:
            # Create feature table
            self.client.create_table(
                df=df,
                name=table_name,
                primary_keys=primary_keys,
                schema=schema,
                partition_columns=partition_columns,
                description=description,
            )
            return f"Created Feature Store table at {table_name}"

    def write_feature_table(self, df: DataFrame, table_name: str, mode: str = "overwrite"):
        """
        Writes data to a feature table in the Databricks Feature Store.

        Args:
            df (pyspark.sql.DataFrame): The data to be written to the feature table.
            table_name (str): Name of the table to write to. Format must be "<catalog>.<schema>.<table>".
                Catalog must be indicated without env prefix.
            mode (str): The mode to use when writing data to the feature table (default is 'overwrite',
                other option is 'merge').
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=table_name, purpose="w")

        # Write data to feature table
        self.client.write_table(name=table_name, df=df, mode=mode)

    def read_feature_table(self, table_name: str) -> DataFrame:
        """
        Reads data from a feature table in the Databricks Feature Store.

        Args:
            table_name (str): Name of the table to read from. Format must be "<catalog>.<schema>.<table>".
                Catalog must be indicated without env prefix.

        Returns:
            pyspark.sql.DataFrame: The data from the feature table.
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=table_name, purpose="r")

        return self.client.read_table(name=table_name)

    def _get_table(self, name: str, purpose: str) -> FeatureTable:
        """
        Get a feature table's metadata.

        Args:
            name (str): A feature table name of the form <catalog>.<schema>.<table_name>,
                for example dev_analytics.feature_store.user_features.
            purpose (str): String indicating whether the name will be used to read or to write the table.
                Allowed values are 'r' and 'w' only.

        Returns:
            (databricks.feature_store.entities.feature_table.FeatureTable): Value class describing one feature table.
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=name, purpose=purpose)

        return FeatureStoreClient().get_table(name=table_name)

    def _unpack_feature_lookup_dict(self, lookup_dict: Dict) -> Tuple[List[FeatureLookup], List[PanamaFeatureLookup]]:
        """Unpack the input dict and create two separate feature lists: a FeatureLookup list and a PanamaFeatureLookup list.

        Args:
            lookup_dict (Dict): Input dict with FeatureTables (or standard tables).

        Returns:
            Tuple[List[FeatureLookup], List[PanamaFeatureLookup]]: A list of FeatureLookup objects and a list of
                PanamaFeatureLookup objects to be used as input for the feature_lookup method.
        """
        feature_lookup = []
        table_lookup = []

        for lookup in lookup_dict:
            lookup_config = lookup_dict.get(lookup)

            # Check if the table's schema ends with `_fs`.
            # In that case it is treated as a feature store feature lookup, otherwise as a panama feature lookup.
            is_feature_table = lookup.split(".")[1].endswith("_fs")

            if is_feature_table:
                # FeatureLookup case
                # If empty, fetch the feature table primary keys and use those as lookup keys
                lookup_key = lookup_config.get("lookup_key")  # type: ignore
                if lookup_key is None:
                    lookup_key = self._get_table(name=lookup, purpose="r").primary_keys

                feature_names = lookup_config.get("feature_names")  # type: ignore

                feature_lookup.append(
                    FeatureLookup(
                        table_name=self._assign_env_to_string(lookup, "r"),
                        lookup_key=lookup_key,
                        feature_names=feature_names,
                    )
                )
            else:
                # Panama lookup case
                lookup_key = lookup_config.get("lookup_key")  # type: ignore
                feature_names = lookup_config.get("feature_names")  # type: ignore

                table_lookup.append(
                    PanamaFeatureLookup(
                        table_name=lookup,  # No need to add a prefix because the io layer will do it.
                        lookup_key=lookup_key,
                        feature_names=feature_names,
                    )
                )

        return feature_lookup, table_lookup

    def _add_feature_to_sdf(
        self,
        df: DataFrame,
        feature_lookups: List[PanamaFeatureLookup],
        exclude_columns: Optional[List[str]] = None,
    ) -> DataFrame:
        """Enrich the given pyspark.DataFrame with features located in tables.
        The feature lookups are defined inside the PanamaFeatureLookup objects.

        Args:
            df (pyspark.DataFrame): Input dataframe.
            feature_lookups (List(PanamaFeatureLookup)): List of features to join into the DataFrame.
            exclude_columns (List(str), optional): Names of the columns to drop from the enriched dataframe.
                Defaults to None.

        Returns:
            DataFrame: Input dataframe plus desired features.
        """
        io = IOAdls(spark=self.spark)

        # Iterate over feature_lookups
        for feature in feature_lookups:
            # Read table and select desired features (plus keys by default)
            feature_table = io.read_table(
                table_name=feature.table_name,
            ).select(
                feature.lookup_key + feature.feature_names  # type: ignore
            )

            # Perform join over lookup key
            df = df.join(feature_table, how="left", on=feature.lookup_key)

        # If required, drop excluded columns
        if exclude_columns is not None:
            df = df.drop(*exclude_columns)

        return df

    def feature_lookup(
        self,
        df: DataFrame,
        lookup_dict: Dict,
        label: str,
        exclude_columns: List[str] = [],
    ) -> TrainingSet:
        """Add features from the feature store to the given DataFrame.

        Structure of lookup_dict must be:
            {
                "catalog.schema.table_name" (str - feature table): {
                    "lookup_key": (list - ordered list of columns in df that match feature table primary keys),
                    "feature_names": (list - features in feature table that we want to add to df)
                }
            }

        Args:
            df (DataFrame): Input dataframe to enrich.
            lookup_dict (Dict): Dictionary describing every feature lookup (structure described above).
            label (str): Names of column(s) in DataFrame that contain training set labels. To create a training
                set without a label field, i.e. for an unsupervised training set, specify label = None.
            exclude_columns (List(str)): Names of the columns to drop from the TrainingSet DataFrame.

        Returns:
            TrainingSet: Object of type TrainingSet (databricks.feature_store.training_set.TrainingSet).
        """
        # Separate feature store lookups from standard table lookups
        feature_lookup, table_lookup = self._unpack_feature_lookup_dict(lookup_dict)

        # First perform the standard tables' feature lookup, which enriches df
        if len(table_lookup) > 0:
            df = self._add_feature_to_sdf(df=df, feature_lookups=table_lookup, exclude_columns=exclude_columns)

        # Then perform the feature tables' FeatureLookup, creating the final training set
        training_set = self.client.create_training_set(
            df=df, feature_lookups=feature_lookup, label=label, exclude_columns=exclude_columns
        )

        return training_set
Ancestors
- PanamaFeatureStore
- abc.ABC
- databricks.feature_store.client.FeatureStoreClient
- IOInterface
Methods
def connect(self) ‑> databricks.feature_store.client.FeatureStoreClient
def create_feature_table(self, table_name: str, primary_keys: List[str], description: str, df: Optional[pyspark.sql.dataframe.DataFrame] = None, schema: Optional[pyspark.sql.types.StructType] = None, partition_columns: Optional[List[str]] = None) ‑> Optional[str]
-
Create and return a feature table in feature store with the given name and primary keys.
The returned feature table has the given name and primary keys. Uses the provided schema or the inferred schema of the provided df. If df is provided, this data will be saved in a Delta table.
Args
table_name
:str
- Name of the table to create. Format must be "&lt;catalog&gt;.&lt;schema&gt;.&lt;table&gt;". Catalog must be indicated without env prefix.
primary_keys
:List[str]
- A list of primary key columns.
description
:str
- A description of the feature table.
df
:Optional, pyspark.sql.DataFrame
- DataFrame used to infer the schema if schema is not given.
schema
:Optional, StructType
- Feature table schema.
partition_columns
:Optional, List[str]
- A list of partition columns. Defaults to None.
Returns
Union[str, None]
- A confirmation message if the table was created, or None if a table with that name already exists.
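A hedged usage sketch for create_feature_table; the catalog, schema, table, and column names are hypothetical, and fs is assumed to be a DatabricksFeatureStore instance:

# `features_df` is an existing pyspark DataFrame holding the feature values.
result = fs.create_feature_table(
    table_name="analytics.user_fs.user_features",  # "<catalog>.<schema>.<table>", catalog without env prefix
    primary_keys=["user_id"],
    description="Per-user aggregated features",
    df=features_df,                     # schema is inferred from df; alternatively pass schema=StructType(...)
    partition_columns=["snapshot_date"],
)
print(result)  # confirmation string, or None if the table already exists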
def feature_lookup(self, df: pyspark.sql.dataframe.DataFrame, lookup_dict: Dict, label: str, exclude_columns: List[str] = []) ‑> databricks.feature_store.training_set.TrainingSet
Add features from the feature store to the given DataFrame.
Structure of lookup_dict must be:
{
    "catalog.schema.table_name" (str - feature table): {
        "lookup_key": (list - ordered list of columns in df that match feature table primary keys),
        "feature_names": (list - features in feature table that we want to add to df)
    }
}
Args
df
:DataFrame
- input dataframe to enrich.
lookup_dict
:Dict
- dictionary describing every feature lookup (structure described above).
label
:str
- Names of column(s) in DataFrame that contain training set labels. To create a training set without a label field, i.e. for an unsupervised training set, specify label = None.
exclude_columns
:List[str]
- Names of the columns to drop from the TrainingSet DataFrame.
Returns
TrainingSet
- Object of type TrainingSet (databricks.feature_store.training_set.TrainingSet).
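A hedged end-to-end sketch of feature_lookup; all table, column, and label names are hypothetical. The schema "user_fs" ends with "_fs", so that entry is resolved through the Databricks Feature Store, while the other entry is joined as a standard table via PanamaFeatureLookup:

lookup_dict = {
    "analytics.user_fs.user_features": {
        "lookup_key": ["user_id"],      # omit or set to None to fall back to the feature table's primary keys
        "feature_names": ["age", "tenure_days"],
    },
    "analytics.sales.daily_orders": {
        "lookup_key": ["user_id"],      # required for standard tables
        "feature_names": ["orders_last_30d"],
    },
}

training_set = fs.feature_lookup(
    df=base_df,            # hypothetical input DataFrame containing the lookup keys and the label column
    lookup_dict=lookup_dict,
    label="churned",
    exclude_columns=["user_id"],
)
train_df = training_set.load_df()  # materialize the TrainingSet as a DataFrame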
def read_feature_table(self, table_name: str) ‑> pyspark.sql.dataframe.DataFrame
Reads data from a feature table in the Databricks Feature Store.
Args
table_name
:str
- Name of the table to read from. Format must be "&lt;catalog&gt;.&lt;schema&gt;.&lt;table&gt;". Catalog must be indicated without env prefix.
Returns
pyspark.sql.DataFrame
- The data from the feature table.
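A short usage sketch; the table name is hypothetical and fs is assumed to be a DatabricksFeatureStore instance:

features_df = fs.read_feature_table(table_name="analytics.user_fs.user_features")
features_df.show(5)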
def write_feature_table(self, df: pyspark.sql.dataframe.DataFrame, table_name: str, mode: str = 'overwrite')
Writes data to a feature table in the Databricks Feature Store.
Args
df
:pyspark.sql.DataFrame
- The data to be written to the feature table.
table_name
:str
- Name of the table to write to. Format must be "&lt;catalog&gt;.&lt;schema&gt;.&lt;table&gt;". Catalog must be indicated without env prefix.
mode
:str
- The mode to use when writing data to the feature table (default is 'overwrite', other option is 'merge').
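A short usage sketch; names are hypothetical and updated_df is assumed to match the feature table's schema and primary keys:

fs.write_feature_table(
    df=updated_df,
    table_name="analytics.user_fs.user_features",
    mode="merge",  # upsert on the primary keys; "overwrite" replaces the table contents
)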
class PanamaFeatureLookup (table_name: str, lookup_key: Union[str, List[str]], feature_names: Union[str, List[str], None] = None)
Value class that emulates FeatureLookup but is used to perform feature lookups over internal tables.
Initialize object
Args
table_name
:str
- Table name
lookup_key
:Union[str, List[str]]
- Key to use when joining this table with the DataFrame (pyspark.sql.DataFrame).
feature_names
:Union[str, List[str], None], optional
- A single feature name, a list of feature names, or None to look up all features (excluding primary keys) in the feature table at the time that the training set is created. Defaults to None.
Returns
PanamaFeatureLookup
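A construction sketch; the standard (non feature store) table and column names are hypothetical:

lookup = PanamaFeatureLookup(
    table_name="analytics.sales.daily_orders",
    lookup_key=["user_id"],
    feature_names=["orders_last_30d"],
)
lookup.table_name     # "analytics.sales.daily_orders"
lookup.feature_names  # ["orders_last_30d"]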
Expand source code
class PanamaFeatureLookup(object):
    """Value class that emulates FeatureLookup but is used to perform feature lookups over internal tables."""

    def __init__(
        self,
        table_name: str,
        lookup_key: Union[str, List[str]],
        feature_names: Union[str, List[str], None] = None,
    ):
        """Initialize object

        Args:
            table_name (str): Table name
            lookup_key (Union[str, List[str]]): Key to use when joining this table with the
                DataFrame (pyspark.sql.DataFrame).
            feature_names (Union[str, List[str], None], optional): A single feature name, a list of feature names,
                or None to look up all features (excluding primary keys) in the feature table at the time that
                the training set is created. Defaults to None.

        Returns:
            PanamaFeatureLookup
        """
        self._table_name = table_name
        self._feature_names = fsutils.as_list(feature_names, default=[])
        self._lookup_key = lookup_key

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    @property
    def table_name(self):
        """The table name to use in this PanamaFeatureLookup."""
        return self._table_name

    @property
    def lookup_key(self):
        """The lookup key(s) to use in this PanamaFeatureLookup."""
        return self._lookup_key

    @property
    def feature_names(self):
        """The feature name(s) to use in this PanamaFeatureLookup."""
        return self._feature_names
Instance variables
prop feature_names
-
The feature name(s) to use in this PanamaFeatureLookup.
Expand source code
@property
def feature_names(self):
    """The feature name(s) to use in this PanamaFeatureLookup."""
    return self._feature_names
prop lookup_key
-
The lookup key(s) to use in this PanamaFeatureLookup.
Expand source code
@property
def lookup_key(self):
    """The lookup key(s) to use in this PanamaFeatureLookup."""
    return self._lookup_key
prop table_name
-
The table name to use in this PanamaFeatureLookup.
Expand source code
@property
def table_name(self):
    """The table name to use in this PanamaFeatureLookup."""
    return self._table_name
class PanamaFeatureStore (config: Optional[Dict] = None)
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class PanamaFeatureStore(ABC):
    def __init__(self, config: Optional[Dict] = None):
        self.config = config

    @abstractmethod
    def connect(self):
        pass

    @abstractmethod
    def create_feature_table(
        self,
        df: DataFrame,
        table_name: str,
        primary_keys: List[str],
        partition_columns: List[str],
        description: str,
    ):
        pass

    @abstractmethod
    def write_feature_table(self, df: DataFrame, table_name: str, mode: str):
        pass

    @abstractmethod
    def read_feature_table(self, table_name: str, features: Optional[List[str]]) -> DataFrame:
        pass
Ancestors
- abc.ABC
Subclasses
- DatabricksFeatureStore
Methods
def connect(self)
def create_feature_table(self, df: pyspark.sql.dataframe.DataFrame, table_name: str, primary_keys: List[str], partition_columns: List[str], description: str)
def read_feature_table(self, table_name: str, features: Optional[List[str]]) ‑> pyspark.sql.dataframe.DataFrame
def write_feature_table(self, df: pyspark.sql.dataframe.DataFrame, table_name: str, mode: str)
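A toy sketch of how a concrete subclass could satisfy this abstract interface; the in-memory backend and every name in it are hypothetical and only illustrate which methods must be implemented:

from typing import Dict, List, Optional
from pyspark.sql import DataFrame

class InMemoryFeatureStore(PanamaFeatureStore):
    """Hypothetical backend that keeps feature tables in a plain dict."""

    def __init__(self, config: Optional[Dict] = None):
        super().__init__(config)
        self._tables: Dict[str, DataFrame] = {}

    def connect(self):
        return self  # no external client to connect to in this toy example

    def create_feature_table(self, df: DataFrame, table_name: str, primary_keys: List[str],
                             partition_columns: List[str], description: str):
        self._tables[table_name] = df

    def write_feature_table(self, df: DataFrame, table_name: str, mode: str):
        self._tables[table_name] = df

    def read_feature_table(self, table_name: str, features: Optional[List[str]] = None) -> DataFrame:
        df = self._tables[table_name]
        return df.select(*features) if features else df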