Created
November 19, 2025 20:40
-
-
Save ankurpandeyvns/2f7a94da17645312a07e0c12aea4c4c1 to your computer and use it in GitHub Desktop.
A comprehensive Python library for managing BOA-based GPON routers (like DG-GR6011 and similar devices).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| GPON ONU/ONT Router Configuration Manager | |
| ========================================== | |
| A comprehensive Python script for managing BOA-based GPON devices. | |
| INSTALLATION | |
| ------------ | |
| pip install requests | |
| QUICK START | |
| ----------- | |
| from gpon_onu_manager import GPONManager | |
| # Connect to router | |
| router = GPONManager("192.168.3.1", "admin", "stdONU101") | |
| router.login() | |
| # Get device info | |
| info = router.get_device_info() | |
| print(info) | |
| # Create a PPPoE connection | |
| router.create_pppoe_route_connection( | |
| username="[email protected]", | |
| password="password123", | |
| vlan_id=100 | |
| ) | |
| # Always logout when done | |
| router.logout() | |
| SUPPORTED FEATURES | |
| ------------------ | |
| STATUS: | |
| - Device Info (model, firmware, serial number) | |
| - Device Resources (CPU, memory usage) | |
| - WAN Connection Status | |
| - PON/GPON Information | |
| - Connected Users/Devices | |
| - CATV Status | |
| - Wireless Performance Stats | |
| NETWORK: | |
| WAN Configuration: | |
| - Create/Delete/Modify WAN connections | |
| - PPPoE, DHCP, Static IP modes | |
| - Route and Bridge modes | |
| - VLAN tagging | |
| - IPv4/IPv6 dual-stack | |
| - NAT configuration | |
| - Enable/Disable connections | |
| LAN Configuration: | |
| - IPv4/IPv6 LAN settings | |
| - DHCP Server configuration | |
| - DHCP reservations (MAC-IP binding) | |
| - DNS settings | |
| WiFi Configuration: | |
| - 2.4GHz and 5GHz bands | |
| - SSID, password, security settings | |
| - Channel selection | |
| - Bandwidth configuration | |
| - Hidden SSID | |
| - Max clients | |
| - Band Steering | |
| Other Network: | |
| - MTU configuration | |
| - Client Management | |
| - Family Groups (Parental Controls) | |
| - CATV settings | |
| - Port Binding | |
| - TR-069 Remote Management | |
| - QoS (Quality of Service) | |
| - Time/NTP synchronization | |
| - Static Routing | |
| SECURITY: | |
| - Firewall (Low/High security levels) | |
| - DoS Attack Protection | |
| - URL Filtering | |
| - MAC Filtering (Whitelist/Blacklist) | |
| - IP/Port Filtering | |
| - Login Privileges | |
| APPLICATION: | |
| NAT/Port Forwarding: | |
| - DMZ Host configuration | |
| - Virtual Server (Port Forwarding) | |
| - ALG settings (FTP, SIP, H.323, etc.) | |
| - UPnP configuration | |
| VoIP: | |
| - SIP configuration | |
| - Codec settings | |
| - Advanced VoIP options | |
| Other: | |
| - DDNS (DynDNS, No-IP, TZO, etc.) | |
| - MQTT configuration | |
| - Multicast/IGMP | |
| - Samba file sharing | |
| - FTP server | |
| - USB storage management | |
| MANAGEMENT: | |
| - User/Password management | |
| - Configuration backup/restore | |
| - Factory reset | |
| - Scheduled reboot | |
| - Firmware upgrade | |
| - System logs | |
| - SNMP configuration | |
| DIAGNOSTICS: | |
| - Ping/Traceroute/DNS lookup | |
| - Loopback tests | |
| - Port mirroring | |
| - Self-diagnostics | |
| - AP scanning | |
| - Iperf speed tests | |
| EXAMPLES | |
| -------- | |
| # Example 1: Create different WAN connection types | |
| router = GPONManager("192.168.3.1", "admin", "password") | |
| router.login() | |
| # PPPoE Route mode (most common for ISPs) | |
| router.create_pppoe_route_connection( | |
| username="[email protected]", | |
| password="pass123", | |
| vlan_id=100, | |
| service_type=1, # INTERNET | |
| nat_enabled=True | |
| ) | |
| # DHCP Route mode | |
| router.create_dhcp_route_connection( | |
| vlan_id=200, | |
| nat_enabled=True | |
| ) | |
| # Static IP Route mode | |
| router.create_static_route_connection( | |
| ip_address="203.0.113.10", | |
| subnet_mask="255.255.255.0", | |
| gateway="203.0.113.1", | |
| dns1="8.8.8.8", | |
| dns2="8.8.4.4" | |
| ) | |
| # Bridge mode (for external router) | |
| router.create_bridge_connection(vlan_id=300) | |
| # Delete a connection | |
| router.delete_wan_connection("ppp_8_35_1") | |
| # Example 2: Configure WiFi | |
| # Change 2.4GHz WiFi settings | |
| router.set_wifi_24g_config({ | |
| 'ssid': 'MyNetwork', | |
| 'pskValue': 'SecurePassword123', | |
| 'wlanDisabled': '0', | |
| 'hiddenSSID': '0', | |
| 'channel': '6', | |
| 'band': '2' # 20/40MHz | |
| }) | |
| # Change 5GHz WiFi settings | |
| router.set_wifi_5g_config({ | |
| 'ssid': 'MyNetwork_5G', | |
| 'pskValue': 'SecurePassword123', | |
| 'channel': '36' | |
| }) | |
| # Example 3: Configure DMZ and Port Forwarding | |
| # Enable DMZ host | |
| router.set_dmz(enabled=True, ip="192.168.3.100") | |
| # Add port forwarding rule (Virtual Server) | |
| router.add_virtual_server( | |
| name="Web Server", | |
| external_port_start=8080, | |
| external_port_end=8080, | |
| internal_ip="192.168.3.100", | |
| internal_port=80, | |
| protocol=1 # TCP | |
| ) | |
| # Configure ALG settings | |
| router.set_alg_config({ | |
| 'ftp_algonoff': '1', | |
| 'sip_algonoff': '1', | |
| 'h323_algonoff': '1' | |
| }) | |
| # Example 4: Configure DDNS | |
| router.add_ddns( | |
| provider="dyndns", | |
| hostname="myhost.dyndns.org", | |
| username="myuser", | |
| password="mypass" | |
| ) | |
| # Example 5: Firewall and Security | |
| # Set firewall level | |
| router.set_firewall_config( | |
| level=2, # 0=Low, 2=High | |
| dos_protection=True | |
| ) | |
| # Add MAC filter | |
| router.set_mac_filter_mode("blacklist") | |
| router.add_mac_filter("AA:BB:CC:DD:EE:FF", "Blocked Device") | |
| # Example 6: Backup and Restore (with encryption/decryption) | |
| # Backup encrypted configuration | |
| config = router.backup_config() | |
| with open('router_backup.bin', 'wb') as f: | |
| f.write(config) | |
| # Backup and decrypt to readable XML | |
| config = router.backup_config(decrypt=True) | |
| with open('router_config.xml', 'wb') as f: | |
| f.write(config) | |
| # Restore from encrypted backup | |
| with open('router_backup.bin', 'rb') as f: | |
| router.restore_config(f.read(), encrypt=False) | |
| # Restore from plaintext XML (auto-encrypts) | |
| with open('router_config.xml', 'rb') as f: | |
| router.restore_config(f.read()) | |
| # Decrypt existing backup file | |
| with open('backup.bin', 'rb') as f: | |
| decrypted = GPONManager.decrypt_config(f.read()) | |
| with open('config.xml', 'wb') as f: | |
| f.write(decrypted) | |
| # Encrypt plaintext config for manual upload | |
| with open('config.xml', 'rb') as f: | |
| encrypted = GPONManager.encrypt_config(f.read()) | |
| with open('backup.bin', 'wb') as f: | |
| f.write(encrypted) | |
| # Example 7: Device Management | |
| # Reboot router | |
| router.reboot() | |
| # Factory reset | |
| router.factory_reset() | |
| # Set scheduled reboot | |
| router.set_scheduled_reboot( | |
| days=[1, 3, 5], # Mon, Wed, Fri | |
| hour=3, | |
| minute=0 | |
| ) | |
| SERVICE TYPES | |
| ------------- | |
| 0 = TR069_INTERNET | |
| 1 = INTERNET | |
| 2 = TR069 | |
| 3 = Other | |
| 4 = VOIP | |
| 5 = TR069_VOIP | |
| 6 = VOIP_INTERNET | |
| 7 = TR069_VOIP_INTERNET | |
| AUTHOR | |
| ------ | |
| Auto-generated for GPON ONU/ONT device management | |
| """ | |
| import requests | |
| import json | |
| import base64 | |
| import re | |
| import time | |
| from typing import Optional, Dict, Any, List | |
| from urllib.parse import urljoin | |
| class GPONManager: | |
| """ | |
| Main class for managing GPON ONU/ONT router. | |
| This class provides methods to configure all aspects of a BOA-based | |
| GPON router including WAN connections, WiFi, security, NAT, and more. | |
| Attributes: | |
| host (str): Router IP address | |
| username (str): Admin username | |
| password (str): Admin password | |
| session (requests.Session): HTTP session for persistent cookies | |
| logged_in (bool): Login status | |
| Example: | |
| >>> router = GPONManager("192.168.3.1", "admin", "password") | |
| >>> router.login() | |
| >>> info = router.get_device_info() | |
| >>> router.logout() | |
| """ | |
| # XOR encryption key for config backups | |
| CONFIG_XOR_KEY = b'tecomtec' | |
| def __init__(self, host: str = "192.168.3.1", username: str = "admin", password: str = "stdONU101"): | |
| """ | |
| Initialize GPONManager. | |
| Args: | |
| host: Router IP address (default: 192.168.3.1) | |
| username: Admin username (default: admin) | |
| password: Admin password | |
| """ | |
| self.host = host | |
| self.base_url = f"http://{host}" | |
| self.username = username | |
| self.password = password | |
| self.session = requests.Session() | |
| self.logged_in = False | |
| def _encode64(self, input_str: str) -> str: | |
| """Encode string to base64 (same as device JS)""" | |
| return base64.b64encode(input_str.encode()).decode() | |
| def _decode64(self, input_str: str) -> str: | |
| """Decode base64 string""" | |
| return base64.b64decode(input_str).decode() | |
| @staticmethod | |
| def decrypt_config(encrypted_data: bytes) -> bytes: | |
| """ | |
| Decrypt router configuration backup. | |
| The router uses XOR encryption with the key 'tecomtec'. | |
| Args: | |
| encrypted_data: Encrypted configuration data | |
| Returns: | |
| bytes: Decrypted XML configuration | |
| Example: | |
| >>> with open('backup.bin', 'rb') as f: | |
| ... encrypted = f.read() | |
| >>> decrypted = GPONManager.decrypt_config(encrypted) | |
| >>> print(decrypted.decode('utf-8')[:100]) | |
| """ | |
| key = GPONManager.CONFIG_XOR_KEY | |
| return bytes([encrypted_data[i] ^ key[i % len(key)] for i in range(len(encrypted_data))]) | |
| @staticmethod | |
| def encrypt_config(plaintext_data: bytes) -> bytes: | |
| """ | |
| Encrypt configuration data for upload to router. | |
| The router uses XOR encryption with the key 'tecomtec'. | |
| XOR encryption is symmetric, so this uses the same operation as decrypt. | |
| Args: | |
| plaintext_data: Plaintext XML configuration data | |
| Returns: | |
| bytes: Encrypted configuration data | |
| Example: | |
| >>> with open('config.xml', 'rb') as f: | |
| ... plaintext = f.read() | |
| >>> encrypted = GPONManager.encrypt_config(plaintext) | |
| >>> with open('backup.bin', 'wb') as f: | |
| ... f.write(encrypted) | |
| """ | |
| # XOR is symmetric - same operation for encrypt and decrypt | |
| return GPONManager.decrypt_config(plaintext_data) | |
| def login(self) -> bool: | |
| """ | |
| Login to the router. | |
| Returns: | |
| bool: True if login successful, False otherwise | |
| Example: | |
| >>> router = GPONManager("192.168.3.1", "admin", "pass") | |
| >>> if router.login(): | |
| ... print("Connected!") | |
| """ | |
| url = f"{self.base_url}/boaform/admin/formLogin" | |
| data = { | |
| "username": self.username, | |
| "password": self.password | |
| } | |
| try: | |
| response = self.session.post(url, data=data, allow_redirects=False) | |
| if "LOGINED" in response.text or response.status_code == 302: | |
| self.logged_in = True | |
| print(f"Successfully logged in to {self.host}") | |
| return True | |
| else: | |
| print(f"Login failed: {response.text}") | |
| return False | |
| except Exception as e: | |
| print(f"Login error: {e}") | |
| return False | |
| def logout(self) -> bool: | |
| """ | |
| Logout from the router. | |
| Returns: | |
| bool: True if logout successful | |
| """ | |
| url = f"{self.base_url}/boaform/admin/formLogout" | |
| try: | |
| self.session.post(url) | |
| self.logged_in = False | |
| print("Logged out successfully") | |
| return True | |
| except Exception as e: | |
| print(f"Logout error: {e}") | |
| return False | |
| def _get_asp_data(self, endpoint: str) -> str: | |
| """Get ASP data from router""" | |
| url = f"{self.base_url}/boaform/getASPdata/{endpoint}" | |
| try: | |
| response = self.session.get(url) | |
| return response.text | |
| except Exception as e: | |
| print(f"Error getting ASP data: {e}") | |
| return "" | |
| def _post_form(self, endpoint: str, data: dict) -> str: | |
| """Post form data to router""" | |
| url = f"{self.base_url}/boaform/getASPdata/{endpoint}" | |
| try: | |
| response = self.session.post(url, data=data) | |
| return response.text | |
| except Exception as e: | |
| print(f"Error posting form: {e}") | |
| return "" | |
| def _post_admin_form(self, endpoint: str, data: dict) -> str: | |
| """Post form data to admin endpoint""" | |
| url = f"{self.base_url}/boaform/admin/{endpoint}" | |
| try: | |
| response = self.session.post(url, data=data) | |
| return response.text | |
| except Exception as e: | |
| print(f"Error posting admin form: {e}") | |
| return "" | |
| def _parse_key_value(self, text: str) -> dict: | |
| """Parse key=value formatted response (handles both newline and & separators)""" | |
| result = {} | |
| text = text.strip() | |
| # First try to parse as standard key=value lines | |
| for line in text.split('\n'): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| # Handle format: "label=key1=val1&key2=val2" (e.g., basic_info=devId=xxx&...) | |
| # Skip the prefix label if the value contains & and = | |
| if '&' in line and line.count('=') > 1: | |
| # Find first = and check if rest has & | |
| first_eq = line.find('=') | |
| rest = line[first_eq + 1:] | |
| if '&' in rest: | |
| line = rest | |
| # Check if line contains & separated values (like devId=xxx&devModel=xxx) | |
| if '&' in line and '=' in line: | |
| # Parse & separated key=value pairs | |
| for pair in line.split('&'): | |
| pair = pair.strip() | |
| if '=' in pair: | |
| key, value = pair.split('=', 1) | |
| result[key.strip()] = value.strip() | |
| elif '=' in line: | |
| key, value = line.split('=', 1) | |
| result[key.strip()] = value.strip() | |
| return result | |
| def _parse_json_response(self, text: str) -> dict: | |
| """Parse JSON response""" | |
| try: | |
| return json.loads(text) | |
| except: | |
| return self._parse_key_value(text) | |
| # ==================== STATUS SECTION ==================== | |
| def get_device_info(self) -> dict: | |
| """ | |
| Get basic device information. | |
| Returns: | |
| dict: Device model, ID, hardware version, firmware version, serial number | |
| Example: | |
| >>> info = router.get_device_info() | |
| >>> print(f"Model: {info.get('devModel')}") | |
| >>> print(f"Firmware: {info.get('stVer')}") | |
| """ | |
| data = self._get_asp_data("dev_basic_info") | |
| return self._parse_key_value(data) | |
| def get_device_resources(self) -> dict: | |
| """ | |
| Get CPU and memory usage. | |
| Returns: | |
| dict: CPU usage (cpUsage), Memory usage (memUsage) | |
| """ | |
| data = self._get_asp_data("dev_resource_info") | |
| return self._parse_key_value(data) | |
| def get_wan_status(self) -> list: | |
| """Get WAN connection status""" | |
| data = self._get_asp_data("listWanConfig") | |
| return self._parse_array_response(data) | |
| def get_pon_info(self) -> dict: | |
| """Get PON (GPON) information including optical power levels""" | |
| data = self._get_asp_data("ponGetStatus") | |
| return self._parse_key_value(data) | |
| def get_connected_users(self) -> list: | |
| """ | |
| Get list of connected users/devices (DHCP clients). | |
| Returns: | |
| list: Connected devices with IP, MAC, hostname | |
| """ | |
| data = self._get_asp_data("E8BDhcpClientList") | |
| return self._parse_array_response(data) | |
| def get_wlan_clients(self) -> list: | |
| """Get connected wireless clients""" | |
| data = self._get_asp_data("wirelessClientList") | |
| return self._parse_array_response(data) | |
| def _parse_array_response(self, text: str) -> list: | |
| """Parse array-style response like listWanv4[0]=key=val&key2=val2""" | |
| result = [] | |
| text = text.strip() | |
| for line in text.split('\n'): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| # Handle format: listWanv4[0]=servName=xxx&ipAddr=xxx | |
| if '[' in line and ']=' in line: | |
| # Extract the value part after ]= | |
| value_part = line.split(']=', 1)[1] if ']=' in line else line | |
| # Parse & separated key=value pairs | |
| item = {} | |
| for pair in value_part.split('&'): | |
| pair = pair.strip() | |
| if '=' in pair: | |
| key, value = pair.split('=', 1) | |
| item[key.strip()] = value.strip() | |
| if item: | |
| result.append(item) | |
| return result | |
| def get_catv_status(self) -> dict: | |
| """Get CATV status""" | |
| data = self._get_asp_data("catv_status") | |
| return self._parse_key_value(data) | |
| def get_wireless_performance(self) -> dict: | |
| """Get wireless performance statistics""" | |
| data = self._get_asp_data("wireless_performance") | |
| return self._parse_json_response(data) | |
| # ==================== NETWORK - WAN SECTION ==================== | |
| def get_wan_config(self) -> dict: | |
| """Get WAN configuration""" | |
| data = self._get_asp_data("formEthernet_init") | |
| return self._parse_json_response(data) | |
| def get_wan_connections_list(self) -> list: | |
| """ | |
| Get list of all WAN connections with full configuration. | |
| Returns: | |
| list: All configured WAN connections | |
| """ | |
| data = self._get_asp_data("initPageEth") | |
| result = [] | |
| for line in data.strip().split('\n'): | |
| line = line.strip() | |
| if not line or '=' not in line: | |
| continue | |
| # Skip metadata lines | |
| if line.startswith('vlan_map=') or line.startswith('poe_proxy=') or line.startswith('pppnumleft='): | |
| continue | |
| # Parse connection: name=key1=val1&key2=val2&... | |
| name, rest = line.split('=', 1) | |
| conn = {'name': name} | |
| for pair in rest.split('&'): | |
| if '=' in pair: | |
| key, value = pair.split('=', 1) | |
| conn[key.strip()] = value.strip() | |
| if len(conn) > 1: # Has more than just name | |
| result.append(conn) | |
| return result | |
| def set_wan_config(self, config: dict) -> str: | |
| """ | |
| Set WAN configuration (low-level method). | |
| Args: | |
| config: Dictionary with WAN parameters | |
| See create_pppoe_route_connection() for easier usage. | |
| """ | |
| if 'pppUsername' in config: | |
| config['encodePppUserName'] = self._encode64(config.get('pppUsername', '')) | |
| if 'pppPassword' in config: | |
| config['encodePppPassword'] = self._encode64(config.get('pppPassword', '')) | |
| config['action'] = 'sv' | |
| return self._post_form("formEthernet", config) | |
| def delete_wan_connection(self, connection_name: str) -> str: | |
| """ | |
| Delete a WAN connection. | |
| Args: | |
| connection_name: Name of connection to delete (e.g., "ppp_8_35_1") | |
| Example: | |
| >>> router.delete_wan_connection("ppp_8_35_1") | |
| """ | |
| data = { | |
| 'lkname': connection_name, | |
| 'action': 'rm' | |
| } | |
| return self._post_form("formEthernet", data) | |
| def enable_wan_connection(self, connection_name: str) -> str: | |
| """Enable a WAN connection""" | |
| return self._post_form("formWanEnable", { | |
| 'lkname': connection_name, | |
| 'enable': '1' | |
| }) | |
| def disable_wan_connection(self, connection_name: str) -> str: | |
| """Disable a WAN connection""" | |
| return self._post_form("formWanEnable", { | |
| 'lkname': connection_name, | |
| 'enable': '0' | |
| }) | |
| def set_nat_type(self, nat_type: int) -> str: | |
| """ | |
| Set NAT type. | |
| Args: | |
| nat_type: 0=NAT4 (Symmetric), 1=NAT1 (Full Cone), 2=NAT2 | |
| """ | |
| return self._post_form("formNat", {'nat_type': nat_type}) | |
| # ==================== WAN CONNECTION HELPER METHODS ==================== | |
| def create_pppoe_route_connection(self, username: str, password: str, | |
| vlan_id: int = None, service_type: int = 1, | |
| nat_enabled: bool = True, mtu: int = 1492, | |
| connection_name: str = "new") -> str: | |
| """ | |
| Create a PPPoE Route mode connection. | |
| Args: | |
| username: PPPoE username from ISP | |
| password: PPPoE password from ISP | |
| vlan_id: VLAN ID (None to disable VLAN tagging) | |
| service_type: 0=TR069_INTERNET, 1=INTERNET, 2=TR069, 3=Other, 4=VOIP | |
| nat_enabled: Enable NAT (default True) | |
| mtu: MTU value (default 1492 for PPPoE) | |
| connection_name: 'new' for new connection or existing name to modify | |
| Returns: | |
| str: Response from router | |
| Example: | |
| >>> router.create_pppoe_route_connection( | |
| ... username="[email protected]", | |
| ... password="password123", | |
| ... vlan_id=100 | |
| ... ) | |
| """ | |
| config = { | |
| 'lkname': connection_name, | |
| 'lkmode': '1', # Route mode | |
| 'IpProtocolType': '3', # IPv4/IPv6 | |
| 'ipmode': '2', # PPPoE | |
| 'cmode': '2', | |
| 'ipDhcp': '0', | |
| 'napt': '1' if nat_enabled else '0', | |
| 'vlan': '1' if vlan_id else '0', | |
| 'vid': str(vlan_id) if vlan_id else '0', | |
| 'vprio': '0', | |
| 'mtu': str(mtu), | |
| 'pppUsername': username, | |
| 'pppPassword': password, | |
| 'pppServiceName': '', | |
| 'pppCtype': '0', # Continuous | |
| 'applicationtype': str(service_type), | |
| 'dnsMode': '1', | |
| 'disableLanDhcp': '0', | |
| 'itfGroup': '0', | |
| 'AddrMode': '32', # Auto for IPv6 | |
| 'iapd': '1', | |
| 'dslite_enable': '0', | |
| 'dnsv6Mode': '1', | |
| 'iana': '0', | |
| 'action': 'sv' | |
| } | |
| config['encodePppUserName'] = self._encode64(username) | |
| config['encodePppPassword'] = self._encode64(password) | |
| return self._post_form("formEthernet", config) | |
| def create_dhcp_route_connection(self, vlan_id: int = None, service_type: int = 1, | |
| nat_enabled: bool = True, mtu: int = 1500, | |
| connection_name: str = "new") -> str: | |
| """ | |
| Create a DHCP Route mode connection. | |
| Args: | |
| vlan_id: VLAN ID (None to disable VLAN tagging) | |
| service_type: Service type (see SERVICE TYPES in docstring) | |
| nat_enabled: Enable NAT | |
| mtu: MTU value | |
| connection_name: 'new' for new connection or existing name to modify | |
| Example: | |
| >>> router.create_dhcp_route_connection(vlan_id=200) | |
| """ | |
| config = { | |
| 'lkname': connection_name, | |
| 'lkmode': '1', # Route mode | |
| 'IpProtocolType': '3', # IPv4/IPv6 | |
| 'ipmode': '0', # DHCP | |
| 'cmode': '1', | |
| 'ipDhcp': '1', | |
| 'napt': '1' if nat_enabled else '0', | |
| 'vlan': '1' if vlan_id else '0', | |
| 'vid': str(vlan_id) if vlan_id else '0', | |
| 'vprio': '0', | |
| 'mtu': str(mtu), | |
| 'applicationtype': str(service_type), | |
| 'dnsMode': '1', | |
| 'disableLanDhcp': '0', | |
| 'itfGroup': '0', | |
| 'AddrMode': '32', | |
| 'iapd': '1', | |
| 'dslite_enable': '0', | |
| 'dnsv6Mode': '1', | |
| 'iana': '0', | |
| 'action': 'sv' | |
| } | |
| return self._post_form("formEthernet", config) | |
| def create_static_route_connection(self, ip_address: str, subnet_mask: str, | |
| gateway: str, dns1: str = "", dns2: str = "", | |
| vlan_id: int = None, service_type: int = 1, | |
| nat_enabled: bool = True, mtu: int = 1500, | |
| connection_name: str = "new") -> str: | |
| """ | |
| Create a Static IP Route mode connection. | |
| Args: | |
| ip_address: Static IP address | |
| subnet_mask: Subnet mask (e.g., "255.255.255.0") | |
| gateway: Default gateway | |
| dns1: Primary DNS server | |
| dns2: Secondary DNS server | |
| vlan_id: VLAN ID (None to disable) | |
| service_type: Service type | |
| nat_enabled: Enable NAT | |
| mtu: MTU value | |
| connection_name: 'new' for new connection | |
| Example: | |
| >>> router.create_static_route_connection( | |
| ... ip_address="203.0.113.10", | |
| ... subnet_mask="255.255.255.0", | |
| ... gateway="203.0.113.1", | |
| ... dns1="8.8.8.8" | |
| ... ) | |
| """ | |
| config = { | |
| 'lkname': connection_name, | |
| 'lkmode': '1', # Route mode | |
| 'IpProtocolType': '1', # IPv4 only | |
| 'ipmode': '1', # Static | |
| 'cmode': '1', | |
| 'ipDhcp': '0', | |
| 'napt': '1' if nat_enabled else '0', | |
| 'vlan': '1' if vlan_id else '0', | |
| 'vid': str(vlan_id) if vlan_id else '0', | |
| 'vprio': '0', | |
| 'mtu': str(mtu), | |
| 'ipAddr': ip_address, | |
| 'netMask': subnet_mask, | |
| 'remoteIpAddr': gateway, | |
| 'dnsMode': '0', | |
| 'v4dns1': dns1, | |
| 'v4dns2': dns2, | |
| 'applicationtype': str(service_type), | |
| 'disableLanDhcp': '0', | |
| 'itfGroup': '0', | |
| 'action': 'sv' | |
| } | |
| return self._post_form("formEthernet", config) | |
| def create_bridge_connection(self, vlan_id: int = None, service_type: int = 1, | |
| connection_name: str = "new") -> str: | |
| """ | |
| Create a Bridge mode connection. | |
| Bridge mode passes traffic directly to a downstream router. | |
| Args: | |
| vlan_id: VLAN ID (None to disable) | |
| service_type: 1=INTERNET, 3=Other | |
| connection_name: 'new' for new connection | |
| Example: | |
| >>> router.create_bridge_connection(vlan_id=100) | |
| """ | |
| config = { | |
| 'lkname': connection_name, | |
| 'lkmode': '0', # Bridge mode | |
| 'IpProtocolType': '3', | |
| 'ipmode': '0', | |
| 'cmode': '0', | |
| 'ipDhcp': '0', | |
| 'napt': '0', | |
| 'vlan': '1' if vlan_id else '0', | |
| 'vid': str(vlan_id) if vlan_id else '0', | |
| 'vprio': '0', | |
| 'mtu': '1500', | |
| 'applicationtype': str(service_type), | |
| 'disableLanDhcp': '1', | |
| 'itfGroup': '0', | |
| 'action': 'sv' | |
| } | |
| return self._post_form("formEthernet", config) | |
| def modify_wan_connection(self, connection_name: str, **kwargs) -> str: | |
| """ | |
| Modify an existing WAN connection. | |
| Args: | |
| connection_name: Name of connection to modify | |
| **kwargs: Parameters to change | |
| Example: | |
| >>> router.modify_wan_connection('ppp_8_35_1', mtu='1400', napt='0') | |
| """ | |
| config = {'lkname': connection_name, 'action': 'sv'} | |
| config.update(kwargs) | |
| if 'pppUsername' in kwargs: | |
| config['encodePppUserName'] = self._encode64(kwargs['pppUsername']) | |
| if 'pppPassword' in kwargs: | |
| config['encodePppPassword'] = self._encode64(kwargs['pppPassword']) | |
| return self._post_form("formEthernet", config) | |
| # ==================== NETWORK - LAN SECTION ==================== | |
| def get_lan_config(self) -> dict: | |
| """Get LAN configuration""" | |
| data = self._get_asp_data("init_dhcpmain_page") | |
| return self._parse_key_value(data) | |
| def set_lan_ipv4_config(self, config: dict) -> str: | |
| """ | |
| Set LAN IPv4 configuration. | |
| Args: | |
| config: Dictionary with keys: | |
| - uIp: LAN IP address | |
| - uMask: Subnet mask | |
| - uDhcpType: 0=disabled, 1=enabled, 2=relay | |
| - dhcpRangeStart: DHCP start IP | |
| - dhcpRangeEnd: DHCP end IP | |
| - ulTime: Lease time (60/3600/86400/604800) | |
| - ipv4landnsmode: 0=Proxy, 1=Static, 2=ISP | |
| - Ipv4Dns1, Ipv4Dns2: DNS servers | |
| Example: | |
| >>> router.set_lan_ipv4_config({ | |
| ... 'uIp': '192.168.3.1', | |
| ... 'uMask': '255.255.255.0', | |
| ... 'uDhcpType': '1', | |
| ... 'dhcpRangeStart': '192.168.3.100', | |
| ... 'dhcpRangeEnd': '192.168.3.200', | |
| ... 'ulTime': '86400' | |
| ... }) | |
| """ | |
| return self._post_form("formDhcpd", config) | |
| def get_lan_ipv6_config(self) -> dict: | |
| """Get LAN IPv6 configuration""" | |
| data = self._get_asp_data("IPv6_LAN_Page_init") | |
| return self._parse_json_response(data) | |
| def set_lan_ipv6_config(self, config: dict) -> str: | |
| """Set LAN IPv6 configuration""" | |
| return self._post_form("formDhcpServerV6", config) | |
| def get_dhcp_reservations(self) -> list: | |
| """Get DHCP MAC-IP reservations""" | |
| data = self._get_asp_data("showMACBaseTable") | |
| return self._parse_json_response(data) | |
| def add_dhcp_reservation(self, mac: str, ip: str) -> str: | |
| """ | |
| Add DHCP reservation (MAC-IP binding). | |
| Args: | |
| mac: MAC address (format: 00:11:22:33:44:55) | |
| ip: IP address to assign | |
| Example: | |
| >>> router.add_dhcp_reservation("00:11:22:33:44:55", "192.168.3.50") | |
| """ | |
| return self._post_form("formMacAddrBase", { | |
| 'macAddr_Dhcp_a': mac, | |
| 'ipAddr_Dhcp_a': ip, | |
| 'action': 'sv' | |
| }) | |
| def delete_dhcp_reservation(self, mac: str, ip: str) -> str: | |
| """Delete DHCP reservation""" | |
| return self._post_form("formMacAddrBase", { | |
| 'macAddr_Dhcp_d': mac, | |
| 'ipAddr_Dhcp_d': ip | |
| }) | |
| # ==================== NETWORK - WIFI SECTION ==================== | |
| def get_wifi_24g_config(self) -> dict: | |
| """Get 2.4GHz WiFi configuration""" | |
| data = self._get_asp_data("wlan_init_24g") | |
| return self._parse_json_response(data) | |
| def set_wifi_24g_config(self, config: dict) -> str: | |
| """ | |
| Set 2.4GHz WiFi configuration. | |
| Args: | |
| config: Dictionary with keys: | |
| - ssid: Network name | |
| - wlanDisabled: '0'=enabled, '1'=disabled | |
| - hiddenSSID: '0'=visible, '1'=hidden | |
| - channel: 1-11 (0=auto) | |
| - band: '0'=20MHz, '1'=40MHz, '2'=20/40MHz | |
| - wpaAuth: Security type | |
| - pskValue: WiFi password (8-63 chars) | |
| - maxStaNum: Max connected clients | |
| Example: | |
| >>> router.set_wifi_24g_config({ | |
| ... 'ssid': 'MyNetwork', | |
| ... 'pskValue': 'MyPassword123', | |
| ... 'channel': '6' | |
| ... }) | |
| """ | |
| return self._post_form("formWlanSetup_24g", config) | |
| def get_wifi_5g_config(self) -> dict: | |
| """Get 5GHz WiFi configuration""" | |
| data = self._get_asp_data("wlan_init_5g") | |
| return self._parse_json_response(data) | |
| def set_wifi_5g_config(self, config: dict) -> str: | |
| """Set 5GHz WiFi configuration (same parameters as 2.4GHz)""" | |
| return self._post_form("formWlanSetup_5g", config) | |
| # ==================== NETWORK - OTHER ==================== | |
| def get_mtu_config(self) -> dict: | |
| """Get MTU configuration""" | |
| data = self._get_asp_data("get_system_mtu") | |
| return self._parse_key_value(data) | |
| def set_mtu(self, mtu: int) -> str: | |
| """Set system MTU value (typically 1500)""" | |
| return self._post_form("formMTU", {'mtu': mtu}) | |
| def get_band_steering_config(self) -> dict: | |
| """Get band steering configuration""" | |
| data = self._get_asp_data("band_steering_config") | |
| return self._parse_key_value(data) | |
| def set_band_steering(self, enabled: bool, threshold: int = -70) -> str: | |
| """ | |
| Enable/disable band steering. | |
| Band steering automatically moves clients between 2.4GHz and 5GHz. | |
| Args: | |
| enabled: Enable band steering | |
| threshold: RSSI threshold in dBm | |
| """ | |
| return self._post_form("formBandSteering", { | |
| 'enable': 1 if enabled else 0, | |
| 'rssiThreshold': threshold | |
| }) | |
| # ==================== NETWORK - TR-069 ==================== | |
| def get_tr069_config(self) -> dict: | |
| """Get TR-069 remote management configuration""" | |
| data = self._get_asp_data("Network_TR069_Page_init") | |
| return self._parse_key_value(data) | |
| def set_tr069_config(self, config: dict) -> str: | |
| """ | |
| Set TR-069 configuration. | |
| Args: | |
| config: Dictionary with keys: | |
| - enable: Enable TR-069 | |
| - acsUrl: ACS server URL | |
| - acsUser: ACS username | |
| - acsPass: ACS password | |
| - informEnable: Enable periodic inform | |
| - informInterval: Inform interval in seconds | |
| """ | |
| return self._post_form("formTR069", config) | |
| # ==================== NETWORK - QoS ==================== | |
| def get_qos_config(self) -> dict: | |
| """Get QoS configuration including queue policy and traffic control""" | |
| result = {} | |
| # Get queue policy (main QoS settings) | |
| queue_data = self._get_asp_data("initQueuePolicy") | |
| if queue_data: | |
| for line in queue_data.strip().split('\n'): | |
| if '=' in line: | |
| key, value = line.split('=', 1) | |
| if key.startswith('queues'): | |
| # Parse queue config like: qindex=0&prio=1&weight=0&enable=1 | |
| queue_cfg = {} | |
| for pair in value.split('&'): | |
| if '=' in pair: | |
| k, v = pair.split('=', 1) | |
| queue_cfg[k] = v | |
| result[key] = queue_cfg | |
| else: | |
| result[key] = value | |
| # Get traffic control settings | |
| traffic_data = self._get_asp_data("initTraffictlPage") | |
| if traffic_data: | |
| traffic_cfg = self._parse_key_value(traffic_data) | |
| result['traffic_control'] = traffic_cfg | |
| # Get QoS rules | |
| rules_data = self._get_asp_data("initQosRulePage") | |
| if rules_data: | |
| result['rules'] = self._parse_key_value(rules_data) | |
| return result | |
| def set_qos_enabled(self, enabled: bool) -> str: | |
| """Enable/disable QoS""" | |
| return self._post_form("formQoS", {'enable': 1 if enabled else 0}) | |
| # ==================== NETWORK - TIME ==================== | |
| def get_time_config(self) -> dict: | |
| """Get time/NTP configuration""" | |
| data = self._get_asp_data("Network_Time_Page_init") | |
| return self._parse_key_value(data) | |
| def set_time_config(self, config: dict) -> str: | |
| """ | |
| Set time configuration. | |
| Args: | |
| config: Dictionary with keys: | |
| - ntpEnable: Enable NTP | |
| - ntpServer: NTP server address | |
| - timezone: Timezone offset | |
| """ | |
| return self._post_form("formTime", config) | |
| # ==================== NETWORK - ROUTING ==================== | |
| def get_route_config(self) -> dict: | |
| """Get static route and RIP configuration""" | |
| result = {} | |
| # Get RIP status | |
| rip_data = self._get_asp_data("showRipIf") | |
| result['rip'] = self._parse_key_value(rip_data) | |
| # Get static routes | |
| static_data = self._get_asp_data("showStaticRoute") | |
| result['static_routes'] = self._parse_key_value(static_data) | |
| # Get route list (active routes) | |
| route_data = self._get_asp_data("routeList") | |
| routes = [] | |
| for line in route_data.strip().split('\n'): | |
| if '=' in line: | |
| parts = line.split('=', 1) | |
| if len(parts) == 2: | |
| values = parts[1].split('&') | |
| if len(values) >= 5: | |
| routes.append({ | |
| 'destination': values[0], | |
| 'netmask': values[1], | |
| 'gateway': values[2], | |
| 'interface': values[3], | |
| 'metric': values[4] | |
| }) | |
| result['routes'] = routes | |
| return result | |
| def add_static_route(self, dest: str, mask: str, gateway: str, interface: str = "") -> str: | |
| """ | |
| Add static route. | |
| Args: | |
| dest: Destination network | |
| mask: Network mask | |
| gateway: Gateway IP | |
| interface: Optional interface name | |
| """ | |
| return self._post_form("formRoute", { | |
| 'destNet': dest, | |
| 'netMask': mask, | |
| 'gateway': gateway, | |
| 'interface': interface, | |
| 'action': 'add' | |
| }) | |
| def delete_static_route(self, dest: str, mask: str, gateway: str) -> str: | |
| """Delete static route""" | |
| return self._post_form("formRoute", { | |
| 'destNet': dest, | |
| 'netMask': mask, | |
| 'gateway': gateway, | |
| 'action': 'del' | |
| }) | |
| # ==================== SECURITY - FIREWALL ==================== | |
| def get_firewall_config(self) -> dict: | |
| """Get firewall configuration""" | |
| data = self._get_asp_data("initPageFirewall") | |
| return self._parse_key_value(data) | |
| def set_firewall_config(self, level: int = 0, dos_protection: bool = True) -> str: | |
| """ | |
| Set firewall configuration. | |
| Args: | |
| level: 0=Low, 2=High | |
| dos_protection: Enable DoS attack protection | |
| Example: | |
| >>> router.set_firewall_config(level=2, dos_protection=True) | |
| """ | |
| return self._post_form("formFirewall", { | |
| 'FirewallLevel': str(level), | |
| 'DosEnable': '1' if dos_protection else '0' | |
| }) | |
| # ==================== SECURITY - URL FILTER ==================== | |
| def get_url_filter_config(self) -> dict: | |
| """Get URL filter configuration""" | |
| data = self._get_asp_data("url_filter_config") | |
| return self._parse_json_response(data) | |
| def set_url_filter_enabled(self, enabled: bool) -> str: | |
| """Enable/disable URL filtering""" | |
| return self._post_form("formUrlFilter", {'enable': 1 if enabled else 0}) | |
| def add_url_filter(self, url: str, action: str = "deny") -> str: | |
| """Add URL to filter list""" | |
| return self._post_form("formUrlFilter", { | |
| 'url': url, | |
| 'action': action, | |
| 'cmd': 'add' | |
| }) | |
| def delete_url_filter(self, url: str) -> str: | |
| """Remove URL from filter list""" | |
| return self._post_form("formUrlFilter", { | |
| 'url': url, | |
| 'cmd': 'del' | |
| }) | |
| # ==================== SECURITY - MAC FILTER ==================== | |
| def get_mac_filter_config(self) -> dict: | |
| """Get MAC filter configuration""" | |
| data = self._get_asp_data("mac_filter_config") | |
| return self._parse_json_response(data) | |
| def set_mac_filter_mode(self, mode: str) -> str: | |
| """ | |
| Set MAC filter mode. | |
| Args: | |
| mode: 'disabled', 'whitelist', or 'blacklist' | |
| """ | |
| return self._post_form("formMacFilter", {'mode': mode}) | |
| def add_mac_filter(self, mac: str, description: str = "") -> str: | |
| """Add MAC to filter list""" | |
| return self._post_form("formMacFilter", { | |
| 'mac': mac, | |
| 'desc': description, | |
| 'action': 'add' | |
| }) | |
| def delete_mac_filter(self, mac: str) -> str: | |
| """Remove MAC from filter list""" | |
| return self._post_form("formMacFilter", { | |
| 'mac': mac, | |
| 'action': 'del' | |
| }) | |
| # ==================== SECURITY - PORT FILTER ==================== | |
| def get_port_filter_config(self) -> dict: | |
| """Get IP/Port filter configuration""" | |
| data = self._get_asp_data("port_filter_config") | |
| return self._parse_json_response(data) | |
| def add_port_filter(self, config: dict) -> str: | |
| """ | |
| Add port filter rule. | |
| Args: | |
| config: Dictionary with keys: | |
| - protocol: 'tcp', 'udp', or 'both' | |
| - srcIP, srcMask: Source IP/mask | |
| - srcPortStart, srcPortEnd: Source port range | |
| - dstIP, dstMask: Destination IP/mask | |
| - dstPortStart, dstPortEnd: Destination port range | |
| - action: 'accept' or 'deny' | |
| """ | |
| config['cmd'] = 'add' | |
| return self._post_form("formPortFilter", config) | |
| # ==================== APPLICATION - NAT/DMZ/PORT FORWARDING ==================== | |
| def get_alg_config(self) -> dict: | |
| """ | |
| Get ALG (Application Layer Gateway) configuration. | |
| Returns: | |
| dict: ALG settings for FTP, SIP, H.323, etc. | |
| """ | |
| data = self._get_asp_data("GetAlgTypes") | |
| return self._parse_key_value(data) | |
| def set_alg_config(self, config: dict) -> str: | |
| """ | |
| Set ALG configuration. | |
| Args: | |
| config: Dictionary with keys (all '0' or '1'): | |
| - ftp_algonoff: FTP ALG | |
| - tftp_algonoff: TFTP ALG | |
| - h323_algonoff: H.323 ALG | |
| - rtsp_algonoff: RTSP ALG | |
| - l2tp_algonoff: L2TP ALG | |
| - ipsec_algonoff: IPSec ALG | |
| - sip_algonoff: SIP ALG | |
| - pptp_algonoff: PPTP ALG | |
| Example: | |
| >>> router.set_alg_config({ | |
| ... 'ftp_algonoff': '1', | |
| ... 'sip_algonoff': '1' | |
| ... }) | |
| """ | |
| config['apply'] = '1' | |
| return self._post_form("formALGOnOff", config) | |
| def get_dmz_config(self) -> dict: | |
| """ | |
| Get DMZ configuration. | |
| Returns: | |
| dict: DMZ enabled status and IP | |
| """ | |
| data = self._get_asp_data("GetAlgTypes") | |
| return self._parse_key_value(data) | |
| def set_dmz(self, enabled: bool, ip: str = "") -> str: | |
| """ | |
| Set DMZ host. | |
| DMZ exposes all ports of a device to the internet. | |
| Args: | |
| enabled: Enable DMZ | |
| ip: DMZ host IP address | |
| Example: | |
| >>> router.set_dmz(True, "192.168.3.100") | |
| """ | |
| return self._post_form("formDMZ", { | |
| 'dmzcap': '1' if enabled else '0', | |
| 'ip': ip | |
| }) | |
| def get_virtual_servers(self) -> list: | |
| """ | |
| Get virtual server (port forwarding) list. | |
| Returns: | |
| list: Configured port forwarding rules | |
| """ | |
| data = self._get_asp_data("virtualSvrList") | |
| return self._parse_key_value(data) | |
| def add_virtual_server(self, name: str, external_port_start: int, external_port_end: int, | |
| internal_ip: str, internal_port: int, protocol: int = 1, | |
| external_ip: str = "", prefix_len: int = 0) -> str: | |
| """ | |
| Add virtual server (port forwarding rule). | |
| Args: | |
| name: Rule name/description | |
| external_port_start: External start port | |
| external_port_end: External end port | |
| internal_ip: Internal server IP | |
| internal_port: Internal port | |
| protocol: 1=TCP, 2=UDP, 4=TCP/UDP | |
| external_ip: External IP (optional, leave empty for all) | |
| prefix_len: IP prefix length (0-32) | |
| Example: | |
| >>> # Forward port 8080 to internal web server | |
| >>> router.add_virtual_server( | |
| ... name="Web Server", | |
| ... external_port_start=8080, | |
| ... external_port_end=8080, | |
| ... internal_ip="192.168.3.100", | |
| ... internal_port=80, | |
| ... protocol=1 | |
| ... ) | |
| >>> # Forward port range for gaming | |
| >>> router.add_virtual_server( | |
| ... name="Gaming", | |
| ... external_port_start=27000, | |
| ... external_port_end=27050, | |
| ... internal_ip="192.168.3.50", | |
| ... internal_port=27000, | |
| ... protocol=4 # TCP+UDP | |
| ... ) | |
| """ | |
| return self._post_admin_form("formVrtsrv", { | |
| 'cusSrvName': name, | |
| 'remotehost': external_ip, | |
| 'IpPrefixLen': str(prefix_len), | |
| 'wanStartPort': str(external_port_start), | |
| 'wanEndPort': str(external_port_end), | |
| 'protoType': str(protocol), | |
| 'serverIp': internal_ip, | |
| 'lanPort': str(internal_port), | |
| 'vrtenable': '1', | |
| 'action': 'add' | |
| }) | |
| def delete_virtual_server(self, rule_id: str) -> str: | |
| """Delete virtual server rule""" | |
| return self._post_admin_form("formVrtsrv", { | |
| rule_id: '1', | |
| 'action': 'delete' | |
| }) | |
| # ==================== APPLICATION - UPNP ==================== | |
| def get_upnp_config(self) -> dict: | |
| """Get UPnP configuration""" | |
| data = self._get_asp_data("get_UPnP_Page_data") | |
| return self._parse_key_value(data) | |
| def set_upnp(self, enabled: bool, wan_interface: str = "") -> str: | |
| """ | |
| Enable/disable UPnP. | |
| Args: | |
| enabled: Enable UPnP | |
| wan_interface: WAN interface for UPnP | |
| Example: | |
| >>> router.set_upnp(True) | |
| """ | |
| return self._post_form("formUpnp", { | |
| 'daemon': '1' if enabled else '0', | |
| 'ext_if': wan_interface | |
| }) | |
| # ==================== APPLICATION - DDNS ==================== | |
| def get_ddns_config(self) -> dict: | |
| """Get DDNS configuration""" | |
| data = self._get_asp_data("showDNSTable") | |
| return self._parse_key_value(data) | |
| def set_ddns_enabled(self, enabled: bool) -> str: | |
| """Enable/disable DDNS service""" | |
| return self._post_form("formDDNS", { | |
| 'ddnsEnable': '1' if enabled else '0', | |
| 'action': 'sw' | |
| }) | |
| def add_ddns(self, provider: str, hostname: str, username: str, password: str, | |
| interface: str = "LAN") -> str: | |
| """ | |
| Add DDNS entry. | |
| Args: | |
| provider: 'oray', 'dyndns', 'tzo', 'noip', or 'gnudip' | |
| hostname: DDNS hostname | |
| username: Account username | |
| password: Account password | |
| interface: Interface to use | |
| Example: | |
| >>> router.add_ddns( | |
| ... provider="dyndns", | |
| ... hostname="myhost.dyndns.org", | |
| ... username="myuser", | |
| ... password="mypass" | |
| ... ) | |
| """ | |
| data = { | |
| 'provider': provider, | |
| 'hostname': hostname, | |
| 'ifname': interface, | |
| 'action': 'ad' | |
| } | |
| # Provider-specific username/password fields | |
| if provider == 'oray': | |
| data['orayusername'] = username | |
| data['oraypassword'] = password | |
| elif provider == 'dyndns': | |
| data['dynusername'] = username | |
| data['dynpassword'] = password | |
| elif provider == 'tzo': | |
| data['tzoemail'] = username | |
| data['tzokey'] = password | |
| elif provider == 'noip': | |
| data['noipusername'] = username | |
| data['noippassword'] = password | |
| elif provider == 'gnudip': | |
| data['gnudipusername'] = username | |
| data['gnudippassword'] = password | |
| return self._post_form("formDDNS", data) | |
| def delete_ddns(self, rule_id: str) -> str: | |
| """Delete DDNS entry""" | |
| return self._post_form("formDDNS", { | |
| rule_id: '1', | |
| 'action': 'rm' | |
| }) | |
| # ==================== APPLICATION - VOIP ==================== | |
| def get_voip_config(self) -> dict: | |
| """Get VoIP basic configuration""" | |
| data = self._get_asp_data("asp_voip_e8c_get") | |
| return self._parse_key_value(data) | |
| def set_voip_config(self, config: dict) -> str: | |
| """Set VoIP configuration""" | |
| return self._post_form("formVoIP", config) | |
| # ==================== APPLICATION - SAMBA/FTP ==================== | |
| def get_samba_config(self) -> dict: | |
| """Get Samba file sharing configuration""" | |
| data = self._get_asp_data("get_Samba_Page_data") | |
| return self._parse_key_value(data) | |
| def set_samba_config(self, enabled: bool, username: str, password: str) -> str: | |
| """ | |
| Set Samba file sharing configuration. | |
| Args: | |
| enabled: Enable Samba | |
| username: Samba username | |
| password: Samba password | |
| """ | |
| return self._post_form("formSamba", { | |
| 'EnableSamba': '1' if enabled else '0', | |
| 'SambaName': username, | |
| 'SambaPasswd': password | |
| }) | |
| def get_ftp_config(self) -> dict: | |
| """Get FTP server configuration""" | |
| data = self._get_asp_data("get_Ftp_Page_data") | |
| return self._parse_key_value(data) | |
| def set_ftp_config(self, enabled: bool, username: str, password: str) -> str: | |
| """ | |
| Set FTP server configuration. | |
| Args: | |
| enabled: Enable FTP server | |
| username: FTP username | |
| password: FTP password | |
| """ | |
| return self._post_form("formFtp", { | |
| 'EnableFtp': '1' if enabled else '0', | |
| 'FtpName': username, | |
| 'FtpPasswd': password | |
| }) | |
| # ==================== MANAGEMENT - USER ==================== | |
| def get_user_info(self) -> dict: | |
| """Get current user information""" | |
| data = self._get_asp_data("initManageUserPage") | |
| return self._parse_key_value(data) | |
| def change_password(self, old_password: str, new_password: str) -> str: | |
| """ | |
| Change user password. | |
| Password requirements: | |
| - 6-32 characters | |
| - At least 2 types: uppercase, lowercase, numbers, special chars | |
| Args: | |
| old_password: Current password | |
| new_password: New password | |
| """ | |
| return self._post_form("new_formPasswordSetup", { | |
| 'oldPasswd': old_password, | |
| 'newPasswd': new_password, | |
| 'affirmPasswd': new_password | |
| }) | |
| # ==================== MANAGEMENT - DEVICE ==================== | |
| def backup_config(self, decrypt: bool = False) -> bytes: | |
| """ | |
| Backup device configuration. | |
| Args: | |
| decrypt: If True, return decrypted XML. If False, return raw encrypted data. | |
| Returns: | |
| bytes: Configuration file content (decrypted XML if decrypt=True, else encrypted) | |
| Example: | |
| >>> # Get encrypted backup | |
| >>> config = router.backup_config() | |
| >>> with open('backup.bin', 'wb') as f: | |
| ... f.write(config) | |
| >>> # Get decrypted XML directly | |
| >>> config = router.backup_config(decrypt=True) | |
| >>> with open('config.xml', 'wb') as f: | |
| ... f.write(config) | |
| """ | |
| import socket | |
| import urllib.parse | |
| # Get CSRF token first | |
| csrf_url = f"{self.base_url}/boaform/getASPdata/FMask" | |
| csrf_response = self.session.get(csrf_url) | |
| csrf_token = csrf_response.text.strip() | |
| # Parse host from base_url | |
| parsed = urllib.parse.urlparse(self.base_url) | |
| host = parsed.hostname | |
| port = parsed.port or 80 | |
| # Build HTTP request manually for HTTP/0.9 compatible response | |
| body = f"csrfMask={csrf_token}" | |
| request = ( | |
| f"POST /boaform/getASPdata/formMgmConfig HTTP/1.0\r\n" | |
| f"Host: {host}\r\n" | |
| f"Content-Type: application/x-www-form-urlencoded; charset=UTF-8\r\n" | |
| f"X-Requested-With: XMLHttpRequest\r\n" | |
| f"Content-Length: {len(body)}\r\n" | |
| f"Connection: close\r\n" | |
| f"\r\n" | |
| f"{body}" | |
| ) | |
| # Send request using raw socket | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| sock.settimeout(30) | |
| sock.connect((host, port)) | |
| sock.sendall(request.encode()) | |
| # Receive response | |
| response = b"" | |
| while True: | |
| chunk = sock.recv(4096) | |
| if not chunk: | |
| break | |
| response += chunk | |
| sock.close() | |
| if decrypt: | |
| return self.decrypt_config(response) | |
| return response | |
| def restore_config(self, config_data: bytes, encrypt: bool = True) -> str: | |
| """ | |
| Restore device configuration from backup. | |
| Args: | |
| config_data: Configuration file content (plaintext XML or encrypted) | |
| encrypt: If True, encrypt plaintext XML before upload. | |
| If False, upload data as-is (for pre-encrypted backups). | |
| Example: | |
| >>> # Upload plaintext XML (will be encrypted automatically) | |
| >>> with open('config.xml', 'rb') as f: | |
| ... router.restore_config(f.read()) | |
| >>> # Upload pre-encrypted backup | |
| >>> with open('backup.bin', 'rb') as f: | |
| ... router.restore_config(f.read(), encrypt=False) | |
| """ | |
| # Encrypt if needed (for plaintext XML configs) | |
| if encrypt: | |
| # Check if it looks like plaintext XML | |
| if config_data.startswith(b'<') or b'<Config' in config_data[:100]: | |
| config_data = self.encrypt_config(config_data) | |
| files = {'binary': ('config.xml', config_data)} | |
| url = f"{self.base_url}/boaform/getASPdata/formMgmConfigUpload" | |
| response = self.session.post(url, files=files) | |
| return response.text | |
| def factory_reset(self, reset_type: int = 8) -> str: | |
| """ | |
| Reset device to factory defaults. | |
| Args: | |
| reset_type: 4=Reset to defaults, 8=Full factory reset | |
| """ | |
| return self._post_form("formNewReboot", {'reset': str(reset_type)}) | |
| def reboot(self) -> str: | |
| """Reboot the device""" | |
| return self._post_form("formNewReboot", {}) | |
| def set_scheduled_reboot(self, days: list, hour: int, minute: int) -> str: | |
| """ | |
| Set scheduled automatic reboot. | |
| Args: | |
| days: List of days (1=Mon, 2=Tue, ..., 7=Sun) | |
| hour: Hour (0-23) | |
| minute: Minute (0-59) | |
| Example: | |
| >>> # Reboot every Monday and Friday at 3:00 AM | |
| >>> router.set_scheduled_reboot([1, 5], 3, 0) | |
| """ | |
| day_bits = 0 | |
| for d in days: | |
| day_bits |= (1 << d) | |
| return self._post_form("formSysSchedule", { | |
| 'day': str(day_bits), | |
| 'hour': str(hour), | |
| 'minute': str(minute) | |
| }) | |
| def firmware_upgrade(self, firmware_data: bytes) -> str: | |
| """ | |
| Upgrade device firmware. | |
| Args: | |
| firmware_data: Firmware file content (.tar format) | |
| """ | |
| files = {'binary': ('firmware.tar', firmware_data)} | |
| url = f"{self.base_url}/boaform/getASPdata/formUpload" | |
| response = self.session.post(url, files=files) | |
| return response.text | |
| # ==================== MANAGEMENT - LOGS ==================== | |
| def get_system_log(self) -> str: | |
| """Get system log""" | |
| data = self._get_asp_data("system_log") | |
| return data | |
| def clear_system_log(self) -> str: | |
| """Clear system log""" | |
| return self._post_form("formClearLog", {}) | |
| # ==================== DIAGNOSTICS ==================== | |
| def ping(self, target: str, count: int = 4) -> str: | |
| """ | |
| Ping a host. | |
| Args: | |
| target: IP or hostname to ping | |
| count: Number of pings | |
| """ | |
| return self._post_form("formPing", { | |
| 'target': target, | |
| 'count': count | |
| }) | |
| def traceroute(self, target: str) -> str: | |
| """Traceroute to a host""" | |
| return self._post_form("formTraceroute", {'target': target}) | |
| def nslookup(self, hostname: str) -> str: | |
| """DNS lookup""" | |
| return self._post_form("formNslookup", {'hostname': hostname}) | |
| def run_self_diagnose(self) -> dict: | |
| """Run self-diagnostics""" | |
| data = self._post_form("formSelfDiagnose", {}) | |
| return self._parse_json_response(data) | |
| def scan_access_points(self, band: str = "both") -> list: | |
| """ | |
| Scan for nearby access points. | |
| Args: | |
| band: '2.4g', '5g', or 'both' | |
| """ | |
| data = self._post_form("formAPDetect", {'band': band}) | |
| return self._parse_json_response(data) | |
| # ==================== INTERACTIVE CLI ==================== | |
| import signal | |
| import sys | |
| from datetime import datetime | |
| class InteractiveCLI: | |
| """Interactive command-line interface for GPON Router Management""" | |
| def __init__(self): | |
| self.router = None | |
| self.running = True | |
| def print_header(self, title: str): | |
| """Print formatted section header""" | |
| width = 60 | |
| print("\n" + "=" * width) | |
| print(f" {title}") | |
| print("=" * width) | |
| def print_subheader(self, title: str): | |
| """Print formatted subsection header""" | |
| print(f"\n--- {title} ---") | |
| def print_table(self, data: dict, title: str = None): | |
| """Print data as formatted table""" | |
| if title: | |
| self.print_subheader(title) | |
| if not data: | |
| print(" No data available") | |
| return | |
| max_key_len = max(len(str(k)) for k in data.keys()) if data else 10 | |
| for key, value in data.items(): | |
| print(f" {str(key):<{max_key_len}} : {value}") | |
| def print_list(self, items: list, title: str = None): | |
| """Print list as formatted output""" | |
| if title: | |
| self.print_subheader(title) | |
| if not items: | |
| print(" No items found") | |
| return | |
| for i, item in enumerate(items, 1): | |
| if isinstance(item, dict): | |
| print(f"\n [{i}]") | |
| for k, v in item.items(): | |
| print(f" {k}: {v}") | |
| else: | |
| print(f" [{i}] {item}") | |
| def print_success(self, message: str): | |
| """Print success message""" | |
| print(f"\n [SUCCESS] {message}") | |
| def print_error(self, message: str): | |
| """Print error message""" | |
| print(f"\n [ERROR] {message}") | |
| def print_info(self, message: str): | |
| """Print info message""" | |
| print(f"\n [INFO] {message}") | |
| def get_input(self, prompt: str, default: str = None) -> str: | |
| """Get user input with optional default""" | |
| if default: | |
| user_input = input(f" {prompt} [{default}]: ").strip() | |
| return user_input if user_input else default | |
| return input(f" {prompt}: ").strip() | |
| def get_bool_input(self, prompt: str, default: bool = None) -> bool: | |
| """Get yes/no input with optional default""" | |
| if default is True: | |
| response = input(f" {prompt} (Y/n): ").strip().lower() | |
| return response not in ['n', 'no', '0', 'false'] | |
| elif default is False: | |
| response = input(f" {prompt} (y/N): ").strip().lower() | |
| return response in ['y', 'yes', '1', 'true'] | |
| else: | |
| response = input(f" {prompt} (y/n): ").strip().lower() | |
| return response in ['y', 'yes', '1', 'true'] | |
| def main_menu(self): | |
| """Display main menu""" | |
| self.print_header("GPON Router Configuration Manager") | |
| print(""" | |
| MAIN MENU | |
| ───────────────────────────────── | |
| 1. Status & Information | |
| 2. WAN Configuration | |
| 3. LAN Configuration | |
| 4. WiFi Configuration | |
| 5. Security Settings | |
| 6. NAT / Port Forwarding | |
| 7. Applications (DDNS, UPnP, etc.) | |
| 8. Device Management | |
| 9. Diagnostics | |
| 0. Exit (or Ctrl+C) | |
| ───────────────────────────────── | |
| """) | |
| def status_menu(self): | |
| """Status and Information submenu""" | |
| while True: | |
| self.print_header("Status & Information") | |
| print(""" | |
| 1. Device Information | |
| 2. Resource Usage (CPU/Memory) | |
| 3. WAN Status | |
| 4. PON Information | |
| 5. Connected Users | |
| 6. Wireless Clients | |
| 7. Show All Status | |
| 0. Back to Main Menu | |
| """) | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| info = self.router.get_device_info() | |
| # Format with friendly labels | |
| label_map = { | |
| 'devId': 'Device ID', | |
| 'devModel': 'Model', | |
| 'hdVer': 'Hardware Version', | |
| 'stVer': 'Firmware Version', | |
| 'gpon_sn': 'GPON Serial', | |
| 'mac_address': 'MAC Address', | |
| 'web_uptime': 'Uptime' | |
| } | |
| formatted = {} | |
| for key, value in info.items(): | |
| label = label_map.get(key, key) | |
| formatted[label] = value | |
| self.print_table(formatted, "Device Information") | |
| elif choice == '2': | |
| resources = self.router.get_device_resources() | |
| formatted = {} | |
| for key, value in resources.items(): | |
| if 'cpu' in key.lower() or 'cp' in key.lower(): | |
| formatted['CPU Usage'] = f"{value}%" | |
| elif 'mem' in key.lower(): | |
| formatted['Memory Usage'] = f"{value}%" | |
| else: | |
| formatted[key] = value | |
| self.print_table(formatted if formatted else resources, "Resource Usage") | |
| elif choice == '3': | |
| status = self.router.get_wan_status() | |
| if isinstance(status, list) and status: | |
| self.print_subheader("WAN Connections") | |
| for i, conn in enumerate(status, 1): | |
| print(f"\n Connection {i}:") | |
| formatted = { | |
| 'Service': conn.get('servName', 'N/A'), | |
| 'Protocol': conn.get('protocol', 'N/A'), | |
| 'Status': conn.get('strStatus', 'N/A').upper(), | |
| 'IP Address': conn.get('ipAddr', 'N/A'), | |
| 'Subnet Mask': conn.get('netmask', 'N/A'), | |
| 'Gateway': conn.get('gateway', 'N/A'), | |
| 'DNS 1': conn.get('dns1', 'N/A'), | |
| 'VLAN ID': conn.get('vlanId', 'N/A'), | |
| 'MAC Address': conn.get('MacAddr', 'N/A') | |
| } | |
| for k, v in formatted.items(): | |
| print(f" {k:<12} : {v}") | |
| else: | |
| self.print_table(status if isinstance(status, dict) else {}, "WAN Status") | |
| elif choice == '4': | |
| pon = self.router.get_pon_info() | |
| # Format PON info with friendly labels | |
| label_map = { | |
| 'pon_mode': 'PON Mode', | |
| 'pon_connect_status': 'Connection Status', | |
| 'temperature': 'Temperature', | |
| 'voltage': 'Voltage', | |
| 'tx-power': 'TX Power', | |
| 'rx-power': 'RX Power', | |
| 'bias-current': 'Bias Current', | |
| 'bytes-sent': 'Bytes Sent', | |
| 'bytes-received': 'Bytes Received', | |
| 'packets-sent': 'Packets Sent', | |
| 'packets-received': 'Packets Received' | |
| } | |
| formatted = {} | |
| for key, value in pon.items(): | |
| label = label_map.get(key, key.replace('-', ' ').title()) | |
| formatted[label] = value | |
| self.print_table(formatted, "PON Information") | |
| elif choice == '5': | |
| users = self.router.get_connected_users() | |
| if isinstance(users, list) and users: | |
| self.print_subheader("Connected Users (DHCP Clients)") | |
| for i, user in enumerate(users, 1): | |
| print(f"\n Device {i}:") | |
| # Format lease time | |
| lease_time = user.get('liveTime', '') | |
| if lease_time: | |
| try: | |
| secs = int(lease_time) | |
| hours = secs // 3600 | |
| mins = (secs % 3600) // 60 | |
| lease_time = f"{hours}h {mins}m" | |
| except: | |
| pass | |
| formatted = { | |
| 'Hostname': user.get('devname', user.get('hostName', '')), | |
| 'IP Address': user.get('ipAddr', ''), | |
| 'MAC Address': user.get('macAddr', ''), | |
| 'Lease Time': lease_time | |
| } | |
| for k, v in formatted.items(): | |
| if v: | |
| print(f" {k:<12} : {v}") | |
| elif isinstance(users, dict) and users: | |
| self.print_table(users, "Connected Users") | |
| else: | |
| print("\n--- Connected Users ---") | |
| print(" No users connected") | |
| elif choice == '6': | |
| clients = self.router.get_wlan_clients() | |
| if isinstance(clients, list): | |
| self.print_list(clients, "Wireless Clients") | |
| else: | |
| self.print_table(clients, "Wireless Clients") | |
| elif choice == '7': | |
| info = self.router.get_device_info() | |
| resources = self.router.get_device_resources() | |
| self.print_table(info, "Device Information") | |
| self.print_table(resources, "Resource Usage") | |
| input("\n Press Enter to continue...") | |
| def wan_menu(self): | |
| """WAN Configuration submenu""" | |
| while True: | |
| self.print_header("WAN Configuration") | |
| print(""" | |
| VIEW | |
| 1. List WAN Connections | |
| 2. View WAN Status | |
| CREATE | |
| 3. Create PPPoE Connection | |
| 4. Create DHCP Connection | |
| 5. Create Static IP Connection | |
| 6. Create Bridge Connection | |
| MANAGE | |
| 7. Enable/Disable Connection | |
| 8. Delete Connection | |
| 9. Set NAT Type | |
| 0. Back to Main Menu | |
| """) | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| connections = self.router.get_wan_connections_list() | |
| if isinstance(connections, list) and connections: | |
| self.print_subheader("WAN Connections") | |
| for i, conn in enumerate(connections, 1): | |
| print(f"\n Connection {i}: {conn.get('name', 'Unknown')}") | |
| # Determine connection type | |
| conn_type = "Route" if conn.get('brmode') == '0' else "Bridge" | |
| ip_mode = "DHCP" if conn.get('cmode') == '1' else "Static" | |
| if conn.get('pppPassword'): | |
| ip_mode = "PPPoE" | |
| formatted = { | |
| 'Type': conn_type, | |
| 'Mode': ip_mode, | |
| 'IP Address': conn.get('ipAddr', 'N/A'), | |
| 'Subnet': conn.get('netMask', 'N/A'), | |
| 'Gateway': conn.get('remoteIpAddr', 'N/A'), | |
| 'DNS': conn.get('v4dns1', 'N/A'), | |
| 'VLAN ID': conn.get('vid', 'N/A'), | |
| 'NAT': 'Enabled' if conn.get('napt') == '1' else 'Disabled', | |
| 'MTU': conn.get('mtu', 'N/A') | |
| } | |
| for k, v in formatted.items(): | |
| if v and v != 'N/A': | |
| print(f" {k:<12} : {v}") | |
| else: | |
| print("\n--- WAN Connections ---") | |
| print(" No connections configured") | |
| elif choice == '2': | |
| status = self.router.get_wan_status() | |
| if isinstance(status, list) and status: | |
| self.print_subheader("WAN Status") | |
| for i, conn in enumerate(status, 1): | |
| print(f"\n Connection {i}:") | |
| formatted = { | |
| 'Service': conn.get('servName', 'N/A'), | |
| 'Protocol': conn.get('protocol', 'N/A'), | |
| 'Status': conn.get('strStatus', 'N/A').upper(), | |
| 'IP Address': conn.get('ipAddr', 'N/A'), | |
| 'Subnet Mask': conn.get('netmask', 'N/A'), | |
| 'Gateway': conn.get('gateway', 'N/A'), | |
| 'DNS 1': conn.get('dns1', 'N/A'), | |
| 'VLAN ID': conn.get('vlanId', 'N/A'), | |
| 'MAC Address': conn.get('MacAddr', 'N/A') | |
| } | |
| for k, v in formatted.items(): | |
| print(f" {k:<12} : {v}") | |
| else: | |
| print("\n--- WAN Status ---") | |
| print(" No status available") | |
| elif choice == '3': | |
| self.print_subheader("Create PPPoE Connection") | |
| username = self.get_input("PPPoE Username") | |
| password = self.get_input("PPPoE Password") | |
| vlan = self.get_input("VLAN ID (leave empty for none)", "") | |
| vlan_id = int(vlan) if vlan else None | |
| nat = self.get_bool_input("Enable NAT?") | |
| result = self.router.create_pppoe_route_connection( | |
| username=username, | |
| password=password, | |
| vlan_id=vlan_id, | |
| nat_enabled=nat | |
| ) | |
| self.print_success("PPPoE connection created") | |
| elif choice == '4': | |
| self.print_subheader("Create DHCP Connection") | |
| vlan = self.get_input("VLAN ID (leave empty for none)", "") | |
| vlan_id = int(vlan) if vlan else None | |
| nat = self.get_bool_input("Enable NAT?") | |
| result = self.router.create_dhcp_route_connection( | |
| vlan_id=vlan_id, | |
| nat_enabled=nat | |
| ) | |
| self.print_success("DHCP connection created") | |
| elif choice == '5': | |
| self.print_subheader("Create Static IP Connection") | |
| ip = self.get_input("IP Address") | |
| mask = self.get_input("Subnet Mask", "255.255.255.0") | |
| gateway = self.get_input("Gateway") | |
| dns1 = self.get_input("DNS 1", "8.8.8.8") | |
| dns2 = self.get_input("DNS 2", "8.8.4.4") | |
| vlan = self.get_input("VLAN ID (leave empty for none)", "") | |
| vlan_id = int(vlan) if vlan else None | |
| result = self.router.create_static_route_connection( | |
| ip_address=ip, | |
| subnet_mask=mask, | |
| gateway=gateway, | |
| dns1=dns1, | |
| dns2=dns2, | |
| vlan_id=vlan_id | |
| ) | |
| self.print_success("Static IP connection created") | |
| elif choice == '6': | |
| self.print_subheader("Create Bridge Connection") | |
| vlan = self.get_input("VLAN ID (leave empty for none)", "") | |
| vlan_id = int(vlan) if vlan else None | |
| result = self.router.create_bridge_connection(vlan_id=vlan_id) | |
| self.print_success("Bridge connection created") | |
| elif choice == '7': | |
| conn_name = self.get_input("Connection name") | |
| enable = self.get_bool_input("Enable connection?") | |
| if enable: | |
| self.router.enable_wan_connection(conn_name) | |
| self.print_success(f"Connection '{conn_name}' enabled") | |
| else: | |
| self.router.disable_wan_connection(conn_name) | |
| self.print_success(f"Connection '{conn_name}' disabled") | |
| elif choice == '8': | |
| conn_name = self.get_input("Connection name to delete") | |
| if self.get_bool_input(f"Are you sure you want to delete '{conn_name}'?"): | |
| self.router.delete_wan_connection(conn_name) | |
| self.print_success(f"Connection '{conn_name}' deleted") | |
| elif choice == '9': | |
| self.print_subheader("Set NAT Type") | |
| print(" 0 = NAT4 (Symmetric)") | |
| print(" 1 = NAT1 (Full Cone)") | |
| print(" 2 = NAT2") | |
| nat_type = int(self.get_input("NAT Type", "0")) | |
| self.router.set_nat_type(nat_type) | |
| self.print_success(f"NAT type set to {nat_type}") | |
| input("\n Press Enter to continue...") | |
| def lan_menu(self): | |
| """LAN Configuration submenu""" | |
| while True: | |
| self.print_header("LAN Configuration") | |
| print(""" | |
| 1. View LAN Configuration | |
| 2. Set LAN IP Address | |
| 3. Configure DHCP Server | |
| 4. View DHCP Reservations | |
| 5. Add DHCP Reservation | |
| 6. Delete DHCP Reservation | |
| 0. Back to Main Menu | |
| """) | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| config = self.router.get_lan_config() | |
| formatted = { | |
| 'LAN IP': config.get('uIp', 'N/A'), | |
| 'Subnet Mask': config.get('uMask', 'N/A'), | |
| 'DHCP Server': 'Enabled' if config.get('uDhcpType') == '1' else 'Disabled', | |
| 'DHCP Start': config.get('dhcpRangeStart', 'N/A'), | |
| 'DHCP End': config.get('dhcpRangeEnd', 'N/A'), | |
| 'Lease Time': config.get('ulTime', 'N/A') + ' seconds' | |
| } | |
| self.print_table(formatted, "LAN Configuration") | |
| elif choice == '2': | |
| self.print_subheader("Set LAN IP") | |
| ip = self.get_input("New LAN IP", "192.168.3.1") | |
| mask = self.get_input("Subnet Mask", "255.255.255.0") | |
| self.router.set_lan_ipv4_config({'uIp': ip, 'uMask': mask}) | |
| self.print_success(f"LAN IP set to {ip}") | |
| elif choice == '3': | |
| self.print_subheader("Configure DHCP Server") | |
| enable = self.get_bool_input("Enable DHCP Server?") | |
| if enable: | |
| start = self.get_input("DHCP Start IP", "192.168.3.100") | |
| end = self.get_input("DHCP End IP", "192.168.3.200") | |
| lease = self.get_input("Lease Time (seconds)", "86400") | |
| self.router.set_lan_ipv4_config({ | |
| 'uDhcpType': '1', | |
| 'dhcpRangeStart': start, | |
| 'dhcpRangeEnd': end, | |
| 'ulTime': lease | |
| }) | |
| self.print_success("DHCP Server configured") | |
| else: | |
| self.router.set_lan_ipv4_config({'uDhcpType': '0'}) | |
| self.print_success("DHCP Server disabled") | |
| elif choice == '4': | |
| reservations = self.router.get_dhcp_reservations() | |
| if isinstance(reservations, list): | |
| self.print_list(reservations, "DHCP Reservations") | |
| else: | |
| self.print_table(reservations, "DHCP Reservations") | |
| elif choice == '5': | |
| self.print_subheader("Add DHCP Reservation") | |
| mac = self.get_input("MAC Address (00:11:22:33:44:55)") | |
| ip = self.get_input("IP Address") | |
| self.router.add_dhcp_reservation(mac, ip) | |
| self.print_success(f"Reserved {ip} for {mac}") | |
| elif choice == '6': | |
| self.print_subheader("Delete DHCP Reservation") | |
| mac = self.get_input("MAC Address") | |
| ip = self.get_input("IP Address") | |
| self.router.delete_dhcp_reservation(mac, ip) | |
| self.print_success(f"Reservation deleted") | |
| input("\n Press Enter to continue...") | |
| def wifi_menu(self): | |
| """WiFi Configuration submenu""" | |
| while True: | |
| self.print_header("WiFi Configuration") | |
| print(""" | |
| 2.4GHz | |
| 1. View 2.4GHz Configuration | |
| 2. Configure 2.4GHz WiFi | |
| 5GHz | |
| 3. View 5GHz Configuration | |
| 4. Configure 5GHz WiFi | |
| OTHER | |
| 5. View Connected Clients | |
| 6. Configure Band Steering | |
| 0. Back to Main Menu | |
| """) | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| config = self.router.get_wifi_24g_config() | |
| self.print_table(config, "2.4GHz WiFi Configuration") | |
| elif choice == '2': | |
| self.print_subheader("Configure 2.4GHz WiFi") | |
| ssid = self.get_input("SSID (Network Name)") | |
| password = self.get_input("Password (8-63 chars)") | |
| hidden = self.get_bool_input("Hide SSID?") | |
| channel = self.get_input("Channel (0=Auto, 1-11)", "0") | |
| self.router.set_wifi_24g_config({ | |
| 'ssid': ssid, | |
| 'pskValue': password, | |
| 'hiddenSSID': '1' if hidden else '0', | |
| 'channel': channel, | |
| 'wlanDisabled': '0' | |
| }) | |
| self.print_success("2.4GHz WiFi configured") | |
| elif choice == '3': | |
| config = self.router.get_wifi_5g_config() | |
| self.print_table(config, "5GHz WiFi Configuration") | |
| elif choice == '4': | |
| self.print_subheader("Configure 5GHz WiFi") | |
| ssid = self.get_input("SSID (Network Name)") | |
| password = self.get_input("Password (8-63 chars)") | |
| hidden = self.get_bool_input("Hide SSID?") | |
| channel = self.get_input("Channel (0=Auto)", "0") | |
| self.router.set_wifi_5g_config({ | |
| 'ssid': ssid, | |
| 'pskValue': password, | |
| 'hiddenSSID': '1' if hidden else '0', | |
| 'channel': channel, | |
| 'wlanDisabled': '0' | |
| }) | |
| self.print_success("5GHz WiFi configured") | |
| elif choice == '5': | |
| clients = self.router.get_wlan_clients() | |
| if isinstance(clients, list): | |
| self.print_list(clients, "Connected Wireless Clients") | |
| else: | |
| self.print_table(clients, "Connected Wireless Clients") | |
| elif choice == '6': | |
| self.print_subheader("Band Steering") | |
| enable = self.get_bool_input("Enable Band Steering?") | |
| if enable: | |
| threshold = self.get_input("RSSI Threshold (dBm)", "-70") | |
| self.router.set_band_steering(True, int(threshold)) | |
| else: | |
| self.router.set_band_steering(False) | |
| self.print_success("Band Steering configured") | |
| input("\n Press Enter to continue...") | |
| def security_menu(self): | |
| """Security Settings submenu""" | |
| while True: | |
| self.print_header("Security Settings") | |
| print(""" | |
| FIREWALL | |
| 1. View Firewall Configuration | |
| 2. Configure Firewall | |
| MAC FILTER | |
| 3. View MAC Filter Settings | |
| 4. Set MAC Filter Mode | |
| 5. Add MAC to Filter | |
| 6. Delete MAC from Filter | |
| URL FILTER | |
| 7. Add URL Filter | |
| 8. Delete URL Filter | |
| 0. Back to Main Menu | |
| """) | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| config = self.router.get_firewall_config() | |
| formatted = { | |
| 'Firewall Level': 'High' if config.get('FirewallLevel') == '2' else 'Low', | |
| 'DoS Protection': 'Enabled' if config.get('DosEnable') == '1' else 'Disabled' | |
| } | |
| self.print_table(formatted, "Firewall Configuration") | |
| elif choice == '2': | |
| self.print_subheader("Configure Firewall") | |
| print(" 0 = Low Security") | |
| print(" 2 = High Security") | |
| level = int(self.get_input("Security Level", "0")) | |
| dos = self.get_bool_input("Enable DoS Protection?") | |
| self.router.set_firewall_config(level, dos) | |
| self.print_success("Firewall configured") | |
| elif choice == '3': | |
| config = self.router.get_mac_filter_config() | |
| self.print_table(config, "MAC Filter Configuration") | |
| elif choice == '4': | |
| self.print_subheader("Set MAC Filter Mode") | |
| print(" Options: disabled, whitelist, blacklist") | |
| mode = self.get_input("Mode", "disabled") | |
| self.router.set_mac_filter_mode(mode) | |
| self.print_success(f"MAC Filter mode set to '{mode}'") | |
| elif choice == '5': | |
| mac = self.get_input("MAC Address") | |
| desc = self.get_input("Description", "") | |
| self.router.add_mac_filter(mac, desc) | |
| self.print_success(f"MAC {mac} added to filter") | |
| elif choice == '6': | |
| mac = self.get_input("MAC Address to remove") | |
| self.router.delete_mac_filter(mac) | |
| self.print_success(f"MAC {mac} removed from filter") | |
| elif choice == '7': | |
| url = self.get_input("URL to block") | |
| self.router.add_url_filter(url) | |
| self.print_success(f"URL '{url}' added to filter") | |
| elif choice == '8': | |
| url = self.get_input("URL to unblock") | |
| self.router.delete_url_filter(url) | |
| self.print_success(f"URL '{url}' removed from filter") | |
| input("\n Press Enter to continue...") | |
| def nat_menu(self): | |
| """NAT / Port Forwarding submenu""" | |
| while True: | |
| self.print_header("NAT / Port Forwarding") | |
| print(""" | |
| DMZ | |
| 1. View DMZ Configuration | |
| 2. Enable/Disable DMZ | |
| PORT FORWARDING | |
| 3. View Port Forwarding Rules | |
| 4. Add Port Forwarding Rule | |
| 5. Delete Port Forwarding Rule | |
| ALG | |
| 6. View ALG Configuration | |
| 7. Configure ALG | |
| UPnP | |
| 8. View UPnP Status | |
| 9. Enable/Disable UPnP | |
| 0. Back to Main Menu | |
| """) | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| config = self.router.get_dmz_config() | |
| formatted = { | |
| 'DMZ Enabled': 'Yes' if config.get('dmzcap') == '1' else 'No', | |
| 'DMZ Host IP': config.get('ip', 'N/A') | |
| } | |
| self.print_table(formatted, "DMZ Configuration") | |
| elif choice == '2': | |
| enable = self.get_bool_input("Enable DMZ?") | |
| if enable: | |
| ip = self.get_input("DMZ Host IP") | |
| self.router.set_dmz(True, ip) | |
| self.print_success(f"DMZ enabled for {ip}") | |
| else: | |
| self.router.set_dmz(False) | |
| self.print_success("DMZ disabled") | |
| elif choice == '3': | |
| rules = self.router.get_virtual_servers() | |
| self.print_table(rules, "Port Forwarding Rules") | |
| elif choice == '4': | |
| self.print_subheader("Add Port Forwarding Rule") | |
| name = self.get_input("Rule Name") | |
| ext_start = int(self.get_input("External Port Start")) | |
| ext_end = int(self.get_input("External Port End", str(ext_start))) | |
| int_ip = self.get_input("Internal IP") | |
| int_port = int(self.get_input("Internal Port", str(ext_start))) | |
| print(" Protocol: 1=TCP, 2=UDP, 4=TCP+UDP") | |
| proto = int(self.get_input("Protocol", "1")) | |
| self.router.add_virtual_server( | |
| name=name, | |
| external_port_start=ext_start, | |
| external_port_end=ext_end, | |
| internal_ip=int_ip, | |
| internal_port=int_port, | |
| protocol=proto | |
| ) | |
| self.print_success(f"Port forwarding rule '{name}' added") | |
| elif choice == '5': | |
| rule_id = self.get_input("Rule ID to delete") | |
| self.router.delete_virtual_server(rule_id) | |
| self.print_success("Rule deleted") | |
| elif choice == '6': | |
| config = self.router.get_alg_config() | |
| formatted = {} | |
| alg_names = { | |
| 'ftp_algonoff': 'FTP ALG', | |
| 'tftp_algonoff': 'TFTP ALG', | |
| 'h323_algonoff': 'H.323 ALG', | |
| 'rtsp_algonoff': 'RTSP ALG', | |
| 'l2tp_algonoff': 'L2TP ALG', | |
| 'ipsec_algonoff': 'IPSec ALG', | |
| 'sip_algonoff': 'SIP ALG', | |
| 'pptp_algonoff': 'PPTP ALG' | |
| } | |
| for key, name in alg_names.items(): | |
| if key in config: | |
| formatted[name] = 'Enabled' if config[key] == '1' else 'Disabled' | |
| self.print_table(formatted, "ALG Configuration") | |
| elif choice == '7': | |
| self.print_subheader("Configure ALG") | |
| ftp = '1' if self.get_bool_input("Enable FTP ALG?") else '0' | |
| sip = '1' if self.get_bool_input("Enable SIP ALG?") else '0' | |
| h323 = '1' if self.get_bool_input("Enable H.323 ALG?") else '0' | |
| pptp = '1' if self.get_bool_input("Enable PPTP ALG?") else '0' | |
| self.router.set_alg_config({ | |
| 'ftp_algonoff': ftp, | |
| 'sip_algonoff': sip, | |
| 'h323_algonoff': h323, | |
| 'pptp_algonoff': pptp | |
| }) | |
| self.print_success("ALG configured") | |
| elif choice == '8': | |
| config = self.router.get_upnp_config() | |
| formatted = { | |
| 'UPnP': 'Enabled' if config.get('daemon') == '1' else 'Disabled' | |
| } | |
| self.print_table(formatted, "UPnP Status") | |
| elif choice == '9': | |
| enable = self.get_bool_input("Enable UPnP?") | |
| self.router.set_upnp(enable) | |
| self.print_success(f"UPnP {'enabled' if enable else 'disabled'}") | |
| input("\n Press Enter to continue...") | |
| def applications_menu(self): | |
| """Applications submenu""" | |
| while True: | |
| self.print_header("Applications") | |
| print(""" | |
| DDNS | |
| 1. View DDNS Configuration | |
| 2. Enable/Disable DDNS | |
| 3. Add DDNS Entry | |
| SAMBA / FTP | |
| 4. View Samba Configuration | |
| 5. Configure Samba | |
| 6. View FTP Configuration | |
| 7. Configure FTP | |
| OTHER | |
| 8. View TR-069 Configuration | |
| 9. Configure QoS | |
| 0. Back to Main Menu | |
| """) | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| config = self.router.get_ddns_config() | |
| self.print_table(config, "DDNS Configuration") | |
| elif choice == '2': | |
| enable = self.get_bool_input("Enable DDNS?") | |
| self.router.set_ddns_enabled(enable) | |
| self.print_success(f"DDNS {'enabled' if enable else 'disabled'}") | |
| elif choice == '3': | |
| self.print_subheader("Add DDNS Entry") | |
| print(" Providers: oray, dyndns, tzo, noip, gnudip") | |
| provider = self.get_input("Provider", "dyndns") | |
| hostname = self.get_input("Hostname") | |
| username = self.get_input("Username") | |
| password = self.get_input("Password") | |
| self.router.add_ddns(provider, hostname, username, password) | |
| self.print_success(f"DDNS entry for {hostname} added") | |
| elif choice == '4': | |
| config = self.router.get_samba_config() | |
| self.print_table(config, "Samba Configuration") | |
| elif choice == '5': | |
| self.print_subheader("Configure Samba") | |
| enable = self.get_bool_input("Enable Samba?") | |
| if enable: | |
| username = self.get_input("Username") | |
| password = self.get_input("Password") | |
| self.router.set_samba_config(True, username, password) | |
| else: | |
| self.router.set_samba_config(False, "", "") | |
| self.print_success("Samba configured") | |
| elif choice == '6': | |
| config = self.router.get_ftp_config() | |
| self.print_table(config, "FTP Configuration") | |
| elif choice == '7': | |
| self.print_subheader("Configure FTP") | |
| enable = self.get_bool_input("Enable FTP?") | |
| if enable: | |
| username = self.get_input("Username") | |
| password = self.get_input("Password") | |
| self.router.set_ftp_config(True, username, password) | |
| else: | |
| self.router.set_ftp_config(False, "", "") | |
| self.print_success("FTP configured") | |
| elif choice == '8': | |
| config = self.router.get_tr069_config() | |
| self.print_table(config, "TR-069 Configuration") | |
| elif choice == '9': | |
| enable = self.get_bool_input("Enable QoS?") | |
| self.router.set_qos_enabled(enable) | |
| self.print_success(f"QoS {'enabled' if enable else 'disabled'}") | |
| input("\n Press Enter to continue...") | |
| def device_menu(self): | |
| """Device Management submenu""" | |
| while True: | |
| self.print_header("Device Management") | |
| print(""" | |
| BACKUP / RESTORE | |
| 1. Backup Configuration | |
| 2. Restore Configuration | |
| MAINTENANCE | |
| 3. Reboot Router | |
| 4. Factory Reset | |
| 5. Set Scheduled Reboot | |
| 6. Firmware Upgrade | |
| PASSWORD | |
| 7. Change Admin Password | |
| LOGS | |
| 8. View System Log | |
| 9. Clear System Log | |
| 0. Back to Main Menu | |
| """) | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| decrypt = self.get_bool_input("Decrypt to readable XML?", default=True) | |
| default_ext = ".xml" if decrypt else ".bin" | |
| filename = self.get_input("Backup filename", f"router_backup{default_ext}") | |
| config = self.router.backup_config(decrypt=decrypt) | |
| with open(filename, 'wb') as f: | |
| f.write(config) | |
| if decrypt: | |
| self.print_success(f"Decrypted configuration saved to '{filename}'") | |
| else: | |
| self.print_success(f"Encrypted configuration saved to '{filename}'") | |
| elif choice == '2': | |
| filename = self.get_input("Config filename to restore") | |
| try: | |
| with open(filename, 'rb') as f: | |
| config_data = f.read() | |
| # Auto-detect if file is plaintext XML or encrypted | |
| is_plaintext = config_data.startswith(b'<') or b'<Config' in config_data[:100] | |
| if is_plaintext: | |
| self.print_info("Detected plaintext XML - will encrypt before upload") | |
| self.router.restore_config(config_data, encrypt=True) | |
| else: | |
| self.print_info("Detected encrypted backup - uploading as-is") | |
| self.router.restore_config(config_data, encrypt=False) | |
| self.print_success("Configuration restored. Router will reboot.") | |
| except FileNotFoundError: | |
| self.print_error(f"File '{filename}' not found") | |
| elif choice == '3': | |
| if self.get_bool_input("Are you sure you want to reboot?"): | |
| self.router.reboot() | |
| self.print_success("Router is rebooting...") | |
| self.running = False | |
| break | |
| elif choice == '4': | |
| print("\n WARNING: This will erase all settings!") | |
| if self.get_bool_input("Are you absolutely sure?"): | |
| self.router.factory_reset() | |
| self.print_success("Factory reset initiated...") | |
| self.running = False | |
| break | |
| elif choice == '5': | |
| self.print_subheader("Scheduled Reboot") | |
| print(" Enter days as comma-separated (1=Mon, 2=Tue, ..., 7=Sun)") | |
| days_input = self.get_input("Days (e.g., 1,3,5)", "") | |
| if days_input: | |
| days = [int(d.strip()) for d in days_input.split(',')] | |
| hour = int(self.get_input("Hour (0-23)", "3")) | |
| minute = int(self.get_input("Minute (0-59)", "0")) | |
| self.router.set_scheduled_reboot(days, hour, minute) | |
| self.print_success("Scheduled reboot configured") | |
| elif choice == '6': | |
| filename = self.get_input("Firmware file (.tar)") | |
| try: | |
| with open(filename, 'rb') as f: | |
| self.router.firmware_upgrade(f.read()) | |
| self.print_success("Firmware upgrade started...") | |
| except FileNotFoundError: | |
| self.print_error(f"File '{filename}' not found") | |
| elif choice == '7': | |
| old_pass = self.get_input("Current Password") | |
| new_pass = self.get_input("New Password") | |
| confirm = self.get_input("Confirm New Password") | |
| if new_pass != confirm: | |
| self.print_error("Passwords do not match") | |
| else: | |
| self.router.change_password(old_pass, new_pass) | |
| self.print_success("Password changed") | |
| elif choice == '8': | |
| log = self.router.get_system_log() | |
| print("\n--- System Log ---") | |
| print(log[:2000] if len(log) > 2000 else log) | |
| elif choice == '9': | |
| self.router.clear_system_log() | |
| self.print_success("System log cleared") | |
| if self.running: | |
| input("\n Press Enter to continue...") | |
| def diagnostics_menu(self): | |
| """Diagnostics submenu""" | |
| while True: | |
| self.print_header("Diagnostics") | |
| print(""" | |
| NETWORK TESTS | |
| 1. Ping | |
| 2. Traceroute | |
| 3. DNS Lookup | |
| OTHER | |
| 4. Run Self-Diagnostics | |
| 5. Scan Nearby Access Points | |
| 0. Back to Main Menu | |
| """) | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| target = self.get_input("Host to ping", "8.8.8.8") | |
| count = int(self.get_input("Number of pings", "4")) | |
| result = self.router.ping(target, count) | |
| print("\n--- Ping Results ---") | |
| print(result) | |
| elif choice == '2': | |
| target = self.get_input("Host to trace") | |
| result = self.router.traceroute(target) | |
| print("\n--- Traceroute Results ---") | |
| print(result) | |
| elif choice == '3': | |
| hostname = self.get_input("Hostname to lookup") | |
| result = self.router.nslookup(hostname) | |
| print("\n--- DNS Lookup Results ---") | |
| print(result) | |
| elif choice == '4': | |
| result = self.router.run_self_diagnose() | |
| self.print_table(result, "Self-Diagnostics Results") | |
| elif choice == '5': | |
| print(" Bands: 2.4g, 5g, both") | |
| band = self.get_input("Band to scan", "both") | |
| result = self.router.scan_access_points(band) | |
| if isinstance(result, list): | |
| self.print_list(result, "Nearby Access Points") | |
| else: | |
| self.print_table(result, "Nearby Access Points") | |
| input("\n Press Enter to continue...") | |
| def run(self): | |
| """Main loop""" | |
| # Setup signal handler for Ctrl+C | |
| def signal_handler(sig, frame): | |
| print("\n\n Exiting... Goodbye!") | |
| if self.router and self.router.logged_in: | |
| self.router.logout() | |
| sys.exit(0) | |
| signal.signal(signal.SIGINT, signal_handler) | |
| # Get connection details | |
| self.print_header("GPON Router Configuration Manager") | |
| print("\n Connection Setup") | |
| print(" ─────────────────────────────────") | |
| host = self.get_input("Router IP", "192.168.3.1") | |
| username = self.get_input("Username", "admin") | |
| password = self.get_input("Password", "stdONU101") | |
| # Initialize router | |
| self.router = GPONManager(host, username, password) | |
| # Login | |
| print("\n Connecting...") | |
| if not self.router.login(): | |
| self.print_error("Failed to login! Please check credentials.") | |
| return | |
| self.print_success(f"Connected to {host}") | |
| # Main menu loop | |
| while self.running: | |
| try: | |
| self.main_menu() | |
| choice = self.get_input("Select option") | |
| if choice == '0': | |
| break | |
| elif choice == '1': | |
| self.status_menu() | |
| elif choice == '2': | |
| self.wan_menu() | |
| elif choice == '3': | |
| self.lan_menu() | |
| elif choice == '4': | |
| self.wifi_menu() | |
| elif choice == '5': | |
| self.security_menu() | |
| elif choice == '6': | |
| self.nat_menu() | |
| elif choice == '7': | |
| self.applications_menu() | |
| elif choice == '8': | |
| self.device_menu() | |
| elif choice == '9': | |
| self.diagnostics_menu() | |
| else: | |
| self.print_error("Invalid option") | |
| except Exception as e: | |
| self.print_error(f"Error: {str(e)}") | |
| input("\n Press Enter to continue...") | |
| # Logout | |
| print("\n Disconnecting...") | |
| self.router.logout() | |
| print("\n Goodbye!\n") | |
| def main(): | |
| """Main entry point""" | |
| cli = InteractiveCLI() | |
| cli.run() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment