Skip to content

Instantly share code, notes, and snippets.

@prithajnath
Created April 10, 2025 01:18
Show Gist options
  • Select an option

  • Save prithajnath/1e5444210c3cbe7556eeb645b4065fad to your computer and use it in GitHub Desktop.

Select an option

Save prithajnath/1e5444210c3cbe7556eeb645b4065fad to your computer and use it in GitHub Desktop.
from collections import deque
def expand_cluster(D, labels, point_idx, neighbors, cluster_id, eps, min_pts):
    """Grow cluster ``cluster_id`` outward from the seed point ``point_idx``.

    ``labels`` is modified in place and nothing is returned. The label
    convention is the one used by ``dbscan``: ``0`` means unvisited, ``-1``
    means noise, and any positive value is a cluster id.

    Args:
        D: The dataset; it is only passed through to ``get_neighbors``.
        labels: Mutable per-point label list, indexed by point index.
        point_idx: Index of the seed point; it is assigned ``cluster_id``.
        neighbors: Initial neighbourhood of the seed point (point indices).
        cluster_id: Id written to every point that joins the cluster.
        eps: Neighbourhood radius, forwarded to ``get_neighbors``.
        min_pts: Minimum neighbourhood size for a point to keep expanding
            the cluster.

    NOTE(review): ``get_neighbors`` is defined outside this block; it is
    presumably a region query returning the indices of points within
    ``eps`` of the given point -- confirm against its definition.
    """
    labels[point_idx] = cluster_id
    queue = deque(neighbors)

    while queue:
        current_point = queue.popleft()
        current_label = labels[current_point]

        if current_label == -1:
            # Previously marked as noise: it is reachable from this cluster,
            # so it joins it, but its neighbourhood is not expanded.
            labels[current_point] = cluster_id
            continue
        if current_label != 0:
            # Already processed (belongs to this or another cluster).
            continue

        # Unvisited point: claim it, then expand through it only when its
        # own neighbourhood is large enough.
        labels[current_point] = cluster_id
        current_neighbors = get_neighbors(D, current_point, eps)
        if len(current_neighbors) >= min_pts:
            queue.extend(current_neighbors)
def dbscan(D, eps, min_pts):
    """Label every point of ``D`` with a cluster id using DBSCAN.

    Args:
        D: The dataset; it must support ``len()`` and is otherwise only
            passed through to ``get_neighbors`` and ``expand_cluster``.
        eps: Neighbourhood radius, forwarded to ``get_neighbors``.
        min_pts: Minimum neighbourhood size for a point to start a cluster.

    Returns:
        A list with one integer label per point: ``-1`` for noise, and
        ``1, 2, ...`` for clusters in the order they were discovered.
        (``0`` is used internally for "unvisited" and does not remain once
        the loop has finished.)

    NOTE(review): ``get_neighbors`` is defined outside this block; whether
    the returned neighbourhood includes the query point itself affects how
    ``min_pts`` should be read -- confirm against its definition.
    """
    n = len(D)
    labels = [0] * n  # 0: unvisited, -1: noise, >0: cluster ids
    cluster_id = 0

    for point_idx in range(n):
        if labels[point_idx] != 0:
            # Already labelled, either as noise or by an earlier expansion.
            continue

        neighbors = get_neighbors(D, point_idx, eps)
        if len(neighbors) < min_pts:
            # Tentatively noise; expand_cluster may later relabel it when
            # it is reached from another cluster's point.
            labels[point_idx] = -1
            continue

        cluster_id += 1
        expand_cluster(D, labels, point_idx, neighbors, cluster_id, eps, min_pts)

    return labels
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment