Source code for cellpy.internals.core

"""This module contains div classes etc that are not really connected to cellpy."""

from dataclasses import dataclass
import fnmatch
import logging
import os
import pathlib
import shutil
import stat
import tempfile
import time
import warnings
from typing import (
    Any,
    Tuple,
    Dict,
    List,
    Union,
    TypeVar,
    Generator,
    Optional,
    Iterable,
    Callable,
    Type,
    cast,
)

import fabric

from cellpy.exceptions import UnderDefined

S = TypeVar("S", bound="OtherPath")
URI_PREFIXES = ["ssh:", "sftp:", "scp:", "http:", "https:", "ftp:", "ftps:", "smb:"]
IMPLEMENTED_PROTOCOLS = ["ssh:", "sftp:", "scp:"]
# name of environment variable that holds the key file and password:
ENV_VAR_CELLPY_KEY_FILENAME = "CELLPY_KEY_FILENAME"
ENV_VAR_CELLPY_PASSWORD = "CELLPY_PASSWORD"


[docs]@dataclass class ExternalStatResult: """Mock of os.stat_result.""" # st_mode: int = 0 # st_ino: int = 0 # st_dev: int = 0 # st_nlink: int = 0 # st_uid: int = 0 # st_gid: int = 0 st_size: int = 0 st_mtime: int = 0 st_atime: int = 0 st_ctime: Optional[int] = None
def _clean_up_original_path_string(path_string): if not isinstance(path_string, str): if isinstance(path_string, OtherPath): logging.debug(f"path is an OtherPath object") if hasattr(path_string, "original"): logging.debug(f"path has an original attribute") path_string = path_string.original else: logging.debug(f"path does not have an original attribute") path_string = str(path_string) elif isinstance(path_string, pathlib.PosixPath): path_string = "/".join(path_string.parts) elif isinstance(path_string, pathlib.WindowsPath): parts = list(path_string.parts) if not parts: parts = [""] parts[0] = parts[0].replace("\\", "") path_string = "/".join(parts) else: logging.debug(f"unknown path type: {type(path_string)}") path_string = str(path_string) return path_string def _check_external(path_string: str) -> Tuple[str, bool, str, str]: # path_sep = "\\" if os.name == "nt" else "/" _is_external = False _location = "" _uri_prefix = "" for prefix in URI_PREFIXES: if path_string.startswith(prefix): path_string = path_string.replace(prefix, "") path_string = path_string.lstrip("/") _is_external = True _uri_prefix = prefix + "//" _location, *rest = path_string.split("/") path_string = "/" + "/".join(rest) break path_string = path_string or "." # fix for windows paths: path_string = path_string.replace("\\", "/") # fix for posix paths: path_string = path_string.replace("//", "/") return path_string, _is_external, _uri_prefix, _location
[docs]class OtherPath(pathlib.Path): """A pathlib.Path subclass that can handle external paths. Additional attributes: is_external (bool): is True if the path is external. location (str): the location of the external path (e.g. a server name). uri_prefix (str): the prefix of the external path (e.g. scp:// or sftp://). raw_path (str): the path without any uri_prefix or location. original (str): the original path string. full_path (str): the full path (including uri_prefix and location). Additional methods: copy (method): a method for copying the file to a local path. Overrides (only if is_external is True): glob (method): a method for globbing external paths. rglob (method): a method for 'recursive' globbing external paths (max one extra level deep). """ _flavour = ( pathlib._windows_flavour if os.name == "nt" else pathlib._posix_flavour ) # noqa def __new__(cls, *args, **kwargs): if args: path, *args = args else: path = "." logging.debug("initiating OtherPath without any arguments") if not path: logging.debug("initiating OtherPath with empty path") path = "." if isinstance(path, OtherPath) and hasattr(path, "_original"): logging.debug(f"path is OtherPath") path = path._original logging.debug(f"checked if path is OtherPath") path = _clean_up_original_path_string(path) assert isinstance(path, str), "path must be a string" cls.__original = path cls._pathlib_doc = super().__doc__ path = _check_external(path)[0] return super().__new__(cls, path, *args, **kwargs) def __init__(self, *args, **kwargs): logging.debug("Running __init__ for OtherPath") _path_string, *args = args if not _path_string: path_string = "." else: path_string = self.__original self._original = self.__original self._check_external(path_string) # pathlib.PurePath and Path for Python 3.12 seems to have an __init__ method # where it sets self._raw_path from the input argument, but this is not the case # for Python 3.11, 10, and 9. Those do not have their own __init__ method (and # does not have a self._raw_path attribute). # Instead of running e.g. super().__init__(self._raw_other_path) we do this # instead (which is what the __init__ method does in Python 3.12): self._raw_path = self._raw_other_path self.__doc__ += f"\nOriginal documentation:\n\n{self._pathlib_doc}" self._wrap_methods() # dynamically wrapping methods - should gradually be replaced by hard-coded methods. def _wrap_methods(self): logging.debug("Running _wrap_methods for OtherPath") existing_methods = self.__class__.__dict__.keys() parent_methods_that_works_also_on_external_paths = [] # "parents", "parts" parent_methods_that_returns_other_paths = [] for m in sorted(dir(pathlib.Path)): if m.startswith("_"): continue if ( m in existing_methods or m in parent_methods_that_works_also_on_external_paths ): continue method = getattr(pathlib.Path, m) if m in parent_methods_that_returns_other_paths: setattr(self.__class__, m, self._wrap_and_morph_method(method)) if callable(method): setattr(self.__class__, m, self._wrap_callable_method(method)) else: setattr(self.__class__, m, self._wrap_non_callable(method)) def _wrap_and_morph_method(self, method): if self.is_external: return lambda *args, **kwargs: self else: return lambda *args, **kwargs: OtherPath(*args, **kwargs) def _wrap_callable_method(self, method, default_return_value=True): if self.is_external: return lambda *args, **kwargs: default_return_value else: return method def _wrap_non_callable(self, attr, default_return_value=None): if self.is_external: return default_return_value else: return attr def _check_external(self, path_string): logging.debug("Running _check_external for OtherPath") ( path_string, self._is_external, self._uri_prefix, self._location, ) = _check_external(path_string) logging.debug(f"self._is_external: {self._is_external}") logging.debug(f"self._uri_prefix: {self._uri_prefix}") logging.debug(f"self._location: {self._location}") logging.debug(f"path_string: {path_string}") self._raw_other_path = path_string def __div__(self, other: Union[str, S]) -> S: if self.is_external: path = f"{self._original}/{other}" return OtherPath(path) path = pathlib.Path(self._original).__truediv__(other) return OtherPath(path) def __truediv__(self, other: Union[str, S]) -> S: if self.is_external: path = f"{self._original}/{other}" return OtherPath(path) path = pathlib.Path(self._original).__truediv__(other) return OtherPath(path) def __rtruediv__(self: S, key: Union[str, S]) -> S: if self.is_external: raise TypeError(f"Cannot use rtruediv on external paths.") path = pathlib.Path(self._original).__rtruediv__(key) return OtherPath(path) def __str__(self: S) -> str: if hasattr(self, "_original") and self.is_external: logging.debug("external path, returning _original") return self._original return super().__str__() def __repr__(self: S) -> str: if hasattr(self, "_original"): if self.is_external: logging.debug("external path, returning _original") return f"OtherPath('{self._original}')" else: return super().__repr__() def _glob(self, glob_str: str, **kwargs) -> Generator: testing = kwargs.pop("testing", False) search_in_sub_dirs = kwargs.pop("search_in_sub_dirs", False) if self.is_external: connect_kwargs, host = self._get_connection_info(testing) paths = self._glob_with_fabric( host, connect_kwargs, glob_str, search_in_sub_dirs=search_in_sub_dirs ) return (OtherPath(f"{self._original.rstrip('/')}/{p}") for p in paths) paths = pathlib.Path(self._original).glob(glob_str) return (OtherPath(p) for p in paths)
[docs] def glob(self, glob_str: str, *args, **kwargs) -> Generator: return self._glob(glob_str, search_in_sub_dirs=False, **kwargs)
[docs] def rglob(self, glob_str: str, *args, **kwargs) -> Generator: return self._glob(glob_str, search_in_sub_dirs=True, **kwargs)
def _listdir(self, levels: int, **kwargs) -> Generator: if self.is_external: testing = kwargs.pop("testing", False) connect_kwargs, host = self._get_connection_info(testing) paths = self._listdir_with_fabric(host, connect_kwargs, levels) return (OtherPath(p) for p in paths) if self.is_dir(): return ( OtherPath(f"{self.full_path}/{p}") for p in os.listdir(self._original) )
[docs] def listdir(self: S, levels: int = 1, **kwargs) -> Generator: """List the contents of the directory. Args: levels (int, optional): How many sublevels to list. Defaults to 1. If you want to list all sublevels, use `listdir(levels=-1)`. If you want to list only the current level (no subdirectories), use `listdir(levels=0)`. Returns: Generator: Generator of OtherPath objects. """ return self._listdir(levels, **kwargs)
[docs] def resolve(self: S, *args, **kwargs) -> S: """Resolve the path.""" if self.is_external: logging.debug(f"Cannot resolve external paths. Returning self. ({self})") return OtherPath(self._original) resolved_path = pathlib.Path(self._original).resolve(*args, **kwargs) return OtherPath(resolved_path)
[docs] def is_dir(self: S, *args, **kwargs) -> bool: """Check if path is a directory.""" if self.is_external: logging.warning( f"Cannot check if dir exists for external paths! Assuming it exists." ) return True return super().is_dir()
[docs] def is_file(self: S, *args, **kwargs) -> bool: """Check if path is a file.""" if self.is_external: logging.warning( f"Cannot check if file exists for external paths! Assuming it exists." ) return True return super().is_file()
[docs] def exists(self: S, *args, **kwargs) -> bool: """Check if path exists.""" if self.is_external: logging.warning( f"Cannot check if path exists for external paths! Assuming it exists." ) return True return super().exists()
@property def parent(self: S) -> S: """Return the parent directory of the path.""" if self.is_external: return OtherPath(self._original.rsplit("/", 1)[0]) return OtherPath(super().parent) @property def name(self: S): """Return the parent directory of the path.""" return super().name @property def suffix(self) -> str: """Return the suffix of the path.""" return super().suffix @property def suffixes(self) -> List[str]: """Return the suffixes of the path.""" return super().suffixes @property def stem(self) -> str: """Return the stem of the path.""" return super().stem
[docs] def with_suffix(self: S, suffix: str) -> S: """Return a new path with the suffix changed.""" if self.is_external: logging.warning( "This is method (`with_suffix`) not tested for external paths!" ) return OtherPath(self._original.rsplit(".", 1)[0] + suffix) return OtherPath(super().with_suffix(suffix))
[docs] def with_name(self: S, name: str) -> S: """Return a new path with the name changed.""" if self.is_external: logging.warning( "This method (`with_name`) is not tested for external paths!" ) return OtherPath(self._original.rsplit("/", 1)[0] + "/" + name) return OtherPath(super().with_name(name))
[docs] def with_stem(self: S, stem: str) -> S: """Return a new path with the stem changed.""" if self.is_external: logging.warning( "This method (`with_stem`) is not tested for external paths!" ) return OtherPath(self._original.rsplit("/", 1)[0] + "/" + stem) return OtherPath(super().with_stem(stem))
[docs] def absolute(self: S) -> S: if self.is_external: logging.warning( "This method (`absolute`) is not implemented yet for external paths! Returning self." ) return OtherPath(self._original) return OtherPath(super().absolute())
[docs] def samefile(self: S, other_path: Union[str, pathlib.Path, S]) -> bool: if self.is_external: logging.warning( "This method (`absolute`) is not implemented yet for external paths! Returning True." ) return True return super().samefile(other_path)
[docs] def iterdir(self, *args, **kwargs): if self.is_external: logging.warning( f"Cannot run `iterdir` yet for external paths! Returning None." ) return else: return (OtherPath(p) for p in super().iterdir())
@property def parents(self, *args, **kwargs): if self.is_external: logging.warning( f"Cannot run `parents` yet for external paths! Returning None." ) return return super().parents
[docs] def stat(self, *args, **kwargs): testing = kwargs.pop("testing", False) if self.is_external: # logging.warning(f"Cannot run `stat` for external paths! Returning stat_result object with only zeros.") try: connect_kwargs, host = self._get_connection_info(testing) except UnderDefined as e: logging.debug(f"UnderDefined error: {e}") logging.debug("Returning stat_result object with only zeros.") return ExternalStatResult() try: return self._stat_with_fabric(host, connect_kwargs) except FileNotFoundError: logging.debug( "File not found! Returning stat_result object with only zeros." ) return ExternalStatResult() return super().stat()
[docs] def joinpath(self, *args, **kwargs): logging.warning(f"Cannot run 'joinpath' for OtherPath!") return OtherPath(self._original)
[docs] def match(self, *args, **kwargs): logging.warning(f"Cannot run 'match' for OtherPath!") return
[docs] def cwd(self): logging.warning(f"Cannot run 'match' for OtherPath!") return
[docs] def group(self): logging.warning(f"Cannot run 'group' for OtherPath!") return
@property def owner(self, *args, **kwargs): logging.warning(f"Cannot get 'owner' for OtherPath!") return
[docs] def lchmod(self, *args, **kwargs): logging.warning(f"Cannot run 'lchmod' for OtherPath!") return OtherPath(self._original)
@property def original(self: S) -> str: return self._original @property def raw_path(self: S) -> str: # this will return a leading slash for some edge cases return self._raw_other_path @property def full_path(self: S) -> str: if self.is_external: return f"{self._uri_prefix}{self._location}{self._raw_other_path}" return self._original @property def pathlike_location(self: S) -> S: """Return the location of the external path as a pathlike object.""" if self.is_external: return OtherPath(f"{self._uri_prefix}{self._location}") return OtherPath(super().drive) @property def is_external(self: S) -> bool: if not hasattr(self, "_is_external"): logging.warning("OBS! OtherPath object missing _is_external attribute!") logging.warning("This should not happen. Please report this bug!") logging.warning( "(most likely means that pathlib.Path has changed and that it now has " "another attribute or method that returns a new pathlib.Path object or " "that you have used a method that is not supported yet)" ) # return False return self._is_external @property def uri_prefix(self) -> str: """Return the uri prefix for the external path (e.g ``ssh://``).""" return self._uri_prefix @property def location(self) -> str: """Return the location of the external path (e.g ``user@server.com``).""" return self._location
[docs] def as_uri(self) -> str: """Return the path as a uri (e.g. ``scp://user@server.com/home/data/my_file.txt``).""" if self._is_external: return f"{self._uri_prefix}{self._location}/{'/'.join(list(super().parts)[1:])}" return super().as_uri()
[docs] def copy( self, destination: Optional[pathlib.Path] = None, testing=False ) -> pathlib.Path: """Copy the file to a destination.""" if destination is None: destination = pathlib.Path(tempfile.gettempdir()) else: destination = pathlib.Path(destination) path_of_copied_file = destination / self.name if not self.is_external: shutil.copy2(self, destination) else: connect_kwargs, host = self._get_connection_info(testing) self._copy_with_fabric(host, connect_kwargs, destination) return path_of_copied_file
def _get_connection_info(self, testing: bool = False) -> Tuple[Dict, str]: host = self.location uri_prefix = self.uri_prefix.replace("//", "") if uri_prefix not in URI_PREFIXES: raise ValueError(f"uri_prefix {uri_prefix} not recognized") if uri_prefix not in IMPLEMENTED_PROTOCOLS: raise ValueError( f"uri_prefix {uri_prefix.replace(':', '')} not implemented yet" ) password = os.getenv(ENV_VAR_CELLPY_PASSWORD, None) key_filename = os.getenv(ENV_VAR_CELLPY_KEY_FILENAME, None) if password is None and key_filename is None: raise UnderDefined( f"You must define either {ENV_VAR_CELLPY_PASSWORD} " f"or {ENV_VAR_CELLPY_KEY_FILENAME} environment variables." ) if key_filename is not None: key_filename = pathlib.Path(key_filename).expanduser().resolve() connect_kwargs = {"key_filename": str(key_filename)} logging.debug(f"got key_filename") if not testing: if not pathlib.Path(key_filename).is_file(): raise FileNotFoundError(f"Could not find key file {key_filename}") else: connect_kwargs = {"password": password} return connect_kwargs, host def _copy_with_fabric( self, host: str, connect_kwargs: dict, destination: Union[str, S, pathlib.Path] ): with fabric.Connection(host, connect_kwargs=connect_kwargs) as conn: try: t1 = time.time() conn.get(self.raw_path, str(destination / self.name)) logging.debug(f"copying took {time.time() - t1:.2f} seconds") except FileNotFoundError as e: raise FileNotFoundError( f"Could not find file {self.raw_path} on {host}" ) from e def _stat_with_fabric(self, host: str, connect_kwargs: dict) -> ExternalStatResult: with fabric.Connection(host, connect_kwargs=connect_kwargs) as conn: try: t1 = time.time() sftp_conn = conn.sftp() stat_result = sftp_conn.stat(self.raw_path) logging.debug(f"stat took {time.time() - t1:.2f} seconds") return ExternalStatResult( st_size=stat_result.st_size, st_atime=stat_result.st_atime, st_mtime=stat_result.st_mtime, ) except FileNotFoundError as e: raise FileNotFoundError( f"Could not find file {self.raw_path} on {host}" ) from e def _listdir_with_fabric( self: S, host: str, connect_kwargs: dict, levels: int = 1, ) -> List[str]: """List the contents of a directory through sftp.""" path_separator = "/" # only supports unix-like systems t1 = time.time() with fabric.Connection(host, connect_kwargs=connect_kwargs) as conn: try: t1 = time.time() sftp_conn = conn.sftp() sftp_conn.chdir(self.raw_path) sub_dirs = [ f"{self.raw_path}{path_separator}{f}" for f in sftp_conn.listdir() if stat.S_ISDIR(sftp_conn.stat(f).st_mode) ] files = [ f"{self.raw_path}{path_separator}{f}" for f in sftp_conn.listdir() if not stat.S_ISDIR(sftp_conn.stat(f).st_mode) ] while levels != 0: new_sub_dirs = [] for sub_dir in sub_dirs: try: sftp_conn.chdir(sub_dir) _new_sub_dirs = [ f"{sub_dir}{path_separator}{f}" for f in sftp_conn.listdir() if stat.S_ISDIR(sftp_conn.stat(f).st_mode) ] new_files = [ f"{sub_dir}{path_separator}{f}" for f in sftp_conn.listdir() if not stat.S_ISDIR(sftp_conn.stat(f).st_mode) ] files += new_files new_sub_dirs += _new_sub_dirs sftp_conn.chdir(self.raw_path) except FileNotFoundError: logging.debug( f"Could not look in {sub_dir}: FileNotFoundError" ) pass sub_dirs = new_sub_dirs if len(sub_dirs) == 0: break levels -= 1 logging.debug(f"globbing took {time.time() - t1:.2f} seconds") return files except FileNotFoundError as e: raise FileNotFoundError( f"Could not find file {self.raw_path} on {host}" ) from e def _glob_with_fabric( self: S, host: str, connect_kwargs: dict, glob_str: str, search_in_sub_dirs: bool = False, ) -> List[str]: # TODO: update this so that it works faster (need some linux magic) path_separator = "/" with fabric.Connection(host, connect_kwargs=connect_kwargs) as conn: try: t1 = time.time() sftp_conn = conn.sftp() sftp_conn.chdir(self.raw_path) if search_in_sub_dirs: # recursive globbing one level down sub_dirs = [ f for f in sftp_conn.listdir() if stat.S_ISDIR(sftp_conn.stat(f).st_mode) ] files = [ f for f in sftp_conn.listdir() if not stat.S_ISDIR(sftp_conn.stat(f).st_mode) ] filtered_files = fnmatch.filter(files, glob_str) for sub_dir in sub_dirs: try: sftp_conn.chdir(sub_dir) new_files = [ f for f in sftp_conn.listdir() if not stat.S_ISDIR(sftp_conn.stat(f).st_mode) ] new_filtered_files = fnmatch.filter(new_files, glob_str) new_filtered_files = [ f"{sub_dir}{path_separator}{f}" for f in new_filtered_files ] filtered_files += new_filtered_files sftp_conn.chdir("..") except FileNotFoundError: logging.debug( f"Could not look in {sub_dir}: FileNotFoundError" ) pass else: files = sftp_conn.listdir() filtered_files = fnmatch.filter(files, glob_str) logging.debug(f"globbing took {time.time() - t1:.2f} seconds") return filtered_files except FileNotFoundError as e: raise FileNotFoundError( f"Could not find file {self.raw_path} on {host}" ) from e