mirror of
https://github.com/partofmyid/register.git
synced 2026-06-05 18:46:50 +07:00
279 lines
No EOL
8.5 KiB
Python
Executable file
279 lines
No EOL
8.5 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Migration script to convert domains/*.json to domains/*.nix

Reads each JSON domain config and generates a corresponding .nix file
following the format from docs/example.nix.

Usage:
    python3 scripts/migrate-json-to-nix.py [--dry-run] [--delete-json]

Options:
    --dry-run       Print generated .nix content to stdout without writing files
    --delete-json   Delete the original .json files after successful conversion
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
import os
|
|
from pathlib import Path
|
|
|
|
# Directory scanned for domain configs: the "domains" folder located two levels
# above this file (i.e. next to the directory that contains this script).
DOMAINS_DIR = Path(__file__).resolve().parent.parent / "domains"
|
|
|
|
|
|
def json_to_nix(data: dict) -> str:
    """Convert a single domain JSON config to a .nix file string.

    The generated file has a ``let`` section holding the ``owner`` attribute
    set plus the optional ``description`` and ``proxy`` bindings, followed by
    an attribute set with the DNS records produced by ``build_record_lines``.

    Args:
        data: Parsed JSON object. The keys read here are ``owner``,
            ``description``, ``record`` and ``proxied``/``proxy``.

    Returns:
        The complete .nix source text, terminated by a newline.
    """
    # ``or {}`` also covers an explicit JSON ``null``, which ``.get(key, {})``
    # would hand back as None and crash on the next ``.get``.
    owner = data.get("owner") or {}
    description = data.get("description")
    record = data.get("record") or {}
    # Some files use "proxy", others use "proxied"
    proxy = data.get("proxied", data.get("proxy"))

    lines = ["{ dns, ... }: let"]

    # Owner block: only fields with a truthy value are emitted. Values are
    # escaped like the description so that a quote, backslash or "${" in a
    # username/e-mail/repo cannot break out of the Nix string literal.
    lines.append(" owner = {")
    for field in ("username", "email", "discord", "repo"):
        value = owner.get(field)
        if value:
            lines.append(f' {field} = "{escape_nix_string(str(value))}";')
    lines.append(" };")

    if description is not None:
        lines.append(f' description = "{escape_nix_string(str(description))}";')

    if proxy is not None:
        lines.append(f" proxy = {'true' if proxy else 'false'};")

    lines.append("in with dns.lib.combinators; {")

    # Records
    lines.extend(build_record_lines(record))

    lines.append("}")
    # Trailing empty element makes the joined text end with a newline.
    lines.append("")

    return "\n".join(lines)
|
|
|
|
|
|
def escape_nix_string(s: str) -> str:
    """Return *s* escaped for use inside a Nix double-quoted string.

    Backslashes, double quotes and the ``${`` interpolation opener are each
    prefixed with a backslash.
    """
    # Order matters: backslashes are doubled first so that the backslashes
    # introduced by the later replacements are not escaped a second time.
    replacements = (
        ("\\", "\\\\"),
        ('"', '\\"'),
        ("${", "\\${"),
    )
    for raw, escaped in replacements:
        s = s.replace(raw, escaped)
    return s
|
|
|
|
|
|
def _as_list(value) -> list:
    """Return *value* unchanged if it is a list, otherwise wrap it in one."""
    return value if isinstance(value, list) else [value]


def _as_entries(value):
    """Normalize a structured record value to a list of entries.

    A single JSON object is wrapped in a list, a list is returned as-is and
    anything else yields None (the record is not emitted).
    """
    if isinstance(value, dict):
        return [value]
    return value if isinstance(value, list) else None


def _string_list_attr(name: str, items: list) -> list[str]:
    """Render ``name = [ "..." ];`` on one line for one item, else one item per line."""
    if len(items) == 1:
        return [f' {name} = [ "{items[0]}" ];']
    return [f" {name} = [", *(f' "{item}"' for item in items), " ];"]


def _attrset_list_attr(name: str, entries: list) -> list[str]:
    """Render ``name = [ { ... } ... ];`` where each entry is a list of field lines."""
    lines = [f" {name} = ["]
    for fields in entries:
        lines.append(" {")
        lines.extend(fields)
        lines.append(" }")
    lines.append(" ];")
    return lines


def build_record_lines(record: dict) -> list[str]:
    """Build the Nix record lines from the JSON record dict.

    Handles A, AAAA, CNAME, ALIAS, MX, TXT, NS, SRV and CAA, in that order;
    other keys are ignored. Scalar values are treated like one-element lists.

    Args:
        record: The ``record`` object of a domain config.

    Returns:
        The lines to place inside the record attribute set (no trailing
        newlines).
    """
    lines: list[str] = []

    for rtype in ("A", "AAAA"):
        if rtype in record:
            lines.extend(_string_list_attr(rtype, _as_list(record[rtype])))

    # ALIAS is typically handled as CNAME in dns.nix, so both keys produce a
    # CNAME attribute; only the first target of a list is used.
    # NOTE(review): a config carrying both CNAME and ALIAS still yields two
    # CNAME attributes, which Nix rejects when evaluating the file.
    for key in ("CNAME", "ALIAS"):
        if key not in record:
            continue
        targets = _as_list(record[key])
        # Skip an empty list or empty/null target instead of raising
        # IndexError or emitting a meaningless record.
        if not targets or not targets[0]:
            continue
        # ensure_fqdn avoids the double dot that blindly appending "." gave
        # for targets already written as "host.example.com.".
        lines.append(f' CNAME = [ "{ensure_fqdn(targets[0])}" ];')

    if "MX" in record:
        # The JSON carries no priorities; assign 10, 20, 30, ... by position.
        mx_entries = [
            [
                f' exchange = "{ensure_fqdn(host)}";',
                f" preference = {(index + 1) * 10};",
            ]
            for index, host in enumerate(_as_list(record["MX"]))
        ]
        lines.extend(_attrset_list_attr("MX", mx_entries))

    if "TXT" in record:
        texts = [escape_nix_string(text) for text in _as_list(record["TXT"])]
        lines.extend(_string_list_attr("TXT", texts))

    if "NS" in record:
        servers = [ensure_fqdn(server) for server in _as_list(record["NS"])]
        lines.extend(_string_list_attr("NS", servers))

    if "SRV" in record:
        srv_values = _as_entries(record["SRV"])
        if srv_values is not None:
            srv_entries = []
            for srv in srv_values:
                fields = [
                    f' {key} = "{srv[key]}";'
                    for key in ("service", "proto")
                    if key in srv
                ]
                fields.extend(
                    f" {key} = {srv[key]};"
                    for key in ("port", "priority", "weight")
                    if key in srv
                )
                if "target" in srv:
                    fields.append(f' target = "{ensure_fqdn(srv["target"])}";')
                srv_entries.append(fields)
            lines.extend(_attrset_list_attr("SRV", srv_entries))

    if "CAA" in record:
        caa_values = _as_entries(record["CAA"])
        if caa_values is not None:
            caa_entries = []
            for caa in caa_values:
                fields = []
                if "flags" in caa:
                    fields.append(f" flags = {caa['flags']};")
                if "tag" in caa:
                    fields.append(f' tag = "{caa["tag"]}";')
                if "value" in caa:
                    fields.append(f' value = "{escape_nix_string(caa["value"])}";')
                caa_entries.append(fields)
            lines.extend(_attrset_list_attr("CAA", caa_entries))

    return lines
|
|
|
|
|
|
def ensure_fqdn(domain: str) -> str:
    """Return *domain* as an FQDN, appending a trailing dot if it lacks one."""
    return domain if domain.endswith(".") else domain + "."
|
|
|
|
|
|
def migrate_file(json_path: Path, dry_run: bool = False, delete_json: bool = False) -> bool:
    """Migrate a single JSON file to .nix.

    The .nix file is written next to the JSON file, using the same stem.

    Args:
        json_path: Path of the JSON domain config to convert.
        dry_run: Print the generated content to stdout instead of writing it.
        delete_json: Remove ``json_path`` after the .nix file has been written.
            Has no effect in a dry run.

    Returns:
        True on success; False if the file could not be read, parsed or
        written (a message is printed to stderr in that case).
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except OSError as e:
        print(f" ERROR: Failed to read {json_path.name}: {e}", file=sys.stderr)
        return False
    except ValueError as e:
        # Covers json.JSONDecodeError and UnicodeDecodeError (both are
        # ValueError subclasses), so one bad file does not abort the batch.
        print(f" ERROR: Failed to parse {json_path.name}: {e}", file=sys.stderr)
        return False

    # json_to_nix reads the config through dict methods; reject any other
    # JSON root (list, string, ...) here rather than crashing inside it.
    if not isinstance(data, dict):
        print(
            f" ERROR: Failed to parse {json_path.name}: top-level JSON value is not an object",
            file=sys.stderr,
        )
        return False

    nix_content = json_to_nix(data)
    nix_filename = json_path.stem + ".nix"
    nix_path = json_path.parent / nix_filename

    if dry_run:
        print(f"--- {nix_filename} ---")
        print(nix_content)
        return True

    try:
        with open(nix_path, "w", encoding="utf-8") as f:
            f.write(nix_content)
    except OSError as e:
        print(f" ERROR: Failed to write {nix_path.name}: {e}", file=sys.stderr)
        return False

    print(f" Created {nix_path.name}")

    # Only reached after a successful write, so the source is never removed
    # without its replacement existing.
    if delete_json:
        json_path.unlink()
        print(f" Deleted {json_path.name}")

    return True
|
|
|
|
|
|
def main(argv=None) -> None:
    """Command-line entry point: migrate every domains/*.json file.

    Args:
        argv: Argument list to parse (without the program name). Defaults to
            the process arguments when None.

    Exits with status 1 if the domains directory is missing or any file
    failed to migrate, with status 0 when there is nothing to migrate, and
    with argparse's usage error status on an unrecognized argument.
    """
    # Imported here so this function carries its own dependency.
    import argparse

    # Strict parsing: a mistyped flag (e.g. "--dryrun") must be rejected
    # rather than silently ignored, otherwise an intended dry run would write
    # files. Abbreviations are disabled for the same reason.
    parser = argparse.ArgumentParser(
        description="Convert domains/*.json to domains/*.nix",
        allow_abbrev=False,
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print generated .nix content to stdout without writing files",
    )
    parser.add_argument(
        "--delete-json",
        action="store_true",
        help="Delete the original .json files after successful conversion",
    )
    args = parser.parse_args(argv)
    dry_run = args.dry_run
    delete_json = args.delete_json

    if not DOMAINS_DIR.exists():
        print(f"Error: domains directory not found at {DOMAINS_DIR}", file=sys.stderr)
        sys.exit(1)

    # Sorted for a deterministic processing and output order.
    json_files = sorted(DOMAINS_DIR.glob("*.json"))

    if not json_files:
        print("No JSON files found in domains/")
        sys.exit(0)

    print(f"Found {len(json_files)} JSON file(s) to migrate")
    if dry_run:
        print("(dry run — no files will be written)\n")

    success = 0
    failed = 0

    for json_path in json_files:
        print(f"Migrating {json_path.name}...")
        if migrate_file(json_path, dry_run=dry_run, delete_json=delete_json):
            success += 1
        else:
            failed += 1

    print(f"\nDone: {success} succeeded, {failed} failed")
    if failed > 0:
        sys.exit(1)
|
|
|
|
|
|
# Run the migration only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()