@Thelta
Last active September 14, 2025 17:26
#!/usr/bin/env python3
"""
Script to merge TSV files by type (ranks, teams, users) and remove duplicates.
Processes data/external/raw/midokuni-*/{number} directories.
"""
import argparse
import json
import logging
import re
import shutil
from collections import OrderedDict, defaultdict
from pathlib import Path
from typing import List, Optional, Set


def find_folders_and_files(base_dir: Path, season: Optional[int] = None) -> dict:
    """Find all folders and their TSV files organized by region and number."""
    folders = {}
    # Look for nxm-*-bagl and as directories
    for target_dir in list(base_dir.glob("nxm-*-bagl")) + list(base_dir.glob("as")):
        if not target_dir.is_dir():
            continue
        region = target_dir.name.lower()
        # Look for numbered subdirectories
        for numbered_dir in target_dir.glob("*"):
            if not numbered_dir.is_dir() or not numbered_dir.name.isdigit():
                continue
            number = numbered_dir.name
            # Filter by season if specified
            if season is not None and int(number) != season:
                continue
            key = (region, number)
            if key not in folders:
                folders[key] = {'ranks': [], 'teams': [], 'users': []}
            # Read info.json once to get the raid end date
            info_file = numbered_dir / "info.json"
            end_date = None
            if info_file.exists():
                try:
                    with open(info_file, 'r') as f:
                        info = json.load(f)
                    end_date = info.get('end', '')[:10]  # Keep the YYYY-MM-DD part
                except (OSError, json.JSONDecodeError) as e:
                    logging.warning(f"Could not read {info_file}: {e}")
            # Find TSV files by type
            for file_type in ['ranks', 'teams', 'users']:
                pattern = f"*{file_type}*.tsv"
                for tsv_file in sorted(numbered_dir.glob(pattern), reverse=True):
                    # Extract the date from the filename (YYYY-MM-DD format)
                    filename_date = None
                    date_match = re.search(r'(\d{4}-\d{2}-\d{2})', tsv_file.name)
                    if date_match:
                        filename_date = date_match.group(1)
                    # Only include files dated on or after the raid end date
                    if end_date and filename_date:
                        if filename_date >= end_date:
                            folders[key][file_type].append(tsv_file)
                    else:
                        # If either date is unknown, include the file
                        folders[key][file_type].append(tsv_file)
    return folders
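
# Shape of the mapping returned by find_folders_and_files, for reference
# (the region, number and file names below are invented examples):
#   {("as", "68"): {"ranks": [Path(".../as/68/2025-01-01-ranks.tsv"), ...],
#                   "teams": [...],
#                   "users": [...]}}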


def read_tsv_lines(file_path: Path) -> List[str]:
    """Read a TSV file and return its lines (excluding empty lines)."""
    lines = []
    try:
        with open(file_path, 'rb') as f:
            content = f.read()
        # Try to decode with different encodings; stop at the first that works
        for encoding in ['utf-8', 'utf-16', 'utf-16le', 'utf-16be']:
            try:
                text = content.decode(encoding)
                for line in text.splitlines():
                    if line:  # Only skip truly empty lines
                        lines.append(line)
                break
            except UnicodeDecodeError:
                continue
        else:
            logging.error(f"Could not decode {file_path} with any encoding")
    except Exception as e:
        logging.error(f"Error reading {file_path}: {e}")
    return lines


def merge_and_deduplicate(tsv_files: List[Path], file_type: Optional[str] = None,
                          ranks_order: Optional[List[str]] = None,
                          raid_type: Optional[str] = None) -> List[str]:
    """Merge TSV files and remove duplicate lines."""
    all_lines = []
    seen_lines: Set[str] = set()
    seen_ranks_second_col: Set[str] = set()
    seen_teams_first_two: Set[str] = set()
    seen_users_second_col: Set[str] = set()
    for tsv_file in tsv_files:
        logging.info(f"Processing: {tsv_file}")
        lines = read_tsv_lines(tsv_file)
        for line in lines:
            should_add = True
            if file_type == "ranks":
                # For ranks, detect duplicates by the second column (user_id)
                parts = line.split('\t')
                if len(parts) > 1:
                    user_id = parts[1]
                    if user_id in seen_ranks_second_col:
                        should_add = False
                        logging.debug(f"  WARNING: Duplicate user_id {user_id} found in ranks")
                    else:
                        seen_ranks_second_col.add(user_id)
            elif file_type == "teams":
                # For teams, the duplicate key depends on the raid type
                parts = line.split('\t')
                if len(parts) > 2:
                    if raid_type == "GrandAssault":
                        # For GrandAssault: user_id + armor_type + team_number
                        key = f"{parts[0]}\t{parts[1]}\t{parts[2]}"
                    else:
                        # For TotalAssault: user_id + team_number
                        key = f"{parts[0]}\t{parts[1]}"
                    if key in seen_teams_first_two:
                        should_add = False
                        logging.debug(f"  WARNING: Duplicate team entry {key} found in teams")
                    else:
                        seen_teams_first_two.add(key)
            elif file_type == "users":
                # For users, detect duplicates by the second column (user_id)
                parts = line.split('\t')
                if len(parts) > 1:
                    user_id = parts[1]
                    if user_id in seen_users_second_col:
                        should_add = False
                        logging.debug(f"  WARNING: Duplicate user_id {user_id} found in users")
                    else:
                        seen_users_second_col.add(user_id)
                else:
                    logging.error(f"Invalid users line: {line!r}")
            else:
                # For other types, check full-line duplication
                if line in seen_lines:
                    should_add = False
                else:
                    seen_lines.add(line)
            if should_add:
                all_lines.append(line)
    # Sort ranks by the first column (rank number)
    if file_type == "ranks":
        all_lines.sort(key=lambda line: int(line.split('\t')[0]) if line.split('\t')[0].isdigit() else float('inf'))
    elif file_type == "users" and ranks_order:
        # Reorder users to match the ranks order by user_id
        user_lines = {line.split('\t')[1]: line for line in all_lines if len(line.split('\t')) > 1}
        all_lines = [user_lines[user_id] for user_id in ranks_order if user_id in user_lines]
    elif file_type == "teams" and ranks_order:
        # Reorder teams to match the ranks order by user_id (first column of teams)
        armors = []
        if raid_type == "GrandAssault":
            # Armor ordering comes from the "playable" list in info.json
            info_file = tsv_files[0].parent / "info.json"
            with open(info_file, 'r') as f:
                info = json.load(f, object_pairs_hook=OrderedDict)
            armors = list(info["playable"])
        team_lines = defaultdict(list)
        for line in all_lines:
            team_lines[line.split('\t')[0]].append(line)
        # Sort each user's teams
        for user_id in team_lines:
            if raid_type == "GrandAssault":
                team_lines[user_id].sort(key=lambda line: (
                    armors.index(line.split('\t')[1]) if len(line.split('\t')) > 1 and line.split('\t')[1] in armors else float('inf'),
                    int(line.split('\t')[2]) if len(line.split('\t')) > 2 and line.split('\t')[2].isdigit() else float('inf')
                ))
            else:  # TotalAssault
                team_lines[user_id].sort(key=lambda line: int(line.split('\t')[1]) if len(line.split('\t')) > 1 and line.split('\t')[1].isdigit() else float('inf'))
        all_lines = [line for user_id in ranks_order if user_id in team_lines for line in team_lines[user_id]]
    return all_lines
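
# How team lines are deduplicated, for reference (column values are invented):
#   TotalAssault line  "12345\t2\t..."             -> key "12345\t2"
#   GrandAssault line  "12345\tLightArmor\t2\t..." -> key "12345\tLightArmor\t2"
# Ranks and users lines are keyed on their second column (user_id) instead.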


def validate_merged_files(ranks_lines: List[str], users_lines: List[str],
                          teams_lines: List[str], raid_type: Optional[str] = None):
    """Validate consistency between the merged files."""
    logging.info("\n=== Post-process validation ===")
    # Split lines into columns
    ranks_data = [line.split('\t') for line in ranks_lines]
    users_data = [line.split('\t') for line in users_lines]
    teams_data = [line.split('\t') for line in teams_lines]
    # 1. Check that rank numbers are consecutive
    rank_numbers = [int(parts[0]) for parts in ranks_data if parts[0].isdigit()]
    if rank_numbers:
        expected_ranks = set(range(1, max(rank_numbers) + 1))
        actual_ranks = set(rank_numbers)
        missing_ranks = expected_ranks - actual_ranks
        if missing_ranks:
            logging.warning(f"WARNING: Missing rank numbers in ranks: {sorted(missing_ranks)}")
    # 2. Check user IDs between ranks and users
    ranks_user_ids = {parts[1] for parts in ranks_data if len(parts) > 1}
    users_user_ids = {parts[1] for parts in users_data if len(parts) > 1}
    missing_in_users = ranks_user_ids - users_user_ids
    if missing_in_users:
        logging.warning(f"WARNING: User IDs in ranks but missing in users: {missing_in_users}")
    # 3. Check user IDs between ranks and teams (only up to the rank of the last user that has teams)
    teams_user_ids = {parts[0] for parts in teams_data if len(parts) > 0}
    if teams_data:
        last_team_user_id = teams_data[-1][0]
        last_rank = next((int(parts[0]) for parts in ranks_data
                          if len(parts) > 1 and parts[1] == last_team_user_id and parts[0].isdigit()), None)
        if last_rank is not None:
            filtered_ranks_user_ids = {parts[1] for parts in ranks_data
                                       if len(parts) > 1 and parts[0].isdigit() and int(parts[0]) <= last_rank}
        else:
            filtered_ranks_user_ids = ranks_user_ids
    else:
        filtered_ranks_user_ids = ranks_user_ids
    missing_in_teams = filtered_ranks_user_ids - teams_user_ids
    if missing_in_teams:
        logging.warning(f"WARNING: User IDs in ranks but missing in teams: {missing_in_teams}")
    # 4. Check consecutive team numbers per user
    if raid_type == "GrandAssault":
        # For GrandAssault, check per user + armor combination
        user_armor_teams = defaultdict(list)
        for parts in teams_data:
            if len(parts) > 2 and parts[2].isdigit():
                key = f"{parts[0]}_{parts[1]}"  # user_id + armor_type
                user_armor_teams[key].append(int(parts[2]))
        for user_armor, team_nums in user_armor_teams.items():
            if team_nums:
                expected_teams = set(range(1, max(team_nums) + 1))
                actual_teams = set(team_nums)
                missing_teams = expected_teams - actual_teams
                if missing_teams:
                    logging.warning(f"WARNING: Missing team numbers for {user_armor}: {sorted(missing_teams)}")
    else:
        # For TotalAssault, check per user
        user_teams = defaultdict(list)
        for parts in teams_data:
            if len(parts) > 1 and parts[1].isdigit():
                user_teams[parts[0]].append(int(parts[1]))
        for user_id, team_nums in user_teams.items():
            if team_nums:
                expected_teams = set(range(1, max(team_nums) + 1))
                actual_teams = set(team_nums)
                missing_teams = expected_teams - actual_teams
                if missing_teams:
                    logging.warning(f"WARNING: Missing team numbers for user {user_id}: {sorted(missing_teams)}")


def write_merged_file(output_path: Path, lines: List[str]):
    """Write merged lines to the output file."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        for line in lines:
            f.write(line + '\n')
    logging.info(f"Written {len(lines)} unique lines to {output_path}")


def main():
    # Set up logging
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    parser = argparse.ArgumentParser(description="Merge TSV files by type and remove duplicates")
    parser.add_argument("--base-dir", default="data/external/raw",
                        help="Base directory containing midokuni-* folders")
    parser.add_argument("--output-dir", default="merged_tsv",
                        help="Output directory for merged files")
    parser.add_argument("--raid-type", choices=["TotalAssault", "GrandAssault"], required=True,
                        help="Raid type (joined onto --base-dir below, so it must be given)")
    parser.add_argument("--season", type=int,
                        help="Season number")
    args = parser.parse_args()
    base_dir = Path(args.base_dir) / args.raid_type
    output_dir = Path(args.output_dir)
    if not base_dir.exists():
        logging.error(f"Base directory {base_dir} does not exist")
        return
    # Find all folders and their files
    folders = find_folders_and_files(base_dir, args.season)
    if not folders:
        logging.error("No folders found")
        return
    # Process each folder separately
    for (region, number), file_types in folders.items():
        logging.info(f"\n=== Processing {region}/{number} ===")
        # Output directory for this folder
        folder_output_dir = output_dir / region / number
        # Process file types in a fixed order: ranks first, so users and teams can be reordered to match
        ranks_order = None
        merged_files = {}
        for file_type in ["ranks", "users", "teams"]:
            tsv_files = file_types[file_type]
            if not tsv_files:
                logging.warning(f"No {file_type} files found in {region}/{number}")
                continue
            logging.info(f"Found {len(tsv_files)} {file_type} files")
            # Merge and deduplicate
            merged_lines = merge_and_deduplicate(tsv_files, file_type, ranks_order, args.raid_type)
            merged_files[file_type] = merged_lines
            # Store the ranks order for users/teams sorting
            if file_type == "ranks":
                ranks_order = [line.split('\t')[1] for line in merged_lines if len(line.split('\t')) > 1]
            # Write the output file
            output_file = folder_output_dir / f"merged_{file_type}.tsv"
            write_merged_file(output_file, merged_lines)
        # Copy info.json if it exists
        if file_types["ranks"]:
            info_file = file_types["ranks"][0].parent / "info.json"
            if info_file.exists():
                shutil.copy2(info_file, folder_output_dir / "info.json")
        # Validate per folder (not mixing regions)
        if all(ft in merged_files for ft in ["ranks", "users", "teams"]):
            logging.info(f"\n=== Validating {region}/{number} ===")
            validate_merged_files(merged_files["ranks"], merged_files["users"], merged_files["teams"], args.raid_type)


if __name__ == "__main__":
    main()
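
# Resulting layout under --output-dir (merged_tsv by default):
#   merged_tsv/<region>/<number>/merged_ranks.tsv
#   merged_tsv/<region>/<number>/merged_users.tsv
#   merged_tsv/<region>/<number>/merged_teams.tsv
#   merged_tsv/<region>/<number>/info.json   (copied when present)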