register/scripts/migrate-nix.py
2026-03-22 07:15:41 +07:00

279 lines
No EOL
8.9 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Migration script to convert domains/*.json to domains/*.nix
Reads each JSON domain config and generates a corresponding .nix file
following the format from docs/example.nix.
Usage:
python3 scripts/migrate-nix.py [--dry-run] [--delete-json]
Options:
--dry-run Print generated .nix content to stdout without writing files
--delete-json Delete the original .json files after successful conversion
"""
import json
import sys
from pathlib import Path
DOMAINS_DIR = Path(__file__).resolve().parent.parent / "domains"
def json_to_nix(data: dict) -> str:
"""Convert a single domain JSON config to a .nix file string."""
owner = data.get("owner", {})
description = data.get("description")
record = data.get("record", {})
# Some files use "proxy", others use "proxied"
proxy = data.get("proxied", data.get("proxy"))
lines = []
# Header — no let block, just the function head with `with`
lines.append("{ dns, ... }: with dns.lib.combinators; {")
# Owner block as a top-level attribute
lines.append(" owner = {")
if owner.get("username"):
lines.append(f' username = "{escape_nix_string(owner["username"])}";')
if owner.get("email"):
lines.append(f' email = "{escape_nix_string(owner["email"])}";')
if owner.get("discord"):
lines.append(f' discord = "{escape_nix_string(owner["discord"])}";')
if owner.get("repo"):
lines.append(f' repo = "{escape_nix_string(owner["repo"])}";')
lines.append(" };")
# Description as a top-level attribute
if description is not None:
lines.append(f' description = "{escape_nix_string(description)}";')
# Proxy as a top-level attribute
if proxy is not None:
lines.append(f" proxy = {'true' if proxy else 'false'};")
# Records nested under `records`
record_lines = build_record_lines(record)
if record_lines:
lines.append(" records = {")
for rl in record_lines:
lines.append(rl)
lines.append(" };")
lines.append("}")
lines.append("")
return "\n".join(lines)
def escape_nix_string(s: str) -> str:
"""Escape special characters for a Nix double-quoted string."""
s = s.replace("\\", "\\\\")
s = s.replace('"', '\\"')
s = s.replace("${", "\\${")
return s
def build_record_lines(record: dict) -> list[str]:
"""Build the Nix record lines from the JSON record dict.
These are indented with 4 spaces since they sit inside `records = { ... };`.
"""
lines = []
if "A" in record:
values = record["A"]
if isinstance(values, list):
if len(values) == 1:
lines.append(f' A = [ "{values[0]}" ];')
else:
lines.append(" A = [")
for v in values:
lines.append(f' "{v}"')
lines.append(" ];")
else:
lines.append(f' A = [ "{values}" ];')
if "AAAA" in record:
values = record["AAAA"]
if isinstance(values, list):
if len(values) == 1:
lines.append(f' AAAA = [ "{values[0]}" ];')
else:
lines.append(" AAAA = [")
for v in values:
lines.append(f' "{v}"')
lines.append(" ];")
else:
lines.append(f' AAAA = [ "{values}" ];')
if "CNAME" in record:
value = record["CNAME"]
if isinstance(value, list):
value = value[0]
lines.append(f' CNAME = [ "{ensure_fqdn(value)}" ];')
if "ALIAS" in record:
value = record["ALIAS"]
if isinstance(value, list):
value = value[0]
# ALIAS is typically handled as CNAME in dns.nix
lines.append(f' CNAME = [ "{ensure_fqdn(value)}" ];')
if "MX" in record:
values = record["MX"]
if isinstance(values, list):
lines.append(" MX = [")
for i, v in enumerate(values):
priority = (i + 1) * 10
lines.append(" {")
lines.append(f' exchange = "{ensure_fqdn(v)}";')
lines.append(f" preference = {priority};")
lines.append(" }")
lines.append(" ];")
else:
lines.append(" MX = [")
lines.append(" {")
lines.append(f' exchange = "{ensure_fqdn(values)}";')
lines.append(" preference = 10;")
lines.append(" }")
lines.append(" ];")
if "TXT" in record:
values = record["TXT"]
if isinstance(values, list):
if len(values) == 1:
lines.append(f' TXT = [ "{escape_nix_string(values[0])}" ];')
else:
lines.append(" TXT = [")
for v in values:
lines.append(f' "{escape_nix_string(v)}"')
lines.append(" ];")
else:
lines.append(f' TXT = [ "{escape_nix_string(values)}" ];')
if "NS" in record:
values = record["NS"]
if isinstance(values, list):
if len(values) == 1:
lines.append(f' NS = [ "{ensure_fqdn(values[0])}" ];')
else:
lines.append(" NS = [")
for v in values:
lines.append(f' "{ensure_fqdn(v)}"')
lines.append(" ];")
else:
lines.append(f' NS = [ "{ensure_fqdn(values)}" ];')
if "SRV" in record:
values = record["SRV"]
if isinstance(values, list):
lines.append(" SRV = [")
for srv in values:
lines.append(" {")
if "service" in srv:
lines.append(f' service = "{srv["service"]}";')
if "proto" in srv:
lines.append(f' proto = "{srv["proto"]}";')
if "port" in srv:
lines.append(f" port = {srv['port']};")
if "priority" in srv:
lines.append(f" priority = {srv['priority']};")
if "weight" in srv:
lines.append(f" weight = {srv['weight']};")
if "target" in srv:
lines.append(f' target = "{ensure_fqdn(srv["target"])}";')
lines.append(" }")
lines.append(" ];")
if "CAA" in record:
values = record["CAA"]
if isinstance(values, list):
lines.append(" CAA = [")
for caa in values:
lines.append(" {")
if "flags" in caa:
lines.append(f" flags = {caa['flags']};")
if "tag" in caa:
lines.append(f' tag = "{caa["tag"]}";')
if "value" in caa:
lines.append(f' value = "{escape_nix_string(caa["value"])}";')
lines.append(" }")
lines.append(" ];")
return lines
def ensure_fqdn(domain: str) -> str:
"""Ensure a domain name ends with a dot (FQDN)."""
if not domain.endswith("."):
return domain + "."
return domain
def migrate_file(json_path: Path, dry_run: bool = False, delete_json: bool = False) -> bool:
"""Migrate a single JSON file to .nix. Returns True on success."""
try:
with open(json_path, "r") as f:
data = json.load(f)
except json.JSONDecodeError as e:
print(f" ERROR: Failed to parse {json_path.name}: {e}", file=sys.stderr)
return False
nix_content = json_to_nix(data)
nix_filename = json_path.stem + ".nix"
nix_path = json_path.parent / nix_filename
if dry_run:
print(f"--- {nix_filename} ---")
print(nix_content)
return True
with open(nix_path, "w") as f:
f.write(nix_content)
print(f" Created {nix_path.name}")
if delete_json:
json_path.unlink()
print(f" Deleted {json_path.name}")
return True
def main():
dry_run = "--dry-run" in sys.argv
delete_json = "--delete-json" in sys.argv
if not DOMAINS_DIR.exists():
print(f"Error: domains directory not found at {DOMAINS_DIR}", file=sys.stderr)
sys.exit(1)
json_files = sorted(DOMAINS_DIR.glob("*.json"))
if not json_files:
print("No JSON files found in domains/")
sys.exit(0)
print(f"Found {len(json_files)} JSON file(s) to migrate")
if dry_run:
print("(dry run — no files will be written)\n")
success = 0
failed = 0
for json_path in json_files:
print(f"Migrating {json_path.name}...")
if migrate_file(json_path, dry_run=dry_run, delete_json=delete_json):
success += 1
else:
failed += 1
print(f"\nDone: {success} succeeded, {failed} failed")
if failed > 0:
sys.exit(1)
if __name__ == "__main__":
main()