forked from mirrors/kingfisher
108 lines
3.3 KiB
Python
108 lines
3.3 KiB
Python
#!/usr/bin/env -S uv run --script
|
||
# /// script
|
||
# requires-python = ">=3.9"
|
||
# dependencies = ["PyYAML>=6.0", "idna>=3.7"]
|
||
# ///
|
||
|
||
import os
|
||
import re
|
||
import socket
|
||
from pathlib import Path
|
||
from urllib.parse import urlparse
|
||
|
||
import idna
|
||
|
||
try:
|
||
import yaml
|
||
except ModuleNotFoundError as exc:
|
||
raise SystemExit(
|
||
"PyYAML isn’t installed.\n"
|
||
"Run the script with `uv run …` or add PyYAML to the dependency list."
|
||
) from exc
|
||
|
||
|
||
# Directory that is scanned for rule files. The path is relative, so it is
# interpreted against the current working directory when the script runs.
RULES_DIR = Path(os.path.expanduser("../../data/rules"))

# Applied with ``fullmatch`` to mapping keys: accepts keys literally named
# "url", ignoring case.
URL_KEY_RE = re.compile(r"url$", re.IGNORECASE)

# One Liquid placeholder such as "{{ TOKEN }}" (non-greedy); substituted with
# the empty string before a value is parsed as a URL.
TEMPLATE_RE = re.compile(r"\{\{.*?\}\}")

# Captures everything up to the first "/" as group 1, after skipping an
# optional leading "scheme://" prefix.
DOMAIN_RE = re.compile(r"^(?:[a-z][a-z0-9+\-.]*://)?([^/]+)", re.IGNORECASE)
|
||
|
||
|
||
def find_yaml_files(root: Path):
    """Yield every path below *root* whose name ends in ``.yml`` or ``.yaml``.

    The search is recursive. All ``*.yml`` matches are produced first,
    followed by all ``*.yaml`` matches.
    """
    for pattern in ("*.yml", "*.yaml"):
        yield from root.rglob(pattern)
|
||
|
||
|
||
# Matches an explicit "scheme://" prefix at the very start of a value.
_SCHEME_PREFIX_RE = re.compile(r"[a-z][a-z0-9+\-.]*://", re.IGNORECASE)


def _domain_from_url(value: str) -> str:
    """Return the lower-cased host contained in *value*, or "" if there is none.

    Liquid placeholders are removed first, then credentials, port and
    leading/trailing dots are stripped from the host.
    """
    cleaned = TEMPLATE_RE.sub("", value).strip()
    try:
        host = urlparse(cleaned).netloc
    except ValueError:
        # urlparse rejects malformed bracketed hosts such as "http://[bad";
        # treat the value as carrying no usable domain instead of aborting.
        return ""

    if not host:
        if _SCHEME_PREFIX_RE.match(cleaned):
            # "scheme://" followed by an empty authority: the host was made up
            # entirely of placeholders (e.g. "https://{{ HOST }}/api").  The
            # fallback below would otherwise capture "https:" as the host.
            return ""
        match = DOMAIN_RE.match(cleaned)
        if not match:  # value wasn't a URL/host - ignore
            return ""
        host = match.group(1)

    without_creds = host.split("@")[-1]
    without_port = without_creds.split(":")[0]
    return without_port.strip(".").lower()


def extract_domains(obj):
    """Recursively yield every domain appearing in any 'url' key.

    Args:
        obj: A parsed YAML document or any fragment of one. Dicts and lists
            are walked recursively; every other type yields nothing.

    Yields:
        The lower-cased host of each string stored under a key named "url"
        (case-insensitive). Values with no host, or whose host was entirely
        templated, are skipped. Duplicates are not removed.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            if URL_KEY_RE.fullmatch(str(key)) and isinstance(value, str):
                domain = _domain_from_url(value)
                if domain and "{{" not in domain:  # ignore leftover Liquid tokens
                    yield domain
            else:
                yield from extract_domains(value)
    elif isinstance(obj, list):
        for item in obj:
            yield from extract_domains(item)
|
||
|
||
|
||
def domain_active(domain: str) -> bool:
    """Return True iff *domain* resolves via DNS.

    Args:
        domain: Host name to look up; may contain non-ASCII labels.

    Returns:
        True when the resolver returns at least one address (IPv4 or IPv6)
        for the name. False for an empty name, a name that cannot be
        IDNA-encoded, or a failed lookup.
    """
    if not domain:
        return False
    try:
        ascii_domain = idna.encode(domain).decode()  # puny-encode if needed
        # getaddrinfo covers both address families; gethostbyname is
        # IPv4-only and would report AAAA-only hosts as dead.
        socket.getaddrinfo(ascii_domain, None)
    except (socket.gaierror, UnicodeError, idna.IDNAError):
        return False
    return True
|
||
|
||
|
||
def main():
    """Scan every YAML file under RULES_DIR and report non-resolving domains.

    Each file is parsed, the domains found under its "url" keys are collected,
    and each distinct domain is checked with ``domain_active``. Files that
    cannot be read or parsed are skipped with a warning. A summary of files
    with at least one dead domain is printed at the end.
    """
    # list of (yaml_path, [dead_domain, ...])
    inactive_files: list[tuple[Path, list[str]]] = []
    # Lookup results shared across files so each domain is resolved only once.
    resolved: dict[str, bool] = {}

    for yml_path in find_yaml_files(RULES_DIR):
        try:
            # safe_load_all is lazy: materialise it here so that parse errors
            # are raised inside this try block rather than in the loop below.
            docs = list(yaml.safe_load_all(yml_path.read_text(encoding="utf-8")))
        except yaml.YAMLError as e:
            print(f"⚠️ Skipping {yml_path} (YAML error: {e})")
            continue
        except (OSError, UnicodeError) as e:
            print(f"⚠️ Skipping {yml_path} (read error: {e})")
            continue

        domains: set[str] = set()
        for doc in docs:
            if doc is not None:
                domains.update(extract_domains(doc))

        if not domains:
            continue

        for name in domains:
            if name not in resolved:
                resolved[name] = domain_active(name)

        dead = sorted(name for name in domains if not resolved[name])
        if dead:
            inactive_files.append((yml_path, dead))

    if inactive_files:
        print("YAML files with at least one non-resolving domain:")
        for path, dead in inactive_files:
            print(f" - {path}: {', '.join(dead)}")
    else:
        print("✅ All domains resolve.")
|
||
|
||
|
||
# Run the scan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|