Skip to content

Instantly share code, notes, and snippets.

@jasonnerothin
Created October 30, 2025 04:39
Show Gist options
  • Select an option

  • Save jasonnerothin/26d282d7332684a70103e04ccd58a8c5 to your computer and use it in GitHub Desktop.

Select an option

Save jasonnerothin/26d282d7332684a70103e04ccd58a8c5 to your computer and use it in GitHub Desktop.
Hungarian Algo
import logging
from typing import Any, Dict, List

import numpy as np
from scipy.optimize import linear_sum_assignment
# Module-level logger used by the function below.
logger = logging.getLogger(__name__)


def best_intercepts_hungarian(
    all_intercepts: List[Dict[str, Any]],
    strikers: List[Dict[str, Any]],
    optimization_metric: str = 'intercept_time'
) -> Dict[str, Dict[str, Any]]:
    """
    Use the Hungarian algorithm to find globally optimal striker-target assignments.

    This minimizes the TOTAL sum of the optimization metric across all
    assignments, rather than greedily picking the best for each striker
    sequentially. Each striker receives at most one target and each target at
    most one striker.

    Args:
        all_intercepts: List of all possible intercept options. Every dict must
            have 'interceptor' and 'target', plus the field read by the chosen
            metric ('intercept_time' or 'travel_distance_km'). The optional
            'using_real_telemetry' flag is read by the weighted metric and by
            the log output. Options whose 'interceptor' is not listed in
            ``strikers`` are ignored. If the same (interceptor, target) pair
            appears more than once, the cheapest option is kept.
        strikers: List of striker drones; each dict must have 'drone_id'.
            Repeated IDs are collapsed so a striker is considered only once.
        optimization_metric: Which metric to minimize
            - 'intercept_time': Minimize total time (default)
            - 'travel_distance_km': Minimize total distance
            - 'intercept_time_weighted': Intercept time with a 10% penalty on
              options that do not set 'using_real_telemetry'
            Any other value logs one warning and falls back to
            'intercept_time'.

    Returns:
        Dictionary mapping striker IDs to their assigned intercept dict (the
        same objects passed in). Strikers that could not be given a feasible
        target are absent. Empty when there are no intercepts or no strikers.

    Raises:
        KeyError: If a striker lacks 'drone_id', or an intercept lacks
            'interceptor', 'target', or the field required by the metric.

    Example:
        >>> intercepts = [
        ...     {'interceptor': 's1', 'target': 't1', 'intercept_time': 100},
        ...     {'interceptor': 's1', 'target': 't2', 'intercept_time': 150},
        ...     {'interceptor': 's2', 'target': 't1', 'intercept_time': 120},
        ...     {'interceptor': 's2', 'target': 't2', 'intercept_time': 90}
        ... ]
        >>> strikers = [{'drone_id': 's1'}, {'drone_id': 's2'}]
        >>> result = best_intercepts_hungarian(intercepts, strikers)
        >>> {sid: hit['target'] for sid, hit in result.items()}
        {'s1': 't1', 's2': 't2'}
    """
    if not all_intercepts:
        logger.warning("No intercepts provided for Hungarian algorithm")
        return {}

    # dict.fromkeys de-duplicates while keeping first-seen order, so a striker
    # listed twice cannot occupy two rows (and therefore two targets).
    striker_ids = list(dict.fromkeys(s['drone_id'] for s in strikers))
    target_ids = sorted({intercept['target'] for intercept in all_intercepts})

    if not striker_ids or not target_ids:
        logger.warning("No strikers or targets for Hungarian algorithm")
        return {}

    logger.info(f"Hungarian algorithm: {len(striker_ids)} strikers, {len(target_ids)} targets")

    # Resolve the metric once, so an unknown metric warns a single time
    # instead of once per intercept.
    known_metrics = ('intercept_time', 'travel_distance_km', 'intercept_time_weighted')
    if optimization_metric not in known_metrics:
        logger.warning(f"Unknown metric '{optimization_metric}', using intercept_time")
    if optimization_metric == 'travel_distance_km':
        cost_field = 'travel_distance_km'
    else:
        cost_field = 'intercept_time'
    penalize_estimates = optimization_metric == 'intercept_time_weighted'

    # O(1) ID -> matrix index lookups (avoids list.index() per intercept).
    row_of = {striker_id: i for i, striker_id in enumerate(striker_ids)}
    col_of = {target_id: j for j, target_id in enumerate(target_ids)}

    # Cells without an intercept option keep a very high cost so the solver
    # avoids them whenever a feasible alternative exists. The matrix is left
    # rectangular: linear_sum_assignment handles that without dummy padding.
    IMPOSSIBLE_COST = 1e9
    n_strikers = len(striker_ids)
    n_targets = len(target_ids)
    cost_matrix = np.full((n_strikers, n_targets), IMPOSSIBLE_COST)

    # (row, col) -> intercept dict whose cost is stored in that cell.
    intercept_lookup = {}

    for intercept in all_intercepts:
        i = row_of.get(intercept['interceptor'])
        if i is None:
            # Option belongs to a drone that is not in `strikers`.
            continue
        j = col_of[intercept['target']]

        cost = intercept[cost_field]
        if penalize_estimates and not intercept.get('using_real_telemetry', False):
            cost *= 1.1  # 10% penalty for no telemetry

        # Keep only the cheapest option for a repeated (striker, target) pair.
        if (i, j) not in intercept_lookup or cost < cost_matrix[i, j]:
            cost_matrix[i, j] = cost
            intercept_lookup[(i, j)] = intercept

    # Lazy %-formatting: the matrix is only rendered if DEBUG is enabled.
    logger.debug("Cost matrix shape: %s", cost_matrix.shape)
    logger.debug("Cost matrix:\n%s", cost_matrix)

    # Run Hungarian algorithm (finds minimum cost assignment)
    row_indices, col_indices = linear_sum_assignment(cost_matrix)

    result = {}
    total_cost = 0.0
    for i, j in zip(row_indices, col_indices):
        i, j = int(i), int(j)
        intercept = intercept_lookup.get((i, j))
        # Skip cells the solver was forced into that have no real option, and
        # (as before) options priced at or above half the sentinel, which the
        # solver cannot tell apart from impossible ones.
        if intercept is None or not cost_matrix[i, j] < IMPOSSIBLE_COST / 2:
            continue

        striker_id = striker_ids[i]
        result[striker_id] = intercept
        total_cost += float(cost_matrix[i, j])

        # Only the field used by the metric is guaranteed to exist, so the
        # other one is reported as "n/a" instead of raising KeyError.
        time_s = intercept.get('intercept_time')
        dist_km = intercept.get('travel_distance_km')
        time_text = f"{time_s:.0f}s" if time_s is not None else "n/a"
        dist_text = f"{dist_km:.1f}km" if dist_km is not None else "n/a"
        telemetry_indicator = "✓" if intercept.get('using_real_telemetry') else "⚠"
        logger.info(f"Hungarian assigned {striker_id} → {target_ids[j]}: "
                    f"{time_text}, "
                    f"{dist_text} {telemetry_indicator}")

    logger.info(f"Hungarian total {optimization_metric}: {total_cost:.1f}")
    logger.info(f"Hungarian assigned {len(result)}/{len(striker_ids)} strikers")
    return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment