Compare commits

...

10 commits
v1.0.1 ... main

Author SHA1 Message Date
9b2802ce97 Version 2.0 2025-01-25 18:56:45 +01:00
d0eb9493b3 Move disclaimer to the bottom of README.md 2025-01-25 18:56:05 +01:00
fb8f646dad Update README.md 2025-01-25 18:52:43 +01:00
80b1fc2062 Add flag to ignore link local addresses 2025-01-25 18:29:39 +01:00
0c691f1f54 Add flag to ignore link local addresses 2025-01-25 18:16:02 +01:00
e405fdac2b Normalize IP addresses before matching 2025-01-25 18:13:28 +01:00
d62a7ac9fb Add support for multiple IP addresses per host 2025-01-25 18:09:58 +01:00
9e3a605fa0
Update docker-compose.yml
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Has been cancelled
2024-05-14 02:16:06 +02:00
d4d853809c
Fix pipeline (#1) 2024-05-14 01:25:59 +02:00
1aed1efd77 Add comments 2024-05-14 00:23:36 +02:00
4 changed files with 86 additions and 41 deletions

View file

@ -27,7 +27,7 @@ jobs:
uses: actions/checkout@v4
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
- name: Log in to the Container registry
uses: docker/login-action@v3
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
@ -35,9 +35,11 @@ jobs:
# This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v5
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
images: ${{ env.REGISTRY }}/${{ github.actor }}/${{ env.IMAGE_NAME }}
flavor: |
latest=true
# This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
# It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
# It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
@ -54,6 +56,6 @@ jobs:
- name: Generate artifact attestation
uses: actions/attest-build-provenance@v1
with:
subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME}}
subject-name: ${{ env.REGISTRY }}/${{ github.actor }}/${{ env.IMAGE_NAME}}
subject-digest: ${{ steps.push.outputs.digest }}
push-to-registry: true

View file

@ -13,7 +13,7 @@ Define the environment variables in the docker-compose file, then run: `docker c
### Environment variables:
| Variable Name | Description | Example Value |
|-----------------------|------------------------------------------------------------------------------------------|-----------------------------------------------------------------------|
|-----------------------|------------------------------------------------------------------------------------------|------------------------------------------------------------------------|
| `OPNSENSE_URL` | The base URL of your OPNsense instance | http://192.168.1.1 (required)
| `OPNSENSE_API_KEY` | OPNsense API key | `your_opnsense_api_key` (required) |
| `OPNSENSE_API_SECRET` | OPNsense API secret | `a_very_secret_token` (required) |
@ -21,11 +21,13 @@ Define the environment variables in the docker-compose file, then run: `docker c
| `TECHNITIUM_TOKEN` | Technitium DNS token | `another_very_secret_token` (required) |
| `DNS_ZONE_SUBNETS` | Comma separated DNS zones and IPv4 subnet | `192.168.1.0/24=lan.home.arpa,192.168.2.0/24=dmz.home.arpa` (required) |
| `DO_V4` | If set to true, A records will be configured, otherwise only AAAA records are configured | `false` (defaults to false) |
| `IGNORE_LINK_LOCAL` | If set to true, link local IPv6 addresses wil be ignored | `true` (defaults to true) |
| `VERIFY_HTTPS` | Verify OPNsense and Technitium's SSL certificates | `true` (defaults to true) |
| `CLOCK` | Interval between updates (in seconds) | `30` (defaults to 30) |
### Note
You have to create the corresponding DNS zones in the Technitium dashboard, you can configure them as primary or conditional forwarder zones.
If DNS records are not being added, make sure that the corresponding reverse zone exists in Technitium DNS, otherwise the script will fail silently.
### Contributing:
I welcome contributions! Feel free to submit issues, feature requests, or pull requests.

View file

@ -1,9 +1,11 @@
version: '3.6'
services:
slaacsense:
image: ghcr.io/notherealmarco/slaacsense:latest
build: .
container_name: slaacsense
restart: unless-stopped
pull_policy: always
environment:
- OPNSENSE_URL=${OPNSENSE_URL}
- OPNSENSE_API_KEY=${OPNSENSE_API_KEY}

91
main.py
View file

@ -4,7 +4,9 @@ import time
import requests
import ipaddress
import urllib3
from collections import defaultdict
# Environment variables
OPNSENSE_URL = os.getenv("OPNSENSE_URL", None)
OPNSENSE_API_KEY = os.getenv("OPNSENSE_API_KEY", None)
OPNSENSE_API_SECRET = os.getenv("OPNSENSE_API_SECRET", None)
@ -12,10 +14,10 @@ TECHNITIUM_URL = os.getenv("TECHNITIUM_URL", None)
TECHNITIUM_TOKEN = os.getenv("TECHNITIUM_TOKEN", None)
DNS_ZONE_SUBNETS = os.getenv("DNS_ZONE_SUBNETS", None)
DO_V4 = (os.getenv("DO_V4", "false").lower() == "true")
IGNORE_LINK_LOCAL = (os.getenv("IGNORE_LINK_LOCAL", "true").lower() == "true")
VERIFY_HTTPS = (os.getenv("VERIFY_HTTPS", "true").lower() == "true")
CLOCK = int(os.getenv("CLOCK", "30"))
def get_opnsense_data(path):
r = requests.get(url=OPNSENSE_URL + path, verify=VERIFY_HTTPS, auth=(OPNSENSE_API_KEY, OPNSENSE_API_SECRET))
if r.status_code != 200:
@ -23,60 +25,99 @@ def get_opnsense_data(path):
return None
return r.json()
def get_ndp():
return get_opnsense_data("/api/diagnostics/interface/search_ndp")
def get_dhcp4_leases():
return get_opnsense_data("/api/dhcpv4/leases/searchLease")
def build_matches(ndp, leases):
matches = set()
hostname_to_macs = defaultdict(lambda: defaultdict(list))
for e in leases["rows"]:
ip6s = tuple(x["ip"].split("%")[0] for x in ndp["rows"] if x["mac"] == e["mac"])
ip6s = tuple(
x["ip"].split("%")[0] for x in ndp["rows"]
if x["mac"] == e["mac"] and x["intf_description"] == e["if_descr"]
)
if IGNORE_LINK_LOCAL:
ip6s = tuple(ip for ip in ip6s if not ipaddress.ip_address(ip).is_link_local)
if len(ip6s) == 0 and not DO_V4:
continue
matches.add((e["address"], ip6s, e["hostname"]))
return matches
hostname = e["hostname"]
if hostname:
hostname_to_macs[hostname][e["if_descr"]].append(e["mac"])
matches.add((e["address"], ip6s, hostname, e["if_descr"], e["mac"]))
# Handle duplicate hostnames on the same interface
adjusted_matches = set()
for match in matches:
ip4, ip6s, hostname, if_descr, mac = match
if hostname and len(hostname_to_macs[hostname][if_descr]) > 1:
# Add the last 4 characters of the MAC address to the hostname
hostname = f"{hostname}-{mac.replace(':', '')[-4:]}"
adjusted_matches.add((ip4, ip6s, hostname))
return adjusted_matches
def find_zone(zones, ip4):
for zone in zones:
if ip4 in zone[0]: return zone[1]
return None
def get_existing_records(domain, zone):
url = f"{TECHNITIUM_URL}/api/zones/records/get?token={TECHNITIUM_TOKEN}&domain={domain}.{zone}"
r = requests.get(url=url, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error fetching existing records: " + str(r.status_code) + ": " + r.text)
return []
return r.json().get("response", {}).get("records", [])
def make_record(zones, match):
def delete_record(zone, domain, record_type, value):
url = f"{TECHNITIUM_URL}/api/zones/records/delete?token={TECHNITIUM_TOKEN}&domain={domain}.{zone}&zone={zone}&type={record_type}&value={value}"
r = requests.get(url=url, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error deleting record: " + str(r.status_code) + ": " + r.text)
else:
logging.info(f"Deleted {record_type} record for {value} in {domain}.{zone}")
def add_record(zone, domain, record_type, ip):
url = f"{TECHNITIUM_URL}/api/zones/records/add?token={TECHNITIUM_TOKEN}&domain={domain}.{zone}&type={record_type}&ttl=5&expiryTtl=604800&overwrite=false&ptr=true&ipAddress={ip}"
r = requests.get(url=url, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error adding record: " + str(r.status_code) + ": " + r.text)
else:
logging.info(f"Added {record_type} record for {ip} in {domain}.{zone}")
def sync_records(zones, match):
zone = find_zone(zones, ipaddress.ip_address(match[0]))
if zone is None:
logging.warning("Could not find a DNS zone for " + match[0])
return
ip4 = match[0]
ip6s = [ipaddress.ip_address(x) for x in match[1]]
ip6s = [ipaddress.ip_address(x).compressed for x in match[1]]
hostname = match[2]
if hostname == "":
logging.warning("no hostname found for " + match[0])
logging.warning("No hostname found for " + match[0])
return
for ip6 in ip6s:
v6path = "/api/zones/records/add?token=" + TECHNITIUM_TOKEN + "&domain=" + hostname + "." + zone + "&type=AAAA&ttl=1&overwrite=true&ptr=true&ipAddress=" + ip6.exploded
logging.info("Inserting AAAA: " + hostname + "." + zone + " " + ip6.compressed)
r = requests.get(url=TECHNITIUM_URL + v6path, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error occurred" + str(r.status_code) + ": " + r.text)
continue
existing_records = get_existing_records(hostname, zone)
existing_ips = {ipaddress.ip_address(r["rData"]["ipAddress"]).compressed for r in existing_records if r["type"] in ["A", "AAAA"]}
current_ips = set([ipaddress.ip_address(ip4).compressed] if DO_V4 else []) | set(ip6s)
if DO_V4:
v4path = "/api/zones/records/add?token=" + TECHNITIUM_TOKEN + "&domain=" + hostname + "." + zone + "&type=A&ttl=1&overwrite=true&ptr=true&ipAddress=" + ip4
logging.info("Inserting A: " + hostname + "." + zone + " " + ip4)
r = requests.get(url=TECHNITIUM_URL + v4path, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error occurred" + str(r.status_code) + ": " + r.text)
# Delete outdated records
for ip in existing_ips - current_ips:
record_type = "A" if "." in ip else "AAAA"
delete_record(zone, hostname, record_type, ip)
# Add missing records
for ip in current_ips - existing_ips:
record_type = "A" if "." in ip else "AAAA"
add_record(zone, hostname, record_type, ip)
def run():
if not VERIFY_HTTPS:
@ -102,10 +143,9 @@ def run():
previous_matches = matches
for match in new_matches:
make_record(zones, match)
sync_records(zones, match)
time.sleep(CLOCK)
def verify_env() -> bool:
if not OPNSENSE_URL: return False
if not OPNSENSE_API_KEY: return False
@ -115,7 +155,6 @@ def verify_env() -> bool:
if not DNS_ZONE_SUBNETS: return False
return True
if __name__ == "__main__":
logging.getLogger().setLevel(os.getenv("LOG_LEVEL", "INFO"))
logging.info("loading environment...")