Created
November 30, 2025 13:53
-
-
Save S4tyendra/aaa9f97debfc071be8cb4d657b93ed19 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pygame | |
| import xml.etree.ElementTree as ET | |
| import math | |
| import random | |
| # Constants | |
| SCREEN_WIDTH = 1000 | |
| SCREEN_HEIGHT = 800 | |
| SCALE = 2.0 | |
| OFFSET_X = 400 | |
| OFFSET_Y = 400 | |
| FPS = 60 | |
| DT = 1.0 / FPS # Simulation time step | |
| # Colors | |
| BLACK = (0, 0, 0) | |
| WHITE = (255, 255, 255) | |
| GRAY = (50, 50, 50) | |
| GREEN = (0, 255, 0) | |
| RED = (255, 0, 0) | |
| YELLOW = (255, 255, 0) | |
| BLUE = (0, 0, 255) | |
| def parse_shape(shape_str): | |
| points = [] | |
| for pair in shape_str.split(' '): | |
| x, y = map(float, pair.split(',')) | |
| points.append((x, y)) | |
| return points | |
| def transform_point(x, y): | |
| # Transform simulation coordinates to screen coordinates | |
| # SUMO: y increases upwards. Pygame: y increases downwards. | |
| # We need to flip y. | |
| sx = x * SCALE + OFFSET_X | |
| sy = -y * SCALE + OFFSET_Y | |
| return int(sx), int(sy) | |
| class Lane: | |
| def __init__(self, id, length, shape, speed): | |
| self.id = id | |
| self.length = float(length) | |
| self.shape = parse_shape(shape) | |
| self.speed_limit = float(speed) | |
| self.incoming_connections = [] # List of (from_lane_id, via_lane_id) | |
| self.outgoing_connections = [] # List of Connection objects | |
| class Connection: | |
| def __init__(self, from_lane, to_lane, via_lane, tl_id, link_index, direction): | |
| self.from_lane = from_lane | |
| self.to_lane = to_lane | |
| self.via_lane = via_lane | |
| self.tl_id = tl_id | |
| self.link_index = int(link_index) if link_index else -1 | |
| self.direction = direction | |
| class TrafficLight: | |
| def __init__(self, id, phases): | |
| self.id = id | |
| self.phases = phases # List of {'duration': int, 'state': str} | |
| self.current_phase_index = 0 | |
| self.timer = 0 | |
| self.state = self.phases[0]['state'] | |
| def update(self, dt): | |
| self.timer += dt | |
| current_duration = float(self.phases[self.current_phase_index]['duration']) | |
| if self.timer >= current_duration: | |
| self.timer = 0 | |
| self.current_phase_index = (self.current_phase_index + 1) % len(self.phases) | |
| self.state = self.phases[self.current_phase_index]['state'] | |
| def get_state(self, link_index): | |
| if 0 <= link_index < len(self.state): | |
| return self.state[link_index] | |
| return 'G' # Default green if index out of bounds | |
| class VehicleType: | |
| def __init__(self, id, length, width, max_speed, accel, decel, color): | |
| self.id = id | |
| self.length = float(length) | |
| self.width = float(width) | |
| self.max_speed = float(max_speed) | |
| self.accel = float(accel) | |
| self.decel = float(decel) | |
| self.color = color | |
| class Vehicle: | |
| def __init__(self, id, vtype, route, network): | |
| self.id = id | |
| self.vtype = vtype | |
| self.route = route # List of edge IDs | |
| self.network = network | |
| self.current_edge_index = 0 | |
| self.current_lane_id = self.get_lane_id_from_edge(self.route[0]) | |
| self.position = 0 # Position on current lane | |
| self.speed = 0 | |
| self.finished = False | |
| self.waiting_time = 0 | |
| def get_lane_id_from_edge(self, edge_id): | |
| # Simplified: just take the first lane of the edge (index 0) | |
| # In a real sim, we'd choose lanes. | |
| # Check if edge exists in network | |
| if edge_id in self.network.edges: | |
| # Return first lane id | |
| return self.network.edges[edge_id][0].id | |
| return None | |
| def update(self, dt, vehicles, traffic_lights): | |
| if self.finished: | |
| return | |
| current_lane = self.network.lanes[self.current_lane_id] | |
| # Calculate target speed | |
| target_speed = min(self.vtype.max_speed, current_lane.speed_limit) | |
| # Check for obstacles (Lead vehicle) | |
| min_dist = float('inf') | |
| lead_vehicle = None | |
| for v in vehicles: | |
| if v.id != self.id and v.current_lane_id == self.current_lane_id: | |
| dist = v.position - self.position - v.vtype.length | |
| if dist > 0 and dist < min_dist: | |
| min_dist = dist | |
| lead_vehicle = v | |
| # Check for traffic lights (if near end of lane) | |
| dist_to_end = current_lane.length - self.position | |
| tl_state = 'G' | |
| # Find connection to next edge | |
| next_lane_id = None | |
| connection = None | |
| if self.current_edge_index < len(self.route) - 1: | |
| next_edge_id = self.route[self.current_edge_index + 1] | |
| # Find connection from current lane to any lane in next edge | |
| for conn in current_lane.outgoing_connections: | |
| to_lane = self.network.lanes[conn.to_lane] | |
| # We need to know which edge 'to_lane' belongs to. | |
| # We can store edge_id in Lane or look it up. | |
| # For now, let's assume we can find the connection that leads to the next edge. | |
| # Actually, internal edges (via) make this tricky. | |
| # Route is list of normal edges. | |
| # Current lane might be a normal lane or internal lane. | |
| # Simplified routing: | |
| # If current lane connects to a via lane, take it. | |
| # If via lane connects to next edge, good. | |
| pass | |
| # Let's use a simpler approach for this custom sim: | |
| # Look at outgoing connections. | |
| # If there is a traffic light on this connection, check it. | |
| # Find the connection that we want to take. | |
| # For simplicity, pick the first valid connection that goes to the next edge in route. | |
| # Or just pick the first connection if we are on an internal lane. | |
| target_connection = None | |
| for conn in current_lane.outgoing_connections: | |
| # Check if this connection leads towards the next route edge | |
| # This requires mapping lane -> edge. | |
| # Let's build that map in Network. | |
| to_lane_edge = self.network.lane_to_edge.get(conn.to_lane) | |
| if to_lane_edge == next_edge_id: | |
| target_connection = conn | |
| break | |
| # If it's an internal edge (via), we might need to look ahead? | |
| # Actually, SUMO routes usually skip internal edges. | |
| # So we go Normal -> Internal -> Normal. | |
| # The connection from Normal to Internal has the TL info. | |
| # If we are on Normal lane, we look for connection to Internal lane that leads to Next Normal. | |
| if to_lane_edge and to_lane_edge.startswith(':'): | |
| # It's internal. Does it lead to next_edge_id? | |
| # We'd need to traverse one more step. | |
| # Let's just check if the connection has a TL. | |
| pass | |
| # Fallback: just take the first connection if we don't have complex routing logic | |
| if not target_connection and current_lane.outgoing_connections: | |
| target_connection = current_lane.outgoing_connections[0] | |
| if target_connection: | |
| if target_connection.tl_id: | |
| tl = traffic_lights.get(target_connection.tl_id) | |
| if tl: | |
| tl_state = tl.get_state(target_connection.link_index) | |
| if tl_state in ['r', 'y'] and dist_to_end < 10 + self.vtype.max_speed: # Stop distance | |
| # Treat red light as an obstacle at the end of the lane | |
| if dist_to_end < min_dist: | |
| min_dist = dist_to_end | |
| # Car following model (simplified IDM-like) | |
| # Desired gap | |
| s_star = 2.0 + self.speed * 1.5 + (self.speed * (self.speed - (lead_vehicle.speed if lead_vehicle else 0))) / (2 * math.sqrt(self.vtype.accel * self.vtype.decel)) | |
| accel = self.vtype.accel * (1 - (self.speed / target_speed)**4 - (s_star / (min_dist if min_dist != float('inf') else 1000))**2) | |
| self.speed += accel * dt | |
| if self.speed < 0: self.speed = 0 | |
| self.position += self.speed * dt | |
| if self.speed < 0.1: | |
| self.waiting_time += dt | |
| # Lane changing / Moving to next lane | |
| if self.position >= current_lane.length: | |
| # Move to next lane | |
| # Find connection | |
| next_conn = None | |
| # Try to follow route | |
| if self.current_edge_index < len(self.route) - 1: | |
| next_edge_target = self.route[self.current_edge_index + 1] | |
| for conn in current_lane.outgoing_connections: | |
| to_lane_id = conn.to_lane | |
| to_edge_id = self.network.lane_to_edge.get(to_lane_id) | |
| if to_edge_id == next_edge_target: | |
| next_conn = conn | |
| self.current_edge_index += 1 | |
| break | |
| elif to_edge_id and to_edge_id.startswith(':'): | |
| # Internal edge. Check if it eventually leads to target? | |
| # For now, just take it and hope. | |
| # Ideally we should know the internal edge connects to the target. | |
| next_conn = conn | |
| # Don't increment edge index yet, we are in junction | |
| break | |
| if not next_conn and current_lane.outgoing_connections: | |
| next_conn = current_lane.outgoing_connections[0] | |
| to_edge_id = self.network.lane_to_edge.get(next_conn.to_lane) | |
| if to_edge_id and not to_edge_id.startswith(':'): | |
| # We moved to a normal edge, try to find it in route to update index | |
| if to_edge_id in self.route: | |
| self.current_edge_index = self.route.index(to_edge_id) | |
| if next_conn: | |
| self.current_lane_id = next_conn.to_lane | |
| self.position -= current_lane.length | |
| else: | |
| self.finished = True | |
| def draw(self, surface): | |
| lane = self.network.lanes[self.current_lane_id] | |
| # Interpolate position on shape | |
| # Shape is list of points. | |
| # Find segment corresponding to position. | |
| total_len = 0 | |
| p1 = lane.shape[0] | |
| p2 = lane.shape[1] | |
| angle = 0 | |
| found_seg = False | |
| for i in range(len(lane.shape) - 1): | |
| sp1 = lane.shape[i] | |
| sp2 = lane.shape[i+1] | |
| seg_len = math.hypot(sp2[0]-sp1[0], sp2[1]-sp1[1]) | |
| if total_len + seg_len >= self.position: | |
| # Vehicle is on this segment | |
| remain = self.position - total_len | |
| ratio = remain / seg_len if seg_len > 0 else 0 | |
| x = sp1[0] + (sp2[0] - sp1[0]) * ratio | |
| y = sp1[1] + (sp2[1] - sp1[1]) * ratio | |
| p1 = (x, y) | |
| angle = math.atan2(sp2[1]-sp1[1], sp2[0]-sp1[0]) | |
| found_seg = True | |
| break | |
| total_len += seg_len | |
| if not found_seg: | |
| # End of lane | |
| p1 = lane.shape[-1] | |
| # Draw vehicle | |
| # Rotate rect | |
| cx, cy = transform_point(p1[0], p1[1]) | |
| # Create a surface for rotation | |
| w = int(self.vtype.length * SCALE) | |
| h = int(self.vtype.width * SCALE) | |
| if w < 1: w = 1 | |
| if h < 1: h = 1 | |
| surf = pygame.Surface((w, h), pygame.SRCALPHA) | |
| # Parse color | |
| c = self.vtype.color.split(',') | |
| if len(c) == 3: | |
| color = (int(float(c[0])*255), int(float(c[1])*255), int(float(c[2])*255)) | |
| else: | |
| color = WHITE | |
| pygame.draw.rect(surf, color, (0, 0, w, h)) | |
| # Rotate | |
| # Pygame rotation is counter-clockwise in degrees. | |
| # math.atan2 returns radians. y is flipped in transform, but angle calculation needs care. | |
| # In simulation coords: angle is standard math angle. | |
| # In screen coords: y is flipped. So angle becomes -angle. | |
| deg = math.degrees(angle) | |
| rotated_surf = pygame.transform.rotate(surf, deg) | |
| rect = rotated_surf.get_rect(center=(cx, cy)) | |
| surface.blit(rotated_surf, rect) | |
| class Network: | |
| def __init__(self): | |
| self.lanes = {} # id -> Lane | |
| self.edges = {} # id -> list of Lanes | |
| self.lane_to_edge = {} # lane_id -> edge_id | |
| self.junctions = [] | |
| self.traffic_lights = {} # id -> TrafficLight | |
| def load_net(self, net_file): | |
| tree = ET.parse(net_file) | |
| root = tree.getroot() | |
| # Parse Edges and Lanes | |
| for edge in root.findall('edge'): | |
| edge_id = edge.get('id') | |
| self.edges[edge_id] = [] | |
| for lane in edge.findall('lane'): | |
| lane_id = lane.get('id') | |
| length = lane.get('length') | |
| shape = lane.get('shape') | |
| speed = lane.get('speed') | |
| new_lane = Lane(lane_id, length, shape, speed) | |
| self.lanes[lane_id] = new_lane | |
| self.edges[edge_id].append(new_lane) | |
| self.lane_to_edge[lane_id] = edge_id | |
| # Parse Connections | |
| for conn in root.findall('connection'): | |
| from_edge = conn.get('from') | |
| to_edge = conn.get('to') | |
| from_lane_idx = int(conn.get('fromLane')) | |
| to_lane_idx = int(conn.get('toLane')) | |
| via_lane_id = conn.get('via') | |
| tl_id = conn.get('tl') | |
| link_index = conn.get('linkIndex') | |
| direction = conn.get('dir') | |
| # Find from_lane ID | |
| if from_edge in self.edges: | |
| # Assuming lanes are ordered by index | |
| # But we stored them in list. Let's verify index. | |
| # Usually lane id is edge_id_index. | |
| from_lane_id = f"{from_edge}_{from_lane_idx}" | |
| to_lane_id = f"{to_edge}_{to_lane_idx}" | |
| if from_lane_id in self.lanes: | |
| # If via exists, the connection is from 'from_lane' to 'via_lane' | |
| # And there is another connection from 'via_lane' to 'to_lane' (implicit or explicit?) | |
| # In SUMO net.xml, 'via' is the internal lane id. | |
| # We should connect from -> via -> to. | |
| target = to_lane_id | |
| if via_lane_id: | |
| target = via_lane_id | |
| # Also add connection from via to to? | |
| # Usually internal lanes have their own connections? | |
| # Let's check internal edges in net.xml. | |
| # Yes, internal edges have connections too? No, usually implied. | |
| # But wait, the 'connection' tag links normal edges. | |
| # The 'via' attribute tells us which internal lane is used. | |
| # So: From -> Via (Connection 1) | |
| # And: Via -> To (Connection 2) | |
| # Let's add connection: From -> Via | |
| c1 = Connection(from_lane_id, via_lane_id, None, tl_id, link_index, direction) | |
| self.lanes[from_lane_id].outgoing_connections.append(c1) | |
| # We also need to link Via -> To. | |
| # The 'via' lane needs to be in self.lanes. | |
| # Internal lanes are defined in <edge function="internal">. | |
| # We parsed them. | |
| if via_lane_id in self.lanes: | |
| c2 = Connection(via_lane_id, to_lane_id, None, None, None, direction) | |
| self.lanes[via_lane_id].outgoing_connections.append(c2) | |
| else: | |
| # Direct connection | |
| c = Connection(from_lane_id, to_lane_id, None, tl_id, link_index, direction) | |
| self.lanes[from_lane_id].outgoing_connections.append(c) | |
| # Parse Traffic Lights | |
| for tl in root.findall('tlLogic'): | |
| tl_id = tl.get('id') | |
| phases = [] | |
| for phase in tl.findall('phase'): | |
| phases.append({ | |
| 'duration': phase.get('duration'), | |
| 'state': phase.get('state') | |
| }) | |
| self.traffic_lights[tl_id] = TrafficLight(tl_id, phases) | |
| def load_routes(self, rou_file): | |
| tree = ET.parse(rou_file) | |
| root = tree.getroot() | |
| vtypes = {} | |
| for vt in root.findall('vType'): | |
| vtypes[vt.get('id')] = VehicleType( | |
| vt.get('id'), | |
| vt.get('length'), | |
| vt.get('width'), | |
| vt.get('maxSpeed'), | |
| vt.get('accel'), | |
| vt.get('decel'), | |
| vt.get('color') | |
| ) | |
| routes = {} | |
| for rt in root.findall('route'): | |
| routes[rt.get('id')] = rt.get('edges').split(' ') | |
| flows = [] | |
| for flow in root.findall('flow'): | |
| flows.append({ | |
| 'id': flow.get('id'), | |
| 'route': flow.get('route'), | |
| 'begin': float(flow.get('begin')), | |
| 'end': float(flow.get('end')), | |
| 'number': int(flow.get('number')), | |
| 'type': flow.get('type') | |
| }) | |
| return vtypes, routes, flows | |
| def main(): | |
| pygame.init() | |
| screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT)) | |
| pygame.display.set_caption("Traffic Simulation - Parsed from SUMO XML") | |
| clock = pygame.time.Clock() | |
| font = pygame.font.SysFont('Arial', 16) | |
| # Load Network | |
| network = Network() | |
| network.load_net("updated.net.xml") | |
| vtypes, routes, flows = network.load_routes("updated.rou.xml") | |
| vehicles = [] | |
| vehicle_id_counter = 0 | |
| # Flow management | |
| flow_state = [] | |
| for f in flows: | |
| # Calculate interval | |
| duration = f['end'] - f['begin'] | |
| if duration > 0 and f['number'] > 0: | |
| interval = duration / f['number'] | |
| else: | |
| interval = 0 | |
| flow_state.append({ | |
| 'next_spawn': f['begin'], | |
| 'interval': interval, | |
| 'count': 0, | |
| 'total': f['number'], | |
| 'def': f | |
| }) | |
| sim_time = 0 | |
| running = True | |
| while running: | |
| dt = clock.tick(FPS) / 1000.0 # Real time delta | |
| # Use fixed time step for simulation stability? | |
| # Let's use real time but capped. | |
| for event in pygame.event.get(): | |
| if event.type == pygame.QUIT: | |
| running = False | |
| # Update Traffic Lights | |
| for tl in network.traffic_lights.values(): | |
| tl.update(dt) | |
| # Spawn Vehicles | |
| for fs in flow_state: | |
| if fs['count'] < fs['total'] and sim_time >= fs['next_spawn']: | |
| # Spawn | |
| f = fs['def'] | |
| v_type = vtypes[f['type']] | |
| route_edges = routes[f['route']] | |
| vid = f"{f['id']}_{fs['count']}" | |
| v = Vehicle(vid, v_type, route_edges, network) | |
| # Check if start lane is free | |
| # Simplified: just check if any vehicle is at start of lane | |
| start_lane_id = v.current_lane_id | |
| free = True | |
| for other in vehicles: | |
| if other.current_lane_id == start_lane_id and other.position < v_type.length + 5: | |
| free = False | |
| break | |
| if free: | |
| vehicles.append(v) | |
| fs['count'] += 1 | |
| fs['next_spawn'] += fs['interval'] | |
| # Update Vehicles | |
| for v in vehicles: | |
| v.update(dt, vehicles, network.traffic_lights) | |
| # Remove finished vehicles | |
| vehicles = [v for v in vehicles if not v.finished] | |
| sim_time += dt | |
| # Draw | |
| screen.fill(BLACK) | |
| # Draw Lanes | |
| for lane_id, lane in network.lanes.items(): | |
| points = lane.shape | |
| if len(points) >= 2: | |
| pygame_points = [transform_point(p[0], p[1]) for p in points] | |
| pygame.draw.lines(screen, GRAY, False, pygame_points, 2) | |
| # Draw stop line if TL | |
| # Check outgoing connections for TL | |
| for conn in lane.outgoing_connections: | |
| if conn.tl_id: | |
| # Draw line at end of lane | |
| p_end = pygame_points[-1] | |
| # Approximate direction | |
| if len(pygame_points) >= 2: | |
| p_prev = pygame_points[-2] | |
| # Draw a small red/green line? | |
| tl = network.traffic_lights.get(conn.tl_id) | |
| state = tl.get_state(conn.link_index) | |
| color = RED if state in ['r', 'y'] else GREEN | |
| if state == 'y': color = YELLOW | |
| pygame.draw.circle(screen, color, p_end, 5) | |
| # Draw Vehicles | |
| for v in vehicles: | |
| v.draw(screen) | |
| # Draw Stats | |
| # Bottom Right | |
| stats_text = [ | |
| f"Time: {sim_time:.1f}s", | |
| f"Vehicles: {len(vehicles)}", | |
| f"FPS: {clock.get_fps():.1f}" | |
| ] | |
| y_offset = SCREEN_HEIGHT - 100 | |
| for line in stats_text: | |
| text_surf = font.render(line, True, WHITE) | |
| rect = text_surf.get_rect(bottomright=(SCREEN_WIDTH - 10, y_offset)) | |
| screen.blit(text_surf, rect) | |
| y_offset += 20 | |
| pygame.display.flip() | |
| pygame.quit() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment