Module ki.functional

Type-safe, non Anki-specific functions.

Expand source code
#!/usr/bin/env python3
"""Type-safe, non Anki-specific functions."""

# pylint: disable=import-self, too-many-return-statements
# pylint: disable=no-value-for-parameter

import os
import re
import sys
import shutil
import hashlib
import tempfile
import functools
import subprocess
import unicodedata
from pathlib import Path
from itertools import chain
from functools import reduce, partial, update_wrapper, wraps

import git
from tqdm import tqdm
from colorama import Fore, Style

from beartype import beartype
from beartype.typing import (
    List,
    Union,
    Generator,
    Tuple,
    Callable,
    Any,
    FrozenSet,
    Iterable,
    TypeVar,
)

import ki.functional as F
from ki.types import (
    File,
    Dir,
    EmptyDir,
    NoPath,
    NoFile,
    Link,
    Singleton,
    PseudoFile,
    KiRev,
    Rev,
)

_has_type_hint_support = sys.version_info[:2] >= (3, 5)

T = TypeVar("T")

UTF8 = "UTF-8"
GIT = ".git"
GITMODULES_FILE = ".gitmodules"
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
BRANCH_NAME = "main"

# Emoji regex character classes.
EMOJIS = "\U0001F600-\U0001F64F"
PICTOGRAPHS = "\U0001F300-\U0001F5FF"
TRANSPORTS = "\U0001F680-\U0001F6FF"
FLAGS = "\U0001F1E0-\U0001F1FF"

# Regex to filter out bad stuff from filenames.
SLUG_REGEX = re.compile(r"[^\w\s\-" + EMOJIS + PICTOGRAPHS + TRANSPORTS + FLAGS + "]")


@beartype
def curried(func: Callable[[Any, ...], T]) -> Callable[[Any, ...], T]:
    """A decorator that makes the function curried

    Usage example:

    >>> @curried
    ... def sum5(a, b, c, d, e):
    ...     return a + b + c + d + e
    ...
    >>> sum5(1)(2)(3)(4)(5)
    15
    >>> sum5(1, 2, 3)(4, 5)
    15
    """

    def _args_len(func):
        # pylint: disable=import-outside-toplevel
        good = True
        try:
            from inspect import signature

            signature(func)
        except TypeError:
            good = False

        if good and _has_type_hint_support:
            from inspect import signature

            args = signature(func).parameters
        else:
            from inspect import getfullargspec

            args = getfullargspec(func).args

        return len(args)

    @wraps(func)
    def _curried(*args, **kwargs):
        f = func
        count = 0
        while isinstance(f, partial):
            if f.args:
                count += len(f.args)
            f = f.func

        if count == _args_len(f) - len(args):
            return func(*args, **kwargs)

        para_func = partial(func, *args, **kwargs)
        if hasattr(f, "__name__"):
            update_wrapper(para_func, f)
        return curried(para_func)

    def _curried_lambda(*args, **kwargs):
        return partial(func, *args, **kwargs)

    if func.__name__ == "<lambda>":
        return _curried_lambda

    return _curried


@beartype
def rmtree(target: Dir) -> NoFile:
    """Equivalent to `shutil.rmtree()`."""
    shutil.rmtree(target)
    return NoFile(target)


@beartype
def copytree(source: Dir, target: NoFile) -> Dir:
    """Call shutil.copytree()."""
    shutil.copytree(source, target, symlinks=True)
    return Dir(target.resolve())


@beartype
def movetree(source: Dir, target: NoFile) -> Dir:
    """Call shutil.move()."""
    shutil.move(source, target)
    return Dir(target.resolve())


@beartype
def cwd() -> Dir:
    """Call Path.cwd()."""
    return Dir(Path.cwd().resolve())


@beartype
def is_root(path: Union[File, Dir]) -> bool:
    """Check if 'path' is a root directory (e.g., '/' on Unix or 'C:\' on Windows)."""
    # Links and `~`s are resolved before checking.
    path = path.resolve()
    return len(path.parents) == 0


@functools.cache
@beartype
def shallow_walk(
    directory: Dir,
) -> Tuple[Dir, List[Dir], List[File]]:
    """Walk only the top-level directory with `os.walk()`."""
    # pylint: disable=redefined-outer-name
    root, dirs, files = next(os.walk(directory))
    root = Dir(root)
    dirs = [Dir(root / d) for d in dirs]
    # TODO: Treat symlinks.
    files = [File(root / f) for f in files]
    return root, dirs, files


@beartype
def walk(
    directory: Dir,
) -> FrozenSet[Union[File, PseudoFile, Link, NoFile]]:
    """Get all file-like leaves in a directory, recursively."""
    # pylint: disable=redefined-outer-name
    leaves = frozenset()
    for root, _, files in os.walk(directory):
        root = Dir(root)
        leaves |= frozenset({F.chk(root / f) for f in files})
    return leaves


# TODO: Remove `resolve: bool` parameter, and test symlinks before resolving.
@beartype
def chk(
    path: Path,
    resolve: bool = True,
) -> Union[File, Dir, EmptyDir, PseudoFile, NoPath, NoFile, Link]:
    """Test whether `path` is a file, a directory, or something else."""
    if resolve:
        path = path.resolve()
    if path.is_file():
        return File(path)
    if path.is_dir():
        if is_empty(Dir(path)):
            return EmptyDir(path)
        return Dir(path)
    if path.exists():
        return PseudoFile(path)
    if os.path.islink(path):
        return Link(path)
    if path.parent.is_dir():
        return NoFile(path)
    return NoPath(path)


@beartype
def touch(directory: Dir, name: str) -> File:
    """Touch a file."""
    path = directory / singleton(name)
    path.touch()
    return File(path.resolve())


@beartype
def write(path: Union[File, NoFile], text: str) -> File:
    """Write text to a file."""
    with open(path, "w+", encoding="UTF-8") as f:
        f.write(text)
    return File(path)


@beartype
def writeb(path: Union[File, NoFile], bs: bytes) -> File:
    """Write text to a file."""
    with open(path, "wb") as f:
        f.write(bs)
    return File(path)


@beartype
def symlink(path: NoFile, target: Path) -> Link:
    """Link `path` to `target`."""
    os.symlink(target, path)
    return Link(path)


@beartype
def mksubdir(directory: EmptyDir, suffix: Path) -> EmptyDir:
    """
    Make a subdirectory of an empty directory (with parents).

    Returns
    -------
    EmptyDir
        The created subdirectory.
    """
    subdir = directory / suffix
    subdir.mkdir(parents=True)
    directory.__class__ = Dir
    return EmptyDir(subdir.resolve())


@beartype
def force_mkdir(path: Path) -> Dir:
    """Make a directory (with parents, ok if it already exists)."""
    path.mkdir(parents=True, exist_ok=True)
    return Dir(path.resolve())


@beartype
def chdir(directory: Dir) -> Dir:
    """Changes working directory and returns old cwd."""
    old: Dir = F.cwd()
    os.chdir(directory)
    return old


@beartype
def parent(path: Union[File, Dir]) -> Dir:
    """
    Get the parent of a path that exists.  If the path points to the filesystem
    root, we return itself.
    """
    if is_root(path):
        return Dir(path.resolve())
    return Dir(path.parent)


@beartype
def mkdtemp() -> EmptyDir:
    """Make a temporary directory (in /tmp)."""
    return EmptyDir(tempfile.mkdtemp()).resolve()


@beartype
def copyfile(source: File, target: Union[File, NoFile]) -> File:
    """Safely copy a file to a valid location."""
    shutil.copyfile(source, target)
    return File(target.resolve())


@beartype
def rglob(d: Dir, pattern: str) -> List[File]:
    """Call d.rglob() and returns only files."""
    files = filter(lambda p: isinstance(p, File), map(F.chk, d.rglob(pattern)))
    return list(files)


@beartype
def is_empty(directory: Dir) -> bool:
    """Check if directory is empty, quickly."""
    return not next(os.scandir(directory), None)


@beartype
def root(repo: git.Repo) -> Dir:
    """Get working directory of a repo."""
    return Dir(repo.working_dir).resolve()


@beartype
def gitd(repo: git.Repo) -> Dir:
    """Get git directory of a repo."""
    return Dir(repo.git_dir).resolve()


@beartype
def singleton(name: str) -> Singleton:
    """Removes all forward slashes and returns a Singleton pathlib.Path."""
    return Singleton(name.replace("/", ""))


@beartype
def md5(path: File) -> str:
    """Compute md5sum of file at `path`."""
    hash_md5 = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


@beartype
def rev_exists(repo: git.Repo, rev: str) -> bool:
    """Check if git commit reference exists in repository."""
    try:
        repo.git.rev_parse("--verify", rev)
    except git.GitCommandError:
        return False
    return True


@beartype
def get_batches(lst: List[File], n: int) -> Generator[File, None, None]:
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i : i + n]


@beartype
def slugify(value: str) -> str:
    """
    Taken from [1]. Convert spaces or repeated dashes to single dashes. Remove
    characters that aren't alphanumerics, underscores, or hyphens. Convert to
    lowercase. Also strip leading and trailing whitespace, dashes, and
    underscores.

    [1] https://github.com/django/django/blob/master/django/utils/text.py
    """
    value = unicodedata.normalize("NFKC", value)
    value = re.sub(SLUG_REGEX, "", value.lower())
    return re.sub(r"[-\s]+", "-", value).strip("-_")


@beartype
def ki_rev_to_rev(ki_rev: KiRev) -> Rev:
    """Convert a ki repository commit rev to a git repository commit rev."""
    return Rev(ki_rev.kirepo.repo, ki_rev.sha)


@beartype
def mkdir(path: NoPath) -> EmptyDir:
    """Make a directory (with parents)."""
    path.mkdir(parents=True)
    return EmptyDir(path)


@beartype
def unlink(file: Union[File, Link]) -> NoFile:
    """Safely unlink a file."""
    os.unlink(file)
    return NoFile(file)


@curried
@beartype
def rmsm(repo: git.Repo, sm: git.Submodule) -> git.Commit:
    """Remove a git submodule."""
    # Remove the submodule root and delete its .git directory.
    sm_root = Path(sm.module().working_tree_dir)
    repo.git.rm(sm_root, cached=True)
    dotgit = F.chk(sm_root / GIT)
    if isinstance(dotgit, Dir):
        F.rmtree(dotgit)
    else:
        dotgit.unlink(missing_ok=True)

    # Directory `sm_root` should still exist after `git.rm()` call.
    repo.git.add(sm_root)
    return repo.index.commit(f"Add submodule `{sm.name}` as ordinary directory.")


@beartype
def unsubmodule(repo: git.Repo) -> git.Repo:
    """
    Un-submodule all the git submodules (converts them to ordinary subdirs and
    destroys commit history). Commit the changes to the main repository.
    """
    _: List[git.Commit] = list(map(F.rmsm(repo), repo.submodules))
    gitmodules_file: Path = F.root(repo) / GITMODULES_FILE
    if gitmodules_file.exists():
        repo.git.rm(gitmodules_file)
        _ = repo.index.commit("Remove `.gitmodules` file.")
    return repo


@beartype
def init(targetdir: Dir) -> Tuple[git.Repo, str]:
    """Run `git init`, returning the repo and initial branch name."""
    branch = BRANCH_NAME
    try:
        repo = git.Repo.init(targetdir, initial_branch=BRANCH_NAME)
    except git.GitCommandError:
        branch = "master"
        repo = git.Repo.init(targetdir)
    return repo, branch


@beartype
def isfile(p: Path) -> bool:
    """Check if `p` is a File."""
    return isinstance(p, File)


@beartype
def cat(xs: Iterable[Iterable[T]]) -> Iterable[T]:
    """Concatenate some iterables."""
    return chain.from_iterable(xs)


@beartype
def commitall(repo: git.Repo, msg: str) -> git.Commit:
    """Commit all contents of a git repository."""
    repo.git.add(all=True)
    return repo.index.commit(msg)


@curried
@beartype
def git_rm(repo: git.Repo, path: str) -> str:
    """Remove a path in a repo."""
    repo.git.rm(path)
    return path


@beartype
def yellow(s: str) -> None:
    """Print a message to the console in yellow."""
    print(f"{Fore.YELLOW}{s}{Style.RESET_ALL}")


@beartype
def red(s: str) -> None:
    """Print a message to the console in red."""
    print(f"{Fore.RED}{s}{Style.RESET_ALL}")


@beartype
def progressbar(xs: Iterable[T], s: str) -> Iterable[T]:
    """Print a progress bar for an iterable."""
    ys: Iterable[T] = tqdm(xs, ncols=80)
    ys.set_description(s)
    return ys


@beartype
def starfilter(
    f: Callable[[Any, ...], bool], xs: Iterable[Tuple[Any, ...]]
) -> Iterable[Tuple[Any, ...]]:
    """Filter an iterable, automatically unpacking tuple arguments."""
    return filter(lambda x: f(*x), xs)


@beartype
def part(p: Callable[[T], bool], xs: Iterable[T]) -> Tuple[Iterable[T], Iterable[T]]:
    """Partition a list on a boolean predicate (Trues, Falses)."""
    return reduce(lambda s, x: s[not p(x)].append(x) or s, xs, ([], []))

Functions

def cat(xs: collections.abc.Iterable[collections.abc.Iterable[~T]]) ‑> collections.abc.Iterable[~T]

Concatenate some iterables.

Expand source code
@beartype
def cat(xs: Iterable[Iterable[T]]) -> Iterable[T]:
    """Concatenate some iterables."""
    return chain.from_iterable(xs)
def chdir(directory: Dir) ‑> Dir

Changes working directory and returns old cwd.

Expand source code
@beartype
def chdir(directory: Dir) -> Dir:
    """Changes working directory and returns old cwd."""
    old: Dir = F.cwd()
    os.chdir(directory)
    return old
def chk(path: pathlib.Path, resolve: bool = True) ‑> Union[FileDirEmptyDirPseudoFileNoPathNoFileLink]

Test whether path is a file, a directory, or something else.

Expand source code
@beartype
def chk(
    path: Path,
    resolve: bool = True,
) -> Union[File, Dir, EmptyDir, PseudoFile, NoPath, NoFile, Link]:
    """Test whether `path` is a file, a directory, or something else."""
    if resolve:
        path = path.resolve()
    if path.is_file():
        return File(path)
    if path.is_dir():
        if is_empty(Dir(path)):
            return EmptyDir(path)
        return Dir(path)
    if path.exists():
        return PseudoFile(path)
    if os.path.islink(path):
        return Link(path)
    if path.parent.is_dir():
        return NoFile(path)
    return NoPath(path)
def commitall(repo: git.repo.base.Repo, msg: str) ‑> git.objects.commit.Commit

Commit all contents of a git repository.

Expand source code
@beartype
def commitall(repo: git.Repo, msg: str) -> git.Commit:
    """Commit all contents of a git repository."""
    repo.git.add(all=True)
    return repo.index.commit(msg)
def copyfile(source: File, target: Union[FileNoFile]) ‑> File

Safely copy a file to a valid location.

Expand source code
@beartype
def copyfile(source: File, target: Union[File, NoFile]) -> File:
    """Safely copy a file to a valid location."""
    shutil.copyfile(source, target)
    return File(target.resolve())
def copytree(source: Dir, target: NoFile) ‑> Dir

Call shutil.copytree().

Expand source code
@beartype
def copytree(source: Dir, target: NoFile) -> Dir:
    """Call shutil.copytree()."""
    shutil.copytree(source, target, symlinks=True)
    return Dir(target.resolve())
def curried(func: collections.abc.Callable[[typing.Any, ...], ~T]) ‑> collections.abc.Callable[[typing.Any, ...], ~T]

A decorator that makes the function curried

Usage example:

>>> @curried
... def sum5(a, b, c, d, e):
...     return a + b + c + d + e
...
>>> sum5(1)(2)(3)(4)(5)
15
>>> sum5(1, 2, 3)(4, 5)
15
Expand source code
@beartype
def curried(func: Callable[[Any, ...], T]) -> Callable[[Any, ...], T]:
    """A decorator that makes the function curried

    Usage example:

    >>> @curried
    ... def sum5(a, b, c, d, e):
    ...     return a + b + c + d + e
    ...
    >>> sum5(1)(2)(3)(4)(5)
    15
    >>> sum5(1, 2, 3)(4, 5)
    15
    """

    def _args_len(func):
        # pylint: disable=import-outside-toplevel
        good = True
        try:
            from inspect import signature

            signature(func)
        except TypeError:
            good = False

        if good and _has_type_hint_support:
            from inspect import signature

            args = signature(func).parameters
        else:
            from inspect import getfullargspec

            args = getfullargspec(func).args

        return len(args)

    @wraps(func)
    def _curried(*args, **kwargs):
        f = func
        count = 0
        while isinstance(f, partial):
            if f.args:
                count += len(f.args)
            f = f.func

        if count == _args_len(f) - len(args):
            return func(*args, **kwargs)

        para_func = partial(func, *args, **kwargs)
        if hasattr(f, "__name__"):
            update_wrapper(para_func, f)
        return curried(para_func)

    def _curried_lambda(*args, **kwargs):
        return partial(func, *args, **kwargs)

    if func.__name__ == "<lambda>":
        return _curried_lambda

    return _curried
def cwd() ‑> Dir

Call Path.cwd().

Expand source code
@beartype
def cwd() -> Dir:
    """Call Path.cwd()."""
    return Dir(Path.cwd().resolve())
def force_mkdir(path: pathlib.Path) ‑> Dir

Make a directory (with parents, ok if it already exists).

Expand source code
@beartype
def force_mkdir(path: Path) -> Dir:
    """Make a directory (with parents, ok if it already exists)."""
    path.mkdir(parents=True, exist_ok=True)
    return Dir(path.resolve())
def get_batches(lst: list[File], n: int) ‑> collections.abc.Generator[File, None, None]

Yield successive n-sized chunks from lst.

Expand source code
@beartype
def get_batches(lst: List[File], n: int) -> Generator[File, None, None]:
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i : i + n]
def git_rm(repo: git.repo.base.Repo, path: str) ‑> str

Remove a path in a repo.

Expand source code
@curried
@beartype
def git_rm(repo: git.Repo, path: str) -> str:
    """Remove a path in a repo."""
    repo.git.rm(path)
    return path
def gitd(repo: git.repo.base.Repo) ‑> Dir

Get git directory of a repo.

Expand source code
@beartype
def gitd(repo: git.Repo) -> Dir:
    """Get git directory of a repo."""
    return Dir(repo.git_dir).resolve()
def init(targetdir: Dir) ‑> tuple[git.repo.base.Repo, str]

Run git init(), returning the repo and initial branch name.

Expand source code
@beartype
def init(targetdir: Dir) -> Tuple[git.Repo, str]:
    """Run `git init`, returning the repo and initial branch name."""
    branch = BRANCH_NAME
    try:
        repo = git.Repo.init(targetdir, initial_branch=BRANCH_NAME)
    except git.GitCommandError:
        branch = "master"
        repo = git.Repo.init(targetdir)
    return repo, branch
def is_empty(directory: Dir) ‑> bool

Check if directory is empty, quickly.

Expand source code
@beartype
def is_empty(directory: Dir) -> bool:
    """Check if directory is empty, quickly."""
    return not next(os.scandir(directory), None)
def is_root(path: Union[FileDir]) ‑> bool

Check if 'path' is a root directory (e.g., '/' on Unix or 'C:' on Windows).

Expand source code
@beartype
def is_root(path: Union[File, Dir]) -> bool:
    """Check if 'path' is a root directory (e.g., '/' on Unix or 'C:\' on Windows)."""
    # Links and `~`s are resolved before checking.
    path = path.resolve()
    return len(path.parents) == 0
def isfile(p: pathlib.Path) ‑> bool

Check if p is a File.

Expand source code
@beartype
def isfile(p: Path) -> bool:
    """Check if `p` is a File."""
    return isinstance(p, File)
def ki_rev_to_rev(ki_rev: KiRev) ‑> Rev

Convert a ki repository commit rev to a git repository commit rev.

Expand source code
@beartype
def ki_rev_to_rev(ki_rev: KiRev) -> Rev:
    """Convert a ki repository commit rev to a git repository commit rev."""
    return Rev(ki_rev.kirepo.repo, ki_rev.sha)
def md5(path: File) ‑> str

Compute md5sum of file at path.

Expand source code
@beartype
def md5(path: File) -> str:
    """Compute md5sum of file at `path`."""
    hash_md5 = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
def mkdir(path: NoPath) ‑> EmptyDir

Make a directory (with parents).

Expand source code
@beartype
def mkdir(path: NoPath) -> EmptyDir:
    """Make a directory (with parents)."""
    path.mkdir(parents=True)
    return EmptyDir(path)
def mkdtemp() ‑> EmptyDir

Make a temporary directory (in /tmp).

Expand source code
@beartype
def mkdtemp() -> EmptyDir:
    """Make a temporary directory (in /tmp)."""
    return EmptyDir(tempfile.mkdtemp()).resolve()
def mksubdir(directory: EmptyDir, suffix: pathlib.Path) ‑> EmptyDir

Make a subdirectory of an empty directory (with parents).

Returns

EmptyDir
The created subdirectory.
Expand source code
@beartype
def mksubdir(directory: EmptyDir, suffix: Path) -> EmptyDir:
    """
    Make a subdirectory of an empty directory (with parents).

    Returns
    -------
    EmptyDir
        The created subdirectory.
    """
    subdir = directory / suffix
    subdir.mkdir(parents=True)
    directory.__class__ = Dir
    return EmptyDir(subdir.resolve())
def movetree(source: Dir, target: NoFile) ‑> Dir

Call shutil.move().

Expand source code
@beartype
def movetree(source: Dir, target: NoFile) -> Dir:
    """Call shutil.move()."""
    shutil.move(source, target)
    return Dir(target.resolve())
def parent(path: Union[FileDir]) ‑> Dir

Get the parent of a path that exists. If the path points to the filesystem root, we return itself.

Expand source code
@beartype
def parent(path: Union[File, Dir]) -> Dir:
    """
    Get the parent of a path that exists.  If the path points to the filesystem
    root, we return itself.
    """
    if is_root(path):
        return Dir(path.resolve())
    return Dir(path.parent)
def part(p: collections.abc.Callable[[~T], bool], xs: collections.abc.Iterable[~T]) ‑> tuple[collections.abc.Iterable[~T], collections.abc.Iterable[~T]]

Partition a list on a boolean predicate (Trues, Falses).

Expand source code
@beartype
def part(p: Callable[[T], bool], xs: Iterable[T]) -> Tuple[Iterable[T], Iterable[T]]:
    """Partition a list on a boolean predicate (Trues, Falses)."""
    return reduce(lambda s, x: s[not p(x)].append(x) or s, xs, ([], []))
def progressbar(xs: collections.abc.Iterable[~T], s: str) ‑> collections.abc.Iterable[~T]

Print a progress bar for an iterable.

Expand source code
@beartype
def progressbar(xs: Iterable[T], s: str) -> Iterable[T]:
    """Print a progress bar for an iterable."""
    ys: Iterable[T] = tqdm(xs, ncols=80)
    ys.set_description(s)
    return ys
def red(s: str) ‑> None

Print a message to the console in red.

Expand source code
@beartype
def red(s: str) -> None:
    """Print a message to the console in red."""
    print(f"{Fore.RED}{s}{Style.RESET_ALL}")
def rev_exists(repo: git.repo.base.Repo, rev: str) ‑> bool

Check if git commit reference exists in repository.

Expand source code
@beartype
def rev_exists(repo: git.Repo, rev: str) -> bool:
    """Check if git commit reference exists in repository."""
    try:
        repo.git.rev_parse("--verify", rev)
    except git.GitCommandError:
        return False
    return True
def rglob(d: Dir, pattern: str) ‑> list[File]

Call d.rglob() and returns only files.

Expand source code
@beartype
def rglob(d: Dir, pattern: str) -> List[File]:
    """Call d.rglob() and returns only files."""
    files = filter(lambda p: isinstance(p, File), map(F.chk, d.rglob(pattern)))
    return list(files)
def rmsm(repo: git.repo.base.Repo, sm: git.objects.submodule.base.Submodule) ‑> git.objects.commit.Commit

Remove a git submodule.

Expand source code
@curried
@beartype
def rmsm(repo: git.Repo, sm: git.Submodule) -> git.Commit:
    """Remove a git submodule."""
    # Remove the submodule root and delete its .git directory.
    sm_root = Path(sm.module().working_tree_dir)
    repo.git.rm(sm_root, cached=True)
    dotgit = F.chk(sm_root / GIT)
    if isinstance(dotgit, Dir):
        F.rmtree(dotgit)
    else:
        dotgit.unlink(missing_ok=True)

    # Directory `sm_root` should still exist after `git.rm()` call.
    repo.git.add(sm_root)
    return repo.index.commit(f"Add submodule `{sm.name}` as ordinary directory.")
def rmtree(target: Dir) ‑> NoFile

Equivalent to shutil.rmtree().

Expand source code
@beartype
def rmtree(target: Dir) -> NoFile:
    """Equivalent to `shutil.rmtree()`."""
    shutil.rmtree(target)
    return NoFile(target)
def root(repo: git.repo.base.Repo) ‑> Dir

Get working directory of a repo.

Expand source code
@beartype
def root(repo: git.Repo) -> Dir:
    """Get working directory of a repo."""
    return Dir(repo.working_dir).resolve()
def shallow_walk(directory: Dir) ‑> tuple[Dir, list[Dir], list[File]]

Walk only the top-level directory with os.walk().

Expand source code
@functools.cache
@beartype
def shallow_walk(
    directory: Dir,
) -> Tuple[Dir, List[Dir], List[File]]:
    """Walk only the top-level directory with `os.walk()`."""
    # pylint: disable=redefined-outer-name
    root, dirs, files = next(os.walk(directory))
    root = Dir(root)
    dirs = [Dir(root / d) for d in dirs]
    # TODO: Treat symlinks.
    files = [File(root / f) for f in files]
    return root, dirs, files
def singleton(name: str) ‑> Singleton

Removes all forward slashes and returns a Singleton pathlib.Path.

Expand source code
@beartype
def singleton(name: str) -> Singleton:
    """Removes all forward slashes and returns a Singleton pathlib.Path."""
    return Singleton(name.replace("/", ""))
def slugify(value: str) ‑> str

Taken from [1]. Convert spaces or repeated dashes to single dashes. Remove characters that aren't alphanumerics, underscores, or hyphens. Convert to lowercase. Also strip leading and trailing whitespace, dashes, and underscores.

[1] https://github.com/django/django/blob/master/django/utils/text.py

Expand source code
@beartype
def slugify(value: str) -> str:
    """
    Taken from [1]. Convert spaces or repeated dashes to single dashes. Remove
    characters that aren't alphanumerics, underscores, or hyphens. Convert to
    lowercase. Also strip leading and trailing whitespace, dashes, and
    underscores.

    [1] https://github.com/django/django/blob/master/django/utils/text.py
    """
    value = unicodedata.normalize("NFKC", value)
    value = re.sub(SLUG_REGEX, "", value.lower())
    return re.sub(r"[-\s]+", "-", value).strip("-_")
def starfilter(f: collections.abc.Callable[[typing.Any, ...], bool], xs: collections.abc.Iterable[tuple[typing.Any, ...]]) ‑> collections.abc.Iterable[tuple[typing.Any, ...]]

Filter an iterable, automatically unpacking tuple arguments.

Expand source code
@beartype
def starfilter(
    f: Callable[[Any, ...], bool], xs: Iterable[Tuple[Any, ...]]
) -> Iterable[Tuple[Any, ...]]:
    """Filter an iterable, automatically unpacking tuple arguments."""
    return filter(lambda x: f(*x), xs)

Link path to target.

Expand source code
@beartype
def symlink(path: NoFile, target: Path) -> Link:
    """Link `path` to `target`."""
    os.symlink(target, path)
    return Link(path)
def touch(directory: Dir, name: str) ‑> File

Touch a file.

Expand source code
@beartype
def touch(directory: Dir, name: str) -> File:
    """Touch a file."""
    path = directory / singleton(name)
    path.touch()
    return File(path.resolve())

Safely unlink a file.

Expand source code
@beartype
def unlink(file: Union[File, Link]) -> NoFile:
    """Safely unlink a file."""
    os.unlink(file)
    return NoFile(file)
def unsubmodule(repo: git.repo.base.Repo) ‑> git.repo.base.Repo

Un-submodule all the git submodules (converts them to ordinary subdirs and destroys commit history). Commit the changes to the main repository.

Expand source code
@beartype
def unsubmodule(repo: git.Repo) -> git.Repo:
    """
    Un-submodule all the git submodules (converts them to ordinary subdirs and
    destroys commit history). Commit the changes to the main repository.
    """
    _: List[git.Commit] = list(map(F.rmsm(repo), repo.submodules))
    gitmodules_file: Path = F.root(repo) / GITMODULES_FILE
    if gitmodules_file.exists():
        repo.git.rm(gitmodules_file)
        _ = repo.index.commit("Remove `.gitmodules` file.")
    return repo
def walk(directory: Dir) ‑> frozenset[typing.Union[FilePseudoFileLinkNoFile]]

Get all file-like leaves in a directory, recursively.

Expand source code
@beartype
def walk(
    directory: Dir,
) -> FrozenSet[Union[File, PseudoFile, Link, NoFile]]:
    """Get all file-like leaves in a directory, recursively."""
    # pylint: disable=redefined-outer-name
    leaves = frozenset()
    for root, _, files in os.walk(directory):
        root = Dir(root)
        leaves |= frozenset({F.chk(root / f) for f in files})
    return leaves
def write(path: Union[FileNoFile], text: str) ‑> File

Write text to a file.

Expand source code
@beartype
def write(path: Union[File, NoFile], text: str) -> File:
    """Write text to a file."""
    with open(path, "w+", encoding="UTF-8") as f:
        f.write(text)
    return File(path)
def writeb(path: Union[FileNoFile], bs: bytes) ‑> File

Write text to a file.

Expand source code
@beartype
def writeb(path: Union[File, NoFile], bs: bytes) -> File:
    """Write text to a file."""
    with open(path, "wb") as f:
        f.write(bs)
    return File(path)
def yellow(s: str) ‑> None

Print a message to the console in yellow.

Expand source code
@beartype
def yellow(s: str) -> None:
    """Print a message to the console in yellow."""
    print(f"{Fore.YELLOW}{s}{Style.RESET_ALL}")