Created
January 19, 2020 19:51
-
-
Save v64/3d78519ad4899c2308ef5f1582f7be88 to your computer and use it in GitHub Desktop.
BitTorrent Metainfo SQLite DB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import hashlib | |
| import itertools | |
| import os | |
| import sqlite3 | |
| import urllib.parse | |
def main():
    """Index every ``.torrent`` file under the current directory.

    Opens (or creates) ``./metainfo.db``, makes sure the tables exist, then
    walks the current directory tree (following symlinks) and stores each
    torrent via ``process_torrent``.  Every torrent is committed on its own;
    a failure rolls back that torrent only, is reported on stdout, and the
    scan continues with the next file.
    """
    db = sqlite3.connect('./metainfo.db')
    try:
        c = db.cursor()
        try:
            check_tables(c)
            for root, _subdirs, files in os.walk('.', followlinks=True):
                for file in files:
                    if not file.endswith('.torrent'):
                        continue
                    # Built outside the try so the error message below can
                    # always refer to it.
                    full_path = os.path.join(root, file)
                    try:
                        process_torrent(full_path, c)
                        db.commit()
                    except Exception as e:
                        # Best effort: drop this torrent's partial rows and
                        # keep going with the remaining files.
                        db.rollback()
                        print("Error processing {} ({})".format(full_path, e))
            db.commit()
        finally:
            c.close()
    finally:
        # Release the database handle even if the walk itself fails.
        db.close()
def process_torrent(filename, c):
    """Parse one ``.torrent`` file and insert its rows through cursor ``c``.

    Writes one ``torrent_metainfo`` row for the torrent and, for multi-file
    torrents, one ``torrent_files`` row per entry of ``info['files']``.
    Rows whose primary key already exists are left untouched
    (``insert or ignore``).  The caller is responsible for committing.

    Args:
        filename: path of the ``.torrent`` file to read.
        c: sqlite3 cursor used for the inserts.

    Raises:
        ValueError: if the file is not valid bencoded data.
        KeyError: if a required metainfo key is missing.
    """
    with open(filename, 'rb') as torrent:
        orig = torrent.read()

    decoded = bdecode(orig)
    info = decoded['info']
    info_hash = hashlib.sha1(bencode(info)).hexdigest()
    name = info['name']
    # The 'pieces' value is a concatenation of 20-byte SHA-1 digests.
    pieces = len(info['pieces']) // 20
    piece_len = info['piece length']

    # 'announce-list' is optional; fall back to the single 'announce' URL
    # (wrapped as one tier) and finally to no trackers at all.
    announce_tiers = decoded.get('announce-list')
    if not announce_tiers:
        announce_tiers = [[decoded['announce']]] if 'announce' in decoded else []
    magnet_link = get_magnet_link(info_hash, name, announce_tiers)

    if 'length' in info:
        # Single-file torrent: the length lives directly in the info dict.
        num_files = 1
        total_length = info['length']
    else:
        entries = info['files']
        num_files = len(entries)
        total_length = 0
        for i, file in enumerate(entries):
            file_name = os.path.join(*file['path'])
            file_length = file['length']
            total_length += file_length
            db_row = (info_hash, i, file_name, file_length)
            c.execute('insert or ignore into torrent_files values (?,?,?,?)', db_row)

    db_row = (info_hash, orig, magnet_link, name, pieces, piece_len, num_files, total_length)
    c.execute('insert or ignore into torrent_metainfo values (?,?,?,?,?,?,?,?)', db_row)
def get_magnet_link(info_hash, name, announce):
    """Build a magnet URI for a torrent.

    Args:
        info_hash: hex info-hash, placed after ``xt=urn:btih:``.
        name: display name; percent-encoded into the ``dn`` parameter.
        announce: tracker tiers (an iterable of iterables of URLs).  The
            tiers are flattened in order and each URL becomes one ``tr``
            parameter.

    Returns:
        The magnet URI.  When there are no trackers the URI simply ends
        after the ``dn`` parameter (no dangling ``&``).

    Example (Borat (2006) [1080p], one tracker):
        magnet:?xt=urn:btih:88f540a711f44791b5da74e9ac4c3794c58a82c4&dn=Borat%20%282006%29%20%5B1080p%5D&tr=udp%3A%2F%2Ftracker.yify-torrents.com%2Fannounce
    """
    params = [
        'xt=urn:btih:{}'.format(info_hash),
        'dn={}'.format(urllib.parse.quote(name, safe='')),
    ]
    # Flatten the announce-list tiers into one sequence of tracker URLs.
    trackers = itertools.chain.from_iterable(announce)
    params.extend('tr={}'.format(urllib.parse.quote(url, safe='')) for url in trackers)
    return 'magnet:?' + '&'.join(params)
def check_tables(c):
    """Create the ``torrent_metainfo`` and ``torrent_files`` tables if missing.

    Each table is looked up in ``sqlite_master`` on its own, so a database
    that holds only one of the two tables is completed instead of failing
    or being left without the other.  Commits only when something was
    created.

    Args:
        c: sqlite3 cursor on the target database.
    """
    schema = (
        ('torrent_metainfo', create_torrent_metainfo),
        ('torrent_files', create_torrent_files),
    )
    created = False
    for table_name, ddl in schema:
        c.execute(
            "select name from sqlite_master where type='table' and name=?",
            (table_name,),
        )
        if c.fetchone() is None:
            c.execute(ddl)
            created = True
    if created:
        c.connection.commit()
# DDL for the torrent_metainfo table: one row per torrent, keyed by info_hash.
# process_torrent inserts into it positionally with eight values, so the
# column order here must stay in step with that insert.
create_torrent_metainfo = """\
create table torrent_metainfo (
info_hash text primary key not null,
orig_bencode blob not null,
magnet_link text not null,
name text not null,
pieces integer not null,
piece_length integer not null,
num_files integer not null,
total_length integer not null
)
"""
# DDL for the torrent_files table: one row per file of a multi-file torrent,
# keyed by (info_hash, list_index).  process_torrent inserts into it
# positionally with four values, so the column order must match that insert.
create_torrent_files = """\
create table torrent_files (
info_hash text not null,
list_index integer not null,
name text not null,
length integer not null,
primary key (info_hash, list_index)
)
"""
| # Bencoding functions are included inline to avoid a dependency on an external bencoding package | |
| # From https://github.com/jbernhard/scripts/blob/master/verify-torrent | |
| # functions for decoding bencoded data | |
| # original version from the bittorrent bencode module | |
| # updated for python 3 | |
| # encode and decode the four bencode types (integer, string, list, dictionary) | |
| # note that these functions often call each other recursively | |
def encode_int(x):
    """Return the bencoded form of integer ``x``: ``i<digits>e`` as bytes."""
    text = 'i' + str(x) + 'e'
    return bytes(text, 'ascii')
def decode_int(x, f):
    """Decode the bencoded integer that starts at offset ``f`` of ``x``.

    Args:
        x: bytes holding the bencoded data.
        f: offset of the leading ``i`` marker.

    Returns:
        ``(value, next_offset)`` where ``next_offset`` is the index just
        past the terminating ``e``.

    Raises:
        ValueError: if there is no terminating ``e``, the digits are not a
            valid integer, the value starts with ``-0``, or it has a
            leading zero.
    """
    f += 1  # skip the 'i' marker
    newf = x.index(b'e', f)
    n = int(x[f:newf])
    if x[f] == ord('-'):
        # Indexing bytes yields an int, so the comparison has to be against
        # ord('0'); comparing with the str '0' can never be true.
        if x[f + 1] == ord('0'):
            raise ValueError('-0 is invalid')
    elif x[f] == ord('0') and newf != f + 1:
        raise ValueError('leading zero is invalid in integer')
    return n, newf + 1
def encode_string(x):
    """Bencode text ``x``: UTF-8 encode it, then emit it as a byte string."""
    return encode_bytes(bytes(x, 'utf-8'))
def encode_bytes(x):
    """Return ``x`` bencoded as ``<length>:<data>``, with the length in ASCII."""
    length_prefix = bytes('{}:'.format(len(x)), 'ascii')
    return length_prefix + x
def decode_string(x, f):
    """Decode the bencoded string that starts at offset ``f`` of ``x``.

    Returns ``(value, next_offset)``.  ``value`` is a ``str`` when the
    payload decodes with the default codec, otherwise the raw ``bytes``.
    Raises ValueError when the ``:`` separator is missing, the length is not
    an integer, or the length has a leading zero.
    """
    sep = x.index(b':', f)
    length = int(x[f:sep])
    has_leading_zero = x[f] == ord('0') and sep != f + 1
    if has_leading_zero:
        raise ValueError('leading zero is invalid in string length')
    start = sep + 1
    end = start + length
    raw = x[start:end]
    try:
        return raw.decode(), end
    except UnicodeDecodeError:
        # Binary payload: hand the bytes back untouched.
        return raw, end
def encode_list(x):
    """Bencode list ``x`` as ``l<item><item>...e``, bencoding items in order."""
    body = b''.join(bencode(item) for item in x)
    return b'l' + body + b'e'
def decode_list(x, f):
    """Decode the bencoded list that starts at offset ``f`` of ``x``.

    Returns ``(items, next_offset)`` where ``next_offset`` is the index just
    past the closing ``e``.
    """
    items = []
    pos = f + 1  # step over the 'l' marker
    while True:
        marker = x[pos]
        if marker == ord('e'):
            break
        # The first byte of each element selects its decoder.
        item, pos = decode_func[chr(marker)](x, pos)
        items.append(item)
    return items, pos + 1
def encode_dict(x):
    """Bencode dictionary ``x`` as ``d<key><value>...e``.

    Keys may be ``str`` (encoded as UTF-8) or ``bytes``; ``decode_string``
    hands back ``bytes`` for keys that are not valid text, so both kinds
    must be accepted for a decoded dictionary to round-trip.  Entries are
    emitted in order of the encoded key bytes, which for ``str`` keys is the
    same order as sorting the strings themselves.

    Raises:
        TypeError: if a key is neither ``str`` nor ``bytes``.
    """
    entries = []
    for key, value in x.items():
        raw_key = key if isinstance(key, bytes) else bytes(key, 'utf-8')
        entries.append((raw_key, value))
    # Sort on the key bytes only; values are not comparable in general.
    entries.sort(key=lambda entry: entry[0])
    body = b''.join(encode_bytes(raw_key) + bencode(value) for raw_key, value in entries)
    return b'd' + body + b'e'
def decode_dict(x, f):
    """Decode the bencoded dictionary that starts at offset ``f`` of ``x``.

    Returns ``(mapping, next_offset)`` where ``next_offset`` is the index
    just past the closing ``e``.  Keys are read with ``decode_string``;
    values are read with the decoder selected by their first byte.
    """
    mapping = {}
    pos = f + 1  # step over the 'd' marker
    while x[pos] != ord('e'):
        key, pos = decode_string(x, pos)
        value, pos = decode_func[chr(x[pos])](x, pos)
        mapping[key] = value
    return mapping, pos + 1
# Encoder dispatch table used by bencode(): the key is str(type(value)).
encode_func = {
    "<class 'list'>": encode_list,
    "<class 'dict'>": encode_dict,
    "<class 'bytes'>": encode_bytes,
    "<class 'str'>": encode_string,
    "<class 'int'>": encode_int,
}

# Decoder dispatch table: the key is the first character of the encoded
# value.  A digit starts a string length; 'l', 'd' and 'i' start a list,
# a dictionary and an integer.
decode_func = dict.fromkeys('0123456789', decode_string)
decode_func['l'] = decode_list
decode_func['d'] = decode_dict
decode_func['i'] = decode_int
def bencode(x):
    """Return ``x`` bencoded as bytes.

    The encoder is looked up in ``encode_func`` by ``str(type(x))``.  An
    IndexError, KeyError or ValueError raised by the lookup or by the
    encoder is re-raised as a ValueError naming the type of ``x``.
    """
    type_key = str(type(x))
    try:
        return encode_func[type_key](x)
    except (IndexError, KeyError, ValueError):
        raise ValueError("Invalid type to encode: {}".format(type_key))
def bdecode(x):
    """Decode the complete bencoded byte string ``x`` and return its value.

    Raises ValueError when ``x`` cannot be decoded, or when bytes remain
    after the first complete value.
    """
    try:
        value, end = decode_func[chr(x[0])](x, 0)
    except (IndexError, KeyError, ValueError) as err:
        raise ValueError("not a valid bencoded string ({})".format(err))
    if end == len(x):
        return value
    raise ValueError("invalid bencoded value (data after valid prefix)")
# Run the directory scan only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment