Module panama.io
Sub-modules
panama.io.feature_store
panama.io.io
Classes
class DatabricksFeatureStore (spark: pyspark.sql.session.SparkSession, config: Optional[Dict] = None)
-
A feature store implementation that uses the Databricks Feature Store API.
Initializes a new instance of the DatabricksFeatureStore class and connects to the FeatureStoreClient.
Expand source code
class DatabricksFeatureStore(PanamaFeatureStore, FeatureStoreClient, IOInterface):
    """
    A feature store implementation that uses the Databricks Feature Store API.
    """

    def __init__(self, spark: SparkSession, config: Optional[Dict] = None):
        """
        Initializes a new instance of the DatabricksFeatureStore class and connects to the FeatureStoreClient.
        """
        self.spark = spark
        self.client = self.connect()
        PanamaFeatureStore.__init__(self, config)

    def connect(self) -> FeatureStoreClient:
        return FeatureStoreClient()

    def create_feature_table(
        self,
        table_name: str,
        primary_keys: List[str],
        description: str,
        df: Optional[DataFrame] = None,
        schema: Optional[StructType] = None,
        partition_columns: Optional[List[str]] = None,
    ) -> Union[str, None]:
        """
        Create and return a feature table in the feature store with the given name and primary keys.

        The returned feature table has the given name and primary keys. Uses the provided schema or the
        inferred schema of the provided df. If df is provided, this data will be saved in a Delta table.

        Args:
            table_name (str): Name of the table to create. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            primary_keys (List[str]): A list of primary key columns.
            description (str): A description of the feature table.
            df (Optional, pyspark.sql.DataFrame): Table used to infer schema if not given.
            schema (Optional, StructType): Feature table schema.
            partition_columns (Optional, List[str]): A list of partition columns. Defaults to None.

        Returns:
            Union[str, None]: The name of the created feature table.
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=table_name, purpose="w")

        schema_name_isolated = re.search(pattern=r"\.(\w*?)\.", string=table_name).group(1)  # type: ignore
        table_name_isolated = re.search(pattern=r"\.([^\.]+)$", string=table_name).group(1)  # type: ignore

        if (
            self.spark.sql(f"SHOW TABLES IN {schema_name_isolated}")
            .where(F.col("tableName") == table_name_isolated)
            .count()
            > 0
        ):
            trace = f"{table_name} already exists, function will interrupt and do nothing, use write_feature_table to add data to an existing table."
            print(trace)
            logger = panama.logging.get_logger()
            logger.warning(__name__, exc_info=Warning(trace))
            return
        else:
            # Create feature table
            self.client.create_table(
                df=df,
                name=table_name,
                primary_keys=primary_keys,
                schema=schema,
                partition_columns=partition_columns,
                description=description,
            )
            return f"Created Feature Store table at {table_name}"

    def write_feature_table(self, df: DataFrame, table_name: str, mode: str = "overwrite"):
        """
        Writes data to a feature table in the Databricks Feature Store.

        Args:
            df (pyspark.sql.DataFrame): The data to be written to the feature table.
            table_name (str): Name of the table to write to. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            mode (str): The mode to use when writing data to the feature table (default is 'overwrite', other option is 'merge').
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=table_name, purpose="w")

        # Write data to feature table
        self.client.write_table(name=table_name, df=df, mode=mode)

    def read_feature_table(self, table_name: str) -> DataFrame:
        """
        Reads data from a feature table in the Databricks Feature Store.

        Args:
            table_name (str): Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.

        Returns:
            pyspark.sql.DataFrame: The data from the feature table.
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=table_name, purpose="r")

        return self.client.read_table(name=table_name)

    def _get_table(self, name: str, purpose: str) -> FeatureTable:
        """
        Get a feature table's metadata.

        Args:
            name (str): A feature table name of the form <catalog>.<schema>.<table_name>, for example dev_analytics.feature_store.user_features.
            purpose (str): string indicating if the string will be used to read or to write the table. Allowed values are 'r' and 'w' only.

        Returns:
            (databricks.feature_store.entities.feature_table.FeatureTable): Value class describing one feature table.
        """
        # First set catalog
        table_name = self._assign_env_to_string(s=name, purpose=purpose)

        return FeatureStoreClient().get_table(name=table_name)

    def _unpack_feature_lookup_dict(self, lookup_dict: Dict) -> Tuple[List[FeatureLookup], List[PanamaFeatureLookup]]:
        """Unpack input dict and create two separate feature lists: a FeatureLookup list and a PanamaFeatureLookup list.

        Args:
            lookup_dict (Dict): input dict with FeatureTables (or standard tables).

        Returns:
            Tuple[List[FeatureLookup], List[PanamaFeatureLookup]]: a list of FeatureLookup objects and a list of PanamaFeatureLookup objects to be used as input for the feature_lookup method.
        """
        feature_lookup = []
        table_lookup = []

        for lookup in lookup_dict:
            lookup_config = lookup_dict.get(lookup)

            # Check if table's schema ends with `_fs`.
            # In that case it will be treated as a feature store feature lookup, otherwise as a panama feature lookup
            is_feature_table = lookup.split(".")[1].endswith("_fs")

            if is_feature_table == True:
                # FeatureLookup case
                # If empty go find feature table primary keys and use those as lookup keys
                lookup_key = lookup_config.get("lookup_key")  # type: ignore
                if lookup_key is None:
                    lookup_key = self._get_table(name=lookup, purpose="r").primary_keys

                feature_names = lookup_config.get("feature_names")  # type: ignore

                feature_lookup.append(
                    FeatureLookup(
                        table_name=self._assign_env_to_string(lookup, "r"),
                        lookup_key=lookup_key,
                        feature_names=feature_names,
                    )
                )
            else:
                # panama lookup case
                lookup_key = lookup_config.get("lookup_key")  # type: ignore
                feature_names = lookup_config.get("feature_names")  # type: ignore

                table_lookup.append(
                    PanamaFeatureLookup(
                        table_name=lookup,  # no need to add the env prefix because the io will do it.
                        lookup_key=lookup_key,
                        feature_names=feature_names,
                    )
                )

        return feature_lookup, table_lookup

    def _add_feature_to_sdf(
        self,
        df: DataFrame,
        feature_lookups: List[PanamaFeatureLookup],
        exclude_columns: Optional[List[str]] = None,
    ) -> DataFrame:
        """Enrich the given pyspark.DataFrame with features located in tables. The feature lookups are defined inside the PanamaFeatureLookup objects.

        Args:
            df (pyspark.DataFrame): input dataframe.
            feature_lookups (List(PanamaFeatureLookup)): List of features to join into the DataFrame.
            exclude_columns (List(str), optional): Names of the columns to drop from the enriched dataframe. Defaults to None.

        Returns:
            DataFrame: input dataframe plus desired features.
        """
        io = IOAdls(spark=self.spark)

        # Iterate over feature_lookups
        for feature in feature_lookups:
            # Read table and select desired features (plus keys by default)
            feature_table = io.read_table(
                table_name=feature.table_name,
            ).select(
                feature.lookup_key + feature.feature_names  # type: ignore
            )

            # Perform join over lookup key
            df = df.join(feature_table, how="left", on=feature.lookup_key)

        # If required drop excluded columns
        if exclude_columns is not None:
            df = df.drop(*exclude_columns)

        return df

    def feature_lookup(
        self,
        df: DataFrame,
        lookup_dict: Dict,
        label: str,
        exclude_columns: List[str] = [],
    ) -> TrainingSet:
        """Add features from feature store to given DataFrame.

        Structure of lookup_dict must be:
            {
                "catalog.schema.table_name" (str - feature table): {
                    "lookup_key": (list - ordered list of columns in df that match feature table primary keys),
                    "feature_names": (list - features in feature table that we want to add to df)
                }
            }

        Args:
            df (DataFrame): input dataframe to enrich.
            lookup_dict (Dict): dictionary describing every feature lookup (structure described above).
            label (str): Names of column(s) in DataFrame that contain training set labels. To create a training set without a label field, i.e. for an unsupervised training set, specify label = None.
            exclude_columns (List(str)): Names of the columns to drop from the TrainingSet DataFrame.

        Returns:
            TrainingSet: Object of type databricks.feature_store.training_set.TrainingSet.
        """
        # Separate feature store lookup from standard table lookup
        feature_lookup, table_lookup = self._unpack_feature_lookup_dict(lookup_dict)

        # First perform general table's feature lookup that enriches df
        if len(table_lookup) > 0:
            df = self._add_feature_to_sdf(df=df, feature_lookups=table_lookup, exclude_columns=exclude_columns)

        # Then perform feature table's FeatureLookup creating final training set
        training_set = self.client.create_training_set(
            df=df, feature_lookups=feature_lookup, label=label, exclude_columns=exclude_columns
        )

        return training_set
Ancestors
- PanamaFeatureStore
- abc.ABC
- databricks.feature_store.client.FeatureStoreClient
- IOInterface
Methods
def connect(self) ‑> databricks.feature_store.client.FeatureStoreClient
def create_feature_table(self, table_name: str, primary_keys: List[str], description: str, df: Optional[pyspark.sql.dataframe.DataFrame] = None, schema: Optional[pyspark.sql.types.StructType] = None, partition_columns: Optional[List[str]] = None) ‑> Optional[str]
-
Create and return a feature table in feature store with the given name and primary keys.
The returned feature table has the given name and primary keys. Uses the provided schema or the inferred schema of the provided df. If df is provided, this data will be saved in a Delta table.
Args
table_name
:str
- Name of the table to create. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
primary_keys
:List[str]
- A list of primary key columns.
description
:str
- A description of the feature table.
df
:Optional, pyspark.sql.DataFrame
- Table used to infer schema if not given.
schema
:Optional, StructType
- Feature table schema.
partition_columns
:Optional, List[str]
- A list of partition columns. Defaults to None.
Returns
Union[str, None]
- The name of the created feature table.
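A minimal usage sketch follows; it is illustrative only. The catalog, schema, table, and column names are hypothetical, the import path is assumed from the panama.io.feature_store sub-module, and spark is an existing SparkSession (predefined on Databricks).

    from panama.io.feature_store import DatabricksFeatureStore  # assumed import path

    fs = DatabricksFeatureStore(spark=spark)

    result = fs.create_feature_table(
        table_name="analytics.user_fs.user_features",  # hypothetical "<catalog>.<schema>.<table>", no env prefix
        primary_keys=["user_id"],
        description="Per-user aggregated features",
        df=features_sdf,                               # optional: schema is inferred from this DataFrame
        partition_columns=["snapshot_date"],
    )
    print(result)  # "Created Feature Store table at ..." or None if the table already exists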
def feature_lookup(self, df: pyspark.sql.dataframe.DataFrame, lookup_dict: Dict, label: str, exclude_columns: List[str] = []) ‑> databricks.feature_store.training_set.TrainingSet
Add features from feature store to given DataFrame.
Structure of lookup_dict must be:
{
    "catalog.schema.table_name" (str - feature table): {
        "lookup_key": (list - ordered list of columns in df that match feature table primary keys),
        "feature_names": (list - features in feature table that we want to add to df)
    }
}
Args
df
:DataFrame
- input dataframe to enrich.
lookup_dict
:Dict
- dictionary describing every feature lookup (structure described above).
label
:str
- Names of column(s) in DataFrame that contain training set labels. To create a training set without a label field, i.e. for an unsupervised training set, specify label = None.
exclude_columns
:List[str]
- Names of the columns to drop from the TrainingSet DataFrame.
Returns
TrainingSet
- Object of type databricks.feature_store.training_set.TrainingSet.
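An illustrative sketch of a lookup_dict and the resulting training set. The table names, keys, and label column are hypothetical, the import path is assumed from the panama.io.feature_store sub-module, and base_sdf is an existing spark DataFrame.

    from panama.io.feature_store import DatabricksFeatureStore  # assumed import path

    fs = DatabricksFeatureStore(spark=spark)

    lookup_dict = {
        # schema ends with "_fs": resolved through the Databricks Feature Store
        "analytics.user_fs.user_features": {
            "lookup_key": ["user_id"],            # omit (None) to fall back to the feature table's primary keys
            "feature_names": ["age", "segment"],
        },
        # plain schema: joined as a standard table via IOAdls.read_table
        "analytics.sales.daily_orders": {
            "lookup_key": ["user_id"],
            "feature_names": ["orders_last_30d"],
        },
    }

    training_set = fs.feature_lookup(df=base_sdf, lookup_dict=lookup_dict, label="churned")
    train_df = training_set.load_df()  # TrainingSet exposes load_df() in the Databricks Feature Store API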
def read_feature_table(self, table_name: str) ‑> pyspark.sql.dataframe.DataFrame
Reads data from a feature table in the Databricks Feature Store.
Args
table_name
:str
- Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
Returns
pyspark.sql.DataFrame
- The data from the feature table.
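A minimal read sketch with a hypothetical table name (import path assumed from the panama.io.feature_store sub-module):

    from panama.io.feature_store import DatabricksFeatureStore  # assumed import path

    fs = DatabricksFeatureStore(spark=spark)
    sdf = fs.read_feature_table(table_name="analytics.user_fs.user_features")  # env prefix added internally
    sdf.show(5)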
def write_feature_table(self, df: pyspark.sql.dataframe.DataFrame, table_name: str, mode: str = 'overwrite')
Writes data to a feature table in the Databricks Feature Store.
Args
df
:pyspark.sql.DataFrame
- The data to be written to the feature table.
table_name
:str
- Name of the table to write to. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
mode
:str
- The mode to use when writing data to the feature table (default is 'overwrite', other option is 'merge').
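A minimal write sketch with hypothetical names (import path assumed from the panama.io.feature_store sub-module):

    from panama.io.feature_store import DatabricksFeatureStore  # assumed import path

    fs = DatabricksFeatureStore(spark=spark)
    # mode="merge" upserts rows matched on the table's primary keys; new_features_sdf is an existing DataFrame.
    fs.write_feature_table(df=new_features_sdf, table_name="analytics.user_fs.user_features", mode="merge")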
class IOAdls (spark)
Extends the IO interface to ADLS (Azure Data Lake Storage) data structures.
Expand source code
class IOAdls(IOInterface):
    """
    Extends the IO interface to ADLS (Azure Data Lake Storage) data structures.
    """

    def __init__(self, spark):
        super().__init__(spark)

    @staticmethod
    def _generate_absolute_path(path: str, storage_account: str, container: str, source_type: str) -> str:
        """Generate the url that points to the required path inside the data lake.

        Args:
            path (str): File or table location.
            storage_account (str): Azure storage account.
            container (str): Name of the container where to find files or tables.
            source_type (str): value extracted from connections.json indicating type of source (adls, blob, etc..).

        Returns:
            str: the final url to read data from.
        """
        if source_type == "adls":
            return f"abfss://{container}@{storage_account}.dfs.core.windows.net/{path}"
        elif source_type == "blob":
            return f"wasb://{container}@{storage_account}.blob.core.windows.net/{path}"
        else:
            raise ValueError("Accepted source_type in connections.json are 'adls', 'blob' in IOAdls")

    @staticmethod
    def _get_storage_account_from_name(adls_name: str):
        """Get storage connection config from config json.

        Args:
            adls_name (str): Name of datalake used to get connection config.

        Returns:
            (str): Azure storage account.
        """
        adls_config = IOInterface._get_connection_config_json().get(adls_name)
        adls_storage_account = adls_config.get("credentials").get("storage_account")  # type: ignore
        return adls_storage_account

    @staticmethod
    def _get_source_type_from_name(adls_name: str):
        """Get storage connection config from config json.

        Args:
            adls_name (str): Name of datalake used to get connection config.

        Returns:
            (str): source_type value.
        """
        adls_config = IOInterface._get_connection_config_json().get(adls_name)
        source_type = adls_config.get("type")  # type: ignore
        return source_type

    @staticmethod
    def generate_connection_string(adls_name: str, container: str, path: str = "") -> str:
        """Generate the url that points to the required path inside the data lake.

        Args:
            adls_name (str): Name of datalake used to get connection config.
            container (str): Name of the container where to find files or tables.
            path (str): File or table location.

        Returns:
            str: the final url to read data from.
        """
        # Build absolute adls path from adls name
        storage_account = IOAdls._get_storage_account_from_name(adls_name)
        source_type = IOAdls._get_source_type_from_name(adls_name)
        absolute_path = IOAdls._generate_absolute_path(
            path=path, storage_account=storage_account, container=container, source_type=source_type
        )
        return absolute_path

    ############################### READING

    @panama.logging.log_execution(blocking=True)
    def read_file(
        self,
        adls_name: str,
        container: str,
        file_path: str,
        file_format: str = "delta",
        extra_options: Optional[dict] = None,
        create_view: Optional[bool] = False,
    ) -> DataFrame:
        """Reads a table directly from an ADLS instance's path location.

        Args:
            adls_name (str): Name of datalake (or blob) used to get connection config.
            container (str): Name of the container where to find files.
            file_path (str): Path where to find file (or files). Use a folder to read multiple files, or a specific file path with extension.
            file_format (str, optional): Format of the file to read. Defaults to "delta".
            extra_options (Optional[dict], optional): Additional options to pass to the spark read command as a dictionary. Defaults to None.
            create_view (bool, optional): if True, also create a temp view containing the entire table content. Defaults to False.

        Returns:
            DataFrame: containing the data from the specified table.
        """
        storage_account = self._get_storage_account_from_name(adls_name)
        source_type = self._get_source_type_from_name(adls_name)

        # Compose datalake file path from given table info
        abs_file_path = self._generate_absolute_path(
            path=file_path, storage_account=storage_account, container=container, source_type=source_type
        )

        # Init reader
        sdf_reader = self.spark.read.format(file_format)
        if extra_options:
            sdf_reader = sdf_reader.options(**extra_options)

        sdf = sdf_reader.load(abs_file_path)

        if create_view:
            self._create_sdf_temp_view(sdf=sdf, table_name_path=file_path)

        # Read table
        return sdf

    @panama.logging.log_execution(blocking=True)
    def read_table(
        self,
        table_name: str,
        max_datetime: Union[str, None] = None,
        create_view: Optional[bool] = False,
        force_read_to_env: Optional[bool] = False,
    ) -> DataFrame:
        """Reads a table from unity catalog.

        ATTENTION: a temporary fix lets the user read the dev catalog from the dev environment (instead of the test catalog from the dev environment).

        Args:
            table_name (str): Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            max_datetime (str, optional): filter out datetimes newer (inclusive) than the given datetime. Format must be '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'. Defaults to None.
            create_view (bool, optional): if True, also create a temp view containing the entire table content. Defaults to False.
            force_read_to_env (Optional, bool): if True, force read on the same env of execution. This only works on '_analytics' catalogs. Defaults to False.

        Returns:
            DataFrame: containing the data from the specified table.
        """
        # If required, lookup for latest version of delta table wrt the max_datetime input
        if max_datetime:
            time_travel_datetime = self.get_latest_delta_datetime(
                table_name=table_name,
                max_datetime=max_datetime,
                force_read_to_env=force_read_to_env,
            )
        else:
            time_travel_datetime = None

        # Set env in catalog name
        table_name = self._assign_env_to_string(table_name, purpose="r", force_read_to_env=force_read_to_env)

        query = f"SELECT * FROM {table_name}"

        if time_travel_datetime:
            # Add time travel sql condition.
            query += f' TIMESTAMP AS OF "{time_travel_datetime}"'
        else:
            pass

        sdf = self.spark.sql(query)

        # Create view if requested
        if create_view:
            self._create_sdf_temp_view(sdf=sdf, table_name_path=table_name)  # type: ignore

        return sdf

    def get_latest_delta_datetime(
        self,
        table_name: str,
        max_datetime: Union[str, None] = None,
        force_read_to_env: Optional[bool] = False,
    ) -> datetime.datetime:
        """
        Get latest timestamp when the delta table was written. If max_date or max_datetime input is prior to the oldest table timestamp, return the latter.

        Args:
            table_name (str): Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
            max_datetime (str, optional): filter out datetimes newer (inclusive) than the given datetime. Format must be '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'. Defaults to None.
            force_read_to_env (Optional, bool): if True, force read on the same env of execution. This only works on '_analytics' catalogs. Defaults to False.

        Returns:
            datetime.datetime: latest time when the delta table was written.
        """
        # Validate max_datetime input format and define filter condition.
        if max_datetime:
            date_filter = timestamp_str_to_datetime(max_datetime)

        # Set env in catalog name
        table_name = self._assign_env_to_string(table_name, "r", force_read_to_env=force_read_to_env)

        # Query delta table's history looking for timestamps
        query = (
            f"select timestamp as last_ts from (select * from (describe history {table_name})) order by timestamp desc"
        )
        timestamp_list = self.spark.sql(query).collect()

        # Extract last_ts from every row
        timestamp_list = [row["last_ts"] for row in timestamp_list]

        # Return latest if max_datetime was not provided.
        if max_datetime is None:
            print(f"No max_date passed for {table_name} extraction, returning last available timestamp.")
            return timestamp_list[0]

        # Filter list based on defined filter
        filtered_timestamp_list = [t for t in timestamp_list if t <= date_filter]  # type: ignore

        # Return last available timestamp or absolute oldest if filter date is prior to it.
        if len(filtered_timestamp_list) > 0:
            output_timestamp = filtered_timestamp_list[0]
        else:
            warnings.warn("No timestamp prior to max_date/max_datetime, returning the oldest timestamp")
            output_timestamp = timestamp_list[-1]

        return output_timestamp

    def read_most_recent_file(
        self,
        container: str,
        adls_name: str,
        file_name: str,
        file_format: str,
        look_folder: str = "",
        max_date: Union[str, None] = None,
        max_iterations: int = 100,
        extra_options: Optional[dict] = None,
        create_view: Optional[bool] = False,
    ) -> DataFrame:
        """Get the most recent file path from a directory with the structure:

            look_folder:
                - "2023-01-01"
                    - filename.csv
                - "2023-02-01"
                    - filename.csv
                - "2023-03-01"
                    - filename.csv

        Then read the file and return the spark dataframe containing the data.

        Args:
            container (str): Name of the container where to find and read file.
            adls_name (str): Name of datalake used to get connection config.
            file_name (str): Name of the file that we want to get.
            file_format (str): Extension of the file we are looking for.
            look_folder (str, Optional): Main directory containing the data that needs to be selected. Default is root.
            max_date (str, optional): launch date, that works as an upper limit (inclusive) for the latest copies of the file. Format must be %Y-%m-%d. Defaults to None.
            max_iterations (int, optional): limit the number of search attempts. Defaults to 100.
            extra_options (Optional[dict], optional): Additional options to pass to the spark read command as a dictionary. Defaults to None.
            create_view (bool, optional): if True, also create a temp view containing the entire table content. Defaults to False.

        Returns:
            DataFrame: spark dataframe containing data inside most recent version of file required.
        """
        # Search for latest version of file and get its path
        file_path = self.get_path_most_recent_file(
            container=container,
            adls_name=adls_name,
            look_folder=look_folder,
            file_name=file_name,
            file_format=file_format,
            max_date=max_date,
            max_iterations=max_iterations,
        )

        # Read file using path
        return self.read_file(
            adls_name=adls_name,
            container=container,
            file_path=file_path,
            file_format=file_format,
            extra_options=extra_options,
            create_view=create_view,
        )

    @panama.logging.log_execution(blocking=True)
    def get_path_most_recent_file(
        self,
        container: str,
        adls_name: str,
        file_name: str,
        file_format: str,
        look_folder: str = "",
        max_date: Union[str, None] = None,
        max_iterations: int = 10,
    ):
        """Get the most recent file path from a directory with the structure:

            look_folder:
                - "2023-01-01"
                    - filename.csv
                - "2023-02-01"
                    - filename.csv
                - "2023-03-01"
                    - filename.csv

        Args:
            container (str): Name of the container where to find files.
            adls_name (str): Name of datalake used to get connection config.
            file_name (str): Name of the file that we want to get (no extension).
            file_format (str): Extension of the file we are looking for.
            look_folder (str, Optional): Main directory containing the data that needs to be selected. Default is root.
            max_date (str, optional): launch date, that works as an upper limit (inclusive) for the latest copies of the file. Format must be %Y-%m-%d. Defaults to None.
            max_iterations (int, optional): limit the number of search attempts. Defaults to 10.
        """
        # Concatenate file_name and file_format
        if re.search(string=file_name, pattern=".*\..*"):  # type: ignore
            raise ValueError("Please provide file_name input without the extension. Use the file_format parameter.")
        else:
            file_name_complete = ".".join([file_name, file_format])

        # Build absolute adls path of look_folder
        storage_account = self._get_storage_account_from_name(adls_name)
        source_type = self._get_source_type_from_name(adls_name)
        look_folder_path = self._generate_absolute_path(
            path=look_folder, storage_account=storage_account, container=container, source_type=source_type
        )

        # Get list of paths in look_folder
        dir_list = dbutils_fs_ls_names(look_folder_path, self.spark)

        # Select only folders with %Y-%m-%d format...
        date_format = re.compile("\d{4}-\d{2}-\d{2}")  # type: ignore
        date_dir_list = list(filter(date_format.match, dir_list))

        # ... filter by date if requested
        if max_date:
            try:
                re.search(string=max_date, pattern=date_format).group(0)  # type: ignore
            except:
                raise ValueError("max_date must be of format %Y-%m-%d")
            date_dir_list = [i for i in date_dir_list if i <= f"{max_date}/"]

        # ... and sort
        date_dir_list = sorted(date_dir_list, reverse=True)

        try:
            print(f"{len(date_dir_list)} subfolder found, searching for file starting from more recent")
        except:
            raise FileNotFoundError(f"{look_folder} does not contain any folder with valid date format (%Y-%m-%d)")

        # Start searching for desired file
        file_found = None
        n = 0

        while file_found == None and n <= min(len(date_dir_list), max_iterations):
            # Search in current subfolder iteration a path that ends with file name
            try:
                p = look_folder_path + "/" + date_dir_list[n]
            except:
                raise FileNotFoundError(
                    "File not found! Check the file name (remember to use the extension) or the folder path"
                )
            try:
                # If successful will interrupt the loop
                file_found = [i for i in dbutils_fs_ls_names(path=p, spark=self.spark) if i == file_name_complete][0]
                print("File found in folder:", str(date_dir_list[n])[0:-1])
                return "/".join([look_folder, date_dir_list[n], file_found])
            except:
                # Next iteration
                n += 1

        # If the loop ended it means that the file was not found, throw error
        raise IndexError(
            "File not found! Check the file name (remember to use the extension) or the folder path or use a higher max_iterations"
        )

    ############################## WRITING

    @staticmethod
    def _df_repartitioning(sdf, mode: str, partitions: int) -> DataFrame:
        """Perform either repartitioning or coalesce over input spark dataframe.

        Args:
            sdf (DataFrame): spark dataframe, main subject of the function.
            mode (str): string to control which operation to perform. Accepted values are 'repartition' and 'coalesce'.
            partitions (int): number of partitions to obtain. Object of the selected method.

        Raises:
            KeyError: when mode is not 'repartition' or 'coalesce'.

        Returns:
            DataFrame: repartitioned spark dataframe.
        """
        if mode == "repartition":
            return sdf.repartition(partitions)
        elif mode == "coalesce":
            return sdf.coalesce(partitions)
        else:
            raise KeyError("Repartitioning modality unknown. Please use 'repartition' or 'coalesce'")

    @panama.logging.log_execution(blocking=True)  # type: ignore
    def write_file(
        self,
        data: DataFrame,
        adls_name: str,
        container: str,
        file_path: str,
        mode: str = "append",
        file_format: str = "delta",
        repartitioning: Optional[list] = None,
        partition_by: Optional[list] = None,
        extra_options: Optional[dict] = None,
        save_as_table: bool = False,
    ) -> None:
        """Write data into a datalake table. Optionally provide "repartitioning", a list like [str, int], to perform repartitioning of the dataframe before writing. Accepted repartitioning modalities are "repartition" and "coalesce".

        Args:
            data (DataFrame): spark dataframe containing data.
            adls_name (str): Name of datalake used to get connection config.
            container (str): Name of the container where to write files.
            file_path (str): Path where to write file. Provide a path containing a table. If empty, the last part of the path will be the name of the new table.
            mode (str, optional): spark write mode. Defaults to "append".
            file_format (str, optional): Format of the file to write. Defaults to "delta".
            repartitioning (Optional[list], optional): List indicating how to perform pre-writing repartitioning. Defaults to None.
            partition_by (Optional[list], optional): column(s) to use as writing partitions. Defaults to None.
            extra_options (Optional[dict], optional): Additional options to pass to the spark write command as a dictionary. Defaults to None.
            save_as_table (bool, optional): if True, the write command will be saveAsTable, else it will use save. Defaults to False.
        """
        # Compose datalake file path from given table info
        storage_account = self._get_storage_account_from_name(adls_name)
        source_type = self._get_source_type_from_name(adls_name)

        file_path = self._generate_absolute_path(
            path=file_path, storage_account=storage_account, container=container, source_type=source_type
        )

        # If dataframe partitioning instructions were given, execute them accordingly
        if repartitioning is not None and len(repartitioning) == 2:
            data = self._df_repartitioning(sdf=data, mode=repartitioning[0], partitions=repartitioning[1])

        # Init data writer
        data_writer = data.write

        # If partition_columns is provided then add partitionBy step to write pipeline
        if partition_by is not None:
            data_writer = data_writer.partitionBy(partition_by)

        # Add extra options if any were provided
        if extra_options:
            data_writer = data_writer.options(**extra_options)

        # Define write format and modality
        data_writer = data_writer.format(file_format).mode(mode)

        # Final save statement
        if save_as_table == True:
            data_writer.saveAsTable(file_path)
        else:
            data_writer.save(file_path)

        print(f"Table written in {file_path}")
Ancestors
- IOInterface
Static methods
def generate_connection_string(adls_name: str, container: str, path: str = '') ‑> str
-
Generate the url that points to the required path inside the data lake.
Args
adls_name
:str
- Name of datalake used to get connection config.
container
:str
- Name of the container where to find files or tables.
path
:str
- File or table location.
Returns
str
- the final url to read data from.
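An illustrative sketch; the datalake name, container, and path are hypothetical, the datalake name must match an entry in connections.json, and the import path is assumed from the panama.io.io sub-module.

    from panama.io.io import IOAdls  # assumed import path

    url = IOAdls.generate_connection_string(adls_name="mydatalake", container="raw", path="sales/2023/orders")
    # For an "adls" source this resolves to abfss://raw@<storage_account>.dfs.core.windows.net/sales/2023/orders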
Methods
def get_latest_delta_datetime(self, table_name: str, max_datetime: Optional[str] = None, force_read_to_env: Optional[bool] = False) ‑> datetime.datetime
-
Get latest timestamp when the delta table was written. If max_date or max_datetime input is prior to oldest table timestamp return the latter.
Args
table_name
:str
- Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
max_datetime
:str, optional
- filter out datetimes newer (inclusive) than the given datetime. Format must be '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'. Defaults to None.
force_read_to_env
:Optional, bool
- if True, force read on the same env of execution. This only works on '_analytics' catalogs. Defaults to False.
Returns
datetime.datetime
- latest time when the delta table was written.
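A minimal sketch; the datalake and table names are hypothetical, the import path is assumed from the panama.io.io sub-module, and spark is an existing SparkSession.

    from panama.io.io import IOAdls  # assumed import path

    io = IOAdls(spark=spark)
    # Latest delta write timestamp not newer than the given datetime.
    ts = io.get_latest_delta_datetime(table_name="analytics.sales.daily_orders", max_datetime="2023-06-30")
    print(ts)  # datetime.datetime of the most recent qualifying table version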
def get_path_most_recent_file(self, container: str, adls_name: str, file_name: str, file_format: str, look_folder: str = '', max_date: Optional[str] = None, max_iterations: int = 10)
Get the most recent file path from a directory with the structure:
look_folder:
- "2023-01-01"
    - filename.csv
- "2023-02-01"
    - filename.csv
- "2023-03-01"
    - filename.csv
Args
container
:str
- Name of the container where to find files.
adls_name
:str
- Name of datalake used to get connection config.
file_name
:str
- Name of the file that we want to get (no extension).
file_format
:str
- Extension of the file we are looking for.
look_folder
:str, Optional
- Main directory containing the data that needs to be selected. Default is root.
max_date
:str, optional
- launch date, that works as an upper limit (inclusive) for the latest copies of the file. Format must be %Y-%m-%d. Defaults to None.
max_iterations
:int, optional
- limit the number of search attempts. Defaults to 10.
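An illustrative sketch; the datalake, container, folder, and file names are hypothetical, and the import path is assumed from the panama.io.io sub-module.

    from panama.io.io import IOAdls  # assumed import path

    io = IOAdls(spark=spark)
    path = io.get_path_most_recent_file(
        container="raw",
        adls_name="mydatalake",
        file_name="orders",      # no extension here; it is supplied via file_format
        file_format="csv",
        look_folder="exports",
        max_date="2023-06-30",
    )
    # Returns a container-relative path such as "exports/2023-06-01/orders.csv"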
def read_file(self, adls_name: str, container: str, file_path: str, file_format: str = 'delta', extra_options: Optional[dict] = None, create_view: Optional[bool] = False) ‑> pyspark.sql.dataframe.DataFrame
Reads a table directly from an ADLS instance's path location.
Args
adls_name
:str
- Name of datalake (or blob) used to get connection config.
container
:str
- Name of the container where to find files.
file_path
:str
- Path where to find file (or files). Use folder to read multiple files, or specific file path with extension.
file_format
:str, optional
- Format of the file to read. Defaults to "delta".
extra_options
:Optional[dict], optional
- Additional options to pass to the spark read command as a dictionary. Defaults to None.
create_view
:bool, optional
- if True, also create a temp view containing the entire table content. Defaults to False.
Returns
DataFrame
- containing the data from the specified table.
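A minimal sketch; the datalake, container, and file path are hypothetical, and the import path is assumed from the panama.io.io sub-module.

    from panama.io.io import IOAdls  # assumed import path

    io = IOAdls(spark=spark)
    sdf = io.read_file(
        adls_name="mydatalake",
        container="raw",
        file_path="exports/2023-06-01/orders.csv",
        file_format="csv",
        extra_options={"header": "true"},  # forwarded to spark.read.options
    )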
def read_most_recent_file(self, container: str, adls_name: str, file_name: str, file_format: str, look_folder: str = '', max_date: Optional[str] = None, max_iterations: int = 100, extra_options: Optional[dict] = None, create_view: Optional[bool] = False) ‑> pyspark.sql.dataframe.DataFrame
Get the most recent file path from a directory with the structure:
look_folder:
- "2023-01-01"
    - filename.csv
- "2023-02-01"
    - filename.csv
- "2023-03-01"
    - filename.csv
Then read the file and return the spark dataframe containing the data.
Args
container
:str
- Name of the container where to find and read file.
adls_name
:str
- Name of datalake used to get connection config.
file_name
:str
- Name of the file that we want to get.
file_format
:str
- Extension of the file we are looking for.
look_folder
:str, Optional
- Main directory containing the data that needs to be selected. Default is root.
max_date
:str, optional
- launch date, that works as an upper limit (inclusive) for the latest copies of the file. Format must be %Y-%m-%d. Defaults to None.
max_iterations
:int, optional
- limit the number of search attempts. Defaults to 100.
extra_options
:Optional[dict], optional
- Additional options to pass to the spark read command as a dictionary. Defaults to None.
create_view
:bool, optional
- if True, also create a temp view containing the entire table content. Defaults to False.
Returns
DataFrame
- spark dataframe containing data inside most recent version of file required.
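An illustrative sketch combining the path search and the read; all names are hypothetical, and the import path is assumed from the panama.io.io sub-module.

    from panama.io.io import IOAdls  # assumed import path

    io = IOAdls(spark=spark)
    sdf = io.read_most_recent_file(
        container="raw",
        adls_name="mydatalake",
        file_name="orders",
        file_format="csv",
        look_folder="exports",
        max_date="2023-06-30",
        extra_options={"header": "true"},
    )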
def read_table(self, table_name: str, max_datetime: Optional[str] = None, create_view: Optional[bool] = False, force_read_to_env: Optional[bool] = False) ‑> pyspark.sql.dataframe.DataFrame
Reads a table from unity catalog.
ATTENTION: a temporary fix lets the user read the dev catalog from the dev environment (instead of the test catalog from the dev environment).
Args
table_name
:str
- Name of the table to read from. Format must be "<catalog>.<schema>.<table>". Catalog must be indicated without env prefix.
max_datetime
:str, optional
- filter out datetimes newer (inclusive) than the given datetime. Format must be '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'. Defaults to None.
create_view
:bool, optional
- if True, also create a temp view containing the entire table content. Defaults to False.
force_read_to_env
:Optional, bool
- if True, force read on the same env of execution. This only works on '_analytics' catalogs. Defaults to False.
Returns
DataFrame
- containing the data from the specified table.
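A minimal sketch; the table name is hypothetical, and the import path is assumed from the panama.io.io sub-module.

    from panama.io.io import IOAdls  # assumed import path

    io = IOAdls(spark=spark)
    # Reads the table as of the latest delta version written on or before the given datetime.
    sdf = io.read_table(
        table_name="analytics.sales.daily_orders",  # hypothetical "<catalog>.<schema>.<table>", no env prefix
        max_datetime="2023-06-30",
        create_view=True,  # also registers a temp view
    )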
def write_file(self, data: pyspark.sql.dataframe.DataFrame, adls_name: str, container: str, file_path: str, mode: str = 'append', file_format: str = 'delta', repartitioning: Optional[list] = None, partition_by: Optional[list] = None, extra_options: Optional[dict] = None, save_as_table: bool = False) ‑> None
Write data into a datalake table. Optionally provide "repartitioning", a list like [str, int], to perform repartitioning of the dataframe before writing. Accepted repartitioning modalities are "repartition" and "coalesce".
Args
data
:DataFrame
- spark dataframe containing data.
adls_name
:str
- Name of datalake used to get connection config.
container
:str
- Name of the container where to write files.
file_path
:str
- Path where to write file. Provide path containing a table. If empty, last part of path will be the name of the new table.
mode
:str, optional
- spark write mode. Defaults to "append".
file_format
:str, optional
- Format of the file to write. Defaults to "delta".
repartitioning
:Optional[list], optional
- List indicating how to perform pre-writing repartitioning. Defaults to None.
partition_by
:Optional[list], optional
- column(s) to use as writing partitions. Defaults to None.
extra_options
:Optional[dict], optional
- Additional options to pass to the spark write command as a dictionary. Defaults to None.
save_as_table
:bool, optional
- if True, the write command will be saveAsTable, else it will use save. Defaults to False.
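An illustrative write sketch; the datalake, container, and path names are hypothetical, sdf is an existing spark DataFrame, and the import path is assumed from the panama.io.io sub-module.

    from panama.io.io import IOAdls  # assumed import path

    io = IOAdls(spark=spark)
    io.write_file(
        data=sdf,
        adls_name="mydatalake",
        container="curated",
        file_path="sales/daily_orders",
        mode="overwrite",
        file_format="delta",
        repartitioning=["coalesce", 1],  # [mode, partitions], applied before writing
        partition_by=["snapshot_date"],
    )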
class IOSql (spark)
Extends the IO interface to SQL data structure.
Expand source code
class IOSql(IOInterface):
    """
    Extends the IO interface to SQL data structure.
    """

    def __init__(self, spark):
        super().__init__(spark)

    def _get_connection_properties(self, database_name: str) -> dict:
        """Returns the connection properties for the specified database.

        Args:
            database_name (str): Name of the database.

        Raises:
            ValueError: Raise error if the database is not sqlserver or oracle.

        Returns:
            dict: A dict containing connection configuration.
        """
        # Get database connection config from config json
        database_config = super()._get_connection_config_json().get(database_name)
        database_credentials = database_config.get("credentials")  # type: ignore

        # Get password from KeyVault using relative secret
        pswd = get_secret_from_keyvault(
            key=database_credentials.get("password"), spark=self.spark, scope=database_credentials.get("scope")
        )

        # Generate url depending on database type and add driver if necessary
        if database_config.get("type") == "sqlserver":  # type: ignore
            url = f"jdbc:sqlserver://{database_credentials.get('host')};databaseName={database_credentials.get('database')}"
            driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
            connection_config = {
                "url": url,
                "user": database_credentials.get("username"),
                "password": pswd,
                "driver": driver,
                "fetchSize": 1000,
            }
        elif database_config.get("type") == "oracle":  # type: ignore
            url = f"jdbc:oracle:thin:@{database_credentials.get('host')}:{database_credentials.get('port')}/{database_credentials.get('service_name')}"
            driver = "oracle.jdbc.driver.OracleDriver"
            connection_config = {
                "url": url,
                "user": database_credentials.get("username"),
                "password": pswd,
                "driver": driver,
                "oracle.jdbc.timezoneAsRegion": False,
                "fetchSize": 1000,
            }
        else:
            raise ValueError(f"Unsupported database type: {database_config.get('type')}")  # type: ignore

        return connection_config

    @staticmethod
    def _convert_all_decimal_columns_to_double(sdf):
        decimal_cols = [col[0] for col in sdf.dtypes if col[1].startswith("decimal")]
        for col_name in decimal_cols:
            sdf = sdf.withColumn(col_name, F.col(col_name).cast("double"))
        return sdf

    @panama.logging.log_execution(blocking=True)
    def read_table(
        self,
        database_name: str,
        table_name: Optional[str] = None,
        query: Optional[str] = None,
        extra_options: Optional[dict] = None,
        create_view: Optional[bool] = False,
    ) -> DataFrame:
        """Reads data from a table in a SQLServer or Oracle database.

        Args:
            database_name (str): Name of the database to read from.
            table_name (Optional[str], optional): optional when the 'query' parameter is given; name of the table to read from. Format of table_name is "schema.table". Defaults to None.
            query (Optional[str], optional): optional when the 'table_name' parameter is given; query to execute instead of reading the whole table. Defaults to None.
            extra_options (Optional[dict], optional): Additional options to pass to the JDBC connector as a dictionary. Defaults to None.
            create_view (bool, optional): if True, also create a temp view containing the entire table content. Defaults to False.

        Raises:
            ValueError: Raise error if the user tries to input both query and table_name at the same time.

        Returns:
            DataFrame: A Spark DataFrame containing the data from the specified table or query.
        """
        # Check if user provided only one argument between table_name and query
        if (table_name is not None) and (query is not None):
            raise ValueError("Cannot specify both table_name and query.")

        connection_config = self._get_connection_properties(database_name)

        if table_name:
            connection_config.update({"dbtable": table_name})
        elif query:
            connection_config.update({"query": query})

        if extra_options:
            connection_config.update(extra_options)

        sdf = self.spark.read.format("jdbc").options(**connection_config).load()

        # Convert all decimal columns to doubleType standard
        sdf = self._convert_all_decimal_columns_to_double(sdf)

        if create_view:
            if table_name is None:
                table_name = re.search(string=query, pattern="((FROM)|(from))\s([\w\.]+)\s")  # type: ignore
            self._create_sdf_temp_view(sdf=sdf, table_name_path=table_name)  # type: ignore

        return sdf
Ancestors
- IOInterface
Methods
def read_table(self, database_name: str, table_name: Optional[str] = None, query: Optional[str] = None, extra_options: Optional[dict] = None, create_view: Optional[bool] = False) ‑> pyspark.sql.dataframe.DataFrame
-
Reads data from a table in an SQLServer or Oracle database.
Args
database_name
:str
- Name of the database to read from.
table_name
:Optional[str], optional
- optional when the 'query' parameter is given; name of the table to read from. Format of table_name is "schema.table". Defaults to None.
query
:Optional[str], optional
- optional when the 'table_name' parameter is given; query to execute instead of reading the whole table. Defaults to None.
extra_options
:Optional[dict], optional
- Additional options to pass to the JDBC connector as a dictionary. Defaults to None.
create_view
:bool, optional
- if True, also create a temp view containing the entire table content. Defaults to False.
Raises
ValueError
- Raised if the user tries to input both query and table_name at the same time.
Returns
DataFrame
- A Spark DataFrame containing the data from the specified table or query.
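An illustrative sketch; the database name must match an entry in connections.json, the table and query are hypothetical, and the import path is assumed from the panama.io.io sub-module.

    from panama.io.io import IOSql  # assumed import path

    sql_io = IOSql(spark=spark)
    # Pass either table_name or query, never both.
    customers = sql_io.read_table(database_name="mydatabase", table_name="dbo.customers")
    orders = sql_io.read_table(database_name="mydatabase", query="SELECT id, amount FROM sales.orders WHERE amount > 0")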