Skip to content

Instantly share code, notes, and snippets.

@miraclx
Created June 7, 2021 13:46
Show Gist options
  • Select an option

  • Save miraclx/2c3cc535404cf9d0314a466c6e2a6304 to your computer and use it in GitHub Desktop.

Select an option

Save miraclx/2c3cc535404cf9d0314a466c6e2a6304 to your computer and use it in GitHub Desktop.
Get info from MTN MF283 on the CLI

Show realtime information from the MTN MF283 Router

Get a pretty display of real-time data

One-off query returning JSON data

import os
import re
import sys
import time
import base64
import getpass
import datetime
import requests
import functools
from datetime import timedelta
from http.cookies import SimpleCookie
# Session key currently in use; sent to the router as the "zwsd" cookie.
ROUTER_KEY = None
# File in the user's home directory where session keys are cached.
ROUTER_KEY_PATH = os.path.join(os.path.expanduser("~"), '.router_key')
# URL scheme and address used to reach the router's web interface.
PROTOCOL = "http://"
ROUTER_IP = "192.168.0.1"
# Base64-encoded router password; get_auth_key() sets it to False when a
# login attempt is rejected.
PASSWORD = None
def get_auth_key(force=False):
    """Return a session key for the router, logging in when necessary.

    Unless *force* is true, the in-memory ``ROUTER_KEY`` is reused, or a key
    cached in ``ROUTER_KEY_PATH`` for the current ``PASSWORD`` is loaded.
    Otherwise a LOGIN request is posted to the router and the ``zwsd`` cookie
    of the response becomes the new key, which is also written to the cache
    file.

    Every line of the cache file is a key (its first 32 characters)
    immediately followed by the password that key was issued for.

    Returns:
        The session key, or ``None`` when the router rejects the password
        (the global ``PASSWORD`` is then set to ``False``).
    """
    global ROUTER_KEY, PASSWORD
    if not force:
        if not ROUTER_KEY and os.path.exists(ROUTER_KEY_PATH):
            with open(ROUTER_KEY_PATH, "r") as f:
                for line in f.read().splitlines():
                    if line[32:] == PASSWORD:
                        ROUTER_KEY = line[:32]
                        break
        if ROUTER_KEY:
            return ROUTER_KEY
    r = requests.post(
        f"{PROTOCOL}{ROUTER_IP}/goform/goform_set_cmd_process",
        data=f"isTest=false&goformId=LOGIN&password={PASSWORD}",
        headers={"Referer": f"{PROTOCOL}{ROUTER_IP}/index.html"}
    )
    if r.json()["result"] != "0":
        PASSWORD = False
        return
    ROUTER_KEY = SimpleCookie(r.headers.get("set-cookie"))["zwsd"].value

    # Read the existing entries BEFORE opening the file for writing: a
    # "w"/"w+" open truncates the file, which would discard every other
    # cached key.
    pairs = []
    if os.path.exists(ROUTER_KEY_PATH):
        with open(ROUTER_KEY_PATH, "r") as f:
            pairs = [(line[:32], line[32:])
                     for line in f.read().splitlines() if line]

    # Replace the key stored for this password, or add a new entry.
    modified = False
    for index, (_, password) in enumerate(pairs):
        if password == PASSWORD:
            pairs[index] = (ROUTER_KEY, PASSWORD)
            modified = True
    if not modified:
        pairs.append((ROUTER_KEY, PASSWORD))

    with open(ROUTER_KEY_PATH, "w") as f:
        f.write("\n".join("".join(pair) for pair in pairs))
    return ROUTER_KEY
def get_multi_data():
    """Fetch the router status fields shown on the dashboard.

    Issues one GET to the router's ``goform_get_cmd_process`` endpoint asking
    for every field listed below, authenticated with the ``zwsd`` cookie
    taken from ``ROUTER_KEY``, and returns the decoded JSON response.
    """
    fields = (
        # login / radio state
        "loginfo",
        "signalbar",
        "network_type",
        "network_provider",
        # addressing
        "lan_ipaddr",
        "wan_ipaddr",
        "user_ip_addr",
        "LocalDomain",
        # wifi
        "wifi_onoff_state",
        "wifi_access_sta_num",
        "wifi_chip1_ssid1_switch_onoff",
        "wifi_chip1_ssid1_ssid",
        "wifi_chip1_ssid1_access_sta_num",
        "wifi_chip1_ssid1_wifi_coverage",
        "wifi_chip1_ssid1_max_access_num",
        "wifi_chip1_ssid1_auth_mode",
        "wifi_chip1_ssid1_password_encode",
        "wifi_chip2_ssid1_switch_onoff",
        "wifi_chip2_ssid1_ssid",
        "wifi_chip2_ssid1_access_sta_num",
        "wifi_chip2_ssid1_wifi_coverage",
        "wifi_chip2_ssid1_max_access_num",
        "wifi_chip2_ssid1_auth_mode",
        "wifi_chip2_ssid1_password_encode",
        # battery
        "battery_charging",
        "battery_vol_percent",
        # traffic counters
        "realtime_tx_bytes",
        "realtime_rx_bytes",
        "realtime_time",
        "realtime_tx_thrpt",
        "realtime_rx_thrpt",
        "monthly_rx_bytes",
        "monthly_tx_bytes",
        "monthly_time",
        # usage limits
        "data_volume_limit_switch",
        "data_volume_limit_size",
        "data_volume_alert_percent",
        "data_volume_limit_unit",
        # messages
        "sms_unread_num",
    )
    # Field names are joined with a URL-encoded comma.
    cmd = "%2C".join(fields)
    url = (
        f"{PROTOCOL}{ROUTER_IP}/goform/goform_get_cmd_process"
        "?multi_data=1&isTest=false"
        "&sms_received_flag_flag=0&sts_received_flag_flag=0"
        f"&cmd={cmd}&_=1622776507136"
    )
    request_headers = {
        "Referer": f'{PROTOCOL}{ROUTER_IP}/index.html',
        "Cookie": f'zwsd="{ROUTER_KEY}"',
    }
    response = requests.get(url, headers=request_headers)
    return response.json()
def pad(scale=None, fill=" "):
    """Describe an "expand" layout component.

    The returned dict carries the given *scale* and *fill* unchanged under
    the keys ``"scale"`` and ``"fill"``.
    """
    component = dict(type="expand")
    component["scale"] = scale
    component["fill"] = fill
    return component
def push_align(fill=" "):
    """Describe a "postsized" layout component.

    Its ``size`` callback returns ``aggr[1] - occupied``.  Its ``fill``
    callback is *fill* itself when that is callable, otherwise a function
    that repeats *fill* ``size`` times.
    """
    def remaining(aggr, occupied):
        return aggr[1] - occupied

    if callable(fill):
        filler = fill
    else:
        def filler(size):
            return fill * size

    return {"type": "postsized", "size": remaining, "fill": filler}
def backspace(space=1):
    """Describe a "sized" layout component whose size is always *space*.

    The ``size`` callback takes two positional arguments and ignores both.
    """
    def fixed_size(_available, _occupied):
        return space

    return dict(type="sized", size=fixed_size)
def min_max(iterable):
    """Return ``(smallest, largest)`` of *iterable* in a single pass.

    Both values are ``None`` when the iterable yields nothing.
    """
    lowest = highest = None
    for item in iterable:
        lowest = item if lowest is None else min(item, lowest)
        highest = item if highest is None else max(item, highest)
    return lowest, highest
def mk_progressbar(value, total, chars=10):
    """Render *value* out of *total* as a text bar exactly *chars* wide.

    The filled part is drawn with ``#`` and the rest with spaces.  The filled
    length is clamped to ``0..chars`` so an out-of-range *value* cannot make
    the bar wider than *chars*, and a non-positive *total* yields an empty
    bar instead of raising ``ZeroDivisionError``.
    """
    if total <= 0:
        filled = 0
    else:
        filled = max(0, min(chars, int((value * chars) // total)))
    return ("#" * filled) + (" " * (chars - filled))
def fmt_speed(speed, bits=False):
    """Format a byte count as a human-readable string with 1024-based units.

    Args:
        speed: Quantity in bytes.
        bits: When true, the value is multiplied by 8 and the unit suffix is
            ``b`` (bits) instead of ``B`` (bytes).

    Returns:
        A string such as ``"1.50 KB"`` or ``"8.00 Mb"``.
    """
    suffix = "b" if bits else "B"
    if bits:
        speed *= 8
    for prefix in ("", "K", "M", "G", "T", "P", "E", "Z"):
        if abs(speed) < 1024.0:
            return "%3.2f %s%s" % (speed, prefix, suffix)
        speed /= 1024.0
    # Beyond zetta: use the same bit/byte suffix as every other unit.
    return "%.2f Y%s" % (speed, suffix)
def strip_ansi(string):
    """Return *string* with ANSI escape sequences removed."""
    return re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', string)
def build(input_stacks):
    """Lay out rows of text and spacer components to the terminal width.

    Each row of *input_stacks* is a sequence whose items are either plain
    strings (kept as-is; their width is measured with ANSI escapes stripped)
    or component dicts with a ``"type"`` of:

    * ``"sized"``     - ``size(columns - occupied, occupied)`` gives the width.
    * ``"postsized"`` - ``size((min_fixed, max_fixed), occupied)`` gives the
      width, where min/max are taken over the fixed widths of all rows.
    * ``"expand"``    - shares the width left in the row with the row's other
      expanding components, in proportion to ``scale`` (default 1).

    A component's ``fill`` is called with the resolved size when callable;
    for expanding components a string ``fill`` is repeated instead.

    Returns:
        A generator yielding, per row, the list of rendered string pieces.
    """
    # Local import so the block is self-contained.  get_terminal_size() works
    # when stdin is not a TTY (where `stty size` prints nothing and the
    # unpacking failed) and leaves no open pipe behind.
    import shutil

    columns = shutil.get_terminal_size().columns
    stacks = []

    # Pass 1: place literal strings and resolve fixed-size components.
    for line in input_stacks:
        final = [None] * len(line)
        sized_components, post_sized_components, expanding_components = [], [], []
        occupied = 0
        for index, component in enumerate(line):
            if isinstance(component, str):
                occupied += len(strip_ansi(component))
                final[index] = component
            elif isinstance(component, dict):
                size = component.get("size")
                fill = component.get("fill")
                if component["type"] == "sized":
                    sized_components.append((index, size, fill))
                elif component["type"] == "postsized":
                    post_sized_components.append((index, size, fill))
                elif component["type"] == "expand":
                    expanding_components.append(
                        (index, size, fill, component.get("scale")))
        for index, size, fill in sized_components:
            if size and callable(size):
                size = size(columns - occupied, occupied)
            if not isinstance(size, (int, float)):
                size = 0
            final[index] = fill(size) if callable(fill) else ""
            occupied += size
        stacks.append(
            (final, occupied, post_sized_components, expanding_components))

    # Pass 2: post-sized components need the width range across all rows.
    min_fixed, max_fixed = min_max(line[1] for line in stacks)
    for final, occupied, post_sized_components, expanding_components in stacks:
        for index, size, fill in post_sized_components:
            if size and callable(size):
                size = size((min_fixed, max_fixed), occupied)
            if not isinstance(size, (int, float)):
                size = 0
            final[index] = fill(size) if callable(fill) else ""
            occupied += size

        # Whatever is left of the row is split between expanding components.
        full_width = columns - occupied
        total_scaling = functools.reduce(
            lambda total, component: total + (component[3] or 1),
            expanding_components, 0)
        for index, size, fill, scale in expanding_components:
            free = (full_width * (scale or 1)) / total_scaling
            if size and callable(size):
                size = size(free)
            if not isinstance(size, (int, float)):
                size = free
            final[index] = fill(size) if callable(fill) else (
                fill if isinstance(fill, str) else " ") * int(size)
    return (stack[0] for stack in stacks)
def battery_info(stats):
    """Format the battery level as a bar, a percentage and a charging state.

    Reads ``battery_vol_percent`` (a numeric string) and ``battery_charging``
    (``"1"`` means charging) from *stats*.
    """
    percent = stats["battery_vol_percent"]
    bar = mk_progressbar(int(percent), 100)
    padded = percent.rjust(3, " ")
    if stats["battery_charging"] == "1":
        state = " (charging)"
    else:
        state = " (discharging)"
    return "[%s] %s%%%s" % (bar, padded, state)
def signal_info(stats):
    """Format the signal strength, network type and provider.

    ``signalbar`` is treated as a value out of 5 and shown both as a bar and
    as a percentage.
    """
    bars = int(stats["signalbar"])
    meter = mk_progressbar(bars, 5)
    percent = str((bars * 100) // 5).rjust(3, " ")
    details = (meter, percent, stats["network_type"], stats["network_provider"])
    return "[%s] %s%% [%s] (%s)" % details
def wifi_info(stats):
    """Build the WiFi rows of the dashboard.

    Returns a list of ``[label, value]`` rows: one summary row, followed -
    only when WiFi is on - by an SSID row and a password row for each SSID
    whose ``*_switch_onoff`` field equals ``"1"``.  Passwords are stored
    base64-encoded in *stats* and are decoded for display.
    """
    active_users_ssid_1, max_users_ssid_1 = int(
        stats["wifi_chip1_ssid1_access_sta_num"]), int(stats["wifi_chip1_ssid1_max_access_num"])
    active_users_ssid_2, max_users_ssid_2 = int(
        stats["wifi_chip2_ssid1_access_sta_num"]), int(stats["wifi_chip2_ssid1_max_access_num"])

    if stats["wifi_onoff_state"] != "1":
        return [["WiFi:", "off"]]

    rows = [
        ["WiFi:", "on (\u26d3: %s/%s)" % (
            stats["wifi_access_sta_num"], max_users_ssid_1 + max_users_ssid_2)],
    ]
    ssids = (
        (1, "wifi_chip1_ssid1", active_users_ssid_1, max_users_ssid_1),
        (2, "wifi_chip2_ssid1", active_users_ssid_2, max_users_ssid_2),
    )
    for number, prefix, active_users, max_users in ssids:
        # Compare against "1" explicitly: the switch is a string, and "0" is
        # truthy, so a bare truthiness test would list a disabled SSID.
        if stats[prefix + "_switch_onoff"] != "1":
            continue
        rows.append(["\x1b[33m • SSID %d\x1b[0m:" % number, "%s (\u26d3: %s/%s)" % (
            stats[prefix + "_ssid"], active_users, max_users)])
        rows.append(["\x1b[33m • Password\x1b[0m:", "%s" %
                     base64.b64decode(stats[prefix + "_password_encode"]).decode("ascii")])
    return rows
def realtime_traffic_info(stats):
    """Build the single "Speed" row: upload, download and combined rates.

    ``realtime_tx_thrpt`` and ``realtime_rx_thrpt`` are parsed as integers
    and formatted with ``fmt_speed(..., bits=True)``.
    """
    tx_rate = int(stats["realtime_tx_thrpt"])
    rx_rate = int(stats["realtime_rx_thrpt"])
    rendered = tuple(
        fmt_speed(rate, bits=True)
        for rate in (tx_rate, rx_rate, tx_rate + rx_rate)
    )
    label = "\x1b[33mSpeed\x1b[0m:"
    return [[label, "(\u21e1: %sps, \u21e3: %sps, \u21f5: %sps)" % rendered]]
def session_traffic_info(stats):
    """Build the two rows describing the current session's data usage.

    The first row shows the session duration (``realtime_time`` seconds),
    the second the uploaded, downloaded and total byte counts.
    """
    sent = int(stats["realtime_tx_bytes"])
    received = int(stats["realtime_rx_bytes"])
    elapsed = timedelta(seconds=int(stats["realtime_time"]))
    header = ["Session Data Usage:", "(duration: %s)" % elapsed]
    amounts = (fmt_speed(sent), fmt_speed(received), fmt_speed(sent + received))
    usage = ["\x1b[33m • Data Usage\x1b[0m:",
             "(\u21e1: %s, \u21e3: %s, \u21f5: %s)" % amounts]
    return [header, usage]
def monthly_traffic_info(stats):
    """Build the two rows describing the month's data usage.

    The first row shows the accumulated connection time (``monthly_time``
    seconds), the second the uploaded, downloaded and total byte counts.
    """
    sent = int(stats["monthly_tx_bytes"])
    received = int(stats["monthly_rx_bytes"])
    elapsed = timedelta(seconds=int(stats["monthly_time"]))
    header = ["Monthly Data Usage:", "(duration: %s)" % elapsed]
    amounts = (fmt_speed(sent), fmt_speed(received), fmt_speed(sent + received))
    usage = ["\x1b[33m • Data Usage\x1b[0m:",
             "(\u21e1: %s, \u21e3: %s, \u21f5: %s)" % amounts]
    return [header, usage]
def volume_limit_info(stats):
    """Build the usage-limit row, if a limit is configured.

    Returns an empty list when the limit switch is not ``"1"`` or when the
    limit unit is neither ``"data"`` nor ``"time"``; callers unpack the
    result with ``*``, so a list must always be returned.

    For a ``"data"`` limit, ``data_volume_limit_size`` has the form
    ``"<size>_<multiplier>"`` and the limit is ``size * multiplier`` MiB.
    For a ``"time"`` limit it is a number of hours compared against
    ``monthly_time`` seconds.
    """
    if stats["data_volume_limit_switch"] != "1":
        return []

    unit = stats["data_volume_limit_unit"]
    if unit == "data":
        size, multiplier = stats["data_volume_limit_size"].split("_")
        total = int(size) * 1048576 * int(multiplier)
        usage = int(stats["monthly_tx_bytes"]) + \
            int(stats["monthly_rx_bytes"])
        # A zero limit would otherwise divide by zero.
        percent = (usage * 100) // total if total else 0
        return [
            ["\x1b[33mData Usage Limit\x1b[0m:", "(used %s%% of %s, alerts at %s%%)" %
             (percent, fmt_speed(total), int(stats["data_volume_alert_percent"]))],
        ]
    if unit == "time":
        total_hours = timedelta(
            hours=int(stats["data_volume_limit_size"]))
        # total_seconds() is a float; convert so the percentage prints as
        # "10" rather than "10.0".
        total_seconds = int(total_hours.total_seconds())
        used_seconds = int(stats["monthly_time"])
        percent = (used_seconds * 100) // total_seconds if total_seconds else 0
        return [
            ["\x1b[33mTime Usage Limit\x1b[0m:", "(used %s%% of %s, alerts at %s%%)" %
             (percent, str(total_hours).split(",")[0], int(stats["data_volume_alert_percent"]))],
        ]
    # Unknown unit: show nothing rather than returning None.
    return []
def main():
    """Log in and continuously redraw the router dashboard.

    The password comes from the existing ``PASSWORD`` global, else the
    ``ROUTER_PASSWORD`` environment variable, else an interactive prompt
    (whose input is base64-encoded).  The function returns when a re-login
    is rejected; otherwise it loops forever, redrawing after a 0.5 s sleep.
    """
    global PASSWORD

    def dotted_pad(size):
        # Grey dotted filler for the proportional gap after a label.
        return "\x1b[38;5;245m%s\x1b[0m" % ("\u06d4" * int(size))

    def dotted_align(size):
        # Grey dotted filler that lines the value column up across rows.
        return "\x1b[38;5;245m%s\x1b[0m" % ("\u06d4" * size)

    def layout(row):
        # Tuples carry a ready-made component list in their first item;
        # lists get a gap and an aligner between consecutive cells.
        if isinstance(row, tuple):
            inner = row[0]
        else:
            inner = []
            for cell in row:
                inner.extend([
                    cell,
                    pad(.06, fill=dotted_pad),
                    push_align(fill=dotted_align),
                ])
            inner = inner[:-2]
        return [pad(), *inner, pad()]

    secret = PASSWORD or os.environ.get("ROUTER_PASSWORD")
    if not secret:
        typed = getpass.getpass("Password: ")
        secret = base64.b64encode(typed.encode("ascii")).decode("ascii")
    PASSWORD = secret

    get_auth_key()
    while True:
        stats = get_multi_data()
        if stats["loginfo"] != "ok":
            # Session missing or expired: force a fresh login and retry.
            get_auth_key(True)
            if PASSWORD is False:
                print("\x1b[31m[!]\x1b[0m Incorrect Password", file=sys.stderr)
                return
            continue

        stack = [["\x1b[36mRouter Info\x1b[0m"]]
        stack.append(["\x1b[33mBattery\x1b[0m:", battery_info(stats)])
        stack.append(["\x1b[33mSignal\x1b[0m:", signal_info(stats)])
        stack.append(["\x1b[33mServer IP\x1b[0m:", "LAN: %s, WAN: %s" %
                      (stats["lan_ipaddr"], stats["wan_ipaddr"])])
        stack.append(["\x1b[33mDevice IP\x1b[0m:", "%s" % stats["user_ip_addr"]])
        stack.extend(wifi_info(stats))
        stack.extend(realtime_traffic_info(stats))
        stack.extend(session_traffic_info(stats))
        stack.extend(monthly_traffic_info(stats))
        stack.extend(volume_limit_info(stats))
        stack.append(["\x1b[33mSMS\x1b[0m:", "%s unread" % stats["sms_unread_num"]])

        rendered = build(layout(row) for row in stack)
        print("\n".join("".join(pieces) for pieces in rendered))
        time.sleep(.5)
        # Move the cursor up one line and clear it, once per printed row,
        # so the next frame overwrites this one.
        for _ in stack:
            sys.stdout.write("\x1b[F\x1b[2K")
# Script entry point for the live dashboard.
if __name__ == "__main__":
    print("Logging into [%s]" % ROUTER_IP, file=sys.stderr)
    # main() only returns after a rejected login; calling it again asks for
    # the password once more.
    while True:
        main()
import os
import sys
import json
import base64
import getpass
import requests
import functools
from http.cookies import SimpleCookie
# Session key currently in use; sent to the router as the "zwsd" cookie.
ROUTER_KEY = None
# File in the user's home directory where session keys are cached.
ROUTER_KEY_PATH = os.path.join(os.path.expanduser("~"), '.router_key')
# URL scheme and address used to reach the router's web interface.
PROTOCOL = "http://"
ROUTER_IP = "192.168.0.1"
# Base64-encoded router password; get_auth_key() sets it to False when a
# login attempt is rejected.
PASSWORD = None
def get_auth_key(force=False):
    """Return a session key for the router, logging in when necessary.

    Unless *force* is true, the in-memory ``ROUTER_KEY`` is reused, or a key
    cached in ``ROUTER_KEY_PATH`` for the current ``PASSWORD`` is loaded.
    Otherwise a LOGIN request is posted to the router and the ``zwsd`` cookie
    of the response becomes the new key, which is also written to the cache
    file.

    Every line of the cache file is a key (its first 32 characters)
    immediately followed by the password that key was issued for.

    Returns:
        The session key, or ``None`` when the router rejects the password
        (the global ``PASSWORD`` is then set to ``False``).
    """
    global ROUTER_KEY, PASSWORD
    if not force:
        if not ROUTER_KEY and os.path.exists(ROUTER_KEY_PATH):
            with open(ROUTER_KEY_PATH, "r") as f:
                for line in f.read().splitlines():
                    if line[32:] == PASSWORD:
                        ROUTER_KEY = line[:32]
                        break
        if ROUTER_KEY:
            return ROUTER_KEY
    r = requests.post(
        f"{PROTOCOL}{ROUTER_IP}/goform/goform_set_cmd_process",
        data=f"isTest=false&goformId=LOGIN&password={PASSWORD}",
        headers={"Referer": f"{PROTOCOL}{ROUTER_IP}/index.html"}
    )
    if r.json()["result"] != "0":
        PASSWORD = False
        return
    ROUTER_KEY = SimpleCookie(r.headers.get("set-cookie"))["zwsd"].value

    # Read the existing entries BEFORE opening the file for writing: a
    # "w"/"w+" open truncates the file, which would discard every other
    # cached key.
    pairs = []
    if os.path.exists(ROUTER_KEY_PATH):
        with open(ROUTER_KEY_PATH, "r") as f:
            pairs = [(line[:32], line[32:])
                     for line in f.read().splitlines() if line]

    # Replace the key stored for this password, or add a new entry.
    modified = False
    for index, (_, password) in enumerate(pairs):
        if password == PASSWORD:
            pairs[index] = (ROUTER_KEY, PASSWORD)
            modified = True
    if not modified:
        pairs.append((ROUTER_KEY, PASSWORD))

    with open(ROUTER_KEY_PATH, "w") as f:
        f.write("\n".join("".join(pair) for pair in pairs))
    return ROUTER_KEY
def get_multi_data(*cmds):
    """Query the router for ``loginfo`` plus the given field names.

    Issues one GET to the router's ``goform_get_cmd_process`` endpoint,
    authenticated with the ``zwsd`` cookie taken from ``ROUTER_KEY``, and
    returns the decoded JSON response.
    """
    fields = ["loginfo"]
    fields.extend(cmds)
    # Field names are joined with a URL-encoded comma.
    cmd = "%2C".join(fields)
    url = (
        f"{PROTOCOL}{ROUTER_IP}/goform/goform_get_cmd_process"
        "?multi_data=1&isTest=false"
        "&sms_received_flag_flag=0&sts_received_flag_flag=0"
        f"&cmd={cmd}&_=1622776507136"
    )
    request_headers = {
        "Referer": f'{PROTOCOL}{ROUTER_IP}/index.html',
        "Cookie": f'zwsd="{ROUTER_KEY}"',
    }
    response = requests.get(url, headers=request_headers)
    return response.json()
def main():
    """Log in, then print the requested router fields as one JSON object.

    The password comes from the existing ``PASSWORD`` global, else the
    ``ROUTER_PASSWORD`` environment variable, else an interactive prompt
    (whose input is base64-encoded).  Login is retried until the router
    reports ``loginfo == "ok"`` or a forced re-login succeeds.  The field
    names to query are taken from the command-line arguments.
    """
    global PASSWORD
    print("Logging into [%s]" % ROUTER_IP, file=sys.stderr)

    authenticated = False
    while not authenticated:
        secret = PASSWORD or os.environ.get("ROUTER_PASSWORD")
        if not secret:
            typed = getpass.getpass("Password: ")
            secret = base64.b64encode(typed.encode("ascii")).decode("ascii")
        PASSWORD = secret

        get_auth_key()
        stats = get_multi_data()
        if stats["loginfo"] != "ok":
            # Cached key was not accepted: force a fresh login.
            get_auth_key(True)
            if PASSWORD is False:
                print("\x1b[31m[!]\x1b[0m Incorrect Password", file=sys.stderr)
                continue
        authenticated = True

    requested = sys.argv[1:]
    print(json.dumps(get_multi_data(*requested)))
# Script entry point for the one-off JSON query.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment