-
-
Save mikob/24a908471e38370f40d302b1cb1b41fb to your computer and use it in GitHub Desktop.
| #!/usr/bin/env python3 | |
| ''' | |
| Update 10/4/2021 | |
| * Download audio for the longest word in Spanish. | |
| Update 9/22/2020 | |
| * Allow looking up through multiple lang codes by priority order on Forvo (eg. es_LATAM, es) | |
| * Strip nbsp; when looking up words on Forvo | |
| Update 9/5/2020 | |
| * Fixed to work with newer Anki versions 2.1.33+ (possibly earlier) | |
| --- | |
| Make sure to adjust BACKUP_LOC to a directory of your choosing. | |
| You will need to adjust CARD_TYPE and possibly DECK_NAME or create your own class that extends | |
| AudioDownloader for other languages. | |
| Adjust get_idxs and get_modded_fields_for_card as needed. | |
| You might need to adjust the fields for kana and kanji for Japanese. | |
| ''' | |
| import sqlite3 | |
| from abc import ABC, abstractmethod | |
| from hashlib import sha1 | |
| # from anki.utils import fieldChecksum | |
| from datetime import datetime | |
| import re | |
| import base64 | |
| from functools import reduce | |
| import sys | |
| import shutil | |
| import time | |
| import requests | |
| import json | |
| import os | |
| import click | |
# Directory that receives a timestamped copy of collection.anki2 before the
# collection is touched. Adjust to a directory of your choosing.
BACKUP_LOC = '/home/mikob/.local/share/Anki2/Miko/custom-backups/'

# Separator used to split and re-join the individual field values of a note.
CARD_FIELD_SEPARATOR = '\x1f'

# Request headers sent along with the Forvo page and audio requests.
FAKE_BROWSER_HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36",
    "accept-language": "en-US,en;q=0.9,ja;q=0.8",
    "cache-control": "no-cache",
    "pragma": "no-cache",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "none",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1",
}

# Matches any single ASCII character (U+0000 - U+007F).
unicode_regx = re.compile('[\x00-\x7F]')

# Separator used to split a field into candidate words. Written as a raw
# string so that the regex escapes (\[ \] \/ \s) are not treated as (invalid)
# Python string escapes; the compiled pattern is unchanged.
# NOTE(review): the trailing "." makes every match consume the character that
# follows the separator as well - confirm that this is intended.
word_sep_regx = re.compile(r'[\[\].,0-9a-zA-Z`\/「」、。:;0-9ー()()\s].')
def strip_special_chars(s):
    """Return *s* with path and markup punctuation removed.

    Every run of ``?``, ``:``, ``;``, ``/``, backslash, ``"``, ``<``, ``>``
    and ``.`` is deleted. The result is used as part of a media file name,
    so the backslash is stripped too (the previous non-raw pattern collapsed
    it into an escaped quote and therefore never removed it).
    """
    return re.sub(r'[?:;/\\"<>.]+', '', s)
class AudioDownloader(ABC):
    """Fill in the audio field of the Anki notes of one note type.

    Creating an instance performs the whole run: the collection database is
    copied to ``BACKUP_LOC``, the notes whose note type is named
    ``CARD_TYPE`` (optionally restricted to the deck ``DECK_NAME``) are
    loaded, every note is handed to ``get_modded_fields_for_card`` once, and
    any fields it returns are written back to the database.

    Subclasses must define ``CARD_TYPE`` and implement ``get_idxs`` and
    ``get_modded_fields_for_card``.
    """

    # Counter template. __init__ gives every instance its own copy so the
    # numbers of one run do not leak into the next one.
    stats = {
        'not_found': 0,
        'found': 0,
        'existing_sound': 0,
        'invalid_audio': 0,
    }
    media_loc = None
    DECK_NAME = ''

    def __init__(self, dry, anki_user_folder_loc, last_mod):
        """Back up the collection and process all matching notes.

        dry: when true, the database changes are not committed.
        anki_user_folder_loc: folder that contains ``collection.anki2`` and
            ``collection.media``.
        last_mod: number of seconds subtracted from the current time to get
            the modification-time cut-off; only notes modified after the
            cut-off are processed.
        """
        # Private copy: the class-level dict is shared by all subclasses.
        self.stats = dict(self.stats)

        db_loc = os.path.join(anki_user_folder_loc, 'collection.anki2')
        self.media_loc = os.path.join(anki_user_folder_loc, 'collection.media')
        backup_loc = os.path.join(
            BACKUP_LOC, 'anki_backup_%s.anki2' % datetime.now())
        shutil.copy(db_loc, backup_loc)
        self.log("Created backup: %s" % backup_loc)

        conn = sqlite3.connect(db_loc)
        try:
            cur = conn.cursor()
            mod_time = int(time.time()) - last_mod
            self._process_notes(cur, mod_time)
            if not dry:
                conn.commit()
        finally:
            # Always release the database, even when a note handler raised.
            conn.close()
        self.log("Finished!\n\n%s" % self.stats)

    def _find_deck_id(self, cur):
        """Return the id of the deck named DECK_NAME, or None if unset/unknown."""
        if not self.DECK_NAME:
            return None
        cur.execute('SELECT id,name FROM decks')
        for deck_id, deck_name in cur.fetchall():
            if deck_name == self.DECK_NAME:
                self.log(f'{self.DECK_NAME} deck id: {deck_id}')
                return deck_id
        self.log(
            f'Could not find deck {self.DECK_NAME}. Continuing with all decks...')
        return None

    def _process_notes(self, cur, mod_time):
        """Load the notes of CARD_TYPE and write back every modified one."""
        cur.execute('SELECT id,name FROM notetypes')
        note_types = cur.fetchall()
        deck_id = self._find_deck_id(cur)

        cards = []
        idxs = ()
        for note_type_id, note_type_name in note_types:
            if note_type_name != self.CARD_TYPE:
                continue
            cards = self.get_cards(cur, note_type_id, mod_time, deck_id)
            cur.execute(
                'SELECT ord,name,ntid FROM fields WHERE ntid=?',
                (int(note_type_id),))
            idxs = self.get_idxs(cur.fetchall())
            if len(idxs) == 0:
                raise AssertionError("did not get any idxs")
            break

        # The deck-restricted query yields one row per card, so the same note
        # can appear several times; handle each note id only once.
        seen_note_ids = set()
        for card in cards:
            note_id = card[0]
            if note_id in seen_note_ids:
                continue
            seen_note_ids.add(note_id)
            modded_note_fields = self.get_modded_fields_for_card(card, idxs)
            if modded_note_fields:
                self.write_back(cur, note_id, modded_note_fields)

    def get_cards(self, cur, note_type_id, mod_time, deck_id=None):
        """Return the note rows of one note type modified after *mod_time*.

        cur: open database cursor.
        note_type_id: value matched against ``notes.mid``.
        mod_time: only rows with ``notes.mod`` greater than this are returned.
        deck_id: when truthy, rows are joined with ``cards`` and limited to
            that deck (one row per card).
        """
        q = 'SELECT * FROM notes WHERE mid=? AND mod>?'
        qa = (int(note_type_id), mod_time)
        if deck_id:
            q = 'SELECT * FROM notes INNER JOIN cards ON notes.id=cards.nid WHERE notes.mid=? AND notes.mod>? AND cards.did=?'
            qa += (deck_id,)
        cur.execute(q, qa)
        cards = cur.fetchall()
        self.log("Found %d cards" % (len(cards)))
        return cards

    def write_back(self, cur, id, modded_note_fields):
        """Store the joined *modded_note_fields* on the note with the given id.

        Failures are logged and otherwise ignored so that one bad note does
        not abort the whole run.
        """
        try:
            modded_fields_str = CARD_FIELD_SEPARATOR.join(modded_note_fields)
            mod = int(time.time())
            # need to set update sequence number (usn) to -1 to tell server we have updates
            cur.execute("UPDATE notes SET flds=?,mod=?,usn=? WHERE id=?",
                        (modded_fields_str, mod, -1, id))
        except Exception as e:
            self.log("Error with card %s" % e)

    def log(self, *msg):
        """Print *msg* prefixed with this downloader's CARD_TYPE."""
        print(self.CARD_TYPE, *msg)

    @abstractmethod
    def get_idxs(self, note_fields):
        """Return a tuple of field indexes for get_modded_fields_for_card.

        note_fields is a sequence of tuples with ord (field idx), name and
        note type id.
        """

    @abstractmethod
    def get_modded_fields_for_card(self, cards, idxs):
        """Return the new list of field values for one note row, or nothing.

        Takes the idxs produced by get_idxs. A falsy return value means the
        note is left untouched.
        """
class ForvoDownloader(AudioDownloader):
    """Downloader that takes pronunciations from forvo.com search results.

    Works on notes of note type ``WAudio``: the first field is the word to
    look up and the field named ``Audio`` receives the ``[sound:...]`` tag.
    Subclasses provide ``LANG_CODES``, the Forvo language codes to try in
    priority order.
    """
    CARD_TYPE = 'WAudio'

    def strip_bad_chars(self, word):
        """Return *word* without non-breaking spaces.

        Both the ``&nbsp;`` entity and the U+00A0 character are removed.
        Ordinary spaces are kept because the caller splits on them to pick
        the longest word.
        NOTE(review): the scraped source shows only a blank here - confirm
        which form of non-breaking space the original stripped.
        """
        return word.replace('&nbsp;', '').replace('\u00a0', '')

    @staticmethod
    def _describe(longest_word, processed_word):
        """Format the looked-up word (and the full text, if different) for logs."""
        if processed_word and longest_word != processed_word:
            return '"%s" ("%s")' % (longest_word, processed_word)
        return '"%s"' % longest_word

    def get_forvo_pronunciation(self, longest_word, processed_word):
        """Return the mp3 URL of the first Forvo pronunciation, or None.

        longest_word: the word that is searched for.
        processed_word: the full cleaned field text, used for logging only.

        The language codes in ``LANG_CODES`` are tried in order; the first
        one whose search page lists a pronunciation wins.
        """
        from urllib.parse import quote

        if not longest_word:
            # Nothing to search for; avoid requesting ".../search//<lang>/".
            return None

        for lang_code in self.LANG_CODES:
            # Quote the word so "/", "?", "#" or spaces cannot alter the path.
            search_url = "https://forvo.com/search/%s/%s/" % (
                quote(longest_word, safe=''), lang_code)
            page_html = requests.get(
                search_url, headers=FAKE_BROWSER_HEADERS).text
            articles = re.findall(
                r'<article class="search_words.*?</article>', page_html, re.DOTALL)
            if not articles:
                continue
            # first result might be search_words empty
            encoded_paths = re.findall(r"Play\(\d+,'(.*?)'", articles[-1])
            if not encoded_paths:
                continue
            # The Play(...) argument is the base64-encoded mp3 path.
            pronunciations = [
                base64.b64decode(path).decode() for path in encoded_paths]
            self.log('Found %d pronunciations for %s' % (
                len(pronunciations),
                self._describe(longest_word, processed_word)))
            return 'https://forvo.com/mp3/%s' % pronunciations[0]
        return None

    def get_idxs(self, note_fields):
        """Return ``(ord,)`` of the field named 'Audio', or ``()`` if absent."""
        for _ord, name, _note_type_id in note_fields:
            if name == 'Audio':
                return (_ord,)
        return ()

    def get_modded_fields_for_card(self, card, idxs):
        """Download audio for one note and return its updated field list.

        card: note row; element 6 holds the separator-joined field values.
        idxs: ``(audio_idx,)`` as returned by get_idxs.

        Returns the field list with the audio field set, or None when the
        note already has audio, no (valid) pronunciation was found, or the
        download failed - so that unchanged notes are not written back.
        """
        audio_idx = idxs[0]
        front_field_idx = 0
        fields = card[6].split(CARD_FIELD_SEPARATOR)

        if fields[audio_idx].strip() != '':
            self.stats['existing_sound'] += 1
            return None

        processed_word = self.strip_bad_chars(fields[front_field_idx])
        # First longest space-separated word ('' for an empty field).
        longest_word = max(processed_word.split(' '), key=len)
        audio_url = self.get_forvo_pronunciation(longest_word, processed_word)
        if not audio_url:
            self.log('Could not find pronunciation for %s' %
                     self._describe(longest_word, processed_word))
            self.stats['not_found'] += 1
            return None

        try:
            r = requests.get(audio_url, headers=FAKE_BROWSER_HEADERS)
            if r.headers.get('content-type') != 'audio/mpeg':
                self.stats['invalid_audio'] += 1
                return None
            self.stats['found'] += 1
            sound_file_name = '%s_%s.mp3' % (
                strip_special_chars(fields[front_field_idx]), 'forvo')
            sound_file_path = os.path.join(self.media_loc, sound_file_name)
            with open(sound_file_path, 'wb') as f:
                for chunk in r:
                    f.write(chunk)
            fields[audio_idx] = '[sound:%s]' % sound_file_name
        except Exception as e:
            # Best effort: report the problem and leave this note untouched.
            self.log("Error with updating card with spec. idx %s" % e)
            return None
        return fields
class Spanish(ForvoDownloader):
    """Forvo downloader for the deck named 'Español'."""

    # Forvo language codes, tried in this order.
    LANG_CODES = ['es_latam', 'es']
    DECK_NAME = 'Español'
class Russian(ForvoDownloader):
    """Forvo downloader for the deck named 'По-русски'."""

    DECK_NAME = 'По-русски'
    # Forvo language codes, tried in this order.
    LANG_CODES = ['ru']

    def strip_accents(self, word):
        """Return *word* without stress marks.

        Removes the combining acute accent (U+0301) after any letter, which
        covers the accented 'а' and 'е' handled before as well as every
        other stressed vowel.
        """
        return word.replace('\u0301', '')
class Japanese(AudioDownloader):
    """Downloader for Japanese notes using the languagepod101 audio endpoint.

    The note type must have an audio field, a hiragana field and a
    kanji/front field (see get_idxs for the accepted field names).
    """
    CARD_TYPE = 'Japanese'
    DL_URL = 'http://assets.languagepod101.com/dictionary/japanese/audiomp3.php'

    # A response with this Content-length is treated as "no recording".
    NOT_FOUND_CONTENT_LENGTH = '52288'

    # Kana before "ます" -> dictionary-form ending for group 1 verbs.
    _GROUP1_ENDINGS = {
        'き': 'く',
        'ぎ': 'ぐ',
        'み': 'む',
        'り': 'る',
        'い': 'う',
        'し': 'す',
        'ち': 'つ',
    }

    def __init__(self, *args, **kwargs):
        """Add the Japanese-specific counters, then run the download."""
        # Rebind instead of update(): the inherited dict is a class attribute
        # shared with every other downloader.
        self.stats = {
            **self.stats,
            'found_conjugated': 0,
            'no_kanji_or_kana': 0,
        }
        super().__init__(*args, **kwargs)

    @classmethod
    def find_word(cls, s):
        """Return the first non-empty word of *s*, or '' if there is none.

        The text is split with ``word_sep_regx``; each piece is cleaned with
        ``remove_non_unicode_characters`` and stripped.
        """
        for piece in word_sep_regx.split(s.strip()):
            cleaned = remove_non_unicode_characters(piece).strip()
            if cleaned != '':
                return cleaned
        return ''

    @classmethod
    def to_dict_form(cls, kanji, kana):
        """Return candidate dictionary forms for a polite (ます) verb.

        The result is a list of ``(kanji, kana)`` pairs to try in order; it
        is empty when the word does not end in ます in both spellings.
        """
        if kanji.endswith('します') and kana.endswith('します'):
            # group 3 (する verbs); "す" also covers group 1 verbs whose stem
            # ends in し (e.g. 話します), which never reach the branch below.
            return [(kanji[:-3] + suff, kana[:-3] + suff)
                    for suff in ('する', '', 'す')]
        if kanji.endswith('ます') and kana.endswith('ます'):
            if len(kana) < 3:
                # Just "ます": there is no stem to conjugate.
                return []
            ending = cls._GROUP1_ENDINGS.get(kana[-3])
            if ending is not None:
                # group 1
                return [(kanji[:-3] + ending, kana[:-3] + ending)]
            # group 2
            return [('%sる' % kanji[:-2], '%sる' % kana[:-2])]
        return []

    def get_idxs(self, note_fields):
        """Return ``(audio_idx, kanji_idx, hiragana_idx)`` for the note type.

        Raises AssertionError when one of the three fields cannot be found.
        """
        audio_idx = None
        kanji_idx = None
        hiragana_idx = None
        for _ord, name, _note_type_id in note_fields:
            fieldname = name.lower()
            if fieldname == 'audio':
                audio_idx = _ord
            elif fieldname == 'hiragana':
                hiragana_idx = _ord
            elif fieldname == 'front' or (
                    'vocab' in fieldname
                    and 'hiragana' not in fieldname
                    and 'back' not in fieldname):
                kanji_idx = _ord
            elif fieldname == 'text':
                # needs work (cloze deletion)
                kanji_idx = _ord
        if audio_idx is None or kanji_idx is None or hiragana_idx is None:
            raise AssertionError("missing kanji, hiragana or audio field(s)")
        return (audio_idx, kanji_idx, hiragana_idx)

    def _has_no_recording(self, response):
        """Return True when *response* has the "no recording" Content-length."""
        return response.headers['Content-length'] == self.NOT_FOUND_CONTENT_LENGTH

    def get_modded_fields_for_card(self, card, idxs):
        """Download audio for one note and return its updated field list.

        card: note row; element 6 holds the separator-joined field values.
        idxs: ``(audio_idx, kanji_idx, hiragana_idx)`` from get_idxs.

        Returns the field list with the audio field set, or None when the
        note has no usable word, already has audio, no recording exists, or
        the download failed - so that unchanged notes are not written back.
        """
        audio_idx, kanji_idx, hiragana_idx = idxs[0], idxs[1], idxs[2]
        fields = card[6].split(CARD_FIELD_SEPARATOR)
        raw_kanji = fields[kanji_idx]
        raw_kana = fields[hiragana_idx]
        kanji = self.find_word(raw_kanji)
        kana = self.find_word(raw_kana)

        if kana == '':
            if kanji == '':
                if raw_kana != '' or raw_kanji != '':
                    print("No kanji/kana for %s %s" % (raw_kanji, raw_kana))
                self.stats['no_kanji_or_kana'] += 1
                return None
            # No reading given: look the word up by its kanji field alone.
            kana = kanji

        if fields[audio_idx].strip() != '':
            self.stats['existing_sound'] += 1
            return None

        conjugated = False
        try:
            r = requests.get(self.DL_URL, params={
                'kanji': kanji, 'kana': kana})
            if self._has_no_recording(r):
                # now try conjugating
                for conjugated_kanji, conjugated_kana in self.to_dict_form(kanji, kana):
                    print("Trying %s %s for %s %s" %
                          (conjugated_kanji, conjugated_kana, kanji, kana))
                    r = requests.get(self.DL_URL, params={
                        'kanji': conjugated_kanji, 'kana': conjugated_kana})
                    if not self._has_no_recording(r):
                        conjugated = True
                        # Stop here so a later failed attempt cannot replace
                        # the response that holds the real recording.
                        break
                if not conjugated:
                    print("NOT found %s %s" % (kanji, kana))
                    self.stats['not_found'] += 1
                    return None

            if conjugated:
                print("Found conjugated form %s %s" % (kanji, kana))
                self.stats['found_conjugated'] += 1
            else:
                print("Found %s %s" % (kanji, kana))
                self.stats['found'] += 1

            sound_file_name = '%s_%s.mp3' % (kanji, kana)
            sound_file_path = os.path.join(self.media_loc, sound_file_name)
            with open(sound_file_path, 'wb') as f:
                for chunk in r:
                    f.write(chunk)
            fields[audio_idx] = '[sound:%s]' % sound_file_name
        except Exception as e:
            # Best effort: report the problem and leave this note untouched.
            self.log("Error with updating card with spec. idx %s" % e)
            return None
        return fields
class JapaneseCloze(Japanese):
    """Japanese downloader applied to the note type named 'Japanese Cloze'."""

    CARD_TYPE = 'Japanese Cloze'
class JapaneseKanji(Japanese):
    """Japanese downloader applied to the note type named 'Kanji'."""

    CARD_TYPE = 'Kanji'
def remove_non_unicode_characters(s):
    """Return *s* with every character matched by ``unicode_regx`` deleted.

    Despite the name, the characters removed are the ASCII ones, since that
    is what ``unicode_regx`` matches.
    """
    stripped = re.sub(unicode_regx, '', s)
    return stripped
@click.command()
@click.argument('anki-user-folder-loc', type=click.Path(exists=True))
@click.option('--language', '-l',
              type=click.Choice(['japanese', 'russian', 'spanish']),
              required=True, multiple=True,
              help='Language to download audio for; may be given several times.')
@click.option('--last-mod', default=time.time(), type=float,
              help='Only process notes modified within this many seconds '
                   'before now. Defaults to all notes.')
@click.option('--dry', is_flag=True,
              help='Do not commit the database changes '
                   '(audio files are still downloaded).')
def do(anki_user_folder_loc, language, last_mod, dry):
    """Download missing pronunciation audio for the notes of an Anki profile.

    ANKI_USER_FOLDER_LOC is the profile folder that contains collection.anki2
    and collection.media.
    """
    # Downloaders per language; languages are processed in this fixed order
    # regardless of the order given on the command line.
    downloaders_by_language = {
        'japanese': (Japanese, JapaneseCloze, JapaneseKanji),
        'russian': (Russian,),
        'spanish': (Spanish,),
    }
    for name, downloaders in downloaders_by_language.items():
        if name not in language:
            continue
        for downloader in downloaders:
            # Instantiating a downloader runs it.
            downloader(dry, anki_user_folder_loc, last_mod)


if __name__ == '__main__':
    do()
This doesn't work (I'm on 2.1). It needs click (perhaps in the same folder?) to run, and anki_user_folder_loc doesn't seem to be defined. How does this even work for you?
Doesn't work on 2.1. Any update on this?
There are some issues with the code:
- 98: this for loop should run for each note, not pointlessly each and every card of it (or alternatively: only for one card)
- for the same reason, 386&387 should be commented out by default, with a note in the preface
- Japanese(AudioDownloader) needs a default DECK_NAME
- tests @ 290 & 292 should be == ffs, and the preface needs a note that the fields there (and 294 & 296) may need adjustment
- actually I have no idea what 294 was about, so I killed it
As for Anki itself, with newer 2.1 versions you must run "Downgrade & Quit" from (bottom right) of the File→"Switch Profile" dialog (C-S-p), otherwise the profile will be in a new format this script can't read. Anki will automatically upgrade it again the next time you open the profile normally.
@nwwt Thanks! I actually just noticed some of the issues with newer Anki. I will fix and update this script, it would be helpful to have your change requests in a fork - if you're so inclined.
@nwwt updated, FYI
This would be really convenient as an add-on. Thanks for sharing your personal work.
How do you make this work? I added the file to the Forvo src, but I'm not sure what to do next.
Hi, I tried running the script, but it's telling me "Missing argument: 'ANKI_USER_FOLDER_LOC'".
Also, I don't really know how to use this thing. Is there a wiki or something? I want to download pronunciations from japanesepod101 for a big list of words.
@languagemaniac
JP101 is available at media.digitalwords.net/anki/lp/Japanese.apkg
`audio_field_indices.append(idx)` should probably be `audio_field_indices.append(audio_field_idx)`. Right now this will only work if `Audio` is the last field.