Module panama.dbutils_mock

from .dbutils import DBUtils

__all__ = [
    "DBUtils",
]
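
The package re-exports the DBUtils mock from its dbutils submodule so that local tests can import it in place of the Databricks-provided object. A minimal import sketch, assuming pyspark is installed and a local session can be created (variable names are illustrative):

from pyspark.sql import SparkSession
from panama.dbutils_mock import DBUtils

spark = SparkSession.builder.master("local[1]").getOrCreate()  # local test session
dbutils = DBUtils(spark)  # stands in for the dbutils object provided on Databricks clusters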

Sub-modules

panama.dbutils_mock.dbutils

Classes

class DBUtils (sc: pyspark.sql.session.SparkSession)

Initialize DBUtils.

Args

sc : SparkSession
The Spark session.

Returns

None
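
The constructor only stores the session; the fs and notebook helpers are created lazily on first attribute access via __getattr__ (see the source below). A short sketch of that behaviour, assuming a SparkSession named spark is already available:

dbutils = DBUtils(spark)
fs = dbutils.fs          # first access builds an FSHandler and caches it on the instance
nb = dbutils.notebook    # likewise builds and caches a Notebook helper
assert dbutils.fs is fs  # later accesses return the cached attribute without calling __getattr__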

import os
import shutil
from typing import List, Union

from pyspark.sql import SparkSession

# string_types is assumed to be an isinstance-compatible alias (defined here as a
# stand-in); FileInfo is assumed to be a lightweight record of path, name, size and
# modification time defined in panama.dbutils_mock.dbutils.
string_types = (str,)


class DBUtils:
    class Notebook:
        class EntryPoint:
            def __init__(self, sc) -> None:
                self.sc = sc

            def getCurrentBindings(self):
                return dict()

        def __init__(self, sc) -> None:
            self.sc = sc

        def __getattr__(self, item):
            if item == "entry_point":
                self.entry_point = self.EntryPoint(self.sc)
                return self.entry_point
            raise AttributeError

    class FSHandler:
        def __init__(self, sc):
            self.sc = sc

        def __call__(self):
            return self

        def print_return(self, result):
            print(result)
            return result

        def check_types(self, vars_and_types):
            for var, type_ in vars_and_types:
                if not isinstance(var, type_):
                    raise TypeError("{0} has the wrong type - {1} is expected.".format(repr(var), type_))

        def help(self, method_name=None):
            pass

        def create_list_from_jschema(self, jschema, create_obj_from_jschema) -> List:
            return [create_obj_from_jschema(jschema.apply(i)) for i in range(jschema.length())]

        # fsutils functions

        def cp(self, source: str, dest: str, recurse: bool = False) -> str:
            self.check_types([(source, string_types), (dest, string_types), (recurse, bool)])
            if recurse:
                return self.print_return(shutil.copytree(source, dest))
            else:
                return self.print_return(shutil.copyfile(source, dest))

        def head(self, file: str, max_bytes: int = 65536) -> int:
            self.check_types([(file, string_types), (max_bytes, int)])
            # head -c limits output to max_bytes bytes; os.system returns the command's exit status.
            return self.print_return(os.system(f"head -c {max_bytes} {file}"))

        def ls(self, path: str) -> List[FileInfo]:
            self.check_types([(path, string_types)])
            return self.print_return(
                [FileInfo(e.path, e.name, e.stat().st_size, e.stat().st_mtime) for e in os.scandir(path)]
            )

        def mkdirs(self, dir: str) -> str:
            self.check_types([(dir, string_types)])
            return self.print_return(os.makedirs(dir, exist_ok=True))  # type: ignore

        def mv(self, source: str, dest: str, recurse: bool = False) -> str:  # type: ignore
            self.check_types([(source, string_types), (dest, string_types), (recurse, bool)])
            if recurse:
                return self.print_return(shutil.move(source, dest))
            else:
                if not os.path.isfile(source):
                    if len(os.listdir(source)) > 0:
                        raise FileExistsError("To move files in a directory, set recurse to true")
                else:
                    return self.print_return(shutil.move(source, dest))

        def put(self, file: str, contents: str, overwrite: bool = False) -> None:
            self.check_types([(file, string_types), (contents, string_types), (overwrite, bool)])
            raise NotImplementedError

        def rm(self, dir: str, recurse: bool = False) -> str:
            self.check_types([(dir, string_types), (recurse, bool)])
            if recurse:
                return self.print_return(shutil.rmtree(dir))  # type: ignore
            else:
                return self.print_return(os.rmdir(dir))  # type: ignore

        # cache functions

        def cacheFiles(self, *files: str) -> None:
            self.check_types(list(zip(files, [string_types] * len(files))))
            raise NotImplementedError

        def cacheTable(self, name: str) -> None:
            self.check_types([(name, string_types)])
            raise NotImplementedError

        def uncacheFiles(self, *files: str) -> None:
            self.check_types(list(zip(files, [string_types] * len(files))))
            raise NotImplementedError

        def uncacheTable(self, name: str) -> None:
            self.check_types([(name, string_types)])
            raise NotImplementedError

        # mount functions

        def mount(
            self,
            source: str,
            mount_point: str,
            encryption_type: str = "",
            owner: Union[str, None] = None,
            extra_configs: dict = {},
        ) -> None:
            self.check_types(
                [
                    (source, string_types),
                    (mount_point, string_types),
                    (encryption_type, string_types),
                    (owner, (str, type(None))),
                    (extra_configs, dict),
                ]
            )
            raise NotImplementedError

        def updateMount(
            self,
            source: str,
            mount_point: str,
            encryption_type: str = "",
            owner: Union[str, None] = None,
            extra_configs: dict = {},
        ) -> None:
            self.check_types(
                [
                    (source, string_types),
                    (mount_point, string_types),
                    (encryption_type, string_types),
                    (owner, (str, type(None))),
                    (extra_configs, dict),
                ]
            )
            raise NotImplementedError

        def mounts(self) -> None:
            raise NotImplementedError

        def refreshMounts(self) -> None:
            raise NotImplementedError

        def unmount(self, mount_point: str) -> None:
            raise NotImplementedError

        def __repr__(self):
            # TODO(tjh) call the scala doc
            return "Package 'dbutils.fs'. For more information, type 'display(dbutils.fs)'" " in a cell."

    def __getstate__(self):
        print(
            """ You cannot use dbutils within a spark job or otherwise pickle it.
            If you need to use getArguments within a spark job, you have to get the argument before
            using it in the job. For example, if you have the following code:

              myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i))

            Then you should use it this way:

              argX = dbutils.args.getArgument("X")
              myRdd.map(lambda i: argX + str(i))
            """
        )
        raise Exception("You cannot use dbutils within a spark job")

    def __init__(self, sc: SparkSession):
        """Initialize DBUtils.

        Args:
            sc (SparkSession): The Spark session.

        Returns:
            None
        """
        self.sc = sc

    def __getattr__(self, item):
        if item == "fs":
            self.fs = self.FSHandler(self.sc)
            return self.fs
        if item == "notebook":
            self.notebook = self.Notebook(self.sc)
            return self.notebook
        raise AttributeError(f"no attribute named '{item}'.")

    def __repr__(self):
        return "Package 'dbutils'. For more information, type 'dbutils.help()' in a cell."

    def __call__(self):
        return self

    def help(self, method_name: str = "") -> None:
        pass
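
The FSHandler methods are thin wrappers over os and shutil, so they act on the local filesystem rather than DBFS, while the cache and mount helpers raise NotImplementedError. A small usage sketch against a temporary directory, continuing the example above (paths are illustrative):

import os
import tempfile

root = tempfile.mkdtemp()
dbutils.fs.mkdirs(os.path.join(root, "data"))      # wraps os.makedirs; prints and returns its result
entries = dbutils.fs.ls(root)                      # list of FileInfo records for the local path
dbutils.fs.rm(os.path.join(root, "data"))          # non-recursive: os.rmdir on the now-empty directory

try:
    dbutils.fs.mount("s3://bucket", "/mnt/bucket")  # mount operations are unsupported in the mock
except NotImplementedError:
    pass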

Class variables

var FSHandler
var Notebook

Methods

def help(self, method_name: str = '') -> None
def help(self, method_name: str = "") -> None:
    pass
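
Both help methods are no-ops in the mock: calling them is harmless but prints nothing, whereas on a real Databricks cluster help() prints usage information. For example:

dbutils.help()         # no output in the mock
dbutils.fs.help("ls")  # FSHandler.help also ignores its argument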