Compare commits

..

9 commits
v1.0.2 ... main

Author SHA1 Message Date
9b2802ce97 Version 2.0 2025-01-25 18:56:45 +01:00
d0eb9493b3 Move disclaimer to the bottom of README.md 2025-01-25 18:56:05 +01:00
fb8f646dad Update README.md 2025-01-25 18:52:43 +01:00
80b1fc2062 Add flag to ignore link local addresses 2025-01-25 18:29:39 +01:00
0c691f1f54 Add flag to ignore link local addresses 2025-01-25 18:16:02 +01:00
e405fdac2b Normalize IP addresses before matching 2025-01-25 18:13:28 +01:00
d62a7ac9fb Add support for multiple IP addresses per host 2025-01-25 18:09:58 +01:00
9e3a605fa0
Update docker-compose.yml
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Has been cancelled
2024-05-14 02:16:06 +02:00
d4d853809c
Fix pipeline (#1) 2024-05-14 01:25:59 +02:00
5 changed files with 82 additions and 157 deletions

View file

@ -3,7 +3,7 @@ name: Create and publish a Docker image
# Configures this workflow to run every time a change is pushed to the branch called `release`.
on:
push:
branches: ['dev']
branches: ['main']
# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds.
env:
@ -38,6 +38,8 @@ jobs:
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: ${{ env.REGISTRY }}/${{ github.actor }}/${{ env.IMAGE_NAME }}
flavor: |
latest=true
# This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
# It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
# It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
@ -48,6 +50,7 @@ jobs:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
# This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see "[AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds)."
- name: Generate artifact attestation

View file

@ -1,54 +0,0 @@
#
name: Demo Push
# This workflow runs when any of the following occur:
# - A push is made to a branch called `main` or `seed`
# - A tag starting with "v" is created
# - A pull request is created or updated
on:
push:
branches:
- disabled
# tags:
# - v*
# pull_request:
# This creates an environment variable called `IMAGE_NAME ` with the value `ghtoken_product_demo`.
env:
REGISTRY: ghcr.io
IMAGE_NAME: slaacsense
#
jobs:
# This pushes the image to GitHub Packages.
push:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
attestations: write
pages: write
id-token: write
#
steps:
- uses: actions/checkout@v4
- name: Build image
run: docker build . --file Dockerfile --tag $IMAGE_NAME --label "runnumber=${GITHUB_RUN_ID}"
- name: Log in to registry
run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
- name: Push image
run: |
IMAGE_ID=ghcr.io/${{ github.repository_owner }}/$IMAGE_NAME
# This changes all uppercase characters to lowercase.
IMAGE_ID=$(echo $IMAGE_ID | tr '[A-Z]' '[a-z]')
# This strips the git ref prefix from the version.
VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,')
# This strips the "v" prefix from the tag name.
[[ "${{ github.ref }}" == "refs/tags/"* ]] && VERSION=$(echo $VERSION | sed -e 's/^v//')
# This uses the Docker `latest` tag convention.
[ "$VERSION" == "main" ] && VERSION=latest
echo IMAGE_ID=$IMAGE_ID
echo VERSION=$VERSION
docker tag $IMAGE_NAME $IMAGE_ID:$VERSION
docker push $IMAGE_ID:$VERSION

View file

@ -12,20 +12,22 @@ Define the environment variables in the docker-compose file, then run: `docker c
### Environment variables:
| Variable Name | Description | Example Value |
|-----------------------|------------------------------------------------------------------------------------------|-----------------------------------------------------------------------|
| `OPNSENSE_URL` | The base URL of your OPNsense instance | http://192.168.1.1 (required)
| `OPNSENSE_API_KEY` | OPNsense API key | `your_opnsense_api_key` (required) |
| `OPNSENSE_API_SECRET` | OPNsense API secret | `a_very_secret_token` (required) |
| `TECHNITIUM_URL` | The base URL of your Technitium DNS instance | `dns.myawesomehome.home.arpa` (required) |
| `TECHNITIUM_TOKEN` | Technitium DNS token | `another_very_secret_token` (required) |
| Variable Name | Description | Example Value |
|-----------------------|------------------------------------------------------------------------------------------|------------------------------------------------------------------------|
| `OPNSENSE_URL` | The base URL of your OPNsense instance | http://192.168.1.1 (required)
| `OPNSENSE_API_KEY` | OPNsense API key | `your_opnsense_api_key` (required) |
| `OPNSENSE_API_SECRET` | OPNsense API secret | `a_very_secret_token` (required) |
| `TECHNITIUM_URL` | The base URL of your Technitium DNS instance | `dns.myawesomehome.home.arpa` (required) |
| `TECHNITIUM_TOKEN` | Technitium DNS token | `another_very_secret_token` (required) |
| `DNS_ZONE_SUBNETS` | Comma separated DNS zones and IPv4 subnet | `192.168.1.0/24=lan.home.arpa,192.168.2.0/24=dmz.home.arpa` (required) |
| `DO_V4` | If set to true, A records will be configured, otherwise only AAAA records are configured | `false` (defaults to false) |
| `VERIFY_HTTPS` | Verify OPNsense and Technitium's SSL certificates | `true` (defaults to true) |
| `CLOCK` | Interval between updates (in seconds) | `30` (defaults to 30) |
| `DO_V4` | If set to true, A records will be configured, otherwise only AAAA records are configured | `false` (defaults to false) |
| `IGNORE_LINK_LOCAL` | If set to true, link local IPv6 addresses wil be ignored | `true` (defaults to true) |
| `VERIFY_HTTPS` | Verify OPNsense and Technitium's SSL certificates | `true` (defaults to true) |
| `CLOCK` | Interval between updates (in seconds) | `30` (defaults to 30) |
### Note
You have to create the corresponding DNS zones in the Technitium dashboard, you can configure them as primary or conditional forwarder zones.
If DNS records are not being added, make sure that the corresponding reverse zone exists in Technitium DNS, otherwise the script will fail silently.
### Contributing:
I welcome contributions! Feel free to submit issues, feature requests, or pull requests.

View file

@ -1,9 +1,11 @@
version: '3.6'
services:
slaacsense:
image: ghcr.io/notherealmarco/slaacsense:latest
build: .
container_name: slaacsense
restart: unless-stopped
pull_policy: always
environment:
- OPNSENSE_URL=${OPNSENSE_URL}
- OPNSENSE_API_KEY=${OPNSENSE_API_KEY}

156
main.py
View file

@ -4,142 +4,122 @@ import time
import requests
import ipaddress
import urllib3
from collections import defaultdict
# Environment variables
# URL of the OPNsense instance
OPNSENSE_URL = os.getenv("OPNSENSE_URL", None)
# API key for the OPNsense instance
OPNSENSE_API_KEY = os.getenv("OPNSENSE_API_KEY", None)
# API secret for the OPNsense instance
OPNSENSE_API_SECRET = os.getenv("OPNSENSE_API_SECRET", None)
# URL of the Technitium DNS server
TECHNITIUM_URL = os.getenv("TECHNITIUM_URL", None)
# Token for the Technitium DNS server
TECHNITIUM_TOKEN = os.getenv("TECHNITIUM_TOKEN", None)
# Subnets for the DNS zones
DNS_ZONE_SUBNETS = os.getenv("DNS_ZONE_SUBNETS", None)
# Flag to indicate whether to do IPv4 or not
DO_V4 = (os.getenv("DO_V4", "false").lower() == "true")
# Flag to indicate whether to verify HTTPS or not
IGNORE_LINK_LOCAL = (os.getenv("IGNORE_LINK_LOCAL", "true").lower() == "true")
VERIFY_HTTPS = (os.getenv("VERIFY_HTTPS", "true").lower() == "true")
# Clock interval for the main loop
CLOCK = int(os.getenv("CLOCK", "30"))
def get_opnsense_data(path):
"""
Function to get data from the OPNsense API.
Args:
path (str): The API endpoint to hit.
Returns:
dict: The JSON response from the API call.
"""
r = requests.get(url=OPNSENSE_URL + path, verify=VERIFY_HTTPS, auth=(OPNSENSE_API_KEY, OPNSENSE_API_SECRET))
if r.status_code != 200:
logging.error("Error occurred" + str(r.status_code) + ": " + r.text)
return None
return r.json()
def get_ndp():
"""
Function to get the NDP table from the OPNsense API.
Returns:
dict: The JSON response from the API call.
"""
return get_opnsense_data("/api/diagnostics/interface/search_ndp")
def get_dhcp4_leases():
"""
Function to get the DHCPv4 leases from the OPNsense API.
Returns:
dict: The JSON response from the API call.
"""
return get_opnsense_data("/api/dhcpv4/leases/searchLease")
def build_matches(ndp, leases):
"""
Function to build matches between NDP and DHCPv4 leases.
Args:
ndp (dict): The NDP table.
leases (dict): The DHCPv4 leases.
Returns:
set: A set of matches.
"""
matches = set()
hostname_to_macs = defaultdict(lambda: defaultdict(list))
for e in leases["rows"]:
ip6s = tuple(x["ip"].split("%")[0] for x in ndp["rows"] if x["mac"] == e["mac"])
ip6s = tuple(
x["ip"].split("%")[0] for x in ndp["rows"]
if x["mac"] == e["mac"] and x["intf_description"] == e["if_descr"]
)
if IGNORE_LINK_LOCAL:
ip6s = tuple(ip for ip in ip6s if not ipaddress.ip_address(ip).is_link_local)
if len(ip6s) == 0 and not DO_V4:
continue
matches.add((e["address"], ip6s, e["hostname"]))
return matches
hostname = e["hostname"]
if hostname:
hostname_to_macs[hostname][e["if_descr"]].append(e["mac"])
matches.add((e["address"], ip6s, hostname, e["if_descr"], e["mac"]))
# Handle duplicate hostnames on the same interface
adjusted_matches = set()
for match in matches:
ip4, ip6s, hostname, if_descr, mac = match
if hostname and len(hostname_to_macs[hostname][if_descr]) > 1:
# Add the last 4 characters of the MAC address to the hostname
hostname = f"{hostname}-{mac.replace(':', '')[-4:]}"
adjusted_matches.add((ip4, ip6s, hostname))
return adjusted_matches
def find_zone(zones, ip4):
"""
Function to find the DNS zone for a given IPv4 address.
Args:
zones (list): The list of DNS zones.
ip4 (str): The IPv4 address.
Returns:
str: The DNS zone for the given IPv4 address.
"""
for zone in zones:
if ip4 in zone[0]: return zone[1]
return None
def get_existing_records(domain, zone):
url = f"{TECHNITIUM_URL}/api/zones/records/get?token={TECHNITIUM_TOKEN}&domain={domain}.{zone}"
r = requests.get(url=url, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error fetching existing records: " + str(r.status_code) + ": " + r.text)
return []
return r.json().get("response", {}).get("records", [])
def make_record(zones, match):
"""
Function to make a DNS record for a given match.
def delete_record(zone, domain, record_type, value):
url = f"{TECHNITIUM_URL}/api/zones/records/delete?token={TECHNITIUM_TOKEN}&domain={domain}.{zone}&zone={zone}&type={record_type}&value={value}"
r = requests.get(url=url, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error deleting record: " + str(r.status_code) + ": " + r.text)
else:
logging.info(f"Deleted {record_type} record for {value} in {domain}.{zone}")
Args:
zones (list): The list of DNS zones.
match (tuple): The match to make a record for.
"""
def add_record(zone, domain, record_type, ip):
url = f"{TECHNITIUM_URL}/api/zones/records/add?token={TECHNITIUM_TOKEN}&domain={domain}.{zone}&type={record_type}&ttl=5&expiryTtl=604800&overwrite=false&ptr=true&ipAddress={ip}"
r = requests.get(url=url, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error adding record: " + str(r.status_code) + ": " + r.text)
else:
logging.info(f"Added {record_type} record for {ip} in {domain}.{zone}")
def sync_records(zones, match):
zone = find_zone(zones, ipaddress.ip_address(match[0]))
if zone is None:
logging.warning("Could not find a DNS zone for " + match[0])
return
ip4 = match[0]
ip6s = [ipaddress.ip_address(x) for x in match[1]]
ip6s = [ipaddress.ip_address(x).compressed for x in match[1]]
hostname = match[2]
if hostname == "":
logging.warning("no hostname found for " + match[0])
logging.warning("No hostname found for " + match[0])
return
for ip6 in ip6s:
v6path = "/api/zones/records/add?token=" + TECHNITIUM_TOKEN + "&domain=" + hostname + "." + zone + "&type=AAAA&ttl=1&overwrite=true&ptr=true&ipAddress=" + ip6.exploded
logging.info("Inserting AAAA: " + hostname + "." + zone + " " + ip6.compressed)
r = requests.get(url=TECHNITIUM_URL + v6path, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error occurred" + str(r.status_code) + ": " + r.text)
continue
existing_records = get_existing_records(hostname, zone)
existing_ips = {ipaddress.ip_address(r["rData"]["ipAddress"]).compressed for r in existing_records if r["type"] in ["A", "AAAA"]}
current_ips = set([ipaddress.ip_address(ip4).compressed] if DO_V4 else []) | set(ip6s)
if DO_V4:
v4path = "/api/zones/records/add?token=" + TECHNITIUM_TOKEN + "&domain=" + hostname + "." + zone + "&type=A&ttl=1&overwrite=true&ptr=true&ipAddress=" + ip4
logging.info("Inserting A: " + hostname + "." + zone + " " + ip4)
r = requests.get(url=TECHNITIUM_URL + v4path, verify=VERIFY_HTTPS)
if r.status_code != 200:
logging.error("Error occurred" + str(r.status_code) + ": " + r.text)
# Delete outdated records
for ip in existing_ips - current_ips:
record_type = "A" if "." in ip else "AAAA"
delete_record(zone, hostname, record_type, ip)
# Add missing records
for ip in current_ips - existing_ips:
record_type = "A" if "." in ip else "AAAA"
add_record(zone, hostname, record_type, ip)
def run():
"""
Main function to run the script.
"""
if not VERIFY_HTTPS:
urllib3.disable_warnings()
@ -163,17 +143,10 @@ def run():
previous_matches = matches
for match in new_matches:
make_record(zones, match)
sync_records(zones, match)
time.sleep(CLOCK)
def verify_env() -> bool:
"""
Function to verify the environment variables.
Returns:
bool: True if all mandatory environment variables are set, False otherwise.
"""
if not OPNSENSE_URL: return False
if not OPNSENSE_API_KEY: return False
if not OPNSENSE_API_SECRET: return False
@ -182,7 +155,6 @@ def verify_env() -> bool:
if not DNS_ZONE_SUBNETS: return False
return True
if __name__ == "__main__":
logging.getLogger().setLevel(os.getenv("LOG_LEVEL", "INFO"))
logging.info("loading environment...")
@ -195,4 +167,4 @@ if __name__ == "__main__":
logging.info("OPNSENSE_URL: {}".format(OPNSENSE_URL))
logging.info("TECHNITIUM_URL: {}".format(TECHNITIUM_URL))
logging.info("VERIFY_HTTPS: {}".format(VERIFY_HTTPS))
run()
run()