Skip to content

Instantly share code, notes, and snippets.

@v64
Created January 19, 2020 19:51
Show Gist options
  • Select an option

  • Save v64/3d78519ad4899c2308ef5f1582f7be88 to your computer and use it in GitHub Desktop.

Select an option

Save v64/3d78519ad4899c2308ef5f1582f7be88 to your computer and use it in GitHub Desktop.
BitTorrent Metainfo SQLite DB
#!/usr/bin/env python3
import hashlib
import itertools
import os
import sqlite3
import urllib.parse
def main():
    """Index every ``.torrent`` file below the current directory.

    Opens (or creates) ``./metainfo.db``, makes sure the tables exist, then
    walks the current directory tree (following symlinks) and hands each
    file whose name ends in ``.torrent`` to ``process_torrent``. Each torrent
    is committed on its own; a failure rolls back that torrent only, prints
    the error and moves on to the next file.
    """
    db = sqlite3.connect('./metainfo.db')
    try:
        c = db.cursor()
        check_tables(c)
        for root, _subdirs, files in os.walk('.', followlinks=True):
            for file in files:
                if not file.endswith('.torrent'):
                    continue
                # Build the path outside the try so the error message below
                # can always refer to it.
                full_path = os.path.join(root, file)
                try:
                    process_torrent(full_path, c)
                    db.commit()
                except Exception as e:
                    # Best effort: one bad torrent must not abort the scan.
                    db.rollback()
                    print("Error processing {} ({})".format(full_path, e))
        db.commit()
        c.close()
    finally:
        # Release the database even if the walk is interrupted or
        # check_tables() fails.
        db.close()
def process_torrent(filename, c):
    """Parse one ``.torrent`` file and store its metadata through cursor *c*.

    Inserts one row into ``torrent_metainfo`` (info hash, raw file bytes,
    magnet link, name, piece count, piece length, file count, total length)
    and, for multi-file torrents, one row per file into ``torrent_files``.
    Rows that already exist are left untouched (``insert or ignore``).
    The caller is responsible for committing or rolling back.

    Raises:
        ValueError: if the file is not valid bencoded data.
        KeyError: if a required metainfo key (``info``, ``name``, ...) is
            missing.
    """
    with open(filename, 'rb') as torrent:
        orig = torrent.read()
    decoded = bdecode(orig)
    info = decoded['info']

    # NOTE: the hash is taken over the re-encoded info dictionary, so it
    # matches the real info hash only if the file was canonically encoded.
    info_hash = hashlib.sha1(bencode(info)).hexdigest()
    single = 'length' in info
    name = info['name']

    raw_pieces = info['pieces']
    if isinstance(raw_pieces, str):
        # decode_string() returns str whenever the bytes happen to be valid
        # UTF-8; count bytes, not characters.
        raw_pieces = raw_pieces.encode('utf-8')
    pieces = len(raw_pieces) // 20  # one 20-byte SHA-1 digest per piece
    piece_len = info['piece length']

    # 'announce-list' is optional; fall back to the single 'announce' URL,
    # or to no trackers at all, instead of failing with KeyError.
    announce_tiers = decoded.get('announce-list')
    if not announce_tiers:
        announce_tiers = [[decoded['announce']]] if 'announce' in decoded else []
    magnet_link = get_magnet_link(info_hash, name, announce_tiers)

    if single:
        num_files = 1
        total_length = info['length']
    else:
        files = info['files']
        num_files = len(files)
        total_length = 0
        for i, file in enumerate(files):
            file_name = os.path.join(*file['path'])
            file_length = file['length']
            total_length += file_length
            db_row = (info_hash, i, file_name, file_length)
            c.execute('insert or ignore into torrent_files values (?,?,?,?)', db_row)

    db_row = (info_hash, orig, magnet_link, name, pieces, piece_len, num_files, total_length)
    c.execute('insert or ignore into torrent_metainfo values (?,?,?,?,?,?,?,?)', db_row)
def get_magnet_link(info_hash, name, announce):
    """Build a magnet URI from an info hash, display name and tracker tiers.

    Args:
        info_hash: hex-encoded info hash, used verbatim in ``xt=urn:btih:``.
        name: display name; percent-encoded into the ``dn`` parameter.
        announce: iterable of tracker tiers (each tier an iterable of tracker
            URLs, as in ``announce-list``). Tiers are flattened in order and
            each URL becomes one percent-encoded ``tr`` parameter. May be
            empty, in which case the link carries no ``tr`` parameters.

    Example for "Borat (2006) [1080p]" with one tracker:
        magnet:?xt=urn:btih:88f540a711f44791b5da74e9ac4c3794c58a82c4&dn=Borat%20%282006%29%20%5B1080p%5D&tr=udp%3A%2F%2Ftracker.yify-torrents.com%2Fannounce
    """
    trackers = itertools.chain.from_iterable(announce)  # flatten the tiers
    params = [
        'xt=urn:btih:{}'.format(info_hash),
        'dn={}'.format(urllib.parse.quote(name, safe='')),
    ]
    params.extend('tr={}'.format(urllib.parse.quote(url, safe='')) for url in trackers)
    # Joining all parameters at once avoids a dangling '&' when there are
    # no trackers.
    return 'magnet:?' + '&'.join(params)
def check_tables(c):
    """Ensure both tables exist, creating whichever is missing.

    Each ``create table`` statement is idempotent (``if not exists``), so a
    database that already holds only one of the two tables is completed
    instead of raising or being left without the other table.

    Args:
        c: an open sqlite3 cursor; its connection is committed afterwards.
    """
    c.execute(create_torrent_metainfo)
    c.execute(create_torrent_files)
    c.connection.commit()


# One row per torrent, keyed by the hex info hash.
create_torrent_metainfo = """\
create table if not exists torrent_metainfo (
    info_hash text primary key not null,
    orig_bencode blob not null,
    magnet_link text not null,
    name text not null,
    pieces integer not null,
    piece_length integer not null,
    num_files integer not null,
    total_length integer not null
)
"""

# One row per file of a multi-file torrent; list_index is the file's
# position in the torrent's file list.
create_torrent_files = """\
create table if not exists torrent_files (
    info_hash text not null,
    list_index integer not null,
    name text not null,
    length integer not null,
    primary key (info_hash, list_index)
)
"""
# Bencoding functions are included here to remove the dependency on an external bencoding package.
# Adapted from https://github.com/jbernhard/scripts/blob/master/verify-torrent
# Functions for encoding and decoding bencoded data.
# Original version from the BitTorrent bencode module,
# updated for Python 3.
# Encode and decode the four bencode types (integer, string, list, dictionary).
# Note that these functions often call each other recursively.
def encode_int(x):
    """Return *x* bencoded as ``i<decimal>e`` in ASCII bytes."""
    digits = str(x)
    return ('i' + digits + 'e').encode('ascii')
def decode_int(x, f):
    """Decode the bencoded integer that starts at offset *f* of bytes *x*.

    Args:
        x: the bencoded data.
        f: offset of the leading ``i`` marker.

    Returns:
        ``(value, next_offset)`` where *next_offset* is the index just past
        the terminating ``e``.

    Raises:
        ValueError: if there is no terminating ``e``, the digits are not a
            valid integer, the number has a leading zero, or it starts
            with ``-0``.
    """
    f += 1  # skip the 'i' marker
    newf = x.index(b'e', f)
    n = int(x[f:newf])
    # Indexing bytes yields ints, so compare against ord() values.
    if x[f] == ord('-'):
        if x[f + 1] == ord('0'):
            raise ValueError('-0 is invalid')
    elif x[f] == ord('0') and newf != f + 1:
        raise ValueError('leading zero is invalid in integer')
    return n, newf + 1
def encode_string(x):
    """Bencode the text string *x*: UTF-8 encode it, then emit it as a byte string."""
    return encode_bytes(bytes(x, 'utf-8'))
def encode_bytes(x):
    """Return *x* bencoded as ``<length>:<data>``."""
    prefix = b'%d:' % len(x)
    return prefix + x
def decode_string(x, f):
    """Decode the bencoded string starting at offset *f* of bytes *x*.

    Returns ``(value, next_offset)``. The value is a ``str`` when the
    payload is valid UTF-8 and the raw ``bytes`` otherwise.

    Raises ValueError when the ``:`` separator is missing, the length is not
    an integer, or the length has a leading zero.
    """
    sep = x.index(b':', f)
    length = int(x[f:sep])
    has_leading_zero = x[f] == ord('0') and sep != f + 1
    if has_leading_zero:
        raise ValueError('leading zero is invalid in string length')
    start = sep + 1
    end = start + length
    payload = x[start:end]
    try:
        return payload.decode(), end
    except UnicodeDecodeError:
        # Not text (e.g. binary hashes): hand back the raw bytes.
        return payload, end
def encode_list(x):
    """Return the items of *x* bencoded in order, wrapped in ``l`` ... ``e``."""
    parts = [bencode(item) for item in x]
    return b'l' + b''.join(parts) + b'e'
def decode_list(x, f):
    """Decode the bencoded list starting at offset *f* of bytes *x*.

    Returns ``(items, next_offset)`` where *next_offset* is the index just
    past the closing ``e``.
    """
    items = []
    pos = f + 1  # step over the 'l' marker
    terminator = ord('e')
    while x[pos] != terminator:
        # The first byte of each element selects its decoder.
        decoder = decode_func[chr(x[pos])]
        item, pos = decoder(x, pos)
        items.append(item)
    return items, pos + 1
def encode_dict(x):
    """Return dictionary *x* bencoded, with keys in sorted order.

    Keys may be ``str`` (encoded as UTF-8) or ``bytes``. ``decode_dict``
    yields ``bytes`` keys whenever a key is not valid UTF-8, so both kinds
    must be accepted for a decoded dictionary to be re-encodable. Keys are
    ordered by their encoded bytes, which for ``str`` keys is the same order
    as sorting the strings themselves.
    """
    def raw_key(key):
        return key if isinstance(key, bytes) else bytes(key, 'utf-8')

    entries = sorted(
        ((raw_key(key), value) for key, value in x.items()),
        key=lambda entry: entry[0],
    )
    body = b''.join(encode_bytes(key) + bencode(value) for key, value in entries)
    return b'd' + body + b'e'
def decode_dict(x, f):
    """Decode the bencoded dictionary starting at offset *f* of bytes *x*.

    Returns ``(mapping, next_offset)`` where *next_offset* is the index just
    past the closing ``e``. Keys are read with ``decode_string``.
    """
    mapping = {}
    pos = f + 1  # step over the 'd' marker
    terminator = ord('e')
    while x[pos] != terminator:
        key, pos = decode_string(x, pos)
        # The first byte of the value selects its decoder.
        decoder = decode_func[chr(x[pos])]
        value, pos = decoder(x, pos)
        mapping[key] = value
    return mapping, pos + 1
# Encoder dispatch table, keyed by str(type(value)) as looked up in bencode().
encode_func = {
    str(list): encode_list,
    str(dict): encode_dict,
    str(bytes): encode_bytes,
    str(str): encode_string,
    str(int): encode_int,
}
# Decoder dispatch table, keyed by the first character of an encoded value:
# a digit starts a string, 'l' a list, 'd' a dictionary and 'i' an integer.
decode_func = dict.fromkeys('0123456789', decode_string)
decode_func['l'] = decode_list
decode_func['d'] = decode_dict
decode_func['i'] = decode_int
def bencode(x):
    """Return the bencoded bytes for *x*.

    The encoder is chosen from ``encode_func`` by the exact type of *x*
    (list, dict, bytes, str or int).

    Raises:
        ValueError: if *x*, or any value nested inside it, has an
            unsupported type. The message names the offending type.
    """
    try:
        encoder = encode_func[str(type(x))]
    except KeyError:
        raise ValueError("Invalid type to encode: {}".format(str(type(x)))) from None
    # Run the encoder outside the try block so an error raised for a nested
    # value is not re-labelled with the type of its container.
    return encoder(x)
def bdecode(x):
    """Decode the bytes *x* as a single, complete bencoded value.

    Raises ValueError when *x* is empty or malformed, or when extra data
    follows the first complete value.
    """
    try:
        decoder = decode_func[chr(x[0])]
        value, consumed = decoder(x, 0)
    except (IndexError, KeyError, ValueError) as e:
        raise ValueError("not a valid bencoded string ({})".format(e))
    if consumed != len(x):
        raise ValueError("invalid bencoded value (data after valid prefix)")
    return value
# Run the scan only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment