|
#!/usr/bin/env python3 |
|
"""Benchmark event_metrics indexes. |
|
|
|
Usage: |
|
python script.py setup # Create base DB with data |
|
python script.py index single|composite # Add indexes |
|
python script.py query single|composite [balances|amounts] # Run query |
|
""" |
|
|
|
import random
import shutil
import sqlite3
import sys
from contextlib import closing
from pathlib import Path
|
|
|
# Number of rows generated by setup() for history_events, and again for
# event_metrics (event_metrics row i references history_events row i).
NUM_ROWS = 500_000
# Working directory holding base.db plus the per-mode indexed copies
# (<mode>.db) produced by create_indexed_db().
DB_DIR = Path('/tmp/benchmark_indexes')
|
|
|
# DDL run by setup() on a fresh base.db: the `location` lookup table (seeded
# with codes 'A'-'J'), `assets`, `history_events` and `event_metrics`.
# No secondary indexes on event_metrics are created here -- only the automatic
# index SQLite builds for its UNIQUE(...) constraint. The indexes being
# benchmarked are added afterwards by create_indexed_db().
# NOTE: the REFERENCES / FOREIGN KEY clauses are declared but SQLite enforces
# them only with `PRAGMA foreign_keys = ON`, which nothing in this script sets.
SCHEMA = """
CREATE TABLE location (
    location CHAR(1) PRIMARY KEY NOT NULL,
    seq INTEGER UNIQUE
);
INSERT INTO location(location, seq) VALUES ('A', 1);
INSERT INTO location(location, seq) VALUES ('B', 2);
INSERT INTO location(location, seq) VALUES ('C', 3);
INSERT INTO location(location, seq) VALUES ('D', 4);
INSERT INTO location(location, seq) VALUES ('E', 5);
INSERT INTO location(location, seq) VALUES ('F', 6);
INSERT INTO location(location, seq) VALUES ('G', 7);
INSERT INTO location(location, seq) VALUES ('H', 8);
INSERT INTO location(location, seq) VALUES ('I', 9);
INSERT INTO location(location, seq) VALUES ('J', 10);

CREATE TABLE assets (
    identifier TEXT NOT NULL PRIMARY KEY
);

CREATE TABLE history_events (
    identifier INTEGER NOT NULL PRIMARY KEY,
    entry_type INTEGER NOT NULL,
    group_identifier TEXT NOT NULL,
    sequence_index INTEGER NOT NULL,
    timestamp INTEGER NOT NULL,
    location CHAR(1) NOT NULL DEFAULT('A') REFERENCES location(location),
    location_label TEXT,
    asset TEXT NOT NULL,
    amount TEXT NOT NULL,
    notes TEXT,
    type TEXT NOT NULL,
    subtype TEXT NOT NULL,
    extra_data TEXT,
    ignored INTEGER NOT NULL DEFAULT 0,
    FOREIGN KEY(asset) REFERENCES assets(identifier) ON UPDATE CASCADE,
    UNIQUE(group_identifier, sequence_index)
);

CREATE TABLE event_metrics (
    id INTEGER NOT NULL PRIMARY KEY,
    event_identifier INTEGER NOT NULL REFERENCES history_events(identifier) ON DELETE CASCADE,
    location_label TEXT,
    protocol TEXT,
    metric_key TEXT NOT NULL,
    metric_value TEXT NOT NULL,
    asset TEXT NOT NULL,
    UNIQUE(event_identifier, location_label, protocol, metric_key, asset)
);
"""
|
|
|
# Index set for the 'single' mode: one single-column index on each
# event_metrics column the benchmark queries join on (event_identifier),
# filter on (metric_key, asset) or group/bucket by (location_label, protocol,
# asset).
# NOTE(review): event_identifier is already the leftmost column of the
# UNIQUE(...) constraint on event_metrics, whose automatic index can serve the
# same lookups, so idx_event_metrics_event may be redundant. It is kept
# as-is because the benchmark compares these exact index sets.
SINGLE_INDEXES = """
CREATE INDEX idx_event_metrics_event ON event_metrics(event_identifier);
CREATE INDEX idx_event_metrics_location_label ON event_metrics(location_label);
CREATE INDEX idx_event_metrics_protocol ON event_metrics(protocol);
CREATE INDEX idx_event_metrics_metric_key ON event_metrics(metric_key);
CREATE INDEX idx_event_metrics_asset ON event_metrics(asset);
"""
|
|
|
# Index set for the 'composite' mode: every index in SINGLE_INDEXES plus two
# multi-column indexes led by metric_key, the column both benchmark queries
# filter on (`metric_key = 'balance'`):
#   * ..._metric_key_bucket also covers GET_BALANCES' grouping columns
#     (location_label, protocol, asset);
#   * ..._metric_key_event also covers the join column (event_identifier).
# Built from SINGLE_INDEXES rather than a copy-pasted duplicate, so the extra
# composite indexes are guaranteed to be the only difference between modes.
COMPOSITE_INDEXES = SINGLE_INDEXES + """
CREATE INDEX idx_event_metrics_metric_key_bucket ON event_metrics(metric_key, location_label, protocol, asset);
CREATE INDEX idx_event_metrics_metric_key_event ON event_metrics(metric_key, event_identifier);
"""
|
|
|
# Latest-balance benchmark query.
# Inner query: over non-ignored 'balance' metrics with
# timestamp <= 1700250000000, group by (location, location_label, protocol,
# asset). Because the SELECT has a single MAX() aggregate, SQLite's
# bare-column rule takes em.asset / em.metric_value from the row with the
# largest `timestamp + sequence_index` in each group.
# Outer query: sum those latest values per asset and keep positive totals.
GET_BALANCES = """
SELECT asset, SUM(metric_value) FROM (
    SELECT em.asset, em.metric_value, MAX(he.timestamp + he.sequence_index)
    FROM event_metrics em JOIN history_events he ON em.event_identifier = he.identifier
    WHERE he.ignored = 0 AND em.metric_key = 'balance' AND he.timestamp <= 1700250000000
    GROUP BY he.location, em.location_label, em.protocol, em.asset
) GROUP BY asset HAVING SUM(metric_value) > 0
"""
|
|
|
# Balance-delta benchmark query for assets asset_0, asset_1 and asset_2.
# The CTE collects every 'balance' metric for those assets (unlike
# GET_BALANCES, ignored events are not filtered out). For each one it builds
# a sort key (timestamp + sequence_index) and a bucket key made by
# concatenating location, location_label, protocol and asset.
# The outer SELECT reports, per bucket in sort_key order, each balance minus
# the previous one; the first row of a bucket yields the balance itself.
# NOTE(review): the outer WHERE runs before the window function, so LAG()
# only sees rows inside the timestamp range. Confirm this matches the query
# being mirrored.
# NOTE(review): the bucket parts are concatenated without separators, so
# distinct combinations could in principle produce the same bucket string.
GET_ASSETS_AMOUNTS = """
WITH all_events AS (
    SELECT he.timestamp, he.timestamp + he.sequence_index as sort_key,
        CAST(em.metric_value AS REAL) as balance,
        he.location || COALESCE(em.location_label, '') || COALESCE(em.protocol, '') || em.asset as bucket
    FROM event_metrics em
    JOIN history_events he ON em.event_identifier = he.identifier
    WHERE em.metric_key = 'balance' AND em.asset IN ('asset_0', 'asset_1', 'asset_2')
)
SELECT timestamp, sort_key,
    balance - COALESCE(LAG(balance) OVER (PARTITION BY bucket ORDER BY sort_key), 0) as delta
FROM all_events WHERE timestamp >= 1700000000000 AND timestamp <= 1700250000000
"""
|
|
|
|
|
def setup():
    """Create base database with data only, no indexes.

    Deletes any existing ``base.db`` in DB_DIR and applies SCHEMA. It then
    inserts the fixed asset list, NUM_ROWS history_events rows and NUM_ROWS
    event_metrics rows, where event_metrics row i references history_events
    row i. The RNG is seeded with 42, so every run produces identical data and
    benchmark runs stay comparable. The indexes under test are added later by
    ``create_indexed_db``.
    """
    DB_DIR.mkdir(parents=True, exist_ok=True)
    base_db = DB_DIR / 'base.db'
    # Remove the database and any rollback journal left by an interrupted run.
    # A leftover journal would otherwise be paired with the freshly created
    # file as if it were a hot journal for it.
    for stale in (base_db, base_db.with_name(base_db.name + '-journal')):
        stale.unlink(missing_ok=True)

    print(f'Creating base DB with {NUM_ROWS:,} rows...')
    # closing() guarantees the connection is closed even if an insert fails.
    # The sqlite3 connection's own context manager only commits/rolls back.
    with closing(sqlite3.connect(base_db)) as conn:
        conn.executescript(SCHEMA)

        random.seed(42)
        base_ts = 1700000000000
        locations = list('ABCDEFGHIJ')
        assets = [f'asset_{i}' for i in range(50)]
        # The 5 None entries give roughly 1 in 5 metrics no protocol.
        protocols = [f'proto_{i}' for i in range(20)] + [None] * 5
        addresses = [f'0x{i:040x}' for i in range(100)]
        event_types = ['trade', 'receive', 'spend', 'deposit', 'withdrawal']
        subtypes = ['none', 'fee', 'reward']
        metric_keys = ['balance', 'pnl']

        conn.executemany('INSERT INTO assets VALUES (?)', [(a,) for a in assets])

        # Rows are produced lazily by generator expressions instead of being
        # materialized as NUM_ROWS-element lists, keeping peak memory flat.
        # executemany() consumes each generator fully and in order before the
        # next statement runs. The random.* calls therefore happen in the same
        # sequence as when the lists were built up front, so the data is
        # unchanged.
        conn.executemany(
            'INSERT INTO history_events VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
            (
                (
                    i,  # identifier
                    1,  # entry_type
                    f'tx_{i // 10}',  # group_identifier
                    i % 10,  # sequence_index (unique within the group)
                    base_ts + i * 1000,  # timestamp
                    random.choice(locations),  # location
                    random.choice(addresses),  # location_label
                    random.choice(assets),  # asset
                    str(random.uniform(0.1, 1000.0)),  # amount
                    f'Note {i}',  # notes
                    random.choice(event_types),  # type
                    random.choice(subtypes),  # subtype
                    None,  # extra_data
                    1 if i % 5 == 0 else 0,  # ignored: every 5th event
                )
                for i in range(1, NUM_ROWS + 1)
            ),
        )

        conn.executemany(
            'INSERT INTO event_metrics VALUES (?,?,?,?,?,?,?)',
            (
                (
                    i,  # id
                    i,  # event_identifier
                    random.choice(addresses),  # location_label
                    random.choice(protocols),  # protocol
                    random.choice(metric_keys),  # metric_key
                    str(random.uniform(-1000, 10000)),  # metric_value
                    random.choice(assets),  # asset
                )
                for i in range(1, NUM_ROWS + 1)
            ),
        )

        conn.commit()

    print(f'Created {base_db} ({base_db.stat().st_size / 1024 / 1024:.1f} MB)')
|
|
|
|
|
def create_indexed_db(mode: str):
    """Copy base DB and add indexes.

    Copies ``DB_DIR/base.db`` to ``DB_DIR/<mode>.db``, applies the index set
    for ``mode``, then runs ``ANALYZE`` so the query planner has statistics
    for the new indexes.

    Args:
        mode: ``'single'`` applies SINGLE_INDEXES; ``'composite'`` applies
            COMPOSITE_INDEXES.

    Raises:
        ValueError: if ``mode`` is neither ``'single'`` nor ``'composite'``.
        SystemExit: with a non-zero status if ``setup`` has not created
            base.db yet.
    """
    # Validate before touching any files. Previously an unknown mode silently
    # got the single-column indexes under a misleading file name, and 'base'
    # crashed inside shutil.copy because source and target were the same file.
    if mode not in ('single', 'composite'):
        raise ValueError(
            f"unknown index mode {mode!r}, expected 'single' or 'composite'"
        )

    base_db = DB_DIR / 'base.db'
    target_db = DB_DIR / f'{mode}.db'

    if not base_db.exists():
        # Exit non-zero (message goes to stderr) so shell chains such as
        # `index composite && query composite` stop here instead of continuing.
        sys.exit('Run setup first!')

    shutil.copy(base_db, target_db)

    indexes = COMPOSITE_INDEXES if mode == 'composite' else SINGLE_INDEXES
    # closing() guarantees the connection is closed even if index creation fails.
    with closing(sqlite3.connect(target_db)) as conn:
        conn.executescript(indexes)
        conn.execute('ANALYZE')
        conn.commit()

    print(f'Created {target_db} with {mode} indexes')
|
|
|
|
|
def query(mode: str, query_name: str = 'balances'):
    """Execute one benchmark query against ``DB_DIR/<mode>.db``; discard rows.

    Nothing is timed or printed here. Presumably the whole process is timed
    externally (TODO confirm how the benchmark is driven).

    Args:
        mode: database name in DB_DIR without the ``.db`` suffix, e.g.
            ``'single'`` or ``'composite'``, or ``'base'`` for the copy
            without the benchmarked indexes.
        query_name: ``'balances'`` runs GET_BALANCES; ``'amounts'`` runs
            GET_ASSETS_AMOUNTS.

    Raises:
        ValueError: if ``query_name`` is not ``'balances'`` or ``'amounts'``.
        SystemExit: with a non-zero status if the database for ``mode`` does
            not exist.
    """
    # Previously any name other than 'balances' silently ran the amounts
    # query, so a typo benchmarked the wrong SQL without any warning.
    if query_name not in ('balances', 'amounts'):
        raise ValueError(
            f"unknown query {query_name!r}, expected 'balances' or 'amounts'"
        )

    db_path = DB_DIR / f'{mode}.db'
    if not db_path.exists():
        # Without this check sqlite3.connect() would create an empty file and
        # the query would then fail with a confusing "no such table" error.
        sys.exit(f'{db_path} not found - run setup/index first!')

    sql = GET_BALANCES if query_name == 'balances' else GET_ASSETS_AMOUNTS
    # closing() guarantees the connection is closed even if the query fails.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute(sql).fetchall()
|
|
|
|
|
def main():
    """Run the sub-command named on the command line.

    ``sys.argv[1]`` selects the command (``setup``, ``index`` or ``query``).
    The optional positional arguments default to mode ``'single'`` and query
    ``'balances'``. With no command, or an unrecognised one, the usage text
    (module docstring) is printed instead.
    """
    args = sys.argv[1:]
    command = args[0] if args else None
    mode = args[1] if len(args) > 1 else 'single'
    query_name = args[2] if len(args) > 2 else 'balances'

    if command == 'setup':
        setup()
    elif command == 'index':
        create_indexed_db(mode)
    elif command == 'query':
        query(mode, query_name)
    else:
        # Covers both "no command given" and "unknown command".
        print(__doc__)
|
|
|
|
|
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == '__main__':
    main()