Module panama.dbutils_mock.dbutils
Expand source code
from __future__ import absolute_import
from __future__ import print_function
from collections import namedtuple
from genericpath import isfile
import shutil
import os
from six import string_types
from typing import List, Union
from pyspark.sql import SparkSession
class FileInfo(namedtuple("FileInfo", ["path", "name", "size", "modificationTime"])):
    """Lightweight stand-in for the Databricks ``FileInfo`` record.

    A named tuple of ``(path, name, size, modificationTime)`` with the
    directory/file helpers the real object exposes.
    """

    def isDir(self) -> bool:
        """Return True when this entry denotes a directory.

        Directories are identified by a trailing ``/`` in ``name``.
        """
        return self.name[-1:] == "/"

    def isFile(self) -> bool:
        """Return True when this entry denotes a regular file (i.e. not a directory)."""
        return self.name[-1:] != "/"

    @staticmethod
    def create_from_jschema(j_file_info) -> "FileInfo":
        """Build a FileInfo from a Java-side (py4j) FileInfo-like object.

        Args:
            j_file_info: Java object exposing ``path()``, ``name()``,
                ``size()`` and ``modificationTime()`` accessor methods.

        Returns:
            FileInfo: The equivalent Python record.
        """
        fields = {
            attr: getattr(j_file_info, attr)()
            for attr in ("path", "name", "size", "modificationTime")
        }
        return FileInfo(**fields)
class DBUtils:
    """Local mock of the Databricks ``dbutils`` entry point.

    Provides ``dbutils.fs`` (backed by the local filesystem) and
    ``dbutils.notebook`` lookalikes so notebook code can run outside
    Databricks.  Both handlers are created lazily via ``__getattr__``
    on first access and cached on the instance afterwards.
    """

    class Notebook:
        """Mock of ``dbutils.notebook``."""

        class EntryPoint:
            """Mock of the JVM-side notebook entry point."""

            def __init__(self, sc) -> None:
                self.sc = sc

            def getCurrentBindings(self) -> dict:
                """Return the current widget bindings (always empty in the mock)."""
                return dict()

        def __init__(self, sc) -> None:
            self.sc = sc

        def __getattr__(self, item):
            # Lazily build the entry point on first access; once assigned,
            # normal attribute lookup finds it and __getattr__ is no longer
            # called for "entry_point".
            if item == "entry_point":
                self.entry_point = self.EntryPoint(self.sc)
                return self.entry_point
            raise AttributeError(item)

    class FSHandler:
        """Mock of ``dbutils.fs`` backed by the local filesystem."""

        def __init__(self, sc):
            self.sc = sc

        def __call__(self):
            return self

        def print_return(self, result):
            """Print ``result`` (mimicking the notebook echo) and return it unchanged."""
            print(result)
            return result

        def check_types(self, vars_and_types):
            """Validate ``(value, expected_type)`` pairs.

            Args:
                vars_and_types: Iterable of ``(value, type_or_type_tuple)``.

            Raises:
                TypeError: If any value is not an instance of its expected type.
            """
            for var, type_ in vars_and_types:
                if not isinstance(var, type_):
                    raise TypeError("{0} has the wrong type - {1} is expected.".format(repr(var), type_))

        def help(self, method_name=None):
            pass

        def create_list_from_jschema(self, jschema, create_obj_from_jschema) -> List:
            """Convert a Java-side indexed sequence into a Python list of objects."""
            return [create_obj_from_jschema(jschema.apply(i)) for i in range(jschema.length())]

        # fsutils functions
        def cp(self, source: str, dest: str, recurse: bool = False) -> str:
            """Copy a file, or a whole directory tree when ``recurse=True``."""
            self.check_types([(source, str), (dest, str), (recurse, bool)])
            if recurse:
                return self.print_return(shutil.copytree(source, dest))
            else:
                return self.print_return(shutil.copyfile(source, dest))

        def head(self, file: str, max_bytes: int = 65536) -> str:
            """Return (and print) up to ``max_bytes`` bytes from the start of ``file``.

            BUGFIX: previously shelled out via ``os.system(f"head -n ...")``,
            which was a command-injection risk, confused bytes with lines,
            and returned the shell exit status instead of the file contents.

            Args:
                file: Path of the file to read.
                max_bytes: Maximum number of bytes to return.

            Returns:
                str: The leading bytes of the file, decoded as UTF-8
                (undecodable bytes are replaced rather than raising).
            """
            self.check_types([(file, str), (max_bytes, int)])
            with open(file, "rb") as f:
                data = f.read(max_bytes)
            return self.print_return(data.decode("utf-8", errors="replace"))

        def ls(self, path: str) -> "List[FileInfo]":
            """List directory entries of ``path`` as FileInfo records."""
            self.check_types([(path, str)])
            # Close the scandir iterator deterministically to avoid leaking
            # the underlying directory handle.
            with os.scandir(path) as entries:
                infos = [FileInfo(e.path, e.name, e.stat().st_size, e.stat().st_mtime) for e in entries]
            return self.print_return(infos)

        def mkdirs(self, dir: str) -> str:
            """Create ``dir`` and any missing parents (no-op if it already exists)."""
            self.check_types([(dir, str)])
            return self.print_return(os.makedirs(dir, exist_ok=True))  # type: ignore

        def mv(self, source: str, dest: str, recurse: bool = False) -> str:  # type: ignore
            """Move a file or directory.

            BUGFIX: with ``recurse=False`` and a *file* source, the original
            fell through every branch, silently did nothing, and returned
            None.  Single files are now moved.  Moving a non-empty directory
            still requires ``recurse=True``.

            Raises:
                FileExistsError: If ``source`` is a non-empty directory and
                    ``recurse`` is False.
            """
            self.check_types([(source, str), (dest, str), (recurse, bool)])
            if recurse or os.path.isfile(source):
                return self.print_return(shutil.move(source, dest))
            if len(os.listdir(source)) > 0:
                raise FileExistsError("To copy files in a directory, set recurse to true")
            return self.print_return(shutil.move(source, dest))

        def put(self, file: str, contents: str, overwrite: bool = False) -> None:
            """Not supported in the mock; validates arguments then raises."""
            self.check_types([(file, str), (contents, str), (overwrite, bool)])
            raise NotImplementedError

        def rm(self, dir: str, recurse: bool = False) -> str:
            """Remove a file, an empty directory, or (with ``recurse=True``) a tree.

            BUGFIX: ``os.rmdir`` on a regular file raised NotADirectoryError;
            files are now removed with ``os.remove`` like the real dbutils.
            """
            self.check_types([(dir, str), (recurse, bool)])
            if recurse:
                return self.print_return(shutil.rmtree(dir))  # type: ignore
            if os.path.isfile(dir):
                return self.print_return(os.remove(dir))  # type: ignore
            return self.print_return(os.rmdir(dir))  # type: ignore

        # cache functions
        def cacheFiles(self, *files: str) -> None:
            """Not supported locally; validates arguments then raises."""
            self.check_types(list(zip(files, [str] * len(files))))
            raise NotImplementedError

        def cacheTable(self, name: str) -> None:
            """Not supported locally; validates arguments then raises."""
            self.check_types([(name, str)])
            raise NotImplementedError

        def uncacheFiles(self, *files: str) -> None:
            """Not supported locally; validates arguments then raises."""
            self.check_types(list(zip(files, [str] * len(files))))
            raise NotImplementedError

        def uncacheTable(self, name: str) -> None:
            """Not supported locally; validates arguments then raises."""
            self.check_types([(name, str)])
            raise NotImplementedError

        # mount functions
        def mount(
            self,
            source: str,
            mount_point: str,
            encryption_type: str = "",
            owner: Union[str, None] = None,
            extra_configs: dict = {},  # mutable default is safe here: never mutated
        ) -> None:
            """Not supported locally; validates arguments then raises.

            BUGFIX: passing ``Union[str, None]`` to ``isinstance`` (via
            ``check_types``) raises TypeError on Python < 3.10, so the
            intended NotImplementedError was never reached; a plain type
            tuple is used instead.
            """
            self.check_types(
                [
                    (source, str),
                    (mount_point, str),
                    (encryption_type, str),
                    (owner, (str, type(None))),
                    (extra_configs, dict),
                ]
            )
            raise NotImplementedError

        def updateMount(
            self,
            source: str,
            mount_point: str,
            encryption_type: str = "",
            owner: Union[str, None] = None,
            extra_configs: dict = {},  # mutable default is safe here: never mutated
        ) -> None:
            """Not supported locally; validates arguments then raises.

            BUGFIX: same ``isinstance(..., Union[...])`` fix as ``mount``.
            """
            self.check_types(
                [
                    (source, str),
                    (mount_point, str),
                    (encryption_type, str),
                    (owner, (str, type(None))),
                    (extra_configs, dict),
                ]
            )
            raise NotImplementedError

        def mounts(self) -> None:
            raise NotImplementedError

        def refreshMounts(self) -> None:
            raise NotImplementedError

        def unmount(self, mount_point: str) -> None:
            raise NotImplementedError

        def __repr__(self):
            # TODO(tjh) call the scala doc
            return "Package 'dbutils.fs'. For more information, type 'display(dbutils.fs)'" " in a cell."

    def __getstate__(self):
        """Refuse pickling: dbutils must not be captured inside a Spark job."""
        print(
            """ You cannot use dbutils within a spark job or otherwise pickle it.
            If you need to use getArguments within a spark job, you have to get the argument before
            using it in the job. For example, if you have the following code:
            myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i))
            Then you should use it this way:
            argX = dbutils.args.getArgument("X")
            myRdd.map(lambda i: argX + str(i))
            """
        )
        raise Exception("You cannot use dbutils within a spark job")

    def __init__(self, sc: "SparkSession"):
        """Initialize DBUtils.

        Args:
            sc (SparkSession): The Spark session.

        Returns:
            None
        """
        self.sc = sc

    def __getattr__(self, item):
        # Lazily build and cache the sub-handlers; after assignment, normal
        # attribute lookup finds them and __getattr__ is not called again.
        if item == "fs":
            self.fs = self.FSHandler(self.sc)
            return self.fs
        if item == "notebook":
            self.notebook = self.Notebook(self.sc)
            return self.notebook
        raise AttributeError(f"no attribute named '{item}'.")

    def __repr__(self):
        return "Package 'dbutils'. For more information, type 'dbutils.help()' in a cell."

    def __call__(self):
        return self

    def help(self, method_name: str = "") -> None:
        pass
Classes
class DBUtils (sc: pyspark.sql.session.SparkSession)
-
Initialize DBUtils.
Args
sc
:SparkSession
- The Spark session.
Returns
None
Expand source code
class DBUtils: class Notebook: class EntryPoint: def __init__(self, sc) -> None: self.sc = sc def getCurrentBindings(self): return dict() def __init__(self, sc) -> None: self.sc = sc def __getattr__(self, item): if item == "entry_point": self.entry_point = self.EntryPoint(self.sc) return self.entry_point raise AttributeError class FSHandler: def __init__(self, sc): self.sc = sc def __call__(self): return self def print_return(self, result): print(result) return result def check_types(self, vars_and_types): for var, type_ in vars_and_types: if not isinstance(var, type_): raise TypeError("{0} has the wrong type - {1} is expected.".format(repr(var), type_)) def help(self, method_name=None): pass def create_list_from_jschema(self, jschema, create_obj_from_jschema) -> List: return [create_obj_from_jschema(jschema.apply(i)) for i in range(jschema.length())] # fsutils functions def cp(self, source: str, dest: str, recurse: bool = False) -> str: self.check_types([(source, string_types), (dest, string_types), (recurse, bool)]) if recurse: return self.print_return(shutil.copytree(source, dest)) else: return self.print_return(shutil.copyfile(source, dest)) def head(self, file: str, max_bytes: int = 65536) -> int: self.check_types([(file, string_types), (max_bytes, int)]) return self.print_return(os.system(f"head -n {max_bytes} {file}")) def ls(self, path: str) -> List[FileInfo]: self.check_types([(path, string_types)]) return self.print_return( [FileInfo(e.path, e.name, e.stat().st_size, e.stat().st_mtime) for e in os.scandir(path)] ) def mkdirs(self, dir: str) -> str: self.check_types([(dir, string_types)]) return self.print_return(os.makedirs(dir, exist_ok=True)) # type: ignore def mv(self, source: str, dest: str, recurse: bool = False) -> str: # type: ignore self.check_types([(source, string_types), (dest, string_types), (recurse, bool)]) if recurse: return self.print_return(shutil.move(source, dest)) else: if not os.path.isfile(source): if len(os.listdir(source)) > 0: 
raise FileExistsError("To copy files in a directory, set recurse to true") else: return self.print_return(shutil.move(source, dest)) def put(self, file: str, contents: str, overwrite: bool = False) -> None: self.check_types([(file, string_types), (contents, string_types), (overwrite, bool)]) raise NotImplementedError def rm(self, dir: str, recurse: bool = False) -> str: self.check_types([(dir, string_types), (recurse, bool)]) if recurse: return self.print_return(shutil.rmtree(dir)) # type: ignore else: return self.print_return(os.rmdir(dir)) # type: ignore # cache functions def cacheFiles(self, *files: str) -> None: self.check_types(list(zip(files, [string_types] * len(files)))) raise NotImplementedError def cacheTable(self, name: str) -> None: self.check_types([(name, string_types)]) raise NotImplementedError def uncacheFiles(self, *files: str) -> None: self.check_types(list(zip(files, [string_types] * len(files)))) raise NotImplementedError def uncacheTable(self, name: str) -> None: self.check_types([(name, string_types)]) raise NotImplementedError # mount functions def mount( self, source: str, mount_point: str, encryption_type: str = "", owner: Union[str, None] = None, extra_configs: dict = {}, ) -> None: self.check_types( [ (source, string_types), (mount_point, string_types), (encryption_type, string_types), (owner, Union[str, None]), (extra_configs, dict), ] ) raise NotImplementedError def updateMount( self, source: str, mount_point: str, encryption_type: str = "", owner: Union[str, None] = None, extra_configs: dict = {}, ) -> None: self.check_types( [ (source, string_types), (mount_point, string_types), (encryption_type, string_types), (owner, Union[str, None]), (extra_configs, dict), ] ) raise NotImplementedError def mounts(self) -> None: raise NotImplementedError def refreshMounts(self) -> None: raise NotImplementedError def unmount(self, mount_point: str) -> None: raise NotImplementedError def __repr__(self): # TODO(tjh) call the scala doc return "Package 
'dbutils.fs'. For more information, type 'display(dbutils.fs)'" " in a cell." def __getstate__(self): print( """ You cannot use dbutils within a spark job or otherwise pickle it. If you need to use getArguments within a spark job, you have to get the argument before using it in the job. For example, if you have the following code: myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i)) Then you should use it this way: argX = dbutils.args.getArgument("X") myRdd.map(lambda i: argX + str(i)) """ ) raise Exception("You cannot use dbutils within a spark job") def __init__(self, sc: SparkSession): """Initialize DBUtils. Args: sc (SparkSession): The Spark session. Returns: None """ self.sc = sc def __getattr__(self, item): if item == "fs": self.fs = self.FSHandler(self.sc) return self.fs if item == "notebook": self.notebook = self.Notebook(self.sc) return self.notebook raise AttributeError(f"no attribute named '{item}'.") def __repr__(self): return "Package 'dbutils'. For more information, type 'dbutils.help()' in a cell." def __call__(self): return self def help(self, method_name: str = "") -> None: pass
Class variables
var FSHandler
var Notebook
Methods
def help(self, method_name: str = '') ‑> None
-
Expand source code
def help(self, method_name: str = "") -> None: pass
class FileInfo (path, name, size, modificationTime)
-
FileInfo(path, name, size, modificationTime)
Expand source code
class FileInfo(namedtuple("FileInfo", ["path", "name", "size", "modificationTime"])): def isDir(self) -> bool: """ Check if the FileInfo represents a directory. Returns: bool: True if the FileInfo represents a directory, False otherwise. """ return self.name.endswith("/") def isFile(self) -> bool: """ Check if the FileInfo represents a file. Returns: bool: True if the FileInfo represents a file, False otherwise. """ return not self.isDir() @staticmethod def create_from_jschema(j_file_info) -> "FileInfo": """ Create a FileInfo object from a JavaScript schema object. Args: j_file_info: The JavaScript schema object. Returns: FileInfo: The created FileInfo object. """ return FileInfo( path=j_file_info.path(), name=j_file_info.name(), size=j_file_info.size(), modificationTime=j_file_info.modificationTime(), )
Ancestors
- builtins.tuple
Static methods
def create_from_jschema(j_file_info) ‑> FileInfo
-
Create a FileInfo object from a Java-side (py4j) schema object.
Args
j_file_info
- The Java-side (py4j) schema object.
Returns
FileInfo
- The created FileInfo object.
Expand source code
@staticmethod def create_from_jschema(j_file_info) -> "FileInfo": """ Create a FileInfo object from a JavaScript schema object. Args: j_file_info: The JavaScript schema object. Returns: FileInfo: The created FileInfo object. """ return FileInfo( path=j_file_info.path(), name=j_file_info.name(), size=j_file_info.size(), modificationTime=j_file_info.modificationTime(), )
Methods
def isDir(self) ‑> bool
-
Check if the FileInfo represents a directory.
Returns
bool
- True if the FileInfo represents a directory, False otherwise.
Expand source code
def isDir(self) -> bool: """ Check if the FileInfo represents a directory. Returns: bool: True if the FileInfo represents a directory, False otherwise. """ return self.name.endswith("/")
def isFile(self) ‑> bool
-
Check if the FileInfo represents a file.
Returns
bool
- True if the FileInfo represents a file, False otherwise.
Expand source code
def isFile(self) -> bool: """ Check if the FileInfo represents a file. Returns: bool: True if the FileInfo represents a file, False otherwise. """ return not self.isDir()