#!/usr/bin/env python3
"""
Script to merge TSV files by type (ranks, teams, users) and remove duplicates.
Processes <base-dir>/<raid-type>/{nxm-*-bagl,as}/<season> directories.
"""
import argparse
import json
import logging
import re
import shutil
from collections import OrderedDict, defaultdict
from pathlib import Path
from typing import List, Optional, Set
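
# Example invocations (the script name merge_tsv.py and the season number
# are illustrative, not taken from the source):
#   python merge_tsv.py --raid-type TotalAssault --season 68
#   python merge_tsv.py --base-dir data/external/raw --raid-type GrandAssault --output-dir merged_tsv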

def find_folders_and_files(base_dir: Path, season: Optional[int] = None) -> dict:
    """Find all folders and their TSV files organized by region and number."""
    folders = {}
    # Look for nxm-*-bagl and as directories
    for target_dir in list(base_dir.glob("nxm-*-bagl")) + list(base_dir.glob("as")):
        if not target_dir.is_dir():
            continue
        region = target_dir.name.lower()
        # Look for numbered subdirectories
        for numbered_dir in target_dir.glob("*"):
            if not numbered_dir.is_dir() or not numbered_dir.name.isdigit():
                continue
            number = numbered_dir.name
            # Filter by season if specified
            if season is not None and int(number) != season:
                continue
            key = (region, number)
            if key not in folders:
                folders[key] = {'ranks': [], 'teams': [], 'users': []}
            # Read info.json once per numbered directory to get the raid end date
            info_file = numbered_dir / "info.json"
            end_date = None
            if info_file.exists():
                try:
                    with open(info_file, 'r') as f:
                        info = json.load(f)
                    end_date = info.get('end', '')[:10]  # Keep the YYYY-MM-DD part
                except (OSError, json.JSONDecodeError) as e:
                    logging.warning(f"Could not read {info_file}: {e}")
            # Find TSV files by type
            for file_type in ['ranks', 'teams', 'users']:
                pattern = f"*{file_type}*.tsv"
                for tsv_file in sorted(numbered_dir.glob(pattern), reverse=True):
                    # Extract the snapshot date from the filename (YYYY-MM-DD)
                    filename_date = None
                    date_match = re.search(r'(\d{4}-\d{2}-\d{2})', tsv_file.name)
                    if date_match:
                        filename_date = date_match.group(1)
                    # Only include files dated on or after the raid end date
                    if end_date and filename_date:
                        if filename_date >= end_date:
                            folders[key][file_type].append(tsv_file)
                    else:
                        # If either date is unknown, include the file
                        folders[key][file_type].append(tsv_file)
    return folders
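
# Illustrative shape of the mapping returned by find_folders_and_files
# (the region and season values here are hypothetical):
#   {("as", "68"): {"ranks": [Path(...)], "teams": [Path(...)], "users": [Path(...)]}}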

def read_tsv_lines(file_path: Path) -> List[str]:
    """Read TSV file and return list of lines (excluding empty lines)."""
    lines = []
    try:
        with open(file_path, 'rb') as f:
            content = f.read()
        # Try to decode with different encodings
        for encoding in ['utf-8', 'utf-16', 'utf-16le', 'utf-16be']:
            try:
                text = content.decode(encoding)
                for line in text.splitlines():
                    if line:  # Only skip truly empty lines
                        lines.append(line)
                break
            except UnicodeDecodeError:
                continue
        else:
            logging.error(f"Could not decode {file_path} with any encoding")
    except Exception as e:
        logging.error(f"Error reading {file_path}: {e}")
    return lines
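
# Column layout inferred from the dedup and sort logic below (an assumption,
# not documented in the source data):
#   ranks: rank \t user_id \t ...
#   users: ...  \t user_id \t ...
#   teams (TotalAssault):  user_id \t team_number \t ...
#   teams (GrandAssault):  user_id \t armor_type \t team_number \t ...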

def merge_and_deduplicate(tsv_files: List[Path], file_type: Optional[str] = None,
                          ranks_order: Optional[List[str]] = None,
                          raid_type: Optional[str] = None) -> List[str]:
    """Merge TSV files and remove duplicate lines."""
    all_lines = []
    seen_lines: Set[str] = set()
    seen_ranks_second_col: Set[str] = set()
    seen_team_keys: Set[str] = set()  # user_id (+ armor_type) + team_number
    seen_users_second_col: Set[str] = set()
    for tsv_file in tsv_files:
        logging.info(f"Processing: {tsv_file}")
        lines = read_tsv_lines(tsv_file)
        for line in lines:
            should_add = True
            if file_type == "ranks":
                # For ranks, check duplication by second column (user_id)
                parts = line.split('\t')
                if len(parts) > 1:
                    user_id = parts[1]
                    if user_id in seen_ranks_second_col:
                        should_add = False
                        logging.debug(f" WARNING: Duplicate user_id {user_id} found in ranks")
                    else:
                        seen_ranks_second_col.add(user_id)
            elif file_type == "teams":
                # For teams, the dedup key depends on the raid type
                parts = line.split('\t')
                if len(parts) > 2:
                    if raid_type == "GrandAssault":
                        # For GrandAssault: user_id + armor_type + team_number
                        key = f"{parts[0]}\t{parts[1]}\t{parts[2]}"
                    else:
                        # For TotalAssault: user_id + team_number
                        key = f"{parts[0]}\t{parts[1]}"
                    if key in seen_team_keys:
                        should_add = False
                        logging.debug(f" WARNING: Duplicate team entry {key} found in teams")
                    else:
                        seen_team_keys.add(key)
            elif file_type == "users":
                # For users, check duplication by second column
                parts = line.split('\t')
                if len(parts) > 1:
                    user_id = parts[1]
                    if user_id in seen_users_second_col:
                        should_add = False
                        logging.debug(f" WARNING: Duplicate user_id {user_id} found in users")
                    else:
                        seen_users_second_col.add(user_id)
                else:
                    logging.error(f"Invalid users line: {line!r}")
            else:
                # For other types, check full line duplication
                if line in seen_lines:
                    should_add = False
                else:
                    seen_lines.add(line)
            if should_add:
                all_lines.append(line)
    # Sort ranks by first column (rank number)
    if file_type == "ranks":
        all_lines.sort(key=lambda line: int(line.split('\t')[0])
                       if line.split('\t')[0].isdigit() else float('inf'))
    elif file_type == "users" and ranks_order:
        # Sort users to match ranks order by user_id
        user_lines = {line.split('\t')[1]: line for line in all_lines if len(line.split('\t')) > 1}
        all_lines = [user_lines[user_id] for user_id in ranks_order if user_id in user_lines]
    elif file_type == "teams" and ranks_order:
        # Sort teams to match ranks order by user_id (first column of teams)
        if raid_type == "GrandAssault":
            # Armor ordering comes from the "playable" keys in info.json
            info_file = tsv_files[0].parent / "info.json"
            with open(info_file, 'r') as f:
                info = json.load(f, object_pairs_hook=OrderedDict)
            armors = list(info["playable"])
        team_lines = defaultdict(list)
        for line in all_lines:
            team_lines[line.split('\t')[0]].append(line)
        # Sort each user's teams
        for user_id in team_lines:
            if raid_type == "GrandAssault":
                team_lines[user_id].sort(key=lambda line: (
                    armors.index(line.split('\t')[1]) if len(line.split('\t')) > 1 and line.split('\t')[1] in armors else float('inf'),
                    int(line.split('\t')[2]) if len(line.split('\t')) > 2 and line.split('\t')[2].isdigit() else float('inf')
                ))
            else:  # TotalAssault
                team_lines[user_id].sort(key=lambda line: int(line.split('\t')[1])
                                         if len(line.split('\t')) > 1 and line.split('\t')[1].isdigit() else float('inf'))
        all_lines = [line for user_id in ranks_order if user_id in team_lines for line in team_lines[user_id]]
    return all_lines
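
# Example call sequence (file names are hypothetical); ranks must be merged
# first so its user_id order can drive the users/teams sorting:
#   ranks = merge_and_deduplicate([Path("2024-01-01_ranks.tsv")], "ranks")
#   order = [line.split('\t')[1] for line in ranks]
#   users = merge_and_deduplicate([Path("2024-01-01_users.tsv")], "users", ranks_order=order)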

def validate_merged_files(ranks_lines: List[str], users_lines: List[str], teams_lines: List[str], raid_type: Optional[str] = None):
    """Validate consistency between merged files."""
    logging.info("\n=== Post-process validation ===")
    # Split each line into columns
    ranks_data = [line.split('\t') for line in ranks_lines]
    users_data = [line.split('\t') for line in users_lines]
    teams_data = [line.split('\t') for line in teams_lines]
    # 1. Check that rank numbers are consecutive
    rank_numbers = [int(parts[0]) for parts in ranks_data if parts[0].isdigit()]
    if rank_numbers:
        expected_ranks = set(range(1, max(rank_numbers) + 1))
        actual_ranks = set(rank_numbers)
        missing_ranks = expected_ranks - actual_ranks
        if missing_ranks:
            logging.warning(f"WARNING: Missing rank numbers in ranks: {sorted(missing_ranks)}")
    # 2. Check user IDs between ranks and users
    ranks_user_ids = {parts[1] for parts in ranks_data if len(parts) > 1}
    users_user_ids = {parts[1] for parts in users_data if len(parts) > 1}
    missing_in_users = ranks_user_ids - users_user_ids
    if missing_in_users:
        logging.warning(f"WARNING: User IDs in ranks but missing in users: {missing_in_users}")
    # 3. Check user IDs between ranks and teams (only up to the rank of the last team's user)
    teams_user_ids = {parts[0] for parts in teams_data if parts}
    if teams_data:
        last_team_user_id = teams_data[-1][0]
        last_rank = next((int(parts[0]) for parts in ranks_data
                          if len(parts) > 1 and parts[1] == last_team_user_id and parts[0].isdigit()), None)
        if last_rank is not None:
            filtered_ranks_user_ids = {parts[1] for parts in ranks_data
                                       if len(parts) > 1 and parts[0].isdigit() and int(parts[0]) <= last_rank}
        else:
            filtered_ranks_user_ids = ranks_user_ids
    else:
        filtered_ranks_user_ids = ranks_user_ids
    missing_in_teams = filtered_ranks_user_ids - teams_user_ids
    if missing_in_teams:
        logging.warning(f"WARNING: User IDs in ranks but missing in teams: {missing_in_teams}")
    # 4. Check that team numbers are consecutive per user
    if raid_type == "GrandAssault":
        # For GrandAssault, check per user + armor combination
        user_armor_teams = defaultdict(list)
        for parts in teams_data:
            if len(parts) > 2 and parts[2].isdigit():
                key = f"{parts[0]}_{parts[1]}"  # user_id + armor_type
                user_armor_teams[key].append(int(parts[2]))
        for user_armor, team_nums in user_armor_teams.items():
            if team_nums:
                expected_teams = set(range(1, max(team_nums) + 1))
                actual_teams = set(team_nums)
                missing_teams = expected_teams - actual_teams
                if missing_teams:
                    logging.warning(f"WARNING: Missing team numbers for {user_armor}: {sorted(missing_teams)}")
    else:
        # For TotalAssault, check per user
        user_teams = defaultdict(list)
        for parts in teams_data:
            if len(parts) > 1 and parts[1].isdigit():
                user_teams[parts[0]].append(int(parts[1]))
        for user_id, team_nums in user_teams.items():
            if team_nums:
                expected_teams = set(range(1, max(team_nums) + 1))
                actual_teams = set(team_nums)
                missing_teams = expected_teams - actual_teams
                if missing_teams:
                    logging.warning(f"WARNING: Missing team numbers for user {user_id}: {sorted(missing_teams)}")

def write_merged_file(output_path: Path, lines: List[str]):
    """Write merged lines to output file."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        for line in lines:
            f.write(line + '\n')
    logging.info(f"Wrote {len(lines)} unique lines to {output_path}")

def main():
    # Setup logging
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    parser = argparse.ArgumentParser(description="Merge TSV files by type and remove duplicates")
    parser.add_argument("--base-dir", default="data/external/raw",
                        help="Base directory containing the raid-type folders")
    parser.add_argument("--output-dir", default="merged_tsv",
                        help="Output directory for merged files")
    parser.add_argument("--raid-type", choices=["TotalAssault", "GrandAssault"], required=True,
                        help="Raid type")
    parser.add_argument("--season", type=int,
                        help="Season number")
    args = parser.parse_args()
    base_dir = Path(args.base_dir) / args.raid_type
    output_dir = Path(args.output_dir)
    if not base_dir.exists():
        logging.error(f"Base directory {base_dir} does not exist")
        return
    # Find all folders and their files
    folders = find_folders_and_files(base_dir, args.season)
    if not folders:
        logging.error("No folders found")
        return
    # Process each folder separately
    for (region, number), file_types in folders.items():
        logging.info(f"\n=== Processing {region}/{number} ===")
        # Create output directory for this folder
        folder_output_dir = output_dir / region / number
        # Process each file type in a specific order (ranks first, so its
        # user_id order can drive the sorting of users and teams)
        ranks_order = None
        merged_files = {}
        for file_type in ["ranks", "users", "teams"]:
            tsv_files = file_types[file_type]
            if not tsv_files:
                logging.warning(f"No {file_type} files found in {region}/{number}")
                continue
            logging.info(f"Found {len(tsv_files)} {file_type} files")
            # Merge and deduplicate
            merged_lines = merge_and_deduplicate(tsv_files, file_type, ranks_order, args.raid_type)
            merged_files[file_type] = merged_lines
            # Store ranks order for users/teams sorting
            if file_type == "ranks":
                ranks_order = [line.split('\t')[1] for line in merged_lines if len(line.split('\t')) > 1]
            # Write output file
            output_file = folder_output_dir / f"merged_{file_type}.tsv"
            write_merged_file(output_file, merged_lines)
        # Copy info.json if it exists
        if file_types["ranks"]:
            info_file = file_types["ranks"][0].parent / "info.json"
            if info_file.exists():
                shutil.copy2(info_file, folder_output_dir / "info.json")
        # Validate per folder (not mixing regions)
        if all(ft in merged_files for ft in ["ranks", "users", "teams"]):
            logging.info(f"\n=== Validating {region}/{number} ===")
            validate_merged_files(merged_files["ranks"], merged_files["users"], merged_files["teams"], args.raid_type)

if __name__ == "__main__":
    main()