Created
October 30, 2025 04:39
-
-
Save jasonnerothin/26d282d7332684a70103e04ccd58a8c5 to your computer and use it in GitHub Desktop.
Hungarian Algo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| from scipy.optimize import linear_sum_assignment | |
def best_intercepts_hungarian(
    all_intercepts: List[Dict[str, Any]],
    strikers: List[Dict[str, Any]],
    optimization_metric: str = 'intercept_time'
) -> Dict[str, Dict[str, Any]]:
    """
    Use the Hungarian algorithm to find globally optimal striker-target assignments.

    This minimizes the TOTAL sum of the optimization metric across all
    assignments, rather than greedily picking the best for each striker
    sequentially. Each target is given to at most one striker row.

    Args:
        all_intercepts: List of all possible intercept options. Each dict must
            have 'interceptor' and 'target', plus the key required by the
            chosen metric ('intercept_time' or 'travel_distance_km').
            'using_real_telemetry' is optional. If several options exist for
            the same (interceptor, target) pair, the cheapest one is used.
        strikers: List of striker drones; each dict must have 'drone_id'.
            Only intercepts whose 'interceptor' matches one of these IDs are
            considered.
        optimization_metric: Which metric to minimize
            - 'intercept_time': Minimize total time (default)
            - 'travel_distance_km': Minimize total distance
            - 'intercept_time_weighted': Intercept time with a 10% penalty
              for options that are not using real telemetry
            Any other value logs one warning and falls back to 'intercept_time'.

    Returns:
        Dictionary mapping striker IDs to their chosen intercept dict.
        Strikers that could not be given a feasible target are omitted.
        Returns an empty dict when there are no intercepts or no strikers.

    Raises:
        KeyError: If an intercept lacks 'interceptor', 'target' or the key
            required by the chosen metric, or a striker lacks 'drone_id'.

    Example:
        >>> strikers = [{'drone_id': 's1'}, {'drone_id': 's2'}]
        >>> intercepts = [
        ...     {'interceptor': 's1', 'target': 't1', 'intercept_time': 100},
        ...     {'interceptor': 's1', 'target': 't2', 'intercept_time': 150},
        ...     {'interceptor': 's2', 'target': 't1', 'intercept_time': 120},
        ...     {'interceptor': 's2', 'target': 't2', 'intercept_time': 90}
        ... ]
        >>> result = best_intercepts_hungarian(intercepts, strikers)
        >>> result['s1']['target'], result['s2']['target']
        ('t1', 't2')
    """
    if not all_intercepts:
        logger.warning("No intercepts provided for Hungarian algorithm")
        return {}

    # Group by interceptor
    by_interceptor = {}
    for intercept in all_intercepts:
        by_interceptor.setdefault(intercept['interceptor'], []).append(intercept)

    # Get strikers and unique targets (sorted so column order is deterministic)
    striker_ids = [s['drone_id'] for s in strikers]
    target_ids = sorted({intercept['target'] for intercept in all_intercepts})

    if not striker_ids or not target_ids:
        logger.warning("No strikers or targets for Hungarian algorithm")
        return {}

    logger.info(f"Hungarian algorithm: {len(striker_ids)} strikers, {len(target_ids)} targets")

    # Resolve the metric once, so an unknown value is reported a single time
    # instead of once per intercept.
    metric = optimization_metric
    if metric not in ('intercept_time', 'travel_distance_km', 'intercept_time_weighted'):
        logger.warning(f"Unknown metric '{optimization_metric}', using intercept_time")
        metric = 'intercept_time'

    # Build cost matrix. linear_sum_assignment accepts rectangular matrices,
    # so no padding with dummy rows/columns is needed.
    n_strikers = len(striker_ids)
    n_targets = len(target_ids)

    # Initialize with very high cost (impossible assignment). Real costs at or
    # above this value (and NaN) are never recorded, i.e. treated as infeasible.
    IMPOSSIBLE_COST = 1e9
    cost_matrix = np.full((n_strikers, n_targets), IMPOSSIBLE_COST)

    # Column index per target: O(1) lookup instead of list.index() per intercept
    target_column = {target: j for j, target in enumerate(target_ids)}

    # Store intercept data for lookup after assignment
    intercept_lookup = {}

    # Fill cost matrix with actual intercept costs
    for i, striker_id in enumerate(striker_ids):
        for intercept in by_interceptor.get(striker_id, ()):
            target = intercept['target']
            j = target_column[target]

            # Calculate cost based on optimization metric
            if metric == 'travel_distance_km':
                cost = intercept['travel_distance_km']
            else:
                cost = intercept['intercept_time']
                if (metric == 'intercept_time_weighted'
                        and not intercept.get('using_real_telemetry', False)):
                    cost *= 1.1  # 10% penalty for no telemetry

            # Keep only the cheapest option for a given (striker, target) cell
            # so a later, more expensive duplicate cannot overwrite it.
            if cost < cost_matrix[i, j]:
                cost_matrix[i, j] = cost
                intercept_lookup[(striker_id, target)] = intercept

    logger.debug(f"Cost matrix shape: {cost_matrix.shape}")
    logger.debug(f"Cost matrix:\n{cost_matrix}")

    # Run Hungarian algorithm (finds minimum cost assignment)
    row_indices, col_indices = linear_sum_assignment(cost_matrix)

    # Build result dictionary
    result = {}
    total_cost = 0
    for i, j in zip(row_indices, col_indices):
        striker_id = striker_ids[i]
        target = target_ids[j]

        # A cell is a real assignment only if an intercept was recorded for it;
        # otherwise the solver was forced onto an impossible (placeholder) cell.
        intercept = intercept_lookup.get((striker_id, target))
        if intercept is None:
            continue

        result[striker_id] = intercept
        total_cost += cost_matrix[i, j]

        # Logging must not fail when an optional field is absent (e.g. no
        # 'travel_distance_km' while optimizing by time).
        time_s = intercept.get('intercept_time')
        time_text = f"{time_s:.0f}s" if time_s is not None else "n/a"
        distance_km = intercept.get('travel_distance_km')
        distance_text = f"{distance_km:.1f}km" if distance_km is not None else "n/a"
        telemetry_indicator = "✓" if intercept.get('using_real_telemetry') else "⚠"
        logger.info(f"Hungarian assigned {striker_id} → {target}: "
                    f"{time_text}, "
                    f"{distance_text} {telemetry_indicator}")

    logger.info(f"Hungarian total {optimization_metric}: {total_cost:.1f}")
    logger.info(f"Hungarian assigned {len(result)}/{len(striker_ids)} strikers")
    return result
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment