Created
September 12, 2025 06:52
-
-
Save troglobit/1a8124d3824e966207df3f7e8ecf300e to your computer and use it in GitHub Desktop.
SSDP scanner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import socket | |
| import time | |
| import threading | |
| class SsdpClient(threading.Thread): | |
| """ Simple SSDP client probe """ | |
| SEARCH_INTERVAL = 5 | |
| BCAST_IP = '239.255.255.250' | |
| BCAST_PORT = 1900 | |
| def __init__(self): | |
| threading.Thread.__init__(self) | |
| self.interrupted = False | |
| self.host = None | |
| def run(self): | |
| """ Initiate the SSDP probe, printing every discovered host """ | |
| self.keep_search() | |
| def scan(self): | |
| """ Initiate the SSDP scan for a given IP""" | |
| self.keep_search() | |
| def stop(self): | |
| """ When done, stop the background scan """ | |
| self.interrupted = True | |
| print("upnp client stop") | |
| def keep_search(self): | |
| """ run search function every SEARCH_INTERVAL """ | |
| try: | |
| while True: | |
| self.search() | |
| for x in range(self.SEARCH_INTERVAL): | |
| time.sleep(1) | |
| if self.interrupted: | |
| return | |
| except Exception as e: | |
| print('Error in upnp client keep search %s', e) | |
| def search(self): | |
| """ | |
| broadcast SSDP DISCOVER message to LAN network | |
| filter our protocal and add to network | |
| """ | |
| try: | |
| msg = ('M-SEARCH * HTTP/1.1\r\n' | |
| +'HOST: 239.255.255.250:1900\r\n' | |
| +'MAN: "ssdp:discover"\r\n' | |
| +'MX: 1\r\n' | |
| +'ST: ssdp:all\r\n' | |
| +'\r\n') | |
| txt = "location: " | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| sock.sendto(msg.encode('ASCII'), (self.BCAST_IP, self.BCAST_PORT)) | |
| sock.settimeout(3) | |
| hosts = set() | |
| while True: | |
| data, (addr, _) = sock.recvfrom(1024) | |
| text = data.decode().lower() | |
| pos = text.find(txt) | |
| if pos != -1 and addr not in hosts: | |
| print(f"{addr} ...") | |
| hosts.add(addr) | |
| finally: | |
| sock.close() | |
| if __name__ == "__main__": | |
| ssdp = SsdpClient() | |
| ssdp.scan() | |
| for _ in range(5): | |
| print("Waiting ...") | |
| time.sleep(1) | |
| ssdp.stop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment