Created
January 25, 2026 02:20
-
-
Save mzhang77/87e4fad6ac28e1daa80a20cede1aef59 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import mysql.connector | |
| import random | |
| import time | |
# ================= Configuration =================
DB_CONFIG = {
    'host': '127.0.0.1',  # replace with your TiDB IP
    'port': 4000,
    'user': 'root',       # replace with your username
    'password': '',       # replace with your password
    'database': 'test'
}
BATCH_SIZE = 1000
TOTAL_ROWS = 200000
HOT_ENTITY_ID = 'entity_heavy_burden'  # this single ID will own all 200k rows
# ===========================================
def get_conn():
    """Open and return a fresh database connection built from DB_CONFIG."""
    params = dict(DB_CONFIG)
    return mysql.connector.connect(**params)
def setup_schema():
    """(Re)create the database and the `edges` table used by the benchmark.

    Drops any existing `edges` table and recreates it with:
      * a plain index on `to_entity_id` (the one "Option 1" may mis-use), and
      * three composite (`to_entity_id`, <branch column>) indexes, one per
        OR branch, so the UNION rewrite ("Option 2") can be fully covered.

    Fix vs. original: the connection and cursor are now closed even if a
    statement raises, instead of leaking on error.
    """
    conn = get_conn()
    try:
        cursor = conn.cursor()
        try:
            print(">>> (Re)Creating Database and Schema...")
            cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DB_CONFIG['database']}")
            cursor.execute(f"USE {DB_CONFIG['database']}")
            cursor.execute("DROP TABLE IF EXISTS edges")
            # Key point: composite indexes prepared for "Option 2" — this index
            # design makes the UNION rewrite of the OR predicates most efficient.
            sql = """
            CREATE TABLE `edges` (
              `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
              `from_event_id` VARCHAR(64) NOT NULL,
              `to_entity_id` VARCHAR(64) NOT NULL,
              `to_entity_type` INT(11) NOT NULL,
              `from_event_type` INT(11) NOT NULL,
              `client_id` VARCHAR(64) NOT NULL,
              `payload` VARCHAR(255) DEFAULT 'padding',
              PRIMARY KEY (`id`),
              -- 1. 基础索引 (Option 1 可能会误用的)
              KEY `idx_to_entity_id` (`to_entity_id`),
              -- 2. Option 2 期望使用的复合索引 (Perfect Covering for each OR branch)
              KEY `idx_merge_type` (`to_entity_id`, `to_entity_type`),
              KEY `idx_merge_event` (`to_entity_id`, `from_event_type`),
              KEY `idx_merge_client` (`to_entity_id`, `client_id`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
            """
            cursor.execute(sql)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
def load_data():
    """Generate TOTAL_ROWS rows for HOT_ENTITY_ID and bulk-insert them.

    Almost every row is noise that satisfies none of the OR predicates; one
    row per 10,000 (about 20 total) satisfies exactly one of the three target
    conditions, simulating very sparse matches that force the optimizer to
    make a real plan choice. Finishes with ANALYZE TABLE — without fresh
    statistics IndexMerge is almost never chosen.

    Fix vs. original: the INSERT statement is hoisted out of the loop. The
    original defined `stmt` only inside the batch-flush branch, so any run
    with TOTAL_ROWS < BATCH_SIZE crashed with NameError at the final flush.
    Connection/cursor are also closed on error now.
    """
    insert_stmt = """
        INSERT INTO edges
        (from_event_id, to_entity_id, to_entity_type, from_event_type, client_id, payload)
        VALUES (%s, %s, %s, %s, %s, %s)
    """
    conn = get_conn()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(f"USE {DB_CONFIG['database']}")
            print(f">>> Generating {TOTAL_ROWS} rows for entity '{HOT_ENTITY_ID}'...")
            print(" Scenario: High fan-out on 'to_entity_id', VERY sparse matches for OR conditions.")
            data_batch = []
            start_time = time.time()
            for i in range(TOTAL_ROWS):
                # Defaults: pure noise that matches no OR branch.
                #   to_entity_type  = 99     (targets are 1, 2, 3)
                #   from_event_type = 99     (targets are 1, 2, 3)
                #   client_id       = 'noise' (target is 'abc')
                t_type = 99
                f_type = 99
                c_id = 'noise'
                # Insert one matching row every 10,000 rows — sparse data that
                # forces the optimizer to choose between index strategies.
                if i % 10000 == 0:
                    case = random.randint(0, 2)
                    if case == 0:
                        t_type = 1
                    elif case == 1:
                        f_type = 1
                    else:
                        c_id = 'abc'
                data_batch.append((
                    f"event_{i}",
                    HOT_ENTITY_ID,
                    t_type,
                    f_type,
                    c_id,
                    "x" * 50,
                ))
                if len(data_batch) >= BATCH_SIZE:
                    cursor.executemany(insert_stmt, data_batch)
                    conn.commit()
                    data_batch = []
            # Flush the final (possibly only) partial batch.
            if data_batch:
                cursor.executemany(insert_stmt, data_batch)
                conn.commit()
            print(f">>> Data loaded in {time.time() - start_time:.2f} s.")
            # ANALYZE is mandatory here; stale stats make IndexMerge unlikely.
            print(">>> Running ANALYZE TABLE...")
            cursor.execute("ANALYZE TABLE edges")
            print(">>> Done.")
        finally:
            cursor.close()
    finally:
        conn.close()
# Script entry point: rebuild the schema first, then load the benchmark data.
if __name__ == "__main__":
    setup_schema()
    load_data()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment