Module panama.io.feature_store

Classes

class DatabricksFeatureStore (spark: pyspark.sql.session.SparkSession, config: Optional[Dict] = None)

A feature store implementation that uses the Databricks Feature Store API.

Initializes a new instance of the DatabricksFeatureStore class and connects to the FeatureStoreClient.
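
A minimal usage sketch (a standard Spark session is assumed; the optional config dict and its keys are project-specific):

from pyspark.sql import SparkSession
from panama.io.feature_store import DatabricksFeatureStore

spark = SparkSession.builder.getOrCreate()
fs = DatabricksFeatureStore(spark=spark)  # optionally pass a config dict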

Expand source code
class DatabricksFeatureStore(PanamaFeatureStore, FeatureStoreClient, IOInterface):
    """
    A feature store implementation that uses the Databricks Feature Store API.
    """

    def __init__(self, spark: SparkSession, config: Optional[Dict] = None):
        """
        Initializes a new instance of the DatabricksFeatureStore class and connects to the
        FeatureStoreClient.
        """
        self.spark = spark
        self.client = self.connect()
        PanamaFeatureStore.__init__(self, config)

    def connect(self) -> FeatureStoreClient:
        return FeatureStoreClient()

    def create_feature_table(
        self,
        table_name: str,
        primary_keys: List[str],
        description: str,
        df: Optional[DataFrame] = None,
        schema: Optional[StructType] = None,
        partition_columns: Optional[List[str]] = None,
    ) -> Union[str, None]:
        """
        Create and return a feature table in feature store with the given name and primary keys.

        The returned feature table has the given name and primary keys. It uses the provided schema, or the schema
        inferred from df if no schema is given. If df is provided, its data is saved in the underlying Delta table.

        Args:
            table_name (str): Name of the feature table to create. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            primary_keys (List[str]): A list of primary key columns.
            description (str): A description of the feature table.
            df (Optional, pyspark.sql.DataFrame): Data to populate the table with; also used to infer the schema if schema is not given.
            schema (Optional, StructType): Feature table schema.
            partition_columns (Optional, List[str]): A list of partition columns. Defaults to None.

        Returns:
            Union[str, None]: A confirmation message if the table was created, or None if it already exists.
        """

        # First set catalog
        table_name = self._assign_env_to_string(s=table_name, purpose="w")

        schema_name_isolated = re.search(pattern=r"\.(\w*?)\.", string=table_name).group(1)  # type: ignore
        table_name_isolated = re.search(pattern=r"\.([^\.]+)$", string=table_name).group(1)  # type: ignore

        if (
            self.spark.sql(f"SHOW TABLES IN {schema_name_isolated}")
            .where(F.col("tableName") == table_name_isolated)
            .count()
            > 0
        ):
            trace = f"{table_name} already exists, function will interrupt and do nothing, use write_feature_table to add data to an existing table."
            print(trace)
            logger = panama.logging.get_logger()
            logger.log_step(step_id=__name__, level=panama.logging.WARN, trace=trace)
            return
        else:
            # Create feature table
            self.client.create_table(
                df=df,
                name=table_name,
                primary_keys=primary_keys,
                schema=schema,
                partition_columns=partition_columns,
                description=description,
            )
            return f"Created Feature Store table at {table_name}"

    def write_feature_table(self, df: DataFrame, table_name: str, mode: str = "overwrite"):
        """
        Writes data to a feature table in the Databricks Feature Store.

        Args:
            df (pyspark.sql.DataFrame): The data to be written to the feature table.
            table_name (str): Name of the feature table to write to. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            mode (str): The mode to use when writing data to the feature table (default is 'overwrite', other option is 'merge').
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=table_name, purpose="w")

        # Write data to feature table
        self.client.write_table(name=table_name, df=df, mode=mode)

    def read_feature_table(self, table_name: str) -> DataFrame:
        """
        Reads data from a feature table in the Databricks Feature Store.

        Args:
            table_name (str): Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.

        Returns:
            pyspark.sql.DataFrame: The data from the feature table.
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=table_name, purpose="r")

        return self.client.read_table(name=table_name)

    def _get_table(self, name: str, purpose: str) -> FeatureTable:
        """
        Get a feature table’s metadata.

        Args:
            name (str): A feature table name of the form <catalog>.<schema>.<table_name>, for example dev_analytics.feature_store.user_features.
            purpose (str): String indicating whether the table name will be used to read or to write. Allowed values are 'r' and 'w' only.
        Returns:
            (databricks.feature_store.entities.feature_table.FeatureTable): Value class describing one feature table
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=name, purpose=purpose)

        return FeatureStoreClient().get_table(name=table_name)

    def _unpack_feature_lookup_dict(self, lookup_dict: Dict) -> Tuple[List[FeatureLookup], List[PanamaFeatureLookup]]:
        """Unpack input dict and create two separated feature list, a FeatureLookup list and a PanamaFeatureLookup list.

        Args:
            lookup_dict (Dict): Input dict describing lookups over feature tables (or standard tables).

        Returns:
            Tuple[List[FeatureLookup], List[PanamaFeatureLookup]]: a list of FeatureLookup objects and a list of PanamaFeatureLookup
                to be used as input for feature_lookup method.
        """

        feature_lookup = []
        table_lookup = []

        for lookup in lookup_dict:
            lookup_config = lookup_dict.get(lookup)

            # Check if the table's schema name ends with `_fs`.
            # In that case it is treated as a feature store feature lookup, otherwise as a panama feature lookup.
            is_feature_table = lookup.split(".")[1].endswith("_fs")

            if is_feature_table:  # FeatureLookup case
                # If empty go find feature table primary keys and use those as lookup keys
                lookup_key = lookup_config.get("lookup_key")  # type: ignore
                if lookup_key is None:
                    lookup_key = self._get_table(name=lookup, purpose="r").primary_keys

                feature_names = lookup_config.get("feature_names")  # type: ignore

                feature_lookup.append(
                    FeatureLookup(
                        table_name=self._assign_env_to_string(lookup, "r"),
                        lookup_key=lookup_key,
                        feature_names=feature_names,
                    )
                )

            else:  # panama lookup case
                lookup_key = lookup_config.get("lookup_key")  # type: ignore
                feature_names = lookup_config.get("feature_names")  # type: ignore

                table_lookup.append(
                    PanamaFeatureLookup(
                        table_name=lookup,  # no env prefix needed here; the IO layer adds it when reading
                        lookup_key=lookup_key,
                        feature_names=feature_names,
                    )
                )

        return feature_lookup, table_lookup

    def _add_feature_to_sdf(
        self,
        df: DataFrame,
        feature_lookups: List[PanamaFeatureLookup],
        exclude_columns: Optional[List[str]] = None,
    ) -> DataFrame:
        """Enrich given pyspark.DataFrame with feature located in tables. The feature lookup are defined inside the PanamaFeatureLookup objects.

        Args:
            df (pyspark.DataFrame): input dataframe.
            feature_lookups (List(PanamaFeatureLookup)): List of features to join into the DataFrame.
            exclude_columns (List(str), optional): Names of the columns to drop from the enriched dataframe. Defaults to None.

        Returns:
            DataFrame: input dataframe plus desired features.
        """

        io = IOAdls(spark=self.spark)

        # Iterate over feature_lookups
        for feature in feature_lookups:
            # Read table and select desired features (plus keys by default)
            feature_table = io.read_table(
                table_name=feature.table_name,
            ).select(
                feature.lookup_key + feature.feature_names  # type: ignore
            )

            # Perform join over lookup key
            df = df.join(feature_table, how="left", on=feature.lookup_key)

        # If required drop excluded columns
        if exclude_columns is not None:
            df = df.drop(*exclude_columns)

        return df

    def feature_lookup(
        self,
        df: DataFrame,
        lookup_dict: Dict,
        label: str,
        exclude_columns: List[str] = [],
    ) -> TrainingSet:
        """Add features from feauture store to given DataFrame.

        Structure of lookup_dict must be:
        {
            "catalog.schema.table_name" (str - feature table):
                {
                    "lookup_key": (list - ordered list of columns in df that match feature table primary keys),
                    "feature_names": (list - features in feature table that we want to add to df)
                }
        }

        Args:
            df (DataFrame): input dataframe to enrich.
            lookup_dict (Dict): dictionary describing every feature lookup (structure described above).
            label (str): Names of column(s) in the DataFrame that contain training set labels. To create a training set without a label field, e.g. for an unsupervised training set, specify label=None.
            exclude_columns (List(str)): Names of the columns to drop from the TrainingSet DataFrame.

        Returns:
            TrainingSet: Object of type databricks.feature_store.training_set.TrainingSet.
        """
        # Separate feature store lookup from standard table lookup
        feature_lookup, table_lookup = self._unpack_feature_lookup_dict(lookup_dict)

        # First perform general table's feature lookup that enriches df
        if len(table_lookup) > 0:
            df = self._add_feature_to_sdf(df=df, feature_lookups=table_lookup, exclude_columns=exclude_columns)

        # Then perform feature table's FeatureLookup creating final training set
        training_set = self.client.create_training_set(
            df=df, feature_lookups=feature_lookup, label=label, exclude_columns=exclude_columns
        )

        return training_set

Ancestors

  • PanamaFeatureStore
  • databricks.feature_store.client.FeatureStoreClient
  • IOInterface
  • abc.ABC

Methods

def connect(self) ‑> databricks.feature_store.client.FeatureStoreClient
def create_feature_table(self, table_name: str, primary_keys: List[str], description: str, df: Optional[pyspark.sql.dataframe.DataFrame] = None, schema: Optional[pyspark.sql.types.StructType] = None, partition_columns: Optional[List[str]] = None) ‑> Optional[str]

Create and return a feature table in feature store with the given name and primary keys.

The returned feature table has the given name and primary keys. It uses the provided schema, or the schema inferred from df if no schema is given. If df is provided, its data is saved in the underlying Delta table.

Args

table_name : str
Name of the feature table to create. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
primary_keys : List[str]
A list of primary key columns.
description : str
A description of the feature table.
df : Optional, pyspark.sql.DataFrame
Data to populate the table with; also used to infer the schema if schema is not given.
schema : Optional, StructType
Feature table schema.
partition_columns : Optional, List[str]
A list of partition columns. Defaults to None.

Returns

Union[str, None]
A confirmation message if the table was created, or None if it already exists.
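
Continuing the instantiation sketch above; all catalog, schema, table, and column names below are placeholders, not part of this module:

users_df = spark.table("analytics.silver.users").select("user_id", "age", "country")

msg = fs.create_feature_table(
    table_name="analytics.feature_store_fs.user_features",
    primary_keys=["user_id"],
    description="Basic user demographic features",
    df=users_df,                     # schema is inferred from df
    partition_columns=["country"],
)
print(msg)  # None if the table already exists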
def feature_lookup(self, df: pyspark.sql.dataframe.DataFrame, lookup_dict: Dict, label: str, exclude_columns: List[str] = []) ‑> databricks.feature_store.training_set.TrainingSet

Add features from feauture store to given DataFrame.

Structure of lookup_dict must be:

{
    "catalog.schema.table_name" (str - feature table): {
        "lookup_key": (list - ordered list of columns in df that match feature table primary keys),
        "feature_names": (list - features in feature table that we want to add to df)
    }
}

Args

df : DataFrame
input dataframe to enrich.
lookup_dict : Dict
dictionary describing every feature lookup (structure described above).
label : str
Names of column(s) in the DataFrame that contain training set labels. To create a training set without a label field, e.g. for an unsupervised training set, specify label=None.
exclude_columns : List[str]
Names of the columns to drop from the TrainingSet DataFrame.

Returns

TrainingSet
Object of type databricks.feature_store.training_set.TrainingSet.
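
An illustrative lookup_dict and call; the table and column names and the labels_df DataFrame are placeholders. Schemas ending in "_fs" are resolved through the Databricks Feature Store, any other schema goes through the plain-table (PanamaFeatureLookup) path:

lookup_dict = {
    "analytics.feature_store_fs.user_features": {
        "lookup_key": ["user_id"],            # matches the feature table's primary keys
        "feature_names": ["age", "country"],
    },
    "analytics.silver.user_activity": {
        "lookup_key": ["user_id"],
        "feature_names": ["sessions_last_30d"],
    },
}

training_set = fs.feature_lookup(
    df=labels_df,                 # assumed DataFrame with user_id and the label column
    lookup_dict=lookup_dict,
    label="churned",
)
training_df = training_set.load_df()  # materialize the TrainingSet as a DataFrame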
def read_feature_table(self, table_name: str) ‑> pyspark.sql.dataframe.DataFrame

Reads data from a feature table in the Databricks Feature Store.

Args

table_name : str
Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.

Returns

pyspark.sql.DataFrame
The data from the feature table.
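
For example (the table name is a placeholder; the env prefix is resolved internally):

features_df = fs.read_feature_table("analytics.feature_store_fs.user_features")
features_df.show(5)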
def write_feature_table(self, df: pyspark.sql.dataframe.DataFrame, table_name: str, mode: str = 'overwrite')

Writes data to a feature table in the Databricks Feature Store.

Args

df : pyspark.sql.DataFrame
The data to be written to the feature table.
table_name : str
Name of the feature table to write to. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
mode : str
The mode to use when writing data to the feature table (default is 'overwrite', other option is 'merge').
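
For example, merging updated rows into an existing feature table by primary key (names are placeholders):

fs.write_feature_table(
    df=updated_users_df,
    table_name="analytics.feature_store_fs.user_features",
    mode="merge",
)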
class PanamaFeatureLookup (table_name: str, lookup_key: Union[str, List[str]], feature_names: Union[str, List[str], None] = None)

Value class that emulates FeatureLookup but is used to perform feature lookups over internal tables.

Initialize object

Args

table_name : str
Table name
lookup_key : Union[str, List[str]]
Key to use when joining this table with the DataFrame (pyspark.sql.DataFrame)
feature_names : Union[str, List[str], None], optional
A single feature name, a list of feature names, or None to look up all features (excluding primary keys) in the feature table at the time that the training set is created. Defaults to None.

Returns

PanamaFeatureLookup
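
An illustrative construction (table and column names are placeholders):

from panama.io.feature_store import PanamaFeatureLookup

activity_lookup = PanamaFeatureLookup(
    table_name="analytics.silver.user_activity",
    lookup_key=["user_id"],
    feature_names=["sessions_last_30d", "avg_session_minutes"],
)
activity_lookup.feature_names  # ['sessions_last_30d', 'avg_session_minutes']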

Expand source code
class PanamaFeatureLookup(object):
    """Value class that emulate FeatureLookup but is used to perfrorm feature lookup over internal tables."""

    def __init__(
        self,
        table_name: str,
        lookup_key: Union[str, List[str]],
        feature_names: Union[str, List[str], None] = None,
    ):
        """Initialize object

        Args:
            table_name (str): Table name
            lookup_key (Union[str, List[str]]): Key to use when joining this table with the :class:`DataFrame <pyspark.sql.DataFrame>`
            feature_names (Union[str, List[str], None], optional): A single feature name, a list of feature names, or None to lookup all features
                (excluding primary keys) in the feature table at the time that the training set is created. Defaults to None.

        Returns:
            PanamaFeatureLookup
        """

        self._table_name = table_name
        self._feature_names = fsutils.as_list(feature_names, default=[])
        self._lookup_key = lookup_key

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    @property
    def table_name(self):
        """The table name to use in this PanamaFeatureLookup."""
        return self._table_name

    @property
    def lookup_key(self):
        """The lookup key(s) to use in this PanamaFeatureLookup."""
        return self._lookup_key

    @property
    def feature_names(self):
        """The feature name(s) to use in this PanamaFeatureLookup."""
        return self._feature_names

Instance variables

prop feature_names

The feature name(s) to use in this PanamaFeatureLookup.

Expand source code
@property
def feature_names(self):
    """The feature name(s) to use in this PanamaFeatureLookup."""
    return self._feature_names
prop lookup_key

The lookup key(s) to use in this PanamaFeatureLookup.

Expand source code
@property
def lookup_key(self):
    """The lookup key(s) to use in this PanamaFeatureLookup."""
    return self._lookup_key
prop table_name

The table name to use in this PanamaFeatureLookup.

Expand source code
@property
def table_name(self):
    """The table name to use in this PanamaFeatureLookup."""
    return self._table_name
class PanamaFeatureStore (config: Optional[Dict] = None)

Abstract base class that defines the interface for Panama feature store implementations.

Expand source code
class PanamaFeatureStore(ABC):
    def __init__(self, config: Optional[Dict] = None):
        self.config = config

    @abstractmethod
    def connect(self):
        pass

    @abstractmethod
    def create_feature_table(
        self,
        df: DataFrame,
        table_name: str,
        primary_keys: List[str],
        partition_columns: List[str],
        description: str,
    ):
        pass

    @abstractmethod
    def write_feature_table(self, df: DataFrame, table_name: str, mode: str):
        pass

    @abstractmethod
    def read_feature_table(self, table_name: str, features: Optional[List[str]]) -> DataFrame:
        pass

Ancestors

  • abc.ABC

Subclasses

  • DatabricksFeatureStore

Methods

def connect(self)
def create_feature_table(self, df: pyspark.sql.dataframe.DataFrame, table_name: str, primary_keys: List[str], partition_columns: List[str], description: str)
def read_feature_table(self, table_name: str, features: Optional[List[str]]) ‑> pyspark.sql.dataframe.DataFrame
def write_feature_table(self, df: pyspark.sql.dataframe.DataFrame, table_name: str, mode: str)
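
A minimal sketch of a custom backend implementing this ABC; the in-memory storage and all names below are illustrative only, not part of the library:

from typing import Dict, Optional

from pyspark.sql import DataFrame

from panama.io.feature_store import PanamaFeatureStore


class InMemoryFeatureStore(PanamaFeatureStore):
    """Toy backend that keeps feature tables in a dict of DataFrames."""

    def __init__(self, config: Optional[Dict] = None):
        super().__init__(config)
        self._tables: Dict[str, DataFrame] = {}

    def connect(self):
        # Nothing external to connect to in this toy backend.
        return None

    def create_feature_table(self, df, table_name, primary_keys, partition_columns, description):
        self._tables[table_name] = df

    def write_feature_table(self, df, table_name, mode):
        # Both "overwrite" and "merge" are reduced to a plain overwrite here.
        self._tables[table_name] = df

    def read_feature_table(self, table_name, features=None):
        df = self._tables[table_name]
        return df.select(*features) if features else df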