"""Filesystem utilities for internal use"""from__future__importannotationsimportloggingimportosfromcurldl.util.cryptimportCryptographyfromcurldl.util.timeimportTimelog=logging.getLogger(__name__)
[docs]classFileSystem:"""Filesystem utilities, include cryptographic digest verification wrappers"""
[docs]@staticmethoddefverify_rel_path_is_safe(basedir:str|os.PathLike[str],rel_path:str|os.PathLike[str])->None:"""Verify that a relative path does not escape base directory and either does not exist or is a file or a symlink to one. :param basedir: base directory path :param rel_path: path relative to base directory :raises ValueError: relative path escapes base directory before or after symlink resolution, resulting path is a dangling symlink, is not a file or a symlink to file """base=os.path.abspath(basedir)path=os.path.abspath(os.path.join(basedir,rel_path))base_real,path_real=os.path.realpath(base),os.path.realpath(path)# os.path.commonpath() also raises ValueError for different-drive Windows pathsifbase!=os.path.commonpath((base,path)):raiseValueError(f"Relative path {rel_path} escapes base path {base}")ifbase_real!=os.path.commonpath((base_real,path_real)):raiseValueError(f"Relative path {rel_path} escapes base path {base} ""after resolving symlinks")ifbase==pathorbase_real==path_real:raiseValueError(f"Relative path {rel_path} does not extend {base}")ifos.path.islink(path)andnotos.path.exists(path):raiseValueError(f"Path is a dangling symlink: {path}")ifos.path.exists(path)andnotos.path.isfile(path):raiseValueError(f"Exists and not a file or symlink to file: {path}")ifstr(rel_path).endswith(os.path.sep)or(os.path.altsepandstr(rel_path).endswith(os.path.altsep)):raiseValueError(f"Path can only point to a directory: {rel_path}")
[docs]@classmethoddefcreate_directory_for_path(cls,path:str|os.PathLike[str])->None:"""Create all path components for path, except for last. :param path: file path """path_dir=os.path.dirname(path)ifnotos.path.exists(path_dir):log.info("Creating directory: %s",path_dir)os.makedirs(path_dir)
[docs]@classmethoddefverify_size_and_digests(cls,path:str|os.PathLike[str],*,size:int|None=None,digests:dict[str,str]|None=None,)->None:"""Verify file size and digests and raise :class:`ValueError` in case of mismatch. ``digests`` is a dict of hash algorithms and digests to check (see :func:`curldl.util.crypt.Cryptography.verify_digest`). :param path: input file path :param size: expected file size in bytes, or ``None`` to ignore :param digests: mapping of digest algorithms to expected hexadecimal digest strings, or ``None`` to ignore :raises ValueError: not a file or file size mismatch or one of digests fails verification """ifsizeisnotNone:cls.verify_size(path,size=size)foralgo,digestindigests.items()ifdigestselse{}:Cryptography.verify_digest(path,algo=algo,digest=digest)
[docs]@classmethoddefverify_size(cls,path:str|os.PathLike[str],size:int)->None:"""Verify file size and raise :class:`ValueError` in case of mismatch or if not a file. :param path: input file path :param size: expected file size in bytes :raises ValueError: not a file or file size mismatch """path_size=os.path.getsize(path)ifnotos.path.isfile(path):raiseValueError(f"Not a file: {path}")ifpath_size!=size:raiseValueError(f"Size mismatch for {path}: {path_size:,} instead of {size:,} B")log.debug("Successfully verified file size of %s",path)
[docs]@staticmethoddefget_file_size(path:str|os.PathLike[str],default:int=0)->int:"""Returns file size, or ``default`` if it does not exist or is not a file. :param path: input file path :param default: value to return if ``path`` does not exist or is not a file (e.g., a directory) :return: input file size """returnos.path.getsize(path)ifos.path.isfile(path)elsedefault
[docs]@classmethoddefset_file_timestamp(cls,path:str|os.PathLike[str],timestamp:int|float)->None:"""Sets file timestamp to a POSIX timestamp. If timestamp is negative, does nothing. :param path: filesystem path, must exist; symlinks are followed :param timestamp: POSIX UTC-based timestamp to store as last-modified and last-accessed file time if non-negative """iftimestamp<0:returniflog.isEnabledFor(logging.DEBUG):log.debug("Timestamping %s with %s",path,Time.timestamp_to_dt(timestamp))os.utime(path,times=(timestamp,timestamp))