Created
June 7, 2021 13:46
-
-
Save miraclx/2c3cc535404cf9d0314a466c6e2a6304 to your computer and use it in GitHub Desktop.
Get info from MTN MF283 on the CLI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import re | |
| import sys | |
| import time | |
| import base64 | |
| import getpass | |
| import datetime | |
| import requests | |
| import functools | |
| from datetime import timedelta | |
| from http.cookies import SimpleCookie | |
# Session key (value of the router's "zwsd" cookie); filled in by get_auth_key().
ROUTER_KEY = None
# File in the user's home directory where key/password pairs are cached.
ROUTER_KEY_PATH = os.path.join(os.path.expanduser("~"), '.router_key')
# URL scheme and router address used to build every request URL.
PROTOCOL = "http://"
ROUTER_IP = "192.168.0.1"
# Login password; set by main() and replaced with False by get_auth_key()
# when the router rejects it.
PASSWORD = None
def _read_auth_pairs(path):
    """Return the ``(key, password)`` pairs cached in *path*.

    Each line stores a key in its first 32 characters, followed by the
    password that key was issued for.  A missing file yields an empty list.
    """
    try:
        with open(path, "r") as f:
            return [(line[:32], line[32:]) for line in f.read().splitlines()]
    except FileNotFoundError:
        return []


def get_auth_key(force=False):
    """Return the router session key, logging in when no usable key is cached.

    Unless *force* is true, the in-memory ``ROUTER_KEY`` is reused; failing
    that, the first key cached in ``ROUTER_KEY_PATH`` for the current
    ``PASSWORD`` is loaded.  Otherwise a LOGIN request is posted to the
    router, the ``zwsd`` cookie of the reply becomes the new ``ROUTER_KEY``
    and the key is persisted alongside the password.

    Args:
        force: When true, skip every cache and always log in again.

    Returns:
        The session key, or ``None`` when the router rejects the login.  In
        that case ``PASSWORD`` is set to ``False`` so callers can detect it.
    """
    global ROUTER_KEY, PASSWORD
    if not force:
        if not ROUTER_KEY:
            ROUTER_KEY = next(
                (key for key, password in _read_auth_pairs(ROUTER_KEY_PATH)
                 if password == PASSWORD),
                ROUTER_KEY,
            )
        if ROUTER_KEY:
            return ROUTER_KEY
    r = requests.post(
        f"{PROTOCOL}{ROUTER_IP}/goform/goform_set_cmd_process",
        data=f"isTest=false&goformId=LOGIN&password={PASSWORD}",
        headers={"Referer": f"{PROTOCOL}{ROUTER_IP}/index.html"}
    )
    if r.json()["result"] != "0":
        PASSWORD = False
        return
    ROUTER_KEY = SimpleCookie(r.headers.get("set-cookie"))["zwsd"].value
    # Read the existing entries BEFORE reopening the file for writing: a
    # write mode truncates the file on open, which would discard them.
    pairs = _read_auth_pairs(ROUTER_KEY_PATH)
    replaced = False
    for index, (_, password) in enumerate(pairs):
        if password == PASSWORD:
            # Refresh the stale key stored for this password in place.
            pairs[index] = (ROUTER_KEY, PASSWORD)
            replaced = True
    if not replaced:
        pairs.append((ROUTER_KEY, PASSWORD))
    # NOTE(review): the password value is written to disk as-is next to the
    # key -- confirm that this is acceptable for the intended deployment.
    with open(ROUTER_KEY_PATH, "w") as f:
        f.write("\n".join("".join(pair) for pair in pairs))
    return ROUTER_KEY
def get_multi_data():
    """Fetch the router's status fields in one request and return the JSON reply.

    The request goes to the ``goform_get_cmd_process`` endpoint and carries
    the current ``ROUTER_KEY`` in the ``zwsd`` cookie.
    """
    fields = (
        "loginfo",
        "signalbar",
        "network_type",
        "network_provider",
        "lan_ipaddr",
        "wan_ipaddr",
        "user_ip_addr",
        "LocalDomain",
        "wifi_onoff_state",
        "wifi_access_sta_num",
        "wifi_chip1_ssid1_switch_onoff",
        "wifi_chip1_ssid1_ssid",
        "wifi_chip1_ssid1_access_sta_num",
        "wifi_chip1_ssid1_wifi_coverage",
        "wifi_chip1_ssid1_max_access_num",
        "wifi_chip1_ssid1_auth_mode",
        "wifi_chip1_ssid1_password_encode",
        "wifi_chip2_ssid1_switch_onoff",
        "wifi_chip2_ssid1_ssid",
        "wifi_chip2_ssid1_access_sta_num",
        "wifi_chip2_ssid1_wifi_coverage",
        "wifi_chip2_ssid1_max_access_num",
        "wifi_chip2_ssid1_auth_mode",
        "wifi_chip2_ssid1_password_encode",
        "battery_charging",
        "battery_vol_percent",
        "realtime_tx_bytes",
        "realtime_rx_bytes",
        "realtime_time",
        "realtime_tx_thrpt",
        "realtime_rx_thrpt",
        "monthly_rx_bytes",
        "monthly_tx_bytes",
        "monthly_time",
        "data_volume_limit_switch",
        "data_volume_limit_size",
        "data_volume_alert_percent",
        "data_volume_limit_unit",
        "sms_unread_num",
    )
    # The field names are sent as one comma-separated (URL-encoded) list.
    cmd = '%2C'.join(fields)
    url = f"{PROTOCOL}{ROUTER_IP}/goform/goform_get_cmd_process?multi_data=1&isTest=false&sms_received_flag_flag=0&sts_received_flag_flag=0&cmd={cmd}&_=1622776507136"
    request_headers = {
        "Referer": f'{PROTOCOL}{ROUTER_IP}/index.html',
        "Cookie": f'zwsd="{ROUTER_KEY}"',
    }
    response = requests.get(url, headers=request_headers)
    return response.json()
def pad(scale=None, fill=" "):
    """Describe an "expand" layout component with the given scale and fill."""
    return dict(type="expand", scale=scale, fill=fill)
def push_align(fill=" "):
    """Describe a "postsized" layout component.

    Its size is the second element of the aggregate passed in minus the
    width already occupied.  *fill* is either a callable taking the size, or
    a string that is repeated ``size`` times.
    """
    if callable(fill):
        render = fill
    else:
        def render(size):
            return fill * size

    def remaining(aggr, occupied):
        return aggr[1] - occupied

    return dict(type="postsized", size=remaining, fill=render)
def backspace(space=1):
    """Describe a "sized" layout component that always reports *space* as its size.

    The component carries no ``fill`` entry.
    """
    def fixed(_aggr, _occupied):
        return space

    return dict(type="sized", size=fixed)
def min_max(iterable):
    """Return ``(smallest, largest)`` of *iterable* in a single pass.

    An empty iterable yields ``(None, None)``.
    """
    lowest = highest = None
    for item in iterable:
        if lowest is None:
            lowest = item
        else:
            lowest = min(item, lowest)
        if highest is None:
            highest = item
        else:
            highest = max(item, highest)
    return lowest, highest
def mk_progressbar(value, total, chars=10):
    """Return a text bar exactly *chars* wide, with ``#`` marking the filled part.

    Args:
        value: Current amount.
        total: Amount that corresponds to a full bar.
        chars: Width of the bar in characters.

    Returns:
        A string of ``#`` followed by spaces.  Values outside ``0..total``
        are clamped so the width never changes, and a non-positive *total*
        yields an empty bar instead of a ZeroDivisionError.
    """
    if total <= 0:
        filled = 0
    else:
        filled = int((value * chars) // total)
    # Clamp so out-of-range readings cannot make the bar wider or narrower.
    filled = max(0, min(chars, filled))
    return ("#" * filled) + (" " * (chars - filled))
def fmt_speed(speed, bits=False):
    """Format a byte quantity with a 1024-based unit prefix.

    Args:
        speed: Quantity in bytes.
        bits: When true, convert to bits first and use a lowercase ``b``
            suffix instead of ``B``.

    Returns:
        A string such as ``"1.50 KB"`` or ``"8.00 Kb"``.
    """
    suffix = "b" if bits else "B"
    if bits:
        speed *= 8
    for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
        if abs(speed) < 1024.0:
            return "%3.2f %s" % (speed, unit + suffix)
        speed /= 1024.0
    # Beyond zetta: honour the bits/bytes suffix here as well.
    return "%.2f %s" % (speed, 'Y' + suffix)
def strip_ansi(string):
    """Return *string* with ANSI escape sequences removed."""
    return re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', string)
def _resolve_size(size, default, *args):
    """Evaluate a component's ``size`` spec; use *default* if it is not a number."""
    if size and callable(size):
        size = size(*args)
    return size if isinstance(size, (int, float)) else default


def build(input_stacks):
    """Lay out lines of strings and layout components to the terminal width.

    Each line is a sequence whose items are either strings (kept verbatim,
    measured without ANSI escapes) or component dicts with a ``"type"`` of:

    * ``"sized"``     -- ``size(columns - occupied, occupied)`` per line;
    * ``"postsized"`` -- ``size((min_fixed, max_fixed), occupied)``, where
      the pair is the smallest/largest occupied width over all lines;
    * ``"expand"``    -- shares the remaining width in proportion to its
      ``scale`` (a missing/zero scale counts as 1).

    Args:
        input_stacks: Iterable of lines as described above.

    Returns:
        A generator yielding, per input line, the list of rendered strings.
    """
    # Local import: the file's import block does not provide shutil.
    import shutil

    # Works without a TTY (falls back to 80 columns) instead of parsing the
    # output of `stty size`, which is empty when no terminal is attached.
    columns = shutil.get_terminal_size(fallback=(80, 24)).columns

    stacks = []
    for line in input_stacks:
        final = [None] * len(line)
        sized, post_sized, expanding = [], [], []
        occupied = 0
        for index, component in enumerate(line):
            if isinstance(component, str):
                occupied += len(strip_ansi(component))
                final[index] = component
            elif isinstance(component, dict):
                entry = (index, component.get("size"), component.get("fill"))
                kind = component["type"]
                if kind == "sized":
                    sized.append(entry)
                elif kind == "postsized":
                    post_sized.append(entry)
                elif kind == "expand":
                    expanding.append(entry + (component.get("scale"),))
        # Fixed-size components are resolved once all plain text is counted.
        for index, size, fill in sized:
            size = _resolve_size(size, 0, columns - occupied, occupied)
            final[index] = fill(size) if callable(fill) else ""
            occupied += size
        stacks.append((final, occupied, post_sized, expanding))

    # Post-sized components need the occupied widths of every line.
    min_fixed, max_fixed = min_max(entry[1] for entry in stacks)
    for final, occupied, post_sized, expanding in stacks:
        for index, size, fill in post_sized:
            size = _resolve_size(size, 0, (min_fixed, max_fixed), occupied)
            final[index] = fill(size) if callable(fill) else ""
            occupied += size
        full_width = columns - occupied
        total_scaling = sum(scale or 1 for (_, _, _, scale) in expanding)
        for index, size, fill, scale in expanding:
            free = (full_width * (scale or 1)) / total_scaling
            size = _resolve_size(size, free, free)
            if callable(fill):
                final[index] = fill(size)
            else:
                final[index] = (fill if isinstance(fill, str) else " ") * int(size)
    return (entry[0] for entry in stacks)
def battery_info(stats):
    """Return the battery summary: progress bar, percentage and charging state."""
    percent = stats["battery_vol_percent"]
    bar = mk_progressbar(int(percent), 100)
    padded = percent.rjust(3, " ")
    if stats["battery_charging"] == "1":
        state = " (charging)"
    else:
        state = " (discharging)"
    return "[%s] %s%%%s" % (bar, padded, state)
def signal_info(stats):
    """Return the signal summary: bar (out of 5), percentage, network type and provider."""
    bars = int(stats["signalbar"])
    gauge = mk_progressbar(bars, 5)
    strength = str((bars * 100) // 5).rjust(3, " ")
    network = stats["network_type"]
    provider = stats["network_provider"]
    return "[%s] %s%% [%s] (%s)" % (gauge, strength, network, provider)
def wifi_info(stats):
    """Return the WiFi rows: overall state plus SSID/password rows per enabled SSID.

    Args:
        stats: Mapping of router fields to string values.

    Returns:
        A list of ``[label, value]`` rows.  When WiFi is off only the
        ``["WiFi:", "off"]`` row is returned.  An SSID contributes rows only
        when its ``*_switch_onoff`` field equals ``"1"``.
    """
    active_users_ssid_1 = int(stats["wifi_chip1_ssid1_access_sta_num"])
    max_users_ssid_1 = int(stats["wifi_chip1_ssid1_max_access_num"])
    active_users_ssid_2 = int(stats["wifi_chip2_ssid1_access_sta_num"])
    max_users_ssid_2 = int(stats["wifi_chip2_ssid1_max_access_num"])

    if stats["wifi_onoff_state"] != "1":
        return [["WiFi:", "off"]]

    rows = [
        ["WiFi:", "on (\u26d3: %s/%s)" % (
            stats["wifi_access_sta_num"], max_users_ssid_1 + max_users_ssid_2)],
    ]
    radios = (
        ("\x1b[33m • SSID 1\x1b[0m:", "wifi_chip1_ssid1",
         active_users_ssid_1, max_users_ssid_1),
        ("\x1b[33m • SSID 2\x1b[0m:", "wifi_chip2_ssid1",
         active_users_ssid_2, max_users_ssid_2),
    )
    for label, prefix, active, maximum in radios:
        # Compare against "1" explicitly: the field is a string, so "0"
        # would be truthy and a disabled SSID would still be listed.
        if stats[prefix + "_switch_onoff"] != "1":
            continue
        rows.append([label, "%s (\u26d3: %s/%s)" % (
            stats[prefix + "_ssid"], active, maximum)])
        rows.append(["\x1b[33m • Password\x1b[0m:", "%s" % base64.b64decode(
            stats[prefix + "_password_encode"]).decode("ascii")])
    return rows
def realtime_traffic_info(stats):
    """Return one row with the current upload, download and combined throughput in bits."""
    upload = int(stats["realtime_tx_thrpt"])
    download = int(stats["realtime_rx_thrpt"])
    rates = tuple(
        fmt_speed(amount, bits=True)
        for amount in (upload, download, upload + download)
    )
    row = ["\x1b[33mSpeed\x1b[0m:",
           "(\u21e1: %sps, \u21e3: %sps, \u21f5: %sps)" % rates]
    return [row]
def session_traffic_info(stats):
    """Return two rows: the session duration and the session's sent/received/total data."""
    sent = int(stats["realtime_tx_bytes"])
    received = int(stats["realtime_rx_bytes"])
    elapsed = timedelta(seconds=int(stats["realtime_time"]))
    header = ["Session Data Usage:", "(duration: %s)" % elapsed]
    amounts = (fmt_speed(sent), fmt_speed(received), fmt_speed(sent + received))
    usage = ["\x1b[33m • Data Usage\x1b[0m:",
             "(\u21e1: %s, \u21e3: %s, \u21f5: %s)" % amounts]
    return [header, usage]
def monthly_traffic_info(stats):
    """Return two rows: the monthly connected time and the month's sent/received/total data."""
    sent = int(stats["monthly_tx_bytes"])
    received = int(stats["monthly_rx_bytes"])
    elapsed = timedelta(seconds=int(stats["monthly_time"]))
    header = ["Monthly Data Usage:", "(duration: %s)" % elapsed]
    amounts = (fmt_speed(sent), fmt_speed(received), fmt_speed(sent + received))
    usage = ["\x1b[33m • Data Usage\x1b[0m:",
             "(\u21e1: %s, \u21e3: %s, \u21f5: %s)" % amounts]
    return [header, usage]
def volume_limit_info(stats):
    """Return the usage-limit row, or an empty list when no limit applies.

    Args:
        stats: Mapping of router fields to string values.

    Returns:
        ``[]`` when the limit switch is not ``"1"`` or the limit unit is
        neither ``"data"`` nor ``"time"``; otherwise a single
        ``[label, value]`` row with the percentage used, the limit and the
        alert threshold.
    """
    if stats["data_volume_limit_switch"] != "1":
        return []

    limit_unit = stats["data_volume_limit_unit"]
    if limit_unit == "data":
        size, unit = stats["data_volume_limit_size"].split("_")
        total = int(size) * 1048576 * int(unit)
        usage = int(stats["monthly_tx_bytes"]) + \
            int(stats["monthly_rx_bytes"])
        # A zero limit would otherwise raise ZeroDivisionError.
        percent = (usage * 100) // total if total else 0
        return [
            ["\x1b[33mData Usage Limit\x1b[0m:", "(used %s%% of %s, alerts at %s%%)" %
             (percent, fmt_speed(total), int(stats["data_volume_alert_percent"]))],
        ]
    if limit_unit == "time":
        total_hours = timedelta(
            hours=int(stats["data_volume_limit_size"]))
        # total_seconds() is a float; use an int so the percentage is
        # printed as "10" rather than "10.0".
        total_seconds = int(total_hours.total_seconds())
        used_seconds = int(stats["monthly_time"])
        percent = (used_seconds * 100) // total_seconds if total_seconds else 0
        # NOTE(review): for limits of 24 hours or more only the "N days"
        # part of the timedelta text survives the split below.
        return [
            ["\x1b[33mTime Usage Limit\x1b[0m:", "(used %s%% of %s, alerts at %s%%)" %
             (percent, str(total_hours).split(",")[0], int(stats["data_volume_alert_percent"]))],
        ]
    # Unknown unit: return an empty list so callers can still unpack it.
    return []
def main():
    """Authenticate, then keep redrawing the router status table.

    The password comes from the existing ``PASSWORD`` global, the
    ``ROUTER_PASSWORD`` environment variable, or an interactive prompt (the
    prompted value is base64-encoded).  The function returns only when the
    router rejects the password.
    """
    global PASSWORD

    def leader(size):
        return "\x1b[38;5;245m%s\x1b[0m" % ("\u06d4" * int(size))

    def aligner(size):
        return "\x1b[38;5;245m%s\x1b[0m" % ("\u06d4" * size)

    def layout(line):
        # A tuple carries ready-made components in its first element.
        if isinstance(line, tuple):
            return [pad(), *line[0], pad()]
        cells = []
        for component in line:
            cells.extend(
                (component, pad(.06, fill=leader), push_align(fill=aligner)))
        # Drop the two separators that trail the last component.
        return [pad(), *cells[:-2], pad()]

    if not PASSWORD:
        PASSWORD = os.environ.get("ROUTER_PASSWORD")
    if not PASSWORD:
        typed = getpass.getpass("Password: ")
        PASSWORD = base64.b64encode(typed.encode("ascii")).decode("ascii")
    get_auth_key()

    while True:
        stats = get_multi_data()
        if stats["loginfo"] != "ok":
            # Session missing or expired: force a fresh login and retry.
            get_auth_key(True)
            if PASSWORD is False:
                print("\x1b[31m[!]\x1b[0m Incorrect Password", file=sys.stderr)
                break
            continue

        stack = [
            ["\x1b[36mRouter Info\x1b[0m"],
            ["\x1b[33mBattery\x1b[0m:", battery_info(stats)],
            ["\x1b[33mSignal\x1b[0m:", signal_info(stats)],
            ["\x1b[33mServer IP\x1b[0m:", "LAN: %s, WAN: %s" %
             (stats["lan_ipaddr"], stats["wan_ipaddr"])],
            ["\x1b[33mDevice IP\x1b[0m:", "%s" % stats["user_ip_addr"]],
        ]
        sections = (
            wifi_info,
            realtime_traffic_info,
            session_traffic_info,
            monthly_traffic_info,
            volume_limit_info,
        )
        for section in sections:
            stack.extend(section(stats))
        stack.append(["\x1b[33mSMS\x1b[0m:", "%s unread" % stats["sms_unread_num"]])

        rendered = build(layout(line) for line in stack)
        print("\n".join("".join(cells) for cells in rendered))
        time.sleep(.5)
        # Move up and clear one terminal line per printed row, so the next
        # frame overwrites this one.
        for _ in stack:
            sys.stdout.write("\x1b[F\x1b[2K")
# Script entry point: announce the target router on stderr, then run main().
if __name__ == "__main__":
    print("Logging into [%s]" % ROUTER_IP, file=sys.stderr)
    # main() returns after a rejected password; calling it again re-prompts.
    while True:
        main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import sys | |
| import json | |
| import base64 | |
| import getpass | |
| import requests | |
| import functools | |
| from http.cookies import SimpleCookie | |
# Session key (value of the router's "zwsd" cookie); filled in by get_auth_key().
ROUTER_KEY = None
# File in the user's home directory where key/password pairs are cached.
ROUTER_KEY_PATH = os.path.join(os.path.expanduser("~"), '.router_key')
# URL scheme and router address used to build every request URL.
PROTOCOL = "http://"
ROUTER_IP = "192.168.0.1"
# Login password; set by main() and replaced with False by get_auth_key()
# when the router rejects it.
PASSWORD = None
def _read_auth_pairs(path):
    """Return the ``(key, password)`` pairs cached in *path*.

    Each line stores a key in its first 32 characters, followed by the
    password that key was issued for.  A missing file yields an empty list.
    """
    try:
        with open(path, "r") as f:
            return [(line[:32], line[32:]) for line in f.read().splitlines()]
    except FileNotFoundError:
        return []


def get_auth_key(force=False):
    """Return the router session key, logging in when no usable key is cached.

    Unless *force* is true, the in-memory ``ROUTER_KEY`` is reused; failing
    that, the first key cached in ``ROUTER_KEY_PATH`` for the current
    ``PASSWORD`` is loaded.  Otherwise a LOGIN request is posted to the
    router, the ``zwsd`` cookie of the reply becomes the new ``ROUTER_KEY``
    and the key is persisted alongside the password.

    Args:
        force: When true, skip every cache and always log in again.

    Returns:
        The session key, or ``None`` when the router rejects the login.  In
        that case ``PASSWORD`` is set to ``False`` so callers can detect it.
    """
    global ROUTER_KEY, PASSWORD
    if not force:
        if not ROUTER_KEY:
            ROUTER_KEY = next(
                (key for key, password in _read_auth_pairs(ROUTER_KEY_PATH)
                 if password == PASSWORD),
                ROUTER_KEY,
            )
        if ROUTER_KEY:
            return ROUTER_KEY
    r = requests.post(
        f"{PROTOCOL}{ROUTER_IP}/goform/goform_set_cmd_process",
        data=f"isTest=false&goformId=LOGIN&password={PASSWORD}",
        headers={"Referer": f"{PROTOCOL}{ROUTER_IP}/index.html"}
    )
    if r.json()["result"] != "0":
        PASSWORD = False
        return
    ROUTER_KEY = SimpleCookie(r.headers.get("set-cookie"))["zwsd"].value
    # Read the existing entries BEFORE reopening the file for writing: a
    # write mode truncates the file on open, which would discard them.
    pairs = _read_auth_pairs(ROUTER_KEY_PATH)
    replaced = False
    for index, (_, password) in enumerate(pairs):
        if password == PASSWORD:
            # Refresh the stale key stored for this password in place.
            pairs[index] = (ROUTER_KEY, PASSWORD)
            replaced = True
    if not replaced:
        pairs.append((ROUTER_KEY, PASSWORD))
    # NOTE(review): the password value is written to disk as-is next to the
    # key -- confirm that this is acceptable for the intended deployment.
    with open(ROUTER_KEY_PATH, "w") as f:
        f.write("\n".join("".join(pair) for pair in pairs))
    return ROUTER_KEY
def get_multi_data(*cmds):
    """Query the router for ``loginfo`` plus the given fields and return the JSON reply.

    The request goes to the ``goform_get_cmd_process`` endpoint and carries
    the current ``ROUTER_KEY`` in the ``zwsd`` cookie.
    """
    fields = ("loginfo",) + cmds
    # The field names are sent as one comma-separated (URL-encoded) list.
    cmd = '%2C'.join(fields)
    url = f"{PROTOCOL}{ROUTER_IP}/goform/goform_get_cmd_process?multi_data=1&isTest=false&sms_received_flag_flag=0&sts_received_flag_flag=0&cmd={cmd}&_=1622776507136"
    request_headers = {
        "Referer": f'{PROTOCOL}{ROUTER_IP}/index.html',
        "Cookie": f'zwsd="{ROUTER_KEY}"',
    }
    response = requests.get(url, headers=request_headers)
    return response.json()
def main():
    """Log in, re-prompting until the password is accepted, then print the requested fields as JSON.

    The field names to query are taken from the command-line arguments.
    """
    global PASSWORD
    print("Logging into [%s]" % ROUTER_IP, file=sys.stderr)

    authenticated = False
    while not authenticated:
        if not PASSWORD:
            PASSWORD = os.environ.get("ROUTER_PASSWORD")
        if not PASSWORD:
            typed = getpass.getpass("Password: ")
            PASSWORD = base64.b64encode(typed.encode("ascii")).decode("ascii")
        get_auth_key()
        stats = get_multi_data()
        if stats["loginfo"] != "ok":
            # Session missing or expired: force a fresh login.
            get_auth_key(True)
            if PASSWORD is False:
                print("\x1b[31m[!]\x1b[0m Incorrect Password", file=sys.stderr)
                continue
        authenticated = True

    requested = sys.argv[1:]
    print(json.dumps(get_multi_data(*requested)))
# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment

