Package ki
Ki is a command-line interface for the version control and editing of `.anki2` collections as git repositories of markdown files. Rather than providing an interactive UI like the Anki desktop client, ki aims to allow natural editing *in the filesystem*.
In general, the purpose of ki is to allow users to work on large, complex Anki decks in exactly the same way they work on large, complex software projects.
Ki provides command-line functions to:
- clone a `.anki2` collection into a directory as a git repository.
- pull changes from the Anki desktop client (and AnkiWeb) into an existing repository.
- push changes (safely!) back to Anki.
This is documentation for the ki repository.
Installation
Ki is tested on Python 3.9 and 3.10. You'll need to install Python and Git, and then run the following command in a terminal:
- Install the `ki` package:

  pip install git+https://github.com/langfield/ki.git@main
Getting started
This section will walk through the following example workflow:
- Cloning an existing collection into a `ki` repository.
- Editing the note files in the repository.
- Pushing those edits back to Anki.
- Pulling changes made in Anki into the repository.
Before cloning, we'll need to find our `.anki2` collection file. This is where Anki stores the data for all our notes.
Note. If you're new to Anki, or are unfamiliar with the terms collection, profile, note, or card, you may wish to take a look at the Anki documentation.
If you already know the path to the `.anki2` collection file you want to clone, skip to the section on running the clone command.
Finding the .anki2 collection file
To find our collection file, we must first find our Anki data directory. The location of this varies by operating system. In most cases, you should be able to find your data directory at the path given below for your respective OS:

| OS        | Default Anki data directory           |
|-----------|---------------------------------------|
| macOS     | `~/Library/Application Support/Anki2` |
| Windows   | `%APPDATA%\Anki2`                     |
| GNU/Linux | `~/.local/share/Anki2`                |
Note. You can read more about the default Anki data directory locations here.
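If you'd rather compute this path programmatically, here is a minimal sketch (not part of `ki`; the `anki_base_dir` helper is hypothetical) that mirrors the table above:

```python
# Guess the default Anki data directory for the current OS.
import os
import sys
from pathlib import Path

def anki_base_dir() -> Path:
    if sys.platform == "win32":
        return Path(os.environ["APPDATA"]) / "Anki2"
    if sys.platform == "darwin":
        return Path.home() / "Library" / "Application Support" / "Anki2"
    return Path.home() / ".local" / "share" / "Anki2"

print(anki_base_dir() / "User 1" / "collection.anki2")
```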
If you are running Anki 2.1 (which you should be, because `ki` is not tested with lower versions), opening this directory will reveal several files and subdirectories. The following example output is from a machine running Debian GNU/Linux:
user@host:~/.local/share/Anki2$ ls
addons21 crash.log prefs21.db README.txt 'User 1'
In particular, there is a subdirectory for each profile. In the above example, there is only one profile, `User 1`. But, in general, there may be many profiles associated with a given Anki installation.
Multiple profiles
Below we can see a visual representation of the directory structure of an Anki data directory with two profiles, `User 1` and `User 2`:
Anki2/
├── addons21
│ ├── 1046608507
│ ├── 109531687
│ ├── 1097423555
│ └── 1972239816
├── crash.log
├── prefs21.db
├── README.txt
├── User 1
│ ├── backups
│ ├── collection2.log
│ ├── collection.anki2
│ ├── collection.log
│ ├── collection.media
│ ├── collection.media.db2
│ └── deleted.txt
└── User 2
├── collection.anki2
├── collection.anki2-wal
└── collection.media
Note that there is a `collection.anki2` file in each profile subdirectory.
If you're not sure of the name of your user profile, it can be seen in the title bar of the Anki desktop client.
Most Anki installations will only have one profile, and if you haven't changed the default profile name, it will probably be called `User 1`. Let's enter the profile directory for `User 1` and list its contents:
user@host:~/.local/share/Anki2$ cd User\ 1/
user@host:~/.local/share/Anki2/User 1$ ls
backups collection2.log collection.anki2 collection.log collection.media collection.media.db2 deleted.txt
So if we want to clone `User 1`'s collection, the path that we want is:
~/.local/share/Anki2/User\ 1/collection.anki2
We'll pass this as a command-line argument to the `ki` executable in the next section.
Running the clone command
Now we're ready to actually clone the collection into a repository. The `ki clone` command works similarly to `git clone`, in that it will create a new directory for the repository within the current working directory. So if we want to clone our collection into a new subdirectory in `~/` (the home directory on macOS and GNU/Linux), we would first make sure we're in the home directory.

Second, we need to check that Anki is closed before cloning. Nothing bad will happen if we clone while Anki is open, but the command will fail because the database is locked. Once we've done that, we can run the command:
ki clone ~/.local/share/Anki2/User\ 1/collection.anki2

And we should see output that looks similar to this:

lyra@oxford$ ki clone ~/.local/share/Anki2/User\ 1/collection.anki2
Found .anki2 file at '/home/lyra/.local/share/Anki2/User 1/collection.anki2'
Computed md5sum: ad7ea6d486a327042cf0b09b54626b66
Wrote md5sum to '/home/lyra/collection/.ki/hashes'
Cloning into '/home/lyra/collection/'...
100%|█████████████████████████| 28886/28886 [00:10<00:00, 2883.78it/s]
If we list the contents of the home directory, we can see that `ki` did indeed create a new directory called `collection`:
lyra@oxford:~$ ls
collection pkgs
Editing notes
Now that we've successfully cloned our Anki collection into a `ki` repository, we can start editing notes! Our home directory looks like this:
lyra@oxford:~$ ls
collection pkgs
And we see the repo we cloned, which is called `collection`.
Let's change directories to the newly cloned `ki` repo and take a look at what's inside:
lyra@oxford:~$ cd collection/
lyra@oxford:~/collection$ ls --classify
algebras/ manifolds/ rings/
We see that we have three directories, which represent three Anki decks. This is just an example; you'll see directories corresponding to the top-level decks in your Anki collection.
Note. The `ls --classify` command adds a trailing `/` to the end of directories to distinguish them from ordinary files.
Let's enter the `manifolds` directory and see what's inside.
lyra@oxford:~/collection$ cd manifolds/
lyra@oxford:~/collection/manifolds$ ls
MANIFOLDS.md
So we see a single markdown file called `MANIFOLDS.md`, which contains the notes for the manifolds deck. If we had subdecks of the manifolds deck, we would see more subdirectories here, and each one would have a markdown file in it as well. Let's open this file and see what's inside.
We'll use vim to open the markdown file in this example, but any text editor will work.
lyra@oxford:~/collection/manifolds$ vi MANIFOLDS.md
# Note
nid: 1622849751948
model: Basic
deck: manifolds
tags:
markdown: false
## Front
Diffeomorphism
## Back
A smooth surjective map between manifolds which has a smooth inverse.
# Note
nid: 1566621764508
model: Basic
deck: manifolds
tags:
markdown: false
## Front
distribution (on a smooth manifold)
## Back
A distribution on \(M\) of rank \(k\) is a rank-\(k\) subbundle of \(TM\)
So we see the structure of two notes inside this file. For each note, there is a section for note metadata, and a section for each field.
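As a toy illustration of this structure (not `ki`'s real parser, which uses a Lark grammar, as the source listing below shows), we could count the notes in a deck file like so:

```python
# Split a deck file like MANIFOLDS.md into its "# Note" blocks.
from pathlib import Path

text = Path("MANIFOLDS.md").read_text(encoding="utf-8")
notes = ["# Note" + block for block in text.split("# Note")[1:]]
print(f"Found {len(notes)} notes.")
```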
There is a typo in the first note. It says *smooth surjective map*, but it should say *smooth bijective map*. Let's fix it, save our changes, and go back to the terminal. When we go back up to the root of the repository and run `git status`, we can see which files we've changed:
lyra@oxford:~/collection$ git status
On branch main
Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
        modified:   manifolds/MANIFOLDS.md

no changes added to commit (use "git add" and/or "git commit -a")
And running `git diff` shows us the content of the unstaged changes (abridged):
lyra@oxford:~/collection$ git diff
diff --git a/manifolds/MANIFOLDS.md b/manifolds/MANIFOLDS.md
--- a/manifolds/MANIFOLDS.md
+++ b/manifolds/MANIFOLDS.md
@@ ... @@
 ## Back
-A smooth surjective map between manifolds which has a smooth inverse.
+A smooth bijective map between manifolds which has a smooth inverse.
Then we can commit our changes as usual.
lyra@oxford:~/collection$ git add manifolds/MANIFOLDS.md
lyra@oxford:~/collection$ git commit -m "Fix typo in diffeomorphism definition: 'surjective' -> 'bijective'"
At this point we would usually `git push`, but if we try that in a `ki` repository, we'll see this:
lyra@oxford:~/collection$ git push
fatal: No configured push destination.
Either specify the URL from the command-line or configure a remote repository using
git remote add <name> <url>
and then push using the remote name
git push <name>
Since we're not pushing to an ordinary `git` remote, but to the Anki SQLite3 database, we must use `ki push` instead, which is covered briefly in the next section.
Pushing committed changes back to Anki
This part is super easy! Similar to when we cloned, we must remember to close Anki before pushing, or the command will fail (gracefully). All right, now we just run the command:
lyra@oxford:~/collection$ ki push
Pushing to '/home/lyra/.local/share/Anki2/User 1/collection.anki2'
Computed md5sum: 199216c39eeabe23a1da016a99ffd3e2
Verified md5sum matches latest hash in '/home/lyra/collection/.ki/hashes'
Generating local .anki2 file from latest commit: 2aa009729b6dd337dd1ce795df611f5a49
Writing changes to '/tmp/tmpyiids2qm/original.anki2'...
100%|█████████████████████████████████| 2/2 [00:00<00:00, 1081.56it/s]
Database was modified.
Writing backup of .anki2 file to '/home/lyra/collection/.ki/backups'
Overwrote '/home/lyra/.local/share/Anki2/User 1/collection.anki2'
As the output suggests, `ki` saves a backup of our collection each time we push, just in case we wish to hard-revert a change we've made.
Now we can open Anki and view the changes we've made in the note browser!
Pulling changes from Anki into the repository
So now we know how to make changes from the filesystem and push them back to Anki. But suppose that, after we cloned our repository, we made some edits within Anki, and we'd like those to show up in our repository. For this, we'll need to close Anki, and then run the following command:
lyra@oxford:~/collection$ ki pull
Pulling from '/home/lyra/.local/share/Anki2/User 1/collection.anki2'
Computed md5sum: 199216c39eeabe23a1da016a99ffd3e2
Updating 5a9ef09..9c30b73
Fast-forward
note1645010162168.md | 4 ++--
note1645222430007.md | 11 +++++++++++
2 files changed, 13 insertions(+), 2 deletions(-)
create mode 100644 note1645222430007.md
From /tmp/tmpt5a3yd9a/ki/local/199216c39eeabe23a1da016a99ffd3e2/
* branch main -> FETCH_HEAD
* [new branch] main -> anki/main
Wrote md5sum to '/home/lyra/collection/.ki/hashes'
And we're done! Our repository is up to date, as `ki` will tell us if we try to pull again:
lyra@oxford:~/collection$ ki pull
ki pull: up to date.
Merge conflicts
Occasionally, when we edit the same lines in the same note fields in both Anki and our local repository, we may encounter a merge conflict:
lyra@oxford:~/collection$ ki pull
Pulling from '/home/lyra/.local/share/Anki2/User 1/collection.anki2'
Computed md5sum: debeb6689f0b83d520ff913067c598e9
Auto-merging note1645788806304.md
CONFLICT (add/add): Merge conflict in note1645788806304.md
Automatic merge failed; fix conflicts and then commit the result.
From /tmp/tmpgkq4ilfy/ki/local/debeb6689f0b83d520ff913067c598e9/
* branch main -> FETCH_HEAD
* [new branch] main -> anki/main
Wrote md5sum to '/home/lyra/collection/.ki/hashes'
This is expected behavior, and since the process of resolving merge conflicts is the same for `ki` repositories as for ordinary `git` repositories (because `ki` repositories *are* git repositories), we refer to StackOverflow for how to proceed.
Usage reference
Clone
The `ki clone` command takes one required argument (the path to a `.anki2` file) and one optional argument (a path to a target directory). The usage is meant to mirror that of `git clone`.
An example of the `clone` subcommand usage and its output is given below.
$ ki clone ~/.local/share/Anki2/lyra/collection.anki2 decks
Found .anki2 file at '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
Computed md5sum: ad7ea6d486a327042cf0b09b54626b66
Wrote md5sum to '/home/lyra/decks/.ki/hashes'
Cloning into '/home/lyra/decks/'...
100%|█████████████████████████| 28886/28886 [00:10<00:00, 2883.78it/s]
Pull
Once an Anki collection has been cloned, we can `pull` changes made by the Anki desktop client into our repository.

An example of the `pull` subcommand usage and its output is given below.
$ ki pull
Pulling from '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
Computed md5sum: 199216c39eeabe23a1da016a99ffd3e2
Updating 5a9ef09..9c30b73
Fast-forward
note1645010162168.md | 4 ++--
note1645222430007.md | 11 +++++++++++
2 files changed, 13 insertions(+), 2 deletions(-)
create mode 100644 note1645222430007.md
From /tmp/tmpt5a3yd9a/ki/local/199216c39eeabe23a1da016a99ffd3e2/
* branch main -> FETCH_HEAD
* [new branch] main -> anki/main
Wrote md5sum to '/home/lyra/decks/.ki/hashes'
`ki` first deletes any residual ephemeral repositories in `/tmp/ki/remote/`. These would only remain there if a previous pull command failed.

It then verifies that the path to the `.anki2` file specified in the `.ki/` directory (analogous to the `.git/` directory) still exists.

It computes and records the hash of the collection file. In this way, `ki` keeps track of whether the collection database has changed since the last `clone`/`pull`.

Finally, the collection is cloned into an ephemeral repository in a temporary directory, which is then `git pull`-ed into the current repository.
At this point, if the git operation fails, the user can take over and manage the merge themselves.
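The following is a rough Python sketch of that flow (paths hypothetical; the real implementation appears in the source listing below):

```python
# Decide whether a pull is needed by comparing collection hashes.
import hashlib
from pathlib import Path

def md5(path: Path) -> str:
    return hashlib.md5(path.read_bytes()).hexdigest()

col_file = Path.home() / ".local/share/Anki2/User 1/collection.anki2"
hashes_file = Path.cwd() / ".ki" / "hashes"
if md5(col_file) in hashes_file.read_text(encoding="utf-8"):
    print("ki pull: up to date.")
else:
    # Clone the collection into an ephemeral repository under /tmp,
    # then `git pull` it into the current repository.
    ...
```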
Push
When we want to push our changes back to the Anki desktop client, we can use `ki push` to do that.

An example of the `push` subcommand usage and its output is given below.
$ ki push
Pushing to '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
Computed md5sum: 199216c39eeabe23a1da016a99ffd3e2
Verified md5sum matches latest hash in '/home/lyra/decks/.ki/hashes'
Generating local .anki2 file from latest commit: 2aa009729b6dd337dd1ce795df611f5a49
Writing changes to '/tmp/tmpyiids2qm/original.anki2'...
100%|█████████████████████████████████| 2/2 [00:00<00:00, 1081.56it/s]
Database was modified.
Writing backup of .anki2 file to '/home/lyra/decks/.ki/backups'
Overwrote '/home/lyra/.local/share/Anki2/lyra/collection.anki2'
We store 5 backups of the collection prior to a push.
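Backup filenames are derived from a timestamp and the md5sum of the collection, as in this sketch (cf. the `backup()` function in the source listing below):

```python
# Construct the name of a collection backup file.
import datetime
import hashlib
from pathlib import Path

col_file = Path.home() / ".local/share/Anki2/User 1/collection.anki2"
timestamp = datetime.datetime.now().strftime("%Y-%m-%d--%Hh-%Mm-%Ss")
md5sum = hashlib.md5(col_file.read_bytes()).hexdigest()
print(f".ki/backups/{timestamp}--{md5sum}.anki2")
```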
Collaborative decks
This section assumes knowledge of the basic `ki` operations and familiarity with `git`. If you haven't yet cloned your Anki collection into a `ki` repository, read the getting started section. This section will walk through:

- Cloning a collaborative deck from GitHub.
- Editing the collaborative deck.
- Pulling other users' changes to the deck from GitHub.
- Pushing edits back to GitHub.
Cloning a collaborative deck from GitHub
Now that we've created our first `ki` repository, we might want to try our hand at collaborating on a deck with other Anki users. We won't actually need to make use of the `ki` program to do this, because `ki` repositories are also git repositories, and so we can clone collaborative decks from GitHub as git submodules of our collection repo.
Note. If you're completely unfamiliar with `git`, consider reading this short introduction.
Suppose we've cloned an Anki collection into a `ki` repository in our home directory, just like we did in the getting started section, and we want to add a collaborative deck from GitHub to our collection. Let's walk through an example. Our home directory looks like this:
lyra@oxford:~$ ls
collection pkgs
And we see the repo we cloned, which is called `collection`.
To add a collaborative deck repo as a submodule, we'll first need to change directories to the newly cloned `ki` repo:
lyra@oxford:~$ cd collection/
lyra@oxford:~/collection$ ls --classify
algebras/ groups/ rings/
We see that we have three directories, which represent three Anki decks. This is just an example; you'll see directories corresponding to the top-level decks in your Anki collection.
Note. The `ls --classify` command adds a trailing `/` to the end of directories to distinguish them from ordinary files.
Adding the repository as a git submodule
Suppose we want to add the collaborative deck https://github.com/langfield/manifolds.git to our collection. We can do that by running the command:
git submodule add https://github.com/langfield/manifolds.git

which yields the output:

lyra@oxford:~/collection$ git submodule add https://github.com/langfield/manifolds.git
Cloning into 'manifolds'...
remote: Counting objects: 11, done.
remote: Compressing objects: 100% (10/10), done.
remote: Total 11 (delta 0), reused 11 (delta 0)
Unpacking objects: 100% (11/11), done.
Checking connectivity... done.
And we can see that the command was successful because we have a new directory/deck called `manifolds` in our repo:
lyra@oxford:~/collection$ ls --classify
algebras/ groups/ manifolds/ rings/
Nice!
Editing a collaborative deck
There are two ways to edit a collaborative deck locally:

- Edit the markdown files in the `ki` repository.
- Edit the deck inside the Anki desktop client.
After we've cloned the `manifolds` deck repository into a submodule of our `ki` repository, we may want to make some edits to the deck.
How it works
`ki` is built on top of existing tooling implemented in the Python package `apy`, which is used to parse the Anki collection SQLite file and convert its contents to human-readable markdown files.
These files (one per Anki note) are then dumped to a configurable location in the filesystem as a git repository, whose structure mirrors that of the decks in the collection. In effect, `ki` treats the git repo it generates as a local copy of the collection, and the `.anki2` collection file as a remote.
All operations like pulling updates to the collection into `ki` and pushing updates from `ki` into Anki are handled by git under the hood.
This approach has several advantages:

- Merge conflicts can be handled in the usual, familiar way.
- Additional remotes (e.g. a human-readable backup of a collection on GitHub) can be added easily.
- Users are free to pick the editor of their choice, perform batch editing with command-line tools like `awk` or `sed`, and even add CI actions. (A sketch of such a batch edit follows this list.)
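A minimal Python sketch of the kind of batch edit one might otherwise do with `sed` (paths hypothetical):

```python
# Replace a term across every note file in a deck directory.
from pathlib import Path

for md in Path.home().glob("collection/manifolds/**/*.md"):
    text = md.read_text(encoding="utf-8")
    md.write_text(text.replace("surjective", "bijective"), encoding="utf-8")
```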
Model
The following diagram shows the dataflow of a typical Anki/`ki` stack.
+-------------+ +--------------+
| | | |
| AnkiWeb ------------- AnkiMobile |
| | sync | |
+------|------+ +--------------+
|
| sync
|
+------|------+
| |
| Anki |
| |
+------|------+
|
| deck edits
|
+--------|--------+ +------------------+
| | ki clone | |
| ----------------> |
| Collection file | | ~/decks/ |
| (.anki2) | ki push | (git repository) |
| <---------------- |
| | | |
+--------|--------+ +---------^--------+
| |
| ki pull |
| |
| |
+----------v----------+ |
| | |
| /tmp/ki/remote/AAA | ki pull |
| (git repository) -------------------------
| [ephemeral] |
| |
+---------------------+
The node labeled Anki is the Anki desktop client on the localhost. It communicates with the AnkiWeb servers via Anki's sync feature. Other clients (e.g. AnkiDroid and AnkiMobile) are able to (1) pull changes made by the desktop client into their local collections via AnkiWeb, and (2) push changes made locally back to AnkiWeb.
When the Anki desktop client is started on the localhost, it opens and places a lock on the `.anki2` SQLite file. During the session, changes are possibly made to the deck, and the SQLite file is unlocked when the program is closed.

Since `ki` must read from this database file, `ki` commands will not work while Anki is running. This is by design: the database is locked for a reason, and enforcing this constraint lowers the likelihood that users' decks become corrupted.
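A minimal sketch of the check involved (adapted from the `lock()` function in the source listing below):

```python
# Acquiring an EXCLUSIVE transaction fails fast if Anki holds the database.
import sqlite3

def collection_is_locked(path: str) -> bool:
    try:
        con = sqlite3.connect(path, timeout=0.1)
        con.isolation_level = "EXCLUSIVE"
        con.execute("BEGIN EXCLUSIVE")
    except sqlite3.DatabaseError:
        return True
    con.commit()
    con.close()
    return False
```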
An ephemeral repository is used as an auxiliary step during the `ki pull` operation so that we can merge the Anki desktop client's changes into our repository via git.
Generating HTML

By default, `ki` parses the HTML of each field and dumps the content only, insofar as that is possible. It also supports parsing arbitrary HTML elements autogenerated by addons and regenerating the updated content. In the following subsection, we walk through an example.
Example: generating syntax-highlighted code blocks
The Anki addon developer Glutanimate has an addon called `syntax-highlighting`, which adds UI elements to the Anki note editor that automatically generate a syntax-highlighted version of a code block from the clipboard. In effect, it generates a formatted HTML table for the code listing that gets dumped into the source of the relevant note field.

A fork of this addon is available here: https://ankiweb.net/shared/info/1100811177

And the source tree for the original addon is on GitHub: https://github.com/glutanimate/syntax-highlighting
For example, consider the following Python code block:
n = 1
n >> 1
print(n)
Given the above code, the addon generates the following HTML:
<table class="highlighttable">
<tbody>
<tr>
<td class="linenos">
<div class="linenodiv">
<pre>
<span class="normal">1</span>
<span class="normal">2</span>
<span class="normal">3</span>
</pre>
</div>
</td>
<td class="code">
<div class="highlight">
<pre>
<code>
<span class="n">n</span>
<span class="o">=</span>
<span class="mi">1</span>
<br>
<span class="n">n</span>
<span class="o">&gt;&gt;</span>
<span class="mi">1</span>
<br>
<span class="nb">print</span>
<span class="p">(</span>
<span class="n">n</span>
<span class="p">)</span>
<br>
</code>
</pre>
</div>
</td>
</tr>
</tbody>
</table>
Editing fields like this could become annoying very quickly. It would be better if `ki` just gave us the markdown version above (only 3 lines), and then regenerated the note field HTML when converting the repository back into a `.anki2` deck.
Adding ki HTML attributes
And in fact, this is possible. We first fork the addon so we can add some extra data to our generated HTML. In particular, we'd like to add an attribute `ki-src` whose value is the UTF-8 encoded source code. In general, this will be the encoded version of the source of whatever we'd like to autoformat.

We also add a `ki-formatter` attribute, whose value is an identifier that specifies a custom Python module (we must implement this) that transforms the (possibly edited) `ki-src` text back into an HTML element of the form seen above.
So let's call our `ki-formatter` identifier `syntax-hl-python`. Then our addon has to change the opening tag of the snippet above to look like:

<table class="highlighttable" ki-src="n = 1\nn >> 1\nprint(n)\n" ki-formatter="syntax-hl-python">
All `ki` needs is the original text of the code block prior to HTML formatting, and a function that can reapply the formatting to the modified text. Since the HTML table was generated by an addon, we already have a Python function for this, and in general we can provide a `~/.config/ki/ki.json` file that maps implementation IDs to paths of Python modules. The module must have a top-level function defined of the form `format(text: str) -> bs4.Tag`.
If we have an addon implementation, we can import it here and use it in our `format()` implementation. We can add a `ki` attribute whose value is the base64 encoding of the code block, and an `implementation` attribute whose value is the name of a function. At import-time, `ki` will decode this and write the human-readable source to the relevant markdown file instead.
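Below is a sketch of what such a `ki-formatter` module might look like, assuming the `pygments` and `beautifulsoup4` packages are available (the exact interface `ki` expects may differ):

```python
# A `syntax-hl-python` formatter: re-render edited source as a highlighted table.
import bs4
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexers import PythonLexer

def format(text: str) -> bs4.Tag:
    """Transform (possibly edited) `ki-src` text back into an HTML table."""
    html = highlight(text, PythonLexer(), HtmlFormatter(linenos="table"))
    table = bs4.BeautifulSoup(html, "html.parser").find("table")
    # Re-attach the attributes ki uses to round-trip the original source.
    table["ki-src"] = text
    table["ki-formatter"] = "syntax-hl-python"
    return table
```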
Source code
If you have `git`, you can clone a local copy of the source code by running the following command in a terminal:
git clone git@github.com:langfield/ki.git
Expand source code
"""
Ki is a command-line interface for the version control and editing of `.anki2`
collections as git repositories of markdown files. Rather than providing an
interactive UI like the Anki desktop client, ki aims to allow natural editing
*in the filesystem*.
In general, the purpose of ki is to allow users to work on large, complex Anki
decks in exactly the same way they work on large, complex software projects.
.. include:: ./DOCUMENTATION.md
"""
# pylint: disable=invalid-name, missing-class-docstring, broad-except
# pylint: disable=too-many-return-statements, too-many-lines, too-many-arguments
# pylint: disable=no-value-for-parameter, not-callable, unnecessary-lambda-assignment
import os
import re
import gc
import sys
import time
import json
import copy
import random
import logging
import sqlite3
import hashlib
import datetime
import itertools
import subprocess
import configparser
from pathlib import Path
from itertools import chain, starmap, tee
from functools import reduce
from collections import namedtuple
import git
import click
from lark import Lark
# Required to avoid circular imports because the Anki pylib codebase is gross.
import anki.collection
from anki.cards import Card
from anki.utils import ids2str
from anki.models import NotetypeDict
from anki.errors import NotFoundError
from anki.collection import Collection, Note, OpChangesWithId
from anki.importing.noteimp import NoteImporter
from beartype import beartype
from beartype.typing import (
Set,
List,
Dict,
Any,
Optional,
Callable,
Union,
TypeVar,
Tuple,
Iterator,
Iterable,
FrozenSet,
)
import ki.maybes as M
import ki.functional as F
from ki.types import (
MODELS_FILE,
File,
Dir,
EmptyDir,
NoPath,
NoFile,
GitChangeType,
Delta,
KiRepo,
Notetype,
ColNote,
KiRev,
Rev,
Deck,
Root,
DotKi,
CardFile,
NoteDBRow,
DeckNote,
NoteMetadata,
PushResult,
PlannedLink,
MediaBytes,
AddedMedia,
UpdatesRejectedError,
TargetExistsError,
CollectionChecksumError,
MissingNotetypeError,
NotetypeMismatchError,
NoteFieldValidationWarning,
DeletedFileNotFoundWarning,
DiffTargetFileNotFoundWarning,
NotetypeCollisionWarning,
SQLiteLockError,
MissingMediaDirectoryError,
WrongFieldCountWarning,
InconsistentFieldNamesWarning,
AnkiDBNoteMissingFieldsError,
RenamedMediaFileWarning,
NonEmptyWorkingTreeError,
EmptyNoteWarning,
DuplicateNoteWarning,
UnhealthyNoteWarning,
MediaDirectoryDeckNameCollisionWarning,
notetype_json,
)
from ki.maybes import (
GIT,
GITIGNORE_FILE,
GITMODULES_FILE,
KI,
HASHES_FILE,
BACKUPS_DIR,
)
from ki.transformer import NoteTransformer, FlatNote
curried = F.curried
logging.basicConfig(level=logging.INFO)
TQ = F.progressbar
T = TypeVar("T")
NoteId, DeckId, CardId = int, int, int
CardFileMap = Dict[DeckId, List[CardFile]]
GITATTRS_FILE = ".gitattributes"
UTF8 = "UTF-8"
URLS = "(https?|ftp)://"
MEDIA = M.MEDIA
DEV_NULL = "/dev/null"
BATCH_SIZE = 300
HTML_REGEX = r"</?\s*[a-z-][^>]*\s*>|(\&(?:[\w\d]+|#\d+|#x[a-f\d]+);)"
REMOTE_NAME = "anki"
BRANCH_NAME = F.BRANCH_NAME
MAX_FILENAME_LEN = 60
IGNORE_DIRS = set([GIT, KI, MEDIA])
IGNORE_FILES = set([GITIGNORE_FILE, GITMODULES_FILE, MODELS_FILE])
HEAD_SUFFIX = Path("ki-head")
LOCAL_SUFFIX = Path("ki-local")
REMOTE_SUFFIX = Path("ki-remote")
FIELD_HTML_SUFFIX = Path("ki-fieldhtml")
LCA = "last-successful-ki-push"
MEDIA_FILE_RECURSIVE_PATTERN = f"**/{MEDIA}/*"
# This is the key for media files associated with notetypes instead of the
# contents of a specific note.
NOTETYPE_NID = -57
MD = ".md"
ALPHANUMERICS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
SYMBOLS = "!#$%&()*+,-./:;<=>?@[]^_`{|}~"
BASE91_TABLE = list(ALPHANUMERICS + SYMBOLS)
ADDED = GitChangeType.ADDED
RENAMED = GitChangeType.RENAMED
DELETED = GitChangeType.DELETED
MODIFIED = GitChangeType.MODIFIED
TYPECHANGED = GitChangeType.TYPECHANGED
@beartype
def do(f: Callable[[Any], Any], xs: Iterable[Any]) -> None:
"""Perform some action on an iterable."""
list(map(f, xs))
@beartype
def stardo(f: Callable[[Any], Any], xs: Iterable[Any]) -> None:
"""Perform some action on an iterable of tuples, unpacking arguments."""
list(starmap(f, xs))
@beartype
def lock(col_file: File) -> sqlite3.Connection:
"""Check that lock can be acquired on a SQLite3 database given a path."""
try:
con = sqlite3.connect(col_file, timeout=0.1)
con.isolation_level = "EXCLUSIVE"
con.execute("BEGIN EXCLUSIVE")
except sqlite3.DatabaseError as err:
raise SQLiteLockError(col_file, err) from err
if sys.platform == "win32":
con.commit()
con.close()
return con
@beartype
def unlock(con: sqlite3.Connection) -> None:
"""Unlock a SQLite3 database."""
if sys.platform == "win32":
return
con.commit()
con.close()
@beartype
def cp_repo(rev: Rev, suffix: str) -> git.Repo:
"""Get a temporary copy of a git repository in /tmp/<suffix>/."""
# Copy the entire repo into a temp directory ending in `../suffix/`.
target: NoFile = F.chk(F.mkdtemp() / suffix)
ephem = git.Repo(F.copytree(F.root(rev.repo), target))
# Do a reset --hard to the given SHA.
ephem.git.reset(rev.sha, hard=True)
return ephem
@beartype
def cp_ki(ki_rev: KiRev, suffix: str) -> KiRepo:
"""
Given a KiRev, i.e. a pair of the form (kirepo, SHA), we clone
`kirepo.repo` into a temp directory and hard reset to the given commit
hash. Copies the .ki/ directory from `ki_rev.kirepo` without making any
changes.
Parameters
----------
ki_rev : KiRev
The ki repository to clone, and a commit for it.
suffix : str
/tmp/.../ path suffix, e.g. `ki/local/`.
Returns
-------
KiRepo
The copied ki repository.
"""
rev: Rev = F.ki_rev_to_rev(ki_rev)
print(F.root(rev.repo))
ephem: git.Repo = cp_repo(rev, suffix)
F.force_mkdir(F.root(ephem) / KI / BACKUPS_DIR)
kirepo: KiRepo = M.kirepo(F.root(ephem))
return kirepo
@beartype
def is_anki_note(path: File) -> bool:
"""Check if file is a `ki`-style markdown note."""
# Ought to have markdown file extension.
if path.suffix != ".md":
return False
with open(path, "r", encoding=UTF8) as md_f:
lines = md_f.readlines()
if len(lines) < 8:
return False
if lines[0] != "# Note\n":
return False
if lines[1] != "```\n":
return False
if not re.match(r"^guid: ", lines[2]):
return False
return True
@beartype
def is_ignorable(root: Dir, path: Path) -> bool:
"""
Filter out paths in a git repository diff that do not correspond to Anki
notes.
We could do this purely using calls to `is_anki_note()`, but these are
expensive, so we try to find matches without opening any files first.
"""
# Ignore if `path` is an exact match for any of the patterns. Since the
# contents of a git repository diff are always going to be files, this
# alone will not correctly ignore directory names given in `patterns`.
#
# If any of the patterns in `dirnames` resolve to one of the parents of
# `path`, return a warning, so that we are able to filter out entire
# directories.
filenames, dirnames = IGNORE_FILES, IGNORE_DIRS
if path.name in filenames | dirnames or len(set(path.parts) & dirnames) > 0:
return True
# If `path` is an extant file (not a directory) and *not* a note, ignore it.
file = F.chk(root / path)
if isinstance(file, File) and not is_anki_note(file):
return True
return False
@curried
@beartype
def mungediff(
parse: Callable[[Delta], DeckNote], a_root: Dir, b_root: Dir, d: git.Diff
) -> Iterable[Union[Delta, Warning]]:
"""Extract deltas and warnings from a collection of diffs."""
a, b = d.a_path, d.b_path
a, b = a if a else b, b if b else a
if is_ignorable(a_root, Path(a)) or is_ignorable(b_root, Path(b)):
return []
# Get absolute and relative paths to 'a' and 'b'.
AB = namedtuple("AB", "a b")
files = AB(F.chk(a_root / a), F.chk(b_root / b))
rels = AB(Path(a), Path(b))
if d.change_type == DELETED.value:
if not F.isfile(files.a):
return [DeletedFileNotFoundWarning(rels.a)]
return [Delta(GitChangeType.DELETED, files.a, rels.a)]
if not F.isfile(files.b):
return [DiffTargetFileNotFoundWarning(rels.b)]
if d.change_type == RENAMED.value:
a_delta = Delta(GitChangeType.DELETED, files.a, rels.a)
b_delta = Delta(GitChangeType.ADDED, files.b, rels.b)
a_decknote, b_decknote = parse(a_delta), parse(b_delta)
if a_decknote.guid != b_decknote.guid:
return [a_delta, b_delta]
return [Delta(GitChangeType(d.change_type), files.b, rels.b)]
@beartype
def diff2(
repo: git.Repo,
parse: Callable[[Delta], DeckNote],
) -> Iterable[Union[Delta, Warning]]:
"""Diff `repo` from `HEAD~1` to `HEAD`."""
# We diff from A~B.
head1: Rev = M.rev(repo, repo.commit("HEAD~1").hexsha)
uuid = hex(random.randrange(16**4))[2:]
head1_repo = cp_repo(head1, suffix=f"HEAD~1-{uuid}")
a_root, b_root = F.root(head1_repo), F.root(repo)
diffidx = repo.commit("HEAD~1").diff(repo.commit("HEAD"))
# Get the diffs for each change type (e.g. 'DELETED').
return chain(*map(mungediff(parse, a_root, b_root), diffidx))
@beartype
def get_models_recursively(kirepo: KiRepo) -> Dict[str, Notetype]:
"""
Find and merge all `models.json` files recursively. Returns a dictionary
sending model names to Notetypes.
"""
@beartype
def load(file: File) -> Iterable[Notetype]:
"""Load a models file."""
with open(file, "r", encoding=UTF8) as f:
return map(M.notetype, json.load(f).values())
notetypes = F.cat(map(load, F.rglob(kirepo.root, MODELS_FILE)))
return {notetype.name: notetype for notetype in notetypes}
@beartype
def check_fields_health(note: Note) -> List[Warning]:
"""Construct warnings when Anki's fields health check fails."""
health = note.fields_check()
if health == 1:
return [EmptyNoteWarning(note, health)]
if health == 2:
return [DuplicateNoteWarning(note, health, html_to_screen(note.fields[0]))]
if health != 0:
return [UnhealthyNoteWarning(note, health)]
return []
@beartype
def get_guid(fields: List[str]) -> str:
"""Construct a new GUID for a note. Adapted from genanki's `guid_for()`."""
# Get the first 8 bytes of the SHA256 of `contents` as an int.
m = hashlib.sha256()
m.update("__".join(fields).encode("utf-8"))
x = reduce(lambda h, b: (h << 8) + b, m.digest()[:8], 0)
# convert to the weird base91 format that Anki uses
chars = []
while x > 0:
chars.append(BASE91_TABLE[x % len(BASE91_TABLE)])
x //= len(BASE91_TABLE)
return "".join(reversed(chars))
@curried
@beartype
def parse_note(parser: Lark, transformer: NoteTransformer, delta: Delta) -> DeckNote:
"""Parse with lark."""
tree = parser.parse(delta.path.read_text(encoding=UTF8))
flatnote: FlatNote = transformer.transform(tree)
parts: Tuple[str, ...] = delta.relpath.parent.parts
deck: str = "::".join(parts)
# Generate a GUID from the hash of the field contents if the `guid` field
# in the note file was left blank.
fields = list(flatnote.fields.values())
guid = flatnote.guid if flatnote.guid != "" else get_guid(fields)
return DeckNote(
title=flatnote.title,
guid=guid,
deck=deck,
model=flatnote.model,
tags=flatnote.tags,
fields=flatnote.fields,
)
@beartype
def plain_to_html(plain: str) -> str:
"""Convert plain text to html"""
# Minor clean up
plain = plain.replace(r"<", "<")
plain = plain.replace(r">", ">")
plain = plain.replace(r"&", "&")
plain = plain.replace(r" ", " ")
plain = re.sub(r"\<b\>\s*\<\/b\>", "", plain)
plain = re.sub(r"\<i\>\s*\<\/i\>", "", plain)
plain = re.sub(r"\<div\>\s*\<\/div\>", "", plain)
# Convert newlines to `<br>` tags.
if not re.search(HTML_REGEX, plain):
plain = plain.replace("\n", "<br>")
return plain.strip()
@curried
@beartype
def update_field(decknote: DeckNote, note: Note, key: str, field: str) -> None:
"""Update a field contained in `note`."""
try:
note[key] = plain_to_html(field)
except IndexError as err:
raise AnkiDBNoteMissingFieldsError(decknote, note.id, key) from err
@beartype
def update_note(
note: Note, decknote: DeckNote, old_notetype: Notetype, new_notetype: Notetype
) -> Iterable[Warning]:
"""
Change all the data of `note` to that given in `decknote`.
This is only to be called on notes whose nid already exists in the
database. Creates a new deck if `decknote.deck` doesn't exist. Assumes
that the model has already been added to the collection, and raises an
exception if it finds otherwise. Changes notetype to that specified by
`decknote.model`. Overwrites all fields with `decknote.fields`.
Updates:
- tags
- deck
- model
- fields
"""
# Check that the passed argument `new_notetype` has a name consistent with
# the model specified in `decknote`. The former should be derived from the
# latter, and if they don't match, there is a bug in the caller.
if decknote.model != new_notetype.name:
raise NotetypeMismatchError(decknote, new_notetype)
nid = note.id
note.tags = decknote.tags
note.flush()
# Set the deck of the given note, as well as all its cards, and create a
# deck with this name if it doesn't already exist. See the
# comments/docstrings in the implementation of the
# `anki.decks.DeckManager.id()` method.
newdid: int = note.col.decks.id(decknote.deck, create=True)
cids = [c.id for c in note.cards()]
if cids:
note.col.set_deck(cids, newdid)
# Set notetype (also clears all fields).
if old_notetype.id != new_notetype.id:
fmap = {field.ord: None for field in old_notetype.flds}
note.col.models.change(old_notetype.dict, [nid], new_notetype.dict, fmap, None)
note.load()
# Validate field keys against notetype.
warnings: List[Warning] = validate_decknote_fields(new_notetype, decknote)
if len(warnings) > 0:
return warnings
# Set field values and flush to collection database. This is correct
# because every field name that appears in `new_notetype` is contained in
# `decknote.fields`, or else we would have printed a warning and returned
# above.
missing = {key for key in decknote.fields if key not in note}
warnings = map(lambda k: NoteFieldValidationWarning(nid, k, new_notetype), missing)
fields = [(key, field) for key, field in decknote.fields.items() if key in note]
stardo(update_field(decknote, note), fields)
note.flush()
# Remove if unhealthy.
fwarns: List[Warning] = check_fields_health(note)
if len(fwarns) > 0:
note.col.remove_notes([nid])
return chain(warnings, fwarns)
@beartype
def validate_decknote_fields(notetype: Notetype, decknote: DeckNote) -> List[Warning]:
"""Validate that the fields given in the note match the notetype."""
warnings: List[Warning] = []
names: List[str] = [field.name for field in notetype.flds]
# TODO: It might also be nice to print the path of the note in the
# repository. This would have to be added to the `DeckNote` spec.
if len(decknote.fields.keys()) != len(names):
warnings.append(WrongFieldCountWarning(decknote, names))
mk_warning = lambda n, k: InconsistentFieldNamesWarning(n, k, decknote)
names_and_keys = F.starfilter(
lambda n, k: n != k, zip(names, decknote.fields.keys())
)
return warnings + list(starmap(mk_warning, names_and_keys))
@beartype
def get_note_path(colnote: ColNote, deck_dir: Dir, card_name: str = "") -> NoFile:
"""Get note path from sort field text."""
field_text = colnote.sfld
# Construct filename, stripping HTML tags and sanitizing (quickly).
field_text = plain_to_html(field_text)
field_text = re.sub("<[^<]+?>", "", field_text)
# If the HTML stripping removed all text, we just slugify the raw sort
# field text.
if len(field_text) == 0:
field_text = colnote.sfld
name = field_text[:MAX_FILENAME_LEN]
slug = F.slugify(name)
# If the slug is still empty, use all the fields.
if len(slug) == 0:
contents = " ".join(colnote.n.values())
name = contents[:MAX_FILENAME_LEN]
slug = F.slugify(name)
# Make it so `slug` cannot possibly be an empty string, because then we get
# a `Path('.')` which is a bug, and causes a runtime exception. If all else
# fails, use the notetype name, hash of the payload, and creation date.
if len(slug) == 0:
guidhex = colnote.n.guid.encode(UTF8).hex()
slug: str = f"{colnote.notetype.name}--{guidhex}"
# Note IDs are in milliseconds.
dt = datetime.datetime.fromtimestamp(colnote.n.id / 1000.0)
slug += "--" + dt.strftime("%Y-%m-%d--%Hh-%Mm-%Ss")
F.yellow(f"Slug for note with guid '{colnote.n.guid}' is empty...")
F.yellow(f"Using hex representation of guid in filename: '{slug}'")
if card_name != "":
slug = f"{slug}_{card_name}"
filename: str = f"{slug}{MD}"
note_path = F.chk(deck_dir / filename, resolve=False)
i = 1
while not isinstance(note_path, NoFile):
filename = f"{slug}_{i}{MD}"
note_path = F.chk(deck_dir / filename, resolve=False)
i += 1
return note_path
@beartype
def backup(kirepo: KiRepo) -> int:
"""Backup collection to `.ki/backups`."""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d--%Hh-%Mm-%Ss")
md5sum = F.md5(kirepo.col_file)
name = f"{timestamp}--{md5sum}.anki2"
backup_file = F.chk(kirepo.backups_dir / name)
# We assume here that no one would ever make e.g. a directory called
# `name`, since `name` contains the md5sum of the collection file, and
# thus that is extraordinarily improbable. So the only thing we have to
# check for is that we haven't already written a backup file to this
# location.
if isinstance(backup_file, File):
return 1
F.copyfile(kirepo.col_file, F.chk(kirepo.backups_dir / name))
return 0
@beartype
def append_md5sum(dotki: Dir, tag: str, md5sum: str) -> None:
"""Append an md5sum hash to the hashes file."""
hashes_file = dotki / HASHES_FILE
with open(hashes_file, "a+", encoding=UTF8) as hashes_f:
hashes_f.write(f"{md5sum} {tag}\n")
@beartype
def get_field_note_id(nid: int, fieldname: str) -> str:
"""A str ID that uniquely identifies field-note pairs."""
return f"{nid}{F.slugify(fieldname)}"
@beartype
def add_db_note(
col: Collection,
nid: int,
guid: str,
mid: int,
mod: int,
usn: int,
tags: List[str],
fields: List[str],
sfld: str,
csum: int,
flags: int,
data: str,
) -> Note:
"""Add a note to the database directly, with a SQL INSERT."""
importer = NoteImporter(col, "")
importer.addNew(
[
(
nid,
guid,
mid,
mod,
usn,
" " + " ".join(tags) + " ",
"\x1f".join(fields),
sfld,
csum,
flags,
data,
)
]
)
# All the `mark_modified` flag does is update `mod`. Since we always set
# `mod` to the current timestamp anyway, this doesn't matter, so may as
# well set it to `True` to reflect the semantics of the operation we're
# performing. This may present issues down the road since newly imported
# cards from cloned submodules will be marked modified on import/push,
# which is not exactly right. The anki2 importer does *not* mark as
# modified, because importing a new note does not modify its content. We
# would need to have `mod` data inside the note grammar in order for this
# to make sense, which may be more trouble than it's worth. Users writing
# new notes as markdown files would have to set the `mod` to some default
# value, or leave it blank. Assuming people don't do this nearly as often
# as they will export or push notes they've created in Anki, then it might
# make sense.
col.after_note_updates([nid], mark_modified=True)
return col.get_note(nid)
@curried
@beartype
def push_note(
col: Collection,
timestamp_ns: int,
guids: Dict[str, NoteMetadata],
new_nids: Iterator[int],
decknote: DeckNote,
) -> Iterable[Warning]:
"""
Update the Anki `Note` object in `col` corresponding to `decknote`,
creating it if it does not already exist.
Raises
------
MissingNotetypeError
If we can't find a notetype with the name provided in `decknote`.
"""
# Notetype/model names are privileged in Anki, so if we don't find the
# right name, we raise an error.
model_id: Optional[int] = col.models.id_for_name(decknote.model)
if model_id is None:
raise MissingNotetypeError(decknote.model)
new_notetype: Notetype = M.notetype(col.models.get(model_id))
if decknote.guid in guids:
nid: int = guids[decknote.guid].nid
try:
note: Note = col.get_note(nid)
except NotFoundError as err:
print(f"{nid = }")
print(f"{decknote.guid = }")
raise err
else:
nid: int = next(new_nids)
note: Note = add_db_note(
col,
nid,
decknote.guid,
model_id,
mod=int(timestamp_ns // 1e9),
usn=-1,
tags=decknote.tags,
fields=list(decknote.fields.values()),
sfld=decknote.fields[new_notetype.sortf.name],
csum=0,
flags=0,
data="",
)
# If we are updating an existing note, we need to know the old and new
# notetypes, and then update the notetype (and the rest of the note data)
# accordingly.
old_notetype: Notetype = M.notetype(note.note_type())
return update_note(note, decknote, old_notetype, new_notetype)
@beartype
def get_header_lines(colnote) -> List[str]:
"""Get header of markdown representation of note."""
lines = [
"# Note",
"```",
f"guid: {colnote.n.guid}",
f"notetype: {colnote.notetype.name}",
"```",
"",
"### Tags",
"```",
]
lines += colnote.n.tags
lines += ["```", ""]
return lines
@curried
@beartype
def localmedia(s: str, regex: str) -> Iterable[str]:
"""Return local media filenames matching the given regex pattern."""
fnames = map(lambda m: m.group("fname"), re.finditer(regex, s))
fnames = map(lambda s: s.strip(), fnames)
return filter(lambda x: not re.match(URLS, x.lower()), fnames)
@beartype
def media_filenames_in_field(col: Collection, s: str) -> Iterable[str]:
"""A copy of `MediaManager.files_in_str()`, but without LaTeX rendering."""
s = (s.strip()).replace('"', "")
return F.cat(map(localmedia(s), col.media.regexps))
@curried
@beartype
def copy_note_media(
col: Collection, src: Dir, tgt: Dir, row: NoteDBRow
) -> FrozenSet[File]:
"""
Copy a single note's media files and return the copies as a set. We do this
by first filtering for only 'rootfiles', i.e. excluding media files in
subdirectories of the media directory. Then we take only those which exist,
i.e. typecheck as `File`. Then we construct the source and destination
paths, and finally actually perform the copy op, returning the result.
Note that `src` is the media directory where the files originate, and `tgt`
is the media directory we're copying to.
"""
files: Iterable[str] = media_filenames_in_field(col, row.flds)
rootfiles = filter(lambda f: f == os.path.basename(f), files)
medias: Iterable[File] = filter(F.isfile, map(lambda f: F.chk(src / f), rootfiles))
srcdsts = map(lambda file: (file, F.chk(tgt / file.name)), medias)
return frozenset(starmap(F.copyfile, srcdsts))
@curried
@beartype
def copy_notetype_media(
src: Dir, tgt: Dir, paths: Set[Path], m: NotetypeDict
) -> FrozenSet[File]:
"""Copy media from notetype `m` from source to target, returning set of copies."""
matches: Iterable[Path] = filter(lambda p: hasmedia(m, str(p)), paths)
medias = filter(F.isfile, map(lambda p: F.chk(src / p), matches))
srcdsts = map(lambda f: (f, F.chk(tgt / f.name)), medias)
return frozenset(starmap(F.copyfile, srcdsts))
@beartype
def copy_media_files(
col: Collection,
media_target_dir: EmptyDir,
) -> Dict[int, Set[File]]:
"""
Get a list of extant media files used in notes and notetypes, copy those
media files to the top-level `_media/` directory in the repository root,
and return a map sending note ids to sets of copied media files.
Adapted from code in `anki/pylib/anki/exporting.py`. Specifically, the
`AnkiExporter.exportInto()` function.
Parameters
----------
col
Anki collection.
media_target_dir
Where media files are to be copied to.
"""
# All note ids as a string for the SQL query.
strnids = ids2str(list(col.find_notes(query="")))
# This is the path to the media directory. In the original implementation
# of `AnkiExporter.exportInto()`, there is check made of the form
#
# if self.mediaDir:
#
# before doing path manipulation with this string.
#
# Examining the `__init__()` function of `MediaManager`, we can see that
# `col.media.dir()` will only be `None` in the case where `server=True` is
# passed to the `Collection` constructor. But since we do the construction
# within ki, we have a guarantee that this will never be true, and thus we
# can assume it is a nonempty string, which is all we need for the
# following code to be safe.
media_dir = F.chk(Path(col.media.dir()))
if not isinstance(media_dir, Dir):
raise MissingMediaDirectoryError(col.path, media_dir)
# Find media files that appear in note fields and copy them to the target.
query: str = "select * from notes where id in " + strnids
rows: List[NoteDBRow] = [NoteDBRow(*row) for row in col.db.all(query)]
rows = TQ(rows, "Media")
copy_fn = copy_note_media(col, media_dir, media_target_dir)
media = {row.nid: copy_fn(row) for row in rows}
mids = col.db.list("select distinct mid from notes where id in " + strnids)
# Copy notetype template media files.
_, _, files = F.shallow_walk(media_dir)
paths: Iterable[Path] = map(lambda f: Path(f.name), files)
paths = set(filter(lambda f: str(f).startswith("_"), paths))
models = filter(lambda m: int(m["id"]) in mids, col.models.all())
mediasets = map(copy_notetype_media(media_dir, media_target_dir, paths), models)
media[NOTETYPE_NID] = reduce(lambda x, y: x.union(y), mediasets, set())
return media
@beartype
def hasmedia(model: NotetypeDict, fname: str) -> bool:
"""
Check if a notetype has media.
Adapted from `anki.exporting.AnkiExporter._modelHasMedia()`, which is an
instance method, but does not make any use of `self`, and so could be a
staticmethod. It is a pure function.
"""
# First check the styling.
if fname in model["css"]:
return True
# If no reference to fname then check the templates as well.
return any(map(lambda t: fname in t["qfmt"] or fname in t["afmt"], model["tmpls"]))
@beartype
def write_repository(
col: Collection,
targetdir: Dir,
dotki: DotKi,
media_target_dir: EmptyDir,
) -> Collection:
"""Write notes to appropriate directories in `targetdir`."""
# Create config file.
config = configparser.ConfigParser()
config["remote"] = {"path": col.path}
with open(dotki.config, "w", encoding=UTF8) as config_f:
config.write(config_f)
# ColNote-containing data structure, to be passed to `write_decks()`.
nids: Iterable[int] = TQ(col.find_notes(query=""), "Notes")
colnotes: Dict[int, ColNote] = {nid: M.colnote(col, nid) for nid in nids}
media: Dict[int, Set[File]] = copy_media_files(col, media_target_dir)
write_decks(
col=col,
targetdir=targetdir,
colnotes=colnotes,
media=media,
)
return col
@beartype
def postorder(node: Union[Root, Deck]) -> List[Deck]:
"""
Post-order traversal. Guarantees that we won't process a node until we've
processed all its children.
"""
descendants: List[Deck] = reduce(lambda xs, x: xs + postorder(x), node.children, [])
return descendants if isinstance(node, Root) else descendants + [node]
@beartype
def preorder(node: Union[Root, Deck]) -> List[Deck]:
"""
Pre-order traversal. Guarantees that we won't process a node until
we've processed all its ancestors.
"""
descendants: List[Deck] = reduce(lambda xs, x: xs + preorder(x), node.children, [])
return descendants if isinstance(node, Root) else [node] + descendants
@beartype
def write_decks(
col: Collection,
targetdir: Dir,
colnotes: Dict[int, ColNote],
media: Dict[int, Set[File]],
) -> None:
"""
The proper way to do this is a DFS traversal, perhaps recursively, which
will make it easier to keep things purely functional, accumulating the
model ids of the children in each node. For this, we must construct a tree
from the deck names.
Implement new `ColNote`-writing procedure, using `DeckTreeNode`s.
It must do the following for each deck:
- create the deck directory
- write the models.json file
- create and populate the media directory
- write the note payload for each note in the correct deck, exactly once
In other words, for each deck, we need to write all of its:
- models
- media
- notes
The first two are cumulative: we want the models and media of subdecks to
be included in their ancestors. The notes, however, should not be
cumulative. Indeed, we want each note to appear exactly once in the
entire repository, making allowances for the case where a single note's
cards are spread across multiple decks, in which case we must create a
symlink.
And actually, both of these cases are nicely taken care of for us by the
`DeckManager.cids()` function, which has a `children: bool` parameter
which toggles whether or not to get the card ids of subdecks or not.
"""
# Accumulate pairs of model ids and notetype maps. The return type of the
# `ModelManager.get()` call below indicates that it may return `None`,
# but we know it will not because we are getting the notetype id straight
# from the Anki DB.
#
# Dump the models file for the whole repository.
models = {m.id: col.models.get(m.id) for m in col.models.all_names_and_ids()}
with open(targetdir / MODELS_FILE, "w", encoding=UTF8) as f:
json.dump(models, f, ensure_ascii=False, indent=4, sort_keys=True)
# Construct an iterable of all decks except the trivial deck.
root: Deck = M.tree(col, targetdir, col.decks.deck_tree())
collisions, decks = F.part(lambda d: MEDIA in d.fullname, postorder(root))
if any(True for _ in collisions):
warn(MediaDirectoryDeckNameCollisionWarning())
decks = list(decks)
deckmap = {d.fullname: d for d in decks}
# Write cards, models, and media to filesystem.
do(write_note(deckmap), TQ(colnotes.values(), "Notes"))
do(write_models(col, models), TQ(decks, "Notetypes"))
symlink_media(col, root, targetdir, media)
@curried
@beartype
def write_note(
deckmap: Dict[str, Deck],
colnote: ColNote,
) -> File:
decknames = set(map(lambda c: c.col.decks.name(c.did), colnote.n.cards()))
sortf = colnote.sfld
if len(decknames) == 0:
raise ValueError(f"No cards for note: {sortf}")
if len(decknames) > 1:
raise ValueError(f"Cards for note {sortf} are in distinct decks: {decknames}")
fullname = decknames.pop()
parts = fullname.split("::")
if "_media" in parts:
raise ValueError(f"Bad deck name '{fullname}' (cannot contain '_media')")
deck: Deck = deckmap[fullname]
path: NoFile = get_note_path(colnote, deck.deckd)
payload: str = get_note_payload(colnote)
return F.write(path, payload)
@curried
@beartype
def write_models(col: Collection, models: Dict[int, NotetypeDict], deck: Deck) -> None:
"""Write the `models.json` file for the given deck."""
did: int = deck.did
deckd: Dir = deck.deckd
descendants: List[CardId] = col.decks.cids(did=did, children=True)
cards: List[Card] = list(map(col.get_card, descendants))
descendant_mids: Set[int] = {c.note().mid for c in cards}
# Write `models.json` for current deck.
deck_models = {mid: models[mid] for mid in descendant_mids}
with open(deckd / MODELS_FILE, "w", encoding=UTF8) as f:
json.dump(deck_models, f, ensure_ascii=False, indent=4, sort_keys=True)
@beartype
def mklink(targetd: Dir, colnote: ColNote, deckd: Dir, card: Card, file: File) -> None:
"""Return a windows link for a card if one is necessary."""
note_path: NoFile = get_note_path(colnote, deckd, card.template()["name"])
M.link(targetd, PlannedLink(link=note_path, tgt=file))
@beartype
def parentmap(root: Union[Root, Deck]) -> Dict[str, Union[Root, Deck]]:
"""Map deck fullnames to parent `Deck`s."""
parents = {child.fullname: root for child in root.children}
return parents | reduce(lambda x, y: x | y, map(parentmap, root.children), {})
@curried
@beartype
def planned_link(
parents: Dict[str, Union[Root, Deck]], deck: Deck, media_file: File
) -> Optional[PlannedLink]:
"""Get the target of the to-be-created media symlink."""
link: Path = F.chk(deck.mediad / media_file.name, resolve=False)
if not isinstance(link, NoFile):
return None
parent: Union[Root, Deck] = parents[deck.fullname]
if isinstance(parent, Root):
tgt = media_file
else:
tgt = F.chk(parent.mediad / media_file.name, resolve=False)
return PlannedLink(link=link, tgt=tgt)
@curried
@beartype
def symlink_deck_media(
col: Collection,
targetd: Dir,
media: Dict[int, Set[File]],
parents: Dict[str, Union[Root, Deck]],
deck: Deck,
) -> None:
"""Create chained symlinks for a single deck."""
# Get nids for all descendant notes with media.
descendants: List[CardId] = col.decks.cids(did=deck.did, children=True)
cards: Iterable[Card] = map(col.get_card, descendants)
nids: Set[NoteId] = {NOTETYPE_NID} | set(map(lambda c: c.nid, cards))
# Get link path and target for each media file, and create the links.
files = F.cat(map(lambda nid: media[nid], filter(lambda nid: nid in media, nids)))
plinks = filter(None, map(planned_link(parents, deck), files))
do(M.link(targetd), plinks)
@beartype
def symlink_media(
col: Collection,
root: Root,
targetd: Dir,
media: Dict[int, Set[File]],
) -> None:
"""Chain symlinks up the deck tree into top-level `<collection>/_media/`."""
decks: List[Deck] = preorder(root)
parents: Dict[str, Union[Root, Deck]] = parentmap(root)
return do(symlink_deck_media(col, targetd, media, parents), decks)
@beartype
def html_to_screen(html: str) -> str:
"""
Convert html for a *single field* into plaintext, to be displayed within a
markdown file.
Does very little (just converts HTML-escaped special characters like `<br>`
tags or `&nbsp;`s to their UTF-8 equivalents).
"""
html = re.sub(r"\<style\>.*\<\/style\>", "", html, flags=re.S)
plain = html
# For convenience: Un-escape some common LaTeX constructs.
plain = plain.replace(r"\\\\", r"\\")
plain = plain.replace(r"\\{", r"\{")
plain = plain.replace(r"\\}", r"\}")
plain = plain.replace(r"\*}", r"*}")
plain = plain.replace(r"<", "<")
plain = plain.replace(r">", ">")
plain = plain.replace(r"&", "&")
plain = plain.replace(r" ", " ")
plain = plain.replace("<br>", "\n")
plain = plain.replace("<br/>", "\n")
plain = plain.replace("<br />", "\n")
# Unbreak lines within src attributes.
plain = re.sub('src= ?\n"', 'src="', plain)
plain = re.sub(r"\<b\>\s*\<\/b\>", "", plain)
return plain
@curried
@beartype
def get_field_payload(col: Collection, name: str, content: str) -> List[str]:
"""Get the lines of a markdown snippet for some Anki note field."""
text = col.media.escape_media_filenames(html_to_screen(content), unescape=True)
return [f"## {name}", text, ""]
@beartype
def get_note_payload(colnote: ColNote) -> str:
"""
Return the markdown-converted contents of the Anki note represented by
`colnote` as a string.
A `ColNote` is a dataclass wrapper around a `Note` object which has been
loaded from the DB.
"""
lines = get_header_lines(colnote)
lines += F.cat(starmap(get_field_payload(colnote.n.col), colnote.n.items()))
return "\n".join(lines)
@beartype
def git_pull(remote: str, branch: str, cwd: Dir) -> str:
"""Pull remote into branch using a subprocess call."""
args = ["git", "pull", "-v", remote, branch]
p = subprocess.run(args, check=False, cwd=cwd, capture_output=True)
return f"{p.stdout.decode()}\n{p.stderr.decode()}"
@beartype
def echo(string: str, silent: bool = False) -> None:
"""Call `click.secho()` with formatting."""
if not silent:
click.secho(string, bold=True)
@beartype
def warn(w: Warning) -> None:
"""Call `click.secho()` with formatting (yellow)."""
click.secho(f"WARNING: {str(w)}", bold=True, fg="yellow")
@beartype
def get_target(cwd: Dir, col_file: File, directory: str) -> Tuple[EmptyDir, bool]:
"""Create default target directory."""
path = F.chk(Path(directory) if directory != "" else cwd / col_file.stem)
new: bool = True
if isinstance(path, NoPath):
path.mkdir(parents=True)
return M.emptydir(path), new
if isinstance(path, EmptyDir):
new = False
return path, new
raise TargetExistsError(path)
@beartype
def echo_note_change_types(deltas: Iterable[Delta]) -> None:
"""Write a table of git change types for notes to stdout."""
# pylint: disable=too-many-locals
is_change_type = lambda t: lambda d: d.status == t
vs, ws, xs, ys, zs = tee(deltas, 5)
adds = list(filter(is_change_type(ADDED), vs))
deletes = list(filter(is_change_type(DELETED), ws))
renames = list(filter(is_change_type(RENAMED), xs))
modifies = list(filter(is_change_type(MODIFIED), ys))
types = list(filter(is_change_type(TYPECHANGED), zs))
LPAD, RPAD = 15, 9
add_info: str = "ADD".ljust(LPAD) + str(len(adds)).rjust(RPAD)
delete_info: str = "DELETE".ljust(LPAD) + str(len(deletes)).rjust(RPAD)
modification_info: str = "MODIFY".ljust(LPAD) + str(len(modifies)).rjust(RPAD)
rename_info: str = "RENAME".ljust(LPAD) + str(len(renames)).rjust(RPAD)
type_info: str = "TYPE CHANGE".ljust(LPAD) + str(len(types)).rjust(RPAD)
echo("=" * (LPAD + RPAD))
echo("Note change types")
echo("-" * (LPAD + RPAD))
echo(f"{add_info}\n{delete_info}\n{modification_info}\n{rename_info}\n{type_info}")
echo("=" * (LPAD + RPAD))
@curried
@beartype
def add_model(col: Collection, model: Notetype) -> None:
"""Add a model to the database."""
# Check if a model already exists with this name, and get its `mid`.
mid: Optional[int] = col.models.id_for_name(model.name)
# TODO: This function is unfinished. We need to add new notetypes (and
# rename them) only if they are 'new', where new means they are different
# from anything else already in the DB, in the content-addressed sense. If
# they are new, then we must indicate that the notes we are adding actually
# have these new notetypes. For this, it may make sense to use the hash of
# the notetype everywhere (i.e. in the note file) rather than the name or
# mid.
#
# If a model already exists with this name, parse it, and check if its hash
# is identical to the model we are trying to add.
if mid is not None:
nt: NotetypeDict = col.models.get(mid)
# If we are trying to add a model that has the exact same content and
# name as an existing model, skip it.
existing: Notetype = M.notetype(nt)
if notetype_json(model) == notetype_json(existing):
return
# If the hashes don't match, then we somehow need to update
# `decknote.model` for the relevant notes.
warn(NotetypeCollisionWarning(model, existing))
nt_copy: NotetypeDict = copy.deepcopy(model.dict)
nt_copy["id"] = 0
changes: OpChangesWithId = col.models.add_dict(nt_copy)
nt: NotetypeDict = col.models.get(changes.id)
model: Notetype = M.notetype(nt)
echo(f"Added model '{model.name}'")
@beartype
def mediadata(col: Collection, fname: str) -> bytes:
"""Get media file content as bytes (empty if missing)."""
if not col.media.have(fname):
return b""
path = os.path.join(col.media.dir(), fname)
try:
with open(path, "rb") as f:
return f.read()
except OSError:
return b""
@beartype
def get_note_metadata(col: Collection) -> Dict[str, NoteMetadata]:
"""
Construct a map from guid -> (nid, mod, mid), adapted from
`Anki2Importer._import_notes()`. Note that `mod` is the modification
timestamp, in epoch seconds (timestamp of when the note was last modified).
"""
guids: Dict[str, NoteMetadata] = {}
for nid, guid, mod, mid in col.db.execute("select id, guid, mod, mid from notes"):
guids[guid] = NoteMetadata(nid, mod, mid)
return guids
@curried
@beartype
def mediabytes(col: Collection, file: File) -> MediaBytes:
"""Get old bytes (from collection) and new bytes (from file) for media file."""
old: bytes = mediadata(col, file.name)
new: bytes = file.read_bytes()
return MediaBytes(file=file, old=old, new=new)
@curried
@beartype
def addmedia(col: Collection, m: MediaBytes) -> AddedMedia:
"""Add a media file to collection (possibly renaming)."""
return AddedMedia(file=m.file, new_name=col.media.add_file(m.file))
@beartype
def commit_hashes_file(kirepo: KiRepo) -> None:
"""Add and commit hashes file."""
kirepo.repo.index.add(f"{KI}/{HASHES_FILE}")
kirepo.repo.index.commit("Update collection hashes file.")
@beartype
def cleanup(targetdir: Dir, new: bool) -> Union[Dir, EmptyDir, NoPath]:
"""Cleans up after failed clone operations."""
try:
if new:
return F.rmtree(targetdir)
_, dirs, files = F.shallow_walk(targetdir)
do(F.rmtree, dirs)
do(os.remove, files)
except PermissionError as _:
pass
return F.chk(targetdir)
@click.group()
@click.version_option()
@beartype
def ki() -> None:
"""
The universal CLI entry point for `ki`.
    Takes no arguments; it only has three subcommands (clone, pull, push).
"""
return
@ki.command()
@click.argument("collection")
@click.argument("directory", required=False, default="")
def clone(collection: str, directory: str = "") -> None:
"""Clone an Anki collection into a directory."""
_clone1(collection, directory)
@beartype
def _clone1(collection: str, directory: str = "") -> git.Repo:
"""Execute a clone op."""
col_file: File = M.xfile(Path(collection))
# Write all files to `targetdir`, and instantiate a `KiRepo` object.
targetdir, new = get_target(F.cwd(), col_file, directory)
try:
col = M.collection(col_file)
_, _ = _clone2(col, targetdir, msg="Initial commit", silent=False)
col.close(save=False)
kirepo: KiRepo = M.kirepo(targetdir)
kirepo.repo.create_tag(LCA)
kirepo.repo.close()
gc.collect()
return kirepo.repo
except Exception as err:
cleanup(targetdir, new)
raise err
@beartype
def _clone2(
col: Collection,
targetdir: EmptyDir,
msg: str,
silent: bool,
) -> Tuple[git.Repo, str]:
"""
Clone an Anki collection into a directory.
The caller expects that `targetdir` will be the root of a valid ki
repository after this function is called, so we need to do our repo
initialization with gitpython in here, as opposed to in `clone()`.
Parameters
----------
col : Collection
An anki collection object.
targetdir : pathlib.Path
A path to a directory to clone the collection into.
Note: we check that this directory is empty.
msg : str
Message for initial commit.
silent : bool
Whether to suppress progress information printed to stdout.
Returns
-------
repo : git.Repo
The cloned repository.
branch_name : str
The name of the default branch.
"""
col_file: File = M.xfile(Path(col.path))
kidir, mediadir = M.empty_kirepo(targetdir)
dotki: DotKi = M.dotki(kidir)
md5sum = F.md5(col_file)
echo(f"Cloning into '{targetdir}'...", silent=silent)
(targetdir / GITIGNORE_FILE).write_text(f"{KI}/{BACKUPS_DIR}\n")
(targetdir / GITATTRS_FILE).write_text("*.md linguist-detectable\n")
# Write note files to disk.
write_repository(col, targetdir, dotki, mediadir)
repo, branch = F.init(targetdir)
# Store a checksum of the Anki collection file in the hashes file.
append_md5sum(kidir, col_file.name, md5sum)
F.commitall(repo, msg)
if repo.is_dirty():
raise NonEmptyWorkingTreeError(repo)
return repo, branch
@ki.command()
@beartype
def pull() -> None:
"""Pull changes into the current ki repository from an Anki collection."""
_pull1()
@beartype
def _pull1() -> None:
"""Execute a pull op."""
# Check that we are inside a ki repository, and get the associated collection.
kirepo: KiRepo = M.kirepo(F.cwd())
col = M.collection(kirepo.col_file)
md5sum: str = F.md5(kirepo.col_file)
hashes: List[str] = kirepo.hashes_file.read_text(encoding=UTF8).split("\n")
hashes = list(filter(lambda l: l != "", hashes))
if md5sum in hashes[-1]:
echo("ki pull: up to date.")
col.close(save=False)
return
col = _pull2(kirepo, col)
col.close(save=False)
@beartype
def _pull2(kirepo: KiRepo, col: Collection) -> Collection:
"""
Pull into `kirepo` without checking if we are already up-to-date.
Load the git repository at `anki_remote_root`, force pull (preferring
'theirs', i.e. the new stuff from the sqlite3 database) changes from that
repository (which is cloned straight from the collection, which in general
may have new changes) into `lca_repo`, and then pull `lca_repo` into the
main repository.
We pull in this sequence in order to avoid merge conflicts. Since we first
pull into a snapshot of the repository as it looked when we last pushed to
the database, we know that there cannot be any merge conflicts, because to
git, it just looks like we haven't made any changes since then. Then we
pull the result of that merge into our actual repository. So there could
still be merge conflicts at that point, but they will only be 'genuine'
merge conflicts in some sense, because as a result of using this snapshot
strategy, we give the anki collection the appearance of being a persistent
remote git repo. If we didn't do this, the fact that we did a fresh clone
of the database every time would mean that everything would look like a
merge conflict, because there is no shared history.
Parameters
----------
kirepo : KiRepo
A dataclass representing the Ki repository in the cwd.
Raises
------
CollectionChecksumError
If the Anki collection file was modified while pulling changes. This is
very unlikely, since the caller acquires a lock on the SQLite3
database.
"""
# pylint: disable=too-many-locals
md5sum: str = F.md5(kirepo.col_file)
# Copy `repo` into a temp directory and `reset --hard` at rev of last
# successful `push()`, which is the last common ancestor, or 'LCA'.
head: Rev = M.head(kirepo.repo)
rev: Rev = M.rev(kirepo.repo, sha=kirepo.repo.tag(LCA).commit.hexsha)
lca_repo: git.Repo = cp_repo(rev, f"{LOCAL_SUFFIX}-{md5sum}")
# Clone collection into a temp directory at `anki_remote_root`.
anki_remote_root: EmptyDir = F.mksubdir(F.mkdtemp(), REMOTE_SUFFIX / md5sum)
msg = f"Fetch changes from DB at `{kirepo.col_file}` with md5sum `{md5sum}`"
remote_repo, branch = _clone2(col, anki_remote_root, msg, silent=False)
# Create git remote pointing to `remote_repo`, which represents the current
# state of the Anki SQLite3 database, and pull it into `lca_repo`.
anki_remote = lca_repo.create_remote(REMOTE_NAME, F.gitd(remote_repo))
anki_remote.fetch()
# Handle deleted files, preferring `theirs`.
diffidx = lca_repo.commit("HEAD").diff(lca_repo.commit("FETCH_HEAD"))
dels: Iterable[git.Diff] = diffidx.iter_change_type(DELETED.value)
dels = filter(lambda d: d.a_path != GITMODULES_FILE, dels)
dels = filter(lambda d: F.isfile(F.chk(F.root(lca_repo) / d.a_path)), dels)
a_paths: Iterable[str] = set(map(F.git_rm(lca_repo), map(lambda d: d.a_path, dels)))
if len(a_paths) > 0:
details: str = "".join(map(lambda a: f"Remove '{a}'\n", a_paths))
F.commitall(lca_repo, msg=f"Remove files deleted in remote.\n\n{details}")
remote_root: Dir = F.root(remote_repo)
lca_repo = M.gitcopy(lca_repo, remote_root, unsub=False)
F.commitall(lca_repo, f"Pull changes from repository at `{remote_root}`")
# Create remote pointing to `lca_repo` and pull into `repo`. Note
# that this `git pull` may not always create a merge commit, because a
# fast-forward only updates the branch pointer.
lca_remote = kirepo.repo.create_remote(REMOTE_NAME, lca_repo.git_dir)
kirepo.repo.git.config("pull.rebase", "false")
out = git_pull(REMOTE_NAME, branch, kirepo.root)
echo(out)
kirepo.repo.delete_remote(lca_remote)
# The merge will have overwritten the hashes file with only the collection
# hash from the fresh clone of the remote, so we checkout its state from
# before the merge.
kirepo.repo.git.checkout([head.sha, "--", f"{KI}/{HASHES_FILE}"])
# Raise an error if the collection was modified during pull.
if F.md5(kirepo.col_file) != md5sum:
raise CollectionChecksumError(kirepo.col_file)
# Append the hash of the collection to the hashes file.
if "Aborting" not in out:
append_md5sum(kirepo.ki, kirepo.col_file.name, md5sum)
commit_hashes_file(kirepo)
return col
# PUSH
@ki.command()
@beartype
def push() -> None:
"""Push commits from the currrent ki repository into an Anki collection."""
_push()
@beartype
def _push() -> PushResult:
"""Execute a push op."""
# pylint: disable=too-many-locals
# Check that we are inside a ki repository, and load collection.
kirepo: KiRepo = M.kirepo(F.cwd())
col = M.collection(kirepo.col_file)
md5sum: str = F.md5(kirepo.col_file)
hashes: List[str] = kirepo.hashes_file.read_text(encoding=UTF8).split("\n")
hashes = list(filter(lambda l: l != "", hashes))
if md5sum not in hashes[-1]:
raise UpdatesRejectedError(kirepo.col_file)
head_kirepo: KiRepo = cp_ki(M.head_ki(kirepo), f"{HEAD_SUFFIX}-{md5sum}")
remote_root: EmptyDir = F.mksubdir(F.mkdtemp(), REMOTE_SUFFIX / md5sum)
msg = f"Fetch changes from collection '{kirepo.col_file}' with md5sum '{md5sum}'"
remote_repo, _ = _clone2(col, remote_root, msg, silent=True)
remote_repo = M.gitcopy(remote_repo, head_kirepo.root, unsub=True)
F.commitall(remote_repo, f"Pull changes from repository at `{kirepo.root}`")
parse: Callable[[Delta], DeckNote] = parse_note(*M.parser_and_transformer())
deltas, warnings = F.part(lambda x: isinstance(x, Delta), diff2(remote_repo, parse))
do(warn, warnings)
# If there are no changes, quit.
if len(set(deltas)) == 0:
echo("ki push: up to date.")
col.close(save=False)
return PushResult.UP_TO_DATE
echo(f"Pushing to '{kirepo.col_file}'")
models: Dict[str, Notetype] = get_models_recursively(head_kirepo)
return write_collection(deltas, models, kirepo, parse, head_kirepo, col)
@beartype
def write_collection(
deltas: Iterable[Delta],
models: Dict[str, Notetype],
kirepo: KiRepo,
parse: Callable[[Delta], DeckNote],
head_kirepo: KiRepo,
col: Collection,
) -> PushResult:
"""Push a list of `Delta`s to an Anki collection."""
# pylint: disable=too-many-locals
# Copy collection to a temp directory.
temp_col_dir: Dir = F.mkdtemp()
new_col_file = temp_col_dir / kirepo.col_file.name
col_name: str = kirepo.col_file.name
new_col_file: NoFile = F.chk(temp_col_dir / col_name)
new_col_file: File = F.copyfile(kirepo.col_file, new_col_file)
# Open collection and add new models to root `models.json` file.
tempcol: Collection = M.collection(new_col_file)
do(add_model(tempcol), models.values())
# Stash both unstaged and staged files (including untracked).
head_kirepo.repo.git.stash(include_untracked=True, keep_index=True)
head_kirepo.repo.git.reset("HEAD", hard=True)
# Display table of note change type counts and partition deltas into
# 'deletes' and 'not deletes'.
xs, ys, zs = tee(deltas, 3)
echo_note_change_types(xs)
dels: Iterable[Delta] = filter(lambda d: d.status == DELETED, ys)
deltas: Iterable[Delta] = filter(lambda d: d.status != DELETED, zs)
# Map guid -> (nid, mod, mid).
guids: Dict[str, NoteMetadata] = get_note_metadata(tempcol)
# Parse to-be-deleted notes and remove them from collection.
del_guids: Iterable[str] = map(lambda dd: dd.guid, map(parse, dels))
del_guids = set(filter(lambda g: g in guids, del_guids))
del_nids: Iterable[NoteId] = map(lambda g: guids[g].nid, del_guids)
tempcol.remove_notes(list(del_nids))
# Push changes for all other notes.
guids = {k: v for k, v in guids.items() if k not in del_guids}
timestamp_ns: int = time.time_ns()
new_nids: Iterator[int] = itertools.count(int(timestamp_ns / 1e6))
decknotes: Iterable[DeckNote] = map(parse, deltas)
do(warn, F.cat(map(push_note(tempcol, timestamp_ns, guids, new_nids), decknotes)))
# It is always safe to save changes to the DB, since the DB is a copy.
tempcol.close(save=True)
# Backup collection file and overwrite collection.
backup(kirepo)
F.copyfile(new_col_file, kirepo.col_file)
echo(f"Overwrote '{kirepo.col_file}'")
# Add media files to collection.
media_files = F.rglob(head_kirepo.root, MEDIA_FILE_RECURSIVE_PATTERN)
mbytes: Iterable[MediaBytes] = map(mediabytes(col), media_files)
# Skip media files whose twin in collection has same name and same data.
mbytes = filter(lambda m: m.old == b"" or m.old != m.new, mbytes)
# Add (and possibly rename) media paths.
renames = filter(lambda a: a.file.name != a.new_name, map(addmedia(col), mbytes))
warnings = map(lambda r: RenamedMediaFileWarning(r.file.name, r.new_name), renames)
do(warn, warnings)
col.close(save=True)
# Append and commit collection checksum to hashes file.
append_md5sum(kirepo.ki, kirepo.col_file.name, F.md5(kirepo.col_file))
commit_hashes_file(kirepo)
# Update commit SHA of most recent successful PUSH and unlock SQLite DB.
kirepo.repo.delete_tag(LCA)
kirepo.repo.create_tag(LCA)
return PushResult.NONTRIVIAL
Sub-modules
- `ki.functional` - Type-safe, non-Anki-specific functions.
- `ki.maybes` - Factory functions for safely handling errors in type construction.
- `ki.transformer` - A Lark transformer for the ki note grammar.
- `ki.types` - Types for ki.
Functions
def add_db_note(col: anki.collection.Collection, nid: int, guid: str, mid: int, mod: int, usn: int, tags: list[str], fields: list[str], sfld: str, csum: int, flags: int, data: str) ‑> anki.notes.Note
Add a note to the database directly, with a SQL INSERT.
Expand source code
@beartype
def add_db_note(
    col: Collection,
    nid: int,
    guid: str,
    mid: int,
    mod: int,
    usn: int,
    tags: List[str],
    fields: List[str],
    sfld: str,
    csum: int,
    flags: int,
    data: str,
) -> Note:
    """Add a note to the database directly, with a SQL INSERT."""
    importer = NoteImporter(col, "")
    importer.addNew(
        [
            (
                nid,
                guid,
                mid,
                mod,
                usn,
                " " + " ".join(tags) + " ",
                "\x1f".join(fields),
                sfld,
                csum,
                flags,
                data,
            )
        ]
    )
    # All the `mark_modified` flag does is update `mod`. Since we always set
    # `mod` to the current timestamp anyway, this doesn't matter, so may as
    # well set it to `True` to reflect the semantics of the operation we're
    # performing. This may present issues down the road since newly imported
    # cards from cloned submodules will be marked modified on import/push,
    # which is not exactly right. The anki2 importer does *not* mark as
    # modified, because importing a new note does not modify its content. We
    # would need to have `mod` data inside the note grammar in order for this
    # to make sense, which may be more trouble than it's worth. Users writing
    # new notes as markdown files would have to set the `mod` to some default
    # value, or leave it blank. Assuming people don't do this nearly as often
    # as they will export or push notes they've created in Anki, then it might
    # make sense.
    col.after_note_updates([nid], mark_modified=True)
    return col.get_note(nid)
def add_model(col: anki.collection.Collection, model: Notetype) ‑> None
Add a model to the database.
def addmedia(col: anki.collection.Collection, m: MediaBytes) ‑> AddedMedia
Add a media file to collection (possibly renaming).
def append_md5sum(dotki: Dir, tag: str, md5sum: str) ‑> None
Append an md5sum hash to the hashes file.
Expand source code
@beartype
def append_md5sum(dotki: Dir, tag: str, md5sum: str) -> None:
    """Append an md5sum hash to the hashes file."""
    hashes_file = dotki / HASHES_FILE
    with open(hashes_file, "a+", encoding=UTF8) as hashes_f:
        hashes_f.write(f"{md5sum} {tag}\n")
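Each appended line pairs a checksum with a tag (in practice, the collection filename); a representative line with a hypothetical hash would be:

    d41d8cd98f00b204e9800998ecf8427e collection.anki2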
def backup(kirepo: KiRepo) ‑> int
Backup collection to `.ki/backups`.
Expand source code
@beartype
def backup(kirepo: KiRepo) -> int:
    """Backup collection to `.ki/backups`."""
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d--%Hh-%Mm-%Ss")
    md5sum = F.md5(kirepo.col_file)
    name = f"{timestamp}--{md5sum}.anki2"
    backup_file = F.chk(kirepo.backups_dir / name)
    # We assume here that no one would ever make e.g. a directory called
    # `name`, since `name` contains the md5sum of the collection file, and
    # thus that is extraordinarily improbable. So the only thing we have to
    # check for is that we haven't already written a backup file to this
    # location.
    if isinstance(backup_file, File):
        return 1
    F.copyfile(kirepo.col_file, F.chk(kirepo.backups_dir / name))
    return 0
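Given the timestamp format above, backup filenames sort lexicographically by creation time; a hypothetical backup might be named:

    2023-04-01--09h-15m-30s--d41d8cd98f00b204e9800998ecf8427e.anki2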
def check_fields_health(note: anki.notes.Note) ‑> list[Warning]
Construct warnings when Anki's fields health check fails.
Expand source code
@beartype
def check_fields_health(note: Note) -> List[Warning]:
    """Construct warnings when Anki's fields health check fails."""
    health = note.fields_check()
    if health == 1:
        return [EmptyNoteWarning(note, health)]
    if health == 2:
        return [DuplicateNoteWarning(note, health, html_to_screen(note.fields[0]))]
    if health != 0:
        return [UnhealthyNoteWarning(note, health)]
    return []
def cleanup(targetdir: Dir, new: bool) ‑> Union[Dir, EmptyDir, NoPath]
Cleans up after failed clone operations.
def commit_hashes_file(kirepo: KiRepo) ‑> None
Add and commit hashes file.
def copy_media_files(col: anki.collection.Collection, media_target_dir: EmptyDir) ‑> dict[int, set[File]]
Get a list of extant media files used in notes and notetypes, copy those media files to the top-level `_media/` directory in the repository root, and return a map sending note ids to sets of copied media files.
Adapted from code in `anki/pylib/anki/exporting.py`. Specifically, the `AnkiExporter.exportInto()` function.
Parameters
- `col` - Anki collection.
- `media_target_dir` - Where media files are to be copied to.
Expand source code
@beartype
def copy_media_files(
    col: Collection,
    media_target_dir: EmptyDir,
) -> Dict[int, Set[File]]:
    """
    Get a list of extant media files used in notes and notetypes, copy those
    media files to the top-level `_media/` directory in the repository root,
    and return a map sending note ids to sets of copied media files.

    Adapted from code in `anki/pylib/anki/exporting.py`. Specifically, the
    `AnkiExporter.exportInto()` function.

    Parameters
    ----------
    col
        Anki collection.
    media_target_dir
        Where media files are to be copied to.
    """
    # All note ids as a string for the SQL query.
    strnids = ids2str(list(col.find_notes(query="")))

    # This is the path to the media directory. In the original implementation
    # of `AnkiExporter.exportInto()`, there is a check made of the form
    #
    #   if self.mediaDir:
    #
    # before doing path manipulation with this string.
    #
    # Examining the `__init__()` function of `MediaManager`, we can see that
    # `col.media.dir()` will only be `None` in the case where `server=True` is
    # passed to the `Collection` constructor. But since we do the construction
    # within ki, we have a guarantee that this will never be true, and thus we
    # can assume it is a nonempty string, which is all we need for the
    # following code to be safe.
    media_dir = F.chk(Path(col.media.dir()))
    if not isinstance(media_dir, Dir):
        raise MissingMediaDirectoryError(col.path, media_dir)

    # Find media files that appear in note fields and copy them to the target.
    query: str = "select * from notes where id in " + strnids
    rows: List[NoteDBRow] = [NoteDBRow(*row) for row in col.db.all(query)]
    rows = TQ(rows, "Media")
    copy_fn = copy_note_media(col, media_dir, media_target_dir)
    media = {row.nid: copy_fn(row) for row in rows}
    mids = col.db.list("select distinct mid from notes where id in " + strnids)

    # Copy notetype template media files.
    _, _, files = F.shallow_walk(media_dir)
    paths: Iterable[Path] = map(lambda f: Path(f.name), files)
    paths = set(filter(lambda f: str(f).startswith("_"), paths))
    models = filter(lambda m: int(m["id"]) in mids, col.models.all())
    mediasets = map(copy_notetype_media(media_dir, media_target_dir, paths), models)
    media[NOTETYPE_NID] = reduce(lambda x, y: x.union(y), mediasets, set())
    return media
def copy_note_media(col: anki.collection.Collection, src: Dir, tgt: Dir, row: NoteDBRow) ‑> frozenset[File]
Copy a single note's media files and return the copies as a set. We do this by first filtering for only 'rootfiles', i.e. excluding media files in subdirectories of the media directory. Then we take only those which exist, i.e. typecheck as `File`. Then we construct the source and destination paths, and finally actually perform the copy op, returning the result.
Note that `src` is the media directory where the files originate, and `tgt` is the media directory we're copying to.
Expand source code
@curried
@beartype
def copy_note_media(
    col: Collection, src: Dir, tgt: Dir, row: NoteDBRow
) -> FrozenSet[File]:
    """
    Copy a single note's media files and return the copies as a set. We do
    this by first filtering for only 'rootfiles', i.e. excluding media files
    in subdirectories of the media directory. Then we take only those which
    exist, i.e. typecheck as `File`. Then we construct the source and
    destination paths, and finally actually perform the copy op, returning
    the result.

    Note that `src` is the media directory where the files originate, and
    `tgt` is the media directory we're copying to.
    """
    files: Iterable[str] = media_filenames_in_field(col, row.flds)
    rootfiles = filter(lambda f: f == os.path.basename(f), files)
    medias: Iterable[File] = filter(F.isfile, map(lambda f: F.chk(src / f), rootfiles))
    srcdsts = map(lambda file: (file, F.chk(tgt / file.name)), medias)
    return frozenset(starmap(F.copyfile, srcdsts))
def copy_notetype_media(src: Dir, tgt: Dir, paths: set[pathlib.Path], m: dict[str, typing.Any]) ‑> frozenset[File]
Copy media from notetype `m` from source to target, returning set of copies.
Expand source code
@curried
@beartype
def copy_notetype_media(
    src: Dir, tgt: Dir, paths: Set[Path], m: NotetypeDict
) -> FrozenSet[File]:
    """Copy media from notetype `m` from source to target, returning set of copies."""
    matches: Iterable[Path] = filter(lambda p: hasmedia(m, str(p)), paths)
    medias = filter(F.isfile, map(lambda p: F.chk(src / p), matches))
    srcdsts = map(lambda f: (f, F.chk(tgt / f.name)), medias)
    return frozenset(starmap(F.copyfile, srcdsts))
def cp_ki(ki_rev: KiRev, suffix: str) ‑> KiRepo
Given a KiRev, i.e. a pair of the form (kirepo, SHA), we clone `kirepo.repo` into a temp directory and hard reset to the given commit hash. Copies the .ki/ directory from `ki_rev.kirepo` without making any changes.
Parameters
- `ki_rev` : `KiRev` - The ki repository to clone, and a commit for it.
- `suffix` : `str` - /tmp/.../ path suffix, e.g. `ki/local/`.
Returns
- `KiRepo` - The copied ki repository.
Expand source code
@beartype
def cp_ki(ki_rev: KiRev, suffix: str) -> KiRepo:
    """
    Given a KiRev, i.e. a pair of the form (kirepo, SHA), we clone
    `kirepo.repo` into a temp directory and hard reset to the given commit
    hash. Copies the .ki/ directory from `ki_rev.kirepo` without making any
    changes.

    Parameters
    ----------
    ki_rev : KiRev
        The ki repository to clone, and a commit for it.
    suffix : str
        /tmp/.../ path suffix, e.g. `ki/local/`.

    Returns
    -------
    KiRepo
        The copied ki repository.
    """
    rev: Rev = F.ki_rev_to_rev(ki_rev)
    print(F.root(rev.repo))
    ephem: git.Repo = cp_repo(rev, suffix)
    F.force_mkdir(F.root(ephem) / KI / BACKUPS_DIR)
    kirepo: KiRepo = M.kirepo(F.root(ephem))
    return kirepo
def cp_repo(rev: Rev, suffix: str) ‑> git.repo.base.Repo
Get a temporary copy of a git repository in `/tmp/<suffix>/`.
Expand source code
@beartype
def cp_repo(rev: Rev, suffix: str) -> git.Repo:
    """Get a temporary copy of a git repository in /tmp/<suffix>/."""
    # Copy the entire repo into a temp directory ending in `../suffix/`.
    target: NoFile = F.chk(F.mkdtemp() / suffix)
    ephem = git.Repo(F.copytree(F.root(rev.repo), target))
    # Do a reset --hard to the given SHA.
    ephem.git.reset(rev.sha, hard=True)
    return ephem
def diff2(repo: git.repo.base.Repo, parse: collections.abc.Callable[[Delta], DeckNote]) ‑> collections.abc.Iterable[typing.Union[Delta, Warning]]
Diff `repo` from `HEAD~1` to `HEAD`.
Expand source code
@beartype
def diff2(
    repo: git.Repo,
    parse: Callable[[Delta], DeckNote],
) -> Iterable[Union[Delta, Warning]]:
    """Diff `repo` from `HEAD~1` to `HEAD`."""
    # We diff from A~B.
    head1: Rev = M.rev(repo, repo.commit("HEAD~1").hexsha)
    uuid = hex(random.randrange(16**4))[2:]
    head1_repo = cp_repo(head1, suffix=f"HEAD~1-{uuid}")
    a_root, b_root = F.root(head1_repo), F.root(repo)
    diffidx = repo.commit("HEAD~1").diff(repo.commit("HEAD"))
    # Get the diffs for each change type (e.g. 'DELETED').
    return chain(*map(mungediff(parse, a_root, b_root), diffidx))
def do(f: collections.abc.Callable[[typing.Any], typing.Any], xs: collections.abc.Iterable[typing.Any]) ‑> None
Perform some action on an iterable.
Expand source code
@beartype
def do(f: Callable[[Any], Any], xs: Iterable[Any]) -> None:
    """Perform some action on an iterable."""
    list(map(f, xs))
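This is the eager-map idiom used throughout the module; the `list()` call forces the otherwise-lazy `map` so the side effects actually run. A throwaway illustration:

    >>> do(print, [1, 2])
    1
    2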
def echo(string: str, silent: bool = False) ‑> None
Call `click.secho()` with formatting.
def echo_note_change_types(deltas: collections.abc.Iterable[Delta]) ‑> None
Write a table of git change types for notes to stdout.
def get_field_note_id(nid: int, fieldname: str) ‑> str
A str ID that uniquely identifies field-note pairs.
Expand source code
@beartype
def get_field_note_id(nid: int, fieldname: str) -> str:
    """A str ID that uniquely identifies field-note pairs."""
    return f"{nid}{F.slugify(fieldname)}"
def get_field_payload(col: anki.collection.Collection, name: str, content: str) ‑> list[str]
Get the lines of a markdown snippet for some Anki note field.
def get_guid(fields: list[str]) ‑> str
Construct a new GUID for a note. Adapted from genanki's `guid_for()`.
Expand source code
@beartype
def get_guid(fields: List[str]) -> str:
    """Construct a new GUID for a note. Adapted from genanki's `guid_for()`."""
    # Get the first 8 bytes of the SHA256 of `contents` as an int.
    m = hashlib.sha256()
    m.update("__".join(fields).encode("utf-8"))
    x = reduce(lambda h, b: (h << 8) + b, m.digest()[:8], 0)

    # Convert to the weird base91 format that Anki uses.
    chars = []
    while x > 0:
        chars.append(BASE91_TABLE[x % len(BASE91_TABLE)])
        x //= len(BASE91_TABLE)
    return "".join(reversed(chars))
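The `reduce` over the first 8 digest bytes is just a big-endian integer conversion; an equivalent (purely illustrative) spelling:

    import hashlib
    m = hashlib.sha256("Front__Back".encode("utf-8"))
    x = int.from_bytes(m.digest()[:8], "big")  # same value as the reduce() above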
def get_header_lines(colnote) ‑> list[str]
Get header of markdown representation of note.
Expand source code
@beartype
def get_header_lines(colnote) -> List[str]:
    """Get header of markdown representation of note."""
    lines = [
        "# Note",
        "```",
        f"guid: {colnote.n.guid}",
        f"notetype: {colnote.notetype.name}",
        "```",
        "",
        "### Tags",
        "```",
    ]
    lines += colnote.n.tags
    lines += ["```", ""]
    return lines
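For a Basic note with a single tag, the header lines render like this (guid and tag values are hypothetical):

    # Note
    ```
    guid: a1B2c3d4
    notetype: Basic
    ```

    ### Tags
    ```
    marine-biology
    ```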
def get_models_recursively(kirepo: KiRepo) ‑> dict[str, Notetype]
Find and merge all `models.json` files recursively. Returns a dictionary sending model names to Notetypes.
Expand source code
@beartype
def get_models_recursively(kirepo: KiRepo) -> Dict[str, Notetype]:
    """
    Find and merge all `models.json` files recursively. Returns a dictionary
    sending model names to Notetypes.
    """

    @beartype
    def load(file: File) -> Iterable[Notetype]:
        """Load a models file."""
        with open(file, "r", encoding=UTF8) as f:
            return map(M.notetype, json.load(f).values())

    notetypes = F.cat(map(load, F.rglob(kirepo.root, MODELS_FILE)))
    return {notetype.name: notetype for notetype in notetypes}
def get_note_metadata(col: anki.collection.Collection) ‑> dict[str, NoteMetadata]
Construct a map from guid -> (nid, mod, mid), adapted from `Anki2Importer._import_notes()`. Note that `mod` is the modification timestamp, in epoch seconds (timestamp of when the note was last modified).
def get_note_path(colnote: ColNote, deck_dir: Dir, card_name: str = '') ‑> NoFile
Get note path from sort field text.
Expand source code
@beartype
def get_note_path(colnote: ColNote, deck_dir: Dir, card_name: str = "") -> NoFile:
    """Get note path from sort field text."""
    field_text = colnote.sfld

    # Construct filename, stripping HTML tags and sanitizing (quickly).
    field_text = plain_to_html(field_text)
    field_text = re.sub("<[^<]+?>", "", field_text)

    # If the HTML stripping removed all text, we just slugify the raw sort
    # field text.
    if len(field_text) == 0:
        field_text = colnote.sfld

    name = field_text[:MAX_FILENAME_LEN]
    slug = F.slugify(name)

    # If the slug is still empty, use all the fields.
    if len(slug) == 0:
        contents = " ".join(colnote.n.values())
        name = contents[:MAX_FILENAME_LEN]
        slug = F.slugify(name)

    # Make it so `slug` cannot possibly be an empty string, because then we
    # get a `Path('.')` which is a bug, and causes a runtime exception. If all
    # else fails, use the notetype name, hash of the payload, and creation
    # date.
    if len(slug) == 0:
        guidhex = colnote.n.guid.encode(UTF8).hex()
        slug: str = f"{colnote.notetype.name}--{guidhex}"

        # Note IDs are in milliseconds.
        dt = datetime.datetime.fromtimestamp(colnote.n.id / 1000.0)
        slug += "--" + dt.strftime("%Y-%m-%d--%Hh-%Mm-%Ss")

        F.yellow(f"Slug for note with guid '{colnote.n.guid}' is empty...")
        F.yellow(f"Using hex representation of guid in filename: '{slug}'")

    if card_name != "":
        slug = f"{slug}_{card_name}"
    filename: str = f"{slug}{MD}"
    note_path = F.chk(deck_dir / filename, resolve=False)

    i = 1
    while not isinstance(note_path, NoFile):
        filename = f"{slug}_{i}{MD}"
        note_path = F.chk(deck_dir / filename, resolve=False)
        i += 1

    return note_path
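For example, a note whose sort field reads "What is DNA?" would, assuming `F.slugify` behaves like a typical slugifier, be written to `what-is-dna.md`, with `_1`, `_2`, etc. appended on filename collisions within the deck directory.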
def get_note_payload(colnote: ColNote) ‑> str
Return the markdown-converted contents of the Anki note represented by `colnote` as a string.
A `ColNote` is a dataclass wrapper around a `Note` object which has been loaded from the DB.
def get_target(cwd: Dir, col_file: File, directory: str) ‑> tuple[EmptyDir, bool]
Create default target directory.
def git_pull(remote: str, branch: str, cwd: Dir) ‑> str
Pull remote into branch using a subprocess call.
def hasmedia(model: dict[str, typing.Any], fname: str) ‑> bool
Check if a notetype has media.
Adapted from `anki.exporting.AnkiExporter._modelHasMedia()`, which is an instance method, but does not make any use of `self`, and so could be a staticmethod. It is a pure function.
Expand source code
@beartype
def hasmedia(model: NotetypeDict, fname: str) -> bool:
    """
    Check if a notetype has media.

    Adapted from `anki.exporting.AnkiExporter._modelHasMedia()`, which is an
    instance method, but does not make any use of `self`, and so could be a
    staticmethod. It is a pure function.
    """
    # First check the styling.
    if fname in model["css"]:
        return True
    # If no reference to fname then check the templates as well.
    return any(map(lambda t: fname in t["qfmt"] or fname in t["afmt"], model["tmpls"]))
def html_to_screen(html: str) ‑> str
Convert html for a single field into plaintext, to be displayed within a markdown file.
Does very little (just converts HTML-escaped special characters like `<br>` tags or `&nbsp;`s to their UTF-8 equivalents).
def is_anki_note(path: File) ‑> bool
Check if file is a `ki`-style markdown note.
Expand source code
@beartype
def is_anki_note(path: File) -> bool:
    """Check if file is a `ki`-style markdown note."""
    # Ought to have markdown file extension.
    if path.suffix != ".md":
        return False
    with open(path, "r", encoding=UTF8) as md_f:
        lines = md_f.readlines()
    if len(lines) < 8:
        return False
    if lines[0] != "# Note\n":
        return False
    if lines[1] != "```\n":
        return False
    if not re.match(r"^guid: ", lines[2]):
        return False
    return True
def is_ignorable(root: Dir, path: pathlib.Path) ‑> bool
Filter out paths in a git repository diff that do not correspond to Anki notes.
We could do this purely using calls to `is_anki_note()`, but these are expensive, so we try to find matches without opening any files first.
Expand source code
@beartype
def is_ignorable(root: Dir, path: Path) -> bool:
    """
    Filter out paths in a git repository diff that do not correspond to Anki
    notes.

    We could do this purely using calls to `is_anki_note()`, but these are
    expensive, so we try to find matches without opening any files first.
    """
    # Ignore if `path` is an exact match for any of the patterns. Since the
    # contents of a git repository diff are always going to be files, this
    # alone will not correctly ignore directory names given in `patterns`.
    #
    # If any of the patterns in `dirnames` resolve to one of the parents of
    # `path`, return a warning, so that we are able to filter out entire
    # directories.
    filenames, dirnames = IGNORE_FILES, IGNORE_DIRS
    if path.name in filenames | dirnames or len(set(path.parts) & dirnames) > 0:
        return True

    # If `path` is an extant file (not a directory) and *not* a note, ignore it.
    file = F.chk(root / path)
    if isinstance(file, File) and not is_anki_note(file):
        return True
    return False
def localmedia(s: str, regex: str) ‑> collections.abc.Iterable[str]
Return local media filenames matching the given regex pattern.
Expand source code
@curried
@beartype
def localmedia(s: str, regex: str) -> Iterable[str]:
    """Return local media filenames matching the given regex pattern."""
    fnames = map(lambda m: m.group("fname"), re.finditer(regex, s))
    fnames = map(lambda s: s.strip(), fnames)
    return filter(lambda x: not re.match(URLS, x.lower()), fnames)
def lock(col_file: File) ‑> sqlite3.Connection
Check that lock can be acquired on a SQLite3 database given a path.
Expand source code
@beartype
def lock(col_file: File) -> sqlite3.Connection:
    """Check that lock can be acquired on a SQLite3 database given a path."""
    try:
        con = sqlite3.connect(col_file, timeout=0.1)
        con.isolation_level = "EXCLUSIVE"
        con.execute("BEGIN EXCLUSIVE")
    except sqlite3.DatabaseError as err:
        raise SQLiteLockError(col_file, err) from err
    if sys.platform == "win32":
        con.commit()
        con.close()
    return con
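A minimal sketch of the intended pairing with `unlock()` (the `col_file` here is hypothetical):

    con = lock(col_file)   # raises SQLiteLockError if, e.g., Anki has the collection open
    try:
        ...                # safe to read or copy the collection here
    finally:
        unlock(con)        # commit and close (on Windows, lock() already did both)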
def media_filenames_in_field(col: anki.collection.Collection, s: str) ‑> collections.abc.Iterable[str]
A copy of `MediaManager.files_in_str()`, but without LaTeX rendering.
Expand source code
@beartype
def media_filenames_in_field(col: Collection, s: str) -> Iterable[str]:
    """A copy of `MediaManager.files_in_str()`, but without LaTeX rendering."""
    s = (s.strip()).replace('"', "")
    return F.cat(map(localmedia(s), col.media.regexps))
def mediabytes(col: anki.collection.Collection, file: File) ‑> MediaBytes
Get old bytes (from collection) and new bytes (from file) for media file.
def mediadata(col: anki.collection.Collection, fname: str) ‑> bytes
Get media file content as bytes (empty if missing).
def mklink(targetd: Dir, colnote: ColNote, deckd: Dir, card: anki.cards.Card, file: File) ‑> None
Return a windows link for a card if one is necessary.
Expand source code
@beartype
def mklink(targetd: Dir, colnote: ColNote, deckd: Dir, card: Card, file: File) -> None:
    """Return a windows link for a card if one is necessary."""
    note_path: NoFile = get_note_path(colnote, deckd, card.template()["name"])
    M.link(targetd, PlannedLink(link=note_path, tgt=file))
def mungediff(parse: collections.abc.Callable[[Delta], DeckNote], a_root: Dir, b_root: Dir, d: git.diff.Diff) ‑> collections.abc.Iterable[typing.Union[Delta, Warning]]
Extract deltas and warnings from a collection of diffs.
Expand source code
@curried
@beartype
def mungediff(
    parse: Callable[[Delta], DeckNote], a_root: Dir, b_root: Dir, d: git.Diff
) -> Iterable[Union[Delta, Warning]]:
    """Extract deltas and warnings from a collection of diffs."""
    a, b = d.a_path, d.b_path
    a, b = a if a else b, b if b else a
    if is_ignorable(a_root, Path(a)) or is_ignorable(b_root, Path(b)):
        return []

    # Get absolute and relative paths to 'a' and 'b'.
    AB = namedtuple("AB", "a b")
    files = AB(F.chk(a_root / a), F.chk(b_root / b))
    rels = AB(Path(a), Path(b))

    if d.change_type == DELETED.value:
        if not F.isfile(files.a):
            return [DeletedFileNotFoundWarning(rels.a)]
        return [Delta(GitChangeType.DELETED, files.a, rels.a)]
    if not F.isfile(files.b):
        return [DiffTargetFileNotFoundWarning(rels.b)]
    if d.change_type == RENAMED.value:
        a_delta = Delta(GitChangeType.DELETED, files.a, rels.a)
        b_delta = Delta(GitChangeType.ADDED, files.b, rels.b)
        a_decknote, b_decknote = parse(a_delta), parse(b_delta)
        if a_decknote.guid != b_decknote.guid:
            return [a_delta, b_delta]
    return [Delta(GitChangeType(d.change_type), files.b, rels.b)]
def parentmap(root: Union[Root, Deck]) ‑> dict[str, typing.Union[Root, Deck]]
Map deck fullnames to parent `Deck`s.
Expand source code
@beartype
def parentmap(root: Union[Root, Deck]) -> Dict[str, Union[Root, Deck]]:
    """Map deck fullnames to parent `Deck`s."""
    parents = {child.fullname: root for child in root.children}
    return parents | reduce(lambda x, y: x | y, map(parentmap, root.children), {})
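For instance, for a root containing deck `a`, which in turn contains `a::b`, the result would be `{"a": root, "a::b": <Deck a>}` (illustrative; fullnames use Anki's `::` separator).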
def parse_note(parser: lark.lark.Lark, transformer: NoteTransformer, delta: Delta) ‑> DeckNote
Parse with lark.
Expand source code
@curried
@beartype
def parse_note(parser: Lark, transformer: NoteTransformer, delta: Delta) -> DeckNote:
    """Parse with lark."""
    tree = parser.parse(delta.path.read_text(encoding=UTF8))
    flatnote: FlatNote = transformer.transform(tree)
    parts: Tuple[str, ...] = delta.relpath.parent.parts
    deck: str = "::".join(parts)

    # Generate a GUID from the hash of the field contents if the `guid` field
    # in the note file was left blank.
    fields = list(flatnote.fields.values())
    guid = flatnote.guid if flatnote.guid != "" else get_guid(fields)

    return DeckNote(
        title=flatnote.title,
        guid=guid,
        deck=deck,
        model=flatnote.model,
        tags=flatnote.tags,
        fields=flatnote.fields,
    )
def plain_to_html(plain: str) ‑> str
Convert plain text to html.
Expand source code
@beartype
def plain_to_html(plain: str) -> str:
    """Convert plain text to html"""
    # Minor clean up.
    plain = plain.replace(r"&lt;", "<")
    plain = plain.replace(r"&gt;", ">")
    plain = plain.replace(r"&amp;", "&")
    plain = plain.replace(r"&nbsp;", " ")
    plain = re.sub(r"\<b\>\s*\<\/b\>", "", plain)
    plain = re.sub(r"\<i\>\s*\<\/i\>", "", plain)
    plain = re.sub(r"\<div\>\s*\<\/div\>", "", plain)

    # Convert newlines to `<br>` tags.
    if not re.search(HTML_REGEX, plain):
        plain = plain.replace("\n", "<br>")

    return plain.strip()
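A quick illustration, assuming the input contains no HTML (so the `HTML_REGEX` search fails and newlines are converted):

    >>> plain_to_html("Hello\nworld")
    'Hello<br>world'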
def planned_link(parents: dict[str, typing.Union[Root, Deck]], deck: Deck, media_file: File) ‑> Optional[PlannedLink]
Get the target of the to-be-created media symlink.
def postorder(node: Union[Root, Deck]) ‑> list[Deck]
Post-order traversal. Guarantees that we won't process a node until we've processed all its children.
Expand source code
@beartype
def postorder(node: Union[Root, Deck]) -> List[Deck]:
    """
    Post-order traversal. Guarantees that we won't process a node until we've
    processed all its children.
    """
    descendants: List[Deck] = reduce(lambda xs, x: xs + postorder(x), node.children, [])
    return descendants if isinstance(node, Root) else descendants + [node]
def preorder(node: Union[Root, Deck]) ‑> list[Deck]
Pre-order traversal. Guarantees that we won't process a node until we've processed all its ancestors.
Expand source code
@beartype
def preorder(node: Union[Root, Deck]) -> List[Deck]:
    """
    Pre-order traversal. Guarantees that we won't process a node until we've
    processed all its ancestors.
    """
    descendants: List[Deck] = reduce(lambda xs, x: xs + preorder(x), node.children, [])
    return descendants if isinstance(node, Root) else [node] + descendants
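For a tree `Root -> [a -> [a::b], c]`, `preorder` yields `[a, a::b, c]` (parents before children), while `postorder` above yields `[a::b, a, c]` (children before parents).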
def push_note(col: anki.collection.Collection, timestamp_ns: int, guids: dict[str, NoteMetadata], new_nids: collections.abc.Iterator[int], decknote: DeckNote) ‑> collections.abc.Iterable[Warning]
Update the Anki `Note` object in `col` corresponding to `decknote`, creating it if it does not already exist.
Raises
- `MissingNotetypeError` - If we can't find a notetype with the name provided in `decknote`.
Expand source code
@curried
@beartype
def push_note(
    col: Collection,
    timestamp_ns: int,
    guids: Dict[str, NoteMetadata],
    new_nids: Iterator[int],
    decknote: DeckNote,
) -> Iterable[Warning]:
    """
    Update the Anki `Note` object in `col` corresponding to `decknote`,
    creating it if it does not already exist.

    Raises
    ------
    MissingNotetypeError
        If we can't find a notetype with the name provided in `decknote`.
    """
    # Notetype/model names are privileged in Anki, so if we don't find the
    # right name, we raise an error.
    model_id: Optional[int] = col.models.id_for_name(decknote.model)
    if model_id is None:
        raise MissingNotetypeError(decknote.model)
    new_notetype: Notetype = M.notetype(col.models.get(model_id))

    if decknote.guid in guids:
        nid: int = guids[decknote.guid].nid
        try:
            note: Note = col.get_note(nid)
        except NotFoundError as err:
            print(f"{nid = }")
            print(f"{decknote.guid = }")
            raise err
    else:
        nid: int = next(new_nids)
        note: Note = add_db_note(
            col,
            nid,
            decknote.guid,
            model_id,
            mod=int(timestamp_ns // 1e9),
            usn=-1,
            tags=decknote.tags,
            fields=list(decknote.fields.values()),
            sfld=decknote.fields[new_notetype.sortf.name],
            csum=0,
            flags=0,
            data="",
        )

    # If we are updating an existing note, we need to know the old and new
    # notetypes, and then update the notetype (and the rest of the note data)
    # accordingly.
    old_notetype: Notetype = M.notetype(note.note_type())
    return update_note(note, decknote, old_notetype, new_notetype)
def stardo(f: collections.abc.Callable[[typing.Any], typing.Any], xs: collections.abc.Iterable[typing.Any]) ‑> None
Perform some action on an iterable of tuples, unpacking arguments.
Expand source code
@beartype
def stardo(f: Callable[[Any], Any], xs: Iterable[Any]) -> None:
    """Perform some action on an iterable of tuples, unpacking arguments."""
    list(starmap(f, xs))
def symlink_deck_media(col: anki.collection.Collection, targetd: Dir, media: dict[int, set[File]], parents: dict[str, typing.Union[Root, Deck]], deck: Deck) ‑> None
Create chained symlinks for a single deck.
def symlink_media(col: anki.collection.Collection, root: Root, targetd: Dir, media: dict[int, set[File]]) ‑> None
Chain symlinks up the deck tree into top-level `<collection>/_media/`.
def unlock(con: sqlite3.Connection) ‑> None
Unlock a SQLite3 database.
Expand source code
@beartype
def unlock(con: sqlite3.Connection) -> None:
    """Unlock a SQLite3 database."""
    if sys.platform == "win32":
        return
    con.commit()
    con.close()
def update_field(decknote: DeckNote, note: anki.notes.Note, key: str, field: str) ‑> None
Update a field contained in `note`.
Expand source code
@curried
@beartype
def update_field(decknote: DeckNote, note: Note, key: str, field: str) -> None:
    """Update a field contained in `note`."""
    try:
        note[key] = plain_to_html(field)
    except IndexError as err:
        raise AnkiDBNoteMissingFieldsError(decknote, note.id, key) from err
def update_note(note: anki.notes.Note, decknote: DeckNote, old_notetype: Notetype, new_notetype: Notetype) ‑> collections.abc.Iterable[Warning]
Change all the data of `note` to that given in `decknote`.
This is only to be called on notes whose nid already exists in the database. Creates a new deck if `decknote.deck` doesn't exist. Assumes that the model has already been added to the collection, and raises an exception if it finds otherwise. Changes notetype to that specified by `decknote.model`. Overwrites all fields with `decknote.fields`.
Updates:
- tags
- deck
- model
- fields
Expand source code
@beartype
def update_note(
    note: Note, decknote: DeckNote, old_notetype: Notetype, new_notetype: Notetype
) -> Iterable[Warning]:
    """
    Change all the data of `note` to that given in `decknote`.

    This is only to be called on notes whose nid already exists in the
    database. Creates a new deck if `decknote.deck` doesn't exist. Assumes
    that the model has already been added to the collection, and raises an
    exception if it finds otherwise. Changes notetype to that specified by
    `decknote.model`. Overwrites all fields with `decknote.fields`.

    Updates:
    - tags
    - deck
    - model
    - fields
    """
    # Check that the passed argument `new_notetype` has a name consistent
    # with the model specified in `decknote`. The former should be derived
    # from the latter, and if they don't match, there is a bug in the caller.
    if decknote.model != new_notetype.name:
        raise NotetypeMismatchError(decknote, new_notetype)

    nid = note.id
    note.tags = decknote.tags
    note.flush()

    # Set the deck of the given note, as well as all its cards, and create a
    # deck with this name if it doesn't already exist. See the
    # comments/docstrings in the implementation of the
    # `anki.decks.DeckManager.id()` method.
    newdid: int = note.col.decks.id(decknote.deck, create=True)
    cids = [c.id for c in note.cards()]
    if cids:
        note.col.set_deck(cids, newdid)

    # Set notetype (also clears all fields).
    if old_notetype.id != new_notetype.id:
        fmap = {field.ord: None for field in old_notetype.flds}
        note.col.models.change(old_notetype.dict, [nid], new_notetype.dict, fmap, None)
        note.load()

    # Validate field keys against notetype.
    warnings: List[Warning] = validate_decknote_fields(new_notetype, decknote)
    if len(warnings) > 0:
        return warnings

    # Set field values and flush to collection database. This is correct
    # because every field name that appears in `new_notetype` is contained in
    # `decknote.fields`, or else we would have printed a warning and returned
    # above.
    missing = {key for key in decknote.fields if key not in note}
    warnings = map(lambda k: NoteFieldValidationWarning(nid, k, new_notetype), missing)
    fields = [(key, field) for key, field in decknote.fields.items() if key in note]
    stardo(update_field(decknote, note), fields)
    note.flush()

    # Remove if unhealthy.
    fwarns: List[Warning] = check_fields_health(note)
    if len(fwarns) > 0:
        note.col.remove_notes([nid])

    return chain(warnings, fwarns)
def validate_decknote_fields(notetype: Notetype, decknote: DeckNote) -> list[Warning]
- Validate that the fields given in the note match the notetype.

```python
@beartype
def validate_decknote_fields(notetype: Notetype, decknote: DeckNote) -> List[Warning]:
    """Validate that the fields given in the note match the notetype."""
    warnings: List[Warning] = []
    names: List[str] = [field.name for field in notetype.flds]

    # TODO: It might also be nice to print the path of the note in the
    # repository. This would have to be added to the `DeckNote` spec.
    if len(decknote.fields.keys()) != len(names):
        warnings.append(WrongFieldCountWarning(decknote, names))

    mk_warning = lambda n, k: InconsistentFieldNamesWarning(n, k, decknote)
    names_and_keys = F.starfilter(
        lambda n, k: n != k, zip(names, decknote.fields.keys())
    )
    return warnings + list(starmap(mk_warning, names_and_keys))
```
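The check pairs notetype field names with the note's field keys positionally and warns on any mismatch. A plain-Python sketch of the same logic (field names and keys are hypothetical):

```python
from itertools import starmap

# Hypothetical notetype field names and markdown-note field keys.
names = ["Front", "Back"]
keys = ["Front", "Rear"]

# Keep only positionally mismatched (name, key) pairs, then map each pair
# to a warning, as `validate_decknote_fields` does.
mismatches = [(n, k) for n, k in zip(names, keys) if n != k]
warnings = list(starmap(lambda n, k: f"expected field '{n}', got '{k}'", mismatches))
assert warnings == ["expected field 'Back', got 'Rear'"]
```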
def warn(w: Warning) -> None
- Call `click.secho()` with formatting (yellow).

```python
@beartype
def warn(w: Warning) -> None:
    """Call `click.secho()` with formatting (yellow)."""
    click.secho(f"WARNING: {str(w)}", bold=True, fg="yellow")
```
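Usage is straightforward; any `Warning` instance renders via its `str()` form:

```python
# Prints "WARNING: deck name collides with media directory" in bold yellow.
warn(Warning("deck name collides with media directory"))
```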
def write_collection(deltas: collections.abc.Iterable[Delta], models: dict[str, Notetype], kirepo: KiRepo, parse: collections.abc.Callable[[Delta], DeckNote], head_kirepo: KiRepo, col: anki.collection.Collection) -> PushResult
- Push a list of `Delta`s to an Anki collection.

```python
@beartype
def write_collection(
    deltas: Iterable[Delta],
    models: Dict[str, Notetype],
    kirepo: KiRepo,
    parse: Callable[[Delta], DeckNote],
    head_kirepo: KiRepo,
    col: Collection,
) -> PushResult:
    """Push a list of `Delta`s to an Anki collection."""
    # pylint: disable=too-many-locals

    # Copy collection to a temp directory.
    temp_col_dir: Dir = F.mkdtemp()
    new_col_file = temp_col_dir / kirepo.col_file.name
    col_name: str = kirepo.col_file.name
    new_col_file: NoFile = F.chk(temp_col_dir / col_name)
    new_col_file: File = F.copyfile(kirepo.col_file, new_col_file)

    # Open collection and add new models to root `models.json` file.
    tempcol: Collection = M.collection(new_col_file)
    do(add_model(tempcol), models.values())

    # Stash both unstaged and staged files (including untracked).
    head_kirepo.repo.git.stash(include_untracked=True, keep_index=True)
    head_kirepo.repo.git.reset("HEAD", hard=True)

    # Display table of note change type counts and partition deltas into
    # 'deletes' and 'not deletes'.
    xs, ys, zs = tee(deltas, 3)
    echo_note_change_types(xs)
    dels: Iterable[Delta] = filter(lambda d: d.status == DELETED, ys)
    deltas: Iterable[Delta] = filter(lambda d: d.status != DELETED, zs)

    # Map guid -> (nid, mod, mid).
    guids: Dict[str, NoteMetadata] = get_note_metadata(tempcol)

    # Parse to-be-deleted notes and remove them from collection.
    del_guids: Iterable[str] = map(lambda dd: dd.guid, map(parse, dels))
    del_guids = set(filter(lambda g: g in guids, del_guids))
    del_nids: Iterable[NoteId] = map(lambda g: guids[g].nid, del_guids)
    tempcol.remove_notes(list(del_nids))

    # Push changes for all other notes.
    guids = {k: v for k, v in guids.items() if k not in del_guids}
    timestamp_ns: int = time.time_ns()
    new_nids: Iterator[int] = itertools.count(int(timestamp_ns / 1e6))
    decknotes: Iterable[DeckNote] = map(parse, deltas)
    do(warn, F.cat(map(push_note(tempcol, timestamp_ns, guids, new_nids), decknotes)))

    # It is always safe to save changes to the DB, since the DB is a copy.
    tempcol.close(save=True)

    # Backup collection file and overwrite collection.
    backup(kirepo)
    F.copyfile(new_col_file, kirepo.col_file)
    echo(f"Overwrote '{kirepo.col_file}'")

    # Add media files to collection.
    media_files = F.rglob(head_kirepo.root, MEDIA_FILE_RECURSIVE_PATTERN)
    mbytes: Iterable[MediaBytes] = map(mediabytes(col), media_files)

    # Skip media files whose twin in collection has same name and same data.
    mbytes = filter(lambda m: m.old == b"" or m.old != m.new, mbytes)

    # Add (and possibly rename) media paths.
    renames = filter(lambda a: a.file.name != a.new_name, map(addmedia(col), mbytes))
    warnings = map(lambda r: RenamedMediaFileWarning(r.file.name, r.new_name), renames)
    do(warn, warnings)
    col.close(save=True)

    # Append and commit collection checksum to hashes file.
    append_md5sum(kirepo.ki, kirepo.col_file.name, F.md5(kirepo.col_file))
    commit_hashes_file(kirepo)

    # Update commit SHA of most recent successful PUSH and unlock SQLite DB.
    kirepo.repo.delete_tag(LCA)
    kirepo.repo.create_tag(LCA)
    return PushResult.NONTRIVIAL
```
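Note the fresh-note-id scheme above: new nids are drawn from a counter seeded with the current epoch time in milliseconds, the same scale Anki uses for note ids, so collisions with existing notes are unlikely. In isolation:

```python
import itertools
import time

# Seed a counter with the current epoch time in milliseconds; successive
# `next()` calls yield monotonically increasing candidate note ids.
timestamp_ns = time.time_ns()
new_nids = itertools.count(int(timestamp_ns / 1e6))
first, second = next(new_nids), next(new_nids)
assert second == first + 1
```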
def write_decks(col: anki.collection.Collection, targetdir: Dir, colnotes: dict[int, ColNote], media: dict[int, set[File]]) -> None
- The proper way to do this is a DFS traversal, perhaps recursively, which
  will make it easier to keep things purely functional, accumulating the
  model ids of the children in each node. For this, we must construct a
  tree from the deck names.

  Implement new `ColNote`-writing procedure, using `DeckTreeNode`s. It must
  do the following for each deck:

  - create the deck directory
  - write the models.json file
  - create and populate the media directory
  - write the note payload for each note in the correct deck, exactly once

  In other words, for each deck, we need to write all of its:

  - models
  - media
  - notes

  The first two are cumulative: we want the models and media of subdecks to
  be included in their ancestors. The notes, however, should not be
  cumulative. Indeed, we want each note to appear exactly once in the
  entire repository, making allowances for the case where a single note's
  cards are spread across multiple decks, in which case we must create a
  symlink.

  Both of these cases are nicely taken care of for us by the
  `DeckManager.cids()` function, whose `children: bool` parameter toggles
  whether to include the card ids of subdecks.

```python
@beartype
def write_decks(
    col: Collection,
    targetdir: Dir,
    colnotes: Dict[int, ColNote],
    media: Dict[int, Set[File]],
) -> None:
    """
    The proper way to do this is a DFS traversal, perhaps recursively, which
    will make it easier to keep things purely functional, accumulating the
    model ids of the children in each node. For this, we must construct a
    tree from the deck names.

    Implement new `ColNote`-writing procedure, using `DeckTreeNode`s.

    It must do the following for each deck:
    - create the deck directory
    - write the models.json file
    - create and populate the media directory
    - write the note payload for each note in the correct deck, exactly once

    In other words, for each deck, we need to write all of its:
    - models
    - media
    - notes

    The first two are cumulative: we want the models and media of subdecks
    to be included in their ancestors. The notes, however, should not be
    cumulative. Indeed, we want each note to appear exactly once in the
    entire repository, making allowances for the case where a single note's
    cards are spread across multiple decks, in which case we must create a
    symlink.

    Both of these cases are nicely taken care of for us by the
    `DeckManager.cids()` function, whose `children: bool` parameter toggles
    whether to include the card ids of subdecks.
    """
    # Accumulate pairs of model ids and notetype maps. The return type of the
    # `ModelManager.get()` call below indicates that it may return `None`,
    # but we know it will not because we are getting the notetype id straight
    # from the Anki DB.
    #
    # Dump the models file for the whole repository.
    models = {m.id: col.models.get(m.id) for m in col.models.all_names_and_ids()}
    with open(targetdir / MODELS_FILE, "w", encoding=UTF8) as f:
        json.dump(models, f, ensure_ascii=False, indent=4, sort_keys=True)

    # Construct an iterable of all decks except the trivial deck.
    root: Deck = M.tree(col, targetdir, col.decks.deck_tree())
    collisions, decks = F.part(lambda d: MEDIA in d.fullname, postorder(root))
    if any(True for _ in collisions):
        warn(MediaDirectoryDeckNameCollisionWarning())
    decks = list(decks)
    deckmap = {d.fullname: d for d in decks}

    # Write cards, models, and media to filesystem.
    do(write_note(deckmap), TQ(colnotes.values(), "Notes"))
    do(write_models(col, models), TQ(decks, "Notetypes"))
    symlink_media(col, root, targetdir, media)
```
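The traversal itself is the classic postorder walk the docstring describes. A self-contained sketch, assuming a hypothetical node type with a `children` list (ki's `Deck` and `postorder` are presumably similar in shape):

```python
from dataclasses import dataclass, field
from typing import Iterator, List

@dataclass
class Node:
    fullname: str
    children: List["Node"] = field(default_factory=list)

def postorder(node: Node) -> Iterator[Node]:
    """Yield every node in the tree, children before parents."""
    for child in node.children:
        yield from postorder(child)
    yield node

root = Node("", [Node("a", [Node("a::b")]), Node("c")])
assert [n.fullname for n in postorder(root)] == ["a::b", "a", "c", ""]
```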
def write_models(col: anki.collection.Collection, models: dict[int, dict[str, typing.Any]], deck: Deck) -> None
- Write the `models.json` file for the given deck.

```python
@curried
@beartype
def write_models(col: Collection, models: Dict[int, NotetypeDict], deck: Deck) -> None:
    """Write the `models.json` file for the given deck."""
    did: int = deck.did
    deckd: Dir = deck.deckd
    descendants: List[CardId] = col.decks.cids(did=did, children=True)
    cards: List[Card] = list(map(col.get_card, descendants))
    descendant_mids: Set[int] = {c.note().mid for c in cards}

    # Write `models.json` for current deck.
    deck_models = {mid: models[mid] for mid in descendant_mids}
    with open(deckd / MODELS_FILE, "w", encoding=UTF8) as f:
        json.dump(deck_models, f, ensure_ascii=False, indent=4, sort_keys=True)
```
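The key step is subsetting the repository-wide models dict down to the notetypes actually used by cards in the deck's subtree. In isolation (ids and names hypothetical):

```python
# Repository-wide models, keyed by model id (hypothetical data).
models = {100: {"name": "Basic"}, 200: {"name": "Cloze"}}

# Model ids actually used by cards in this deck's subtree.
descendant_mids = {100}

# The per-deck `models.json` contains only the notetypes the deck needs.
deck_models = {mid: models[mid] for mid in descendant_mids}
assert deck_models == {100: {"name": "Basic"}}
```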
def write_note(deckmap: dict[str, Deck], colnote: ColNote) -> File
- Write a note's payload to a file in the directory of its deck.

```python
@curried
@beartype
def write_note(
    deckmap: Dict[str, Deck],
    colnote: ColNote,
) -> File:
    decknames = set(map(lambda c: c.col.decks.name(c.did), colnote.n.cards()))
    sortf = colnote.sfld
    if len(decknames) == 0:
        raise ValueError(f"No cards for note: {sortf}")
    if len(decknames) > 1:
        raise ValueError(f"Cards for note {sortf} are in distinct decks: {decknames}")
    fullname = decknames.pop()
    parts = fullname.split("::")
    if "_media" in parts:
        raise ValueError(f"Bad deck name '{fullname}' (cannot contain '_media')")
    deck: Deck = deckmap[fullname]
    path: NoFile = get_note_path(colnote, deck.deckd)
    payload: str = get_note_payload(colnote)
    return F.write(path, payload)
```
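Anki separates nested deck names with `::`, and the deck directory is presumably the corresponding nested path under the repository root. A sketch of that mapping (deck name and root path hypothetical):

```python
from pathlib import Path

# Hypothetical deck fullname; "::" separates parent and child decks.
fullname = "Parent::Child"
parts = fullname.split("::")

# Deck names may not shadow the media directory (see the check above).
assert "_media" not in parts

deckd = Path("repo_root").joinpath(*parts)
assert str(deckd) == str(Path("repo_root") / "Parent" / "Child")
```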
def write_repository(col: anki.collection.Collection, targetdir: Dir, dotki: DotKi, media_target_dir: EmptyDir) -> anki.collection.Collection
- Write notes to appropriate directories in `targetdir`.

```python
@beartype
def write_repository(
    col: Collection,
    targetdir: Dir,
    dotki: DotKi,
    media_target_dir: EmptyDir,
) -> Collection:
    """Write notes to appropriate directories in `targetdir`."""
    # Create config file.
    config = configparser.ConfigParser()
    config["remote"] = {"path": col.path}
    with open(dotki.config, "w", encoding=UTF8) as config_f:
        config.write(config_f)

    # ColNote-containing data structure, to be passed to `write_decks()`.
    nids: Iterable[int] = TQ(col.find_notes(query=""), "Notes")
    colnotes: Dict[int, ColNote] = {nid: M.colnote(col, nid) for nid in nids}
    media: Dict[int, Set[File]] = copy_media_files(col, media_target_dir)
    write_decks(
        col=col,
        targetdir=targetdir,
        colnotes=colnotes,
        media=media,
    )
    return col
```
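The config written above records where the remote collection lives, presumably under the repository's `.ki/` directory. A minimal sketch of writing and reading that `[remote]` section with the standard `configparser` module (the collection path is hypothetical):

```python
import configparser
import tempfile
from pathlib import Path

# Sketch of the `[remote]` config written by `write_repository`.
config = configparser.ConfigParser()
config["remote"] = {"path": "/home/user/.local/share/Anki2/User 1/collection.anki2"}

with tempfile.TemporaryDirectory() as d:
    cfg = Path(d) / "config"
    with open(cfg, "w", encoding="utf-8") as f:
        config.write(f)

    # Reading it back recovers the remote collection path.
    parsed = configparser.ConfigParser()
    parsed.read(cfg)
    assert parsed["remote"]["path"].endswith("collection.anki2")
```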