Skip to content

Instantly share code, notes, and snippets.

@trick77
Last active May 30, 2025 14:16
Show Gist options
  • Select an option

  • Save trick77/79166d1a9018742173db237c1deb98b9 to your computer and use it in GitHub Desktop.

Select an option

Save trick77/79166d1a9018742173db237c1deb98b9 to your computer and use it in GitHub Desktop.
Quick & dirty script for OPNsense to export dynamic IPv4/IPv6 leases from KEA to Unbound so it can be resolved from AdGuard.
#!/usr/bin/env python3
import json
import requests
import os
from time import time
from datetime import datetime
import subprocess
import ipaddress
# Configuration
KEA_CONTROL_AGENT_URL = "http://127.0.0.1:8000"
DHCPLEASES_CONF = "/var/unbound/dhcpleases.conf"
UNBOUND_CONTROL = "/usr/local/sbin/unbound-control"
def get_kea_leases():
"""Fetch active DHCPv4 and DHCPv6 leases from Kea Control Agent."""
leases = []
for command, service in [("lease4-get-all", "dhcp4"), ("lease6-get-all", "dhcp6")]:
payload = {"command": command, "service": [service]}
try:
response = requests.post(KEA_CONTROL_AGENT_URL, json=payload, timeout=5)
response.raise_for_status()
data = response.json()
if not isinstance(data, list) or not data:
print(f"Error: {service} response is not a list or is empty")
continue
if data[0].get("result") != 0:
print(f"Error fetching {service} leases: {data[0].get('text', 'Unknown error')}")
continue
arguments = data[0].get("arguments", {})
if "leases" not in arguments:
print(f"Error: No 'leases' key in {service} response arguments: {arguments}")
continue
leases.extend(arguments["leases"])
print(f"Parsed {len(arguments['leases'])} leases from {service}")
except requests.RequestException as e:
print(f"Failed to connect to Kea for {service}: {e}")
except json.JSONDecodeError as e:
print(f"JSON decode error for {service}: {e}")
return leases
def generate_dhcpleases_conf(leases):
"""Generate dhcpleases.conf entries for Unbound (IPv4 and IPv6)."""
entries = []
current_time = int(time())
for lease in leases:
hostname = lease.get("hostname")
ip_addr = lease.get("ip-address")
expiry = lease.get("expire", lease.get("cltt", 0) + lease.get("valid-lft", 0))
print(f"Lease: IP={ip_addr}, Hostname={hostname}, Expires={expiry}, Current={current_time}")
if not hostname or not ip_addr or lease.get("valid-lft", 0) == 0:
print(f"Skipping: Invalid hostname or IP")
continue
if expiry < current_time:
print(f"Skipping: Lease expired")
continue
hostname = hostname.strip().rstrip(".").replace(" ", "_").lower()
if not hostname:
print(f"Skipping: Invalid hostname after cleaning")
continue
if ":" in ip_addr: # IPv6
# Expand IPv6 to full 128-bit format and generate reverse PTR
try:
ip = ipaddress.IPv6Address(ip_addr)
# Split into nibbles, reverse, and join with dots
nibbles = list(ip.exploded.replace(":", ""))
reverse_ip = ".".join(nibbles[::-1]) + ".ip6.arpa"
entries.append(f'local-data: "{reverse_ip} IN PTR {hostname}"')
except ipaddress.AddressValueError as e:
print(f"Skipping IPv6 PTR for {ip_addr}: Invalid address ({e})")
else: # IPv4
reverse_ip = ".".join(reversed(ip_addr.split("."))) + ".in-addr.arpa"
entries.append(f'local-data: "{reverse_ip} IN PTR {hostname}"')
return entries
def write_dhcpleases_conf(entries):
"""Write entries to dhcpleases.conf and ensure proper permissions."""
try:
with open(DHCPLEASES_CONF, "w") as f:
f.write("# Generated by kea-exporter at {}\n".format(datetime.now()))
for entry in entries:
f.write(entry + "\n")
os.chmod(DHCPLEASES_CONF, 0o644)
os.chown(DHCPLEASES_CONF, 953, 953)
except OSError as e:
print(f"Error writing {DHCPLEASES_CONF}: {e}")
return False
return True
def reload_unbound():
"""Signal Unbound to reload local zone data."""
try:
result = subprocess.run([UNBOUND_CONTROL, "-c", "/var/unbound/unbound.conf", "reload_keep_cache"], capture_output=True, text=True)
if result.returncode == 0:
print("Unbound reloaded successfully.")
else:
print(f"Error reloading Unbound: {result.stderr}")
return False
except OSError as e:
print(f"Error executing unbound-control: {e}")
return False
return True
def main():
print("Syncing Kea DHCP leases to Unbound...")
leases = get_kea_leases()
if not leases:
print("No leases found or error occurred.")
return
entries = generate_dhcpleases_conf(leases)
if not entries:
print("No valid lease entries to write.")
return
if write_dhcpleases_conf(entries):
if reload_unbound():
print(f"Wrote {len(entries)} entries to {DHCPLEASES_CONF}")
else:
print(f"Wrote {len(entries)} entries to {DHCPLEASES_CONF}, but Unbound reload failed.")
else:
print("Failed to update dhcpleases.conf")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment