Last active
May 30, 2025 14:16
-
-
Save trick77/79166d1a9018742173db237c1deb98b9 to your computer and use it in GitHub Desktop.
Quick & dirty script for OPNsense to export dynamic IPv4/IPv6 leases from KEA to Unbound so it can be resolved from AdGuard.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import json | |
| import requests | |
| import os | |
| from time import time | |
| from datetime import datetime | |
| import subprocess | |
| import ipaddress | |
| # Configuration | |
| KEA_CONTROL_AGENT_URL = "http://127.0.0.1:8000" | |
| DHCPLEASES_CONF = "/var/unbound/dhcpleases.conf" | |
| UNBOUND_CONTROL = "/usr/local/sbin/unbound-control" | |
| def get_kea_leases(): | |
| """Fetch active DHCPv4 and DHCPv6 leases from Kea Control Agent.""" | |
| leases = [] | |
| for command, service in [("lease4-get-all", "dhcp4"), ("lease6-get-all", "dhcp6")]: | |
| payload = {"command": command, "service": [service]} | |
| try: | |
| response = requests.post(KEA_CONTROL_AGENT_URL, json=payload, timeout=5) | |
| response.raise_for_status() | |
| data = response.json() | |
| if not isinstance(data, list) or not data: | |
| print(f"Error: {service} response is not a list or is empty") | |
| continue | |
| if data[0].get("result") != 0: | |
| print(f"Error fetching {service} leases: {data[0].get('text', 'Unknown error')}") | |
| continue | |
| arguments = data[0].get("arguments", {}) | |
| if "leases" not in arguments: | |
| print(f"Error: No 'leases' key in {service} response arguments: {arguments}") | |
| continue | |
| leases.extend(arguments["leases"]) | |
| print(f"Parsed {len(arguments['leases'])} leases from {service}") | |
| except requests.RequestException as e: | |
| print(f"Failed to connect to Kea for {service}: {e}") | |
| except json.JSONDecodeError as e: | |
| print(f"JSON decode error for {service}: {e}") | |
| return leases | |
| def generate_dhcpleases_conf(leases): | |
| """Generate dhcpleases.conf entries for Unbound (IPv4 and IPv6).""" | |
| entries = [] | |
| current_time = int(time()) | |
| for lease in leases: | |
| hostname = lease.get("hostname") | |
| ip_addr = lease.get("ip-address") | |
| expiry = lease.get("expire", lease.get("cltt", 0) + lease.get("valid-lft", 0)) | |
| print(f"Lease: IP={ip_addr}, Hostname={hostname}, Expires={expiry}, Current={current_time}") | |
| if not hostname or not ip_addr or lease.get("valid-lft", 0) == 0: | |
| print(f"Skipping: Invalid hostname or IP") | |
| continue | |
| if expiry < current_time: | |
| print(f"Skipping: Lease expired") | |
| continue | |
| hostname = hostname.strip().rstrip(".").replace(" ", "_").lower() | |
| if not hostname: | |
| print(f"Skipping: Invalid hostname after cleaning") | |
| continue | |
| if ":" in ip_addr: # IPv6 | |
| # Expand IPv6 to full 128-bit format and generate reverse PTR | |
| try: | |
| ip = ipaddress.IPv6Address(ip_addr) | |
| # Split into nibbles, reverse, and join with dots | |
| nibbles = list(ip.exploded.replace(":", "")) | |
| reverse_ip = ".".join(nibbles[::-1]) + ".ip6.arpa" | |
| entries.append(f'local-data: "{reverse_ip} IN PTR {hostname}"') | |
| except ipaddress.AddressValueError as e: | |
| print(f"Skipping IPv6 PTR for {ip_addr}: Invalid address ({e})") | |
| else: # IPv4 | |
| reverse_ip = ".".join(reversed(ip_addr.split("."))) + ".in-addr.arpa" | |
| entries.append(f'local-data: "{reverse_ip} IN PTR {hostname}"') | |
| return entries | |
| def write_dhcpleases_conf(entries): | |
| """Write entries to dhcpleases.conf and ensure proper permissions.""" | |
| try: | |
| with open(DHCPLEASES_CONF, "w") as f: | |
| f.write("# Generated by kea-exporter at {}\n".format(datetime.now())) | |
| for entry in entries: | |
| f.write(entry + "\n") | |
| os.chmod(DHCPLEASES_CONF, 0o644) | |
| os.chown(DHCPLEASES_CONF, 953, 953) | |
| except OSError as e: | |
| print(f"Error writing {DHCPLEASES_CONF}: {e}") | |
| return False | |
| return True | |
| def reload_unbound(): | |
| """Signal Unbound to reload local zone data.""" | |
| try: | |
| result = subprocess.run([UNBOUND_CONTROL, "-c", "/var/unbound/unbound.conf", "reload_keep_cache"], capture_output=True, text=True) | |
| if result.returncode == 0: | |
| print("Unbound reloaded successfully.") | |
| else: | |
| print(f"Error reloading Unbound: {result.stderr}") | |
| return False | |
| except OSError as e: | |
| print(f"Error executing unbound-control: {e}") | |
| return False | |
| return True | |
| def main(): | |
| print("Syncing Kea DHCP leases to Unbound...") | |
| leases = get_kea_leases() | |
| if not leases: | |
| print("No leases found or error occurred.") | |
| return | |
| entries = generate_dhcpleases_conf(leases) | |
| if not entries: | |
| print("No valid lease entries to write.") | |
| return | |
| if write_dhcpleases_conf(entries): | |
| if reload_unbound(): | |
| print(f"Wrote {len(entries)} entries to {DHCPLEASES_CONF}") | |
| else: | |
| print(f"Wrote {len(entries)} entries to {DHCPLEASES_CONF}, but Unbound reload failed.") | |
| else: | |
| print("Failed to update dhcpleases.conf") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment