@mzhang77
Created January 25, 2026 02:20
import mysql.connector
import random
import time
# ================= Configuration =================
DB_CONFIG = {
    'host': '127.0.0.1',  # replace with your TiDB host
    'port': 4000,
    'user': 'root',       # replace with your username
    'password': '',       # replace with your password
    'database': 'test'
}
BATCH_SIZE = 1000
TOTAL_ROWS = 200000
HOT_ENTITY_ID = 'entity_heavy_burden'  # this single ID will own all 200,000 rows
# =================================================
def get_conn():
    return mysql.connector.connect(**DB_CONFIG)
def setup_schema():
    conn = get_conn()
    cursor = conn.cursor()
    print(">>> (Re)Creating Database and Schema...")
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DB_CONFIG['database']}")
    cursor.execute(f"USE {DB_CONFIG['database']}")
    cursor.execute("DROP TABLE IF EXISTS edges")
    # Key point: create the composite indexes that "Option 2" depends on.
    # This index design is what makes the UNION rewrite maximally efficient.
    sql = """
    CREATE TABLE `edges` (
        `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
        `from_event_id` VARCHAR(64) NOT NULL,
        `to_entity_id` VARCHAR(64) NOT NULL,
        `to_entity_type` INT(11) NOT NULL,
        `from_event_type` INT(11) NOT NULL,
        `client_id` VARCHAR(64) NOT NULL,
        `payload` VARCHAR(255) DEFAULT 'padding',
        PRIMARY KEY (`id`),
        -- 1. Basic index (the one "Option 1" is likely to misuse)
        KEY `idx_to_entity_id` (`to_entity_id`),
        -- 2. Composite indexes "Option 2" is expected to use (perfect covering for each OR branch)
        KEY `idx_merge_type` (`to_entity_id`, `to_entity_type`),
        KEY `idx_merge_event` (`to_entity_id`, `from_event_type`),
        KEY `idx_merge_client` (`to_entity_id`, `client_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
    """
    cursor.execute(sql)
    conn.commit()
    cursor.close()
    conn.close()
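# --- Illustration (not from the original gist): the two query shapes --------
# A minimal sketch of what "Option 1" and "Option 2" presumably look like
# against this schema. The predicate values (types 1/2/3, client_id = 'abc')
# are assumptions inferred from the data-generator comments below; the gist
# itself does not include the queries.
OPTION_1_OR_QUERY = """
    SELECT * FROM edges
    WHERE to_entity_id = %s
      AND (to_entity_type IN (1, 2, 3)
           OR from_event_type IN (1, 2, 3)
           OR client_id = 'abc')
"""
# "Option 2" rewrites the OR into a UNION so each branch can do a tight range
# scan on one of the composite (to_entity_id, <column>) indexes above.
OPTION_2_UNION_QUERY = """
    SELECT * FROM edges WHERE to_entity_id = %s AND to_entity_type IN (1, 2, 3)
    UNION
    SELECT * FROM edges WHERE to_entity_id = %s AND from_event_type IN (1, 2, 3)
    UNION
    SELECT * FROM edges WHERE to_entity_id = %s AND client_id = 'abc'
"""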
def load_data():
    conn = get_conn()
    cursor = conn.cursor()
    cursor.execute(f"USE {DB_CONFIG['database']}")
    print(f">>> Generating {TOTAL_ROWS} rows for entity '{HOT_ENTITY_ID}'...")
    print("    Scenario: High fan-out on 'to_entity_id', VERY sparse matches for OR conditions.")
    stmt = """
        INSERT INTO edges
        (from_event_id, to_entity_id, to_entity_type, from_event_type, client_id, payload)
        VALUES (%s, %s, %s, %s, %s, %s)
    """
    data_batch = []
    start_time = time.time()
    for i in range(TOTAL_ROWS):
        # By default every row is noise that satisfies none of the OR conditions:
        #   to_entity_type = 99  (targets are 1, 2, 3)
        #   from_event_type = 99 (targets are 1, 2, 3)
        #   client_id = 'noise'  (target is 'abc')
        t_type = 99
        f_type = 99
        c_id = 'noise'
        # A tiny fraction of rows satisfy one condition, simulating sparse data
        # that forces the optimizer into a real choice: one matching row every
        # 10,000 rows, so only 20 matches in total.
        if i % 10000 == 0:
            case = random.randint(0, 2)
            if case == 0:
                t_type = 1
            elif case == 1:
                f_type = 1
            else:
                c_id = 'abc'
        row = (
            f"event_{i}",
            HOT_ENTITY_ID,
            t_type,
            f_type,
            c_id,
            "x" * 50
        )
        data_batch.append(row)
        if len(data_batch) >= BATCH_SIZE:
            cursor.executemany(stmt, data_batch)
            conn.commit()
            data_batch = []
    # Flush the final partial batch, if any.
    if data_batch:
        cursor.executemany(stmt, data_batch)
        conn.commit()
    print(f">>> Data loaded in {time.time() - start_time:.2f} s.")
    # ANALYZE is mandatory: without fresh statistics, IndexMerge is almost never triggered.
    print(">>> Running ANALYZE TABLE...")
    cursor.execute("ANALYZE TABLE edges")
    print(">>> Done.")
    cursor.close()
    conn.close()
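# --- Illustration (not from the original gist): inspect the plans -----------
# A small helper, assuming the sketch queries defined above, that prints
# EXPLAIN output so you can check whether TiDB picks IndexMerge or the
# composite indexes instead of scanning the fat idx_to_entity_id range.
def explain(query, params):
    conn = get_conn()
    cursor = conn.cursor()
    cursor.execute(f"USE {DB_CONFIG['database']}")
    # mysql.connector substitutes %s client-side, so EXPLAIN works with params.
    cursor.execute("EXPLAIN " + query, params)
    for plan_row in cursor.fetchall():
        print(plan_row)
    cursor.close()
    conn.close()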
if __name__ == "__main__":
    setup_schema()
    load_data()
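    # Illustrative only (uses the assumed sketch queries defined above):
    # compare the optimizer's plan for the raw OR form against the UNION rewrite.
    explain(OPTION_1_OR_QUERY, (HOT_ENTITY_ID,))
    explain(OPTION_2_UNION_QUERY, (HOT_ENTITY_ID, HOT_ENTITY_ID, HOT_ENTITY_ID))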