Skip to content

Instantly share code, notes, and snippets.

@S4tyendra
Created November 30, 2025 13:53
Show Gist options
  • Select an option

  • Save S4tyendra/aaa9f97debfc071be8cb4d657b93ed19 to your computer and use it in GitHub Desktop.

Select an option

Save S4tyendra/aaa9f97debfc071be8cb4d657b93ed19 to your computer and use it in GitHub Desktop.
import pygame
import xml.etree.ElementTree as ET
import math
import random
# Constants
# Window size in pixels (passed to pygame.display.set_mode in main()).
SCREEN_WIDTH = 1000
SCREEN_HEIGHT = 800
# World-to-screen mapping used by transform_point(): SCALE is the number of
# pixels per simulation unit, OFFSET_X / OFFSET_Y is the pixel position of the
# simulation origin (0, 0).
SCALE = 2.0
OFFSET_X = 400
OFFSET_Y = 400
# Frame-rate cap handed to the pygame clock in main().
FPS = 60
# NOTE(review): main() steps the simulation with the measured frame time; DT
# does not appear to be referenced by the code visible in this file.
DT = 1.0 / FPS # Simulation time step
# Colors
# RGB tuples with 0-255 channels, as accepted by pygame drawing calls.
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
GRAY = (50, 50, 50)
GREEN = (0, 255, 0)
RED = (255, 0, 0)
YELLOW = (255, 255, 0)
BLUE = (0, 0, 255)
def parse_shape(shape_str):
    """Parse a SUMO ``shape`` attribute into a list of ``(x, y)`` float tuples.

    The attribute is a whitespace-separated list of comma-separated
    coordinates, e.g. ``"0.0,1.5 10.0,1.5"``.  Points may carry a third
    (elevation) component, which is ignored because the viewer is 2D.

    Args:
        shape_str: The raw attribute value; ``None`` or an empty/blank string
            yields an empty list.

    Returns:
        A list of ``(x, y)`` tuples in file order.

    Raises:
        ValueError: If a point has fewer than two components or a component
            is not a number.
    """
    if not shape_str:
        return []
    points = []
    # split() without an argument tolerates repeated, leading and trailing
    # whitespace, which split(' ') would turn into empty tokens.
    for token in shape_str.split():
        coords = token.split(',')
        if len(coords) < 2:
            raise ValueError(f"malformed shape point: {token!r}")
        points.append((float(coords[0]), float(coords[1])))
    return points
def transform_point(x, y):
    """Convert a simulation coordinate into an integer pixel coordinate.

    The point is scaled by ``SCALE`` and shifted by ``OFFSET_X`` /
    ``OFFSET_Y``.  The y axis is mirrored because simulation y grows upwards
    while pygame's y grows downwards.

    Returns:
        A ``(screen_x, screen_y)`` tuple of ints (truncated, not rounded).
    """
    screen_x = OFFSET_X + SCALE * x
    screen_y = OFFSET_Y - SCALE * y  # minus sign performs the vertical flip
    return int(screen_x), int(screen_y)
class Lane:
    """One lane of the road network, built from the raw XML attribute strings.

    Attributes:
        id: Lane identifier as given by the caller.
        length: Lane length as a float.
        shape: Centre-line polyline, a list of ``(x, y)`` tuples produced by
            ``parse_shape``.
        speed_limit: Maximum allowed speed on the lane as a float.
        incoming_connections: Starts empty; not filled by this class.
        outgoing_connections: Starts empty; holds ``Connection`` objects that
            leave this lane once the network loader has populated it.
    """

    def __init__(self, id, length, shape, speed):
        # Connection lists are filled in later by whoever builds the network.
        self.incoming_connections = []
        self.outgoing_connections = []

        self.id = id
        lane_length = float(length)
        polyline = parse_shape(shape)
        limit = float(speed)
        self.length = lane_length
        self.shape = polyline
        self.speed_limit = limit
class Connection:
    """A directed link from one lane to the lane that follows it.

    Args:
        from_lane: Id of the lane the connection leaves.
        to_lane: Id of the lane the connection enters.
        via_lane: Optional id of an intermediate lane (stored as given).
        tl_id: Id of the controlling traffic light, or ``None``.
        link_index: Index of this link in the traffic-light state string.
            May be an int, a numeric string, or ``None``/``""`` when the
            connection is not signal controlled.
        direction: Direction code of the turn (stored as given).

    Attributes:
        link_index: Always an int; ``-1`` means "no index".
    """

    def __init__(self, from_lane, to_lane, via_lane, tl_id, link_index, direction):
        self.from_lane = from_lane
        self.to_lane = to_lane
        self.via_lane = via_lane
        self.tl_id = tl_id
        # Test for "missing" explicitly: a plain truthiness check would turn
        # the perfectly valid integer index 0 into -1.
        if link_index is None or link_index == '':
            self.link_index = -1
        else:
            self.link_index = int(link_index)
        self.direction = direction
class TrafficLight:
    """A fixed-cycle traffic light that steps through a list of phases.

    Args:
        id: Identifier of the traffic light.
        phases: List of dicts with keys ``'duration'`` (number or numeric
            string, in the same time unit as ``dt``) and ``'state'`` (one
            character per controlled link).  May be empty, in which case the
            light never changes and every link reads as green.

    Attributes:
        current_phase_index: Index of the active phase.
        timer: Time already spent in the active phase.
        state: State string of the active phase (``''`` without phases).
    """

    def __init__(self, id, phases):
        self.id = id
        self.phases = phases  # List of {'duration': number/str, 'state': str}
        self.current_phase_index = 0
        self.timer = 0
        self.state = self.phases[0]['state'] if self.phases else ''

    def update(self, dt):
        """Advance the light by ``dt``, switching phases as they expire."""
        if not self.phases:
            return
        self.timer += dt
        # Carry the overshoot into the next phase instead of resetting to 0,
        # otherwise the cycle slowly drifts.  The loop is bounded by the
        # number of phases so a programme made only of zero-length phases
        # cannot spin forever; any remaining backlog is consumed next call.
        for _ in range(len(self.phases)):
            duration = float(self.phases[self.current_phase_index]['duration'])
            if self.timer < duration:
                break
            self.timer -= duration
            self.current_phase_index = (self.current_phase_index + 1) % len(self.phases)
        self.state = self.phases[self.current_phase_index]['state']

    def get_state(self, link_index):
        """Return the signal character for ``link_index``.

        Indices outside the state string (including ``-1`` for "no index")
        are treated as green (``'G'``).
        """
        if 0 <= link_index < len(self.state):
            return self.state[link_index]
        return 'G'  # Default green if index out of bounds
class VehicleType:
    """Physical and behavioural parameters shared by vehicles of one type.

    Every numeric argument may be a number, a numeric string, or ``None``.
    ``None`` (an attribute missing from the XML) falls back to the class
    defaults below, which mirror SUMO's documented passenger-car defaults.

    Args:
        id: Identifier of the vehicle type.
        length: Vehicle length.
        width: Vehicle width.
        max_speed: Maximum speed of the vehicle.
        accel: Maximum acceleration.
        decel: Comfortable deceleration.
        color: Colour string such as ``"1,0,0"``; ``None`` uses the default.
    """

    DEFAULT_LENGTH = 5.0
    DEFAULT_WIDTH = 1.8
    DEFAULT_MAX_SPEED = 55.55
    DEFAULT_ACCEL = 2.6
    DEFAULT_DECEL = 4.5
    DEFAULT_COLOR = "1,1,0"

    def __init__(self, id, length, width, max_speed, accel, decel, color):
        self.id = id
        self.length = self._number(length, self.DEFAULT_LENGTH)
        self.width = self._number(width, self.DEFAULT_WIDTH)
        self.max_speed = self._number(max_speed, self.DEFAULT_MAX_SPEED)
        self.accel = self._number(accel, self.DEFAULT_ACCEL)
        self.decel = self._number(decel, self.DEFAULT_DECEL)
        self.color = self.DEFAULT_COLOR if color is None else color

    @staticmethod
    def _number(value, default):
        """Return ``value`` as a float, or ``default`` when it is ``None``."""
        return default if value is None else float(value)
class Vehicle:
    """A single vehicle driving along a route of edge ids through a network.

    The vehicle always uses the first lane of its start edge.  Each call to
    ``update`` applies a simplified IDM-like car-following rule against the
    nearest obstacle ahead on the same lane (another vehicle, or a red/yellow
    signal at the lane end) and moves the vehicle onto the next lane once it
    passes the end of the current one.

    Args:
        id: Identifier of the vehicle.
        vtype: Object exposing ``length``, ``width``, ``max_speed``,
            ``accel``, ``decel`` and ``color``.
        route: List of (non-internal) edge ids to traverse, in order.
        network: Object exposing ``lanes`` (lane id -> lane), ``edges``
            (edge id -> list of lanes) and ``lane_to_edge`` (lane id -> edge
            id).  Edge ids starting with ``':'`` are treated as internal
            junction edges.

    Attributes:
        current_lane_id: Id of the lane the vehicle is on, or ``None`` when
            the route is empty or its first edge is unknown.
        position: Distance travelled along the current lane.
        finished: ``True`` once the vehicle has left the simulation.
        waiting_time: Accumulated time spent slower than 0.1 speed units.
    """

    # How many consecutive internal (junction) lanes may be looked through
    # when checking whether a connection eventually reaches a route edge.
    _MAX_INTERNAL_HOPS = 4
    # Smallest gap fed into the car-following formula; keeps the division
    # well-defined when the vehicle sits on (or past) a stop line.
    _MIN_GAP = 0.1
    # Gap assumed when nothing is ahead on the lane.
    _FREE_ROAD_GAP = 1000

    def __init__(self, id, vtype, route, network):
        self.id = id
        self.vtype = vtype
        self.route = route  # List of edge IDs
        self.network = network
        self.current_edge_index = 0
        self.current_lane_id = self.get_lane_id_from_edge(route[0]) if route else None
        self.position = 0  # Position on current lane
        self.speed = 0
        self.finished = False
        self.waiting_time = 0

    def get_lane_id_from_edge(self, edge_id):
        """Return the id of the first lane of ``edge_id``, or ``None``.

        ``None`` is returned when the edge is unknown or has no lanes.
        """
        edge_lanes = self.network.edges.get(edge_id)
        return edge_lanes[0].id if edge_lanes else None

    def update(self, dt, vehicles, traffic_lights):
        """Advance the vehicle by one time step of length ``dt``.

        Args:
            dt: Step length.
            vehicles: Iterable of all vehicles (this one included).
            traffic_lights: Mapping of traffic-light id to an object with a
                ``get_state(link_index)`` method.
        """
        if self.finished:
            return
        current_lane = self.network.lanes.get(self.current_lane_id)
        if current_lane is None:
            # Empty route, unknown start edge, or a dangling connection:
            # there is nothing to drive on, so leave the simulation.
            self.finished = True
            return

        target_speed = min(self.vtype.max_speed, current_lane.speed_limit)

        # Nearest vehicle ahead on the same lane.
        gap = float('inf')
        obstacle_speed = 0
        for other in vehicles:
            if other.id == self.id or other.current_lane_id != self.current_lane_id:
                continue
            dist = other.position - self.position - other.vtype.length
            if 0 < dist < gap:
                gap = dist
                obstacle_speed = other.speed

        # The same connection is used for the signal check and for the lane
        # change below, so the vehicle obeys the light of the turn it takes.
        connection = self._choose_connection()
        dist_to_end = current_lane.length - self.position
        near_stop_line = dist_to_end < 10 + self.vtype.max_speed
        if near_stop_line and dist_to_end < gap and self._signal_blocks(connection, traffic_lights):
            # A red/yellow light acts as a stationary obstacle at the lane end.
            gap = dist_to_end
            obstacle_speed = 0

        accel = self._acceleration(target_speed, gap, obstacle_speed)
        self.speed = max(0, self.speed + accel * dt)
        self.position += self.speed * dt
        if self.speed < 0.1:
            self.waiting_time += dt

        if self.position >= current_lane.length:
            if connection is None:
                self.finished = True
            else:
                self._enter_lane(connection.to_lane)
                # Keep the distance driven beyond the old lane's end.
                self.position -= current_lane.length

    def _next_route_edge(self):
        """Return the route edge after the current one, or ``None`` at the end."""
        nxt = self.current_edge_index + 1
        return self.route[nxt] if nxt < len(self.route) else None

    def _leads_to_edge(self, lane_id, edge_id, hops):
        """Tell whether ``lane_id`` is on ``edge_id`` or reaches it via internal lanes."""
        lane_edge = self.network.lane_to_edge.get(lane_id)
        if lane_edge == edge_id:
            return True
        if hops <= 0 or not lane_edge or not lane_edge.startswith(':'):
            return False
        lane = self.network.lanes.get(lane_id)
        if lane is None:
            return False
        return any(
            self._leads_to_edge(conn.to_lane, edge_id, hops - 1)
            for conn in lane.outgoing_connections
        )

    def _choose_connection(self):
        """Pick the outgoing connection of the current lane to follow.

        Returns the first connection that leads (directly or through internal
        lanes) to the next route edge.  If none does, the first connection is
        used as a best-effort fallback.  ``None`` is returned when the lane
        has no outgoing connection or when the vehicle is on the last edge of
        its route, i.e. it leaves the simulation at the lane end.
        """
        lane = self.network.lanes[self.current_lane_id]
        connections = lane.outgoing_connections
        if not connections:
            return None
        next_edge = self._next_route_edge()
        if next_edge is not None:
            for conn in connections:
                if self._leads_to_edge(conn.to_lane, next_edge, self._MAX_INTERNAL_HOPS):
                    return conn
        elif self.network.lane_to_edge.get(self.current_lane_id) == self.route[-1]:
            return None  # route completed
        return connections[0]

    @staticmethod
    def _signal_blocks(connection, traffic_lights):
        """Tell whether ``connection`` is currently closed by a red/yellow light."""
        if connection is None or not connection.tl_id:
            return False
        light = traffic_lights.get(connection.tl_id)
        if light is None:
            return False
        return light.get_state(connection.link_index) in ('r', 'y')

    def _acceleration(self, target_speed, gap, obstacle_speed):
        """Return the IDM-like acceleration for the given gap and obstacle speed."""
        max_accel = self.vtype.accel
        desired_gap = 2.0 + self.speed * 1.5
        comfort = max_accel * self.vtype.decel
        if comfort > 0:
            desired_gap += self.speed * (self.speed - obstacle_speed) / (2 * math.sqrt(comfort))
        effective_gap = self._FREE_ROAD_GAP if gap == float('inf') else gap
        effective_gap = max(effective_gap, self._MIN_GAP)
        # A non-positive target speed means "do not move": a ratio of 1 makes
        # the free-road term vanish so the vehicle can only slow down.
        speed_ratio = self.speed / target_speed if target_speed > 0 else 1.0
        return max_accel * (1 - speed_ratio ** 4 - (desired_gap / effective_gap) ** 2)

    def _enter_lane(self, lane_id):
        """Switch to ``lane_id`` and keep ``current_edge_index`` in sync."""
        to_edge = self.network.lane_to_edge.get(lane_id)
        if to_edge is not None and to_edge == self._next_route_edge():
            self.current_edge_index += 1
        elif to_edge and not to_edge.startswith(':') and to_edge in self.route:
            # Fallback routing landed on some other route edge: resynchronise.
            self.current_edge_index = self.route.index(to_edge)
        # Internal edges leave the index untouched: still inside the junction.
        self.current_lane_id = lane_id

    def _pose_on(self, shape):
        """Return ``((x, y), heading)`` of the vehicle along the polyline ``shape``.

        ``heading`` is in radians in simulation coordinates.  Beyond the end
        of the polyline the last point and the last segment's heading are
        returned.
        """
        point = shape[-1]
        heading = 0.0
        travelled = 0.0
        for (x1, y1), (x2, y2) in zip(shape, shape[1:]):
            seg_len = math.hypot(x2 - x1, y2 - y1)
            if seg_len > 0:
                heading = math.atan2(y2 - y1, x2 - x1)
            if travelled + seg_len >= self.position:
                ratio = (self.position - travelled) / seg_len if seg_len > 0 else 0
                point = (x1 + (x2 - x1) * ratio, y1 + (y2 - y1) * ratio)
                break
            travelled += seg_len
        return point, heading

    def _color(self):
        """Return the vehicle colour as an ``(r, g, b)`` tuple of 0-255 ints.

        Accepts ``"r,g,b"`` or ``"r,g,b,a"`` (alpha ignored).  Components are
        read as 0-1 fractions unless any of them exceeds 1, in which case
        they are taken as 0-255 values.  Anything else falls back to white.
        """
        raw = self.vtype.color
        if not isinstance(raw, str):
            return WHITE
        try:
            parts = [float(p) for p in raw.split(',')]
        except ValueError:
            return WHITE
        if len(parts) not in (3, 4):
            return WHITE
        rgb = parts[:3]
        factor = 1 if max(rgb) > 1 else 255
        return tuple(int(min(max(c * factor, 0), 255)) for c in rgb)

    def draw(self, surface):
        """Draw the vehicle as a rotated rectangle centred on its position."""
        lane = self.network.lanes.get(self.current_lane_id)
        if lane is None or not lane.shape:
            return
        (px, py), heading = self._pose_on(lane.shape)
        cx, cy = transform_point(px, py)

        w = max(1, int(self.vtype.length * SCALE))
        h = max(1, int(self.vtype.width * SCALE))
        body = pygame.Surface((w, h), pygame.SRCALPHA)
        pygame.draw.rect(body, self._color(), (0, 0, w, h))

        # pygame rotates counter-clockwise as seen on screen.  The simulation
        # heading is counter-clockwise with y pointing up, which looks the
        # same on the y-flipped screen, so the angle is used unchanged.
        rotated = pygame.transform.rotate(body, math.degrees(heading))
        surface.blit(rotated, rotated.get_rect(center=(cx, cy)))
class Network:
    """Road network (lanes, edges, connections, signals) read from SUMO XML.

    Attributes:
        lanes: Lane id -> ``Lane``.
        edges: Edge id -> list of ``Lane`` objects in file order.
        lane_to_edge: Lane id -> id of the edge that owns the lane.
        junctions: Unused placeholder list.
        traffic_lights: Traffic-light id -> ``TrafficLight``.
    """

    def __init__(self):
        self.lanes = {}  # id -> Lane
        self.edges = {}  # id -> list of Lanes
        self.lane_to_edge = {}  # lane_id -> edge_id
        self.junctions = []
        self.traffic_lights = {}  # id -> TrafficLight

    def load_net(self, net_file):
        """Populate the network from a SUMO ``.net.xml`` file.

        Args:
            net_file: Path (or file object) accepted by ``ET.parse``.
        """
        root = ET.parse(net_file).getroot()
        self._load_lanes(root)
        self._load_connections(root)
        self._load_traffic_lights(root)

    def _load_lanes(self, root):
        """Register every ``<edge>`` and its ``<lane>`` children."""
        for edge in root.findall('edge'):
            edge_id = edge.get('id')
            edge_lanes = []
            self.edges[edge_id] = edge_lanes
            for lane in edge.findall('lane'):
                lane_id = lane.get('id')
                length = lane.get('length')
                shape = lane.get('shape')
                speed = lane.get('speed')
                if None in (lane_id, length, shape, speed):
                    continue  # incomplete lane definition, cannot be simulated
                new_lane = Lane(lane_id, length, shape, speed)
                self.lanes[lane_id] = new_lane
                edge_lanes.append(new_lane)
                self.lane_to_edge[lane_id] = edge_id

    def _load_connections(self, root):
        """Attach ``Connection`` objects to the lanes they leave.

        A connection with a ``via`` lane is stored as ``from -> via`` and
        carries the traffic-light information.  The onward link out of the
        ``via`` lane normally comes from the file's own connection entries
        for internal lanes.  Only when a ``via`` lane ends up with no
        outgoing connection is a ``via -> to`` link synthesized, so explicit
        entries are neither duplicated nor bypassed.
        """
        implied = []  # (via_lane_id, to_lane_id, direction)
        for conn in root.findall('connection'):
            from_edge = conn.get('from')
            to_edge = conn.get('to')
            from_idx = conn.get('fromLane')
            to_idx = conn.get('toLane')
            if None in (from_edge, to_edge, from_idx, to_idx):
                continue
            # Lane ids follow the "<edge id>_<lane index>" convention.
            from_lane_id = f"{from_edge}_{int(from_idx)}"
            to_lane_id = f"{to_edge}_{int(to_idx)}"
            from_lane = self.lanes.get(from_lane_id)
            if from_lane is None:
                continue

            via_lane_id = conn.get('via')
            direction = conn.get('dir')
            # Route through the internal lane only if it was actually loaded;
            # otherwise connect straight to the destination lane.
            if via_lane_id and via_lane_id in self.lanes:
                target = via_lane_id
                implied.append((via_lane_id, to_lane_id, direction))
            else:
                target = to_lane_id
            from_lane.outgoing_connections.append(
                Connection(from_lane_id, target, None, conn.get('tl'),
                           conn.get('linkIndex'), direction)
            )

        for via_lane_id, to_lane_id, direction in implied:
            via_lane = self.lanes[via_lane_id]
            if not via_lane.outgoing_connections:
                via_lane.outgoing_connections.append(
                    Connection(via_lane_id, to_lane_id, None, None, None, direction)
                )

    def _load_traffic_lights(self, root):
        """Create a ``TrafficLight`` for every ``<tlLogic>`` with usable phases."""
        for tl in root.findall('tlLogic'):
            phases = [
                {'duration': phase.get('duration'), 'state': phase.get('state')}
                for phase in tl.findall('phase')
                if phase.get('duration') is not None and phase.get('state') is not None
            ]
            if not phases:
                continue  # a light without phases cannot be simulated
            tl_id = tl.get('id')
            self.traffic_lights[tl_id] = TrafficLight(tl_id, phases)

    def load_routes(self, rou_file):
        """Read vehicle types, routes and flows from a SUMO ``.rou.xml`` file.

        Args:
            rou_file: Path (or file object) accepted by ``ET.parse``.

        Returns:
            A tuple ``(vtypes, routes, flows)``: ``vtypes`` maps type id to
            ``VehicleType``; ``routes`` maps route id to a list of edge ids;
            ``flows`` is a list of dicts with keys ``id``, ``route``,
            ``begin``, ``end``, ``number`` and ``type``.
        """
        root = ET.parse(rou_file).getroot()

        # Optional vType attributes fall back to SUMO's documented defaults.
        vtypes = {}
        for vt in root.findall('vType'):
            vtypes[vt.get('id')] = VehicleType(
                vt.get('id'),
                vt.get('length', '5.0'),
                vt.get('width', '1.8'),
                vt.get('maxSpeed', '55.55'),
                vt.get('accel', '2.6'),
                vt.get('decel', '4.5'),
                vt.get('color', '1,1,0'),
            )

        # split() ignores repeated/trailing blanks that would otherwise
        # produce empty edge ids.
        routes = {
            rt.get('id'): (rt.get('edges') or '').split()
            for rt in root.findall('route')
        }

        flows = [self._parse_flow(flow) for flow in root.findall('flow')]
        return vtypes, routes, flows

    @staticmethod
    def _parse_flow(flow):
        """Convert one ``<flow>`` element into the flow dict used by the caller.

        ``begin`` defaults to 0 and ``end`` to ``begin``.  The vehicle count
        comes from ``number`` when present, otherwise it is derived from
        ``vehsPerHour`` or ``period`` over the flow duration, else 0.
        """
        begin = float(flow.get('begin', 0))
        end = float(flow.get('end', begin))
        duration = max(end - begin, 0.0)

        number_attr = flow.get('number')
        per_hour = flow.get('vehsPerHour')
        period = flow.get('period')
        if number_attr is not None:
            number = int(number_attr)
        elif per_hour is not None:
            number = int(round(float(per_hour) * duration / 3600.0))
        elif period is not None and float(period) > 0:
            number = int(duration / float(period))
        else:
            number = 0

        return {
            'id': flow.get('id'),
            'route': flow.get('route'),
            'begin': begin,
            'end': end,
            'number': number,
            'type': flow.get('type'),
        }
# Upper bound for one simulation step in seconds.  A long frame (window drag,
# slow machine) would otherwise feed a huge time step into the vehicle model.
_MAX_FRAME_DT = 0.1


def _build_flow_state(flows, vtypes, routes, network):
    """Create the spawn bookkeeping for every flow that can actually be run.

    Flows that reference an unknown vehicle type, an unknown or empty route,
    or a start edge without lanes are reported and skipped.
    """
    states = []
    for flow in flows:
        route_edges = routes.get(flow['route'])
        if (flow['type'] not in vtypes or not route_edges
                or not network.edges.get(route_edges[0])):
            print(f"Skipping flow {flow['id']}: unknown vehicle type, route or start edge")
            continue
        duration = flow['end'] - flow['begin']
        # Spread the vehicles evenly over the flow's time window.
        if duration > 0 and flow['number'] > 0:
            interval = duration / flow['number']
        else:
            interval = 0
        states.append({
            'next_spawn': flow['begin'],
            'interval': interval,
            'count': 0,
            'total': flow['number'],
            'def': flow,
        })
    return states


def _spawn_due_vehicles(flow_state, sim_time, vehicles, vtypes, routes, network):
    """Insert at most one vehicle per due flow if its start lane is clear."""
    for fs in flow_state:
        if fs['count'] >= fs['total'] or sim_time < fs['next_spawn']:
            continue
        flow = fs['def']
        v_type = vtypes[flow['type']]
        candidate = Vehicle(f"{flow['id']}_{fs['count']}", v_type, routes[flow['route']], network)
        start_lane_id = candidate.current_lane_id
        # Simplified insertion check: the first stretch of the lane must be empty.
        blocked = any(
            other.current_lane_id == start_lane_id and other.position < v_type.length + 5
            for other in vehicles
        )
        if blocked:
            continue  # try again next frame
        vehicles.append(candidate)
        fs['count'] += 1
        fs['next_spawn'] += fs['interval']


def _signal_color(state):
    """Map a traffic-light state character to a display colour."""
    if state == 'y':
        return YELLOW
    if state == 'r':
        return RED
    return GREEN


def _draw_network(screen, network, polylines):
    """Draw lane centre lines plus a signal dot at signal-controlled lane ends."""
    for lane_id, points in polylines.items():
        pygame.draw.lines(screen, GRAY, False, points, 2)
        for conn in network.lanes[lane_id].outgoing_connections:
            if not conn.tl_id:
                continue
            tl = network.traffic_lights.get(conn.tl_id)
            if tl is None:
                continue  # connection names a light that was not loaded
            pygame.draw.circle(screen, _signal_color(tl.get_state(conn.link_index)), points[-1], 5)


def _draw_stats(screen, font, clock, sim_time, vehicle_count):
    """Render the time / vehicle count / FPS read-out near the bottom right."""
    stats_text = [
        f"Time: {sim_time:.1f}s",
        f"Vehicles: {vehicle_count}",
        f"FPS: {clock.get_fps():.1f}"
    ]
    y_offset = SCREEN_HEIGHT - 100
    for line in stats_text:
        text_surf = font.render(line, True, WHITE)
        rect = text_surf.get_rect(bottomright=(SCREEN_WIDTH - 10, y_offset))
        screen.blit(text_surf, rect)
        y_offset += 20


def main():
    """Load the network and routes, then run the simulation until the window closes.

    Reads ``updated.net.xml`` and ``updated.rou.xml`` from the working
    directory.  Each frame advances traffic lights and vehicles by the
    measured frame time (capped at ``_MAX_FRAME_DT``) and redraws the scene.
    """
    pygame.init()
    try:
        screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
        pygame.display.set_caption("Traffic Simulation - Parsed from SUMO XML")
        clock = pygame.time.Clock()
        font = pygame.font.SysFont('Arial', 16)

        # Load Network
        network = Network()
        network.load_net("updated.net.xml")
        vtypes, routes, flows = network.load_routes("updated.rou.xml")

        # Lane geometry never changes, so convert it to screen space once.
        polylines = {
            lane_id: [transform_point(p[0], p[1]) for p in lane.shape]
            for lane_id, lane in network.lanes.items()
            if len(lane.shape) >= 2
        }

        vehicles = []
        flow_state = _build_flow_state(flows, vtypes, routes, network)
        sim_time = 0
        running = True
        while running:
            # Real-time step, capped for stability.
            dt = min(clock.tick(FPS) / 1000.0, _MAX_FRAME_DT)
            if any(event.type == pygame.QUIT for event in pygame.event.get()):
                running = False

            # Update Traffic Lights
            for tl in network.traffic_lights.values():
                tl.update(dt)

            _spawn_due_vehicles(flow_state, sim_time, vehicles, vtypes, routes, network)

            # Update Vehicles, then drop the ones that left the network
            for v in vehicles:
                v.update(dt, vehicles, network.traffic_lights)
            vehicles = [v for v in vehicles if not v.finished]
            sim_time += dt

            # Draw
            screen.fill(BLACK)
            _draw_network(screen, network, polylines)
            for v in vehicles:
                v.draw(screen)
            _draw_stats(screen, font, clock, sim_time, len(vehicles))
            pygame.display.flip()
    finally:
        # Always release the display, even if loading or a frame raised.
        pygame.quit()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment