Module panama.dbutils_mock
Sub-modules
panama.dbutils_mock.dbutils
panama.dbutils_mock.secrets
Classes
class DBUtils (sc: pyspark.sql.session.SparkSession)
-
Initialize DBUtils.
Args
sc
:SparkSession
- The Spark session.
Returns
None
Expand source code
class DBUtils: class Notebook: class EntryPoint: def __init__(self, sc) -> None: self.sc = sc def getCurrentBindings(self): return dict() def __init__(self, sc) -> None: self.sc = sc def __getattr__(self, item): if item == "entry_point": self.entry_point = self.EntryPoint(self.sc) return self.entry_point raise AttributeError class FSHandler: def __init__(self, sc): self.sc = sc def __call__(self): return self def print_return(self, result): print(result) return result def check_types(self, vars_and_types): for var, type_ in vars_and_types: if not isinstance(var, type_): raise TypeError("{0} has the wrong type - {1} is expected.".format(repr(var), type_)) def help(self, method_name=None): pass def create_list_from_jschema(self, jschema, create_obj_from_jschema) -> List: return [create_obj_from_jschema(jschema.apply(i)) for i in range(jschema.length())] # fsutils functions def cp(self, source: str, dest: str, recurse: bool = False) -> str: self.check_types([(source, string_types), (dest, string_types), (recurse, bool)]) if recurse: return self.print_return(shutil.copytree(source, dest)) else: return self.print_return(shutil.copyfile(source, dest)) def head(self, file: str, max_bytes: int = 65536) -> int: self.check_types([(file, string_types), (max_bytes, int)]) return self.print_return(os.system(f"head -n {max_bytes} {file}")) def ls(self, path: str) -> List[FileInfo]: self.check_types([(path, string_types)]) return self.print_return( [FileInfo(e.path, e.name, e.stat().st_size, e.stat().st_mtime) for e in os.scandir(path)] ) def mkdirs(self, dir: str) -> str: self.check_types([(dir, string_types)]) return self.print_return(os.makedirs(dir, exist_ok=True)) # type: ignore def mv(self, source: str, dest: str, recurse: bool = False) -> str: # type: ignore self.check_types([(source, string_types), (dest, string_types), (recurse, bool)]) if recurse: return self.print_return(shutil.move(source, dest)) else: if not os.path.isfile(source): if len(os.listdir(source)) > 0: raise FileExistsError("To copy files in a directory, set recurse to true") else: return self.print_return(shutil.move(source, dest)) def put(self, file: str, contents: str, overwrite: bool = False) -> None: self.check_types([(file, string_types), (contents, string_types), (overwrite, bool)]) raise NotImplementedError def rm(self, dir: str, recurse: bool = False) -> str: self.check_types([(dir, string_types), (recurse, bool)]) if recurse: return self.print_return(shutil.rmtree(dir)) # type: ignore else: return self.print_return(os.rmdir(dir)) # type: ignore # cache functions def cacheFiles(self, *files: str) -> None: self.check_types(list(zip(files, [string_types] * len(files)))) raise NotImplementedError def cacheTable(self, name: str) -> None: self.check_types([(name, string_types)]) raise NotImplementedError def uncacheFiles(self, *files: str) -> None: self.check_types(list(zip(files, [string_types] * len(files)))) raise NotImplementedError def uncacheTable(self, name: str) -> None: self.check_types([(name, string_types)]) raise NotImplementedError # mount functions def mount( self, source: str, mount_point: str, encryption_type: str = "", owner: Union[str, None] = None, extra_configs: dict = {}, ) -> None: self.check_types( [ (source, string_types), (mount_point, string_types), (encryption_type, string_types), (owner, Union[str, None]), (extra_configs, dict), ] ) raise NotImplementedError def updateMount( self, source: str, mount_point: str, encryption_type: str = "", owner: Union[str, None] = None, extra_configs: dict = {}, ) -> None: self.check_types( [ (source, string_types), (mount_point, string_types), (encryption_type, string_types), (owner, Union[str, None]), (extra_configs, dict), ] ) raise NotImplementedError def mounts(self) -> None: raise NotImplementedError def refreshMounts(self) -> None: raise NotImplementedError def unmount(self, mount_point: str) -> None: raise NotImplementedError def __repr__(self): # TODO(tjh) call the scala doc return "Package 'dbutils.fs'. For more information, type 'display(dbutils.fs)'" " in a cell." def __getstate__(self): print( """ You cannot use dbutils within a spark job or otherwise pickle it. If you need to use getArguments within a spark job, you have to get the argument before using it in the job. For example, if you have the following code: myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i)) Then you should use it this way: argX = dbutils.args.getArgument("X") myRdd.map(lambda i: argX + str(i)) """ ) raise Exception("You cannot use dbutils within a spark job") def __init__(self, sc: SparkSession): """Initialize DBUtils. Args: sc (SparkSession): The Spark session. Returns: None """ self.sc = sc def __getattr__(self, item): if item == "fs": self.fs = self.FSHandler(self.sc) return self.fs if item == "notebook": self.notebook = self.Notebook(self.sc) return self.notebook if item == "secrets": self.secrets = Secrets(self.sc) return self.secrets raise AttributeError(f"no attribute named '{item}'.") def __repr__(self): return "Package 'dbutils'. For more information, type 'dbutils.help()' in a cell." def __call__(self): return self def help(self, method_name: str = "") -> None: pass
Class variables
var FSHandler
var Notebook
Methods
def help(self, method_name: str = '') ‑> None