Skip to content

Instantly share code, notes, and snippets.

@ankurpandeyvns
Created November 19, 2025 20:40
Show Gist options
  • Select an option

  • Save ankurpandeyvns/2f7a94da17645312a07e0c12aea4c4c1 to your computer and use it in GitHub Desktop.

Select an option

Save ankurpandeyvns/2f7a94da17645312a07e0c12aea4c4c1 to your computer and use it in GitHub Desktop.
A comprehensive Python library for managing BOA-based GPON routers (like DG-GR6011 and similar devices).
#!/usr/bin/env python3
"""
GPON ONU/ONT Router Configuration Manager
==========================================
A comprehensive Python script for managing BOA-based GPON devices.
INSTALLATION
------------
pip install requests
QUICK START
-----------
from gpon_onu_manager import GPONManager
# Connect to router
router = GPONManager("192.168.3.1", "admin", "stdONU101")
router.login()
# Get device info
info = router.get_device_info()
print(info)
# Create a PPPoE connection
router.create_pppoe_route_connection(
username="user@isp.example",
password="password123",
vlan_id=100
)
# Always logout when done
router.logout()
SUPPORTED FEATURES
------------------
STATUS:
- Device Info (model, firmware, serial number)
- Device Resources (CPU, memory usage)
- WAN Connection Status
- PON/GPON Information
- Connected Users/Devices
- CATV Status
- Wireless Performance Stats
NETWORK:
WAN Configuration:
- Create/Delete/Modify WAN connections
- PPPoE, DHCP, Static IP modes
- Route and Bridge modes
- VLAN tagging
- IPv4/IPv6 dual-stack
- NAT configuration
- Enable/Disable connections
LAN Configuration:
- IPv4/IPv6 LAN settings
- DHCP Server configuration
- DHCP reservations (MAC-IP binding)
- DNS settings
WiFi Configuration:
- 2.4GHz and 5GHz bands
- SSID, password, security settings
- Channel selection
- Bandwidth configuration
- Hidden SSID
- Max clients
- Band Steering
Other Network:
- MTU configuration
- Client Management
- Family Groups (Parental Controls)
- CATV settings
- Port Binding
- TR-069 Remote Management
- QoS (Quality of Service)
- Time/NTP synchronization
- Static Routing
SECURITY:
- Firewall (Low/High security levels)
- DoS Attack Protection
- URL Filtering
- MAC Filtering (Whitelist/Blacklist)
- IP/Port Filtering
- Login Privileges
APPLICATION:
NAT/Port Forwarding:
- DMZ Host configuration
- Virtual Server (Port Forwarding)
- ALG settings (FTP, SIP, H.323, etc.)
- UPnP configuration
VoIP:
- SIP configuration
- Codec settings
- Advanced VoIP options
Other:
- DDNS (DynDNS, No-IP, TZO, etc.)
- MQTT configuration
- Multicast/IGMP
- Samba file sharing
- FTP server
- USB storage management
MANAGEMENT:
- User/Password management
- Configuration backup/restore
- Factory reset
- Scheduled reboot
- Firmware upgrade
- System logs
- SNMP configuration
DIAGNOSTICS:
- Ping/Traceroute/DNS lookup
- Loopback tests
- Port mirroring
- Self-diagnostics
- AP scanning
- Iperf speed tests
EXAMPLES
--------
# Example 1: Create different WAN connection types
router = GPONManager("192.168.3.1", "admin", "password")
router.login()
# PPPoE Route mode (most common for ISPs)
router.create_pppoe_route_connection(
username="user@isp.example",
password="pass123",
vlan_id=100,
service_type=1, # INTERNET
nat_enabled=True
)
# DHCP Route mode
router.create_dhcp_route_connection(
vlan_id=200,
nat_enabled=True
)
# Static IP Route mode
router.create_static_route_connection(
ip_address="203.0.113.10",
subnet_mask="255.255.255.0",
gateway="203.0.113.1",
dns1="8.8.8.8",
dns2="8.8.4.4"
)
# Bridge mode (for external router)
router.create_bridge_connection(vlan_id=300)
# Delete a connection
router.delete_wan_connection("ppp_8_35_1")
# Example 2: Configure WiFi
# Change 2.4GHz WiFi settings
router.set_wifi_24g_config({
'ssid': 'MyNetwork',
'pskValue': 'SecurePassword123',
'wlanDisabled': '0',
'hiddenSSID': '0',
'channel': '6',
'band': '2' # 20/40MHz
})
# Change 5GHz WiFi settings
router.set_wifi_5g_config({
'ssid': 'MyNetwork_5G',
'pskValue': 'SecurePassword123',
'channel': '36'
})
# Example 3: Configure DMZ and Port Forwarding
# Enable DMZ host
router.set_dmz(enabled=True, ip="192.168.3.100")
# Add port forwarding rule (Virtual Server)
router.add_virtual_server(
name="Web Server",
external_port_start=8080,
external_port_end=8080,
internal_ip="192.168.3.100",
internal_port=80,
protocol=1 # TCP
)
# Configure ALG settings
router.set_alg_config({
'ftp_algonoff': '1',
'sip_algonoff': '1',
'h323_algonoff': '1'
})
# Example 4: Configure DDNS
router.add_ddns(
provider="dyndns",
hostname="myhost.dyndns.org",
username="myuser",
password="mypass"
)
# Example 5: Firewall and Security
# Set firewall level
router.set_firewall_config(
level=2, # 0=Low, 2=High
dos_protection=True
)
# Add MAC filter
router.set_mac_filter_mode("blacklist")
router.add_mac_filter("AA:BB:CC:DD:EE:FF", "Blocked Device")
# Example 6: Backup and Restore (with encryption/decryption)
# Backup encrypted configuration
config = router.backup_config()
with open('router_backup.bin', 'wb') as f:
f.write(config)
# Backup and decrypt to readable XML
config = router.backup_config(decrypt=True)
with open('router_config.xml', 'wb') as f:
f.write(config)
# Restore from encrypted backup
with open('router_backup.bin', 'rb') as f:
router.restore_config(f.read(), encrypt=False)
# Restore from plaintext XML (auto-encrypts)
with open('router_config.xml', 'rb') as f:
router.restore_config(f.read())
# Decrypt existing backup file
with open('backup.bin', 'rb') as f:
decrypted = GPONManager.decrypt_config(f.read())
with open('config.xml', 'wb') as f:
f.write(decrypted)
# Encrypt plaintext config for manual upload
with open('config.xml', 'rb') as f:
encrypted = GPONManager.encrypt_config(f.read())
with open('backup.bin', 'wb') as f:
f.write(encrypted)
# Example 7: Device Management
# Reboot router
router.reboot()
# Factory reset
router.factory_reset()
# Set scheduled reboot
router.set_scheduled_reboot(
days=[1, 3, 5], # Mon, Wed, Fri
hour=3,
minute=0
)
SERVICE TYPES
-------------
0 = TR069_INTERNET
1 = INTERNET
2 = TR069
3 = Other
4 = VOIP
5 = TR069_VOIP
6 = VOIP_INTERNET
7 = TR069_VOIP_INTERNET
AUTHOR
------
Auto-generated for GPON ONU/ONT device management
"""
import requests
import json
import base64
import re
import time
from typing import Optional, Dict, Any, List
from urllib.parse import urljoin
class GPONManager:
"""
Main class for managing GPON ONU/ONT router.
This class provides methods to configure all aspects of a BOA-based
GPON router including WAN connections, WiFi, security, NAT, and more.
Attributes:
host (str): Router IP address
username (str): Admin username
password (str): Admin password
session (requests.Session): HTTP session for persistent cookies
logged_in (bool): Login status
Example:
>>> router = GPONManager("192.168.3.1", "admin", "password")
>>> router.login()
>>> info = router.get_device_info()
>>> router.logout()
"""
# XOR encryption key for config backups
CONFIG_XOR_KEY = b'tecomtec'
def __init__(self, host: str = "192.168.3.1", username: str = "admin", password: str = "stdONU101"):
    """
    Prepare a manager for one router without contacting it.

    Stores the credentials, derives the plain-HTTP base URL from ``host``
    and creates the ``requests.Session`` reused by all later calls.

    Args:
        host: Router IP address (default: 192.168.3.1)
        username: Admin username (default: admin)
        password: Admin password
    """
    # Connection state: nothing is sent until login() is called.
    self.logged_in = False
    self.session = requests.Session()
    # Credentials submitted by login().
    self.username = username
    self.password = password
    # Target device; every endpoint URL is built on top of base_url.
    self.host = host
    self.base_url = f"http://{host}"
def _encode64(self, input_str: str) -> str:
    """Return the Base64 text form of ``input_str`` (same as device JS)."""
    raw_bytes = input_str.encode()
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode()
def _decode64(self, input_str: str) -> str:
    """Turn a Base64 string back into the text it encodes."""
    decoded_bytes = base64.b64decode(input_str)
    return decoded_bytes.decode()
@staticmethod
def decrypt_config(encrypted_data: bytes, key: Optional[bytes] = None) -> bytes:
    """
    Decrypt router configuration backup.

    Each input byte is XORed with the key, which repeats for as long as
    the data lasts. Because XOR is its own inverse, running the result
    through this function again with the same key restores the input.

    Args:
        encrypted_data: Encrypted configuration data
        key: XOR key to use. Defaults to ``GPONManager.CONFIG_XOR_KEY``
            when omitted.

    Returns:
        bytes: Decrypted configuration data

    Raises:
        ValueError: If an empty key is supplied.

    Example:
        >>> with open('backup.bin', 'rb') as f:
        ...     encrypted = f.read()
        >>> decrypted = GPONManager.decrypt_config(encrypted)
        >>> print(decrypted.decode('utf-8')[:100])
    """
    if key is None:
        key = GPONManager.CONFIG_XOR_KEY
    if not key:
        # An empty key would otherwise surface as a ZeroDivisionError below.
        raise ValueError("XOR key must not be empty")
    key_len = len(key)
    return bytes(byte ^ key[i % key_len] for i, byte in enumerate(encrypted_data))
@staticmethod
def encrypt_config(plaintext_data: bytes) -> bytes:
    """
    Encrypt plaintext configuration data for upload to the router.

    Delegates to ``GPONManager.decrypt_config``: the XOR transform is
    symmetric, so the same operation serves both directions.

    Args:
        plaintext_data: Plaintext XML configuration data

    Returns:
        bytes: Encrypted configuration data

    Example:
        >>> with open('config.xml', 'rb') as f:
        ...     plaintext = f.read()
        >>> encrypted = GPONManager.encrypt_config(plaintext)
        >>> with open('backup.bin', 'wb') as f:
        ...     f.write(encrypted)
    """
    # Applying the XOR key stream a second time undoes the first pass,
    # so the decrypt routine doubles as the encrypt routine.
    xor_transform = GPONManager.decrypt_config
    return xor_transform(plaintext_data)
def login(self) -> bool:
    """
    Login to the router.

    Posts the stored username and password to
    ``/boaform/admin/formLogin`` without following redirects. The attempt
    counts as successful when the reply body contains ``LOGINED`` or the
    status code is 302.

    ``logged_in`` always reflects the most recent attempt: it is cleared
    before the request is sent, so a failed re-login no longer leaves a
    stale ``True`` behind.

    Returns:
        bool: True if login successful, False otherwise (including when
        the request itself raised; the error is printed, not re-raised)

    Example:
        >>> router = GPONManager("192.168.3.1", "admin", "pass")
        >>> if router.login():
        ...     print("Connected!")
    """
    url = f"{self.base_url}/boaform/admin/formLogin"
    credentials = {
        "username": self.username,
        "password": self.password
    }
    # Assume failure until the router confirms otherwise.
    self.logged_in = False
    try:
        response = self.session.post(url, data=credentials, allow_redirects=False)
        if "LOGINED" in response.text or response.status_code == 302:
            self.logged_in = True
            print(f"Successfully logged in to {self.host}")
            return True
        print(f"Login failed: {response.text}")
        return False
    except Exception as e:
        # Best-effort API: failures are reported to the caller as False.
        print(f"Login error: {e}")
        return False
def logout(self) -> bool:
    """
    End the router session.

    Posts to ``/boaform/admin/formLogout`` and clears ``logged_in`` once
    the request has gone through. If the request raises, the error is
    printed and ``logged_in`` is left as it was.

    Returns:
        bool: True if the logout request was sent, False if it raised
    """
    logout_url = f"{self.base_url}/boaform/admin/formLogout"
    try:
        self.session.post(logout_url)
        self.logged_in = False
        print("Logged out successfully")
    except Exception as err:
        print(f"Logout error: {err}")
        return False
    return True
def _get_asp_data(self, endpoint: str) -> str:
    """
    GET ``/boaform/getASPdata/<endpoint>`` and return the response body.

    Any exception raised by the request is printed and an empty string is
    returned instead.
    """
    target = f"{self.base_url}/boaform/getASPdata/{endpoint}"
    try:
        return self.session.get(target).text
    except Exception as err:
        print(f"Error getting ASP data: {err}")
    return ""
def _post_form(self, endpoint: str, data: dict) -> str:
    """
    POST ``data`` as form fields to ``/boaform/getASPdata/<endpoint>``.

    Returns the response body, or an empty string (after printing the
    error) when the request raises.
    """
    # NOTE(review): this posts under the same /boaform/getASPdata/ prefix
    # that _get_asp_data reads from; confirm the device accepts form
    # submissions on that path.
    target = f"{self.base_url}/boaform/getASPdata/{endpoint}"
    try:
        return self.session.post(target, data=data).text
    except Exception as err:
        print(f"Error posting form: {err}")
    return ""
def _post_admin_form(self, endpoint: str, data: dict) -> str:
    """
    POST ``data`` as form fields to ``/boaform/admin/<endpoint>``.

    Returns the response body, or an empty string (after printing the
    error) when the request raises.
    """
    target = f"{self.base_url}/boaform/admin/{endpoint}"
    try:
        return self.session.post(target, data=data).text
    except Exception as err:
        print(f"Error posting admin form: {err}")
    return ""
def _parse_key_value(self, text: str) -> dict:
    """
    Parse a key=value formatted response into a flat dict.

    Three line shapes are understood (blank lines are skipped):

    * ``key=value`` - one pair per line
    * ``k1=v1&k2=v2`` - several pairs on one line
    * ``label=k1=v1&k2=v2`` - as above behind a ``label=`` prefix, which
      is discarded (e.g. ``basic_info=devId=xxx&...``)

    A label is only assumed when the segment before the first ``&``
    contains more than one ``=``. Previously the text up to the first
    ``=`` was stripped from every multi-pair line, which silently dropped
    the first key of unlabelled lines such as ``devId=xxx&devModel=yyy``.

    Args:
        text: Raw response body

    Returns:
        dict: Stripped keys mapped to stripped values; when a key repeats,
        the last occurrence wins
    """
    result = {}
    for raw_line in text.strip().split('\n'):
        line = raw_line.strip()
        if not line:
            continue
        if '&' in line:
            first_segment = line.split('&', 1)[0]
            if first_segment.count('=') > 1:
                # "label=key=value&..." - drop the leading "label=".
                line = line.split('=', 1)[1]
        # A line without '&' is simply a single pair.
        for pair in line.split('&'):
            key, sep, value = pair.partition('=')
            if sep:
                result[key.strip()] = value.strip()
    return result
def _parse_json_response(self, text: str) -> dict:
    """
    Parse a response as JSON, falling back to key=value parsing.

    Args:
        text: Raw response body

    Returns:
        The decoded JSON value when ``text`` is valid JSON, otherwise the
        result of ``_parse_key_value(text)``
    """
    try:
        return json.loads(text)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError. Catching only decoding
        # failures lets KeyboardInterrupt/SystemExit propagate, which the
        # previous bare "except:" swallowed.
        return self._parse_key_value(text)
# ==================== STATUS SECTION ====================
def get_device_info(self) -> dict:
    """
    Fetch the ``dev_basic_info`` page and parse it as key=value pairs.

    Returns:
        dict: Device model, ID, hardware version, firmware version,
        serial number

    Example:
        >>> info = router.get_device_info()
        >>> print(f"Model: {info.get('devModel')}")
        >>> print(f"Firmware: {info.get('stVer')}")
    """
    return self._parse_key_value(self._get_asp_data("dev_basic_info"))
def get_device_resources(self) -> dict:
    """
    Fetch the ``dev_resource_info`` page and parse it as key=value pairs.

    Returns:
        dict: CPU usage (cpUsage), Memory usage (memUsage)
    """
    return self._parse_key_value(self._get_asp_data("dev_resource_info"))
def get_wan_status(self) -> list:
    """Return WAN connection status: ``listWanConfig`` parsed as array records."""
    return self._parse_array_response(self._get_asp_data("listWanConfig"))
def get_pon_info(self) -> dict:
    """Return PON (GPON) information: ``ponGetStatus`` parsed as key=value pairs."""
    return self._parse_key_value(self._get_asp_data("ponGetStatus"))
def get_connected_users(self) -> list:
    """
    List connected users/devices (DHCP clients).

    Reads the ``E8BDhcpClientList`` page and parses it as array records.

    Returns:
        list: One dict per connected device (IP, MAC, hostname)
    """
    return self._parse_array_response(self._get_asp_data("E8BDhcpClientList"))
def get_wlan_clients(self) -> list:
    """Return connected wireless clients: ``wirelessClientList`` parsed as array records."""
    return self._parse_array_response(self._get_asp_data("wirelessClientList"))
def _parse_array_response(self, text: str) -> list:
    """
    Parse an array-style response into a list of dicts.

    Each relevant line looks like ``listWanv4[0]=key=val&key2=val2``: the
    text after the first ``]=`` is split on ``&`` into key=value pairs.
    Lines lacking ``[`` or ``]=`` are ignored, as are lines that yield no
    pairs.

    Args:
        text: Raw response body

    Returns:
        list: One dict per accepted line, in input order
    """
    records = []
    for raw_line in text.strip().split('\n'):
        entry = raw_line.strip()
        # Only indexed lines such as "listWanv4[0]=..." carry a record.
        if not entry or '[' not in entry or ']=' not in entry:
            continue
        payload = entry.split(']=', 1)[1]
        fields = {}
        for chunk in payload.split('&'):
            name, sep, val = chunk.strip().partition('=')
            if sep:
                fields[name.strip()] = val.strip()
        if fields:
            records.append(fields)
    return records
def get_catv_status(self) -> dict:
    """Return CATV status: ``catv_status`` parsed as key=value pairs."""
    return self._parse_key_value(self._get_asp_data("catv_status"))
def get_wireless_performance(self) -> dict:
    """Return wireless performance statistics from ``wireless_performance`` (JSON, else key=value)."""
    return self._parse_json_response(self._get_asp_data("wireless_performance"))
# ==================== NETWORK - WAN SECTION ====================
def get_wan_config(self) -> dict:
    """Return WAN configuration from ``formEthernet_init`` (JSON, else key=value)."""
    return self._parse_json_response(self._get_asp_data("formEthernet_init"))
def get_wan_connections_list(self) -> list:
    """
    List every WAN connection with its full configuration.

    Reads the ``initPageEth`` page. Each line of the form
    ``name=key1=val1&key2=val2&...`` becomes a dict holding the connection
    name under ``'name'`` plus its parsed fields. The metadata lines
    ``vlan_map=``, ``poe_proxy=`` and ``pppnumleft=`` are skipped, as are
    lines that produce no fields.

    Returns:
        list: All configured WAN connections
    """
    metadata_prefixes = ('vlan_map=', 'poe_proxy=', 'pppnumleft=')
    raw = self._get_asp_data("initPageEth")
    connections = []
    for raw_line in raw.strip().split('\n'):
        entry = raw_line.strip()
        if not entry or '=' not in entry:
            continue
        if entry.startswith(metadata_prefixes):
            continue
        # Everything before the first '=' is the connection name.
        conn_name, _, field_text = entry.partition('=')
        conn = {'name': conn_name}
        for chunk in field_text.split('&'):
            key, sep, value = chunk.partition('=')
            if sep:
                conn[key.strip()] = value.strip()
        # Keep only entries that carried at least one field.
        if len(conn) > 1:
            connections.append(conn)
    return connections
def set_wan_config(self, config: dict) -> str:
    """
    Set WAN configuration (low-level method).

    Posts ``config`` to ``formEthernet`` with ``action='sv'``. When
    ``pppUsername`` / ``pppPassword`` are present, their Base64 forms are
    added as ``encodePppUserName`` / ``encodePppPassword``.

    The caller's dict is left untouched; the extra fields are added to a
    copy. (Earlier versions wrote them into ``config`` itself.)

    Args:
        config: Dictionary with WAN parameters

    Returns:
        str: Response from router

    See create_pppoe_route_connection() for easier usage.
    """
    form = dict(config)
    if 'pppUsername' in form:
        form['encodePppUserName'] = self._encode64(form.get('pppUsername', ''))
    if 'pppPassword' in form:
        form['encodePppPassword'] = self._encode64(form.get('pppPassword', ''))
    form['action'] = 'sv'
    return self._post_form("formEthernet", form)
def delete_wan_connection(self, connection_name: str) -> str:
    """
    Remove a WAN connection by posting ``action='rm'`` to ``formEthernet``.

    Args:
        connection_name: Name of connection to delete (e.g., "ppp_8_35_1")

    Returns:
        str: Response from router

    Example:
        >>> router.delete_wan_connection("ppp_8_35_1")
    """
    return self._post_form("formEthernet", {'lkname': connection_name, 'action': 'rm'})
def enable_wan_connection(self, connection_name: str) -> str:
    """Enable the named WAN connection (``formWanEnable`` with ``enable='1'``)."""
    payload = {'lkname': connection_name, 'enable': '1'}
    return self._post_form("formWanEnable", payload)
def disable_wan_connection(self, connection_name: str) -> str:
    """Disable the named WAN connection (``formWanEnable`` with ``enable='0'``)."""
    payload = {'lkname': connection_name, 'enable': '0'}
    return self._post_form("formWanEnable", payload)
def set_nat_type(self, nat_type: int) -> str:
    """
    Select the NAT type by posting it to ``formNat``.

    Args:
        nat_type: 0=NAT4 (Symmetric), 1=NAT1 (Full Cone), 2=NAT2

    Returns:
        str: Response from router
    """
    payload = {'nat_type': nat_type}
    return self._post_form("formNat", payload)
# ==================== WAN CONNECTION HELPER METHODS ====================
def create_pppoe_route_connection(self, username: str, password: str,
                                  vlan_id: int = None, service_type: int = 1,
                                  nat_enabled: bool = True, mtu: int = 1492,
                                  connection_name: str = "new") -> str:
    """
    Create (or modify) a PPPoE connection in Route mode.

    Builds the ``formEthernet`` form with dual-stack IPv4/IPv6 settings
    and posts it with ``action='sv'``. The PPPoE credentials are sent both
    as plain fields and Base64-encoded.

    Args:
        username: PPPoE username from ISP
        password: PPPoE password from ISP
        vlan_id: VLAN ID (None to disable VLAN tagging; any falsy value,
            including 0, also disables it)
        service_type: 0=TR069_INTERNET, 1=INTERNET, 2=TR069, 3=Other, 4=VOIP
        nat_enabled: Enable NAT (default True)
        mtu: MTU value (default 1492 for PPPoE)
        connection_name: 'new' for new connection or existing name to modify

    Returns:
        str: Response from router

    Example:
        >>> router.create_pppoe_route_connection(
        ...     username="user@isp.example",
        ...     password="password123",
        ...     vlan_id=100
        ... )
    """
    if vlan_id:
        vlan_flag, vid = '1', str(vlan_id)
    else:
        vlan_flag, vid = '0', '0'
    napt_flag = '1' if nat_enabled else '0'

    form = {
        'lkname': connection_name,
        'lkmode': '1',            # Route mode
        'IpProtocolType': '3',    # IPv4/IPv6
        'ipmode': '2',            # PPPoE
        'cmode': '2',
        'ipDhcp': '0',
        'napt': napt_flag,
        'vlan': vlan_flag,
        'vid': vid,
        'vprio': '0',
        'mtu': str(mtu),
        'pppUsername': username,
        'pppPassword': password,
        'pppServiceName': '',
        'pppCtype': '0',          # Continuous
        'applicationtype': str(service_type),
        'dnsMode': '1',
        'disableLanDhcp': '0',
        'itfGroup': '0',
        'AddrMode': '32',         # Auto for IPv6
        'iapd': '1',
        'dslite_enable': '0',
        'dnsv6Mode': '1',
        'iana': '0',
        'action': 'sv',
    }
    # Base64 copies of the credentials go alongside the plain ones.
    form['encodePppUserName'] = self._encode64(username)
    form['encodePppPassword'] = self._encode64(password)
    return self._post_form("formEthernet", form)
def create_dhcp_route_connection(self, vlan_id: int = None, service_type: int = 1,
                                 nat_enabled: bool = True, mtu: int = 1500,
                                 connection_name: str = "new") -> str:
    """
    Create (or modify) a DHCP connection in Route mode.

    Builds the ``formEthernet`` form with dual-stack IPv4/IPv6 settings
    and posts it with ``action='sv'``.

    Args:
        vlan_id: VLAN ID (None to disable VLAN tagging; any falsy value,
            including 0, also disables it)
        service_type: Service type (see SERVICE TYPES in docstring)
        nat_enabled: Enable NAT
        mtu: MTU value
        connection_name: 'new' for new connection or existing name to modify

    Returns:
        str: Response from router

    Example:
        >>> router.create_dhcp_route_connection(vlan_id=200)
    """
    if vlan_id:
        vlan_flag, vid = '1', str(vlan_id)
    else:
        vlan_flag, vid = '0', '0'
    napt_flag = '1' if nat_enabled else '0'

    form = {
        'lkname': connection_name,
        'lkmode': '1',            # Route mode
        'IpProtocolType': '3',    # IPv4/IPv6
        'ipmode': '0',            # DHCP
        'cmode': '1',
        'ipDhcp': '1',
        'napt': napt_flag,
        'vlan': vlan_flag,
        'vid': vid,
        'vprio': '0',
        'mtu': str(mtu),
        'applicationtype': str(service_type),
        'dnsMode': '1',
        'disableLanDhcp': '0',
        'itfGroup': '0',
        'AddrMode': '32',
        'iapd': '1',
        'dslite_enable': '0',
        'dnsv6Mode': '1',
        'iana': '0',
        'action': 'sv',
    }
    return self._post_form("formEthernet", form)
def create_static_route_connection(self, ip_address: str, subnet_mask: str,
                                   gateway: str, dns1: str = "", dns2: str = "",
                                   vlan_id: int = None, service_type: int = 1,
                                   nat_enabled: bool = True, mtu: int = 1500,
                                   connection_name: str = "new") -> str:
    """
    Create (or modify) a Static IP connection in Route mode.

    Builds an IPv4-only ``formEthernet`` form with manually supplied
    address, gateway and DNS servers and posts it with ``action='sv'``.

    Args:
        ip_address: Static IP address
        subnet_mask: Subnet mask (e.g., "255.255.255.0")
        gateway: Default gateway
        dns1: Primary DNS server
        dns2: Secondary DNS server
        vlan_id: VLAN ID (None to disable; any falsy value, including 0,
            also disables it)
        service_type: Service type
        nat_enabled: Enable NAT
        mtu: MTU value
        connection_name: 'new' for new connection

    Returns:
        str: Response from router

    Example:
        >>> router.create_static_route_connection(
        ...     ip_address="203.0.113.10",
        ...     subnet_mask="255.255.255.0",
        ...     gateway="203.0.113.1",
        ...     dns1="8.8.8.8"
        ... )
    """
    if vlan_id:
        vlan_flag, vid = '1', str(vlan_id)
    else:
        vlan_flag, vid = '0', '0'
    napt_flag = '1' if nat_enabled else '0'

    form = {
        'lkname': connection_name,
        'lkmode': '1',            # Route mode
        'IpProtocolType': '1',    # IPv4 only
        'ipmode': '1',            # Static
        'cmode': '1',
        'ipDhcp': '0',
        'napt': napt_flag,
        'vlan': vlan_flag,
        'vid': vid,
        'vprio': '0',
        'mtu': str(mtu),
        'ipAddr': ip_address,
        'netMask': subnet_mask,
        'remoteIpAddr': gateway,
        'dnsMode': '0',
        'v4dns1': dns1,
        'v4dns2': dns2,
        'applicationtype': str(service_type),
        'disableLanDhcp': '0',
        'itfGroup': '0',
        'action': 'sv',
    }
    return self._post_form("formEthernet", form)
def create_bridge_connection(self, vlan_id: int = None, service_type: int = 1,
                             connection_name: str = "new") -> str:
    """
    Create (or modify) a Bridge mode connection.

    Bridge mode passes traffic directly to a downstream router. The form
    is posted to ``formEthernet`` with NAT off, MTU fixed at 1500 and
    ``disableLanDhcp`` set to '1'.

    Args:
        vlan_id: VLAN ID (None to disable; any falsy value, including 0,
            also disables it)
        service_type: 1=INTERNET, 3=Other
        connection_name: 'new' for new connection

    Returns:
        str: Response from router

    Example:
        >>> router.create_bridge_connection(vlan_id=100)
    """
    if vlan_id:
        vlan_flag, vid = '1', str(vlan_id)
    else:
        vlan_flag, vid = '0', '0'

    form = {
        'lkname': connection_name,
        'lkmode': '0',            # Bridge mode
        'IpProtocolType': '3',
        'ipmode': '0',
        'cmode': '0',
        'ipDhcp': '0',
        'napt': '0',
        'vlan': vlan_flag,
        'vid': vid,
        'vprio': '0',
        'mtu': '1500',
        'applicationtype': str(service_type),
        'disableLanDhcp': '1',
        'itfGroup': '0',
        'action': 'sv',
    }
    return self._post_form("formEthernet", form)
def modify_wan_connection(self, connection_name: str, **kwargs) -> str:
    """
    Change parameters of an existing WAN connection.

    Posts ``lkname``, ``action='sv'`` and every keyword argument to
    ``formEthernet``. Keyword arguments are applied last, so they override
    the two built-in fields if they reuse those names. When
    ``pppUsername`` / ``pppPassword`` are given, their Base64 forms are
    sent too.

    Args:
        connection_name: Name of connection to modify
        **kwargs: Parameters to change

    Returns:
        str: Response from router

    Example:
        >>> router.modify_wan_connection('ppp_8_35_1', mtu='1400', napt='0')
    """
    form = {'lkname': connection_name, 'action': 'sv', **kwargs}
    encoded_fields = (
        ('pppUsername', 'encodePppUserName'),
        ('pppPassword', 'encodePppPassword'),
    )
    for plain_key, encoded_key in encoded_fields:
        if plain_key in kwargs:
            form[encoded_key] = self._encode64(kwargs[plain_key])
    return self._post_form("formEthernet", form)
# ==================== NETWORK - LAN SECTION ====================
def get_lan_config(self) -> dict:
    """Return LAN configuration: ``init_dhcpmain_page`` parsed as key=value pairs."""
    return self._parse_key_value(self._get_asp_data("init_dhcpmain_page"))
def set_lan_ipv4_config(self, config: dict) -> str:
    """
    Apply LAN IPv4 settings by posting ``config`` unchanged to ``formDhcpd``.

    Args:
        config: Dictionary with keys:
            - uIp: LAN IP address
            - uMask: Subnet mask
            - uDhcpType: 0=disabled, 1=enabled, 2=relay
            - dhcpRangeStart: DHCP start IP
            - dhcpRangeEnd: DHCP end IP
            - ulTime: Lease time (60/3600/86400/604800)
            - ipv4landnsmode: 0=Proxy, 1=Static, 2=ISP
            - Ipv4Dns1, Ipv4Dns2: DNS servers

    Returns:
        str: Response from router

    Example:
        >>> router.set_lan_ipv4_config({
        ...     'uIp': '192.168.3.1',
        ...     'uMask': '255.255.255.0',
        ...     'uDhcpType': '1',
        ...     'dhcpRangeStart': '192.168.3.100',
        ...     'dhcpRangeEnd': '192.168.3.200',
        ...     'ulTime': '86400'
        ... })
    """
    response = self._post_form("formDhcpd", config)
    return response
def get_lan_ipv6_config(self) -> dict:
    """Return LAN IPv6 configuration from ``IPv6_LAN_Page_init`` (JSON, else key=value)."""
    return self._parse_json_response(self._get_asp_data("IPv6_LAN_Page_init"))
def set_lan_ipv6_config(self, config: dict) -> str:
    """Apply LAN IPv6 settings by posting ``config`` unchanged to ``formDhcpServerV6``."""
    response = self._post_form("formDhcpServerV6", config)
    return response
def get_dhcp_reservations(self) -> list:
    """Return DHCP MAC-IP reservations from ``showMACBaseTable`` (JSON, else key=value)."""
    # NOTE(review): annotated as list, but the key=value fallback of
    # _parse_json_response yields a dict; confirm what the device sends.
    return self._parse_json_response(self._get_asp_data("showMACBaseTable"))
def add_dhcp_reservation(self, mac: str, ip: str) -> str:
    """
    Bind an IP address to a MAC address (DHCP reservation).

    Posts the pair to ``formMacAddrBase`` with ``action='sv'``.

    Args:
        mac: MAC address (format: 00:11:22:33:44:55)
        ip: IP address to assign

    Returns:
        str: Response from router

    Example:
        >>> router.add_dhcp_reservation("00:11:22:33:44:55", "192.168.3.50")
    """
    binding = {'macAddr_Dhcp_a': mac, 'ipAddr_Dhcp_a': ip, 'action': 'sv'}
    return self._post_form("formMacAddrBase", binding)
def delete_dhcp_reservation(self, mac: str, ip: str) -> str:
    """Remove a DHCP reservation by posting the MAC/IP pair to ``formMacAddrBase``."""
    binding = {'macAddr_Dhcp_d': mac, 'ipAddr_Dhcp_d': ip}
    return self._post_form("formMacAddrBase", binding)
# ==================== NETWORK - WIFI SECTION ====================
def get_wifi_24g_config(self) -> dict:
    """Return 2.4GHz WiFi configuration from ``wlan_init_24g`` (JSON, else key=value)."""
    return self._parse_json_response(self._get_asp_data("wlan_init_24g"))
def set_wifi_24g_config(self, config: dict) -> str:
    """
    Apply 2.4GHz WiFi settings by posting ``config`` unchanged to
    ``formWlanSetup_24g``.

    Args:
        config: Dictionary with keys:
            - ssid: Network name
            - wlanDisabled: '0'=enabled, '1'=disabled
            - hiddenSSID: '0'=visible, '1'=hidden
            - channel: 1-11 (0=auto)
            - band: '0'=20MHz, '1'=40MHz, '2'=20/40MHz
            - wpaAuth: Security type
            - pskValue: WiFi password (8-63 chars)
            - maxStaNum: Max connected clients

    Returns:
        str: Response from router

    Example:
        >>> router.set_wifi_24g_config({
        ...     'ssid': 'MyNetwork',
        ...     'pskValue': 'MyPassword123',
        ...     'channel': '6'
        ... })
    """
    response = self._post_form("formWlanSetup_24g", config)
    return response
def get_wifi_5g_config(self) -> dict:
    """Return 5GHz WiFi configuration from ``wlan_init_5g`` (JSON, else key=value)."""
    return self._parse_json_response(self._get_asp_data("wlan_init_5g"))
def set_wifi_5g_config(self, config: dict) -> str:
    """Apply 5GHz WiFi settings via ``formWlanSetup_5g`` (same parameters as 2.4GHz)."""
    response = self._post_form("formWlanSetup_5g", config)
    return response
# ==================== NETWORK - OTHER ====================
def get_mtu_config(self) -> dict:
    """Return MTU configuration: ``get_system_mtu`` parsed as key=value pairs."""
    return self._parse_key_value(self._get_asp_data("get_system_mtu"))
def set_mtu(self, mtu: int) -> str:
    """Post the system MTU value (typically 1500) to ``formMTU``."""
    payload = {'mtu': mtu}
    return self._post_form("formMTU", payload)
def get_band_steering_config(self) -> dict:
    """Return band steering configuration: ``band_steering_config`` parsed as key=value pairs."""
    return self._parse_key_value(self._get_asp_data("band_steering_config"))
def set_band_steering(self, enabled: bool, threshold: int = -70) -> str:
    """
    Turn band steering on or off.

    Band steering automatically moves clients between 2.4GHz and 5GHz.
    The setting is posted to ``formBandSteering``.

    Args:
        enabled: Enable band steering
        threshold: RSSI threshold in dBm

    Returns:
        str: Response from router
    """
    payload = {
        'enable': int(bool(enabled)),
        'rssiThreshold': threshold,
    }
    return self._post_form("formBandSteering", payload)
# ==================== NETWORK - TR-069 ====================
def get_tr069_config(self) -> dict:
    """Return TR-069 remote management configuration: ``Network_TR069_Page_init`` as key=value pairs."""
    return self._parse_key_value(self._get_asp_data("Network_TR069_Page_init"))
def set_tr069_config(self, config: dict) -> str:
    """
    Apply TR-069 settings by posting ``config`` unchanged to ``formTR069``.

    Args:
        config: Dictionary with keys:
            - enable: Enable TR-069
            - acsUrl: ACS server URL
            - acsUser: ACS username
            - acsPass: ACS password
            - informEnable: Enable periodic inform
            - informInterval: Inform interval in seconds

    Returns:
        str: Response from router
    """
    response = self._post_form("formTR069", config)
    return response
# ==================== NETWORK - QoS ====================
def get_qos_config(self) -> dict:
    """
    Collect QoS configuration from three router pages.

    * ``initQueuePolicy``: each ``key=value`` line becomes an entry. Keys
      starting with ``queues`` have their value (e.g.
      ``qindex=0&prio=1&weight=0&enable=1``) expanded into a nested dict.
    * ``initTraffictlPage``: parsed as key=value pairs under
      ``'traffic_control'``.
    * ``initQosRulePage``: parsed as key=value pairs under ``'rules'``.

    A page that comes back empty contributes nothing to the result.

    Returns:
        dict: Combined QoS settings
    """
    qos = {}

    # Queue policy (main QoS settings)
    queue_text = self._get_asp_data("initQueuePolicy")
    if queue_text:
        for entry in queue_text.strip().split('\n'):
            name, sep, payload = entry.partition('=')
            if not sep:
                continue
            if not name.startswith('queues'):
                qos[name] = payload
                continue
            # Per-queue settings are '&'-joined pairs of their own.
            qos[name] = dict(
                chunk.split('=', 1) for chunk in payload.split('&') if '=' in chunk
            )

    # Traffic control settings
    traffic_text = self._get_asp_data("initTraffictlPage")
    if traffic_text:
        qos['traffic_control'] = self._parse_key_value(traffic_text)

    # QoS rules
    rules_text = self._get_asp_data("initQosRulePage")
    if rules_text:
        qos['rules'] = self._parse_key_value(rules_text)

    return qos
def set_qos_enabled(self, enabled: bool) -> str:
    """Switch QoS on or off by posting ``enable`` (1 or 0) to ``formQoS``."""
    payload = {'enable': int(bool(enabled))}
    return self._post_form("formQoS", payload)
# ==================== NETWORK - TIME ====================
def get_time_config(self) -> dict:
    """Return time/NTP configuration: ``Network_Time_Page_init`` parsed as key=value pairs."""
    return self._parse_key_value(self._get_asp_data("Network_Time_Page_init"))
def set_time_config(self, config: dict) -> str:
    """
    Apply time settings by posting ``config`` unchanged to ``formTime``.

    Args:
        config: Dictionary with keys:
            - ntpEnable: Enable NTP
            - ntpServer: NTP server address
            - timezone: Timezone offset

    Returns:
        str: Response from router
    """
    response = self._post_form("formTime", config)
    return response
# ==================== NETWORK - ROUTING ====================
def get_route_config(self) -> dict:
    """
    Collect static route and RIP configuration.

    Reads three pages: ``showRipIf`` and ``showStaticRoute`` are parsed as
    key=value pairs; ``routeList`` is parsed line by line, where the text
    after the first ``=`` is split on ``&`` and its first five values are
    taken as destination, netmask, gateway, interface and metric. Lines
    with fewer than five values are ignored.

    Returns:
        dict: ``'rip'``, ``'static_routes'`` and ``'routes'`` (list of
        active-route dicts)
    """
    route_fields = ('destination', 'netmask', 'gateway', 'interface', 'metric')

    # RIP status
    rip = self._parse_key_value(self._get_asp_data("showRipIf"))
    # Static routes
    static_routes = self._parse_key_value(self._get_asp_data("showStaticRoute"))

    # Route list (active routes)
    active_routes = []
    route_text = self._get_asp_data("routeList")
    for entry in route_text.strip().split('\n'):
        _, sep, payload = entry.partition('=')
        if not sep:
            continue
        values = payload.split('&')
        if len(values) >= 5:
            # zip() stops after the five named fields; extras are dropped.
            active_routes.append(dict(zip(route_fields, values)))

    return {'rip': rip, 'static_routes': static_routes, 'routes': active_routes}
def add_static_route(self, dest: str, mask: str, gateway: str, interface: str = "") -> str:
    """
    Add a static route by posting it to ``formRoute`` with ``action='add'``.

    Args:
        dest: Destination network
        mask: Network mask
        gateway: Gateway IP
        interface: Optional interface name

    Returns:
        str: Response from router
    """
    route = {
        'destNet': dest,
        'netMask': mask,
        'gateway': gateway,
        'interface': interface,
        'action': 'add',
    }
    return self._post_form("formRoute", route)
def delete_static_route(self, dest: str, mask: str, gateway: str) -> str:
    """Delete a static route by posting it to ``formRoute`` with ``action='del'``."""
    route = {
        'destNet': dest,
        'netMask': mask,
        'gateway': gateway,
        'action': 'del',
    }
    return self._post_form("formRoute", route)
# ==================== SECURITY - FIREWALL ====================
def get_firewall_config(self) -> dict:
    """Return firewall configuration: ``initPageFirewall`` parsed as key=value pairs."""
    return self._parse_key_value(self._get_asp_data("initPageFirewall"))
def set_firewall_config(self, level: int = 0, dos_protection: bool = True) -> str:
    """
    Apply firewall settings by posting them to ``formFirewall``.

    Args:
        level: 0=Low, 2=High
        dos_protection: Enable DoS attack protection

    Returns:
        str: Response from router

    Example:
        >>> router.set_firewall_config(level=2, dos_protection=True)
    """
    dos_flag = '1' if dos_protection else '0'
    settings = {'FirewallLevel': str(level), 'DosEnable': dos_flag}
    return self._post_form("formFirewall", settings)
# ==================== SECURITY - URL FILTER ====================
def get_url_filter_config(self) -> dict:
    """Return URL filter configuration from ``url_filter_config`` (JSON, else key=value)."""
    return self._parse_json_response(self._get_asp_data("url_filter_config"))
def set_url_filter_enabled(self, enabled: bool) -> str:
    """Switch URL filtering on or off by posting ``enable`` (1 or 0) to ``formUrlFilter``."""
    payload = {'enable': int(bool(enabled))}
    return self._post_form("formUrlFilter", payload)
def add_url_filter(self, url: str, action: str = "deny") -> str:
    """Add ``url`` to the filter list (``formUrlFilter`` with ``cmd='add'``)."""
    entry = {'url': url, 'action': action, 'cmd': 'add'}
    return self._post_form("formUrlFilter", entry)
def delete_url_filter(self, url: str) -> str:
    """Remove ``url`` from the filter list (``formUrlFilter`` with ``cmd='del'``)."""
    entry = {'url': url, 'cmd': 'del'}
    return self._post_form("formUrlFilter", entry)
# ==================== SECURITY - MAC FILTER ====================
def get_mac_filter_config(self) -> dict:
    """Return MAC filter configuration from ``mac_filter_config`` (JSON, else key=value)."""
    return self._parse_json_response(self._get_asp_data("mac_filter_config"))
def set_mac_filter_mode(self, mode: str) -> str:
    """
    Select the MAC filter mode by posting it to ``formMacFilter``.

    Args:
        mode: 'disabled', 'whitelist', or 'blacklist'

    Returns:
        str: Response from router
    """
    payload = {'mode': mode}
    return self._post_form("formMacFilter", payload)
def add_mac_filter(self, mac: str, description: str = "") -> str:
    """Add ``mac`` to the filter list (``formMacFilter`` with ``action='add'``)."""
    entry = {'mac': mac, 'desc': description, 'action': 'add'}
    return self._post_form("formMacFilter", entry)
def delete_mac_filter(self, mac: str) -> str:
    """Remove ``mac`` from the filter list (``formMacFilter`` with ``action='del'``)."""
    entry = {'mac': mac, 'action': 'del'}
    return self._post_form("formMacFilter", entry)
# ==================== SECURITY - PORT FILTER ====================
def get_port_filter_config(self) -> dict:
    """Return IP/Port filter configuration from ``port_filter_config`` (JSON, else key=value)."""
    return self._parse_json_response(self._get_asp_data("port_filter_config"))
def add_port_filter(self, config: dict) -> str:
    """
    Add port filter rule.

    Posts the rule to ``formPortFilter`` with ``cmd='add'``. The caller's
    dict is not modified; ``cmd`` is set on a copy. (Earlier versions
    wrote ``cmd`` into ``config`` itself.)

    Args:
        config: Dictionary with keys:
            - protocol: 'tcp', 'udp', or 'both'
            - srcIP, srcMask: Source IP/mask
            - srcPortStart, srcPortEnd: Source port range
            - dstIP, dstMask: Destination IP/mask
            - dstPortStart, dstPortEnd: Destination port range
            - action: 'accept' or 'deny'

    Returns:
        str: Response from router
    """
    form = {**config, 'cmd': 'add'}
    return self._post_form("formPortFilter", form)
# ==================== APPLICATION - NAT/DMZ/PORT FORWARDING ====================
def get_alg_config(self) -> dict:
    """
    Read ALG (Application Layer Gateway) configuration.

    Fetches the ``GetAlgTypes`` page and parses it as key=value pairs.

    Returns:
        dict: ALG settings for FTP, SIP, H.323, etc.
    """
    return self._parse_key_value(self._get_asp_data("GetAlgTypes"))
def set_alg_config(self, config: dict) -> str:
    """
    Set ALG configuration.

    The caller's dictionary is left untouched; the 'apply' field is added
    to a copy that is then posted.

    Args:
        config: Dictionary with keys (all '0' or '1'):
            - ftp_algonoff: FTP ALG
            - tftp_algonoff: TFTP ALG
            - h323_algonoff: H.323 ALG
            - rtsp_algonoff: RTSP ALG
            - l2tp_algonoff: L2TP ALG
            - ipsec_algonoff: IPSec ALG
            - sip_algonoff: SIP ALG
            - pptp_algonoff: PPTP ALG

    Returns:
        Whatever `_post_form` returns for the "formALGOnOff" request

    Example:
        >>> router.set_alg_config({
        ...     'ftp_algonoff': '1',
        ...     'sip_algonoff': '1'
        ... })
    """
    # Copy instead of mutating the argument in place.
    payload = {**config, 'apply': '1'}
    return self._post_form("formALGOnOff", payload)
def get_dmz_config(self) -> dict:
    """
    Read the DMZ settings.

    Returns:
        dict: The "GetAlgTypes" ASP data run through `_parse_key_value`
    """
    # NOTE(review): this queries "GetAlgTypes", the same endpoint that
    # get_alg_config uses - confirm whether a DMZ-specific endpoint was intended.
    return self._parse_key_value(self._get_asp_data("GetAlgTypes"))
def set_dmz(self, enabled: bool, ip: str = "") -> str:
    """
    Configure the DMZ host.

    A DMZ host has all of its ports exposed to the internet.

    Args:
        enabled: Turn DMZ on (posted as '1') or off (posted as '0')
        ip: IP address of the DMZ host

    Returns:
        Whatever `_post_form` returns for the "formDMZ" request

    Example:
        >>> router.set_dmz(True, "192.168.3.100")
    """
    if enabled:
        dmz_flag = '1'
    else:
        dmz_flag = '0'
    return self._post_form("formDMZ", dict(dmzcap=dmz_flag, ip=ip))
def get_virtual_servers(self) -> list:
    """
    Read the configured virtual servers (port forwarding rules).

    Returns:
        The "virtualSvrList" ASP data run through `_parse_key_value`
    """
    # NOTE(review): annotated as `list`, but the value comes from the same
    # key/value parser the dict-returning getters use - confirm the real shape.
    return self._parse_key_value(self._get_asp_data("virtualSvrList"))
def add_virtual_server(self, name: str, external_port_start: int, external_port_end: int,
                       internal_ip: str, internal_port: int, protocol: int = 1,
                       external_ip: str = "", prefix_len: int = 0) -> str:
    """
    Create a virtual server entry (port forwarding rule).

    Numeric arguments are converted with `str()` before posting, and the
    rule is always submitted with 'vrtenable' set to '1'.

    Args:
        name: Rule name/description
        external_port_start: First external port of the range
        external_port_end: Last external port of the range
        internal_ip: IP of the internal server
        internal_port: Port on the internal server
        protocol: 1=TCP, 2=UDP, 4=TCP/UDP
        external_ip: External IP (optional, leave empty for all)
        prefix_len: IP prefix length (0-32)

    Returns:
        Whatever `_post_admin_form` returns for the "formVrtsrv" request

    Example:
        >>> # Forward port 8080 to internal web server
        >>> router.add_virtual_server(
        ...     name="Web Server",
        ...     external_port_start=8080,
        ...     external_port_end=8080,
        ...     internal_ip="192.168.3.100",
        ...     internal_port=80,
        ...     protocol=1
        ... )
    """
    payload = dict(
        cusSrvName=name,
        remotehost=external_ip,
        IpPrefixLen=str(prefix_len),
        wanStartPort=str(external_port_start),
        wanEndPort=str(external_port_end),
        protoType=str(protocol),
        serverIp=internal_ip,
        lanPort=str(internal_port),
        vrtenable='1',
        action='add',
    )
    return self._post_admin_form("formVrtsrv", payload)
def delete_virtual_server(self, rule_id: str) -> str:
    """
    Remove a virtual server (port forwarding) rule.

    Args:
        rule_id: Used as a form field name; it is posted with the value '1'

    Returns:
        Whatever `_post_admin_form` returns for the "formVrtsrv" request
    """
    payload = {rule_id: '1'}
    payload['action'] = 'delete'
    return self._post_admin_form("formVrtsrv", payload)
# ==================== APPLICATION - UPNP ====================
def get_upnp_config(self) -> dict:
    """Fetch the "get_UPnP_Page_data" ASP data and return it as parsed key/value pairs."""
    return self._parse_key_value(self._get_asp_data("get_UPnP_Page_data"))
def set_upnp(self, enabled: bool, wan_interface: str = "") -> str:
    """
    Switch UPnP on or off.

    Args:
        enabled: Posted as 'daemon' = '1' when truthy, '0' otherwise
        wan_interface: WAN interface for UPnP, posted as 'ext_if'

    Returns:
        Whatever `_post_form` returns for the "formUpnp" request

    Example:
        >>> router.set_upnp(True)
    """
    if enabled:
        daemon_flag = '1'
    else:
        daemon_flag = '0'
    return self._post_form("formUpnp", dict(daemon=daemon_flag, ext_if=wan_interface))
# ==================== APPLICATION - DDNS ====================
def get_ddns_config(self) -> dict:
    """Fetch the "showDNSTable" ASP data (DDNS settings) and return it as parsed key/value pairs."""
    return self._parse_key_value(self._get_asp_data("showDNSTable"))
def set_ddns_enabled(self, enabled: bool) -> str:
    """
    Switch the DDNS service on or off.

    Args:
        enabled: Posted as 'ddnsEnable' = '1' when truthy, '0' otherwise

    Returns:
        Whatever `_post_form` returns for the "formDDNS" request
    """
    if enabled:
        ddns_flag = '1'
    else:
        ddns_flag = '0'
    return self._post_form("formDDNS", dict(ddnsEnable=ddns_flag, action='sw'))
def add_ddns(self, provider: str, hostname: str, username: str, password: str,
             interface: str = "LAN") -> str:
    """
    Create a DDNS entry.

    Each known provider has its own pair of form field names for the
    account credentials. For any other provider value the request is still
    posted, just without credential fields.

    Args:
        provider: 'oray', 'dyndns', 'tzo', 'noip', or 'gnudip'
        hostname: DDNS hostname
        username: Account username (posted as the e-mail field for 'tzo')
        password: Account password (posted as the key field for 'tzo')
        interface: Interface to use, posted as 'ifname'

    Returns:
        Whatever `_post_form` returns for the "formDDNS" request

    Example:
        >>> router.add_ddns(
        ...     provider="dyndns",
        ...     hostname="myhost.dyndns.org",
        ...     username="myuser",
        ...     password="mypass"
        ... )
    """
    # provider -> (username field, password field)
    credential_fields = {
        'oray': ('orayusername', 'oraypassword'),
        'dyndns': ('dynusername', 'dynpassword'),
        'tzo': ('tzoemail', 'tzokey'),
        'noip': ('noipusername', 'noippassword'),
        'gnudip': ('gnudipusername', 'gnudippassword'),
    }
    payload = dict(provider=provider, hostname=hostname, ifname=interface, action='ad')
    field_pair = credential_fields.get(provider)
    if field_pair is not None:
        user_field, secret_field = field_pair
        payload[user_field] = username
        payload[secret_field] = password
    return self._post_form("formDDNS", payload)
def delete_ddns(self, rule_id: str) -> str:
    """
    Remove a DDNS entry.

    Args:
        rule_id: Used as a form field name; it is posted with the value '1'

    Returns:
        Whatever `_post_form` returns for the "formDDNS" request
    """
    payload = {rule_id: '1'}
    payload['action'] = 'rm'
    return self._post_form("formDDNS", payload)
# ==================== APPLICATION - VOIP ====================
def get_voip_config(self) -> dict:
    """Fetch the "asp_voip_e8c_get" ASP data (basic VoIP settings) and return it as parsed key/value pairs."""
    return self._parse_key_value(self._get_asp_data("asp_voip_e8c_get"))
def set_voip_config(self, config: dict) -> str:
    """
    Submit VoIP settings.

    Args:
        config: Form fields, posted unchanged to "formVoIP"

    Returns:
        Whatever `_post_form` returns for the "formVoIP" request
    """
    form_name = "formVoIP"
    return self._post_form(form_name, config)
# ==================== APPLICATION - SAMBA/FTP ====================
def get_samba_config(self) -> dict:
    """Fetch the "get_Samba_Page_data" ASP data (Samba file sharing) and return it as parsed key/value pairs."""
    return self._parse_key_value(self._get_asp_data("get_Samba_Page_data"))
def set_samba_config(self, enabled: bool, username: str, password: str) -> str:
    """
    Submit Samba file sharing settings.

    Args:
        enabled: Posted as 'EnableSamba' = '1' when truthy, '0' otherwise
        username: Samba username
        password: Samba password

    Returns:
        Whatever `_post_form` returns for the "formSamba" request
    """
    if enabled:
        samba_flag = '1'
    else:
        samba_flag = '0'
    payload = dict(EnableSamba=samba_flag, SambaName=username, SambaPasswd=password)
    return self._post_form("formSamba", payload)
def get_ftp_config(self) -> dict:
    """Fetch the "get_Ftp_Page_data" ASP data (FTP server) and return it as parsed key/value pairs."""
    return self._parse_key_value(self._get_asp_data("get_Ftp_Page_data"))
def set_ftp_config(self, enabled: bool, username: str, password: str) -> str:
    """
    Submit FTP server settings.

    Args:
        enabled: Posted as 'EnableFtp' = '1' when truthy, '0' otherwise
        username: FTP username
        password: FTP password

    Returns:
        Whatever `_post_form` returns for the "formFtp" request
    """
    if enabled:
        ftp_flag = '1'
    else:
        ftp_flag = '0'
    payload = dict(EnableFtp=ftp_flag, FtpName=username, FtpPasswd=password)
    return self._post_form("formFtp", payload)
# ==================== MANAGEMENT - USER ====================
def get_user_info(self) -> dict:
    """Fetch the "initManageUserPage" ASP data (user information) and return it as parsed key/value pairs."""
    return self._parse_key_value(self._get_asp_data("initManageUserPage"))
def change_password(self, old_password: str, new_password: str) -> str:
    """
    Submit a password change.

    The new password is posted twice: once as 'newPasswd' and once as the
    confirmation field 'affirmPasswd'. No validation is done here.

    Password requirements:
        - 6-32 characters
        - At least 2 types: uppercase, lowercase, numbers, special chars

    Args:
        old_password: Current password
        new_password: New password

    Returns:
        Whatever `_post_form` returns for the "new_formPasswordSetup" request
    """
    payload = dict(
        oldPasswd=old_password,
        newPasswd=new_password,
        affirmPasswd=new_password,
    )
    return self._post_form("new_formPasswordSetup", payload)
# ==================== MANAGEMENT - DEVICE ====================
def backup_config(self, decrypt: bool = False) -> bytes:
    """
    Backup device configuration.

    A CSRF token is fetched through the session, then the backup request is
    sent over a raw TCP socket and every byte received until the peer closes
    the connection is collected.

    Args:
        decrypt: If True, pass the received bytes through `decrypt_config`
            and return the result. If False, return the received bytes as-is.

    Returns:
        bytes: Configuration file content (decrypted XML if decrypt=True, else encrypted)

    Raises:
        OSError: If connecting, sending or receiving fails (including the
            30 second socket timeout). The socket is closed in every case.

    Example:
        >>> # Get encrypted backup
        >>> config = router.backup_config()
        >>> with open('backup.bin', 'wb') as f:
        ...     f.write(config)
        >>> # Get decrypted XML directly
        >>> config = router.backup_config(decrypt=True)
        >>> with open('config.xml', 'wb') as f:
        ...     f.write(config)
    """
    import socket
    import urllib.parse

    # Get CSRF token first
    csrf_url = f"{self.base_url}/boaform/getASPdata/FMask"
    csrf_token = self.session.get(csrf_url).text.strip()

    # Parse host from base_url
    parsed = urllib.parse.urlparse(self.base_url)
    host = parsed.hostname
    port = parsed.port or 80

    # Build the HTTP request by hand. The original author notes the device
    # answers in an HTTP/0.9-compatible way, which is presumably why the
    # session is not used for this call - TODO confirm.
    body = f"csrfMask={csrf_token}".encode()
    head = (
        "POST /boaform/getASPdata/formMgmConfig HTTP/1.0\r\n"
        f"Host: {host}\r\n"
        "Content-Type: application/x-www-form-urlencoded; charset=UTF-8\r\n"
        "X-Requested-With: XMLHttpRequest\r\n"
        # Content-Length counts bytes, so measure the encoded body.
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode()

    # The context manager closes the socket even when connect/send/recv raises.
    chunks = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(30)
        sock.connect((host, port))
        sock.sendall(head + body)
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
    response = b"".join(chunks)

    if decrypt:
        return self.decrypt_config(response)
    return response
def restore_config(self, config_data: bytes, encrypt: bool = True) -> str:
    """
    Upload a configuration backup to the device.

    With `encrypt` set, the data is run through `encrypt_config` first, but
    only when it looks like plaintext XML: it starts with b'<' or contains
    b'<Config' within its first 100 bytes. Anything else is uploaded as given.

    Args:
        config_data: Configuration file content (plaintext XML or encrypted)
        encrypt: If True, encrypt plaintext XML before upload.
            If False, upload data as-is (for pre-encrypted backups).

    Returns:
        str: Text of the upload response

    Example:
        >>> # Upload plaintext XML (will be encrypted automatically)
        >>> with open('config.xml', 'rb') as f:
        ...     router.restore_config(f.read())
        >>> # Upload pre-encrypted backup
        >>> with open('backup.bin', 'rb') as f:
        ...     router.restore_config(f.read(), encrypt=False)
    """
    if encrypt and (config_data.startswith(b'<') or b'<Config' in config_data[:100]):
        config_data = self.encrypt_config(config_data)
    upload_url = f"{self.base_url}/boaform/getASPdata/formMgmConfigUpload"
    reply = self.session.post(upload_url, files={'binary': ('config.xml', config_data)})
    return reply.text
def factory_reset(self, reset_type: int = 8) -> str:
    """
    Ask the device to reset to factory defaults.

    Args:
        reset_type: 4=Reset to defaults, 8=Full factory reset
            (posted as a string in the 'reset' field)

    Returns:
        Whatever `_post_form` returns for the "formNewReboot" request
    """
    payload = dict(reset=str(reset_type))
    return self._post_form("formNewReboot", payload)
def reboot(self) -> str:
    """Post an empty "formNewReboot" form to reboot the device and return the result."""
    no_fields = {}
    return self._post_form("formNewReboot", no_fields)
def set_scheduled_reboot(self, days: list, hour: int, minute: int) -> str:
    """
    Configure the automatic reboot schedule.

    The selected days are packed into a bitmask in which day number `d`
    sets bit `d` (so Monday=1 contributes 2, Sunday=7 contributes 128).

    Args:
        days: List of days (1=Mon, 2=Tue, ..., 7=Sun)
        hour: Hour (0-23)
        minute: Minute (0-59)

    Returns:
        Whatever `_post_form` returns for the "formSysSchedule" request

    Example:
        >>> # Reboot every Monday and Friday at 3:00 AM
        >>> router.set_scheduled_reboot([1, 5], 3, 0)
    """
    # Summing over distinct days equals OR-ing the individual bits.
    day_mask = sum(1 << day for day in set(days))
    payload = dict(day=str(day_mask), hour=str(hour), minute=str(minute))
    return self._post_form("formSysSchedule", payload)
def firmware_upgrade(self, firmware_data: bytes) -> str:
    """
    Upload a firmware image to the device.

    The image is sent as a multipart file named 'firmware.tar' in the
    'binary' field of the formUpload endpoint.

    Args:
        firmware_data: Firmware file content (.tar format)

    Returns:
        str: Text of the upload response
    """
    upload_url = f"{self.base_url}/boaform/getASPdata/formUpload"
    reply = self.session.post(upload_url, files={'binary': ('firmware.tar', firmware_data)})
    return reply.text
# ==================== MANAGEMENT - LOGS ====================
def get_system_log(self) -> str:
    """Fetch the "system_log" ASP data and return it without further parsing."""
    return self._get_asp_data("system_log")
def clear_system_log(self) -> str:
    """Post an empty "formClearLog" form to clear the system log and return the result."""
    no_fields = {}
    return self._post_form("formClearLog", no_fields)
# ==================== DIAGNOSTICS ====================
def ping(self, target: str, count: int = 4) -> str:
    """
    Ping a host.

    Args:
        target: IP or hostname to ping
        count: Number of pings

    Returns:
        Whatever `_post_form` returns for the "formPing" request
    """
    return self._post_form("formPing", {
        'target': target,
        # Stringified like every other numeric form field in this class.
        'count': str(count)
    })
def traceroute(self, target: str) -> str:
    """
    Run a traceroute from the device.

    Args:
        target: Host to trace, posted as 'target'

    Returns:
        Whatever `_post_form` returns for the "formTraceroute" request
    """
    payload = dict(target=target)
    return self._post_form("formTraceroute", payload)
def nslookup(self, hostname: str) -> str:
    """
    Run a DNS lookup from the device.

    Args:
        hostname: Name to resolve, posted as 'hostname'

    Returns:
        Whatever `_post_form` returns for the "formNslookup" request
    """
    payload = dict(hostname=hostname)
    return self._post_form("formNslookup", payload)
def run_self_diagnose(self) -> dict:
    """Post an empty "formSelfDiagnose" form and return the JSON-parsed result."""
    return self._parse_json_response(self._post_form("formSelfDiagnose", {}))
def scan_access_points(self, band: str = "both") -> list:
    """
    Trigger a scan for nearby access points.

    Args:
        band: '2.4g', '5g', or 'both'

    Returns:
        The "formAPDetect" result run through `_parse_json_response`
    """
    payload = dict(band=band)
    return self._parse_json_response(self._post_form("formAPDetect", payload))
# ==================== INTERACTIVE CLI ====================
import signal
import sys
from datetime import datetime
class InteractiveCLI:
"""Interactive command-line interface for GPON Router Management"""
def __init__(self):
self.router = None
self.running = True
def print_header(self, title: str):
    """Print a blank line, then `title` framed by two 60-character '=' rules."""
    rule = "=" * 60
    print("\n" + rule, f" {title}", rule, sep="\n")
def print_subheader(self, title: str):
    """Print a blank line followed by `title` wrapped in '---' markers."""
    print("\n--- {} ---".format(title))
def print_table(self, data: dict, title: str = None):
    """
    Print a mapping as aligned "key : value" rows.

    Keys are left-justified to the width of the longest key. When `data`
    is empty (or otherwise falsy) a "No data available" line is printed.

    Args:
        data: Mapping to display
        title: Optional subsection title printed first via `print_subheader`
    """
    if title:
        self.print_subheader(title)
    if not data:
        print(" No data available")
        return
    key_width = max(map(len, map(str, data.keys())))
    for name, value in data.items():
        print(f" {str(name).ljust(key_width)} : {value}")
def print_list(self, items: list, title: str = None):
    """
    Print a numbered list of items.

    Dict items get their index on its own line followed by one
    "key: value" line per entry; anything else is printed inline after its
    index. An empty list prints "No items found".

    Args:
        items: Items to display (numbering starts at 1)
        title: Optional subsection title printed first via `print_subheader`
    """
    if title:
        self.print_subheader(title)
    if not items:
        print(" No items found")
        return
    for position, entry in enumerate(items, 1):
        if not isinstance(entry, dict):
            print(f" [{position}] {entry}")
            continue
        print(f"\n [{position}]")
        for field, content in entry.items():
            print(f" {field}: {content}")
def print_success(self, message: str):
    """Print `message` on a new line, tagged with [SUCCESS]."""
    print("\n [SUCCESS] {}".format(message))
def print_error(self, message: str):
    """Print `message` on a new line, tagged with [ERROR]."""
    print("\n [ERROR] {}".format(message))
def print_info(self, message: str):
    """Print `message` on a new line, tagged with [INFO]."""
    print("\n [INFO] {}".format(message))
def get_input(self, prompt: str, default: str = None) -> str:
    """
    Prompt the user for a line of text.

    With a non-empty `default`, the default is shown in square brackets and
    returned when the user just presses Enter. A missing or empty default
    gives a plain prompt and returns whatever was typed (possibly "").

    Args:
        prompt: Text shown before the colon
        default: Optional fallback value

    Returns:
        str: The stripped input, or `default` when nothing was typed
    """
    if not default:
        return input(f" {prompt}: ").strip()
    typed = input(f" {prompt} [{default}]: ").strip()
    return typed or default
def get_bool_input(self, prompt: str, default: bool = None) -> bool:
    """
    Ask a yes/no question.

    The hint after the prompt reflects the default: "(Y/n)" for True,
    "(y/N)" for False and "(y/n)" otherwise. With a True default every
    answer except n/no/0/false counts as yes; in the other two cases only
    y/yes/1/true count as yes. Answers are stripped and case-insensitive.

    Args:
        prompt: Question text
        default: True, False, or None for no default

    Returns:
        bool: The user's choice
    """
    if default is True:
        hint = "(Y/n)"
    elif default is False:
        hint = "(y/N)"
    else:
        hint = "(y/n)"
    answer = input(f" {prompt} {hint}: ").strip().lower()
    if default is True:
        return answer not in ('n', 'no', '0', 'false')
    return answer in ('y', 'yes', '1', 'true')
def main_menu(self):
    """Print the application header followed by the numbered main menu."""
    self.print_header("GPON Router Configuration Manager")
    rule = "─────────────────────────────────"
    entries = (
        "1. Status & Information",
        "2. WAN Configuration",
        "3. LAN Configuration",
        "4. WiFi Configuration",
        "5. Security Settings",
        "6. NAT / Port Forwarding",
        "7. Applications (DDNS, UPnP, etc.)",
        "8. Device Management",
        "9. Diagnostics",
        "0. Exit (or Ctrl+C)",
    )
    # Leading and trailing empty strings reproduce the blank line before
    # the title and the newline that ends the menu text.
    print("\n".join(("", "MAIN MENU", rule) + entries + (rule, "")))
def status_menu(self):
    """
    Run the "Status & Information" submenu.

    Loops until the user selects '0'. Each pass prints the menu, reads a
    choice, queries `self.router` for the matching data, prints it, and
    then waits for Enter. An unrecognised choice just falls through to the
    Enter prompt.
    """
    while True:
        self.print_header("Status & Information")
        print(
            "\n"
            "1. Device Information\n"
            "2. Resource Usage (CPU/Memory)\n"
            "3. WAN Status\n"
            "4. PON Information\n"
            "5. Connected Users\n"
            "6. Wireless Clients\n"
            "7. Show All Status\n"
            "0. Back to Main Menu\n"
        )
        choice = self.get_input("Select option")
        if choice == '0':
            break
        elif choice == '1':
            info = self.router.get_device_info()
            # Format with friendly labels; unknown keys are shown as-is
            label_map = {
                'devId': 'Device ID',
                'devModel': 'Model',
                'hdVer': 'Hardware Version',
                'stVer': 'Firmware Version',
                'gpon_sn': 'GPON Serial',
                'mac_address': 'MAC Address',
                'web_uptime': 'Uptime'
            }
            formatted = {label_map.get(key, key): value for key, value in info.items()}
            self.print_table(formatted, "Device Information")
        elif choice == '2':
            resources = self.router.get_device_resources()
            formatted = {}
            for key, value in resources.items():
                lowered = key.lower()
                # 'cp' also matches every key containing 'cpu'
                if 'cp' in lowered:
                    formatted['CPU Usage'] = f"{value}%"
                elif 'mem' in lowered:
                    formatted['Memory Usage'] = f"{value}%"
                else:
                    formatted[key] = value
            self.print_table(formatted if formatted else resources, "Resource Usage")
        elif choice == '3':
            status = self.router.get_wan_status()
            if isinstance(status, list) and status:
                self.print_subheader("WAN Connections")
                for i, conn in enumerate(status, 1):
                    print(f"\n Connection {i}:")
                    formatted = {
                        'Service': conn.get('servName', 'N/A'),
                        'Protocol': conn.get('protocol', 'N/A'),
                        'Status': conn.get('strStatus', 'N/A').upper(),
                        'IP Address': conn.get('ipAddr', 'N/A'),
                        'Subnet Mask': conn.get('netmask', 'N/A'),
                        'Gateway': conn.get('gateway', 'N/A'),
                        'DNS 1': conn.get('dns1', 'N/A'),
                        'VLAN ID': conn.get('vlanId', 'N/A'),
                        'MAC Address': conn.get('MacAddr', 'N/A')
                    }
                    for k, v in formatted.items():
                        print(f" {k:<12} : {v}")
            else:
                self.print_table(status if isinstance(status, dict) else {}, "WAN Status")
        elif choice == '4':
            pon = self.router.get_pon_info()
            # Format PON info with friendly labels; other keys are title-cased
            label_map = {
                'pon_mode': 'PON Mode',
                'pon_connect_status': 'Connection Status',
                'temperature': 'Temperature',
                'voltage': 'Voltage',
                'tx-power': 'TX Power',
                'rx-power': 'RX Power',
                'bias-current': 'Bias Current',
                'bytes-sent': 'Bytes Sent',
                'bytes-received': 'Bytes Received',
                'packets-sent': 'Packets Sent',
                'packets-received': 'Packets Received'
            }
            formatted = {
                label_map.get(key, key.replace('-', ' ').title()): value
                for key, value in pon.items()
            }
            self.print_table(formatted, "PON Information")
        elif choice == '5':
            users = self.router.get_connected_users()
            if isinstance(users, list) and users:
                self.print_subheader("Connected Users (DHCP Clients)")
                for i, user in enumerate(users, 1):
                    print(f"\n Device {i}:")
                    # Format lease time as hours/minutes when it is numeric;
                    # otherwise show the raw value.
                    lease_time = user.get('liveTime', '')
                    if lease_time:
                        try:
                            secs = int(lease_time)
                        except (TypeError, ValueError):
                            pass
                        else:
                            lease_time = f"{secs // 3600}h {(secs % 3600) // 60}m"
                    formatted = {
                        'Hostname': user.get('devname', user.get('hostName', '')),
                        'IP Address': user.get('ipAddr', ''),
                        'MAC Address': user.get('macAddr', ''),
                        'Lease Time': lease_time
                    }
                    for k, v in formatted.items():
                        if v:
                            print(f" {k:<12} : {v}")
            elif isinstance(users, dict) and users:
                self.print_table(users, "Connected Users")
            else:
                print("\n--- Connected Users ---")
                print(" No users connected")
        elif choice == '6':
            clients = self.router.get_wlan_clients()
            if isinstance(clients, list):
                self.print_list(clients, "Wireless Clients")
            else:
                self.print_table(clients, "Wireless Clients")
        elif choice == '7':
            info = self.router.get_device_info()
            resources = self.router.get_device_resources()
            self.print_table(info, "Device Information")
            self.print_table(resources, "Resource Usage")
        input("\n Press Enter to continue...")
def wan_menu(self):
    """
    Run the "WAN Configuration" submenu.

    Loops until the user selects '0'. Each pass prints the menu, reads a
    choice, performs the matching `self.router` call, and then waits for
    Enter. A VLAN ID or NAT type that is not a whole number is reported via
    `print_error` and the action is skipped instead of raising.
    """

    def read_vlan():
        """Prompt for an optional VLAN ID; return (ok, vlan_id_or_None)."""
        raw = self.get_input("VLAN ID (leave empty for none)", "")
        if not raw:
            return True, None
        try:
            return True, int(raw)
        except ValueError:
            self.print_error(f"Invalid VLAN ID: {raw!r}")
            return False, None

    def show_status_rows(conn):
        """Print the status fields of one WAN connection entry."""
        formatted = {
            'Service': conn.get('servName', 'N/A'),
            'Protocol': conn.get('protocol', 'N/A'),
            'Status': conn.get('strStatus', 'N/A').upper(),
            'IP Address': conn.get('ipAddr', 'N/A'),
            'Subnet Mask': conn.get('netmask', 'N/A'),
            'Gateway': conn.get('gateway', 'N/A'),
            'DNS 1': conn.get('dns1', 'N/A'),
            'VLAN ID': conn.get('vlanId', 'N/A'),
            'MAC Address': conn.get('MacAddr', 'N/A')
        }
        for k, v in formatted.items():
            print(f" {k:<12} : {v}")

    while True:
        self.print_header("WAN Configuration")
        print(
            "\n"
            "VIEW\n"
            "1. List WAN Connections\n"
            "2. View WAN Status\n"
            "CREATE\n"
            "3. Create PPPoE Connection\n"
            "4. Create DHCP Connection\n"
            "5. Create Static IP Connection\n"
            "6. Create Bridge Connection\n"
            "MANAGE\n"
            "7. Enable/Disable Connection\n"
            "8. Delete Connection\n"
            "9. Set NAT Type\n"
            "0. Back to Main Menu\n"
        )
        choice = self.get_input("Select option")
        if choice == '0':
            break
        elif choice == '1':
            connections = self.router.get_wan_connections_list()
            if isinstance(connections, list) and connections:
                self.print_subheader("WAN Connections")
                for i, conn in enumerate(connections, 1):
                    print(f"\n Connection {i}: {conn.get('name', 'Unknown')}")
                    # Determine connection type; a stored PPP password
                    # takes precedence over the DHCP/Static flag.
                    conn_type = "Route" if conn.get('brmode') == '0' else "Bridge"
                    ip_mode = "DHCP" if conn.get('cmode') == '1' else "Static"
                    if conn.get('pppPassword'):
                        ip_mode = "PPPoE"
                    formatted = {
                        'Type': conn_type,
                        'Mode': ip_mode,
                        'IP Address': conn.get('ipAddr', 'N/A'),
                        'Subnet': conn.get('netMask', 'N/A'),
                        'Gateway': conn.get('remoteIpAddr', 'N/A'),
                        'DNS': conn.get('v4dns1', 'N/A'),
                        'VLAN ID': conn.get('vid', 'N/A'),
                        'NAT': 'Enabled' if conn.get('napt') == '1' else 'Disabled',
                        'MTU': conn.get('mtu', 'N/A')
                    }
                    # Rows without a value are left out
                    for k, v in formatted.items():
                        if v and v != 'N/A':
                            print(f" {k:<12} : {v}")
            else:
                print("\n--- WAN Connections ---")
                print(" No connections configured")
        elif choice == '2':
            status = self.router.get_wan_status()
            if isinstance(status, list) and status:
                self.print_subheader("WAN Status")
                for i, conn in enumerate(status, 1):
                    print(f"\n Connection {i}:")
                    show_status_rows(conn)
            else:
                print("\n--- WAN Status ---")
                print(" No status available")
        elif choice == '3':
            self.print_subheader("Create PPPoE Connection")
            username = self.get_input("PPPoE Username")
            password = self.get_input("PPPoE Password")
            ok, vlan_id = read_vlan()
            if ok:
                nat = self.get_bool_input("Enable NAT?")
                self.router.create_pppoe_route_connection(
                    username=username,
                    password=password,
                    vlan_id=vlan_id,
                    nat_enabled=nat
                )
                self.print_success("PPPoE connection created")
        elif choice == '4':
            self.print_subheader("Create DHCP Connection")
            ok, vlan_id = read_vlan()
            if ok:
                nat = self.get_bool_input("Enable NAT?")
                self.router.create_dhcp_route_connection(
                    vlan_id=vlan_id,
                    nat_enabled=nat
                )
                self.print_success("DHCP connection created")
        elif choice == '5':
            self.print_subheader("Create Static IP Connection")
            ip = self.get_input("IP Address")
            mask = self.get_input("Subnet Mask", "255.255.255.0")
            gateway = self.get_input("Gateway")
            dns1 = self.get_input("DNS 1", "8.8.8.8")
            dns2 = self.get_input("DNS 2", "8.8.4.4")
            ok, vlan_id = read_vlan()
            if ok:
                self.router.create_static_route_connection(
                    ip_address=ip,
                    subnet_mask=mask,
                    gateway=gateway,
                    dns1=dns1,
                    dns2=dns2,
                    vlan_id=vlan_id
                )
                self.print_success("Static IP connection created")
        elif choice == '6':
            self.print_subheader("Create Bridge Connection")
            ok, vlan_id = read_vlan()
            if ok:
                self.router.create_bridge_connection(vlan_id=vlan_id)
                self.print_success("Bridge connection created")
        elif choice == '7':
            conn_name = self.get_input("Connection name")
            enable = self.get_bool_input("Enable connection?")
            if enable:
                self.router.enable_wan_connection(conn_name)
                self.print_success(f"Connection '{conn_name}' enabled")
            else:
                self.router.disable_wan_connection(conn_name)
                self.print_success(f"Connection '{conn_name}' disabled")
        elif choice == '8':
            conn_name = self.get_input("Connection name to delete")
            if self.get_bool_input(f"Are you sure you want to delete '{conn_name}'?"):
                self.router.delete_wan_connection(conn_name)
                self.print_success(f"Connection '{conn_name}' deleted")
        elif choice == '9':
            self.print_subheader("Set NAT Type")
            print(" 0 = NAT4 (Symmetric)")
            print(" 1 = NAT1 (Full Cone)")
            print(" 2 = NAT2")
            raw_type = self.get_input("NAT Type", "0")
            try:
                nat_type = int(raw_type)
            except ValueError:
                self.print_error(f"Invalid NAT type: {raw_type!r}")
            else:
                self.router.set_nat_type(nat_type)
                self.print_success(f"NAT type set to {nat_type}")
        input("\n Press Enter to continue...")
def lan_menu(self):
    """
    Run the "LAN Configuration" submenu.

    Loops until the user selects '0'. Each pass prints the menu, reads a
    choice, performs the matching `self.router` call, and then waits for
    Enter. An unrecognised choice just falls through to the Enter prompt.
    """
    while True:
        self.print_header("LAN Configuration")
        print(
            "\n"
            "1. View LAN Configuration\n"
            "2. Set LAN IP Address\n"
            "3. Configure DHCP Server\n"
            "4. View DHCP Reservations\n"
            "5. Add DHCP Reservation\n"
            "6. Delete DHCP Reservation\n"
            "0. Back to Main Menu\n"
        )
        choice = self.get_input("Select option")
        if choice == '0':
            break
        elif choice == '1':
            config = self.router.get_lan_config()
            formatted = {
                'LAN IP': config.get('uIp', 'N/A'),
                'Subnet Mask': config.get('uMask', 'N/A'),
                'DHCP Server': 'Enabled' if config.get('uDhcpType') == '1' else 'Disabled',
                'DHCP Start': config.get('dhcpRangeStart', 'N/A'),
                'DHCP End': config.get('dhcpRangeEnd', 'N/A'),
                # f-string rather than '+', so a non-string lease value
                # (e.g. an int) does not raise TypeError.
                'Lease Time': f"{config.get('ulTime', 'N/A')} seconds"
            }
            self.print_table(formatted, "LAN Configuration")
        elif choice == '2':
            self.print_subheader("Set LAN IP")
            ip = self.get_input("New LAN IP", "192.168.3.1")
            mask = self.get_input("Subnet Mask", "255.255.255.0")
            self.router.set_lan_ipv4_config({'uIp': ip, 'uMask': mask})
            self.print_success(f"LAN IP set to {ip}")
        elif choice == '3':
            self.print_subheader("Configure DHCP Server")
            enable = self.get_bool_input("Enable DHCP Server?")
            if enable:
                start = self.get_input("DHCP Start IP", "192.168.3.100")
                end = self.get_input("DHCP End IP", "192.168.3.200")
                lease = self.get_input("Lease Time (seconds)", "86400")
                self.router.set_lan_ipv4_config({
                    'uDhcpType': '1',
                    'dhcpRangeStart': start,
                    'dhcpRangeEnd': end,
                    'ulTime': lease
                })
                self.print_success("DHCP Server configured")
            else:
                self.router.set_lan_ipv4_config({'uDhcpType': '0'})
                self.print_success("DHCP Server disabled")
        elif choice == '4':
            reservations = self.router.get_dhcp_reservations()
            if isinstance(reservations, list):
                self.print_list(reservations, "DHCP Reservations")
            else:
                self.print_table(reservations, "DHCP Reservations")
        elif choice == '5':
            self.print_subheader("Add DHCP Reservation")
            mac = self.get_input("MAC Address (00:11:22:33:44:55)")
            ip = self.get_input("IP Address")
            self.router.add_dhcp_reservation(mac, ip)
            self.print_success(f"Reserved {ip} for {mac}")
        elif choice == '6':
            self.print_subheader("Delete DHCP Reservation")
            mac = self.get_input("MAC Address")
            ip = self.get_input("IP Address")
            self.router.delete_dhcp_reservation(mac, ip)
            self.print_success("Reservation deleted")
        input("\n Press Enter to continue...")
def wifi_menu(self):
    """
    Run the "WiFi Configuration" submenu.

    Loops until the user selects '0'. Each pass prints the menu, reads a
    choice, performs the matching `self.router` call, and then waits for
    Enter. A band-steering RSSI threshold that is not a whole number is
    reported via `print_error` and nothing is changed.
    """

    def configure_band(band, channel_prompt, apply_config):
        """Prompt for SSID settings and submit them through `apply_config`."""
        self.print_subheader(f"Configure {band} WiFi")
        ssid = self.get_input("SSID (Network Name)")
        password = self.get_input("Password (8-63 chars)")
        hidden = self.get_bool_input("Hide SSID?")
        channel = self.get_input(channel_prompt, "0")
        apply_config({
            'ssid': ssid,
            'pskValue': password,
            'hiddenSSID': '1' if hidden else '0',
            'channel': channel,
            # Configuring a band always posts it as not disabled
            'wlanDisabled': '0'
        })
        self.print_success(f"{band} WiFi configured")

    while True:
        self.print_header("WiFi Configuration")
        print(
            "\n"
            "2.4GHz\n"
            "1. View 2.4GHz Configuration\n"
            "2. Configure 2.4GHz WiFi\n"
            "5GHz\n"
            "3. View 5GHz Configuration\n"
            "4. Configure 5GHz WiFi\n"
            "OTHER\n"
            "5. View Connected Clients\n"
            "6. Configure Band Steering\n"
            "0. Back to Main Menu\n"
        )
        choice = self.get_input("Select option")
        if choice == '0':
            break
        elif choice == '1':
            config = self.router.get_wifi_24g_config()
            self.print_table(config, "2.4GHz WiFi Configuration")
        elif choice == '2':
            configure_band("2.4GHz", "Channel (0=Auto, 1-11)", self.router.set_wifi_24g_config)
        elif choice == '3':
            config = self.router.get_wifi_5g_config()
            self.print_table(config, "5GHz WiFi Configuration")
        elif choice == '4':
            configure_band("5GHz", "Channel (0=Auto)", self.router.set_wifi_5g_config)
        elif choice == '5':
            clients = self.router.get_wlan_clients()
            if isinstance(clients, list):
                self.print_list(clients, "Connected Wireless Clients")
            else:
                self.print_table(clients, "Connected Wireless Clients")
        elif choice == '6':
            self.print_subheader("Band Steering")
            enable = self.get_bool_input("Enable Band Steering?")
            if enable:
                threshold = self.get_input("RSSI Threshold (dBm)", "-70")
                try:
                    rssi = int(threshold)
                except ValueError:
                    self.print_error(f"Invalid RSSI threshold: {threshold!r}")
                else:
                    self.router.set_band_steering(True, rssi)
                    self.print_success("Band Steering configured")
            else:
                self.router.set_band_steering(False)
                self.print_success("Band Steering configured")
        input("\n Press Enter to continue...")
def security_menu(self):
    """
    Run the "Security Settings" submenu.

    Loops until the user selects '0'. Each pass prints the menu, reads a
    choice, performs the matching `self.router` call, and then waits for
    Enter. A firewall security level that is not a whole number is reported
    via `print_error` and nothing is changed.
    """
    while True:
        self.print_header("Security Settings")
        print(
            "\n"
            "FIREWALL\n"
            "1. View Firewall Configuration\n"
            "2. Configure Firewall\n"
            "MAC FILTER\n"
            "3. View MAC Filter Settings\n"
            "4. Set MAC Filter Mode\n"
            "5. Add MAC to Filter\n"
            "6. Delete MAC from Filter\n"
            "URL FILTER\n"
            "7. Add URL Filter\n"
            "8. Delete URL Filter\n"
            "0. Back to Main Menu\n"
        )
        choice = self.get_input("Select option")
        if choice == '0':
            break
        elif choice == '1':
            config = self.router.get_firewall_config()
            formatted = {
                # Any level other than '2' is displayed as Low
                'Firewall Level': 'High' if config.get('FirewallLevel') == '2' else 'Low',
                'DoS Protection': 'Enabled' if config.get('DosEnable') == '1' else 'Disabled'
            }
            self.print_table(formatted, "Firewall Configuration")
        elif choice == '2':
            self.print_subheader("Configure Firewall")
            print(" 0 = Low Security")
            print(" 2 = High Security")
            raw_level = self.get_input("Security Level", "0")
            try:
                level = int(raw_level)
            except ValueError:
                self.print_error(f"Invalid security level: {raw_level!r}")
            else:
                dos = self.get_bool_input("Enable DoS Protection?")
                self.router.set_firewall_config(level, dos)
                self.print_success("Firewall configured")
        elif choice == '3':
            config = self.router.get_mac_filter_config()
            self.print_table(config, "MAC Filter Configuration")
        elif choice == '4':
            self.print_subheader("Set MAC Filter Mode")
            print(" Options: disabled, whitelist, blacklist")
            mode = self.get_input("Mode", "disabled")
            self.router.set_mac_filter_mode(mode)
            self.print_success(f"MAC Filter mode set to '{mode}'")
        elif choice == '5':
            mac = self.get_input("MAC Address")
            desc = self.get_input("Description", "")
            self.router.add_mac_filter(mac, desc)
            self.print_success(f"MAC {mac} added to filter")
        elif choice == '6':
            mac = self.get_input("MAC Address to remove")
            self.router.delete_mac_filter(mac)
            self.print_success(f"MAC {mac} removed from filter")
        elif choice == '7':
            url = self.get_input("URL to block")
            self.router.add_url_filter(url)
            self.print_success(f"URL '{url}' added to filter")
        elif choice == '8':
            url = self.get_input("URL to unblock")
            self.router.delete_url_filter(url)
            self.print_success(f"URL '{url}' removed from filter")
        input("\n Press Enter to continue...")
def nat_menu(self):
    """
    Run the "NAT / Port Forwarding" submenu.

    Loops until the user selects '0'. Each pass prints the menu, reads a
    choice, performs the matching `self.router` call, and then waits for
    Enter. When adding a port forwarding rule, a port or protocol that is
    not a whole number is reported via `print_error` and no rule is added.
    """
    while True:
        self.print_header("NAT / Port Forwarding")
        print(
            "\n"
            "DMZ\n"
            "1. View DMZ Configuration\n"
            "2. Enable/Disable DMZ\n"
            "PORT FORWARDING\n"
            "3. View Port Forwarding Rules\n"
            "4. Add Port Forwarding Rule\n"
            "5. Delete Port Forwarding Rule\n"
            "ALG\n"
            "6. View ALG Configuration\n"
            "7. Configure ALG\n"
            "UPnP\n"
            "8. View UPnP Status\n"
            "9. Enable/Disable UPnP\n"
            "0. Back to Main Menu\n"
        )
        choice = self.get_input("Select option")
        if choice == '0':
            break
        elif choice == '1':
            config = self.router.get_dmz_config()
            formatted = {
                'DMZ Enabled': 'Yes' if config.get('dmzcap') == '1' else 'No',
                'DMZ Host IP': config.get('ip', 'N/A')
            }
            self.print_table(formatted, "DMZ Configuration")
        elif choice == '2':
            enable = self.get_bool_input("Enable DMZ?")
            if enable:
                ip = self.get_input("DMZ Host IP")
                self.router.set_dmz(True, ip)
                self.print_success(f"DMZ enabled for {ip}")
            else:
                self.router.set_dmz(False)
                self.print_success("DMZ disabled")
        elif choice == '3':
            rules = self.router.get_virtual_servers()
            self.print_table(rules, "Port Forwarding Rules")
        elif choice == '4':
            self.print_subheader("Add Port Forwarding Rule")
            name = self.get_input("Rule Name")
            # The prompts stay inside the try because later defaults
            # depend on the already-parsed start port.
            try:
                ext_start = int(self.get_input("External Port Start"))
                ext_end = int(self.get_input("External Port End", str(ext_start)))
                int_ip = self.get_input("Internal IP")
                int_port = int(self.get_input("Internal Port", str(ext_start)))
                print(" Protocol: 1=TCP, 2=UDP, 4=TCP+UDP")
                proto = int(self.get_input("Protocol", "1"))
            except ValueError as exc:
                self.print_error(f"Invalid number: {exc}")
            else:
                self.router.add_virtual_server(
                    name=name,
                    external_port_start=ext_start,
                    external_port_end=ext_end,
                    internal_ip=int_ip,
                    internal_port=int_port,
                    protocol=proto
                )
                self.print_success(f"Port forwarding rule '{name}' added")
        elif choice == '5':
            rule_id = self.get_input("Rule ID to delete")
            self.router.delete_virtual_server(rule_id)
            self.print_success("Rule deleted")
        elif choice == '6':
            config = self.router.get_alg_config()
            alg_names = {
                'ftp_algonoff': 'FTP ALG',
                'tftp_algonoff': 'TFTP ALG',
                'h323_algonoff': 'H.323 ALG',
                'rtsp_algonoff': 'RTSP ALG',
                'l2tp_algonoff': 'L2TP ALG',
                'ipsec_algonoff': 'IPSec ALG',
                'sip_algonoff': 'SIP ALG',
                'pptp_algonoff': 'PPTP ALG'
            }
            # Only ALGs present in the device's answer are listed
            formatted = {
                label: 'Enabled' if config[key] == '1' else 'Disabled'
                for key, label in alg_names.items()
                if key in config
            }
            self.print_table(formatted, "ALG Configuration")
        elif choice == '7':
            self.print_subheader("Configure ALG")
            ftp = '1' if self.get_bool_input("Enable FTP ALG?") else '0'
            sip = '1' if self.get_bool_input("Enable SIP ALG?") else '0'
            h323 = '1' if self.get_bool_input("Enable H.323 ALG?") else '0'
            pptp = '1' if self.get_bool_input("Enable PPTP ALG?") else '0'
            self.router.set_alg_config({
                'ftp_algonoff': ftp,
                'sip_algonoff': sip,
                'h323_algonoff': h323,
                'pptp_algonoff': pptp
            })
            self.print_success("ALG configured")
        elif choice == '8':
            config = self.router.get_upnp_config()
            formatted = {
                'UPnP': 'Enabled' if config.get('daemon') == '1' else 'Disabled'
            }
            self.print_table(formatted, "UPnP Status")
        elif choice == '9':
            enable = self.get_bool_input("Enable UPnP?")
            self.router.set_upnp(enable)
            self.print_success(f"UPnP {'enabled' if enable else 'disabled'}")
        input("\n Press Enter to continue...")
def applications_menu(self):
    """Run the "Applications" submenu until the user chooses ``0``.

    Offers DDNS (view, enable/disable, add entry), Samba and FTP (view,
    configure), a read-only TR-069 view and the QoS on/off switch. After
    every selection other than ``0`` - including one that matches no menu
    entry - the user is asked to press Enter before the menu is redrawn.
    Exceptions raised by router calls are not handled here; they propagate
    to the caller.
    """
    menu_text = """
DDNS
1. View DDNS Configuration
2. Enable/Disable DDNS
3. Add DDNS Entry
SAMBA / FTP
4. View Samba Configuration
5. Configure Samba
6. View FTP Configuration
7. Configure FTP
OTHER
8. View TR-069 Configuration
9. Configure QoS
0. Back to Main Menu
"""

    def toggle_ddns():
        # Ask once, push the flag to the router, then confirm the new state.
        wanted = self.get_bool_input("Enable DDNS?")
        self.router.set_ddns_enabled(wanted)
        self.print_success(f"DDNS {'enabled' if wanted else 'disabled'}")

    def toggle_qos():
        wanted = self.get_bool_input("Enable QoS?")
        self.router.set_qos_enabled(wanted)
        self.print_success(f"QoS {'enabled' if wanted else 'disabled'}")

    def add_ddns_entry():
        self.print_subheader("Add DDNS Entry")
        print(" Providers: oray, dyndns, tzo, noip, gnudip")
        service = self.get_input("Provider", "dyndns")
        host = self.get_input("Hostname")
        user = self.get_input("Username")
        secret = self.get_input("Password")
        self.router.add_ddns(service, host, user, secret)
        self.print_success(f"DDNS entry for {host} added")

    def configure_share(heading, question, setter_name, done_message):
        # Samba and FTP use the same prompt sequence; only the labels and the
        # router setter differ. Credentials are requested only when enabling.
        self.print_subheader(heading)
        if self.get_bool_input(question):
            user = self.get_input("Username")
            secret = self.get_input("Password")
            settings = (True, user, secret)
        else:
            settings = (False, "", "")
        getattr(self.router, setter_name)(*settings)
        self.print_success(done_message)

    actions = {
        '1': lambda: self.print_table(
            self.router.get_ddns_config(), "DDNS Configuration"),
        '2': toggle_ddns,
        '3': add_ddns_entry,
        '4': lambda: self.print_table(
            self.router.get_samba_config(), "Samba Configuration"),
        '5': lambda: configure_share(
            "Configure Samba", "Enable Samba?",
            "set_samba_config", "Samba configured"),
        '6': lambda: self.print_table(
            self.router.get_ftp_config(), "FTP Configuration"),
        '7': lambda: configure_share(
            "Configure FTP", "Enable FTP?",
            "set_ftp_config", "FTP configured"),
        '8': lambda: self.print_table(
            self.router.get_tr069_config(), "TR-069 Configuration"),
        '9': toggle_qos,
    }

    while True:
        self.print_header("Applications")
        print(menu_text)
        selection = self.get_input("Select option")
        if selection == '0':
            return
        chosen = actions.get(selection)
        if chosen is not None:
            chosen()
        # The pause happens for unrecognised selections too.
        input("\n Press Enter to continue...")
def device_menu(self):
    """Device Management submenu: backup/restore, maintenance, password, logs.

    Redisplays the menu until the user selects ``0``. A confirmed reboot or
    factory reset also leaves the menu and sets ``self.running`` to False so
    the caller's main loop stops. After any other selection the user is
    asked to press Enter while ``self.running`` is still true.

    Problems that can be detected locally are reported with ``print_error``
    and no router call (or file write) is made for them:

    * missing or empty restore / firmware file,
    * empty backup payload returned by the router,
    * schedule values outside the ranges shown in the prompts
      (days 1-7, hour 0-23, minute 0-59) or not whole numbers,
    * empty or mismatched new password.

    Any other exception, including those raised by router calls, propagates
    to the caller.
    """

    def read_nonempty_file(path):
        """Return the bytes of ``path``; report and return None if missing or empty."""
        # Only the file read is guarded, so a FileNotFoundError raised by a
        # later router call is never misreported as a missing input file.
        try:
            with open(path, 'rb') as f:
                data = f.read()
        except FileNotFoundError:
            self.print_error(f"File '{path}' not found")
            return None
        if not data:
            self.print_error(f"File '{path}' is empty")
            return None
        return data

    def bounded_int(text, low, high, label):
        """Parse ``text`` as an int in [low, high]; raise ValueError otherwise."""
        cleaned = str(text).strip()
        try:
            value = int(cleaned)
        except ValueError:
            raise ValueError(
                f"{label} must be a whole number, got '{cleaned}'") from None
        if not low <= value <= high:
            raise ValueError(
                f"{label} must be between {low} and {high}, got {value}")
        return value

    while True:
        self.print_header("Device Management")
        print("""
BACKUP / RESTORE
1. Backup Configuration
2. Restore Configuration
MAINTENANCE
3. Reboot Router
4. Factory Reset
5. Set Scheduled Reboot
6. Firmware Upgrade
PASSWORD
7. Change Admin Password
LOGS
8. View System Log
9. Clear System Log
0. Back to Main Menu
""")
        choice = self.get_input("Select option")
        if choice == '0':
            break
        elif choice == '1':
            decrypt = self.get_bool_input("Decrypt to readable XML?", default=True)
            default_ext = ".xml" if decrypt else ".bin"
            filename = self.get_input("Backup filename", f"router_backup{default_ext}")
            config = self.router.backup_config(decrypt=decrypt)
            # NOTE(review): backup_config's return type is defined outside
            # this block. Text is encoded here so the binary-mode write below
            # cannot fail after the target file has already been truncated.
            if isinstance(config, str):
                config = config.encode('utf-8')
            if not config:
                # Validate before open(): 'wb' would wipe an existing backup.
                self.print_error("Backup failed: router returned no data")
            else:
                with open(filename, 'wb') as f:
                    f.write(config)
                if decrypt:
                    self.print_success(f"Decrypted configuration saved to '{filename}'")
                else:
                    self.print_success(f"Encrypted configuration saved to '{filename}'")
        elif choice == '2':
            filename = self.get_input("Config filename to restore")
            config_data = read_nonempty_file(filename)
            if config_data is not None:
                # Auto-detect if file is plaintext XML or encrypted
                is_plaintext = config_data.startswith(b'<') or b'<Config' in config_data[:100]
                if is_plaintext:
                    self.print_info("Detected plaintext XML - will encrypt before upload")
                    self.router.restore_config(config_data, encrypt=True)
                else:
                    self.print_info("Detected encrypted backup - uploading as-is")
                    self.router.restore_config(config_data, encrypt=False)
                self.print_success("Configuration restored. Router will reboot.")
        elif choice == '3':
            if self.get_bool_input("Are you sure you want to reboot?"):
                self.router.reboot()
                self.print_success("Router is rebooting...")
                self.running = False
                break
        elif choice == '4':
            print("\n WARNING: This will erase all settings!")
            if self.get_bool_input("Are you absolutely sure?"):
                self.router.factory_reset()
                self.print_success("Factory reset initiated...")
                self.running = False
                break
        elif choice == '5':
            self.print_subheader("Scheduled Reboot")
            print(" Enter days as comma-separated (1=Mon, 2=Tue, ..., 7=Sun)")
            days_input = self.get_input("Days (e.g., 1,3,5)", "")
            if days_input:
                try:
                    # Blank items (trailing comma, doubled commas) are skipped
                    # instead of crashing int('').
                    days = [bounded_int(d, 1, 7, "Day")
                            for d in days_input.split(',') if d.strip()]
                    if not days:
                        raise ValueError("No days entered")
                    hour = bounded_int(self.get_input("Hour (0-23)", "3"), 0, 23, "Hour")
                    minute = bounded_int(self.get_input("Minute (0-59)", "0"), 0, 59, "Minute")
                except ValueError as err:
                    self.print_error(str(err))
                else:
                    self.router.set_scheduled_reboot(days, hour, minute)
                    self.print_success("Scheduled reboot configured")
        elif choice == '6':
            filename = self.get_input("Firmware file (.tar)")
            firmware = read_nonempty_file(filename)
            if firmware is not None:
                self.router.firmware_upgrade(firmware)
                self.print_success("Firmware upgrade started...")
        elif choice == '7':
            old_pass = self.get_input("Current Password")
            new_pass = self.get_input("New Password")
            confirm = self.get_input("Confirm New Password")
            if not new_pass:
                # Two empty answers would otherwise "match" and blank the password.
                self.print_error("New password cannot be empty")
            elif new_pass != confirm:
                self.print_error("Passwords do not match")
            else:
                self.router.change_password(old_pass, new_pass)
                self.print_success("Password changed")
        elif choice == '8':
            log = self.router.get_system_log()
            print("\n--- System Log ---")
            # Slicing already copes with logs shorter than the 2000-char cap.
            print(log[:2000])
        elif choice == '9':
            self.router.clear_system_log()
            self.print_success("System log cleared")
        if self.running:
            input("\n Press Enter to continue...")
def diagnostics_menu(self):
    """Run the "Diagnostics" submenu until the user chooses ``0``.

    Offers ping, traceroute and DNS lookup (results printed as returned by
    the router), the router's self-diagnostics (shown as a table) and a scan
    for nearby access points (shown as a list when the router returns a
    list, otherwise as a table). Every selection other than ``0`` -
    recognised or not - ends with a "Press Enter" pause. Exceptions are not
    handled here and propagate to the caller.
    """
    menu_text = """
NETWORK TESTS
1. Ping
2. Traceroute
3. DNS Lookup
OTHER
4. Run Self-Diagnostics
5. Scan Nearby Access Points
0. Back to Main Menu
"""

    def show_text(banner, body):
        # Raw tool output: banner line first, then the text as returned.
        print(banner)
        print(body)

    def do_ping():
        host = self.get_input("Host to ping", "8.8.8.8")
        attempts = int(self.get_input("Number of pings", "4"))
        show_text("\n--- Ping Results ---", self.router.ping(host, attempts))

    def do_traceroute():
        host = self.get_input("Host to trace")
        show_text("\n--- Traceroute Results ---", self.router.traceroute(host))

    def do_lookup():
        name = self.get_input("Hostname to lookup")
        show_text("\n--- DNS Lookup Results ---", self.router.nslookup(name))

    def do_self_check():
        self.print_table(self.router.run_self_diagnose(), "Self-Diagnostics Results")

    def do_scan():
        print(" Bands: 2.4g, 5g, both")
        selected_band = self.get_input("Band to scan", "both")
        found = self.router.scan_access_points(selected_band)
        # The renderer depends on the shape of what the router returned.
        render = self.print_list if isinstance(found, list) else self.print_table
        render(found, "Nearby Access Points")

    actions = {
        '1': do_ping,
        '2': do_traceroute,
        '3': do_lookup,
        '4': do_self_check,
        '5': do_scan,
    }

    while True:
        self.print_header("Diagnostics")
        print(menu_text)
        selection = self.get_input("Select option")
        if selection == '0':
            return
        chosen = actions.get(selection)
        if chosen is not None:
            chosen()
        input("\n Press Enter to continue...")
def run(self):
    """Main loop: connect to the router, serve the main menu, then log out.

    Installs a SIGINT handler that logs out (when a session exists) and
    exits with status 0. Prompts for host, username and password, creates
    the ``GPONManager`` and logs in; on login failure an error is printed
    and the method returns without entering the menu.

    The menu loop runs while ``self.running`` is true and ends when the user
    selects ``0``. Errors raised by a submenu are printed and followed by a
    "Press Enter" pause. If standard input is closed (``EOFError``) the loop
    ends instead of re-prompting, so the logout below still happens.

    Logout failures - for example when the router is already unreachable
    after a reboot or factory reset - are reported, not raised.
    """

    def logout_quietly():
        # Best-effort logout: the router may be going down, and a failure
        # here must not prevent the program from exiting.
        try:
            self.router.logout()
        except Exception as e:
            self.print_error(f"Logout failed: {str(e)}")

    # Setup signal handler for Ctrl+C
    def signal_handler(sig, frame):
        print("\n\n Exiting... Goodbye!")
        if self.router and self.router.logged_in:
            # An exception escaping the handler would be caught by the menu
            # loop's generic handler and Ctrl+C would fail to exit.
            logout_quietly()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    # Get connection details
    self.print_header("GPON Router Configuration Manager")
    print("\n Connection Setup")
    print(" ─────────────────────────────────")
    host = self.get_input("Router IP", "192.168.3.1")
    username = self.get_input("Username", "admin")
    password = self.get_input("Password", "stdONU101")

    # Initialize router
    self.router = GPONManager(host, username, password)

    # Login
    print("\n Connecting...")
    if not self.router.login():
        self.print_error("Failed to login! Please check credentials.")
        return
    self.print_success(f"Connected to {host}")

    submenus = {
        '1': self.status_menu,
        '2': self.wan_menu,
        '3': self.lan_menu,
        '4': self.wifi_menu,
        '5': self.security_menu,
        '6': self.nat_menu,
        '7': self.applications_menu,
        '8': self.device_menu,
        '9': self.diagnostics_menu,
    }

    # Main menu loop
    while self.running:
        try:
            self.main_menu()
            choice = self.get_input("Select option")
            if choice == '0':
                break
            submenu = submenus.get(choice)
            if submenu is None:
                self.print_error("Invalid option")
            else:
                submenu()
        except EOFError:
            # stdin is closed: no further input is possible, so leave the
            # loop and fall through to the logout.
            break
        except Exception as e:
            self.print_error(f"Error: {str(e)}")
            try:
                input("\n Press Enter to continue...")
            except EOFError:
                # Same situation, discovered at the pause prompt.
                break

    # Logout
    print("\n Disconnecting...")
    logout_quietly()
    print("\n Goodbye!\n")
def main():
    """Entry point: create the interactive CLI and run its main loop."""
    InteractiveCLI().run()
# Start the interactive session only when this file is executed as a script,
# so importing it as a library has no side effects.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment