-
-
Save HappyAmos/09080cca4208af8b29be5cbc924d4b78 to your computer and use it in GitHub Desktop.
Blink IMMIS PoC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import subprocess | |
| import threading | |
| from time import sleep, time | |
| import requests | |
| import socket | |
| import ssl | |
# Root of the Blink REST API; every request URL in this script is built on it.
BASE_API_URL: str = "https://rest-u026.immedia-semi.com"

# ---------------------------------------------------------------------------
# REPLACE THESE WITH YOUR OWN VALUES
# ---------------------------------------------------------------------------
# Sent as the `token-auth` request header.
BLINK_TOKEN: str = "xxxxxxxxx"
# Interpolated into the live-view request path (accounts/networks/owls).
ACCOUNT_ID: int = 123456
NETWORK_ID: int = 123456
CAMERA_ID: int = 123456
def get_liveview():
    """Request a live-view session for the configured camera.

    POSTs a ``liveview`` intent to the Blink REST API using the module-level
    ``ACCOUNT_ID``, ``NETWORK_ID``, ``CAMERA_ID`` and ``BLINK_TOKEN`` values,
    prints the raw response body, and returns the decoded JSON.

    Returns:
        The decoded JSON response. The rest of this script reads the
        ``"server"``, ``"command_id"`` and ``"polling_interval"`` keys from it.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeouts.
    """
    headers = {
        'app-build': 'ANDROID_28799573',
        'user-agent': '37.0ANDROID_28799573',
        'locale': 'en_US',
        'x-blink-time-zone': 'America/New_York',
        'token-auth': BLINK_TOKEN,
        'content-type': 'application/json; charset=UTF-8',
    }
    json_data = {
        'intent': 'liveview',
    }
    response = requests.post(
        f'{BASE_API_URL}/api/v2/accounts/{ACCOUNT_ID}/networks/{NETWORK_ID}/owls/{CAMERA_ID}/liveview',
        headers=headers,
        json=json_data,
        # Without a timeout, requests can block forever on a stalled server.
        timeout=10,
    )
    # Print the body first so that error payloads stay visible for debugging.
    print(f'Get LiveView Response: {response.text}')
    # Fail here with a clear HTTP error rather than later with a KeyError
    # when the expected keys are missing from an error response.
    response.raise_for_status()
    return response.json()
def poll(network_id, command_id, interval):
    """Poll the status of a Blink command forever, printing each response.

    Issues a GET for ``/network/{network_id}/command/{command_id}`` and then
    sleeps ``interval`` seconds, repeating indefinitely. The function never
    returns, so it is meant to run in a background thread.

    Args:
        network_id: Network identifier interpolated into the URL.
        command_id: Command identifier interpolated into the URL.
        interval: Seconds to sleep between consecutive polls.
    """
    headers = {
        'app-build': 'ANDROID_28799573',
        'user-agent': '37.0ANDROID_28799573',
        'locale': 'en_US',
        'x-blink-time-zone': 'America/New_York',
        'token-auth': BLINK_TOKEN,
        'content-type': 'application/json; charset=UTF-8',
    }
    # The URL never changes between iterations, so build it once.
    url = f'{BASE_API_URL}/network/{network_id}/command/{command_id}'
    while True:
        try:
            # The timeout keeps a stalled connection from freezing the poller.
            response = requests.get(url, headers=headers, timeout=10)
            print(f"Command Response: {response.text}")
        except requests.RequestException as err:
            # A transient network failure must not kill the polling thread;
            # report it and try again on the next cycle.
            print(f"Command poll failed: {err}")
        sleep(interval)
# ---------------------------------------------------------------------------
# Start a live-view session and pipe the received stream into ffplay.
# ---------------------------------------------------------------------------
liveview = get_liveview()

# The "server" value is split as <scheme>://<host>[:port]/<conn_id>_...?client_id=<int>
immis = liveview["server"]
host = immis.split('/')[2].split(':')[0]
conn_id = immis.split('/')[-1].split('_')[0]
client_id = int(immis.split('?client_id=')[1])
print(f"{immis=}")
print(f"{conn_id=}")
print(f"{client_id=}")

# NOTE(security): certificate and hostname verification are disabled, so the
# TLS peer is NOT authenticated. Kept as in the original PoC (presumably the
# media server's certificate does not validate against its address - confirm).
context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

# Bytes requested per recv(); the original 64-byte reads cost one Python-level
# call per handful of bytes on a continuous video stream.
RECV_CHUNK_SIZE = 4096

# Connection header sent once after the TLS handshake. The layout is kept
# byte-for-byte; only client_id and conn_id vary. The meaning of the fixed
# bytes is not documented here (0x10 presumably prefixes a 16-character
# conn_id - TODO confirm).
# byteorder is passed explicitly: int.to_bytes() only defaults to "big" on
# Python 3.11+, and raises TypeError without it on older versions.
conn_header = bytes([
    0x00, 0x00, 0x00, 0x28, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + list(client_id.to_bytes(4, 'big')) + [0x01, 0x08, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x10] + list(bytes(conn_id, 'ASCII')) + [0x00, 0x00, 0x00, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00,
])

# Fixed packet re-sent roughly once per second while the stream is read.
keep_alive = bytes([
    0x12, 0x00, 0x00, 0x03, 0xe8, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
    0x00,
])

with socket.create_connection((host, 443)) as sock:
    # daemon=True: poll() loops forever, and a non-daemon thread would keep
    # the interpreter alive after the stream ends or the user presses Ctrl-C.
    command_poll = threading.Thread(
        target=poll,
        args=(NETWORK_ID, liveview["command_id"], liveview["polling_interval"]),
        daemon=True,
    )
    command_poll.start()

    with context.wrap_socket(sock, server_hostname=host) as ssock:
        last_keep_alive = time()
        # sendall() retries until the whole buffer is written; send() may
        # transmit only part of it.
        ssock.sendall(conn_header)

        ffplay_process = subprocess.Popen(
            ['ffplay', '-f', 'mpegts', '-err_detect', 'ignore_err', '-'],
            stdin=subprocess.PIPE
        )
        try:
            data = ssock.recv(RECV_CHUNK_SIZE)
            while data:
                ffplay_process.stdin.write(data)
                data = ssock.recv(RECV_CHUNK_SIZE)
                if time() - last_keep_alive > 1:
                    ssock.sendall(keep_alive)
                    last_keep_alive = time()
        except KeyboardInterrupt:
            # User abort: stop the player immediately.
            ffplay_process.kill()
        except BrokenPipeError:
            # ffplay exited (e.g. its window was closed); nothing left to feed.
            pass
        finally:
            # Close the pipe so ffplay sees EOF once the stream is over.
            try:
                ffplay_process.stdin.close()
            except OSError:
                # Flushing into an already-dead player fails; safe to ignore.
                pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment