Skip to content

Instantly share code, notes, and snippets.

@TheLinuxGuy
Last active September 10, 2025 20:26
Show Gist options
  • Select an option

  • Save TheLinuxGuy/a29c95b8f29381881de07a86cb684204 to your computer and use it in GitHub Desktop.

Select an option

Save TheLinuxGuy/a29c95b8f29381881de07a86cb684204 to your computer and use it in GitHub Desktop.
Firewalla DNS resolver issue debug
#!/usr/bin/env python3
"""
DNS Reliability Test Script
Tests a specific DNS server by sending direct DNS queries and reports reliability issues.
Usage:
python dns-reliability-test.py
Author: Generated to debug Firewalla router mode DNS server
Date: September 2025
"""
import socket
import struct
import time
import random
import sys
from datetime import datetime
class DNSTester:
def __init__(self, dns_server, timeout=5, verbose=False):
self.dns_server = dns_server
self.timeout = timeout
self.verbose = verbose
def create_dns_query(self, hostname):
"""Create a DNS query packet for A record lookup"""
# Generate random transaction ID
transaction_id = random.randint(0, 65535)
# DNS Header (12 bytes)
# ID (2) + Flags (2) + QDCOUNT (2) + ANCOUNT (2) + NSCOUNT (2) + ARCOUNT (2)
flags = 0x0100 # Standard query with recursion desired
header = struct.pack('!HHHHHH', transaction_id, flags, 1, 0, 0, 0)
# DNS Question Section
question = b''
for part in hostname.split('.'):
question += struct.pack('!B', len(part)) + part.encode()
question += b'\x00' # End of hostname
question += struct.pack('!HH', 1, 1) # Type A (1), Class IN (1)
query_packet = header + question
if self.verbose:
print(f" 🔍 Created DNS query packet ({len(query_packet)} bytes)")
print(f" 📦 Transaction ID: 0x{transaction_id:04x}")
print(f" 📦 Query hex: {query_packet.hex()}")
return query_packet, transaction_id
def parse_dns_name(self, response, offset):
"""Parse a DNS name from the response, handling compression"""
name_parts = []
original_offset = offset
jumped = False
while offset < len(response):
length = response[offset]
if length == 0: # End of name
offset += 1
break
elif length & 0xC0: # Compression pointer
if not jumped:
original_offset = offset + 2 # Save position after pointer
pointer = struct.unpack('!H', response[offset:offset+2])[0] & 0x3FFF
offset = pointer
jumped = True
else: # Regular label
offset += 1
if offset + length > len(response):
break
name_parts.append(response[offset:offset+length].decode('ascii'))
offset += length
return '.'.join(name_parts), original_offset if jumped else offset
def parse_dns_response(self, response, expected_id):
"""Parse DNS response packet"""
if self.verbose:
print(f" 📥 Raw response ({len(response)} bytes): {response.hex()}")
if len(response) < 12:
return False, "Response too short"
# Parse header
header = struct.unpack('!HHHHHH', response[:12])
transaction_id, flags, qdcount, ancount, nscount, arcount = header
if self.verbose:
print(f" 📋 DNS Header:")
print(f" Transaction ID: 0x{transaction_id:04x} (expected: 0x{expected_id:04x})")
print(f" Flags: 0x{flags:04x}")
print(f" Questions: {qdcount}")
print(f" Answers: {ancount}")
print(f" Authority: {nscount}")
print(f" Additional: {arcount}")
if transaction_id != expected_id:
return False, "Transaction ID mismatch"
# Check response code (last 4 bits of flags)
rcode = flags & 0x000F
if rcode != 0:
error_codes = {
1: "Format error",
2: "Server failure",
3: "Name error (NXDOMAIN)",
4: "Not implemented",
5: "Refused"
}
return False, f"DNS error: {error_codes.get(rcode, f'Unknown error {rcode}')}"
if ancount == 0:
return False, "No answer records"
# Skip question section to get to answer
offset = 12
if self.verbose:
print(f" 🔍 Parsing question section starting at offset {offset}")
# Skip question section properly
for _ in range(qdcount):
name, offset = self.parse_dns_name(response, offset)
offset += 4 # Skip QTYPE and QCLASS
if self.verbose:
print(f" 🔍 Answer section starts at offset {offset}")
# Parse all answer records to find A records
cname_chain = []
a_records = []
for i in range(ancount):
if offset >= len(response):
break
# Parse answer name
answer_name, new_offset = self.parse_dns_name(response, offset)
offset = new_offset
if offset + 10 > len(response):
return False, f"Malformed answer section - need 10 bytes at offset {offset}, but response is only {len(response)} bytes"
# Parse answer RR: TYPE (2) + CLASS (2) + TTL (4) + RDLENGTH (2)
rr_header = struct.unpack('!HHIH', response[offset:offset+10])
rr_type, rr_class, ttl, rdlength = rr_header
offset += 10
if self.verbose:
record_types = {1: 'A', 5: 'CNAME', 28: 'AAAA', 15: 'MX', 2: 'NS', 6: 'SOA'}
print(f" 📋 Answer RR #{i+1}:")
print(f" Name: {answer_name}")
print(f" Type: {rr_type} ({record_types.get(rr_type, 'other')})")
print(f" Class: {rr_class}")
print(f" TTL: {ttl}")
print(f" Data length: {rdlength}")
if offset + rdlength > len(response):
return False, f"Malformed RR data - need {rdlength} bytes at offset {offset}, but response is only {len(response)} bytes"
if rr_type == 1 and rdlength == 4: # A record
ip_bytes = response[offset:offset+4]
ip_address = '.'.join(str(b) for b in ip_bytes)
a_records.append((ip_address, ttl))
if self.verbose:
print(f" A record: {ip_address}")
elif rr_type == 5: # CNAME record
cname_target, _ = self.parse_dns_name(response, offset)
cname_chain.append((answer_name, cname_target, ttl))
if self.verbose:
print(f" CNAME: {answer_name} -> {cname_target}")
elif self.verbose:
rdata_hex = response[offset:offset+rdlength].hex()
print(f" Data: {rdata_hex}")
offset += rdlength
# Build result message
result_parts = []
if cname_chain:
cname_str = " -> ".join([f"{src}" for src, dst, ttl in cname_chain] + [cname_chain[-1][1]])
result_parts.append(f"CNAME: {cname_str}")
if a_records:
ip_list = [f"{ip} (TTL: {ttl}s)" for ip, ttl in a_records]
result_parts.append(f"A: {', '.join(ip_list)}")
return True, " | ".join(result_parts)
elif cname_chain:
# Got CNAME but no A record - this is actually successful resolution
final_target = cname_chain[-1][1]
return True, f"CNAME chain: {' -> '.join([src for src, dst, ttl in cname_chain] + [final_target])} (no A record in response)"
else:
return False, "No A or CNAME records found"
def query_dns(self, hostname):
"""Perform a single DNS query"""
try:
# Create DNS query
query_packet, transaction_id = self.create_dns_query(hostname)
# Create UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(self.timeout)
start_time = time.time()
if self.verbose:
print(f" 📡 Sending query to {self.dns_server}:53")
# Send query
sock.sendto(query_packet, (self.dns_server, 53))
# Receive response
response, addr = sock.recvfrom(512)
end_time = time.time()
query_time = (end_time - start_time) * 1000
if self.verbose:
print(f" 📡 Received response from {addr} in {query_time:.1f}ms")
sock.close()
# Parse response
success, message = self.parse_dns_response(response, transaction_id)
return {
'success': success,
'message': message,
'response_time': query_time,
'server_responded': True
}
except socket.timeout:
return {
'success': False,
'message': "DNS query timed out",
'response_time': None,
'server_responded': False
}
except socket.error as e:
return {
'success': False,
'message': f"Socket error: {e}",
'response_time': None,
'server_responded': False
}
except Exception as e:
return {
'success': False,
'message': f"Unexpected error: {e}",
'response_time': None,
'server_responded': False
}
def test_dns_reliability(dns_server, hostname, num_queries=10, interval=2, verbose=False):
"""Test DNS server reliability with multiple queries"""
tester = DNSTester(dns_server, verbose=verbose)
results = []
print(f"🔍 DNS Reliability Test")
print(f"📡 Target DNS Server: {dns_server}")
print(f"🌐 Hostname to resolve: {hostname}")
print(f"🔢 Number of queries: {num_queries}")
print(f"⏱️ Interval between queries: {interval} seconds")
print(f"⏰ Timeout per query: {tester.timeout} seconds")
if verbose:
print(f"🔍 Verbose mode: ENABLED")
print("-" * 70)
successful_queries = 0
total_response_time = 0
timeouts = 0
errors = 0
for i in range(1, num_queries + 1):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Query {i:2d} [{timestamp}]: ", end="", flush=True)
if verbose:
print() # New line for verbose output
result = tester.query_dns(hostname)
results.append({
'query_num': i,
'timestamp': timestamp,
**result
})
if result['success']:
successful_queries += 1
response_time = result['response_time']
total_response_time += response_time
if verbose:
print(f" ✅ SUCCESS - {hostname} → {result['message']} ({response_time:.1f}ms)")
else:
print(f"✅ SUCCESS - {hostname} → {result['message']} ({response_time:.1f}ms)")
else:
if not result['server_responded']:
if "timeout" in result['message'].lower():
timeouts += 1
if verbose:
print(f" ⏰ TIMEOUT - {hostname} - {result['message']}")
else:
print(f"⏰ TIMEOUT - {hostname} - {result['message']}")
else:
errors += 1
if verbose:
print(f" ❌ ERROR - {hostname} - {result['message']}")
else:
print(f"❌ ERROR - {hostname} - {result['message']}")
else:
errors += 1
if verbose:
print(f" ❌ FAILED - {hostname} - {result['message']}")
else:
print(f"❌ FAILED - {hostname} - {result['message']}")
# Wait before next query (except for last query)
if i < num_queries:
time.sleep(interval)
# Generate detailed report
print("\n" + "=" * 70)
print("📊 DNS RELIABILITY REPORT")
print("=" * 70)
success_rate = (successful_queries / num_queries) * 100
print(f"📈 Total queries performed: {num_queries}")
print(f"✅ Successful queries: {successful_queries}")
print(f"❌ Failed queries: {num_queries - successful_queries}")
print(f"⏰ Timeouts: {timeouts}")
print(f"🚫 Errors: {errors}")
print(f"📊 Success rate: {success_rate:.1f}%")
if successful_queries > 0:
avg_response_time = total_response_time / successful_queries
print(f"⚡ Average response time: {avg_response_time:.1f}ms")
print()
# Reliability assessment
if success_rate >= 95:
print("🟢 DNS SERVER STATUS: EXCELLENT")
print(" Your DNS server is working reliably.")
elif success_rate >= 80:
print("🟡 DNS SERVER STATUS: GOOD")
print(" Your DNS server has minor reliability issues.")
elif success_rate >= 50:
print("🟠 DNS SERVER STATUS: POOR")
print(" Your DNS server has significant reliability problems.")
else:
print("🔴 DNS SERVER STATUS: CRITICAL")
print(" Your DNS server is highly unreliable!")
# Specific issues analysis
if timeouts > 0:
timeout_rate = (timeouts / num_queries) * 100
print(f"\n⚠️ TIMEOUT ISSUES: {timeout_rate:.1f}% of queries timed out")
print(" This suggests network connectivity problems or DNS server overload.")
if errors > 0:
error_rate = (errors / num_queries) * 100
print(f"\n⚠️ ERROR ISSUES: {error_rate:.1f}% of queries had errors")
print(" This suggests DNS server configuration or response problems.")
# Recommendations
if success_rate < 95:
print("\n🔧 RECOMMENDATIONS:")
if timeouts > errors:
print(" • Check network connectivity to your router")
print(" • Consider increasing DNS timeout settings")
print(" • Router may be overloaded - try restarting it")
else:
print(" • Consider using alternative DNS servers:")
print(" - Google DNS: 8.8.8.8, 8.8.4.4")
print(" - Cloudflare DNS: 1.1.1.1, 1.0.0.1")
print(" - Quad9 DNS: 9.9.9.9, 149.112.112.112")
print(" • Update router firmware")
print(" • Check router DNS settings")
print(" • Restart your router/modem")
return results
def is_ip_address(address):
"""Check if a string is a valid IP address"""
try:
parts = address.split('.')
if len(parts) != 4:
return False
for part in parts:
if not 0 <= int(part) <= 255:
return False
return True
except (ValueError, AttributeError):
return False
def get_system_dns():
"""Get the system's default DNS server"""
try:
# Try to read from /etc/resolv.conf on Unix systems
try:
with open('/etc/resolv.conf', 'r') as f:
for line in f:
if line.startswith('nameserver'):
dns = line.split()[1]
if is_ip_address(dns):
return dns
except:
pass
# Fallback: use a temporary socket to determine default route
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
# Guess router IP from local IP
ip_parts = local_ip.split('.')
router_ip = f"{ip_parts[0]}.{ip_parts[1]}.{ip_parts[2]}.1"
return router_ip
except:
# Ultimate fallback
return "192.168.1.1"
def resolve_dns_provider(provider):
"""Resolve DNS provider shortcuts to IP addresses"""
dns_providers = {
'google': '8.8.8.8',
'cloudflare': '1.1.1.1',
'quad9': '9.9.9.9',
'opendns': '208.67.222.222',
'level3': '4.2.2.1',
'comodo': '8.26.56.26'
}
# Check if it's a known provider
provider_lower = provider.lower()
if provider_lower in dns_providers:
return dns_providers[provider_lower]
# Check if it's already an IP address
if is_ip_address(provider):
return provider
# Return as-is if not recognized (could be a hostname)
return provider
if __name__ == "__main__":
# Handle command line arguments
verbose = "--verbose" in sys.argv or "-v" in sys.argv
# Parse arguments
dns_server = None
hostname = None
i = 1
while i < len(sys.argv):
arg = sys.argv[i]
if arg == "--dns" and i + 1 < len(sys.argv):
dns_server = resolve_dns_provider(sys.argv[i + 1])
i += 2
elif arg == "--hostname" and i + 1 < len(sys.argv):
hostname = sys.argv[i + 1]
i += 2
elif arg in ["--verbose", "-v"]:
i += 1
elif not arg.startswith("--"):
# Non-flag argument - determine if it's hostname or DNS server
if is_ip_address(arg) or arg.lower() in ['google', 'cloudflare', 'quad9', 'opendns', 'level3', 'comodo']:
# It's an IP address or DNS provider, treat as DNS server
if dns_server is None:
dns_server = resolve_dns_provider(arg)
else:
print(f"⚠️ Warning: Ignoring extra DNS server argument: {arg}")
else:
# It's not an IP or provider, treat as hostname
if hostname is None:
hostname = arg
else:
print(f"⚠️ Warning: Ignoring extra hostname argument: {arg}")
i += 1
else:
i += 1
# Set defaults
if dns_server is None:
dns_server = get_system_dns() # Use system DNS instead of hardcoded router IP
if hostname is None:
hostname = "service.tesla.com" # Default hostname
print("🚀 Starting Enhanced DNS Reliability Test")
# Show what we're testing
if hostname != "service.tesla.com":
print(f"🎯 Testing hostname: {hostname}")
else:
print(f"🎯 Testing Tesla service connectivity")
# Show DNS server info
system_dns = get_system_dns()
if dns_server == system_dns:
print(f"🌐 Using system DNS server: {dns_server}")
else:
print(f"🔧 Using custom DNS server: {dns_server}")
print()
try:
# Run the test
results = test_dns_reliability(
dns_server=dns_server,
hostname=hostname,
num_queries=15,
interval=2,
verbose=verbose
)
print(f"\n📁 Test completed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Additional suggestions based on results
failed_queries = sum(1 for r in results if not r['success'])
if failed_queries > 0:
print(f"\n💡 NEXT STEPS:")
print(f" • Test with Google DNS: python3 {sys.argv[0]} --dns google {hostname}")
print(f" • Test with Cloudflare DNS: python3 {sys.argv[0]} --dns cloudflare {hostname}")
print(f" • Test different hostname: python3 {sys.argv[0]} google.com")
print(f" • Run with verbose output: python3 {sys.argv[0]} --verbose {hostname}")
if dns_server.startswith("192.168.") or dns_server.startswith("10.") or dns_server.startswith("172."):
print(f" • Check router admin panel (likely at http://{system_dns})")
print(f" • Verify router is using upstream DNS servers")
print(f" • Consider factory reset if problems persist")
print(f"\n📋 Available DNS providers:")
print(f" • google (8.8.8.8) • cloudflare (1.1.1.1) • quad9 (9.9.9.9)")
print(f" • opendns (208.67.222.222) • level3 (4.2.2.1) • comodo (8.26.56.26)")
except KeyboardInterrupt:
print(f"\n\n⏹️ Test interrupted by user")
sys.exit(1)
except Exception as e:
print(f"\n❌ Script error: {e}")
sys.exit(1)
def main():
"""Main function - this is now unused but kept for compatibility"""
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment