Add support for multiple IP addresses per host

This commit is contained in:
Marco Realacci 2025-01-25 18:09:58 +01:00
parent 9e3a605fa0
commit d62a7ac9fb

128
main.py
View file

@ -6,107 +6,68 @@ import ipaddress
import urllib3 import urllib3
# Environment variables # Environment variables
# URL of the OPNsense instance
OPNSENSE_URL = os.getenv("OPNSENSE_URL", None) OPNSENSE_URL = os.getenv("OPNSENSE_URL", None)
# API key for the OPNsense instance
OPNSENSE_API_KEY = os.getenv("OPNSENSE_API_KEY", None) OPNSENSE_API_KEY = os.getenv("OPNSENSE_API_KEY", None)
# API secret for the OPNsense instance
OPNSENSE_API_SECRET = os.getenv("OPNSENSE_API_SECRET", None) OPNSENSE_API_SECRET = os.getenv("OPNSENSE_API_SECRET", None)
# URL of the Technitium DNS server
TECHNITIUM_URL = os.getenv("TECHNITIUM_URL", None) TECHNITIUM_URL = os.getenv("TECHNITIUM_URL", None)
# Token for the Technitium DNS server
TECHNITIUM_TOKEN = os.getenv("TECHNITIUM_TOKEN", None) TECHNITIUM_TOKEN = os.getenv("TECHNITIUM_TOKEN", None)
# Subnets for the DNS zones
DNS_ZONE_SUBNETS = os.getenv("DNS_ZONE_SUBNETS", None) DNS_ZONE_SUBNETS = os.getenv("DNS_ZONE_SUBNETS", None)
# Flag to indicate whether to do IPv4 or not
DO_V4 = (os.getenv("DO_V4", "false").lower() == "true") DO_V4 = (os.getenv("DO_V4", "false").lower() == "true")
# Flag to indicate whether to verify HTTPS or not
VERIFY_HTTPS = (os.getenv("VERIFY_HTTPS", "true").lower() == "true") VERIFY_HTTPS = (os.getenv("VERIFY_HTTPS", "true").lower() == "true")
# Clock interval for the main loop
CLOCK = int(os.getenv("CLOCK", "30")) CLOCK = int(os.getenv("CLOCK", "30"))
def get_opnsense_data(path): def get_opnsense_data(path):
"""
Function to get data from the OPNsense API.
Args:
path (str): The API endpoint to hit.
Returns:
dict: The JSON response from the API call.
"""
r = requests.get(url=OPNSENSE_URL + path, verify=VERIFY_HTTPS, auth=(OPNSENSE_API_KEY, OPNSENSE_API_SECRET)) r = requests.get(url=OPNSENSE_URL + path, verify=VERIFY_HTTPS, auth=(OPNSENSE_API_KEY, OPNSENSE_API_SECRET))
if r.status_code != 200: if r.status_code != 200:
logging.error("Error occurred" + str(r.status_code) + ": " + r.text) logging.error("Error occurred" + str(r.status_code) + ": " + r.text)
return None return None
return r.json() return r.json()
def get_ndp(): def get_ndp():
"""
Function to get the NDP table from the OPNsense API.
Returns:
dict: The JSON response from the API call.
"""
return get_opnsense_data("/api/diagnostics/interface/search_ndp") return get_opnsense_data("/api/diagnostics/interface/search_ndp")
def get_dhcp4_leases(): def get_dhcp4_leases():
"""
Function to get the DHCPv4 leases from the OPNsense API.
Returns:
dict: The JSON response from the API call.
"""
return get_opnsense_data("/api/dhcpv4/leases/searchLease") return get_opnsense_data("/api/dhcpv4/leases/searchLease")
def build_matches(ndp, leases): def build_matches(ndp, leases):
"""
Function to build matches between NDP and DHCPv4 leases.
Args:
ndp (dict): The NDP table.
leases (dict): The DHCPv4 leases.
Returns:
set: A set of matches.
"""
matches = set() matches = set()
for e in leases["rows"]: for e in leases["rows"]:
ip6s = tuple(x["ip"].split("%")[0] for x in ndp["rows"] if x["mac"] == e["mac"]) ip6s = tuple(x["ip"].split("%")[0] for x in ndp["rows"] if x["mac"] == e["mac"] and e["if_descr"] == x["intf_description"])
if len(ip6s) == 0 and not DO_V4: if len(ip6s) == 0 and not DO_V4:
continue continue
matches.add((e["address"], ip6s, e["hostname"])) matches.add((e["address"], ip6s, e["hostname"]))
return matches return matches
def find_zone(zones, ip4): def find_zone(zones, ip4):
"""
Function to find the DNS zone for a given IPv4 address.
Args:
zones (list): The list of DNS zones.
ip4 (str): The IPv4 address.
Returns:
str: The DNS zone for the given IPv4 address.
"""
for zone in zones: for zone in zones:
if ip4 in zone[0]: return zone[1] if ip4 in zone[0]: return zone[1]
return None return None
def get_existing_records(domain, zone):
url = f"{TECHNITIUM_URL}/api/zones/records/get?token={TECHNITIUM_TOKEN}&domain={domain}.{zone}"
r = requests.get(url=url, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error fetching existing records: " + str(r.status_code) + ": " + r.text)
return []
return r.json().get("response", {}).get("records", [])
def make_record(zones, match): def delete_record(zone, domain, record_type, value):
""" url = f"{TECHNITIUM_URL}/api/zones/records/delete?token={TECHNITIUM_TOKEN}&domain={domain}.{zone}&zone={zone}&type={record_type}&value={value}"
Function to make a DNS record for a given match. r = requests.get(url=url, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error deleting record: " + str(r.status_code) + ": " + r.text)
else:
logging.info(f"Deleted {record_type} record for {value} in {domain}.{zone}")
Args: def add_record(zone, domain, record_type, ip):
zones (list): The list of DNS zones. url = f"{TECHNITIUM_URL}/api/zones/records/add?token={TECHNITIUM_TOKEN}&domain={domain}.{zone}&type={record_type}&ttl=5&expiryTtl=604800&overwrite=false&ptr=true&ipAddress={ip}"
match (tuple): The match to make a record for. r = requests.get(url=url, verify=VERIFY_HTTPS)
""" if r.status_code != 200:
logging.error("Error adding record: " + str(r.status_code) + ": " + r.text)
else:
logging.info(f"Added {record_type} record for {ip} in {domain}.{zone}")
def sync_records(zones, match):
zone = find_zone(zones, ipaddress.ip_address(match[0])) zone = find_zone(zones, ipaddress.ip_address(match[0]))
if zone is None: if zone is None:
logging.warning("Could not find a DNS zone for " + match[0]) logging.warning("Could not find a DNS zone for " + match[0])
@ -117,29 +78,24 @@ def make_record(zones, match):
hostname = match[2] hostname = match[2]
if hostname == "": if hostname == "":
logging.warning("no hostname found for " + match[0]) logging.warning("No hostname found for " + match[0])
return return
for ip6 in ip6s: existing_records = get_existing_records(hostname, zone)
v6path = "/api/zones/records/add?token=" + TECHNITIUM_TOKEN + "&domain=" + hostname + "." + zone + "&type=AAAA&ttl=1&overwrite=true&ptr=true&ipAddress=" + ip6.exploded existing_ips = {r["rData"]["ipAddress"] for r in existing_records if r["type"] in ["A", "AAAA"]}
logging.info("Inserting AAAA: " + hostname + "." + zone + " " + ip6.compressed) current_ips = set([ip4] if DO_V4 else []) | {ip.exploded for ip in ip6s}
r = requests.get(url=TECHNITIUM_URL + v6path, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error occurred" + str(r.status_code) + ": " + r.text)
continue
if DO_V4: # Delete outdated records
v4path = "/api/zones/records/add?token=" + TECHNITIUM_TOKEN + "&domain=" + hostname + "." + zone + "&type=A&ttl=1&overwrite=true&ptr=true&ipAddress=" + ip4 for ip in existing_ips - current_ips:
logging.info("Inserting A: " + hostname + "." + zone + " " + ip4) record_type = "A" if "." in ip else "AAAA"
r = requests.get(url=TECHNITIUM_URL + v4path, verify=VERIFY_HTTPS) delete_record(zone, hostname, record_type, ip)
if r.status_code != 200:
logging.error("Error occurred" + str(r.status_code) + ": " + r.text)
# Add missing records
for ip in current_ips - existing_ips:
record_type = "A" if "." in ip else "AAAA"
add_record(zone, hostname, record_type, ip)
def run(): def run():
"""
Main function to run the script.
"""
if not VERIFY_HTTPS: if not VERIFY_HTTPS:
urllib3.disable_warnings() urllib3.disable_warnings()
@ -163,17 +119,10 @@ def run():
previous_matches = matches previous_matches = matches
for match in new_matches: for match in new_matches:
make_record(zones, match) sync_records(zones, match)
time.sleep(CLOCK) time.sleep(CLOCK)
def verify_env() -> bool: def verify_env() -> bool:
"""
Function to verify the environment variables.
Returns:
bool: True if all mandatory environment variables are set, False otherwise.
"""
if not OPNSENSE_URL: return False if not OPNSENSE_URL: return False
if not OPNSENSE_API_KEY: return False if not OPNSENSE_API_KEY: return False
if not OPNSENSE_API_SECRET: return False if not OPNSENSE_API_SECRET: return False
@ -182,7 +131,6 @@ def verify_env() -> bool:
if not DNS_ZONE_SUBNETS: return False if not DNS_ZONE_SUBNETS: return False
return True return True
if __name__ == "__main__": if __name__ == "__main__":
logging.getLogger().setLevel(os.getenv("LOG_LEVEL", "INFO")) logging.getLogger().setLevel(os.getenv("LOG_LEVEL", "INFO"))
logging.info("loading environment...") logging.info("loading environment...")
@ -195,4 +143,4 @@ if __name__ == "__main__":
logging.info("OPNSENSE_URL: {}".format(OPNSENSE_URL)) logging.info("OPNSENSE_URL: {}".format(OPNSENSE_URL))
logging.info("TECHNITIUM_URL: {}".format(TECHNITIUM_URL)) logging.info("TECHNITIUM_URL: {}".format(TECHNITIUM_URL))
logging.info("VERIFY_HTTPS: {}".format(VERIFY_HTTPS)) logging.info("VERIFY_HTTPS: {}".format(VERIFY_HTTPS))
run() run()