Package ki
Ki is a command-line interface for the version control and editing of .anki2
collections as git repositories of markdown files.
Rather than providing an
interactive UI like the Anki desktop client, ki aims to allow natural editing
in the filesystem.
In general, the purpose of ki is to allow users to work on large, complex Anki decks in exactly the same way they work on large, complex software projects.
Ki provides command-line functions to:
- clone a .anki2 collection into a directory as a git repository.
- pull changes from the Anki desktop client (and AnkiWeb) into an existing repository.
- push changes (safely!) back to Anki.
This is documentation for the ki repository.
Installation
Ki is tested on Python 3.9. You'll need to install Python and Git, and then run the following command in a terminal to install the ki package:
pip install git+https://github.com/langfield/ki.git@main
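To check that the installation succeeded, we can make sure the package imports cleanly. This is a minimal, optional sanity check; it doesn't exercise any ki functionality:
# Run in a Python 3.9 interpreter: no output means the install worked.
import ki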
Getting started
This section will walk through the following example workflow:
- Cloning an existing collection into a ki repository.
- Editing the note files in the repository.
- Pushing those edits back to Anki.
- Pulling changes made in Anki into the repository.
Before cloning, we'll need to find our .anki2 collection file.
This is where Anki stores the data for all our notes.
Note. If you're new to Anki, or are unfamiliar with the terms collection, profile, note, or card, you may wish to take a look at the Anki documentation.
If you already know the path to the .anki2 collection file you want to clone, skip to the section on running the clone command.
Finding the .anki2 collection file
To find our collection file, we must first find our Anki data directory. The location of this varies by operating system.
In most cases, you should be able to find your data directory at the path given below for your respective OS:
- MacOS: ~/Library/Application Support/Anki2
- Windows: %APPDATA%\Anki2
- GNU/Linux: ~/.local/share/Anki2
Note. You can read more about the default Anki data directory locations here.
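If you'd rather compute the default data directory programmatically, the following Python sketch (not part of ki) encodes the same defaults listed above. Note that it won't find a custom data directory set in Anki's preferences:

import os
import sys
from pathlib import Path

def anki_data_dir() -> Path:
    """Return the default Anki data directory for the current OS."""
    if sys.platform == "darwin":
        return Path.home() / "Library" / "Application Support" / "Anki2"
    if sys.platform == "win32":
        return Path(os.environ["APPDATA"]) / "Anki2"
    return Path.home() / ".local" / "share" / "Anki2"

print(anki_data_dir())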
If you are running Anki 2.1 (which you should be, because ki is not tested with lower versions), opening this directory will reveal several files and subdirectories. The following example output is from a machine running Debian GNU/Linux:
user@host:~/.local/share/Anki2$ ls
addons21 crash.log prefs21.db README.txt 'User 1'
In particular, there is a subdirectory for each profile. In the above example, there is only one profile, User 1. But, in general, there may be many profiles associated with a given Anki installation.
Multiple profiles
Below we can see a visual representation of the directory structure of an Anki data directory with two profiles, User 1 and User 2:
Anki2/
├── addons21
│ ├── 1046608507
│ ├── 109531687
│ ├── 1097423555
│ └── 1972239816
├── crash.log
├── prefs21.db
├── README.txt
├── User 1
│ ├── backups
│ ├── collection2.log
│ ├── collection.anki2
│ ├── collection.log
│ ├── collection.media
│ ├── collection.media.db2
│ └── deleted.txt
└── User 2
├── collection.anki2
├── collection.anki2-wal
└── collection.media
Note that there is a collection.anki2 file in each profile subdirectory.
If you're not sure of the name of your user profile, it can be seen in the title bar of the Anki desktop client.
Most Anki installations will only have one profile, and if you haven't changed the default profile name, it will probably be called User 1. Let's enter the profile directory for User 1 and list its contents:
user@host:~/.local/share/Anki2$ cd User\ 1/
user@host:~/.local/share/Anki2/User 1$ ls
backups collection2.log collection.anki2 collection.log collection.media collection.media.db2 deleted.txt
So if we want to clone User 1's collection, the path that we want is:
~/.local/share/Anki2/User\ 1/collection.anki2
We'll pass this as a command-line argument to the ki executable in the next section.
Running the clone command
Now we're ready to actually clone the collection into a repository. The ki clone command works similarly to git clone, in that it will create a new directory for the repository within the current working directory. So if we want to clone our collection into a new subdirectory in ~/ (the home directory on macOS and GNU/Linux), we would first make sure we're in the home directory.
Second, we need to check that Anki is closed before cloning. Nothing bad
will happen if we clone while Anki is open, but the command will fail because
the database is locked. Once we've done that, we can run the command:
ki clone ~/.local/share/Anki2/User\ 1/collection.anki2
And we should see output that looks similar to this:
lyra@oxford$ ki clone ~/.local/share/Anki2/User\ 1/collection.anki2
Found .anki2 file at '/home/lyra/.local/share/Anki2/User 1/collection.anki2'
Computed md5sum: ad7ea6d486a327042cf0b09b54626b66
Wrote md5sum to '/home/lyra/collection/.ki/hashes'
Cloning into '/home/lyra/collection/'...
100%|█████████████████████████| 28886/28886 [00:10<00:00, 2883.78it/s]
If we list the contents of the home directory, we can see that ki did indeed create a new directory called collection:
lyra@oxford:~$ ls
collection pkgs
Editing notes
Now that we've successfully cloned our Anki collection into a ki repository, we can start editing notes! Our home directory looks like this:
lyra@oxford:~$ ls
collection pkgs
And we see the repo we cloned, which is called collection.
Let's change directories to the newly cloned ki repo and take a look at what's inside:
lyra@oxford:~$ cd collection/
lyra@oxford:~/collection$ ls --classify
algebras/ manifolds/ rings/
We see that we have three directories, which represent three Anki decks. This is just an example; you'll see directories corresponding to the top-level decks in your Anki collection.
Note. The ls --classify command adds a trailing / to directories to distinguish them from ordinary files.
Let's enter the manifolds directory and see what's inside.
lyra@oxford:~/collection$ cd manifolds/
lyra@oxford:~/collection/manifolds$ ls
MANIFOLDS.md
So we see a single markdown file called MANIFOLDS.md, which contains the notes for the manifolds deck. If we had subdecks of the manifolds deck, we would see more subdirectories here, and each one would have a markdown file in it as well. Let's open this file and see what's inside.
We'll use vim to open the markdown file in this example, but any text editor will work.
lyra@oxford:~/collection/manifolds$ vi MANIFOLDS.md
# Note
nid: 1622849751948
model: Basic
deck: manifolds
tags:
markdown: false
## Front
Diffeomorphism
## Back
A smooth surjective map between manifolds which has a smooth inverse.
# Note
nid: 1566621764508
model: Basic
deck: manifolds
tags:
markdown: false
## Front
distribution (on a smooth manifold)
## Back
A distribution on \(M\) of rank \(k\) is a rank-\(k\) subbundle of \(TM\)
So we see the structure of two notes inside this file. For each note, there is a section for note metadata, and a section for each field.
There is a typo in the first note. It says smooth surjective map, but it should say smooth bijective map. Let's fix it, save our changes, and go back to the terminal. When we go back up to the root of the repository and run git status, we can see which files we've changed:
lyra@oxford:~/collection$ git status
On branch main
Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
	modified:   manifolds/MANIFOLDS.md

no changes added to commit (use "git add" and/or "git commit -a")
And running git diff shows us the content of the unstaged changes:
lyra@oxford:~/collection$ git diff
diff --git a/manifolds/MANIFOLDS.md b/manifolds/MANIFOLDS.md
--- a/manifolds/MANIFOLDS.md
+++ b/manifolds/MANIFOLDS.md
@@ -10,4 +10,4 @@
 ## Back
 
-A smooth surjective map between manifolds which has a smooth inverse.
+A smooth bijective map between manifolds which has a smooth inverse.
Then we can commit our changes as usual.
lyra@oxford:~/collection$ git add manifolds/MANIFOLDS.md
lyra@oxford:~/collection$ git commit -m "Fix typo in diffeomorphism definition: 'surjective' -> 'bijective'"
At this point we would usually git push, but if we try that in a ki repository, we'll see this:
lyra@oxford:~/collection$ git push
fatal: No configured push destination.
Either specify the URL from the command-line or configure a remote repository using
git remote add <name> <url>
and then push using the remote name
git push <name>
Since we're not pushing to an ordinary git remote, but to the Anki SQLite3 database, we must use ki push instead, which is covered briefly in the next section.
Pushing committed changes back to Anki
This part is super easy! Similar to when we cloned, we must remember to close Anki before pushing, or the command will fail (gracefully). All right, now we just run the command:
lyra@oxford:~/collection$ ki push
Pushing to '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
Computed md5sum: 199216c39eeabe23a1da016a99ffd3e2
Verified md5sum matches latest hash in '/home/lyra/decks/.ki/hashes'
Generating local .anki2 file from latest commit: 2aa009729b6dd337dd1ce795df611f5a49
Writing changes to '/tmp/tmpyiids2qm/original.anki2'...
100%|█████████████████████████████████| 2/2 [00:00<00:00, 1081.56it/s]
Database was modified.
Writing backup of .anki2 file to '/home/lyra/decks/.ki/backups'
Overwrote '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
As the output suggests, ki saves a backup of our collection each time we push, just in case we wish to hard-revert a change we've made.
Now we can open Anki and view the changes we've made in the note browser!
Pulling changes from Anki into the repository
So now we know how to make changes from the filesystem and push them back to Anki. But suppose that after we cloned our repository, we made some edits within Anki, and we'd like those to show up in our repository. For this, we'll need to close Anki, and then run the following command:
lyra@oxford:~/collection$ ki pull
Pulling from '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
Computed md5sum: 199216c39eeabe23a1da016a99ffd3e2
Updating 5a9ef09..9c30b73
Fast-forward
note1645010162168.md | 4 ++--
note1645222430007.md | 11 +++++++++++
2 files changed, 13 insertions(+), 2 deletions(-)
create mode 100644 note1645222430007.md
From /tmp/tmpt5a3yd9a/ki/local/199216c39eeabe23a1da016a99ffd3e2/
* branch main -> FETCH_HEAD
* [new branch] main -> anki/main
Wrote md5sum to '/home/lyra/decks/.ki/hashes'
And we're done! Our repository is up to date, as ki will tell us if we try to pull again:
lyra@oxford:~/collection$ ki pull
ki pull: up to date.
Merge conflicts
Occasionally, when we edit the same lines in the same note fields in both Anki and our local repository, we may encounter a merge conflict:
lyra@oxford:~/collection$ ki pull
Pulling from '/home/lyra/.local/share/Anki2/User 1/collection.anki2'
Computed md5sum: debeb6689f0b83d520ff913067c598e9
Auto-merging note1645788806304.md
CONFLICT (add/add): Merge conflict in note1645788806304.md
Automatic merge failed; fix conflicts and then commit the result.
From /tmp/tmpgkq4ilfy/ki/local/debeb6689f0b83d520ff913067c598e9/
* branch main -> FETCH_HEAD
* [new branch] main -> anki/main
Wrote md5sum to '/home/mal/collection/.ki/hashes'
This is expected behavior, and since the process of resolving merge conflicts is the same for ki repositories as for git repositories (since ki repositories are git repositories), we refer to StackOverflow for how to proceed.
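For reference, resolving a conflict in a note file looks exactly like it does in any other git repository: open the conflicted file, pick between the marked versions, delete the markers, then git add and git commit the result. A conflicted field will contain the standard markers, something like this (hypothetical contents):

<<<<<<< HEAD
A smooth bijective map between manifolds which has a smooth inverse.
=======
A smooth map between manifolds with a smooth inverse.
>>>>>>> anki/main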
Usage reference
Clone
The ki clone command takes one required argument (the path to a .anki2 file) and one optional argument (a path to a target directory). The usage is meant to mirror that of git clone.
An example of the clone subcommand usage and its output is given below.
$ ki clone ~/.local/share/Anki2/lyra/collection.anki2 decks
Found .anki2 file at '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
Computed md5sum: ad7ea6d486a327042cf0b09b54626b66
Wrote md5sum to '/home/lyra/decks/.ki/hashes'
Cloning into '/home/lyra/decks/'...
100%|█████████████████████████| 28886/28886 [00:10<00:00, 2883.78it/s]
Pull
Once an Anki collection has been cloned, we can pull changes made by the Anki desktop client into our repository.
An example of the pull subcommand usage and its output is given below.
$ ki pull
Pulling from '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
Computed md5sum: 199216c39eeabe23a1da016a99ffd3e2
Updating 5a9ef09..9c30b73
Fast-forward
note1645010162168.md | 4 ++--
note1645222430007.md | 11 +++++++++++
2 files changed, 13 insertions(+), 2 deletions(-)
create mode 100644 note1645222430007.md
From /tmp/tmpt5a3yd9a/ki/local/199216c39eeabe23a1da016a99ffd3e2/
* branch main -> FETCH_HEAD
* [new branch] main -> anki/main
Wrote md5sum to '/home/lyra/decks/.ki/hashes'
ki first deletes any residual ephemeral repositories in /tmp/ki/remote/. These would only remain if a previous pull command failed.
It then verifies that the path to the .anki2 file specified in the .ki/ directory (analogous to the .git/ directory) still exists.
It computes and records the hash of the collection file. In this way, ki keeps track of whether the collection database has changed since the last clone/pull.
Finally, the collection is cloned into an ephemeral repository in a temp directory, which is then git pull-ed into the current repository.
At this point, if the git operation fails, the user can take over and manage the merge themselves.
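Conceptually, the hash bookkeeping is simple. The Python sketch below (not ki's actual implementation) shows the idea: .ki/hashes stores one "md5sum tag" line per operation, and comparing the current hash of the collection file against the last recorded one tells ki whether the database has changed since then:

import hashlib
from pathlib import Path

def md5(path: Path) -> str:
    """Hash a collection file, like the md5sums in ki's output."""
    return hashlib.md5(path.read_bytes()).hexdigest()

col_file = Path.home() / ".local/share/Anki2/lyra/collection.anki2"
hashes_file = Path.home() / "decks/.ki/hashes"

# The last line holds the hash recorded by the most recent operation.
latest = hashes_file.read_text(encoding="UTF-8").strip().splitlines()[-1].split()[0]
changed = md5(col_file) != latest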
Push
When we want to push our changes back to the Anki desktop client, we can use ki push to do that.
An example of the push subcommand usage and its output is given below.
$ ki push
Pushing to '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
Computed md5sum: 199216c39eeabe23a1da016a99ffd3e2
Verified md5sum matches latest hash in '/home/lyra/decks/.ki/hashes'
Generating local .anki2 file from latest commit: 2aa009729b6dd337dd1ce795df611f5a49
Writing changes to '/tmp/tmpyiids2qm/original.anki2'...
100%|█████████████████████████████████| 2/2 [00:00<00:00, 1081.56it/s]
Database was modified.
Writing backup of .anki2 file to '/home/lyra/decks/.ki/backups'
Overwrote '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
We store 5 backups of the collection prior to a push.
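Since backups are plain .anki2 files named by their md5sum, a hard revert can be done by hand if needed: close Anki, then copy the desired backup over the collection file. A minimal sketch, using the paths from the examples above:

import shutil
from pathlib import Path

backups_dir = Path.home() / "decks/.ki/backups"
col_file = Path.home() / ".local/share/Anki2/lyra/collection.anki2"

# Restore the most recently written backup (close Anki first).
latest = max(backups_dir.glob("*.anki2"), key=lambda p: p.stat().st_mtime)
shutil.copyfile(latest, col_file)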
Collaborative decks
This section assumes knowledge of the basic ki operations and familiarity with git. If you haven't yet cloned your Anki collection into a ki repository, read the getting started section. This section walks through:
- Cloning a collaborative deck from GitHub.
- Editing the collaborative deck.
- Pulling other users' changes to the deck from GitHub.
- Pushing edits back to GitHub.
Cloning a collaborative deck from GitHub
Now that we've created our first ki repository, we might want to try our hand at collaborating on a deck with other Anki users. We won't actually need to make use of the ki program to do this, because ki repositories are also git repositories, and so we can clone collaborative decks from GitHub as git submodules of our collection repo.
Note. If you're completely unfamiliar with git, consider reading this short introduction.
Suppose we've cloned an Anki collection into a ki repository in our home directory, just like we did in the getting started section, and we want to add a collaborative deck from GitHub to our collection. Let's walk through an example. Our home directory looks like this:
lyra@oxford:~$ ls
collection pkgs
And we see the repo we cloned, which is called collection.
To add a collaborative deck repo as a submodule, we'll first need to change directories to the newly cloned ki repo:
lyra@oxford:~$ cd collection/
lyra@oxford:~/collection$ ls --classify
algebras/ groups/ rings/
We see that we have three directories, which represent three Anki decks. This is just an example; you'll see directories corresponding to the top-level decks in your Anki collection.
Note. The ls --classify command adds a trailing / to directories to distinguish them from ordinary files.
Adding the repository as a git submodule
Suppose we want to add the collaborative deck https://github.com/langfield/manifolds.git to our collection. We can do that by running the command:
git submodule add https://github.com/langfield/manifolds.git
which yields the output:
lyra@oxford:~/collection$ git submodule add https://github.com/langfield/manifolds.git
Cloning into 'manifolds'...
remote: Counting objects: 11, done.
remote: Compressing objects: 100% (10/10), done.
remote: Total 11 (delta 0), reused 11 (delta 0)
Unpacking objects: 100% (11/11), done.
Checking connectivity... done.
And we can see that the command was successful because we have a new directory/deck called manifolds in our repo:
lyra@oxford:~/collection$ ls --classify
algebras/ groups/ manifolds/ rings/
Nice!
Editing a collaborative deck
There are two ways to edit a collaborative deck locally:
- Edit the markdown files in the ki repository.
- Edit the deck inside the Anki desktop client.
After we've cloned the manifolds deck repository into a submodule of our ki repository, we may want to make some edits to the deck.
How it works
ki is built on top of existing tooling implemented in the python package apy, which is used to parse the Anki collection SQLite file and convert its contents to human-readable markdown files.
These files (one per Anki note) are then dumped to a configurable location in the filesystem as a git repository, whose structure mirrors that of the decks in the collection. In effect, ki treats the git repo it generates as a local copy of the collection, and the .anki2 collection file as a remote.
All operations like pulling updates to the collection into ki and pushing updates from ki into Anki are handled by git under the hood.
This approach has several advantages:
- Merge conflicts can be handled in the usual, familiar way.
- Additional remotes (e.g. a human-readable backup of a collection on github) can be added easily.
- Users are free to pick the editor of their choice, perform batch editing with command-line tools like awk or sed, and even add CI actions (see the sketch below).
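For instance, a one-off batch edit across every note file needs nothing ki-specific. The hypothetical Python script below renames a tag in all markdown files under the repository root; afterwards the changes are committed and pushed with ki push as usual:

import re
from pathlib import Path

# Rewrite every markdown note file under the repository root.
for md_file in Path(".").rglob("*.md"):
    text = md_file.read_text(encoding="UTF-8")
    new_text = re.sub(r"\btopology\b", "general-topology", text)
    if new_text != text:
        md_file.write_text(new_text, encoding="UTF-8")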
Model
The following diagram shows the dataflow of a typical Anki/ki stack.
+-------------+ +--------------+
| | | |
| AnkiWeb ------------- AnkiMobile |
| | sync | |
+------|------+ +--------------+
|
| sync
|
+------|------+
| |
| Anki |
| |
+------|------+
|
| deck edits
|
+--------|--------+ +------------------+
| | ki clone | |
| ----------------> |
| Collection file | | ~/decks/ |
| (.anki2) | ki push | (git repository) |
| <---------------- |
| | | |
+--------|--------+ +---------^--------+
| |
| ki pull |
| |
| |
+----------v----------+ |
| | |
| /tmp/ki/remote/AAA | ki pull |
| (git repository) -------------------------
| [ephemeral] |
| |
+---------------------+
The node labeled Anki is the Anki desktop client on the localhost. It communicates with the AnkiWeb servers via Anki's sync feature. Other clients (e.g. AnkiDroid and AnkiMobile) are able to (1) pull changes made by the desktop client into their local collections via AnkiWeb, and (2) push changes made locally back to AnkiWeb.
When the Anki desktop client is started on the localhost, it opens and places a lock on the .anki2 SQLite file. During the session, changes may be made to the deck, and the SQLite file is unlocked when the program is closed.
Since ki must read from this database file, ki commands will not work while Anki is running. This is by design: the database is locked for a reason, and enforcing this constraint lowers the likelihood that users' decks become corrupted.
An ephemeral repository is used as an auxiliary step during the ki pull
operation so that we can merge the Anki desktop client's changes into our
repository via git.
Generating html
By default, ki parses the HTML of each field and dumps only the content, insofar as that is possible. It also supports parsing arbitrary HTML elements autogenerated by addons and regenerating the updated content. In the following subsection, we walk through an example.
Example: generating syntax-highlighted code blocks
The Anki addon developer Glutanimate has an addon called syntax-highlighting, which adds UI elements to the Anki note editor that automatically generate a syntax-highlighted version of a code block from the clipboard. In effect, it generates a formatted HTML table for the code listing, which gets dumped into the source of the relevant note field.
A fork of this addon for the latest version of Anki (2.1.49 at the time of writing) is available here: https://ankiweb.net/shared/info/1972239816
And the source tree for the original addon is on GitHub: https://github.com/glutanimate/syntax-highlighting
For example, consider the following python code block:
n = 1
n >> 1
print(n)
Given the above code, the addon generates the following HTML:
<table class="highlighttable">
<tbody>
<tr>
<td class="linenos">
<div class="linenodiv">
<pre>
<span class="normal">1</span>
<span class="normal">2</span>
<span class="normal">3</span>
</pre>
</div>
</td>
<td class="code">
<div class="highlight">
<pre>
<code>
<span class="n">n</span>
<span class="o">=</span>
<span class="mi">1</span>
<br>
<span class="n">n</span>
<span class="o">&gt;&gt;</span>
<span class="mi">1</span>
<br>
<span class="nb">print</span>
<span class="p">(</span>
<span class="n">n</span>
<span class="p">)</span>
<br>
</code>
</pre>
</div>
</td>
</tr>
</tbody>
</table>
Editing fields like this could become annoying very quickly. It would be better if ki just gave us the markdown version above (only 3 lines), and then regenerated the note field HTML when converting the repository back into a .anki2 deck.
Adding ki HTML attributes
And in fact, this is possible. We first fork the addon so we can add some extra data to our generated HTML. In particular, we'd like to add an attribute ki-src whose value is the UTF-8 encoded source code. In general, this will be the encoded version of the source of whatever we'd like to autoformat.
We also add a ki-formatter attribute, whose value is an identifier that specifies a custom python module (which we must implement) that transforms the (possibly edited) ki-src text back into an HTML element of the form seen above.
So let's call our ki-formatter identifier syntax-hl-python. Then our addon has to change the opening tag of the snippet above to look like this:
<table class="highlighttable" ki-src="n = 1\nn >> 1\nprint(n)\n" ki-formatter="syntax-hl-python">
All ki needs is the original text of the code block prior to HTML formatting, and a function that can reapply the formatting to the modified text. Since the HTML table was generated by an addon, we already have a python function for this, and in general we can provide a ~/.config/ki/ki.json file that maps implementation IDs to paths of python modules. The module must have a top-level function of the form format(text: str) -> bs4.Tag.
If we have an addon implementation, we can import it here and use it in our format() implementation. We can add a ki attribute whose value is the base64 encoding of the code block, and an implementation attribute whose value is the name of a function. At import-time, ki will decode this and write the human-readable source to the relevant markdown file instead.
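As an illustration, a minimal syntax-hl-python formatter module might look like the sketch below. Only the format(text: str) -> bs4.Tag signature is required by the description above; the use of pygments (which is what generates highlighttable markup like the example earlier) is an assumption for this example:

"""Hypothetical ~/.config/ki/syntax_hl_python.py formatter module."""
import bs4
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter

def format(text: str) -> bs4.Tag:
    # Re-highlight the (possibly edited) source and return the new table.
    html = highlight(text, PythonLexer(), HtmlFormatter(linenos="table"))
    return bs4.BeautifulSoup(html, "html.parser").table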
Source code
If you have git, you can clone a local copy of the source code by running the following command in a terminal:
git clone git@github.com:langfield/ki.git
"""
Ki is a command-line interface for the version control and editing of `.anki2`
collections as git repositories of markdown files. Rather than providing an
interactive UI like the Anki desktop client, ki aims to allow natural editing
*in the filesystem*.
In general, the purpose of ki is to allow users to work on large, complex Anki
decks in exactly the same way they work on large, complex software projects.
.. include:: ./DOCUMENTATION.md
"""
# pylint: disable=invalid-name, missing-class-docstring, broad-except
# pylint: disable=too-many-return-statements, too-many-lines
import os
import re
import io
import gc
import sys
import time
import json
import copy
import shutil
import random
import logging
import secrets
import sqlite3
import hashlib
import itertools
import traceback
import functools
import subprocess
import dataclasses
import configparser
from pathlib import Path
from contextlib import redirect_stdout
from dataclasses import dataclass
import git
import click
import whatthepatch
import prettyprinter as pp
from tqdm import tqdm
from halo import Halo
from lark import Lark
from loguru import logger
# Required to avoid circular imports because the Anki pylib codebase is gross.
import anki.collection
from anki.cards import Card, CardId, TemplateDict
from anki.decks import DeckTreeNode
from anki.utils import ids2str
from anki.models import ChangeNotetypeInfo, ChangeNotetypeRequest, NotetypeDict
from anki.errors import NotFoundError
from anki.exporting import AnkiExporter
from anki.collection import Collection, Note, OpChangesWithId
from anki.importing.noteimp import NoteImporter
from beartype import beartype
from beartype.typing import (
Set,
List,
Dict,
Any,
Optional,
Callable,
Union,
TypeVar,
Sequence,
Tuple,
Iterator,
)
import ki.maybes as M
import ki.functional as F
from ki.types import (
MODELS_FILE,
ExtantFile,
ExtantDir,
EmptyDir,
NoPath,
NoFile,
Symlink,
LatentSymlink,
ExtantStrangePath,
GitChangeType,
Patch,
Delta,
KiRepo,
Field,
Template,
Notetype,
ColNote,
KiRepoRef,
RepoRef,
Leaves,
NoteDBRow,
DeckNote,
NoteMetadata,
PushResult,
WrittenNoteFile,
UpdatesRejectedError,
TargetExistsError,
CollectionChecksumError,
MissingNotetypeError,
MissingFieldOrdinalError,
MissingNoteIdError,
NotetypeMismatchError,
NoteFieldValidationWarning,
UnhealthyNoteWarning,
UnPushedPathWarning,
NotAnkiNoteWarning,
DeletedFileNotFoundWarning,
DiffTargetFileNotFoundWarning,
NotetypeKeyError,
UnnamedNotetypeError,
SQLiteLockError,
NoteFieldKeyError,
MissingMediaDirectoryError,
MissingMediaFileWarning,
ExpectedNonexistentPathError,
WrongFieldCountWarning,
InconsistentFieldNamesWarning,
MissingTidyExecutableError,
AnkiDBNoteMissingFieldsError,
GitFileModeParseError,
RenamedMediaFileWarning,
NonEmptyWorkingTreeError,
)
from ki.maybes import (
GIT,
GITIGNORE_FILE,
GITMODULES_FILE,
KI,
CONFIG_FILE,
HASHES_FILE,
BACKUPS_DIR,
LAST_PUSH_FILE,
)
from ki.transformer import NoteTransformer, FlatNote
logging.basicConfig(level=logging.INFO)
T = TypeVar("T")
# TODO: What if there is a deck called `_media`?
MEDIA = "_media"
DEV_NULL = "/dev/null"
BATCH_SIZE = 300
HTML_REGEX = r"</?\s*[a-z-][^>]*\s*>|(\&(?:[\w\d]+|#\d+|#x[a-f\d]+);)"
REMOTE_NAME = "anki"
BRANCH_NAME = "main"
CHANGE_TYPES = "A D R M T".split()
TQDM_NUM_COLS = 80
MAX_FILENAME_LEN = 40
IGNORE_DIRECTORIES = set([GIT, KI, MEDIA])
IGNORE_FILES = set([GITIGNORE_FILE, GITMODULES_FILE, MODELS_FILE])
HEAD_SUFFIX = Path("ki-head")
LOCAL_SUFFIX = Path("ki-local")
REMOTE_SUFFIX = Path("ki-remote")
FIELD_HTML_SUFFIX = Path("ki-fieldhtml")
GENERATED_HTML_SENTINEL = "data-original-markdown"
MEDIA_FILE_RECURSIVE_PATTERN = f"**/{MEDIA}/*"
# This is the key for media files associated with notetypes instead of the
# contents of a specific note.
NOTETYPE_NID = -57
MD = ".md"
FAILED = "Failed: exiting."
WARNING_IGNORE_LIST = [NotAnkiNoteWarning, UnPushedPathWarning, MissingMediaFileWarning]
SPINNER = "bouncingBall"
VERBOSE = False
PROFILE = False
BASE91_TABLE = [
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h",
"i",
"j",
"k",
"l",
"m",
"n",
"o",
"p",
"q",
"r",
"s",
"t",
"u",
"v",
"w",
"x",
"y",
"z",
"A",
"B",
"C",
"D",
"E",
"F",
"G",
"H",
"I",
"J",
"K",
"L",
"M",
"N",
"O",
"P",
"Q",
"R",
"S",
"T",
"U",
"V",
"W",
"X",
"Y",
"Z",
"0",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"!",
"#",
"$",
"%",
"&",
"(",
")",
"*",
"+",
",",
"-",
".",
"/",
":",
";",
"<",
"=",
">",
"?",
"@",
"[",
"]",
"^",
"_",
"`",
"{",
"|",
"}",
"~",
]
@beartype
def lock(col_file: ExtantFile) -> sqlite3.Connection:
"""Check that lock can be acquired on a SQLite3 database given a path."""
try:
con = sqlite3.connect(col_file, timeout=0.1)
con.isolation_level = "EXCLUSIVE"
con.execute("BEGIN EXCLUSIVE")
except sqlite3.DatabaseError as err:
raise SQLiteLockError(col_file, err) from err
if sys.platform == "win32":
con.commit()
con.close()
return con
@beartype
def unlock(con: sqlite3.Connection) -> None:
"""Unlock a SQLite3 database."""
if sys.platform == "win32":
return
con.commit()
con.close()
@beartype
def copy_repo(repo_ref: RepoRef, suffix: str) -> git.Repo:
"""Get a temporary copy of a git repository in /tmp/<suffix>/."""
# Copy the entire repo into `<tmp_dir>/suffix/`.
target: NoFile = F.test(F.mkdtemp() / suffix)
ephem = git.Repo(F.copytree(F.working_dir(repo_ref.repo), target))
# Annihilate the .ki subdirectory.
ki_dir = F.test(F.working_dir(ephem) / KI)
if isinstance(ki_dir, ExtantDir):
F.rmtree(ki_dir)
# Do a reset --hard to the given SHA.
ephem.git.reset(repo_ref.sha, hard=True)
return ephem
@beartype
def copy_kirepo(kirepo_ref: KiRepoRef, suffix: str) -> KiRepo:
"""
Given a KiRepoRef, i.e. a pair of the form (kirepo, SHA), we clone
`kirepo.repo` into a temp directory and hard reset to the given commit
hash. Copies the .ki/ directory from `kirepo_ref.kirepo` without making any
changes.
Parameters
----------
suffix : pathlib.Path
/tmp/.../ path suffix, e.g. `ki/local/`.
kirepo_ref : KiRepoRef
The ki repository to clone, and a commit for it.
Returns
-------
KiRepo
The cloned repository.
"""
ref: RepoRef = F.kirepo_ref_to_repo_ref(kirepo_ref)
ephem: git.Repo = copy_repo(ref, suffix)
ki_dir: Path = F.test(F.working_dir(ephem) / KI)
if not isinstance(ki_dir, NoFile):
raise ExpectedNonexistentPathError(ki_dir)
F.copytree(kirepo_ref.kirepo.ki_dir, ki_dir)
kirepo: KiRepo = M.kirepo(F.working_dir(ephem))
return kirepo
@beartype
def is_anki_note(path: ExtantFile) -> bool:
"""Check if file is a `ki`-style markdown note."""
# Ought to have markdown file extension.
if path.suffix != ".md":
return False
with open(path, "r", encoding="UTF-8") as md_f:
lines = md_f.readlines()
if len(lines) < 8:
return False
if lines[0] != "# Note\n":
return False
if lines[1] != "```\n":
return False
if not re.match(r"^guid: ", lines[2]):
return False
return True
@beartype
def get_note_warnings(
path: Path, root: ExtantDir, ignore_files: Set[str], ignore_dirs: Set[str]
) -> Optional[Warning]:
"""
Filter out paths in a git repository diff that do not correspond to Anki
notes.
We could do this purely using calls to `is_anki_note()`, but these are
expensive, so we try to find matches without opening any files first.
"""
# If `path` is an exact match for one of the patterns in `patterns`, we
# immediately return a warning. Since the contents of a git repository diff
# are always going to be files, this alone will not correctly ignore
# directory names given in `patterns`.
if path.name in ignore_files | ignore_dirs:
return UnPushedPathWarning(path, path.name)
# If any of the patterns in `patterns` resolve to one of the parents of
# `path`, return a warning, so that we are able to filter out entire
# directories.
components: Tuple[str, ...] = path.parts
dirnames: Set[str] = set(components) & ignore_dirs
for dirname in dirnames:
return UnPushedPathWarning(path, dirname)
# If `path` is an extant file (not a directory) and NOT a note, ignore it.
abspath = (root / path).resolve()
if abspath.exists() and abspath.is_file():
file = ExtantFile(abspath)
if not is_anki_note(file):
return NotAnkiNoteWarning(file)
return None
@beartype
def unsubmodule_repo(repo: git.Repo) -> git.Repo:
"""
Un-submodule all the git submodules (convert them to ordinary
subdirectories and destroy their commit history). Commit the changes to
the main repository.
MUTATES REPO in-place!
UNSAFE: git.rm() calls.
"""
gitmodules_path: Path = Path(repo.working_dir) / GITMODULES_FILE
for sm in repo.submodules:
# The submodule path is guaranteed to exist by gitpython.
sm_path = Path(sm.module().working_tree_dir)
repo.git.rm(sm_path, cached=True)
# Annihilate `.gitmodules` file.
        if gitmodules_path.is_file():
repo.git.rm(gitmodules_path, ignore_unmatch=True)
sm_git_path = F.test(sm_path / GIT)
if isinstance(sm_git_path, ExtantDir):
F.rmtree(sm_git_path)
else:
(sm_path / GIT).unlink(missing_ok=True)
# Directory should still exist after `git.rm()`.
repo.git.add(sm_path)
_ = repo.index.commit(f"Add submodule `{sm.name}` as ordinary directory.")
if gitmodules_path.exists():
repo.git.rm(gitmodules_path)
_ = repo.index.commit("Remove `.gitmodules` file.")
return repo
@beartype
def diff2(
repo: git.Repo,
parser: Lark,
transformer: NoteTransformer,
) -> List[Union[Delta, Warning]]:
"""Diff `repo` from `HEAD~1` to `HEAD`."""
with F.halo(text=f"Checking out repo '{F.working_dir(repo)}' at HEAD~1..."):
head1: RepoRef = M.repo_ref(repo, repo.commit("HEAD~1").hexsha)
# pylint: disable=consider-using-f-string
uuid = "%4x" % random.randrange(16**4)
# pylint: enable=consider-using-f-string
head1_repo = copy_repo(head1, suffix=f"HEAD~1-{uuid}")
# We diff from A~B.
b_repo = repo
a_repo = head1_repo
# Use a `DiffIndex` to get the changed files.
deltas = []
a_dir = F.test(Path(a_repo.working_dir))
b_dir = F.test(Path(b_repo.working_dir))
head = repo.commit("HEAD")
with F.halo(text=f"Diffing '{head1.sha}' ~ '{head.hexsha}'..."):
diff_index = repo.commit("HEAD~1").diff(head, create_patch=VERBOSE)
for change_type in GitChangeType:
diffs = diff_index.iter_change_type(change_type.value)
bar = tqdm(diffs, ncols=TQDM_NUM_COLS, leave=False)
bar.set_description(f"{change_type}")
for diff in bar:
a_relpath: str = diff.a_path
b_relpath: str = diff.b_path
if diff.a_path is None:
a_relpath = b_relpath
if diff.b_path is None:
b_relpath = a_relpath
a_warning: Optional[Warning] = get_note_warnings(
Path(a_relpath),
a_dir,
ignore_files=IGNORE_FILES,
ignore_dirs=IGNORE_DIRECTORIES,
)
b_warning: Optional[Warning] = get_note_warnings(
Path(b_relpath),
b_dir,
ignore_files=IGNORE_FILES,
ignore_dirs=IGNORE_DIRECTORIES,
)
if VERBOSE:
logger.debug(f"{a_relpath} -> {b_relpath}")
logger.debug(diff.diff.decode())
if a_warning is not None:
deltas.append(a_warning)
continue
if b_warning is not None:
deltas.append(b_warning)
continue
a_path = F.test(a_dir / a_relpath)
b_path = F.test(b_dir / b_relpath)
a_relpath = Path(a_relpath)
b_relpath = Path(b_relpath)
if change_type == GitChangeType.DELETED:
if not isinstance(a_path, ExtantFile):
deltas.append(DeletedFileNotFoundWarning(a_relpath))
continue
deltas.append(Delta(change_type, a_path, a_relpath))
continue
if not isinstance(b_path, ExtantFile):
deltas.append(DiffTargetFileNotFoundWarning(b_relpath))
continue
if change_type == GitChangeType.RENAMED:
a_delta = Delta(GitChangeType.DELETED, a_path, a_relpath)
b_delta = Delta(GitChangeType.ADDED, b_path, b_relpath)
a_decknote: DeckNote = parse_markdown_note(parser, transformer, a_delta)
b_decknote: DeckNote = parse_markdown_note(parser, transformer, b_delta)
if a_decknote.guid != b_decknote.guid:
deltas.append(a_delta)
deltas.append(b_delta)
continue
deltas.append(Delta(change_type, b_path, b_relpath))
if VERBOSE:
echo(f"Diffing '{repo.working_dir}': '{head1.sha}' ~ '{head.hexsha}'")
return deltas
@beartype
def parse_notetype_dict(nt: Dict[str, Any]) -> Notetype:
"""
Convert an Anki NotetypeDict into a Notetype dataclass.
Anki returns objects of type `NotetypeDict` (see pylib/anki/models.py)
when you call a method like `col.models.all()`. This is a dictionary
mapping strings to various stuff, and we read all its data into a python
dataclass here so that we can access it safely. Since we don't expect Anki
to ever give us 'invalid' notetypes (since we define 'valid' as being
processable by Anki), we return an exception if the parse fails.
Note on naming convention: Below, abbreviated variable names represent
dicts coming from Anki, like `nt: NotetypeDict` or `fld: FieldDict`.
Full words like `field: Field` represent ki dataclasses. The parameters
of the dataclasses, however, use abbreviations for consistency with Anki
map keys.
"""
# If we can't even read the name of the notetype, then we can't print out a
# nice error message in the event of a `KeyError`. So we have to print out
# a different error message saying that the notetype doesn't have a name
# field.
try:
nt["name"]
except KeyError as err:
raise UnnamedNotetypeError(nt) from err
try:
fields: Dict[int, Field] = {}
for fld in nt["flds"]:
ordinal = fld["ord"]
fields[ordinal] = Field(name=fld["name"], ord=ordinal)
templates: List[Template] = []
for tmpl in nt["tmpls"]:
templates.append(
Template(
name=tmpl["name"],
qfmt=tmpl["qfmt"],
afmt=tmpl["afmt"],
ord=tmpl["ord"],
)
)
# Guarantee that 'sortf' exists in `notetype.flds`.
sort_ordinal: int = nt["sortf"]
if sort_ordinal not in fields:
# If we get a KeyError here, it will be caught.
raise MissingFieldOrdinalError(sort_ordinal, nt["name"])
notetype = Notetype(
id=nt["id"],
name=nt["name"],
type=nt["type"],
flds=list(fields.values()),
tmpls=templates,
sortf=fields[sort_ordinal],
dict=nt,
)
except KeyError as err:
key = str(err)
raise NotetypeKeyError(key, str(nt["name"])) from err
return notetype
@beartype
def get_models_recursively(kirepo: KiRepo, silent: bool) -> Dict[str, Notetype]:
"""
Find and merge all `models.json` files recursively.
Should we check for duplicates?
Returns
-------
Dict[int, Notetype]
A dictionary sending model names to Notetypes.
"""
all_models: Dict[str, Notetype] = {}
# Load notetypes from json files.
bar = tqdm(F.rglob(kirepo.root, MODELS_FILE), ncols=TQDM_NUM_COLS, leave=not silent)
bar.set_description("Models")
for models_file in bar:
with open(models_file, "r", encoding="UTF-8") as models_f:
new_nts: Dict[int, Dict[str, Any]] = json.load(models_f)
models: Dict[str, Notetype] = {}
for _, nt in new_nts.items():
notetype: Notetype = parse_notetype_dict(nt)
models[notetype.name] = notetype
# Add mappings to dictionary.
all_models.update(models)
return all_models
@beartype
def display_fields_health_warning(note: Note) -> int:
"""Display warnings when Anki's fields health check fails."""
health = note.fields_check()
if health == 1:
warn(f"Found empty note '{note.id}'")
warn(f"Fields health check code: {health}")
elif health == 2:
duplication = (
"Failed to add note to collection. Notetype/fields of "
+ f"note '{note.id}' are duplicate of existing note."
)
warn(duplication)
warn(f"First field:\n{html_to_screen(note.fields[0])}")
warn(f"Fields health check code: {health}")
elif health != 0:
death = (
f"fatal: Note '{note.id}' failed fields check with unknown "
+ f"error code: {health}"
)
logger.error(death)
return health
@beartype
def get_guid(fields: List[str]) -> str:
"""Construct a new GUID for a note. Adapted from genanki's `guid_for()`."""
contents = "__".join(fields)
# Get the first 8 bytes of the SHA256 of `contents` as an int.
m = hashlib.sha256()
m.update(contents.encode("utf-8"))
hash_bytes = m.digest()[:8]
hash_int = 0
for b in hash_bytes:
hash_int <<= 8
hash_int += b
# convert to the weird base91 format that Anki uses
rv_reversed = []
while hash_int > 0:
rv_reversed.append(BASE91_TABLE[hash_int % len(BASE91_TABLE)])
hash_int //= len(BASE91_TABLE)
return "".join(reversed(rv_reversed))
@beartype
def parse_markdown_note(
parser: Lark, transformer: NoteTransformer, delta: Delta
) -> DeckNote:
"""Parse with lark."""
tree = parser.parse(delta.path.read_text(encoding="UTF-8"))
flatnote: FlatNote = transformer.transform(tree)
parts: Tuple[str, ...] = delta.relpath.parent.parts
deck: str = "::".join(parts)
# Generate a GUID from the hash of the field contents if the `guid` field
# in the note file was left blank.
guid = flatnote.guid if flatnote.guid != "" else get_guid(flatnote.fields)
return DeckNote(
title=flatnote.title,
guid=guid,
deck=deck,
model=flatnote.model,
tags=flatnote.tags,
fields=flatnote.fields,
)
@beartype
def plain_to_html(plain: str) -> str:
"""Convert plain text to html"""
# Minor clean up
    plain = plain.replace("&lt;", "<")
    plain = plain.replace("&gt;", ">")
    plain = plain.replace("&amp;", "&")
    plain = plain.replace("&nbsp;", " ")
plain = re.sub(r"\<b\>\s*\<\/b\>", "", plain)
plain = re.sub(r"\<i\>\s*\<\/i\>", "", plain)
plain = re.sub(r"\<div\>\s*\<\/div\>", "", plain)
# Convert newlines to `<br>` tags.
if not re.search(HTML_REGEX, plain):
plain = plain.replace("\n", "<br>")
return plain.strip()
@beartype
def update_note(
note: Note, decknote: DeckNote, old_notetype: Notetype, new_notetype: Notetype
) -> List[Warning]:
"""
Change all the data of `note` to that given in `decknote`.
This is only to be called on notes whose nid already exists in the
database. Creates a new deck if `decknote.deck` doesn't exist. Assumes
that the model has already been added to the collection, and raises an
exception if it finds otherwise. Changes notetype to that specified by
`decknote.model`. Overwrites all fields with `decknote.fields`.
Updates:
- tags
- deck
- model
- fields
"""
# Check that the passed argument `new_notetype` has a name consistent with
# the model specified in `decknote`. The former should be derived from the
# latter, and if they don't match, there is a bug in the caller.
if decknote.model != new_notetype.name:
raise NotetypeMismatchError(decknote, new_notetype)
note.tags = decknote.tags
note.flush()
# Set the deck of the given note, and create a deck with this name if it
# doesn't already exist. See the comments/docstrings in the implementation
# of the `anki.decks.DeckManager.id()` method.
newdid: int = note.col.decks.id(decknote.deck, create=True)
cids = [c.id for c in note.cards()]
# Set deck for all cards of this note.
if cids:
note.col.set_deck(cids, newdid)
# Change notetype of note.
    fmap: Dict[int, None] = {}
for field in old_notetype.flds:
fmap[field.ord] = None
# Change notetype (also clears all fields).
if old_notetype.id != new_notetype.id:
note.col.models.change(
old_notetype.dict, [note.id], new_notetype.dict, fmap, None
)
note.load()
# Validate field keys against notetype.
warnings: List[Warning] = validate_decknote_fields(new_notetype, decknote)
if len(warnings) > 0:
return warnings
# Set field values. This is correct because every field name that appears
# in `new_notetype` is contained in `decknote.fields`, or else we would
# have printed a warning and returned above.
for key, field in decknote.fields.items():
if key not in note:
warnings.append(NoteFieldValidationWarning(note.id, key, new_notetype))
continue
try:
note[key] = plain_to_html(field)
except IndexError as err:
raise AnkiDBNoteMissingFieldsError(decknote, note.id, key) from err
# Flush fields to collection object.
note.flush()
# Remove if unhealthy.
health = display_fields_health_warning(note)
if health != 0:
note.col.remove_notes([note.id])
warnings.append(UnhealthyNoteWarning(note.id, health))
return warnings
@beartype
def validate_decknote_fields(notetype: Notetype, decknote: DeckNote) -> List[Warning]:
"""Validate that the fields given in the note match the notetype."""
warnings: List[Warning] = []
names: List[str] = [field.name for field in notetype.flds]
# TODO: It might also be nice to print the path of the note in the
# repository. This would have to be added to the `DeckNote` spec.
if len(decknote.fields.keys()) != len(names):
warnings.append(WrongFieldCountWarning(decknote, names))
for x, y in zip(names, decknote.fields.keys()):
if x != y:
warnings.append(InconsistentFieldNamesWarning(x, y, decknote))
return warnings
# TODO: This should use other fields when there is not enough content in the
# first field to get a unique filename, if they exist. Note that we must still
# treat the case where all fields are images!
@beartype
def get_note_path(colnote: ColNote, deck_dir: ExtantDir, card_name: str = "") -> NoFile:
"""Get note path from sort field text."""
field_text = colnote.sortf_text
# Construct filename, stripping HTML tags and sanitizing (quickly).
field_text = plain_to_html(field_text)
field_text = re.sub("<[^<]+?>", "", field_text)
# If the HTML stripping removed all text, we just slugify the raw sort
# field text.
if len(field_text) == 0:
field_text = colnote.sortf_text
name = field_text[:MAX_FILENAME_LEN]
slug = F.slugify(name)
# Make it so `slug` cannot possibly be an empty string, because then we get
# a `Path('.')` which is a bug, and causes a runtime exception. If all
# else fails, generate a random hex string to use as the filename.
if len(slug) == 0:
slug = str(colnote.n.guid)
msg = f"Slug for '{colnote.n.guid}' is empty. Using guid as filename"
logger.warning(msg)
if card_name != "":
slug = f"{slug}_{card_name}"
filename: str = f"{slug}{MD}"
note_path = F.test(deck_dir / filename, resolve=False)
i = 1
while not isinstance(note_path, NoFile):
filename = f"{slug}_{i}{MD}"
note_path = F.test(deck_dir / filename, resolve=False)
i += 1
return note_path
@beartype
def backup(kirepo: KiRepo) -> None:
"""Backup collection to `.ki/backups`."""
md5sum = F.md5(kirepo.col_file)
name = f"{md5sum}.anki2"
backup_file = F.test(kirepo.backups_dir / name)
# We assume here that no one would ever make e.g. a directory called
# `name`, since `name` contains the md5sum of the collection file, and
# thus that is extraordinarily improbable. So the only thing we have to
# check for is that we haven't already written a backup file to this
# location.
if isinstance(backup_file, ExtantFile):
echo("Backup already exists.")
return
echo(f"Writing backup of .anki2 file to '{backup_file}'")
F.copyfile(kirepo.col_file, kirepo.backups_dir, name)
@beartype
def append_md5sum(
ki_dir: ExtantDir, tag: str, md5sum: str, silent: bool = False
) -> None:
"""Append an md5sum hash to the hashes file."""
hashes_file = ki_dir / HASHES_FILE
with open(hashes_file, "a+", encoding="UTF-8") as hashes_f:
hashes_f.write(f"{md5sum} {tag}\n")
echo(f"Wrote md5sum to '{hashes_file}'", silent)
@beartype
def create_deck_dir(deck_name: str, targetdir: ExtantDir) -> ExtantDir:
"""
Construct path to deck directory and create it, allowing the case in which
the directory already exists because we already created one of its
children, in which case this function is a no-op.
"""
# Strip leading periods so we don't get hidden folders.
components = deck_name.split("::")
components = [re.sub(r"^\.", r"", comp) for comp in components]
components = [re.sub(r"/", r"-", comp) for comp in components]
deck_path = Path(targetdir, *components)
return F.force_mkdir(deck_path)
@beartype
def get_field_note_id(nid: int, fieldname: str) -> str:
"""A str ID that uniquely identifies field-note pairs."""
return f"{nid}{F.slugify(fieldname)}"
# TODO: Use more refined types than `int`.
@beartype
def add_db_note(
col: Collection,
nid: int,
guid: str,
mid: int,
mod: int,
usn: int,
tags: List[str],
fields: List[str],
sfld: str,
csum: int,
flags: int,
data: str,
) -> Note:
"""Add a note to the database directly, with a SQL INSERT."""
importer = NoteImporter(col, "")
importer.addNew(
[
(
nid,
guid,
mid,
mod,
usn,
" " + " ".join(tags) + " ",
"\x1f".join(fields),
sfld,
csum,
flags,
data,
)
]
)
col.after_note_updates([nid], mark_modified=False)
return col.get_note(nid)
@beartype
def push_note(
col: Collection,
decknote: DeckNote,
timestamp_ns: int,
guids: Dict[str, NoteMetadata],
new_nids: Iterator[int],
) -> List[Warning]:
"""
Update the Anki `Note` object in `col` corresponding to `decknote`,
creating it if it does not already exist.
Raises
------
MissingNotetypeError
If we can't find a notetype with the name provided in `decknote`.
"""
# Notetype/model names are privileged in Anki, so if we don't find the
# right name, we raise an error.
model_id: Optional[int] = col.models.id_for_name(decknote.model)
if model_id is None:
raise MissingNotetypeError(decknote.model)
if decknote.guid in guids:
nid: int = guids[decknote.guid].nid
note: Note = col.get_note(nid)
else:
nid: int = next(new_nids)
note: Note = add_db_note(
col,
nid,
decknote.guid,
model_id,
mod=int(timestamp_ns // 1e9),
usn=-1,
tags=decknote.tags,
fields=list(decknote.fields.values()),
sfld="",
csum=0,
flags=0,
data="",
)
# If we are updating an existing note, we need to know the old and new
# notetypes, and then update the notetype (and the rest of the note data)
# accordingly.
old_notetype: Notetype = parse_notetype_dict(note.note_type())
new_notetype: Notetype = parse_notetype_dict(col.models.get(model_id))
warnings = update_note(note, decknote, old_notetype, new_notetype)
return warnings
@beartype
def get_colnote(col: Collection, nid: int) -> ColNote:
"""Get a dataclass representation of an Anki note."""
try:
note = col.get_note(nid)
except NotFoundError as err:
raise MissingNoteIdError(nid) from err
notetype: Notetype = parse_notetype_dict(note.note_type())
# Get sort field content. See comment where we subscript in the same way in
# `push_note()`.
try:
sortf_text: str = note[notetype.sortf.name]
except KeyError as err:
raise NoteFieldKeyError(str(err), nid) from err
# TODO: Remove implicit assumption that all cards are in the same deck, and
# work with cards instead of notes.
try:
deck = col.decks.name(note.cards()[0].did)
except IndexError as err:
logger.error(f"{note.cards() = }")
logger.error(f"{note.guid = }")
logger.error(f"{note.id = }")
raise err
colnote = ColNote(
n=note,
new=False,
deck=deck,
title="",
markdown=False,
notetype=notetype,
sortf_text=sortf_text,
)
return colnote
@beartype
def get_header_lines(colnote) -> List[str]:
"""Get header of markdown representation of note."""
lines = [
"# Note",
"```",
f"guid: {colnote.n.guid}",
f"notetype: {colnote.notetype.name}",
"```",
"",
"### Tags",
"```",
]
lines += colnote.n.tags
lines += ["```", ""]
return lines
def files_in_str(
col: Collection, string: str, include_remote: bool = False
) -> list[str]:
"""A copy of `MediaManager.files_in_str()`, but without LaTeX rendering."""
# Extract filenames.
files = []
for reg in col.media.regexps:
for match in re.finditer(reg, string):
fname = match.group("fname")
is_local = not re.match("(https?|ftp)://", fname.lower())
if is_local or include_remote:
fname = fname.strip()
fname = fname.replace('"', "")
files.append(fname)
return files
@beartype
def copy_media_files(
col: Collection,
media_target_dir: EmptyDir,
silent: bool,
) -> Tuple[Dict[int, Set[ExtantFile]], Set[Warning]]:
"""
Get a list of extant media files used in notes and notetypes, copy those
media files to the top-level `_media/` directory in the repository root,
and return a map sending note ids to sets of copied media files.
Adapted from code in `anki/pylib/anki/exporting.py`. Specifically, the
`AnkiExporter.exportInto()` function.
SQLite3 notes table schema
--------------------------
CREATE TABLE notes (
id integer PRIMARY KEY,
guid text NOT NULL,
mid integer NOT NULL,
mod integer NOT NULL,
usn integer NOT NULL,
tags text NOT NULL,
flds text NOT NULL,
-- The use of type integer for sfld is deliberate, because it means
-- that integer values in this field will sort numerically.
sfld integer NOT NULL,
csum integer NOT NULL,
flags integer NOT NULL,
data text NOT NULL
);
Parameters
----------
col
Anki collection.
silent
Whether to display stdout.
"""
# All note ids as a string for the SQL query.
strnids = ids2str(list(col.find_notes(query="")))
# This is the path to the media directory. In the original implementation
# of `AnkiExporter.exportInto()`, there is check made of the form
#
# if self.mediaDir:
#
# before doing path manipulation with this string.
#
# Examining the `__init__()` function of `MediaManager`, we can see that
# `col.media.dir()` will only be `None` in the case where `server=True` is
# passed to the `Collection` constructor. But since we do the construction
# within ki, we have a guarantee that this will never be true, and thus we
# can assume it is a nonempty string, which is all we need for the
# following code to be safe.
media_dir = F.test(Path(col.media.dir()))
if not isinstance(media_dir, ExtantDir):
raise MissingMediaDirectoryError(col.path, media_dir)
# Find only used media files, collecting warnings for bad paths.
media: Dict[int, Set[ExtantFile]] = {}
warnings: Set[Warning] = set()
query: str = "select * from notes where id in " + strnids
rows: List[NoteDBRow] = [NoteDBRow(*row) for row in col.db.all(query)]
bar = tqdm(rows, ncols=TQDM_NUM_COLS, leave=not silent)
bar.set_description("Media")
for row in bar:
for file in files_in_str(col, row.flds):
# Skip files in subdirs.
if file != os.path.basename(file):
continue
media_file = F.test(media_dir / file)
if isinstance(media_file, ExtantFile):
copied_file = F.copyfile(media_file, media_target_dir, media_file.name)
media[row.nid] = media.get(row.nid, set()) | set([copied_file])
else:
warnings.add(MissingMediaFileWarning(col.path, media_file))
mids = col.db.list("select distinct mid from notes where id in " + strnids)
# Faster version.
_, _, files = F.shallow_walk(media_dir)
fnames: List[Path] = [Path(file.name) for file in files]
for fname in fnames:
# Notetype template media files are *always* prefixed by underscores.
if str(fname).startswith("_"):
# Scan all models in mids for reference to fname.
for m in col.models.all():
if int(m["id"]) in mids and _modelHasMedia(m, str(fname)):
# If the path referenced by `fname` doesn't exist or is not
# a file, we do not display a warning or return an error.
# This path certainly ought to exist, since `fname` was
# obtained from an `os.listdir()` call.
media_file = F.test(media_dir / fname)
if isinstance(media_file, ExtantFile):
copied_file = F.copyfile(
media_file, media_target_dir, media_file.name
)
notetype_media = media.get(NOTETYPE_NID, set())
media[NOTETYPE_NID] = notetype_media | set([copied_file])
break
return media, warnings
@beartype
def _modelHasMedia(model: NotetypeDict, fname: str) -> bool:
"""
Check if a notetype has media.
Adapted from `anki.exporting.AnkiExporter._modelHasMedia()`, which is an
instance method, but does not make any use of `self`, and so could be a
staticmethod. It is a pure function.
"""
# First check the styling
if fname in model["css"]:
return True
# If no reference to fname then check the templates as well
for t in model["tmpls"]:
if fname in t["qfmt"] or fname in t["afmt"]:
return True
return False
@beartype
def write_repository(
col_file: ExtantFile,
targetdir: ExtantDir,
leaves: Leaves,
media_target_dir: EmptyDir,
silent: bool,
verbose: bool,
) -> Set[LatentSymlink]:
"""Write notes to appropriate directories in `targetdir`."""
# Create config file.
config_file: ExtantFile = leaves.files[CONFIG_FILE]
config = configparser.ConfigParser()
config["remote"] = {"path": col_file}
with open(config_file, "w", encoding="UTF-8") as config_f:
config.write(config_f)
# Create temp directory for htmlfield text files.
tempdir: EmptyDir = F.mkdtemp()
root: EmptyDir = F.mksubdir(tempdir, FIELD_HTML_SUFFIX)
tidy_field_files: Dict[str, ExtantFile] = {}
decks: Dict[str, List[ColNote]] = {}
# Open collection using a `Maybe`.
cwd: ExtantDir = F.cwd()
col: Collection = M.collection(col_file)
F.chdir(cwd)
# ColNote-containing data structure, to be passed to `write_decks()`.
colnotes: Dict[int, ColNote] = {}
# Query all note ids, get the deck from each note, and construct a map
# sending deck names to lists of notes.
all_nids = list(col.find_notes(query=""))
bar = tqdm(all_nids, ncols=TQDM_NUM_COLS, leave=not silent)
bar.set_description("Notes")
for nid in bar:
colnote: ColNote = get_colnote(col, nid)
colnotes[nid] = colnote
decks[colnote.deck] = decks.get(colnote.deck, []) + [colnote]
for field_name, field_text in colnote.n.items():
field_text: str = html_to_screen(field_text)
if re.search(HTML_REGEX, field_text):
fid: str = get_field_note_id(nid, field_name)
html_file: NoFile = F.test(root / fid)
tidy_field_files[fid] = F.write(html_file, field_text)
tidy_html_recursively(root, silent)
media: Dict[int, Set[ExtantFile]]
media, warnings = copy_media_files(col, media_target_dir, silent=silent)
latent_links: Set[LatentSymlink] = write_decks(
col,
targetdir,
colnotes,
media,
tidy_field_files,
silent,
)
num_displayed: int = 0
for warning in warnings:
if verbose or type(warning) not in WARNING_IGNORE_LIST:
click.secho(str(warning), fg="yellow")
num_displayed += 1
num_suppressed: int = len(warnings) - num_displayed
echo(f"Warnings suppressed: {num_suppressed} (show with '--verbose')")
F.rmtree(root)
col.close(save=False)
return latent_links
@beartype
def write_decks(
col: Collection,
targetdir: ExtantDir,
colnotes: Dict[int, ColNote],
media: Dict[int, Set[ExtantFile]],
tidy_field_files: Dict[str, ExtantFile],
silent: bool,
) -> Set[LatentSymlink]:
"""
The proper way to do this is a DFS traversal, perhaps recursively, which
will make it easier to keep things purely functional, accumulating the
model ids of the children in each node. For this, we must construct a tree
from the deck names.
"""
# Accumulate pairs of model ids and notetype maps. The return type of the
# `ModelManager.get()` call below indicates that it may return `None`,
# but we know it will not because we are getting the notetype id straight
# from the Anki DB.
models_map: Dict[int, NotetypeDict] = {}
for nt_name_id in col.models.all_names_and_ids():
models_map[nt_name_id.id] = col.models.get(nt_name_id.id)
# Dump the models file for the whole repository.
with open(targetdir / MODELS_FILE, "w", encoding="UTF-8") as f:
json.dump(models_map, f, ensure_ascii=False, indent=4, sort_keys=True)
# Implement new `ColNote`-writing procedure, using `DeckTreeNode`s.
#
# It must do the following for each deck:
# - create the deck directory
# - write the models.json file
# - create and populate the media directory
# - write the note payload for each note in the correct deck, exactly once
#
# In other words, for each deck, we need to write all of its:
# - models
# - media
# - notes
#
# The first two are cumulative: we want the models and media of subdecks to
# be included in their ancestors. The notes, however, should not be
# cumulative. Indeed, we want each note to appear exactly once in the
# entire repository, making allowances for the case where a single note's
# cards are spread across multiple decks, in which case we must create a
# symlink.
#
    # And actually, both of these cases are nicely taken care of for us by the
    # `DeckManager.cids()` function, which has a `children: bool` parameter
    # that toggles whether to include the card ids of subdecks.
root: DeckTreeNode = col.decks.deck_tree()
@beartype
def postorder(node: DeckTreeNode) -> List[DeckTreeNode]:
"""
Post-order traversal. Guarantees that we won't process a node until
we've processed all its children.
"""
traversal: List[DeckTreeNode] = []
for child in node.children:
traversal += postorder(child)
traversal += [node]
return traversal
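    # For example, given a deck tree of the form
    #
    #   Spanish
    #   └── Grammar
    #
    # `postorder()` yields [Grammar, Spanish, <root>], so every subdeck is
    # processed before its parent.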
# All card ids we've already processed.
written_cids: Set[int] = set()
# Map nids we've already written files for to a dataclass containing:
# - the corresponding `ExtantFile`
# - the deck id of the card for which we wrote it
#
# This deck id identifies the deck corresponding to the location where all
# the symlinks should point.
written_notes: Dict[int, WrittenNoteFile] = {}
# All latent symlinks created on Windows whose file modes we must set.
latent_links: Set[LatentSymlink] = set()
nodes: List[DeckTreeNode] = postorder(root)
nodes_bar = tqdm(nodes, ncols=TQDM_NUM_COLS, leave=not silent)
nodes_bar.set_description("Decks")
for node in nodes_bar:
node_cids: Set[int]
node_notes: Dict[int, WrittenNoteFile]
node_latent_links: Set[LatentSymlink]
node_cids, node_notes, node_latent_links = write_deck_node_cards(
node,
col,
targetdir,
colnotes,
tidy_field_files,
models_map,
written_cids,
written_notes,
)
written_cids |= node_cids
written_notes.update(node_notes)
latent_links |= node_latent_links
media_links: Set[LatentSymlink] = chain_media_symlinks(root, col, targetdir, media)
latent_links |= media_links
return latent_links
@beartype
def write_deck_node_cards(
node: DeckTreeNode,
col: Collection,
targetdir: ExtantDir,
colnotes: Dict[int, ColNote],
tidy_field_files: Dict[str, ExtantFile],
models_map: Dict[int, NotetypeDict],
written_cids: Set[int],
written_notes: Dict[int, WrittenNoteFile],
) -> Tuple[Set[int], Dict[int, WrittenNoteFile], Set[LatentSymlink]]:
"""Write all the cards to disk for a single `DeckTreeNode`."""
written_cids = copy.deepcopy(written_cids)
written_notes = copy.deepcopy(written_notes)
latent_links: Set[LatentSymlink] = set()
# The name stored in a `DeckTreeNode` object is not the full name of
# the deck, it is just the 'basename'. The `postorder()` function
# returns a deck with `did == 0` at the end of each call, probably
# because this is the implicit parent deck of all top-level decks. This
# deck empirically always has the empty string as its name. This is
# likely an Anki implementation detail. As a result, we ignore any deck
# with empty basename. We do this as opposed to ignoring decks with
# `did == 0` because it seems less likely to change if the Anki devs
# decide to mess with the implementation. Empty deck names will always
# be reserved, but they might e.g. decide ``did == -1`` makes more
# sense.
did: int = node.deck_id
basename: str = node.name
if basename == "":
return set(), {}, set()
name = col.decks.name(did)
deck_dir: ExtantDir = create_deck_dir(name, targetdir)
children: Set[CardId] = set(col.decks.cids(did=did, children=False))
descendants: List[CardId] = col.decks.cids(did=did, children=True)
descendant_nids: Set[int] = {NOTETYPE_NID}
descendant_mids: Set[int] = set()
for cid in descendants:
card: Card = col.get_card(cid)
descendant_nids.add(card.nid)
descendant_mids.add(card.note().mid)
# Card writes should not be cumulative, so we only perform them if
# `cid` is the card id of a card that is in the deck corresponding
# to `node`, but not in any of its children.
if cid not in children:
continue
# We only even consider writing *anything* (file or symlink) to
# disk after checking that we haven't already processed this
# particular card id. If we have, then we only need to bother with
# the cumulative stuff above (nids and mids, which we keep track of
# in order to write out models and media).
#
# TODO: This fixes a very serious bug. Write a test to capture the
# issue. Use the Japanese Core 2000 deck as a template if needed.
if cid in written_cids:
continue
written_cids.add(cid)
# We only write the payload if we haven't seen this note before.
# Otherwise, this must be a different card generated from the same
# note, so we symlink to the location where we've already written
# it to disk.
colnote: ColNote = colnotes[card.nid]
if card.nid not in written_notes:
note_path: NoFile = get_note_path(colnote, deck_dir)
payload: str = get_note_payload(colnote, tidy_field_files)
note_path: ExtantFile = F.write(note_path, payload)
written_notes[card.nid] = WrittenNoteFile(did, note_path)
else:
            # If `card` is in the same deck as the card for which we wrote
            # `written`, then there is no need to create a symlink, because
            # the note file is already there, in the correct deck directory.
#
# TODO: This fixes a very serious bug. Write a test to capture the
# issue. Use the Japanese Core 2000 deck as a template if needed.
written: WrittenNoteFile = written_notes[card.nid]
if card.did == written.did:
continue
# Get card template name.
template: TemplateDict = card.template()
name: str = template["name"]
note_path: NoFile = get_note_path(colnote, deck_dir, name)
abs_target: ExtantFile = written_notes[card.nid].file
distance = len(note_path.parent.relative_to(targetdir).parts)
up_path = Path("../" * distance)
relative: Path = abs_target.relative_to(targetdir)
target: Path = up_path / relative
try:
link: Union[Symlink, LatentSymlink] = F.symlink(note_path, target)
if isinstance(link, LatentSymlink):
latent_links.add(link)
except OSError as _:
trace = traceback.format_exc(limit=3)
logger.warning(f"Failed to create symlink for cid '{cid}'\n{trace}")
# TODO: Should this block be outside this function?
# Write `models.json` for current deck.
deck_models_map = {mid: models_map[mid] for mid in descendant_mids}
with open(deck_dir / MODELS_FILE, "w", encoding="UTF-8") as f:
json.dump(deck_models_map, f, ensure_ascii=False, indent=4, sort_keys=True)
return written_cids, written_notes, latent_links
@beartype
def chain_media_symlinks(
root: DeckTreeNode,
col: Collection,
targetdir: ExtantDir,
media: Dict[int, Set[ExtantFile]],
) -> Set[LatentSymlink]:
"""Chain symlinks up the deck tree into top-level `<collection>/_media/`."""
@beartype
def preorder(node: DeckTreeNode) -> List[DeckTreeNode]:
"""
Pre-order traversal. Guarantees that we won't process a node until
we've processed all its ancestors.
"""
traversal: List[DeckTreeNode] = [node]
for child in node.children:
traversal += preorder(child)
return traversal
@beartype
def map_parents(node: DeckTreeNode, col: Collection) -> Dict[str, DeckTreeNode]:
"""Map deck names to parent `DeckTreeNode`s."""
parents: Dict[str, DeckTreeNode] = {}
for child in node.children:
did: int = child.deck_id
name: str = col.decks.name(did)
parents[name] = node
subparents = map_parents(child, col)
parents.update(subparents)
return parents
media_latent_links: Set[LatentSymlink] = set()
media_dirs: Dict[str, ExtantDir] = {}
parents: Dict[str, DeckTreeNode] = map_parents(root, col)
for node in preorder(root):
if node.name == "":
continue
did: int = node.deck_id
fullname = col.decks.name(did)
deck_dir: ExtantDir = create_deck_dir(fullname, targetdir)
deck_media_dir: ExtantDir = F.force_mkdir(deck_dir / MEDIA)
media_dirs[fullname] = deck_media_dir
descendant_nids: Set[int] = {NOTETYPE_NID}
descendants: List[CardId] = col.decks.cids(did=did, children=True)
for cid in descendants:
card: Card = col.get_card(cid)
descendant_nids.add(card.nid)
for nid in descendant_nids:
if nid not in media:
continue
for media_file in media[nid]:
parent: DeckTreeNode = parents[fullname]
if parent.name != "":
parent_did: int = parent.deck_id
parent_fullname: str = col.decks.name(parent_did)
parent_media_dir = media_dirs[parent_fullname]
abs_target: Symlink = F.test(
parent_media_dir / media_file.name, resolve=False
)
else:
abs_target: ExtantFile = media_file
path = F.test(deck_media_dir / media_file.name, resolve=False)
if not isinstance(path, NoFile):
continue
distance = len(path.parent.relative_to(targetdir).parts)
up_path = Path("../" * distance)
relative: Path = abs_target.relative_to(targetdir)
target: Path = up_path / relative
try:
link: Union[Symlink, LatentSymlink] = F.symlink(path, target)
if isinstance(link, LatentSymlink):
media_latent_links.add(link)
except OSError as _:
trace = traceback.format_exc(limit=3)
logger.warning(f"Failed to create symlink to media\n{trace}")
return media_latent_links
@beartype
def html_to_screen(html: str) -> str:
"""
Convert html for a *single field* into plaintext, to be displayed within a
markdown file.
    Does very little (just converts HTML-escaped special characters like
    `<br>` tags or `&nbsp;`s to their UTF-8 equivalents) in the case where the
sentinel string does not appear. This string, `GENERATED_HTML_SENTINEL`,
indicates that a note was first created in a ki markdown file, and
therefore its HTML structure can be safely stripped out within calls to
`write_decks()`.
"""
html = re.sub(r"\<style\>.*\<\/style\>", "", html, flags=re.S)
plain = html
# For convenience: Un-escape some common LaTeX constructs.
plain = plain.replace(r"\\\\", r"\\")
plain = plain.replace(r"\\{", r"\{")
plain = plain.replace(r"\\}", r"\}")
plain = plain.replace(r"\*}", r"*}")
    plain = plain.replace(r"&lt;", "<")
    plain = plain.replace(r"&gt;", ">")
    plain = plain.replace(r"&amp;", "&")
    plain = plain.replace(r"&nbsp;", " ")
plain = plain.replace("<br>", "\n")
plain = plain.replace("<br/>", "\n")
plain = plain.replace("<br />", "\n")
# Unbreak lines within src attributes.
plain = re.sub('src= ?\n"', 'src="', plain)
plain = re.sub(r"\<b\>\s*\<\/b\>", "", plain)
return plain.strip()
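# For instance, under the substitutions above, a field containing
# "foo&nbsp;bar<br>baz" becomes "foo bar\nbaz" (an illustrative example, not
# taken from a real collection).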
@beartype
def get_colnote_as_str(colnote: ColNote) -> str:
"""Return string representation of a `ColNote`."""
lines = get_header_lines(colnote)
for field_name, field_text in colnote.n.items():
lines.append("### " + field_name)
lines.append(html_to_screen(field_text))
lines.append("")
return "\n".join(lines)
@beartype
def get_note_payload(colnote: ColNote, tidy_field_files: Dict[str, ExtantFile]) -> str:
"""
Return the markdown-converted contents of the Anki note represented by
`colnote` as a string.
Given a `ColNote`, which is a dataclass wrapper around a `Note` object
which has been loaded from the DB, and a mapping from `fid`s (unique
identifiers of field-note pairs) to paths, we check for each field of each
note whether that field's `fid` is contained in `tidy_field_files`. If so,
that means that the caller dumped the contents of this field to a file (the
file with this path, in fact) in order to autoformat the HTML source. If
this field was tidied/autoformatted, we read from that path to get the
tidied source, otherwise, we use the field content present in the
`ColNote`.
"""
# Get tidied html if it exists.
tidy_fields = {}
for field_name, field_text in colnote.n.items():
fid = get_field_note_id(colnote.n.id, field_name)
if fid in tidy_field_files:
# HTML5-tidy adds a newline after `<br>` in indent mode, so we
# remove these, because `html_to_screen()` converts `<br>` tags to
# newlines anyway.
tidied_field_text: str = tidy_field_files[fid].read_text(encoding="UTF-8")
tidied_field_text = tidied_field_text.replace("<br>\n", "\n")
tidied_field_text = tidied_field_text.replace("<br/>\n", "\n")
tidied_field_text = tidied_field_text.replace("<br />\n", "\n")
tidy_fields[field_name] = tidied_field_text
else:
tidy_fields[field_name] = field_text
lines = get_header_lines(colnote)
for field_name, field_text in tidy_fields.items():
lines.append("## " + field_name)
screen_text = html_to_screen(field_text)
text = colnote.n.col.media.escape_media_filenames(screen_text, unescape=True)
lines.append(text)
lines.append("")
return "\n".join(lines)
@beartype
def git_pull(
remote: str,
branch: str,
cwd: ExtantDir,
unrelated: bool,
theirs: bool,
check: bool,
silent: bool,
) -> None:
"""Pull remote into branch using a subprocess call."""
with F.halo(f"Pulling into '{cwd}'..."):
args = ["git", "pull", "-v"]
if unrelated:
args += ["--allow-unrelated-histories"]
if theirs:
args += ["--strategy-option=theirs"]
args += ["--verbose"]
args += [remote, branch]
p = subprocess.run(args, check=False, cwd=cwd, capture_output=True)
echo(f"{p.stdout.decode()}", silent=silent)
echo(f"{p.stderr.decode()}", silent=silent)
if check and p.returncode != 0:
click.secho(f"Error while pulling into '{cwd}'", fg="red")
raise RuntimeError(f"Git failed with return code '{p.returncode}'.")
echo(f"Pulling into '{cwd}'... done.", silent=silent)
@beartype
def echo(string: str, silent: bool = False) -> None:
"""Call `click.secho()` with formatting."""
if not silent:
click.secho(string, bold=True)
@beartype
def warn(string: str) -> None:
"""Call `click.secho()` with formatting (yellow)."""
click.secho(f"WARNING: {string}", bold=True, fg="yellow")
@beartype
def tidy_html_recursively(root: ExtantDir, silent: bool) -> None:
"""Call html5-tidy on each file in `root`, editing in-place."""
# Spin up subprocesses for tidying field HTML in-place.
batches: List[List[ExtantFile]] = list(
F.get_batches(F.rglob(root, "*"), BATCH_SIZE)
)
bar = tqdm(batches, ncols=TQDM_NUM_COLS, leave=not silent)
bar.set_description("HTML")
for batch in bar:
# TODO: Should we fail silently here, so as to not bother user with
# tidy warnings?
command = [
"tidy",
"-q",
"-m",
"-i",
"-omit",
"-utf8",
"--tidy-mark",
"no",
"--show-body-only",
"yes",
"--wrap",
"68",
"--wrap-attributes",
"yes",
]
command += batch
try:
subprocess.run(command, check=False, capture_output=True)
except FileNotFoundError as err:
raise MissingTidyExecutableError(err) from err
@beartype
def get_target(
cwd: ExtantDir, col_file: ExtantFile, directory: str
) -> Tuple[EmptyDir, bool]:
"""Create default target directory."""
path = F.test(Path(directory) if directory != "" else cwd / col_file.stem)
new: bool = True
if isinstance(path, NoPath):
path.mkdir(parents=True)
return M.emptydir(path), new
if isinstance(path, EmptyDir):
new = False
return path, new
raise TargetExistsError(path)
@beartype
def echo_note_change_types(deltas: List[Delta]) -> None:
"""Write a table of git change types for notes to stdout."""
is_change_type = lambda t: lambda d: d.status == t
adds = list(filter(is_change_type(GitChangeType.ADDED), deltas))
deletes = list(filter(is_change_type(GitChangeType.DELETED), deltas))
renames = list(filter(is_change_type(GitChangeType.RENAMED), deltas))
modifies = list(filter(is_change_type(GitChangeType.MODIFIED), deltas))
typechanges = list(filter(is_change_type(GitChangeType.TYPECHANGED), deltas))
WORD_PAD = 15
COUNT_PAD = 9
add_info: str = "ADD".ljust(WORD_PAD) + str(len(adds)).rjust(COUNT_PAD)
delete_info: str = "DELETE".ljust(WORD_PAD) + str(len(deletes)).rjust(COUNT_PAD)
modification_info: str = "MODIFY".ljust(WORD_PAD) + str(len(modifies)).rjust(
COUNT_PAD
)
rename_info: str = "RENAME".ljust(WORD_PAD) + str(len(renames)).rjust(COUNT_PAD)
typechange_info: str = "TYPE CHANGE".ljust(WORD_PAD) + str(len(typechanges)).rjust(
COUNT_PAD
)
echo("=" * (WORD_PAD + COUNT_PAD))
echo("Note change types")
echo("-" * (WORD_PAD + COUNT_PAD))
echo(add_info)
echo(delete_info)
echo(modification_info)
echo(rename_info)
echo(typechange_info)
echo("=" * (WORD_PAD + COUNT_PAD))
@beartype
def add_models(col: Collection, models: Dict[str, Notetype]) -> None:
"""Add all new models."""
@beartype
def get_notetype_json(notetype: Notetype) -> str:
dictionary: Dict[str, Any] = dataclasses.asdict(notetype)
dictionary.pop("id")
inner = dictionary["dict"]
inner.pop("id")
inner.pop("mod")
dictionary["dict"] = inner
return json.dumps(dictionary, sort_keys=True, indent=4)
@beartype
def notetype_hash_repr(notetype: Notetype) -> str:
s = get_notetype_json(notetype)
return f"JSON for '{pp.pformat(notetype.id)}':\n{s}"
for model in models.values():
# TODO: Consider waiting to parse `models` until after the
# `add_dict()` call.
#
# Set the model id to `0`, and then add.
# TODO: What happens if we try to add a model with the same name as
# an existing model, but the two models are not the same,
# content-wise?
#
# Check if a model already exists with this name, and get its `mid`.
mid: Optional[int] = col.models.id_for_name(model.name)
# TODO: This block is unfinished. We need to add new notetypes (and
# rename them) only if they are 'new', where new means they are
# different from anything else already in the DB, in the
# content-addressed sense. If they are new, then we must indicate that
# the notes we are adding actually have these new notetypes. For this,
# it may make sense to use the hash of the notetype everywhere (i.e. in
# the note file) rather than the name or mid.
#
# If a model already exists with this name, parse it, and check if its
# hash is identical to the model we are trying to add.
if mid is not None:
nt: NotetypeDict = col.models.get(mid)
# If we are trying to add a model that has the exact same content
# and name as an existing model, skip it.
existing_model: Notetype = parse_notetype_dict(nt)
if get_notetype_json(model) == get_notetype_json(existing_model):
continue
logger.warning(
f"Collision: New model '{model.name}' has same name "
f"as existing model with mid '{mid}', but hashes differ."
)
logger.warning(notetype_hash_repr(model))
logger.warning(notetype_hash_repr(existing_model))
# If the hashes don't match, then we somehow need to update
# `decknote.model` for the relevant notes.
# TODO: Consider using the hash of the notetype instead of its
# name.
nt_copy: NotetypeDict = copy.deepcopy(model.dict)
nt_copy["id"] = 0
changes: OpChangesWithId = col.models.add_dict(nt_copy)
nt: NotetypeDict = col.models.get(changes.id)
model: Notetype = parse_notetype_dict(nt)
echo(f"Added model '{model.name}'")
@beartype
def media_data(col: Collection, fname: str) -> bytes:
"""Get media file content as bytes (empty if missing)."""
if not col.media.have(fname):
return b""
path = os.path.join(col.media.dir(), fname)
try:
with open(path, "rb") as f:
return f.read()
except OSError:
return b""
@beartype
def filemode(
file: Union[ExtantFile, ExtantDir, ExtantStrangePath, Symlink, LatentSymlink]
) -> int:
"""Get git file mode."""
    out = ""
    try:
        # We must search from `file` upwards, in case it lies inside a submodule.
        repo = git.Repo(file, search_parent_directories=True)
        out = repo.git.ls_files(["-s", str(file)])
        # Handle the case where the file is untracked.
        if out == "":
            return -1
        mode: int = int(out.split()[0])
    except Exception as err:
        raise GitFileModeParseError(file, out) from err
return mode
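# For committed symlinks, the mode returned above is 120000; regular
# non-executable files are 100644, and -1 means the file is untracked.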
@click.group()
@click.version_option()
@beartype
def ki() -> None:
"""
The universal CLI entry point for `ki`.
    Takes no arguments; it has only three subcommands (clone, pull, push).
"""
return
@ki.command()
@click.argument("collection")
@click.argument("directory", required=False, default="")
@click.option("--verbose", "-v", is_flag=True, help="Print more output.")
def clone(collection: str, directory: str = "", verbose: bool = False) -> None:
"""
Clone an Anki collection into a directory.
Parameters
----------
collection : str
The path to an `.anki2` collection file.
directory : str, default=""
An optional path to a directory to clone the collection into.
Note: we check that this directory does not yet exist.
"""
if PROFILE:
# pylint: disable=import-outside-toplevel
from pyinstrument import Profiler
# pylint: enable=import-outside-toplevel
profiler = Profiler()
profiler.start()
echo("Cloning.")
col_file: ExtantFile = M.xfile(Path(collection))
@beartype
def cleanup(targetdir: ExtantDir, new: bool) -> Union[ExtantDir, EmptyDir, NoPath]:
"""Cleans up after failed clone operations."""
try:
if new:
return F.rmtree(targetdir)
_, dirs, files = F.shallow_walk(targetdir)
for directory in dirs:
F.rmtree(directory)
for file in files:
os.remove(file)
except PermissionError as _:
pass
return F.test(targetdir)
# Write all files to `targetdir`, and instantiate a `KiRepo` object.
targetdir: EmptyDir
new: bool
targetdir, new = get_target(F.cwd(), col_file, directory)
try:
_, _ = _clone(
col_file,
targetdir,
msg="Initial commit",
silent=False,
verbose=verbose,
)
kirepo: KiRepo = M.kirepo(targetdir)
F.write(kirepo.last_push_file, kirepo.repo.head.commit.hexsha)
kirepo.repo.close()
gc.collect()
echo("Done.")
except Exception as err:
cleanup(targetdir, new)
raise err
if PROFILE:
profiler.stop()
s = profiler.output_html()
Path("ki_clone_profile.html").resolve().write_text(s, encoding="UTF-8")
@beartype
def _clone(
col_file: ExtantFile,
targetdir: EmptyDir,
msg: str,
silent: bool,
verbose: bool,
) -> Tuple[git.Repo, str]:
"""
Clone an Anki collection into a directory.
The caller expects that `targetdir` will be the root of a valid ki
repository after this function is called, so we need to do our repo
initialization with gitpython in here, as opposed to in `clone()`.
Parameters
----------
col_file : pathlib.Path
The path to an `.anki2` collection file.
targetdir : pathlib.Path
A path to a directory to clone the collection into.
Note: we check that this directory is empty.
msg : str
Message for initial commit.
silent : bool
Whether to suppress progress information printed to stdout.
Returns
-------
repo : git.Repo
The cloned repository.
md5sum : str
The hash of the Anki collection file.
"""
echo(f"Found .anki2 file at '{col_file}'", silent=silent)
# Create `.ki/` and `_media/`, and create empty metadata files in `.ki/`.
# TODO: Consider writing a Maybe factory for all this.
directories: Leaves = F.fmkleaves(targetdir, dirs={KI: KI, MEDIA: MEDIA})
leaves: Leaves = F.fmkleaves(
directories.dirs[KI],
files={CONFIG_FILE: CONFIG_FILE, LAST_PUSH_FILE: LAST_PUSH_FILE},
dirs={BACKUPS_DIR: BACKUPS_DIR},
)
md5sum = F.md5(col_file)
echo(f"Computed md5sum: {md5sum}", silent)
echo(f"Cloning into '{targetdir}'...", silent=silent)
(targetdir / GITIGNORE_FILE).write_text(KI + "\n")
# Write notes to disk.
latent_links: Set[LatentSymlink] = write_repository(
col_file,
targetdir,
leaves,
directories.dirs[MEDIA],
silent,
verbose,
)
# Initialize the main repository.
with F.halo("Initializing repository and committing contents..."):
repo = git.Repo.init(targetdir, initial_branch=BRANCH_NAME)
root = F.working_dir(repo)
repo.git.add(all=True)
_ = repo.index.commit(msg)
# Use `git update-index` to set 120000 file mode on each latent symlink
# in `latent_links`.
relative_links: Set[Path] = set()
for abslink in latent_links:
link = abslink.relative_to(root)
relative_links.add(link)
# Convert to POSIX pathseps since that's what `git` wants.
githash = repo.git.hash_object(["-w", f"{link.as_posix()}"])
target = f"120000,{githash},{link.as_posix()}"
repo.git.update_index(target, add=True, cacheinfo=True)
# We do *not* call `git.add()` here since we call `git.update_index()` above.
_ = repo.index.commit(msg)
# On Windows, there are changes left in the working tree at this point
# (because git sees that the mode of the actual underlying file is
# 100644), so we must stash them in order to ensure the repo is not
# left dirty.
repo.git.stash("save")
if repo.is_dirty():
raise NonEmptyWorkingTreeError(repo)
# Squash last two commits together.
repo.git.reset(["--soft", "HEAD~1"])
repo.git.commit(message=msg, amend=True)
# Store a checksum of the Anki collection file in the hashes file.
append_md5sum(directories.dirs[KI], col_file.name, md5sum, silent)
return repo, md5sum
@ki.command()
@beartype
def pull() -> None:
"""
Pull from a preconfigured remote Anki collection into an existing ki
repository.
"""
if PROFILE:
# pylint: disable=import-outside-toplevel
from pyinstrument import Profiler
# pylint: enable=import-outside-toplevel
profiler = Profiler()
profiler.start()
# Check that we are inside a ki repository, and get the associated collection.
kirepo: KiRepo = M.kirepo(F.cwd())
con: sqlite3.Connection = lock(kirepo.col_file)
md5sum: str = F.md5(kirepo.col_file)
hashes: List[str] = kirepo.hashes_file.read_text(encoding="UTF-8").split("\n")
hashes = list(filter(lambda l: l != "", hashes))
if md5sum in hashes[-1]:
echo("ki pull: up to date.")
return
_pull(kirepo, silent=False)
unlock(con)
if PROFILE:
profiler.stop()
s = profiler.output_html()
Path("ki_pull_profile.html").resolve().write_text(s, encoding="UTF-8")
@beartype
def _pull(kirepo: KiRepo, silent: bool) -> None:
"""
Pull into `kirepo` without checking if we are already up-to-date.
Load the git repository at `anki_remote_root`, force pull (preferring
'theirs', i.e. the new stuff from the sqlite3 database) changes from that
repository (which is cloned straight from the collection, which in general
may have new changes) into `last_push_repo`, and then pull `last_push_repo`
into the main repository.
We pull in this sequence in order to avoid merge conflicts. Since we first
pull into a snapshot of the repository as it looked when we last pushed to
the database, we know that there cannot be any merge conflicts, because to
git, it just looks like we haven't made any changes since then. Then we
pull the result of that merge into our actual repository. So there could
still be merge conflicts at that point, but they will only be 'genuine'
merge conflicts in some sense, because as a result of using this snapshot
strategy, we give the anki collection the appearance of being a persistent
remote git repo. If we didn't do this, the fact that we did a fresh clone
of the database every time would mean that everything would look like a
merge conflict, because there is no shared history.
Parameters
----------
kirepo : KiRepo
A dataclass representing the Ki repository in the cwd.
silent : bool
Whether to suppress progress information printed to stdout.
Raises
------
CollectionChecksumError
If the Anki collection file was modified while pulling changes. This is
very unlikely, since the caller acquires a lock on the SQLite3
database.
"""
md5sum: str = F.md5(kirepo.col_file)
echo(f"Pulling from '{kirepo.col_file}'", silent)
echo(f"Computed md5sum: {md5sum}", silent)
# Copy `repo` into a temp directory and `reset --hard` at ref of last
# successful `push()`.
sha: str = kirepo.last_push_file.read_text(encoding="UTF-8")
with F.halo(text=f"Checking out repo at '{sha}'..."):
ref: RepoRef = M.repo_ref(kirepo.repo, sha=sha)
last_push_repo: git.Repo = copy_repo(ref, f"{LOCAL_SUFFIX}-{md5sum}")
unsub_repo: git.Repo = copy_repo(ref, f"unsub-{LOCAL_SUFFIX}-{md5sum}")
# Ki clone collection into a temp directory at `anki_remote_root`.
anki_remote_root: EmptyDir = F.mksubdir(F.mkdtemp(), REMOTE_SUFFIX / md5sum)
msg = f"Fetch changes from DB at `{kirepo.col_file}` with md5sum `{md5sum}`"
remote_repo, _ = _clone(
kirepo.col_file,
anki_remote_root,
msg,
silent=silent,
verbose=False,
)
# Create git remote pointing to `remote_repo`, which represents the current
# state of the Anki SQLite3 database, and pull it into `last_push_repo`.
anki_remote = last_push_repo.create_remote(REMOTE_NAME, remote_repo.git_dir)
unsub_remote = unsub_repo.create_remote(REMOTE_NAME, remote_repo.git_dir)
last_push_root: ExtantDir = F.working_dir(last_push_repo)
# =================== NEW PULL ARCHITECTURE ====================
# Update all submodules in `unsub_repo`. This is critically important,
    # because it essentially rolls back commits made in submodules since the
# last successful ki push in the main repository. Our `copy_repo()` call
# does a `reset --hard` to the commit of the last push, but this does *not*
# do an equivalent rollback for submodules. So they may contain new local
# changes that we don't want. Calling `git submodule update` here checks
# out the commit that *was* recorded in the submodule file at the ref of
# the last push.
unsub_root = F.working_dir(unsub_repo)
with F.halo(text="Updating submodules in stage repositories..."):
unsub_repo.git.submodule("update")
last_push_repo.git.submodule("update")
with F.halo(text=f"Unsubmoduling repository at '{unsub_root}'..."):
unsub_repo = unsubmodule_repo(unsub_repo)
patches_dir: ExtantDir = F.mkdtemp()
with F.halo(text=f"Fetching from remote at '{remote_repo.working_dir}'..."):
anki_remote.fetch()
unsub_remote.fetch()
with F.halo(text=f"Diffing 'HEAD' ~ 'FETCH_HEAD' in '{unsub_root}'..."):
raw_unified_patch = unsub_repo.git.diff(["HEAD", "FETCH_HEAD"], binary=True)
@beartype
def unquote_diff_path(path: str) -> str:
if len(path) <= 4:
return path
if path[0] == '"' and path[-1] == '"':
path = path.lstrip('"').rstrip('"')
if path[:2] in ("a/", "b/"):
path = path[2:]
return path
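    # For example, `unquote_diff_path('"a/deck/note.md"')` returns
    # 'deck/note.md': the surrounding quotes and the `a/` diff prefix are both
    # stripped.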
    # Construct patches for every file in the flattened/unsubmoduled checkout
# of the revision of the last successful `ki push`. Each patch is the diff
# between the relevant file in the flattened (all submodules converted to
# ordinary directories) repository and the same file in the Anki remote (a
# fresh `ki clone` of the current database).
patches: List[Patch] = []
f = io.StringIO()
with F.halo(text="Generating submodule patches..."):
with redirect_stdout(f):
for diff in whatthepatch.parse_patch(raw_unified_patch):
a_path = unquote_diff_path(diff.header.old_path)
b_path = unquote_diff_path(diff.header.new_path)
if a_path == DEV_NULL:
a_path = b_path
if b_path == DEV_NULL:
b_path = a_path
patch = Patch(Path(a_path), Path(b_path), diff)
patches.append(patch)
# Construct a map that sends submodule relative roots, that is, the
# relative path of a submodule root directory to the top-level root
# directory of the ki repository, to `git.Repo` objects for each submodule.
subrepos: Dict[Path, git.Repo] = {}
for sm in last_push_repo.submodules:
if sm.exists() and sm.module_exists():
sm_repo: git.Repo = sm.module()
sm_root: ExtantDir = F.working_dir(sm_repo)
# Get submodule root relative to ki repository root.
sm_rel_root: Path = sm_root.relative_to(last_push_root)
subrepos[sm_rel_root] = sm_repo
# Remove submodules directories from remote repo.
halotext = f"Removing submodule directory '{sm_rel_root}' from remote..."
if os.path.isdir(anki_remote_root / sm_rel_root):
with F.halo(text=halotext):
remote_repo.git.rm(["-r", str(sm_rel_root)])
if len(last_push_repo.submodules) > 0:
with F.halo(text="Committing submodule directory removals..."):
remote_repo.git.add(all=True)
remote_repo.index.commit("Remove submodule directories.")
# Apply patches within submodules.
msg = "Applying patches:\n\n"
patches_bar = tqdm(patches, ncols=TQDM_NUM_COLS)
patches_bar.set_description("Patches")
patched_submodules: Set[Path] = set()
for patch in patches_bar:
for sm_rel_root, sm_repo in subrepos.items():
            # TODO: We must also treat the case where we moved a file into or out
# of a submodule, but we just do this for now. In this case, we may
# have `patch.a` not be relative to the submodule root (if we moved
# a file into the sm dir), or vice-versa.
a_in_submodule: bool = patch.a.is_relative_to(sm_rel_root)
b_in_submodule: bool = patch.b.is_relative_to(sm_rel_root)
if a_in_submodule and b_in_submodule:
# Ignore the submodule 'file' itself.
if sm_rel_root in (patch.a, patch.b):
continue
# Hash the patch to use as a filename.
blake2 = hashlib.blake2s()
blake2.update(patch.diff.text.encode())
patch_hash: str = blake2.hexdigest()
patch_path: NoFile = F.test(patches_dir / patch_hash)
# Strip trailing linefeeds from each line so that `git apply`
# is happy on Windows (equivalent to running `dos2unix`).
patch_path: ExtantFile = F.write(patch_path, patch.diff.text)
if sys.platform == "win32":
with open(patch_path, "rb") as f:
patch_bytes = f.read()
reformatted_bytes = patch_bytes.replace(b"\r\n", b"\n")
with open(patch_path, "wb") as f:
f.write(reformatted_bytes)
# Number of leading path components to drop from diff paths.
num_parts = len(sm_rel_root.parts) + 1
# TODO: More tests are needed to make sure that the `git apply`
# call is not flaky. In particular, we must treat new and
# deleted files.
#
# Note that it is unnecessary to use `--3way` here, because
# this submodule is supposed to represent a fast-forward from
# the last successful push to the current state of the remote.
# There should be no nontrivial merging involved.
#
                # The `-p<n>` flag tells `git apply` to drop the first n leading
# path components from both diff paths. So if n is 2, we map
# `a/dog/cat` -> `cat`.
sm_repo.git.apply(
patch_path,
p=str(num_parts),
allow_empty=True,
verbose=True,
)
patched_submodules.add(sm_rel_root)
msg += f" `{patch.a}`\n"
echo(f"Applied {len(patched_submodules)} patches within submodules.")
for sm_rel_root in patched_submodules:
echo(f" Patched '{sm_rel_root}'")
# Commit patches in submodules.
with F.halo(text="Committing applied patches to submodules..."):
for sm_repo in subrepos.values():
sm_repo.git.add(all=True)
sm_repo.index.commit(msg)
# Get active branches of each submodule.
sm_branches: Dict[Path, str] = {}
for sm_rel_root, sm_repo in subrepos.items():
try:
sm_branches[sm_rel_root] = sm_repo.active_branch.name
except TypeError:
head: git.Head = next(iter(sm_repo.branches))
sm_branches[sm_rel_root] = head.name
# TODO: What if a submodule was deleted (or added) entirely?
#
    # New commits in submodules within `last_push_repo` are pulled into the
# submodules within `kirepo.repo`. This is done by adding a remote pointing
# to the patched submodule in each corresponding submodule in the main
# repository, and then pulling from that remote. Then the remote is
# deleted.
for sm in kirepo.repo.submodules:
if sm.exists() and sm.module_exists():
sm_repo: git.Repo = sm.module()
sm_rel_root: Path = F.working_dir(sm_repo).relative_to(kirepo.root)
# Note that `subrepos` are the submodules of `last_push_repo`.
if sm_rel_root in subrepos:
remote_sm: git.Repo = subrepos[sm_rel_root]
branch: str = sm_branches[sm_rel_root]
# TODO: What's in `upstream` that isn't already in `branch`?
remote_sm.git.branch("upstream")
# Simulate a `git merge --strategy=theirs upstream`.
remote_sm.git.checkout(["-b", "tmp", "upstream"])
remote_sm.git.merge(["-s", "ours", branch])
remote_sm.git.checkout(branch)
remote_sm.git.merge("tmp")
remote_sm.git.branch(["-D", "tmp"])
remote_target: ExtantDir = F.git_dir(remote_sm)
sm_remote = sm_repo.create_remote(REMOTE_NAME, remote_target)
git_pull(
REMOTE_NAME,
branch,
F.working_dir(sm_repo),
False,
False,
False,
silent,
)
sm_repo.delete_remote(sm_remote)
remote_sm.close()
sm_repo.close()
# Commit new submodules commits in `last_push_repo`.
if len(patched_submodules) > 0:
with F.halo(text=f"Committing new submodule commits to '{last_push_root}'..."):
last_push_repo.git.add(all=True)
last_push_repo.index.commit(msg)
# Handle deleted files, preferring `theirs`.
deletes = 0
del_msg = "Remove files deleted in remote.\n\n"
fetch_head = last_push_repo.commit("FETCH_HEAD")
diff_index = last_push_repo.commit("HEAD").diff(fetch_head)
for diff in diff_index.iter_change_type(GitChangeType.DELETED.value):
# Don't remove gitmodules.
if diff.a_path == GITMODULES_FILE:
continue
a_path: Path = F.test(last_push_root / diff.a_path)
if isinstance(a_path, ExtantFile):
last_push_repo.git.rm(diff.a_path)
del_msg += f"Remove '{a_path}'\n"
deletes += 1
if deletes > 0:
with F.halo(text=f"Committing remote deletions to '{last_push_root}'..."):
last_push_repo.git.add(all=True)
last_push_repo.index.commit(del_msg)
# =================== NEW PULL ARCHITECTURE ====================
echo(f"Copying blobs at HEAD='{sha}' to stage in '{last_push_root}'...")
git_copy = F.copytree(F.git_dir(last_push_repo), F.test(F.mkdtemp() / "GIT"))
last_push_repo.close()
last_push_root: NoFile = F.rmtree(F.working_dir(last_push_repo))
del last_push_repo
remote_root: ExtantDir = F.working_dir(remote_repo)
last_push_root: ExtantDir = F.copytree(remote_root, last_push_root)
echo(f"Copying git history from '{sha}' to stage...")
last_push_repo: git.Repo = M.repo(last_push_root)
git_dir: NoPath = F.rmtree(F.git_dir(last_push_repo))
del last_push_repo
F.copytree(git_copy, F.test(git_dir))
echo(f"Committing stage repository contents in '{last_push_root}'...")
last_push_repo: git.Repo = M.repo(last_push_root)
last_push_repo.git.add(all=True)
last_push_repo.index.commit(f"Pull changes from repository at `{kirepo.root}`")
# Create remote pointing to `last_push_repo` and pull into `repo`. Note
# that this `git pull` may not always create a merge commit, because a
# fast-forward only updates the branch pointer.
last_push_remote = kirepo.repo.create_remote(REMOTE_NAME, last_push_repo.git_dir)
kirepo.repo.git.config("pull.rebase", "false")
git_pull(REMOTE_NAME, BRANCH_NAME, kirepo.root, False, False, False, silent)
kirepo.repo.delete_remote(last_push_remote)
# Append the hash of the collection to the hashes file, and raise an error
# if the collection was modified while we were pulling changes.
append_md5sum(kirepo.ki_dir, kirepo.col_file.name, md5sum, silent=True)
if F.md5(kirepo.col_file) != md5sum:
raise CollectionChecksumError(kirepo.col_file)
# PUSH
@ki.command()
@click.option("--verbose", "-v", is_flag=True, help="Print more output.")
@beartype
def push(verbose: bool = False) -> PushResult:
"""
Push a ki repository into a .anki2 file.
Consists of the following operations:
- Clone collection into a temp directory
- Delete all files and folders except `.git/`
- Copy into this temp directory all files and folders from the current
repository checked out at HEAD (excluding `.git*` stuff).
- Unsubmodule repo
- Add and commit everything
- Take diff from `HEAD~1` to `HEAD`.
Returns
-------
PushResult
An `Enum` indicating whether the push was nontrivial.
Raises
------
UpdatesRejectedError
If the user needs to pull remote changes first.
"""
if PROFILE:
# pylint: disable=import-outside-toplevel
from pyinstrument import Profiler
# pylint: enable=import-outside-toplevel
profiler = Profiler()
profiler.start()
pp.install_extras(exclude=["ipython", "django", "ipython_repr_pretty"])
# Check that we are inside a ki repository, and load collection.
cwd: ExtantDir = F.cwd()
kirepo: KiRepo = M.kirepo(cwd)
con: sqlite3.Connection = lock(kirepo.col_file)
md5sum: str = F.md5(kirepo.col_file)
hashes: List[str] = kirepo.hashes_file.read_text(encoding="UTF-8").split("\n")
hashes = list(filter(lambda l: l != "", hashes))
if md5sum not in hashes[-1]:
raise UpdatesRejectedError(kirepo.col_file)
# =================== NEW PUSH ARCHITECTURE ====================
with F.halo("Initializing stage repository..."):
head: KiRepoRef = M.head_kirepo_ref(kirepo)
head_kirepo: KiRepo = copy_kirepo(head, f"{HEAD_SUFFIX}-{md5sum}")
remote_root: EmptyDir = F.mksubdir(F.mkdtemp(), REMOTE_SUFFIX / md5sum)
msg = f"Fetch changes from collection '{kirepo.col_file}' with md5sum '{md5sum}'"
remote_repo, _ = _clone(
kirepo.col_file,
remote_root,
msg,
silent=True,
verbose=verbose,
)
with F.halo(f"Copying blobs at HEAD='{head.sha}' to stage in '{remote_root}'..."):
git_copy = F.copytree(F.git_dir(remote_repo), F.test(F.mkdtemp() / "GIT"))
remote_repo.close()
remote_root: NoFile = F.rmtree(F.working_dir(remote_repo))
del remote_repo
remote_root: ExtantDir = F.copytree(head_kirepo.root, remote_root)
with F.halo(f"Flattening stage repository submodules in '{remote_root}'..."):
remote_repo: git.Repo = unsubmodule_repo(M.repo(remote_root))
git_dir: NoPath = F.rmtree(F.git_dir(remote_repo))
del remote_repo
F.copytree(git_copy, F.test(git_dir))
with F.halo(f"Committing stage repository contents in '{remote_root}'..."):
remote_repo: git.Repo = M.repo(remote_root)
remote_repo.git.add(all=True)
remote_repo.index.commit(f"Pull changes from repository at `{kirepo.root}`")
# =================== NEW PUSH ARCHITECTURE ====================
# Read grammar.
    # TODO: Should we assume this always exists? A nice error message should
# be printed on initialization if the grammar file is missing. No
# computation should be done, and none of the click commands should work.
grammar_path = Path(__file__).resolve().parent / "grammar.lark"
grammar = grammar_path.read_text(encoding="UTF-8")
# Instantiate parser.
parser = Lark(grammar, start="note", parser="lalr")
transformer = NoteTransformer()
deltas: List[Union[Delta, Warning]] = diff2(remote_repo, parser, transformer)
# Map model names to models.
models: Dict[str, Notetype] = get_models_recursively(head_kirepo, silent=True)
result: PushResult = push_deltas(
deltas,
models,
kirepo,
md5sum,
parser,
transformer,
head_kirepo,
con,
verbose,
)
if PROFILE:
profiler.stop()
s = profiler.output_html()
Path("ki_push_profile.html").resolve().write_text(s, encoding="UTF-8")
return result
@beartype
def push_deltas(
deltas: List[Union[Delta, Warning]],
models: Dict[str, Notetype],
kirepo: KiRepo,
md5sum: str,
parser: Lark,
transformer: NoteTransformer,
head_kirepo: KiRepo,
con: sqlite3.Connection,
verbose: bool,
) -> PushResult:
"""Push a list of `Delta`s to an Anki collection."""
warnings: List[Warning] = [delta for delta in deltas if isinstance(delta, Warning)]
deltas: List[Delta] = [delta for delta in deltas if isinstance(delta, Delta)]
# Display warnings from diff procedure.
for warning in warnings:
if verbose or type(warning) not in WARNING_IGNORE_LIST:
click.secho(str(warning), fg="yellow")
warnings = []
# If there are no changes, quit.
if len(set(deltas)) == 0:
echo("ki push: up to date.")
return PushResult.UP_TO_DATE
echo(f"Pushing to '{kirepo.col_file}'")
echo(f"Computed md5sum: {md5sum}")
echo(f"Verified md5sum matches latest hash in '{kirepo.hashes_file}'")
# Copy collection to a temp directory.
temp_col_dir: ExtantDir = F.mkdtemp()
new_col_file = temp_col_dir / kirepo.col_file.name
col_name: str = kirepo.col_file.name
new_col_file: ExtantFile = F.copyfile(kirepo.col_file, temp_col_dir, col_name)
head: RepoRef = M.head_repo_ref(kirepo.repo)
echo(f"Generating local .anki2 file from latest commit: {head.sha}")
echo(f"Writing changes to '{new_col_file}'...")
# Open collection, holding cwd constant (otherwise Anki changes it).
cwd: ExtantDir = F.cwd()
col: Collection = M.collection(new_col_file)
F.chdir(cwd)
# Add new models to the collection.
add_models(col, models)
# Stash both unstaged and staged files (including untracked).
kirepo.repo.git.stash(include_untracked=True, keep_index=True)
kirepo.repo.git.reset("HEAD", hard=True)
# Display table of note change type counts.
echo_note_change_types(deltas)
# Construct a map from guid -> (nid, mod, mid), adapted from
# `Anki2Importer._import_notes()`. Note that `mod` is the modification
# timestamp, in epoch seconds (timestamp of when the note was last
# modified).
guids: Dict[str, NoteMetadata] = {}
for nid, guid, mod, mid in col.db.execute("select id, guid, mod, mid from notes"):
guids[guid] = NoteMetadata(nid, mod, mid)
timestamp_ns: int = time.time_ns()
new_nids: Iterator[int] = itertools.count(int(timestamp_ns / 1e6))
is_delete = lambda d: d.status == GitChangeType.DELETED
bar = tqdm(deltas, ncols=TQDM_NUM_COLS)
bar.set_description("Deltas")
for delta in bar:
# Parse the file at `delta.path` into a `DeckNote`, and
# add/edit/delete in collection.
decknote = parse_markdown_note(parser, transformer, delta)
if is_delete(delta):
if decknote.guid in guids:
col.remove_notes([guids[decknote.guid].nid])
continue
# TODO: If relevant prefix of sort field has changed, we should
# regenerate the file. Recall that the sort field is used to determine
# the filename. If the content of the sort field has changed, then we
# may need to update the filename.
warnings += push_note(col, decknote, timestamp_ns, guids, new_nids)
num_displayed: int = 0
for warning in warnings:
if verbose or type(warning) not in WARNING_IGNORE_LIST:
click.secho(str(warning), fg="yellow")
num_displayed += 1
num_suppressed: int = len(warnings) - num_displayed
echo(f"Warnings suppressed: {num_suppressed} (show with '--verbose')")
# It is always safe to save changes to the DB, since the DB is a copy.
col.close(save=True)
# Backup collection file and overwrite collection.
backup(kirepo)
new_col_file = F.copyfile(new_col_file, F.parent(kirepo.col_file), col_name)
echo(f"Overwrote '{kirepo.col_file}'")
# Add media files to collection.
col: Collection = M.collection(kirepo.col_file)
media_files = F.rglob(head_kirepo.root, MEDIA_FILE_RECURSIVE_PATTERN)
warnings = []
bar = tqdm(media_files, ncols=TQDM_NUM_COLS, disable=False)
bar.set_description("Media")
for media_file in bar:
# Check file mode, and follow symlink if applicable.
mode: int = filemode(media_file)
if mode == 120000:
target: str = media_file.read_text(encoding="UTF-8")
parent: ExtantDir = F.parent(media_file)
media_file: ExtantFile = M.xfile(parent / target)
# Get bytes of new media file.
with open(media_file, "rb") as f:
new: bytes = f.read()
# Get bytes of existing media file (if it exists).
old: bytes = media_data(col, media_file.name)
if old and old == new:
continue
# Add (and possibly rename) media paths.
new_media_filename: str = col.media.add_file(media_file)
if new_media_filename != media_file.name:
warning = RenamedMediaFileWarning(media_file.name, new_media_filename)
warnings.append(warning)
col.close(save=True)
num_displayed: int = 0
for warning in warnings:
if verbose or type(warning) not in WARNING_IGNORE_LIST:
click.secho(str(warning), fg="yellow")
num_displayed += 1
num_suppressed: int = len(warnings) - num_displayed
echo(f"Warnings suppressed: {num_suppressed} (show with '--verbose')")
# Append to hashes file.
new_md5sum = F.md5(new_col_file)
append_md5sum(kirepo.ki_dir, new_col_file.name, new_md5sum, silent=False)
# Update the commit SHA of most recent successful PUSH.
head: RepoRef = M.head_repo_ref(kirepo.repo)
kirepo.last_push_file.write_text(head.sha)
# Unlock Anki SQLite DB.
unlock(con)
return PushResult.NONTRIVIAL
Sub-modules
- ki.functional - Type-safe, non Anki-specific functions.
- ki.maybes - Monadic factory functions for safely handling errors in type construction.
- ki.transformer - A Lark transformer for the ki note grammar.
- ki.types - Types for ki.
Functions
def add_db_note(col: anki.collection.Collection, nid: int, guid: str, mid: int, mod: int, usn: int, tags: list[str], fields: list[str], sfld: str, csum: int, flags: int, data: str) -> anki.notes.Note
-
Add a note to the database directly, with a SQL INSERT.

Source code:

@beartype
def add_db_note(
    col: Collection,
    nid: int,
    guid: str,
    mid: int,
    mod: int,
    usn: int,
    tags: List[str],
    fields: List[str],
    sfld: str,
    csum: int,
    flags: int,
    data: str,
) -> Note:
    """Add a note to the database directly, with a SQL INSERT."""
    importer = NoteImporter(col, "")
    importer.addNew(
        [
            (
                nid,
                guid,
                mid,
                mod,
                usn,
                " " + " ".join(tags) + " ",
                "\x1f".join(fields),
                sfld,
                csum,
                flags,
                data,
            )
        ]
    )
    col.after_note_updates([nid], mark_modified=False)
    return col.get_note(nid)
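A minimal usage sketch, assuming an open `Collection` bound to `col`; every argument value below is hypothetical:

import time

nid = int(time.time() * 1000)  # Note ids are epoch-millisecond timestamps.
note = add_db_note(
    col,
    nid=nid,
    guid="q7zO(yC%5Z",  # A unique note identifier (made up here).
    mid=col.models.id_for_name("Basic"),
    mod=int(time.time()),  # Modification time in epoch seconds.
    usn=-1,  # -1 marks the row as needing sync.
    tags=["example"],
    fields=["front text", "back text"],
    sfld="front text",  # Sort field contents.
    csum=0,  # Checksum of the sort field (placeholder value).
    flags=0,
    data="",
)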
def add_models(col: anki.collection.Collection, models: dict[str, Notetype]) -> None
-
Add all new models.
def append_md5sum(ki_dir: ExtantDir, tag: str, md5sum: str, silent: bool = False) -> None
-
Append an md5sum hash to the hashes file.

Source code:

@beartype
def append_md5sum(
    ki_dir: ExtantDir, tag: str, md5sum: str, silent: bool = False
) -> None:
    """Append an md5sum hash to the hashes file."""
    hashes_file = ki_dir / HASHES_FILE
    with open(hashes_file, "a+", encoding="UTF-8") as hashes_f:
        hashes_f.write(f"{md5sum} {tag}\n")
    echo(f"Wrote md5sum to '{hashes_file}'", silent)
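Each call appends a single line of the form `<md5sum> <tag>` to the hashes file in `.ki/`, e.g. (an illustrative line, not a real hash):

f1d2d2f924e986ac86fdf7b36c94bcdf collection.anki2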
def backup(kirepo: KiRepo) -> None
-
Backup collection to `.ki/backups`.

Source code:

@beartype
def backup(kirepo: KiRepo) -> None:
    """Backup collection to `.ki/backups`."""
    md5sum = F.md5(kirepo.col_file)
    name = f"{md5sum}.anki2"
    backup_file = F.test(kirepo.backups_dir / name)
    # We assume here that no one would ever make e.g. a directory called
    # `name`, since `name` contains the md5sum of the collection file, and
    # thus that is extraordinarily improbable. So the only thing we have to
    # check for is that we haven't already written a backup file to this
    # location.
    if isinstance(backup_file, ExtantFile):
        echo("Backup already exists.")
        return
    echo(f"Writing backup of .anki2 file to '{backup_file}'")
    F.copyfile(kirepo.col_file, kirepo.backups_dir, name)
def chain_media_symlinks(root: anki.decks_pb2.DeckTreeNode, col: anki.collection.Collection, targetdir: ExtantDir, media: dict[int, set[ExtantFile]]) -> set[LatentSymlink]
-
Chain symlinks up the deck tree into top-level `<collection>/_media/`.
def copy_kirepo(kirepo_ref: KiRepoRef, suffix: str) -> KiRepo
-
Given a KiRepoRef, i.e. a pair of the form (kirepo, SHA), we clone `kirepo.repo` into a temp directory and hard reset to the given commit hash. Copies the `.ki/` directory from `kirepo_ref.kirepo` without making any changes.

Parameters
- suffix : pathlib.Path - /tmp/…/ path suffix, e.g. `ki/local/`.
- kirepo_ref : KiRepoRef - The ki repository to clone, and a commit for it.

Returns
- KiRepo - The cloned repository.

Source code:

@beartype
def copy_kirepo(kirepo_ref: KiRepoRef, suffix: str) -> KiRepo:
    """
    Given a KiRepoRef, i.e. a pair of the form (kirepo, SHA), we clone
    `kirepo.repo` into a temp directory and hard reset to the given commit
    hash. Copies the .ki/ directory from `kirepo_ref.kirepo` without making
    any changes.

    Parameters
    ----------
    suffix : pathlib.Path
        /tmp/.../ path suffix, e.g. `ki/local/`.
    kirepo_ref : KiRepoRef
        The ki repository to clone, and a commit for it.

    Returns
    -------
    KiRepo
        The cloned repository.
    """
    ref: RepoRef = F.kirepo_ref_to_repo_ref(kirepo_ref)
    ephem: git.Repo = copy_repo(ref, suffix)
    ki_dir: Path = F.test(F.working_dir(ephem) / KI)
    if not isinstance(ki_dir, NoFile):
        raise ExpectedNonexistentPathError(ki_dir)
    F.copytree(kirepo_ref.kirepo.ki_dir, ki_dir)
    kirepo: KiRepo = M.kirepo(F.working_dir(ephem))
    return kirepo
def copy_media_files(col: anki.collection.Collection, media_target_dir: EmptyDir, silent: bool) -> tuple[dict[int, set[ExtantFile]], set[Warning]]
-
Get a list of extant media files used in notes and notetypes, copy those media files to the top-level `_media/` directory in the repository root, and return a map sending note ids to sets of copied media files.

Adapted from code in `anki/pylib/anki/exporting.py`. Specifically, the `AnkiExporter.exportInto()` function.

SQLite3 notes table schema:

    CREATE TABLE notes (
        id integer PRIMARY KEY,
        guid text NOT NULL,
        mid integer NOT NULL,
        mod integer NOT NULL,
        usn integer NOT NULL,
        tags text NOT NULL,
        flds text NOT NULL,
        -- The use of type integer for sfld is deliberate, because it means
        -- that integer values in this field will sort numerically.
        sfld integer NOT NULL,
        csum integer NOT NULL,
        flags integer NOT NULL,
        data text NOT NULL
    );

Parameters
- col - Anki collection.
- silent - Whether to display stdout.

Source code:

@beartype
def copy_media_files(
    col: Collection,
    media_target_dir: EmptyDir,
    silent: bool,
) -> Tuple[Dict[int, Set[ExtantFile]], Set[Warning]]:
    """
    Get a list of extant media files used in notes and notetypes, copy those
    media files to the top-level `_media/` directory in the repository root,
    and return a map sending note ids to sets of copied media files.

    Adapted from code in `anki/pylib/anki/exporting.py`. Specifically, the
    `AnkiExporter.exportInto()` function.
    """
    # All note ids as a string for the SQL query.
    strnids = ids2str(list(col.find_notes(query="")))
    # This is the path to the media directory. In the original implementation
    # of `AnkiExporter.exportInto()`, there is a check made of the form
    #
    #   if self.mediaDir:
    #
    # before doing path manipulation with this string.
    #
    # Examining the `__init__()` function of `MediaManager`, we can see that
    # `col.media.dir()` will only be `None` in the case where `server=True` is
    # passed to the `Collection` constructor. But since we do the construction
    # within ki, we have a guarantee that this will never be true, and thus we
    # can assume it is a nonempty string, which is all we need for the
    # following code to be safe.
    media_dir = F.test(Path(col.media.dir()))
    if not isinstance(media_dir, ExtantDir):
        raise MissingMediaDirectoryError(col.path, media_dir)
    # Find only used media files, collecting warnings for bad paths.
    media: Dict[int, Set[ExtantFile]] = {}
    warnings: Set[Warning] = set()
    query: str = "select * from notes where id in " + strnids
    rows: List[NoteDBRow] = [NoteDBRow(*row) for row in col.db.all(query)]
    bar = tqdm(rows, ncols=TQDM_NUM_COLS, leave=not silent)
    bar.set_description("Media")
    for row in bar:
        for file in files_in_str(col, row.flds):
            # Skip files in subdirs.
            if file != os.path.basename(file):
                continue
            media_file = F.test(media_dir / file)
            if isinstance(media_file, ExtantFile):
                copied_file = F.copyfile(media_file, media_target_dir, media_file.name)
                media[row.nid] = media.get(row.nid, set()) | set([copied_file])
            else:
                warnings.add(MissingMediaFileWarning(col.path, media_file))
    mids = col.db.list("select distinct mid from notes where id in " + strnids)
    # Faster version.
    _, _, files = F.shallow_walk(media_dir)
    fnames: List[Path] = [Path(file.name) for file in files]
    for fname in fnames:
        # Notetype template media files are *always* prefixed by underscores.
        if str(fname).startswith("_"):
            # Scan all models in mids for reference to fname.
            for m in col.models.all():
                if int(m["id"]) in mids and _modelHasMedia(m, str(fname)):
                    # If the path referenced by `fname` doesn't exist or is not
                    # a file, we do not display a warning or return an error.
                    # This path certainly ought to exist, since `fname` was
                    # obtained from an `os.listdir()` call.
                    media_file = F.test(media_dir / fname)
                    if isinstance(media_file, ExtantFile):
                        copied_file = F.copyfile(
                            media_file, media_target_dir, media_file.name
                        )
                        notetype_media = media.get(NOTETYPE_NID, set())
                        media[NOTETYPE_NID] = notetype_media | set([copied_file])
                    break
    return media, warnings
def copy_repo(repo_ref: RepoRef, suffix: str) ‑> git.repo.base.Repo
-
Get a temporary copy of a git repository in /tmp/<suffix>/.
Source code:
@beartype
def copy_repo(repo_ref: RepoRef, suffix: str) -> git.Repo:
    """Get a temporary copy of a git repository in /tmp/<suffix>/."""
    # Copy the entire repo into `<tmp_dir>/suffix/`.
    target: NoFile = F.test(F.mkdtemp() / suffix)
    ephem = git.Repo(F.copytree(F.working_dir(repo_ref.repo), target))

    # Annihilate the .ki subdirectory.
    ki_dir = F.test(F.working_dir(ephem) / KI)
    if isinstance(ki_dir, ExtantDir):
        F.rmtree(ki_dir)

    # Do a reset --hard to the given SHA.
    ephem.git.reset(repo_ref.sha, hard=True)

    return ephem
def create_deck_dir(deck_name: str, targetdir: ExtantDir) ‑> ExtantDir
-
Construct path to deck directory and create it, allowing the case in which the directory already exists because we already created one of its children, in which case this function is a no-op.
Source code:
@beartype
def create_deck_dir(deck_name: str, targetdir: ExtantDir) -> ExtantDir:
    """
    Construct path to deck directory and create it, allowing the case in
    which the directory already exists because we already created one of its
    children, in which case this function is a no-op.
    """
    # Strip leading periods so we don't get hidden folders.
    components = deck_name.split("::")
    components = [re.sub(r"^\.", r"", comp) for comp in components]
    components = [re.sub(r"/", r"-", comp) for comp in components]
    deck_path = Path(targetdir, *components)
    return F.force_mkdir(deck_path)
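For instance, hierarchical deck names are split on `::`, leading periods are stripped, and slashes are replaced with dashes. A sketch of the mapping (the deck name below is made up):

# A deck named "Math::.Calculus/Limits" maps to the directory
# "<targetdir>/Math/Calculus-Limits":
#   - split on "::"        -> ["Math", ".Calculus/Limits"]
#   - strip leading "."    -> ["Math", "Calculus/Limits"]
#   - replace "/" with "-" -> ["Math", "Calculus-Limits"]
deck_dir = create_deck_dir("Math::.Calculus/Limits", targetdir)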
def diff2(repo: git.repo.base.Repo, parser: lark.lark.Lark, transformer: NoteTransformer) ‑> list[typing.Union[Delta, Warning]]
-
Diff `repo` from `HEAD~1` to `HEAD`.
Source code:
@beartype
def diff2(
    repo: git.Repo,
    parser: Lark,
    transformer: NoteTransformer,
) -> List[Union[Delta, Warning]]:
    """Diff `repo` from `HEAD~1` to `HEAD`."""
    with F.halo(text=f"Checking out repo '{F.working_dir(repo)}' at HEAD~1..."):
        head1: RepoRef = M.repo_ref(repo, repo.commit("HEAD~1").hexsha)
        # pylint: disable=consider-using-f-string
        uuid = "%4x" % random.randrange(16**4)
        # pylint: enable=consider-using-f-string
        head1_repo = copy_repo(head1, suffix=f"HEAD~1-{uuid}")

    # We diff from A~B.
    b_repo = repo
    a_repo = head1_repo

    # Use a `DiffIndex` to get the changed files.
    deltas = []
    a_dir = F.test(Path(a_repo.working_dir))
    b_dir = F.test(Path(b_repo.working_dir))
    head = repo.commit("HEAD")
    with F.halo(text=f"Diffing '{head1.sha}' ~ '{head.hexsha}'..."):
        diff_index = repo.commit("HEAD~1").diff(head, create_patch=VERBOSE)
    for change_type in GitChangeType:
        diffs = diff_index.iter_change_type(change_type.value)
        bar = tqdm(diffs, ncols=TQDM_NUM_COLS, leave=False)
        bar.set_description(f"{change_type}")
        for diff in bar:
            a_relpath: str = diff.a_path
            b_relpath: str = diff.b_path
            if diff.a_path is None:
                a_relpath = b_relpath
            if diff.b_path is None:
                b_relpath = a_relpath

            a_warning: Optional[Warning] = get_note_warnings(
                Path(a_relpath),
                a_dir,
                ignore_files=IGNORE_FILES,
                ignore_dirs=IGNORE_DIRECTORIES,
            )
            b_warning: Optional[Warning] = get_note_warnings(
                Path(b_relpath),
                b_dir,
                ignore_files=IGNORE_FILES,
                ignore_dirs=IGNORE_DIRECTORIES,
            )

            if VERBOSE:
                logger.debug(f"{a_relpath} -> {b_relpath}")
                logger.debug(diff.diff.decode())

            if a_warning is not None:
                deltas.append(a_warning)
                continue
            if b_warning is not None:
                deltas.append(b_warning)
                continue

            a_path = F.test(a_dir / a_relpath)
            b_path = F.test(b_dir / b_relpath)

            a_relpath = Path(a_relpath)
            b_relpath = Path(b_relpath)

            if change_type == GitChangeType.DELETED:
                if not isinstance(a_path, ExtantFile):
                    deltas.append(DeletedFileNotFoundWarning(a_relpath))
                    continue
                deltas.append(Delta(change_type, a_path, a_relpath))
                continue

            if not isinstance(b_path, ExtantFile):
                deltas.append(DiffTargetFileNotFoundWarning(b_relpath))
                continue

            if change_type == GitChangeType.RENAMED:
                a_delta = Delta(GitChangeType.DELETED, a_path, a_relpath)
                b_delta = Delta(GitChangeType.ADDED, b_path, b_relpath)
                a_decknote: DeckNote = parse_markdown_note(parser, transformer, a_delta)
                b_decknote: DeckNote = parse_markdown_note(parser, transformer, b_delta)
                if a_decknote.guid != b_decknote.guid:
                    deltas.append(a_delta)
                    deltas.append(b_delta)
                    continue

            deltas.append(Delta(change_type, b_path, b_relpath))

    if VERBOSE:
        echo(f"Diffing '{repo.working_dir}': '{head1.sha}' ~ '{head.hexsha}'")

    return deltas
def display_fields_health_warning(note: anki.notes.Note) ‑> int
-
Display warnings when Anki's fields health check fails.
Source code:
@beartype
def display_fields_health_warning(note: Note) -> int:
    """Display warnings when Anki's fields health check fails."""
    health = note.fields_check()
    if health == 1:
        warn(f"Found empty note '{note.id}'")
        warn(f"Fields health check code: {health}")
    elif health == 2:
        duplication = (
            "Failed to add note to collection. Notetype/fields of "
            + f"note '{note.id}' are duplicate of existing note."
        )
        warn(duplication)
        warn(f"First field:\n{html_to_screen(note.fields[0])}")
        warn(f"Fields health check code: {health}")
    elif health != 0:
        death = (
            f"fatal: Note '{note.id}' failed fields check with unknown "
            + f"error code: {health}"
        )
        logger.error(death)
    return health
def echo(string: str, silent: bool = False) ‑> None
-
Call `click.secho()` with formatting.
Source code:
@beartype
def echo(string: str, silent: bool = False) -> None:
    """Call `click.secho()` with formatting."""
    if not silent:
        click.secho(string, bold=True)
def echo_note_change_types(deltas: list[Delta]) ‑> None
-
Write a table of git change types for notes to stdout.
Source code:
@beartype
def echo_note_change_types(deltas: List[Delta]) -> None:
    """Write a table of git change types for notes to stdout."""
    is_change_type = lambda t: lambda d: d.status == t

    adds = list(filter(is_change_type(GitChangeType.ADDED), deltas))
    deletes = list(filter(is_change_type(GitChangeType.DELETED), deltas))
    renames = list(filter(is_change_type(GitChangeType.RENAMED), deltas))
    modifies = list(filter(is_change_type(GitChangeType.MODIFIED), deltas))
    typechanges = list(filter(is_change_type(GitChangeType.TYPECHANGED), deltas))

    WORD_PAD = 15
    COUNT_PAD = 9
    add_info: str = "ADD".ljust(WORD_PAD) + str(len(adds)).rjust(COUNT_PAD)
    delete_info: str = "DELETE".ljust(WORD_PAD) + str(len(deletes)).rjust(COUNT_PAD)
    modification_info: str = "MODIFY".ljust(WORD_PAD) + str(len(modifies)).rjust(
        COUNT_PAD
    )
    rename_info: str = "RENAME".ljust(WORD_PAD) + str(len(renames)).rjust(COUNT_PAD)
    typechange_info: str = "TYPE CHANGE".ljust(WORD_PAD) + str(len(typechanges)).rjust(
        COUNT_PAD
    )

    echo("=" * (WORD_PAD + COUNT_PAD))
    echo("Note change types")
    echo("-" * (WORD_PAD + COUNT_PAD))
    echo(add_info)
    echo(delete_info)
    echo(modification_info)
    echo(rename_info)
    echo(typechange_info)
    echo("=" * (WORD_PAD + COUNT_PAD))
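For a push with, say, two added notes and one deleted note, the printed table looks like this (counts are illustrative; widths follow WORD_PAD and COUNT_PAD above):

========================
Note change types
------------------------
ADD                    2
DELETE                 1
MODIFY                 0
RENAME                 0
TYPE CHANGE            0
========================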
def filemode(file: Union[ExtantFile, ExtantDir, ExtantStrangePath, Symlink, LatentSymlink]) ‑> int
-
Get git file mode.
Source code:
@beartype
def filemode(
    file: Union[ExtantFile, ExtantDir, ExtantStrangePath, Symlink, LatentSymlink]
) -> int:
    """Get git file mode."""
    try:
        # We must search from file upwards in case inside submodule.
        repo = git.Repo(file, search_parent_directories=True)
        out = repo.git.ls_files(["-s", str(file)])

        # Treat case where file is untracked.
        if out == "":
            return -1

        mode: int = int(out.split()[0])
    except Exception as err:
        raise GitFileModeParseError(file, out) from err
    return mode
def files_in_str(col: anki.collection.Collection, string: str, include_remote: bool = False) ‑> list[str]
-
A copy of `MediaManager.files_in_str()`, but without LaTeX rendering.
Source code:
def files_in_str(
    col: Collection, string: str, include_remote: bool = False
) -> list[str]:
    """A copy of `MediaManager.files_in_str()`, but without LaTeX rendering."""
    # Extract filenames.
    files = []
    for reg in col.media.regexps:
        for match in re.finditer(reg, string):
            fname = match.group("fname")
            is_local = not re.match("(https?|ftp)://", fname.lower())
            if is_local or include_remote:
                fname = fname.strip()
                fname = fname.replace('"', "")
                files.append(fname)
    return files
def get_colnote(col: anki.collection.Collection, nid: int) ‑> ColNote
-
Get a dataclass representation of an Anki note.
Source code:
@beartype
def get_colnote(col: Collection, nid: int) -> ColNote:
    """Get a dataclass representation of an Anki note."""
    try:
        note = col.get_note(nid)
    except NotFoundError as err:
        raise MissingNoteIdError(nid) from err
    notetype: Notetype = parse_notetype_dict(note.note_type())

    # Get sort field content. See comment where we subscript in the same way
    # in `push_note()`.
    try:
        sortf_text: str = note[notetype.sortf.name]
    except KeyError as err:
        raise NoteFieldKeyError(str(err), nid) from err

    # TODO: Remove implicit assumption that all cards are in the same deck,
    # and work with cards instead of notes.
    try:
        deck = col.decks.name(note.cards()[0].did)
    except IndexError as err:
        logger.error(f"{note.cards() = }")
        logger.error(f"{note.guid = }")
        logger.error(f"{note.id = }")
        raise err

    colnote = ColNote(
        n=note,
        new=False,
        deck=deck,
        title="",
        markdown=False,
        notetype=notetype,
        sortf_text=sortf_text,
    )
    return colnote
def get_colnote_as_str(colnote: ColNote) ‑> str
-
Return string representation of a `ColNote`.
Source code:
@beartype
def get_colnote_as_str(colnote: ColNote) -> str:
    """Return string representation of a `ColNote`."""
    lines = get_header_lines(colnote)
    for field_name, field_text in colnote.n.items():
        lines.append("### " + field_name)
        lines.append(html_to_screen(field_text))
        lines.append("")
    return "\n".join(lines)
def get_field_note_id(nid: int, fieldname: str) ‑> str
-
A str ID that uniquely identifies field-note pairs.
Source code:
@beartype
def get_field_note_id(nid: int, fieldname: str) -> str:
    """A str ID that uniquely identifies field-note pairs."""
    return f"{nid}{F.slugify(fieldname)}"
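So the fid is just the nid concatenated with the slugified field name. A sketch, assuming `F.slugify()` lowercases its argument (an assumption about ki's helper, not documented here):

# Hypothetical example: slug of "Front" is assumed to be "front".
fid = get_field_note_id(1634354112345, "Front")  # -> "1634354112345front"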
def get_guid(fields: list[str]) ‑> str
-
Construct a new GUID for a note. Adapted from genanki's `guid_for()`.
Source code:
@beartype
def get_guid(fields: List[str]) -> str:
    """Construct a new GUID for a note. Adapted from genanki's `guid_for()`."""
    contents = "__".join(fields)

    # Get the first 8 bytes of the SHA256 of `contents` as an int.
    m = hashlib.sha256()
    m.update(contents.encode("utf-8"))
    hash_bytes = m.digest()[:8]
    hash_int = 0
    for b in hash_bytes:
        hash_int <<= 8
        hash_int += b

    # Convert to the weird base91 format that Anki uses.
    rv_reversed = []
    while hash_int > 0:
        rv_reversed.append(BASE91_TABLE[hash_int % len(BASE91_TABLE)])
        hash_int //= len(BASE91_TABLE)

    return "".join(reversed(rv_reversed))
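Because the GUID is a hash of the joined field contents, the same fields always produce the same GUID. A minimal usage sketch (field strings are made up):

# Deterministic: both calls return the same base-91 string.
guid_a = get_guid(["What is the capital of France?", "Paris"])
guid_b = get_guid(["What is the capital of France?", "Paris"])
assert guid_a == guid_b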
def get_header_lines(colnote) ‑> list[str]
-
Get header of markdown representation of note.
Source code:
@beartype
def get_header_lines(colnote) -> List[str]:
    """Get header of markdown representation of note."""
    lines = [
        "# Note",
        "```",
        f"guid: {colnote.n.guid}",
        f"notetype: {colnote.notetype.name}",
        "```",
        "",
        "### Tags",
        "```",
    ]
    lines += colnote.n.tags
    lines += ["```", ""]
    return lines
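For a hypothetical note, the returned lines render as the following markdown header (guid, notetype, and tags are illustrative):

# Note
```
guid: Q5jzfw4I(q
notetype: Basic
```

### Tags
```
geography
europe
```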
def get_models_recursively(kirepo: KiRepo, silent: bool) ‑> dict[str, Notetype]
-
Find and merge all `models.json` files recursively. Should we check for duplicates?
Returns
Dict[str, Notetype]
- A dictionary sending model names to Notetypes.
Source code:
@beartype
def get_models_recursively(kirepo: KiRepo, silent: bool) -> Dict[str, Notetype]:
    """
    Find and merge all `models.json` files recursively.

    Should we check for duplicates?

    Returns
    -------
    Dict[str, Notetype]
        A dictionary sending model names to Notetypes.
    """
    all_models: Dict[str, Notetype] = {}

    # Load notetypes from json files.
    bar = tqdm(F.rglob(kirepo.root, MODELS_FILE), ncols=TQDM_NUM_COLS, leave=not silent)
    bar.set_description("Models")
    for models_file in bar:

        with open(models_file, "r", encoding="UTF-8") as models_f:
            new_nts: Dict[int, Dict[str, Any]] = json.load(models_f)

        models: Dict[str, Notetype] = {}
        for _, nt in new_nts.items():
            notetype: Notetype = parse_notetype_dict(nt)
            models[notetype.name] = notetype

        # Add mappings to dictionary.
        all_models.update(models)

    return all_models
def get_note_path(colnote: ColNote, deck_dir: ExtantDir, card_name: str = '') ‑> NoFile
-
Get note path from sort field text.
Source code:
@beartype
def get_note_path(colnote: ColNote, deck_dir: ExtantDir, card_name: str = "") -> NoFile:
    """Get note path from sort field text."""
    field_text = colnote.sortf_text

    # Construct filename, stripping HTML tags and sanitizing (quickly).
    field_text = plain_to_html(field_text)
    field_text = re.sub("<[^<]+?>", "", field_text)

    # If the HTML stripping removed all text, we just slugify the raw sort
    # field text.
    if len(field_text) == 0:
        field_text = colnote.sortf_text

    name = field_text[:MAX_FILENAME_LEN]
    slug = F.slugify(name)

    # Make it so `slug` cannot possibly be an empty string, because then we
    # get a `Path('.')` which is a bug, and causes a runtime exception. If
    # all else fails, generate a random hex string to use as the filename.
    if len(slug) == 0:
        slug = str(colnote.n.guid)
        msg = f"Slug for '{colnote.n.guid}' is empty. Using guid as filename"
        logger.warning(msg)

    if card_name != "":
        slug = f"{slug}_{card_name}"
    filename: str = f"{slug}{MD}"
    note_path = F.test(deck_dir / filename, resolve=False)

    i = 1
    while not isinstance(note_path, NoFile):
        filename = f"{slug}_{i}{MD}"
        note_path = F.test(deck_dir / filename, resolve=False)
        i += 1

    return note_path
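Filename collisions within a deck directory are resolved by the numeric-suffix loop above; e.g. three notes whose sort fields all slugify to `paris` would be written as:

paris.md
paris_1.md
paris_2.md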
def get_note_payload(colnote: ColNote, tidy_field_files: dict[str, ExtantFile]) ‑> str
-
Return the markdown-converted contents of the Anki note represented by `colnote` as a string.
Given a `ColNote`, which is a dataclass wrapper around a `Note` object which has been loaded from the DB, and a mapping from `fid`s (unique identifiers of field-note pairs) to paths, we check for each field of each note whether that field's `fid` is contained in `tidy_field_files`. If so, that means that the caller dumped the contents of this field to a file (the file with this path, in fact) in order to autoformat the HTML source. If this field was tidied/autoformatted, we read from that path to get the tidied source; otherwise, we use the field content present in the `ColNote`.
Source code:
@beartype
def get_note_payload(colnote: ColNote, tidy_field_files: Dict[str, ExtantFile]) -> str:
    """
    Return the markdown-converted contents of the Anki note represented by
    `colnote` as a string.

    Given a `ColNote`, which is a dataclass wrapper around a `Note` object
    which has been loaded from the DB, and a mapping from `fid`s (unique
    identifiers of field-note pairs) to paths, we check for each field of
    each note whether that field's `fid` is contained in `tidy_field_files`.
    If so, that means that the caller dumped the contents of this field to a
    file (the file with this path, in fact) in order to autoformat the HTML
    source. If this field was tidied/autoformatted, we read from that path
    to get the tidied source; otherwise, we use the field content present in
    the `ColNote`.
    """
    # Get tidied html if it exists.
    tidy_fields = {}
    for field_name, field_text in colnote.n.items():
        fid = get_field_note_id(colnote.n.id, field_name)
        if fid in tidy_field_files:
            # HTML5-tidy adds a newline after `<br>` in indent mode, so we
            # remove these, because `html_to_screen()` converts `<br>` tags
            # to newlines anyway.
            tidied_field_text: str = tidy_field_files[fid].read_text(encoding="UTF-8")
            tidied_field_text = tidied_field_text.replace("<br>\n", "\n")
            tidied_field_text = tidied_field_text.replace("<br/>\n", "\n")
            tidied_field_text = tidied_field_text.replace("<br />\n", "\n")
            tidy_fields[field_name] = tidied_field_text
        else:
            tidy_fields[field_name] = field_text

    lines = get_header_lines(colnote)
    for field_name, field_text in tidy_fields.items():
        lines.append("## " + field_name)
        screen_text = html_to_screen(field_text)
        text = colnote.n.col.media.escape_media_filenames(screen_text, unescape=True)
        lines.append(text)
        lines.append("")

    return "\n".join(lines)
def get_note_warnings(path: pathlib.Path, root: ExtantDir, ignore_files: set[str], ignore_dirs: set[str]) ‑> Optional[Warning]
-
Filter out paths in a git repository diff that do not correspond to Anki notes.
We could do this purely using calls to `is_anki_note()`, but these are expensive, so we try to find matches without opening any files first.
Source code:
@beartype
def get_note_warnings(
    path: Path, root: ExtantDir, ignore_files: Set[str], ignore_dirs: Set[str]
) -> Optional[Warning]:
    """
    Filter out paths in a git repository diff that do not correspond to Anki
    notes.

    We could do this purely using calls to `is_anki_note()`, but these are
    expensive, so we try to find matches without opening any files first.
    """
    # If `path` is an exact match for one of the patterns in `patterns`, we
    # immediately return a warning. Since the contents of a git repository
    # diff are always going to be files, this alone will not correctly
    # ignore directory names given in `patterns`.
    if path.name in ignore_files | ignore_dirs:
        return UnPushedPathWarning(path, path.name)

    # If any of the patterns in `patterns` resolve to one of the parents of
    # `path`, return a warning, so that we are able to filter out entire
    # directories.
    components: Tuple[str, ...] = path.parts
    dirnames: Set[str] = set(components) & ignore_dirs
    for dirname in dirnames:
        return UnPushedPathWarning(path, dirname)

    # If `path` is an extant file (not a directory) and NOT a note, ignore it.
    abspath = (root / path).resolve()
    if abspath.exists() and abspath.is_file():
        file = ExtantFile(abspath)
        if not is_anki_note(file):
            return NotAnkiNoteWarning(file)

    return None
def get_target(cwd: ExtantDir, col_file: ExtantFile, directory: str) ‑> tuple[EmptyDir, bool]
-
Create default target directory.
Source code:
@beartype
def get_target(
    cwd: ExtantDir, col_file: ExtantFile, directory: str
) -> Tuple[EmptyDir, bool]:
    """Create default target directory."""
    path = F.test(Path(directory) if directory != "" else cwd / col_file.stem)
    new: bool = True
    if isinstance(path, NoPath):
        path.mkdir(parents=True)
        return M.emptydir(path), new
    if isinstance(path, EmptyDir):
        new = False
        return path, new
    raise TargetExistsError(path)
def git_pull(remote: str, branch: str, cwd: ExtantDir, unrelated: bool, theirs: bool, check: bool, silent: bool) ‑> None
-
Pull remote into branch using a subprocess call.
Source code:
@beartype
def git_pull(
    remote: str,
    branch: str,
    cwd: ExtantDir,
    unrelated: bool,
    theirs: bool,
    check: bool,
    silent: bool,
) -> None:
    """Pull remote into branch using a subprocess call."""
    with F.halo(f"Pulling into '{cwd}'..."):
        args = ["git", "pull", "-v"]
        if unrelated:
            args += ["--allow-unrelated-histories"]
        if theirs:
            args += ["--strategy-option=theirs"]
        args += ["--verbose"]
        args += [remote, branch]
        p = subprocess.run(args, check=False, cwd=cwd, capture_output=True)
    echo(f"{p.stdout.decode()}", silent=silent)
    echo(f"{p.stderr.decode()}", silent=silent)
    if check and p.returncode != 0:
        click.secho(f"Error while pulling into '{cwd}'", fg="red")
        raise RuntimeError(f"Git failed with return code '{p.returncode}'.")
    echo(f"Pulling into '{cwd}'... done.", silent=silent)
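With both flags set, the subprocess invocation above is equivalent to running the following in `cwd` (placeholders for remote and branch):

git pull -v --allow-unrelated-histories --strategy-option=theirs --verbose <remote> <branch>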
def html_to_screen(html: str) ‑> str
-
Convert html for a single field into plaintext, to be displayed within a markdown file.
Does very little (just converts HTML-escaped special characters like `<br>` tags or `&nbsp;`s to their UTF-8 equivalents) in the case where the sentinel string does not appear. This string, `GENERATED_HTML_SENTINEL`, indicates that a note was first created in a ki markdown file, and therefore its HTML structure can be safely stripped out within calls to `write_decks()`.
Source code:
@beartype
def html_to_screen(html: str) -> str:
    """
    Convert html for a *single field* into plaintext, to be displayed within
    a markdown file.

    Does very little (just converts HTML-escaped special characters like
    `<br>` tags or `&nbsp;`s to their UTF-8 equivalents) in the case where
    the sentinel string does not appear. This string,
    `GENERATED_HTML_SENTINEL`, indicates that a note was first created in a
    ki markdown file, and therefore its HTML structure can be safely
    stripped out within calls to `write_decks()`.
    """
    html = re.sub(r"\<style\>.*\<\/style\>", "", html, flags=re.S)
    plain = html

    # For convenience: Un-escape some common LaTeX constructs.
    plain = plain.replace(r"\\\\", r"\\")
    plain = plain.replace(r"\\{", r"\{")
    plain = plain.replace(r"\\}", r"\}")
    plain = plain.replace(r"\*}", r"*}")

    # Convert HTML entities to their UTF-8 equivalents.
    plain = plain.replace(r"&lt;", "<")
    plain = plain.replace(r"&gt;", ">")
    plain = plain.replace(r"&amp;", "&")
    plain = plain.replace(r"&nbsp;", " ")

    plain = plain.replace("<br>", "\n")
    plain = plain.replace("<br/>", "\n")
    plain = plain.replace("<br />", "\n")

    # Unbreak lines within src attributes.
    plain = re.sub('src= ?\n"', 'src="', plain)

    plain = re.sub(r"\<b\>\s*\<\/b\>", "", plain)
    return plain.strip()
def is_anki_note(path: ExtantFile) ‑> bool
-
Check if file is a `ki`-style markdown note.
Source code:
@beartype
def is_anki_note(path: ExtantFile) -> bool:
    """Check if file is a `ki`-style markdown note."""
    # Ought to have markdown file extension.
    if path.suffix != ".md":
        return False
    with open(path, "r", encoding="UTF-8") as md_f:
        lines = md_f.readlines()
    if len(lines) < 8:
        return False
    if lines[0] != "# Note\n":
        return False
    if lines[1] != "```\n":
        return False
    if not re.match(r"^guid: ", lines[2]):
        return False
    return True
def lock(col_file: ExtantFile) ‑> sqlite3.Connection
-
Check that lock can be acquired on a SQLite3 database given a path.
Source code:
@beartype
def lock(col_file: ExtantFile) -> sqlite3.Connection:
    """Check that lock can be acquired on a SQLite3 database given a path."""
    try:
        con = sqlite3.connect(col_file, timeout=0.1)
        con.isolation_level = "EXCLUSIVE"
        con.execute("BEGIN EXCLUSIVE")
    except sqlite3.DatabaseError as err:
        raise SQLiteLockError(col_file, err) from err
    if sys.platform == "win32":
        con.commit()
        con.close()
    return con
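A sketch of the intended pairing with `unlock()` around a push, so that no other process (e.g. the Anki desktop client) can write the collection mid-push:

con = lock(col_file)      # Raises SQLiteLockError if another process holds the DB.
try:
    ...                   # Write changes to the collection.
finally:
    unlock(con)           # Commit and release the exclusive lock.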
def media_data(col: anki.collection.Collection, fname: str) ‑> bytes
-
Get media file content as bytes (empty if missing).
Source code:
@beartype
def media_data(col: Collection, fname: str) -> bytes:
    """Get media file content as bytes (empty if missing)."""
    if not col.media.have(fname):
        return b""
    path = os.path.join(col.media.dir(), fname)
    try:
        with open(path, "rb") as f:
            return f.read()
    except OSError:
        return b""
def parse_markdown_note(parser: lark.lark.Lark, transformer: NoteTransformer, delta: Delta) ‑> DeckNote
-
Parse with lark.
Source code:
@beartype
def parse_markdown_note(
    parser: Lark, transformer: NoteTransformer, delta: Delta
) -> DeckNote:
    """Parse with lark."""
    tree = parser.parse(delta.path.read_text(encoding="UTF-8"))
    flatnote: FlatNote = transformer.transform(tree)
    parts: Tuple[str, ...] = delta.relpath.parent.parts
    deck: str = "::".join(parts)

    # Generate a GUID from the hash of the field contents if the `guid`
    # field in the note file was left blank.
    guid = flatnote.guid if flatnote.guid != "" else get_guid(flatnote.fields)

    return DeckNote(
        title=flatnote.title,
        guid=guid,
        deck=deck,
        model=flatnote.model,
        tags=flatnote.tags,
        fields=flatnote.fields,
    )
def parse_notetype_dict(nt: dict[str, typing.Any]) ‑> Notetype
-
Convert an Anki NotetypeDict into a Notetype dataclass.
Anki returns objects of type `NotetypeDict` (see pylib/anki/models.py) when you call a method like `col.models.all()`. This is a dictionary mapping strings to various stuff, and we read all its data into a python dataclass here so that we can access it safely. Since we don't expect Anki to ever give us 'invalid' notetypes (since we define 'valid' as being processable by Anki), we raise an exception if the parse fails.
Note on naming convention: Below, abbreviated variable names represent dicts coming from Anki, like `nt: NotetypeDict` or `fld: FieldDict`. Full words like `field: Field` represent ki dataclasses. The parameters of the dataclasses, however, use abbreviations for consistency with Anki map keys.
Source code:
@beartype
def parse_notetype_dict(nt: Dict[str, Any]) -> Notetype:
    """
    Convert an Anki NotetypeDict into a Notetype dataclass.

    Anki returns objects of type `NotetypeDict` (see pylib/anki/models.py)
    when you call a method like `col.models.all()`. This is a dictionary
    mapping strings to various stuff, and we read all its data into a python
    dataclass here so that we can access it safely. Since we don't expect
    Anki to ever give us 'invalid' notetypes (since we define 'valid' as
    being processable by Anki), we raise an exception if the parse fails.

    Note on naming convention: Below, abbreviated variable names represent
    dicts coming from Anki, like `nt: NotetypeDict` or `fld: FieldDict`.
    Full words like `field: Field` represent ki dataclasses. The parameters
    of the dataclasses, however, use abbreviations for consistency with Anki
    map keys.
    """
    # If we can't even read the name of the notetype, then we can't print
    # out a nice error message in the event of a `KeyError`. So we have to
    # print out a different error message saying that the notetype doesn't
    # have a name field.
    try:
        nt["name"]
    except KeyError as err:
        raise UnnamedNotetypeError(nt) from err
    try:
        fields: Dict[int, Field] = {}
        for fld in nt["flds"]:
            ordinal = fld["ord"]
            fields[ordinal] = Field(name=fld["name"], ord=ordinal)

        templates: List[Template] = []
        for tmpl in nt["tmpls"]:
            templates.append(
                Template(
                    name=tmpl["name"],
                    qfmt=tmpl["qfmt"],
                    afmt=tmpl["afmt"],
                    ord=tmpl["ord"],
                )
            )

        # Guarantee that 'sortf' exists in `notetype.flds`.
        sort_ordinal: int = nt["sortf"]
        if sort_ordinal not in fields:
            # If we get a KeyError here, it will be caught.
            raise MissingFieldOrdinalError(sort_ordinal, nt["name"])

        notetype = Notetype(
            id=nt["id"],
            name=nt["name"],
            type=nt["type"],
            flds=list(fields.values()),
            tmpls=templates,
            sortf=fields[sort_ordinal],
            dict=nt,
        )
    except KeyError as err:
        key = str(err)
        raise NotetypeKeyError(key, str(nt["name"])) from err
    return notetype
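A minimal sketch of the dict shape this parser expects, with only the keys the code above actually reads (values are illustrative, not a real Anki notetype):

nt = {
    "id": 1667392413381,
    "name": "Basic",
    "type": 0,
    "sortf": 0,  # Ordinal of the sort field; must appear in "flds".
    "flds": [
        {"name": "Front", "ord": 0},
        {"name": "Back", "ord": 1},
    ],
    "tmpls": [
        {
            "name": "Card 1",
            "qfmt": "{{Front}}",
            "afmt": "{{FrontSide}}<hr id=answer>{{Back}}",
            "ord": 0,
        },
    ],
}
# Parses into a Notetype whose sortf is Field(name="Front", ord=0).
notetype = parse_notetype_dict(nt)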
def plain_to_html(plain: str) ‑> str
-
Convert plain text to HTML.
Source code:
@beartype
def plain_to_html(plain: str) -> str:
    """Convert plain text to HTML."""
    # Minor clean up: convert entities and drop empty tag pairs.
    plain = plain.replace(r"&lt;", "<")
    plain = plain.replace(r"&gt;", ">")
    plain = plain.replace(r"&amp;", "&")
    plain = plain.replace(r"&nbsp;", " ")
    plain = re.sub(r"\<b\>\s*\<\/b\>", "", plain)
    plain = re.sub(r"\<i\>\s*\<\/i\>", "", plain)
    plain = re.sub(r"\<div\>\s*\<\/div\>", "", plain)

    # Convert newlines to `<br>` tags.
    if not re.search(HTML_REGEX, plain):
        plain = plain.replace("\n", "<br>")

    return plain.strip()
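This helper and `html_to_screen()` are approximate inverses. A sketch of the round trip on a plain field containing a newline:

html = plain_to_html("front line 1\nfront line 2")  # -> "front line 1<br>front line 2"
text = html_to_screen(html)                         # -> "front line 1\nfront line 2"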
def push_deltas(deltas: list[typing.Union[Delta, Warning]], models: dict[str, Notetype], kirepo: KiRepo, md5sum: str, parser: lark.lark.Lark, transformer: NoteTransformer, head_kirepo: KiRepo, con: sqlite3.Connection, verbose: bool) ‑> PushResult
-
Push a list of `Delta`s to an Anki collection.
Source code:
@beartype
def push_deltas(
    deltas: List[Union[Delta, Warning]],
    models: Dict[str, Notetype],
    kirepo: KiRepo,
    md5sum: str,
    parser: Lark,
    transformer: NoteTransformer,
    head_kirepo: KiRepo,
    con: sqlite3.Connection,
    verbose: bool,
) -> PushResult:
    """Push a list of `Delta`s to an Anki collection."""
    warnings: List[Warning] = [delta for delta in deltas if isinstance(delta, Warning)]
    deltas: List[Delta] = [delta for delta in deltas if isinstance(delta, Delta)]

    # Display warnings from diff procedure.
    for warning in warnings:
        if verbose or type(warning) not in WARNING_IGNORE_LIST:
            click.secho(str(warning), fg="yellow")
    warnings = []

    # If there are no changes, quit.
    if len(set(deltas)) == 0:
        echo("ki push: up to date.")
        return PushResult.UP_TO_DATE

    echo(f"Pushing to '{kirepo.col_file}'")
    echo(f"Computed md5sum: {md5sum}")
    echo(f"Verified md5sum matches latest hash in '{kirepo.hashes_file}'")

    # Copy collection to a temp directory.
    temp_col_dir: ExtantDir = F.mkdtemp()
    new_col_file = temp_col_dir / kirepo.col_file.name
    col_name: str = kirepo.col_file.name
    new_col_file: ExtantFile = F.copyfile(kirepo.col_file, temp_col_dir, col_name)

    head: RepoRef = M.head_repo_ref(kirepo.repo)
    echo(f"Generating local .anki2 file from latest commit: {head.sha}")
    echo(f"Writing changes to '{new_col_file}'...")

    # Open collection, holding cwd constant (otherwise Anki changes it).
    cwd: ExtantDir = F.cwd()
    col: Collection = M.collection(new_col_file)
    F.chdir(cwd)

    # Add new models to the collection.
    add_models(col, models)

    # Stash both unstaged and staged files (including untracked).
    kirepo.repo.git.stash(include_untracked=True, keep_index=True)
    kirepo.repo.git.reset("HEAD", hard=True)

    # Display table of note change type counts.
    echo_note_change_types(deltas)

    # Construct a map from guid -> (nid, mod, mid), adapted from
    # `Anki2Importer._import_notes()`. Note that `mod` is the modification
    # timestamp, in epoch seconds (timestamp of when the note was last
    # modified).
    guids: Dict[str, NoteMetadata] = {}
    for nid, guid, mod, mid in col.db.execute("select id, guid, mod, mid from notes"):
        guids[guid] = NoteMetadata(nid, mod, mid)

    timestamp_ns: int = time.time_ns()
    new_nids: Iterator[int] = itertools.count(int(timestamp_ns / 1e6))
    is_delete = lambda d: d.status == GitChangeType.DELETED

    bar = tqdm(deltas, ncols=TQDM_NUM_COLS)
    bar.set_description("Deltas")
    for delta in bar:

        # Parse the file at `delta.path` into a `DeckNote`, and
        # add/edit/delete in collection.
        decknote = parse_markdown_note(parser, transformer, delta)

        if is_delete(delta):
            if decknote.guid in guids:
                col.remove_notes([guids[decknote.guid].nid])
            continue

        # TODO: If relevant prefix of sort field has changed, we should
        # regenerate the file. Recall that the sort field is used to
        # determine the filename. If the content of the sort field has
        # changed, then we may need to update the filename.
        warnings += push_note(col, decknote, timestamp_ns, guids, new_nids)

    num_displayed: int = 0
    for warning in warnings:
        if verbose or type(warning) not in WARNING_IGNORE_LIST:
            click.secho(str(warning), fg="yellow")
            num_displayed += 1
    num_suppressed: int = len(warnings) - num_displayed
    echo(f"Warnings suppressed: {num_suppressed} (show with '--verbose')")

    # It is always safe to save changes to the DB, since the DB is a copy.
    col.close(save=True)

    # Backup collection file and overwrite collection.
    backup(kirepo)
    new_col_file = F.copyfile(new_col_file, F.parent(kirepo.col_file), col_name)
    echo(f"Overwrote '{kirepo.col_file}'")

    # Add media files to collection.
    col: Collection = M.collection(kirepo.col_file)
    media_files = F.rglob(head_kirepo.root, MEDIA_FILE_RECURSIVE_PATTERN)
    warnings = []
    bar = tqdm(media_files, ncols=TQDM_NUM_COLS, disable=False)
    bar.set_description("Media")
    for media_file in bar:

        # Check file mode, and follow symlink if applicable.
        mode: int = filemode(media_file)
        if mode == 120000:
            target: str = media_file.read_text(encoding="UTF-8")
            parent: ExtantDir = F.parent(media_file)
            media_file: ExtantFile = M.xfile(parent / target)

        # Get bytes of new media file.
        with open(media_file, "rb") as f:
            new: bytes = f.read()

        # Get bytes of existing media file (if it exists).
        old: bytes = media_data(col, media_file.name)

        if old and old == new:
            continue

        # Add (and possibly rename) media paths.
        new_media_filename: str = col.media.add_file(media_file)
        if new_media_filename != media_file.name:
            warning = RenamedMediaFileWarning(media_file.name, new_media_filename)
            warnings.append(warning)

    col.close(save=True)

    num_displayed: int = 0
    for warning in warnings:
        if verbose or type(warning) not in WARNING_IGNORE_LIST:
            click.secho(str(warning), fg="yellow")
            num_displayed += 1
    num_suppressed: int = len(warnings) - num_displayed
    echo(f"Warnings suppressed: {num_suppressed} (show with '--verbose')")

    # Append to hashes file.
    new_md5sum = F.md5(new_col_file)
    append_md5sum(kirepo.ki_dir, new_col_file.name, new_md5sum, silent=False)

    # Update the commit SHA of most recent successful PUSH.
    head: RepoRef = M.head_repo_ref(kirepo.repo)
    kirepo.last_push_file.write_text(head.sha)

    # Unlock Anki SQLite DB.
    unlock(con)

    return PushResult.NONTRIVIAL
def push_note(col: anki.collection.Collection, decknote: DeckNote, timestamp_ns: int, guids: dict[str, NoteMetadata], new_nids: collections.abc.Iterator[int]) ‑> list[Warning]
-
Update the Anki `Note` object in `col` corresponding to `decknote`, creating it if it does not already exist.
Raises
MissingNotetypeError
- If we can't find a notetype with the name provided in `decknote`.
Source code:
@beartype
def push_note(
    col: Collection,
    decknote: DeckNote,
    timestamp_ns: int,
    guids: Dict[str, NoteMetadata],
    new_nids: Iterator[int],
) -> List[Warning]:
    """
    Update the Anki `Note` object in `col` corresponding to `decknote`,
    creating it if it does not already exist.

    Raises
    ------
    MissingNotetypeError
        If we can't find a notetype with the name provided in `decknote`.
    """
    # Notetype/model names are privileged in Anki, so if we don't find the
    # right name, we raise an error.
    model_id: Optional[int] = col.models.id_for_name(decknote.model)
    if model_id is None:
        raise MissingNotetypeError(decknote.model)

    if decknote.guid in guids:
        nid: int = guids[decknote.guid].nid
        note: Note = col.get_note(nid)
    else:
        nid: int = next(new_nids)
        note: Note = add_db_note(
            col,
            nid,
            decknote.guid,
            model_id,
            mod=int(timestamp_ns // 1e9),
            usn=-1,
            tags=decknote.tags,
            fields=list(decknote.fields.values()),
            sfld="",
            csum=0,
            flags=0,
            data="",
        )

    # If we are updating an existing note, we need to know the old and new
    # notetypes, and then update the notetype (and the rest of the note
    # data) accordingly.
    old_notetype: Notetype = parse_notetype_dict(note.note_type())
    new_notetype: Notetype = parse_notetype_dict(col.models.get(model_id))
    warnings = update_note(note, decknote, old_notetype, new_notetype)

    return warnings
def tidy_html_recursively(root: ExtantDir, silent: bool) ‑> None
-
Call html5-tidy on each file in `root`, editing in-place.
Source code:
@beartype
def tidy_html_recursively(root: ExtantDir, silent: bool) -> None:
    """Call html5-tidy on each file in `root`, editing in-place."""
    # Spin up subprocesses for tidying field HTML in-place.
    batches: List[List[ExtantFile]] = list(
        F.get_batches(F.rglob(root, "*"), BATCH_SIZE)
    )
    bar = tqdm(batches, ncols=TQDM_NUM_COLS, leave=not silent)
    bar.set_description("HTML")
    for batch in bar:

        # TODO: Should we fail silently here, so as to not bother user with
        # tidy warnings?
        command = [
            "tidy",
            "-q",
            "-m",
            "-i",
            "-omit",
            "-utf8",
            "--tidy-mark", "no",
            "--show-body-only", "yes",
            "--wrap", "68",
            "--wrap-attributes", "yes",
        ]
        command += batch
        try:
            subprocess.run(command, check=False, capture_output=True)
        except FileNotFoundError as err:
            raise MissingTidyExecutableError(err) from err
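Each batch therefore runs a command of the following shape, with the batch's file paths appended at the end:

tidy -q -m -i -omit -utf8 --tidy-mark no --show-body-only yes --wrap 68 --wrap-attributes yes <file>...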
def unlock(con: sqlite3.Connection) ‑> None
-
Unlock a SQLite3 database.
Source code:
@beartype
def unlock(con: sqlite3.Connection) -> None:
    """Unlock a SQLite3 database."""
    if sys.platform == "win32":
        return
    con.commit()
    con.close()
def unsubmodule_repo(repo: git.repo.base.Repo) ‑> git.repo.base.Repo
-
Un-submodule all the git submodules (convert them to ordinary subdirectories and destroy their commit history). Commit the changes to the main repository.
MUTATES REPO in-place!
UNSAFE: git.rm() calls.
Source code:
@beartype
def unsubmodule_repo(repo: git.Repo) -> git.Repo:
    """
    Un-submodule all the git submodules (convert them to ordinary
    subdirectories and destroy their commit history). Commit the changes to
    the main repository.

    MUTATES REPO in-place!

    UNSAFE: git.rm() calls.
    """
    gitmodules_path: Path = Path(repo.working_dir) / GITMODULES_FILE
    for sm in repo.submodules:

        # The submodule path is guaranteed to exist by gitpython.
        sm_path = Path(sm.module().working_tree_dir)
        repo.git.rm(sm_path, cached=True)

        # Annihilate `.gitmodules` file.
        if gitmodules_path.is_file():
            repo.git.rm(gitmodules_path, ignore_unmatch=True)

        sm_git_path = F.test(sm_path / GIT)
        if isinstance(sm_git_path, ExtantDir):
            F.rmtree(sm_git_path)
        else:
            (sm_path / GIT).unlink(missing_ok=True)

        # Directory should still exist after `git.rm()`.
        repo.git.add(sm_path)
        _ = repo.index.commit(f"Add submodule `{sm.name}` as ordinary directory.")

    if gitmodules_path.exists():
        repo.git.rm(gitmodules_path)
        _ = repo.index.commit("Remove `.gitmodules` file.")

    return repo
def update_note(note: anki.notes.Note, decknote: DeckNote, old_notetype: Notetype, new_notetype: Notetype) ‑> list[Warning]
-
Change all the data of `note` to that given in `decknote`.
This is only to be called on notes whose nid already exists in the database. Creates a new deck if `decknote.deck` doesn't exist. Assumes that the model has already been added to the collection, and raises an exception if it finds otherwise. Changes notetype to that specified by `decknote.model`. Overwrites all fields with `decknote.fields`.
Updates:
- tags
- deck
- model
- fields
Source code:
@beartype
def update_note(
    note: Note, decknote: DeckNote, old_notetype: Notetype, new_notetype: Notetype
) -> List[Warning]:
    """
    Change all the data of `note` to that given in `decknote`.

    This is only to be called on notes whose nid already exists in the
    database. Creates a new deck if `decknote.deck` doesn't exist. Assumes
    that the model has already been added to the collection, and raises an
    exception if it finds otherwise. Changes notetype to that specified by
    `decknote.model`. Overwrites all fields with `decknote.fields`.

    Updates:
    - tags
    - deck
    - model
    - fields
    """
    # Check that the passed argument `new_notetype` has a name consistent
    # with the model specified in `decknote`. The former should be derived
    # from the latter, and if they don't match, there is a bug in the caller.
    if decknote.model != new_notetype.name:
        raise NotetypeMismatchError(decknote, new_notetype)

    note.tags = decknote.tags
    note.flush()

    # Set the deck of the given note, and create a deck with this name if it
    # doesn't already exist. See the comments/docstrings in the
    # implementation of the `anki.decks.DeckManager.id()` method.
    newdid: int = note.col.decks.id(decknote.deck, create=True)
    cids = [c.id for c in note.cards()]

    # Set deck for all cards of this note.
    if cids:
        note.col.set_deck(cids, newdid)

    # Change notetype of note. Map old field ordinals to `None`.
    fmap: Dict[int, None] = {}
    for field in old_notetype.flds:
        fmap[field.ord] = None

    # Change notetype (also clears all fields).
    if old_notetype.id != new_notetype.id:
        note.col.models.change(
            old_notetype.dict, [note.id], new_notetype.dict, fmap, None
        )
        note.load()

    # Validate field keys against notetype.
    warnings: List[Warning] = validate_decknote_fields(new_notetype, decknote)
    if len(warnings) > 0:
        return warnings

    # Set field values. This is correct because every field name that
    # appears in `new_notetype` is contained in `decknote.fields`, or else
    # we would have printed a warning and returned above.
    for key, field in decknote.fields.items():
        if key not in note:
            warnings.append(NoteFieldValidationWarning(note.id, key, new_notetype))
            continue
        try:
            note[key] = plain_to_html(field)
        except IndexError as err:
            raise AnkiDBNoteMissingFieldsError(decknote, note.id, key) from err

    # Flush fields to collection object.
    note.flush()

    # Remove if unhealthy.
    health = display_fields_health_warning(note)
    if health != 0:
        note.col.remove_notes([note.id])
        warnings.append(UnhealthyNoteWarning(note.id, health))

    return warnings
def validate_decknote_fields(notetype: Notetype, decknote: DeckNote) ‑> list[Warning]
-
Validate that the fields given in the note match the notetype.
Source code:
@beartype
def validate_decknote_fields(notetype: Notetype, decknote: DeckNote) -> List[Warning]:
    """Validate that the fields given in the note match the notetype."""
    warnings: List[Warning] = []
    names: List[str] = [field.name for field in notetype.flds]

    # TODO: It might also be nice to print the path of the note in the
    # repository. This would have to be added to the `DeckNote` spec.
    if len(decknote.fields.keys()) != len(names):
        warnings.append(WrongFieldCountWarning(decknote, names))

    for x, y in zip(names, decknote.fields.keys()):
        if x != y:
            warnings.append(InconsistentFieldNamesWarning(x, y, decknote))

    return warnings
def warn(string: str) ‑> None
-
Call `click.secho()` with formatting (yellow).
Source code:
@beartype
def warn(string: str) -> None:
    """Call `click.secho()` with formatting (yellow)."""
    click.secho(f"WARNING: {string}", bold=True, fg="yellow")
def write_deck_node_cards(node: anki.decks_pb2.DeckTreeNode, col: anki.collection.Collection, targetdir: ExtantDir, colnotes: dict[int, ColNote], tidy_field_files: dict[str, ExtantFile], models_map: dict[int, dict[str, typing.Any]], written_cids: set[int], written_notes: dict[int, WrittenNoteFile]) ‑> tuple[set[int], dict[int, WrittenNoteFile], set[LatentSymlink]]
-
Write all the cards to disk for a single `DeckTreeNode`.
Source code:
@beartype
def write_deck_node_cards(
    node: DeckTreeNode,
    col: Collection,
    targetdir: ExtantDir,
    colnotes: Dict[int, ColNote],
    tidy_field_files: Dict[str, ExtantFile],
    models_map: Dict[int, NotetypeDict],
    written_cids: Set[int],
    written_notes: Dict[int, WrittenNoteFile],
) -> Tuple[Set[int], Dict[int, WrittenNoteFile], Set[LatentSymlink]]:
    """Write all the cards to disk for a single `DeckTreeNode`."""
    written_cids = copy.deepcopy(written_cids)
    written_notes = copy.deepcopy(written_notes)
    latent_links: Set[LatentSymlink] = set()

    # The name stored in a `DeckTreeNode` object is not the full name of
    # the deck, it is just the 'basename'. The `postorder()` function
    # returns a deck with `did == 0` at the end of each call, probably
    # because this is the implicit parent deck of all top-level decks. This
    # deck empirically always has the empty string as its name. This is
    # likely an Anki implementation detail. As a result, we ignore any deck
    # with empty basename. We do this as opposed to ignoring decks with
    # `did == 0` because it seems less likely to change if the Anki devs
    # decide to mess with the implementation. Empty deck names will always
    # be reserved, but they might e.g. decide ``did == -1`` makes more
    # sense.
    did: int = node.deck_id
    basename: str = node.name
    if basename == "":
        return set(), {}, set()
    name = col.decks.name(did)

    deck_dir: ExtantDir = create_deck_dir(name, targetdir)
    children: Set[CardId] = set(col.decks.cids(did=did, children=False))
    descendants: List[CardId] = col.decks.cids(did=did, children=True)
    descendant_nids: Set[int] = {NOTETYPE_NID}
    descendant_mids: Set[int] = set()
    for cid in descendants:
        card: Card = col.get_card(cid)
        descendant_nids.add(card.nid)
        descendant_mids.add(card.note().mid)

        # Card writes should not be cumulative, so we only perform them if
        # `cid` is the card id of a card that is in the deck corresponding
        # to `node`, but not in any of its children.
        if cid not in children:
            continue

        # We only even consider writing *anything* (file or symlink) to
        # disk after checking that we haven't already processed this
        # particular card id. If we have, then we only need to bother with
        # the cumulative stuff above (nids and mids, which we keep track of
        # in order to write out models and media).
        #
        # TODO: This fixes a very serious bug. Write a test to capture the
        # issue. Use the Japanese Core 2000 deck as a template if needed.
        if cid in written_cids:
            continue
        written_cids.add(cid)

        # We only write the payload if we haven't seen this note before.
        # Otherwise, this must be a different card generated from the same
        # note, so we symlink to the location where we've already written
        # it to disk.
        colnote: ColNote = colnotes[card.nid]

        if card.nid not in written_notes:
            note_path: NoFile = get_note_path(colnote, deck_dir)
            payload: str = get_note_payload(colnote, tidy_field_files)
            note_path: ExtantFile = F.write(note_path, payload)
            written_notes[card.nid] = WrittenNoteFile(did, note_path)
        else:
            # If `card` is in the same deck as the card we wrote `written`
            # for, then there is no need to create a symlink, because the
            # note file is already there, in the correct deck directory.
            #
            # TODO: This fixes a very serious bug. Write a test to capture
            # the issue. Use the Japanese Core 2000 deck as a template if
            # needed.
            written: WrittenNoteFile = written_notes[card.nid]
            if card.did == written.did:
                continue

            # Get card template name.
            template: TemplateDict = card.template()
            name: str = template["name"]

            note_path: NoFile = get_note_path(colnote, deck_dir, name)
            abs_target: ExtantFile = written_notes[card.nid].file
            distance = len(note_path.parent.relative_to(targetdir).parts)
            up_path = Path("../" * distance)
            relative: Path = abs_target.relative_to(targetdir)
            target: Path = up_path / relative
            try:
                link: Union[Symlink, LatentSymlink] = F.symlink(note_path, target)
                if isinstance(link, LatentSymlink):
                    latent_links.add(link)
            except OSError as _:
                trace = traceback.format_exc(limit=3)
                logger.warning(f"Failed to create symlink for cid '{cid}'\n{trace}")

    # TODO: Should this block be outside this function?
    # Write `models.json` for current deck.
    deck_models_map = {mid: models_map[mid] for mid in descendant_mids}
    with open(deck_dir / MODELS_FILE, "w", encoding="UTF-8") as f:
        json.dump(deck_models_map, f, ensure_ascii=False, indent=4, sort_keys=True)

    return written_cids, written_notes, latent_links
def write_decks(col: anki.collection.Collection, targetdir: ExtantDir, colnotes: dict[int, ColNote], media: dict[int, set[ExtantFile]], tidy_field_files: dict[str, ExtantFile], silent: bool) ‑> set[LatentSymlink]
-
The proper way to do this is a DFS traversal, perhaps recursively, which will make it easier to keep things purely functional, accumulating the model ids of the children in each node. For this, we must construct a tree from the deck names.
Source code:
@beartype
def write_decks(
    col: Collection,
    targetdir: ExtantDir,
    colnotes: Dict[int, ColNote],
    media: Dict[int, Set[ExtantFile]],
    tidy_field_files: Dict[str, ExtantFile],
    silent: bool,
) -> Set[LatentSymlink]:
    """
    The proper way to do this is a DFS traversal, perhaps recursively, which
    will make it easier to keep things purely functional, accumulating the
    model ids of the children in each node. For this, we must construct a
    tree from the deck names.
    """
    # Accumulate pairs of model ids and notetype maps. The return type of
    # the `ModelManager.get()` call below indicates that it may return
    # `None`, but we know it will not because we are getting the notetype
    # id straight from the Anki DB.
    models_map: Dict[int, NotetypeDict] = {}
    for nt_name_id in col.models.all_names_and_ids():
        models_map[nt_name_id.id] = col.models.get(nt_name_id.id)

    # Dump the models file for the whole repository.
    with open(targetdir / MODELS_FILE, "w", encoding="UTF-8") as f:
        json.dump(models_map, f, ensure_ascii=False, indent=4, sort_keys=True)

    # Implement new `ColNote`-writing procedure, using `DeckTreeNode`s.
    #
    # It must do the following for each deck:
    # - create the deck directory
    # - write the models.json file
    # - create and populate the media directory
    # - write the note payload for each note in the correct deck, exactly
    #   once
    #
    # In other words, for each deck, we need to write all of its:
    # - models
    # - media
    # - notes
    #
    # The first two are cumulative: we want the models and media of subdecks
    # to be included in their ancestors. The notes, however, should not be
    # cumulative. Indeed, we want each note to appear exactly once in the
    # entire repository, making allowances for the case where a single
    # note's cards are spread across multiple decks, in which case we must
    # create a symlink.
    #
    # And actually, both of these cases are nicely taken care of for us by
    # the `DeckManager.cids()` function, which has a `children: bool`
    # parameter which toggles whether or not to get the card ids of subdecks
    # or not.
    root: DeckTreeNode = col.decks.deck_tree()

    @beartype
    def postorder(node: DeckTreeNode) -> List[DeckTreeNode]:
        """
        Post-order traversal. Guarantees that we won't process a node until
        we've processed all its children.
        """
        traversal: List[DeckTreeNode] = []
        for child in node.children:
            traversal += postorder(child)
        traversal += [node]
        return traversal

    # All card ids we've already processed.
    written_cids: Set[int] = set()

    # Map nids we've already written files for to a dataclass containing:
    # - the corresponding `ExtantFile`
    # - the deck id of the card for which we wrote it
    #
    # This deck id identifies the deck corresponding to the location where
    # all the symlinks should point.
    written_notes: Dict[int, WrittenNoteFile] = {}

    # All latent symlinks created on Windows whose file modes we must set.
    latent_links: Set[LatentSymlink] = set()

    nodes: List[DeckTreeNode] = postorder(root)
    nodes_bar = tqdm(nodes, ncols=TQDM_NUM_COLS, leave=not silent)
    nodes_bar.set_description("Decks")
    for node in nodes_bar:
        node_cids: Set[int]
        node_notes: Dict[int, WrittenNoteFile]
        node_latent_links: Set[LatentSymlink]
        node_cids, node_notes, node_latent_links = write_deck_node_cards(
            node,
            col,
            targetdir,
            colnotes,
            tidy_field_files,
            models_map,
            written_cids,
            written_notes,
        )
        written_cids |= node_cids
        written_notes.update(node_notes)
        latent_links |= node_latent_links

    media_links: Set[LatentSymlink] = chain_media_symlinks(root, col, targetdir, media)
    latent_links |= media_links
    return latent_links
def write_repository(col_file: ExtantFile, targetdir: ExtantDir, leaves: Leaves, media_target_dir: EmptyDir, silent: bool, verbose: bool) ‑> set[LatentSymlink]
-
Write notes to appropriate directories in `targetdir`.
Source code:
@beartype
def write_repository(
    col_file: ExtantFile,
    targetdir: ExtantDir,
    leaves: Leaves,
    media_target_dir: EmptyDir,
    silent: bool,
    verbose: bool,
) -> Set[LatentSymlink]:
    """Write notes to appropriate directories in `targetdir`."""
    # Create config file.
    config_file: ExtantFile = leaves.files[CONFIG_FILE]
    config = configparser.ConfigParser()
    config["remote"] = {"path": col_file}
    with open(config_file, "w", encoding="UTF-8") as config_f:
        config.write(config_f)

    # Create temp directory for htmlfield text files.
    tempdir: EmptyDir = F.mkdtemp()
    root: EmptyDir = F.mksubdir(tempdir, FIELD_HTML_SUFFIX)

    tidy_field_files: Dict[str, ExtantFile] = {}
    decks: Dict[str, List[ColNote]] = {}

    # Open collection using a `Maybe`.
    cwd: ExtantDir = F.cwd()
    col: Collection = M.collection(col_file)
    F.chdir(cwd)

    # ColNote-containing data structure, to be passed to `write_decks()`.
    colnotes: Dict[int, ColNote] = {}

    # Query all note ids, get the deck from each note, and construct a map
    # sending deck names to lists of notes.
    all_nids = list(col.find_notes(query=""))
    bar = tqdm(all_nids, ncols=TQDM_NUM_COLS, leave=not silent)
    bar.set_description("Notes")
    for nid in bar:
        colnote: ColNote = get_colnote(col, nid)
        colnotes[nid] = colnote
        decks[colnote.deck] = decks.get(colnote.deck, []) + [colnote]
        for field_name, field_text in colnote.n.items():
            field_text: str = html_to_screen(field_text)
            if re.search(HTML_REGEX, field_text):
                fid: str = get_field_note_id(nid, field_name)
                html_file: NoFile = F.test(root / fid)
                tidy_field_files[fid] = F.write(html_file, field_text)

    tidy_html_recursively(root, silent)

    media: Dict[int, Set[ExtantFile]]
    media, warnings = copy_media_files(col, media_target_dir, silent=silent)

    latent_links: Set[LatentSymlink] = write_decks(
        col,
        targetdir,
        colnotes,
        media,
        tidy_field_files,
        silent,
    )

    num_displayed: int = 0
    for warning in warnings:
        if verbose or type(warning) not in WARNING_IGNORE_LIST:
            click.secho(str(warning), fg="yellow")
            num_displayed += 1
    num_suppressed: int = len(warnings) - num_displayed
    echo(f"Warnings suppressed: {num_suppressed} (show with '--verbose')")

    F.rmtree(root)
    col.close(save=False)

    return latent_links