Skip to content

Instantly share code, notes, and snippets.

@ntwrkguru
Created August 11, 2025 17:34
Show Gist options
  • Select an option

  • Save ntwrkguru/473abd84d7d035505a614e053f24737a to your computer and use it in GitHub Desktop.

Select an option

Save ntwrkguru/473abd84d7d035505a614e053f24737a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
'''
Firewall Management Module
This module provides a set of classes and functions to interact with and manage
a Palo Alto Next-Generation Firewall (NGFW). It includes capabilities to
retrieve firewall configurations, manage network interfaces, handle ARP tables,
retrieve SNMP versions, timezones, NTP servers, DNS servers, and configured
administrators. Additionally, it supports disk space management, content updates,
and antivirus updates.
Dependencies:
- requests: For making HTTP requests to the firewall API
- xmltodict: For parsing XML responses from the firewall
- urllib3: To disable SSL warnings
- netmiko: For SSH management and command execution
- key: Custom module to retrieve API keys for authentication
Classes:
- Firewall: A class to model and interact with a Palo Alto NGFW
Functions:
- get_arp: Retrieve the current ARP table
- get_interfaces: Get an operational view of network interfaces
- get_config: Retrieve the firewall configuration
- get_snmp_version: Get the configured SNMP version
- get_timezone: Get the configured timezone
- get_ntp_servers: Retrieve a list of NTP servers
- get_dns_servers: Retrieve a list of DNS servers
- get_zones: Retrieve a list of configured zones
- get_admin_users: Retrieve a list of configured administrators
- get_diskspace: Get an operational view of disk space
- high_disk_usage: Check if disk usage exceeds a given threshold
- cleanup_disk_space: Perform cleanup to free disk space
- check_content_updates: Check for available content updates
- install_content_updates: Install the latest content updates
- check_av_updates: Check for available antivirus updates
'''
# PIP Imports
import logging
import requests
import xmltodict
from urllib3 import disable_warnings
from netmiko import ConnectHandler
from netmiko.exceptions import NetMikoTimeoutException
from netmiko.exceptions import NetMikoAuthenticationException
# Local Imports
from paloalto.key import ApiKey
# Disable SSL warnings
disable_warnings()
logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()])
logger = logging.getLogger(__name__)
class Firewall:
"""
A class to build a model of a Palo Alto Next-Generation Firewall (NGFW).
This class provides methods to interact with and manage a Palo Alto NGFW by
retrieving and modifying various configurations, operational data, and system
information.
Attributes:
hostname (str): The hostname or IP address of the firewall.
username (str): The username for authenticating with the firewall.
password (str): The password for authenticating with the firewall.
api_key (str): The API key for making requests to the firewall.
config (dict): The current configuration of the firewall.
snmp_version (str): The configured SNMP version.
timezone (str): The configured timezone.
ntp_servers (list): A list of configured NTP servers.
zones (list): A list of configured zones.
dns_servers (list): A list of configured DNS servers.
admins (list): A list of configured administrator users.
Methods:
get_arp() -> dict: Retrieves the current ARP table.
get_interfaces() -> dict: Retrieves an operational view of the network interfaces.
get_config() -> dict: Retrieves the configuration of the firewall.
get_snmp_version() -> str: Retrieves the configured SNMP version.
get_timezone() -> str: Retrieves the configured timezone.
get_ntp_servers() -> list: Retrieves a list of NTP servers.
get_dns_servers() -> list: Retrieves a list of DNS servers.
get_zones() -> list: Retrieves a list of configured zones.
get_admin_users() -> list: Retrieves a list of administrator users.
get_diskspace() -> str: Retrieves an operational view of the disk space.
high_disk_usage(threshold: int) -> bool: Checks if disk usage exceeds a given threshold.
cleanup_disk_space() -> str: Performs cleanup to free up disk space.
check_content_updates() -> str: Checks for available content updates.
install_content_updates() -> str: Installs the latest content updates.
check_av_updates() -> str: Checks for available antivirus updates.
"""
def __init__(self, hostname, username, password):
self.hostname = hostname
self.username = username
self.password = password
self.api_key = ApiKey(self.hostname, self.username, self.password).api_key
self.config = self.get_config()
self.snmp_version = self.get_snmp_version()
self.timezone = self.get_timezone()
self.ntp_servers = self.get_ntp_servers()
self.zones = self.get_zones()
self.dns_servers = self.get_dns_servers()
self.admins = self.get_admin_users()
def _api_call(self, cmd: str) -> dict:
"""
Sends an API request to the Palo Alto NGFW and returns the parsed XML response.
This helper method constructs an API request to the firewall using the provided XML command,
sends the request, and parses the XML response into a dictionary. It handles any request
exceptions by logging an error message and returning an empty dictionary.
Parameters:
cmd (str): The XML command to send to the firewall.
Returns:
dict: The parsed XML response as a dictionary. If the request fails, an empty dictionary is returned.
Raises:
requests.exceptions.RequestException: If there is an issue with the HTTP request.
Logs:
An error message if the request fails.
"""
endpoint = f"https://{self.hostname}/api"
params = {"type": "op", "cmd": cmd, "key": self.api_key}
try:
r = requests.get(endpoint, params=params, verify=False, timeout=15)
r.raise_for_status()
return xmltodict.parse(r.content)
except requests.exceptions.RequestException as e:
logger.error("Request failed: %s", e)
return {}
def get_config(self) -> dict:
"""
Retrieves the configuration of a Panos firewall as a dictionary object.
This method constructs an XML command to fetch the effective running configuration
of the firewall. It uses the `_api_call` helper method to send the request and parse
the response. This holds the class instance's config as an object and is useful for
setting other attributes without making additional API calls.
Returns:
dict: The configuration of a Panos firewall as a dictionary object.
Logs:
INFO: Fetching configuration for the specified hostname.
INFO: Successfully retrieved the configuration.
ERROR: Request failed with the specific error message.
"""
logger.info("Fetching configuration for %s", self.hostname)
cmd = "<show><config><effective-running></effective-running></config></show>"
try:
response = self._api_call(cmd)
config = response.get('response', {}).get('result', {}).get('config', {})
logger.info("Successfully retrieved the configuration")
return config
except requests.exceptions.RequestException as e:
logger.error("Request failed: %s", e)
return {}
def get_arp(self) -> dict:
"""
Retrieves the current ARP table from the Palo Alto NGFW.
This method constructs an XML command to fetch the ARP (Address Resolution Protocol) table,
which contains mappings of IP addresses to their corresponding MAC addresses. It then uses
the `_api_call` helper method to send the request and parse the response.
Returns:
dict: A dictionary representation of the ARP table, where each entry includes IP and MAC address mappings.
If the response does not contain the ARP entries, an empty dictionary is returned.
"""
cmd = "<show><arp><entry name='all'/></arp></show>"
response = self._api_call(cmd)
return response.get('response', {}).get('result', {}).get('entries', {}).get('entry', {})
def get_interfaces(self) -> dict:
"""
Retrieves an operational view of the Panos firewall's network interfaces.
This method constructs an XML command to fetch the current status and attributes
of the firewall's network interfaces. It uses the `_api_call` helper method to
send the request and parse the response.
Returns:
dict: A dictionary representation of the network interfaces and their attributes.
If the response does not contain the interfaces, an empty dictionary is returned.
Logs:
INFO: Fetching interfaces for the specified hostname.
"""
logger.info("Fetching Interfaces for %s", self.hostname)
cmd = "<show><interface>logical</interface></show>"
response = self._api_call(cmd)
return response.get('response', {}).get('result', {}).get('ifnet', {}).get('entry', {})
def get_snmp_version(self) -> str:
'''
Retrieves the SNMP version configured on a Panos firewall
Returns:
str: The SNMP version configured on a Panos firewall
'''
snmp = self.config['devices']['entry']['deviceconfig']['system'].get('snmp-setting')
if snmp:
snmp_version = snmp['access-setting']['version']
else:
snmp_version = "Not Configured"
return snmp_version
def get_timezone(self) -> str:
'''
Retrieves the configured timezone for a Panos firewall
Returns:
str: The configured timezone for a Panos firewall
'''
timezone = self.config['devices']['entry']['deviceconfig']['system'].get('timezone')
return timezone
def get_ntp_servers(self) -> list:
'''
Retreives a list of all configured NTP servers on a Panos firewall
Returns:
list: List of all configured NTP servers on a Panos firewall
'''
ntp_servers = []
ntp = self.config['devices']['entry']['deviceconfig']['system'].get('ntp-servers')
if ntp:
for v in ntp.values():
if isinstance(v, dict) and 'ntp-server-address' in v:
ntp_servers.append(v['ntp-server-address'])
return ntp_servers
def get_dns_servers(self) -> list:
'''
Retrieves a list of the configured DNS servers on a Panos firewall
Returns:
list: List of the configured DNS servers on a Panos firewall
'''
dns_servers = list()
dns = (
self.config['devices']['entry']['deviceconfig']
['system']['dns-setting'].get('servers')
)
if dns:
dns_servers.append(dns)
return dns_servers
def get_zones(self) -> list:
'''
Retrieves a list of all configured zones on the Panos firewall
Returns:
list: List of all configured zones on the Panos firewall
'''
zones = []
for z in self.config['devices']['entry']['vsys']['entry']['zone']['entry']:
zones.append(z['@name'])
return zones
def get_admin_users(self) -> list:
'''
Retrieves a list of all configured admins on the Panos firewall
Returns:
list: List of all configured admins on the Panos firewall
'''
admins = []
for admin in self.config['mgt-config']['users']['entry']:
admins.append(admin['@name'])
return admins
def get_diskspace(self) -> list:
"""
Retrieves an operational view of the Panos firewall's disk space.
This method constructs an XML command to fetch the current disk space usage of the firewall.
It uses the `_api_call` helper method to send the request and parse the response. The method
processes the response to extract and format the disk space information.
Returns:
list: A list of dictionaries representing the disk space usage of the firewall's partitions.
Each dictionary contains the partition attributes like size, used, available, and usage percentage.
Logs:
INFO: Fetching disk space information for the specified hostname.
"""
logger.info("Fetching disk space information for %s", self.hostname)
cmd = "<show><system><disk-space/></system></show>"
response = self._api_call(cmd)
ds = response.get('response', {}).get('result', {}).split('\n')
disk_output = []
keys = ds[0].split()
mounted_on = keys[5] + " " + keys[6]
keys = keys[:5] + [mounted_on] # Need to wrap mounted_on in a list to avoid TypeError
for o in ds[1:]:
values = o.split()
entry = dict(zip(keys, values))
disk_output.append(entry)
return disk_output
def high_disk_usage(self, threshold: int) -> bool:
"""
Determines if the disk usage of key partitions exceeds a given threshold.
This method checks the disk usage percentage of critical partitions ("/" and "/opt/pancfg").
If the usage exceeds the specified threshold for either partition, the method returns True.
Parameters:
threshold (int): The disk usage percentage threshold to check against.
Returns:
bool: True if the usage of either partition exceeds the threshold, False otherwise.
"""
parts = [int(d['Use%'].replace('%', '')) for d in self.get_diskspace() if d['Mounted on'] in ["/", "/opt/pancfg"]]
return any(p >= threshold for p in parts)
def cleanup_disk_space(self) -> str:
"""
Cleans up disk space on the Palo Alto NGFW by executing a series of commands.
This method connects to the firewall via SSH and runs predefined commands to delete
old content, debug logs, and other unnecessary files to free up disk space.
Returns:
str: The result of the last executed command.
Logs:
INFO: Indicates the start of the cleanup process and logs each command execution.
ERROR: Logs errors related to connection and authentication issues.
"""
commands = [
"delete content cache old-content",
"delete debug-log mp-log file *.1",
"delete debug-log mp-log file *.2",
"delete debug-log mp-log file *.3",
"delete debug-log mp-log file *.old",
"debug software disk-usage cleanup threshold 90"
]
prompt_pattern = r">\s"
params = {
"device_type": "paloalto_panos",
"host": self.hostname,
"username": self.username,
"password": self.password
}
try:
session = ConnectHandler(**params)
except ValueError as e:
logger.error("%s error: %s", self.hostname, e)
except NetMikoAuthenticationException:
logger.error("Authentication failed to %s", self.hostname)
except NetMikoTimeoutException:
logger.error("Cannot connect to %s", self.hostname)
else:
logger.info("Cleaning up the disk space on %s", self.hostname)
results = []
for c in commands:
logger.info("Running command %s", c)
result = ''.join(session.send_command(c, expect_string=prompt_pattern).split('\n'))
logger.info(" - %s", result)
results.append(result)
session.disconnect()
return result
def check_content_updates(self) -> str:
"""
Checks for available content updates on the Palo Alto NGFW.
This method constructs an XML command to check for available content updates,
such as antivirus or application updates. It uses the `_api_call` helper method
to send the request and parse the response. The method then determines the latest
available content version and compares it with the current version.
Returns:
str: The newest available content version, or an empty string if the current
version is the same as the latest version.
Logs:
INFO: Checking for available content updates for the specified hostname.
"""
logger.info("Checking for available content updates for %s", self.hostname)
cmd = "<request><content><upgrade><check></check></upgrade></content></request>"
response = self._api_call(cmd)
content = response.get('response', {}).get('result', {}).get('content-updates', {}).get('entry', [])
if not content:
return ''
latest = content[0]['version']
current = ''
for c in content:
if c['version'].split('-')[1] > latest.split('-')[1]:
latest = c['version']
if c.get('current') == 'yes':
current = c['version']
return latest if int(current.split('-')[1]) < int(latest.split('-')[1]) else ''
def install_content_updates(self) -> None:
"""
Downloads and installs the latest content updates on the Palo Alto NGFW.
This method constructs an XML command to download and install the latest content updates.
It uses the `_api_call` helper method to send the request and parse the response. The method
logs the process and outputs the job details.
Returns:
None
Logs:
INFO: Indicates the start of the content update download process.
INFO: Logs the job details after the content update download request.
"""
logger.info("Downloading latest content updates for %s", self.hostname)
cmd = "<request><content><upgrade><download><latest></latest></download></upgrade></content></request>"
response = self._api_call(cmd)
content = response.get('response', {}).get('result', {}).get('job', '')
logger.info("Job details: %s", content)
def check_av_updates(self) -> str:
"""
Checks for available Anti Virus updates on the Palo Alto NGFW.
This method constructs an XML command to check for available Anti Virus updates.
It uses the `_api_call` helper method to send the request and parse the response. The method
then determines the latest available Anti Virus update version and compares it with the current version.
Returns:
str: The newest available Anti Virus version, or an empty string if the current
version is the same as the latest version.
Logs:
INFO: Checking for available Anti Virus updates for the specified hostname.
"""
logger.info("Checking for available Anti Virus updates for %s", self.hostname)
cmd = "<request><anti-virus><upgrade><check></check></upgrade></anti-virus></request>"
response = self._api_call(cmd)
content = response.get('response', {}).get('result', {}).get('content-updates', {}).get('entry', [])
if not content:
return ''
latest = content[0]['version']
current = ''
for c in content:
if c['version'].split('-')[1] > latest.split('-')[1]:
latest = c['version']
if c.get('current') == 'yes':
current = c['version']
return latest if int(current.split('-')[1]) < int(latest.split('-')[1]) else ''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment