Last active
September 10, 2025 20:26
-
-
Save TheLinuxGuy/a29c95b8f29381881de07a86cb684204 to your computer and use it in GitHub Desktop.
Firewalla DNS resolver issue debug
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| DNS Reliability Test Script | |
| Tests a specific DNS server by sending direct DNS queries and reports reliability issues. | |
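| Usage: | |
| python dns-reliability-test.py [--dns SERVER] [--hostname NAME] [--verbose] [SERVER] [NAME] | |
| SERVER may be an IPv4 address or a provider shortcut (google, cloudflare, quad9, opendns, level3, comodo). | |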
| Author: Generated to debug Firewalla router mode DNS server | |
| Date: September 2025 | |
| """ | |
| import socket | |
| import struct | |
| import time | |
| import random | |
| import sys | |
| from datetime import datetime | |
class DNSTester:
    """Send raw UDP DNS A-record queries to one server and decode the replies.

    Attributes:
        dns_server: Address the queries are sent to (UDP port 53).
        timeout: Socket timeout, in seconds, applied to every query.
        verbose: When true, packet-level details are printed to stdout.
    """

    # Upper bound on compression-pointer hops while decoding a single name.
    # The cap exists so that pointer cycles in a malformed reply raise
    # instead of looping forever.
    _MAX_POINTER_JUMPS = 128

    # Receive buffer size. Queries are sent without EDNS0, so compliant
    # servers answer within 512 bytes; the extra room only avoids silently
    # chopping an oversized datagram from a non-compliant forwarder.
    _RECV_BUFSIZE = 4096

    def __init__(self, dns_server, timeout=5, verbose=False):
        self.dns_server = dns_server
        self.timeout = timeout
        self.verbose = verbose

    def create_dns_query(self, hostname):
        """Build a DNS query packet for an A-record lookup of *hostname*.

        A single trailing dot (fully-qualified form) is accepted, and an
        empty name or "." yields a query for the root.

        Returns:
            Tuple ``(query_packet, transaction_id)``.

        Raises:
            ValueError: If a label is empty or longer than 63 bytes.
        """
        transaction_id = random.randint(0, 65535)

        # DNS header (12 bytes):
        # ID (2) + Flags (2) + QDCOUNT (2) + ANCOUNT (2) + NSCOUNT (2) + ARCOUNT (2)
        flags = 0x0100  # Standard query with recursion desired
        header = struct.pack('!HHHHHH', transaction_id, flags, 1, 0, 0, 0)

        # Drop one trailing dot so "example.com." does not produce an empty
        # label, which would terminate the name early and corrupt the packet.
        name = hostname[:-1] if hostname.endswith('.') else hostname
        encoded_labels = []
        if name:
            for part in name.split('.'):
                label = part.encode()
                # Lengths >= 64 would collide with the compression-pointer
                # bit patterns in the length octet.
                if not 1 <= len(label) <= 63:
                    raise ValueError(
                        f"Invalid hostname label {part!r}: must be 1-63 bytes"
                    )
                encoded_labels.append(struct.pack('!B', len(label)) + label)

        question = b''.join(encoded_labels)
        question += b'\x00'  # End of hostname
        question += struct.pack('!HH', 1, 1)  # Type A (1), Class IN (1)

        query_packet = header + question
        if self.verbose:
            print(f" 🔍 Created DNS query packet ({len(query_packet)} bytes)")
            print(f" 📦 Transaction ID: 0x{transaction_id:04x}")
            print(f" 📦 Query hex: {query_packet.hex()}")
        return query_packet, transaction_id

    def parse_dns_name(self, response, offset):
        """Decode a (possibly compressed) DNS name starting at *offset*.

        Returns:
            Tuple ``(name, next_offset)`` where ``next_offset`` is the
            position right after the name as it appears at *offset* (that is,
            after the first compression pointer when one was followed).

        Raises:
            ValueError: On a truncated pointer, a reserved label type, or a
                compression-pointer loop.
        """
        name_parts = []
        resume_offset = None  # position after the first pointer, once jumped
        jumps = 0

        while offset < len(response):
            length = response[offset]
            if length == 0:  # End of name
                offset += 1
                break

            label_kind = length & 0xC0
            if label_kind == 0xC0:  # Compression pointer
                if offset + 2 > len(response):
                    raise ValueError(
                        f"Malformed name - truncated compression pointer at offset {offset}"
                    )
                jumps += 1
                if jumps > self._MAX_POINTER_JUMPS:
                    raise ValueError(
                        f"Malformed name - compression pointer loop near offset {offset}"
                    )
                if resume_offset is None:
                    resume_offset = offset + 2
                offset = struct.unpack_from('!H', response, offset)[0] & 0x3FFF
            elif label_kind:
                # 0x40 / 0x80 prefixes are reserved, not pointers.
                raise ValueError(
                    f"Malformed name - unsupported label type 0x{length:02x} at offset {offset}"
                )
            else:  # Regular label
                offset += 1
                if offset + length > len(response):
                    break
                # Replace rather than raise on non-ASCII bytes so an odd
                # label cannot abort parsing of the whole response.
                name_parts.append(
                    response[offset:offset + length].decode('ascii', errors='replace')
                )
                offset += length

        next_offset = offset if resume_offset is None else resume_offset
        return '.'.join(name_parts), next_offset

    def _collect_answers(self, response, offset, ancount):
        """Walk *ancount* answer RRs from *offset*; return (cname_chain, a_records).

        Raises:
            ValueError: If a record header or its data runs past the packet.
        """
        cname_chain = []
        a_records = []
        for i in range(ancount):
            if offset >= len(response):
                break

            answer_name, offset = self.parse_dns_name(response, offset)
            if offset + 10 > len(response):
                raise ValueError(
                    f"Malformed answer section - need 10 bytes at offset {offset}, but response is only {len(response)} bytes"
                )

            # RR fixed part: TYPE (2) + CLASS (2) + TTL (4) + RDLENGTH (2)
            rr_type, rr_class, ttl, rdlength = struct.unpack_from('!HHIH', response, offset)
            offset += 10

            if self.verbose:
                record_types = {1: 'A', 5: 'CNAME', 28: 'AAAA', 15: 'MX', 2: 'NS', 6: 'SOA'}
                print(f" 📋 Answer RR #{i+1}:")
                print(f" Name: {answer_name}")
                print(f" Type: {rr_type} ({record_types.get(rr_type, 'other')})")
                print(f" Class: {rr_class}")
                print(f" TTL: {ttl}")
                print(f" Data length: {rdlength}")

            if offset + rdlength > len(response):
                raise ValueError(
                    f"Malformed RR data - need {rdlength} bytes at offset {offset}, but response is only {len(response)} bytes"
                )

            if rr_type == 1 and rdlength == 4:  # A record
                ip_address = '.'.join(str(b) for b in response[offset:offset + 4])
                a_records.append((ip_address, ttl))
                if self.verbose:
                    print(f" A record: {ip_address}")
            elif rr_type == 5:  # CNAME record
                cname_target, _ = self.parse_dns_name(response, offset)
                cname_chain.append((answer_name, cname_target, ttl))
                if self.verbose:
                    print(f" CNAME: {answer_name} -> {cname_target}")
            elif self.verbose:
                rdata_hex = response[offset:offset + rdlength].hex()
                print(f" Data: {rdata_hex}")

            offset += rdlength
        return cname_chain, a_records

    def parse_dns_response(self, response, expected_id):
        """Validate and decode a DNS response packet.

        Returns:
            Tuple ``(success, message)``. On success the message lists the A
            records (and any CNAME chain); on failure it names the problem.
            Malformed packets are reported as a failure, never raised.
        """
        if self.verbose:
            print(f" 📥 Raw response ({len(response)} bytes): {response.hex()}")

        if len(response) < 12:
            return False, "Response too short"

        transaction_id, flags, qdcount, ancount, nscount, arcount = struct.unpack(
            '!HHHHHH', response[:12]
        )
        if self.verbose:
            print(f" 📋 DNS Header:")
            print(f" Transaction ID: 0x{transaction_id:04x} (expected: 0x{expected_id:04x})")
            print(f" Flags: 0x{flags:04x}")
            print(f" Questions: {qdcount}")
            print(f" Answers: {ancount}")
            print(f" Authority: {nscount}")
            print(f" Additional: {arcount}")

        if transaction_id != expected_id:
            return False, "Transaction ID mismatch"

        # Response code is the low 4 bits of the flags word.
        rcode = flags & 0x000F
        if rcode != 0:
            error_codes = {
                1: "Format error",
                2: "Server failure",
                3: "Name error (NXDOMAIN)",
                4: "Not implemented",
                5: "Refused"
            }
            return False, f"DNS error: {error_codes.get(rcode, f'Unknown error {rcode}')}"

        if ancount == 0:
            return False, "No answer records"

        try:
            offset = 12
            if self.verbose:
                print(f" 🔍 Parsing question section starting at offset {offset}")
            for _ in range(qdcount):
                _, offset = self.parse_dns_name(response, offset)
                offset += 4  # Skip QTYPE and QCLASS
            if self.verbose:
                print(f" 🔍 Answer section starts at offset {offset}")
            cname_chain, a_records = self._collect_answers(response, offset, ancount)
        except ValueError as exc:
            # Structural problems in the packet become a failed result.
            return False, str(exc)

        result_parts = []
        if cname_chain:
            cname_str = " -> ".join([f"{src}" for src, dst, ttl in cname_chain] + [cname_chain[-1][1]])
            result_parts.append(f"CNAME: {cname_str}")

        if a_records:
            ip_list = [f"{ip} (TTL: {ttl}s)" for ip, ttl in a_records]
            result_parts.append(f"A: {', '.join(ip_list)}")
            return True, " | ".join(result_parts)
        elif cname_chain:
            # CNAME without an A record still counts as a successful resolution.
            final_target = cname_chain[-1][1]
            return True, f"CNAME chain: {' -> '.join([src for src, dst, ttl in cname_chain] + [final_target])} (no A record in response)"
        else:
            return False, "No A or CNAME records found"

    def query_dns(self, hostname):
        """Perform one DNS query and return a result dict.

        Returns:
            Dict with keys ``success`` (bool), ``message`` (str),
            ``response_time`` (milliseconds, or None when no reply arrived)
            and ``server_responded`` (bool). Never raises for network or
            parsing problems.
        """
        try:
            query_packet, transaction_id = self.create_dns_query(hostname)
            # The context manager closes the socket on every path,
            # including timeouts and send/receive errors.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.settimeout(self.timeout)
                if self.verbose:
                    print(f" 📡 Sending query to {self.dns_server}:53")
                # perf_counter is monotonic, so clock adjustments cannot
                # distort the measured round-trip time.
                start_time = time.perf_counter()
                sock.sendto(query_packet, (self.dns_server, 53))
                response, addr = sock.recvfrom(self._RECV_BUFSIZE)
                query_time = (time.perf_counter() - start_time) * 1000
            if self.verbose:
                print(f" 📡 Received response from {addr} in {query_time:.1f}ms")
        except socket.timeout:
            return {
                'success': False,
                'message': "DNS query timed out",
                'response_time': None,
                'server_responded': False
            }
        except socket.error as e:
            return {
                'success': False,
                'message': f"Socket error: {e}",
                'response_time': None,
                'server_responded': False
            }
        except Exception as e:
            return {
                'success': False,
                'message': f"Unexpected error: {e}",
                'response_time': None,
                'server_responded': False
            }

        # A reply did arrive; a parsing failure is a bad answer, not silence.
        try:
            success, message = self.parse_dns_response(response, transaction_id)
        except Exception as e:
            success, message = False, f"Unexpected error: {e}"
        return {
            'success': success,
            'message': message,
            'response_time': query_time,
            'server_responded': True
        }
def test_dns_reliability(dns_server, hostname, num_queries=10, interval=2, verbose=False):
    """Query *dns_server* repeatedly for *hostname* and print a reliability report.

    Args:
        dns_server: Address of the DNS server to test.
        hostname: Name to resolve on every query.
        num_queries: How many queries to send; values below 1 send nothing.
        interval: Seconds to sleep between consecutive queries.
        verbose: When true, packet-level details are printed per query.

    Returns:
        List with one dict per query: the ``DNSTester.query_dns`` result
        extended with ``query_num`` and ``timestamp``. Empty when
        ``num_queries`` is below 1.
    """
    if num_queries < 1:
        # Nothing to measure; also avoids dividing by zero in the report.
        print("⚠️ No queries requested - nothing to test")
        return []

    tester = DNSTester(dns_server, verbose=verbose)
    results = []

    print(f"🔍 DNS Reliability Test")
    print(f"📡 Target DNS Server: {dns_server}")
    print(f"🌐 Hostname to resolve: {hostname}")
    print(f"🔢 Number of queries: {num_queries}")
    print(f"⏱️ Interval between queries: {interval} seconds")
    print(f"⏰ Timeout per query: {tester.timeout} seconds")
    if verbose:
        print(f"🔍 Verbose mode: ENABLED")
    print("-" * 70)

    successful_queries = 0
    total_response_time = 0
    timeouts = 0
    errors = 0
    # In verbose mode the status line goes on its own, slightly indented line.
    indent = " " if verbose else ""

    for i in range(1, num_queries + 1):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"Query {i:2d} [{timestamp}]: ", end="", flush=True)
        if verbose:
            print()  # New line for verbose output

        result = tester.query_dns(hostname)
        results.append({
            'query_num': i,
            'timestamp': timestamp,
            **result
        })

        if result['success']:
            successful_queries += 1
            response_time = result['response_time']
            total_response_time += response_time
            print(f"{indent}✅ SUCCESS - {hostname} → {result['message']} ({response_time:.1f}ms)")
        elif result['server_responded']:
            errors += 1
            print(f"{indent}❌ FAILED - {hostname} - {result['message']}")
        else:
            message_lower = result['message'].lower()
            # The tester reports "DNS query timed out"; matching only
            # "timeout" would classify every timeout as a generic error.
            if "timed out" in message_lower or "timeout" in message_lower:
                timeouts += 1
                print(f"{indent}⏰ TIMEOUT - {hostname} - {result['message']}")
            else:
                errors += 1
                print(f"{indent}❌ ERROR - {hostname} - {result['message']}")

        # Wait before next query (except for last query)
        if i < num_queries:
            time.sleep(interval)

    # Generate detailed report
    print("\n" + "=" * 70)
    print("📊 DNS RELIABILITY REPORT")
    print("=" * 70)

    success_rate = (successful_queries / num_queries) * 100
    print(f"📈 Total queries performed: {num_queries}")
    print(f"✅ Successful queries: {successful_queries}")
    print(f"❌ Failed queries: {num_queries - successful_queries}")
    print(f"⏰ Timeouts: {timeouts}")
    print(f"🚫 Errors: {errors}")
    print(f"📊 Success rate: {success_rate:.1f}%")
    if successful_queries > 0:
        avg_response_time = total_response_time / successful_queries
        print(f"⚡ Average response time: {avg_response_time:.1f}ms")
    print()

    # Reliability assessment
    if success_rate >= 95:
        print("🟢 DNS SERVER STATUS: EXCELLENT")
        print(" Your DNS server is working reliably.")
    elif success_rate >= 80:
        print("🟡 DNS SERVER STATUS: GOOD")
        print(" Your DNS server has minor reliability issues.")
    elif success_rate >= 50:
        print("🟠 DNS SERVER STATUS: POOR")
        print(" Your DNS server has significant reliability problems.")
    else:
        print("🔴 DNS SERVER STATUS: CRITICAL")
        print(" Your DNS server is highly unreliable!")

    # Specific issues analysis
    if timeouts > 0:
        timeout_rate = (timeouts / num_queries) * 100
        print(f"\n⚠️ TIMEOUT ISSUES: {timeout_rate:.1f}% of queries timed out")
        print(" This suggests network connectivity problems or DNS server overload.")
    if errors > 0:
        error_rate = (errors / num_queries) * 100
        print(f"\n⚠️ ERROR ISSUES: {error_rate:.1f}% of queries had errors")
        print(" This suggests DNS server configuration or response problems.")

    # Recommendations
    if success_rate < 95:
        print("\n🔧 RECOMMENDATIONS:")
        if timeouts > errors:
            print(" • Check network connectivity to your router")
            print(" • Consider increasing DNS timeout settings")
            print(" • Router may be overloaded - try restarting it")
        else:
            print(" • Consider using alternative DNS servers:")
            print(" - Google DNS: 8.8.8.8, 8.8.4.4")
            print(" - Cloudflare DNS: 1.1.1.1, 1.0.0.1")
            print(" - Quad9 DNS: 9.9.9.9, 149.112.112.112")
            print(" • Update router firmware")
            print(" • Check router DNS settings")
            print(" • Restart your router/modem")

    return results
def is_ip_address(address):
    """Return True if *address* is a dotted-quad IPv4 address string.

    Each of the four parts must consist of one to three ASCII digits with a
    value of at most 255. Signs, whitespace, underscores and non-string
    inputs are rejected (plain ``int()`` would accept "+1", " 1" or "1_0").
    """
    try:
        parts = address.split('.')
    except (AttributeError, TypeError):
        # Not a str (e.g. None or bytes): cannot be a dotted-quad string.
        return False

    if len(parts) != 4:
        return False

    ascii_digits = "0123456789"
    for part in parts:
        if not 1 <= len(part) <= 3:
            return False
        if any(ch not in ascii_digits for ch in part):
            return False
        if int(part) > 255:
            return False
    return True
def get_system_dns():
    """Return a best-guess IPv4 address of the system's DNS server.

    Lookup order:
      1. The first ``nameserver`` entry in /etc/resolv.conf that is an IPv4
         address (other entries are skipped).
      2. The ".1" host of the local interface's /24, derived from the source
         address chosen for a UDP socket connected towards 8.8.8.8.
      3. The literal "192.168.1.1" when neither of the above works.
    """
    try:
        with open('/etc/resolv.conf', 'r') as f:
            for line in f:
                fields = line.split()
                # A bare "nameserver" line has no value; skip it instead of
                # abandoning the rest of the file.
                if len(fields) < 2 or fields[0] != 'nameserver':
                    continue
                if is_ip_address(fields[1]):
                    return fields[1]
    except (OSError, UnicodeError):
        # File missing or unreadable (e.g. non-Unix system): fall through.
        pass

    try:
        # Connecting a UDP socket only selects the outgoing interface, which
        # is enough to learn the local address via getsockname().
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            local_ip = s.getsockname()[0]
    except OSError:
        # Ultimate fallback
        return "192.168.1.1"

    # Guess router IP from local IP
    ip_parts = local_ip.split('.')
    return f"{ip_parts[0]}.{ip_parts[1]}.{ip_parts[2]}.1"
def resolve_dns_provider(provider):
    """Map a DNS provider shortcut to its IP address.

    Known shortcuts (case-insensitive) are translated to the provider's
    address. Anything else, whether an IP address or a hostname, is returned
    unchanged.
    """
    dns_providers = {
        'google': '8.8.8.8',
        'cloudflare': '1.1.1.1',
        'quad9': '9.9.9.9',
        'opendns': '208.67.222.222',
        'level3': '4.2.2.1',
        'comodo': '8.26.56.26'
    }
    # Unrecognized input is passed through as-is, so no separate
    # "is it already an IP address?" check is needed.
    return dns_providers.get(provider.lower(), provider)
if __name__ == "__main__":
    # Script entry point: parse the command line, run the reliability test
    # and print follow-up suggestions when any query failed.
    import ipaddress  # local import: only needed for the LAN-address hint

    default_hostname = "service.tesla.com"

    verbose = "--verbose" in sys.argv or "-v" in sys.argv

    dns_server = None
    hostname = None
    args = sys.argv[1:]
    i = 0
    while i < len(args):
        arg = args[i]
        if arg in ("--dns", "--hostname"):
            if i + 1 >= len(args):
                # Option given as the last argument, without its value.
                print(f"⚠️ Warning: {arg} requires a value; ignoring")
                i += 1
                continue
            if arg == "--dns":
                dns_server = resolve_dns_provider(args[i + 1])
            else:
                hostname = args[i + 1]
            i += 2
        elif arg in ("--verbose", "-v"):
            i += 1  # already handled above
        elif arg.startswith("-"):
            # A hostname cannot start with "-", so this is an unknown option.
            print(f"⚠️ Warning: Ignoring unknown option: {arg}")
            i += 1
        else:
            # Positional argument: an IP address or provider shortcut selects
            # the DNS server, anything else is the hostname to resolve.
            # resolve_dns_provider() changes only known shortcuts, so a
            # differing result identifies one without repeating the list.
            if is_ip_address(arg) or resolve_dns_provider(arg) != arg:
                if dns_server is None:
                    dns_server = resolve_dns_provider(arg)
                else:
                    print(f"⚠️ Warning: Ignoring extra DNS server argument: {arg}")
            else:
                if hostname is None:
                    hostname = arg
                else:
                    print(f"⚠️ Warning: Ignoring extra hostname argument: {arg}")
            i += 1

    # Set defaults
    system_dns = get_system_dns()
    if dns_server is None:
        dns_server = system_dns  # Use system DNS instead of hardcoded router IP
    if hostname is None:
        hostname = default_hostname

    print("🚀 Starting Enhanced DNS Reliability Test")

    # Show what we're testing
    if hostname != default_hostname:
        print(f"🎯 Testing hostname: {hostname}")
    else:
        print(f"🎯 Testing Tesla service connectivity")

    # Show DNS server info
    if dns_server == system_dns:
        print(f"🌐 Using system DNS server: {dns_server}")
    else:
        print(f"🔧 Using custom DNS server: {dns_server}")
    print()

    try:
        # Run the test
        results = test_dns_reliability(
            dns_server=dns_server,
            hostname=hostname,
            num_queries=15,
            interval=2,
            verbose=verbose
        )
        print(f"\n📁 Test completed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        # Additional suggestions based on results
        failed_queries = sum(1 for r in results if not r['success'])
        if failed_queries > 0:
            print(f"\n💡 NEXT STEPS:")
            print(f" • Test with Google DNS: python3 {sys.argv[0]} --dns google {hostname}")
            print(f" • Test with Cloudflare DNS: python3 {sys.argv[0]} --dns cloudflare {hostname}")
            print(f" • Test different hostname: python3 {sys.argv[0]} google.com")
            print(f" • Run with verbose output: python3 {sys.argv[0]} --verbose {hostname}")

            # Router hints apply only to RFC 1918 private addresses; a plain
            # "172." prefix test would also match public 172.x.x.x hosts.
            private_networks = [
                ipaddress.ip_network("10.0.0.0/8"),
                ipaddress.ip_network("172.16.0.0/12"),
                ipaddress.ip_network("192.168.0.0/16"),
            ]
            try:
                server_addr = ipaddress.ip_address(dns_server)
                on_private_network = any(server_addr in net for net in private_networks)
            except ValueError:
                # dns_server is a hostname, not an IP literal.
                on_private_network = False

            if on_private_network:
                # Point at the server that was actually tested.
                print(f" • Check router admin panel (likely at http://{dns_server})")
                print(f" • Verify router is using upstream DNS servers")
                print(f" • Consider factory reset if problems persist")

            print(f"\n📋 Available DNS providers:")
            print(f" • google (8.8.8.8) • cloudflare (1.1.1.1) • quad9 (9.9.9.9)")
            print(f" • opendns (208.67.222.222) • level3 (4.2.2.1) • comodo (8.26.56.26)")
    except KeyboardInterrupt:
        print(f"\n\n⏹️ Test interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Script error: {e}")
        sys.exit(1)
def main():
    """No-op placeholder retained for backward compatibility.

    The script's logic runs from the ``__main__`` guard; this function does
    nothing and returns None.
    """
    return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment