diff --git a/mcquack.py b/mcquack.py index 634e106..d257fb2 100755 --- a/mcquack.py +++ b/mcquack.py @@ -31,20 +31,91 @@ def get_label(script_path: Path) -> str: return f"{PLIST_PREFIX}.{script_path.stem}" -def create_plist_dict(script_path: Path, args: list[str]) -> dict: +def parse_calendar(calendar_str: str) -> dict[str, int]: + """Parse a calendar string like 'Hour=2,Minute=0' into a dictionary. + + Valid keys: Month (1-12), Day (1-31), Weekday (0-7), Hour (0-23), Minute (0-59) + """ + valid_keys = {"Month", "Day", "Weekday", "Hour", "Minute"} + ranges = { + "Month": (1, 12), + "Day": (1, 31), + "Weekday": (0, 7), + "Hour": (0, 23), + "Minute": (0, 59), + } + + result = {} + parts = calendar_str.split(",") + + for part in parts: + part = part.strip() + if "=" not in part: + raise ValueError(f"Invalid calendar format: '{part}' (missing '=')") + + key, value = part.split("=", 1) + key = key.strip() + value = value.strip() + + if key not in valid_keys: + raise ValueError( + f"Invalid calendar key: '{key}' (valid keys: {', '.join(sorted(valid_keys))})" + ) + + try: + int_value = int(value) + except ValueError: + raise ValueError(f"Invalid calendar value for {key}: '{value}' (must be an integer)") + + min_val, max_val = ranges[key] + if not (min_val <= int_value <= max_val): + raise ValueError( + f"Invalid calendar value for {key}: {int_value} (must be {min_val}-{max_val})" + ) + + result[key] = int_value + + return result + + +def create_plist_dict( + script_path: Path, + args: list[str], + interval: int | None = None, + throttle: int | None = None, + calendar: list[dict[str, int]] | None = None, +) -> dict: """Create a plist dictionary for a script.""" label = get_label(script_path) program_args = [str(script_path.resolve())] + args - return { + plist = { "Label": label, "ProgramArguments": program_args, "RunAtLoad": True, - "KeepAlive": True, "StandardOutPath": str(LOGS_DIR / f"mcquack.{script_path.stem}.out.log"), "StandardErrorPath": str(LOGS_DIR / f"mcquack.{script_path.stem}.err.log"), } + # If scheduling is used, don't keep alive (let the schedule handle it) + if interval is not None: + plist["KeepAlive"] = False + plist["StartInterval"] = interval + elif calendar is not None: + plist["KeepAlive"] = False + # Single calendar entry is a dict, multiple entries are a list + if len(calendar) == 1: + plist["StartCalendarInterval"] = calendar[0] + else: + plist["StartCalendarInterval"] = calendar + else: + plist["KeepAlive"] = True + + if throttle is not None: + plist["ThrottleInterval"] = throttle + + return plist + @app.command("list") def list_agents() -> None: @@ -77,6 +148,29 @@ def create( list[str] | None, typer.Argument(help="Arguments to pass to the script (must come after '--')"), ] = None, + interval: Annotated[ + int | None, + typer.Option( + "--interval", + help="Run every N seconds (StartInterval)", + min=1, + ), + ] = None, + throttle: Annotated[ + int | None, + typer.Option( + "--throttle", + help="Minimum seconds between runs (ThrottleInterval)", + min=1, + ), + ] = None, + calendar: Annotated[ + list[str] | None, + typer.Option( + "--calendar", + help="Schedule with calendar interval (e.g., 'Hour=2,Minute=0'). Can be specified multiple times.", + ), + ] = None, ) -> None: """Create and load a LaunchAgent for a script. @@ -89,6 +183,13 @@ def create( mcquack create my_script.sh --help # Shows this help mcquack create my_script.sh -- --help # Passes --help to the script + + Scheduling options: + + mcquack create script.sh --interval 300 # Run every 5 minutes + mcquack create script.sh --interval 300 --throttle 60 # Run every 5 min, throttle 60s + mcquack create script.sh --calendar "Hour=2,Minute=0" # Every day at 2 AM + mcquack create script.sh --calendar "Weekday=1,Hour=9,Minute=0" # Mondays at 9 AM """ script_path = script.resolve() @@ -100,6 +201,14 @@ def create( typer.echo(f"Error: {script_path} is not executable.", err=True) raise typer.Exit(1) + # Validate mutually exclusive options + if interval is not None and calendar is not None: + typer.echo( + "Error: --interval and --calendar are mutually exclusive. Use one or the other.", + err=True, + ) + raise typer.Exit(1) + plist_path = get_plist_path(script_path) if plist_path.exists(): @@ -109,11 +218,26 @@ def create( ) raise typer.Exit(1) + # Parse calendar strings if provided + parsed_calendar = None + if calendar is not None: + try: + parsed_calendar = [parse_calendar(cal_str) for cal_str in calendar] + except ValueError as e: + typer.echo(f"Error: {e}", err=True) + raise typer.Exit(1) + # Ensure directories exist LAUNCH_AGENTS_DIR.mkdir(parents=True, exist_ok=True) LOGS_DIR.mkdir(parents=True, exist_ok=True) - plist = create_plist_dict(script_path, script_args or []) + plist = create_plist_dict( + script_path, + script_args or [], + interval=interval, + throttle=throttle, + calendar=parsed_calendar, + ) with open(plist_path, "wb") as f: plistlib.dump(plist, f) @@ -183,6 +307,15 @@ def show( ) typer.echo(f"RunAtLoad: {plist.get('RunAtLoad', False)}") typer.echo(f"KeepAlive: {plist.get('KeepAlive', False)}") + + # Display scheduling information + if "StartInterval" in plist: + typer.echo(f"StartInterval: {plist['StartInterval']}") + if "ThrottleInterval" in plist: + typer.echo(f"ThrottleInterval: {plist['ThrottleInterval']}") + if "StartCalendarInterval" in plist: + typer.echo(f"StartCalendarInterval: {plist['StartCalendarInterval']}") + typer.echo(f"Stdout: {plist.get('StandardOutPath', 'N/A')}") typer.echo(f"Stderr: {plist.get('StandardErrorPath', 'N/A')}") diff --git a/test_mcquack.py b/test_mcquack.py index 7301094..4402d1c 100644 --- a/test_mcquack.py +++ b/test_mcquack.py @@ -370,3 +370,300 @@ class TestPlistFormat: break else: pytest.fail("ProgramArguments key not found") + + +class TestStartInterval: + """Tests for StartInterval scheduling.""" + + def test_create_with_interval(self, mock_dirs, mock_script, mock_launchctl): + """Test creating agent with StartInterval.""" + result = runner.invoke(app, ["create", str(mock_script), "--interval", "300"]) + assert result.exit_code == 0 + assert "Created:" in result.stdout + + # Verify plist contains StartInterval + plist_path = get_plist_path(mock_dirs) + with open(plist_path, "rb") as f: + plist = plistlib.load(f) + + assert plist.get("StartInterval") == 300 + assert plist.get("KeepAlive") is False + assert plist.get("RunAtLoad") is True + + def test_create_with_interval_and_args(self, mock_dirs, mock_script, mock_launchctl): + """Test creating agent with StartInterval and script arguments.""" + result = runner.invoke(app, ["create", str(mock_script), "--interval", "600", "--", "--config", "test.json"]) + assert result.exit_code == 0 + + plist_path = get_plist_path(mock_dirs) + with open(plist_path, "rb") as f: + plist = plistlib.load(f) + + assert plist.get("StartInterval") == 600 + program_args = plist.get("ProgramArguments", []) + assert program_args[1:] == ["--config", "test.json"] + + def test_interval_must_be_positive(self, mock_dirs, mock_script, mock_launchctl): + """Test that interval must be a positive integer.""" + result = runner.invoke(app, ["create", str(mock_script), "--interval", "-1"]) + assert result.exit_code != 0 + + result = runner.invoke(app, ["create", str(mock_script), "--interval", "0"]) + assert result.exit_code != 0 + + def test_interval_xml_format(self, mock_dirs, mock_script, mock_launchctl): + """Verify StartInterval is correctly formatted in XML.""" + runner.invoke(app, ["create", str(mock_script), "--interval", "120"]) + + plist_path = get_plist_path(mock_dirs) + tree = ET.parse(plist_path) + root = tree.getroot() + dict_elem = root.find("dict") + assert dict_elem is not None + + # Find StartInterval key and verify it's an integer + children = list(dict_elem) + for i, child in enumerate(children): + if child.tag == "key" and child.text == "StartInterval": + value_elem = children[i + 1] + assert value_elem.tag == "integer" + assert value_elem.text == "120" + break + else: + pytest.fail("StartInterval key not found") + + def test_show_displays_interval(self, mock_dirs, mock_script, mock_launchctl): + """Test that show command displays StartInterval.""" + runner.invoke(app, ["create", str(mock_script), "--interval", "300"]) + + result = runner.invoke(app, ["show", str(mock_script)]) + assert result.exit_code == 0 + assert "StartInterval: 300" in result.stdout or "300" in result.stdout + + +class TestThrottleInterval: + """Tests for ThrottleInterval.""" + + def test_create_with_throttle_and_interval(self, mock_dirs, mock_script, mock_launchctl): + """Test creating agent with both ThrottleInterval and StartInterval.""" + result = runner.invoke(app, ["create", str(mock_script), "--interval", "300", "--throttle", "60"]) + assert result.exit_code == 0 + + plist_path = get_plist_path(mock_dirs) + with open(plist_path, "rb") as f: + plist = plistlib.load(f) + + assert plist.get("StartInterval") == 300 + assert plist.get("ThrottleInterval") == 60 + assert plist.get("KeepAlive") is False + + def test_throttle_must_be_positive(self, mock_dirs, mock_script, mock_launchctl): + """Test that throttle must be a positive integer.""" + result = runner.invoke(app, ["create", str(mock_script), "--interval", "300", "--throttle", "-1"]) + assert result.exit_code != 0 + + result = runner.invoke(app, ["create", str(mock_script), "--interval", "300", "--throttle", "0"]) + assert result.exit_code != 0 + + def test_throttle_xml_format(self, mock_dirs, mock_script, mock_launchctl): + """Verify ThrottleInterval is correctly formatted in XML.""" + runner.invoke(app, ["create", str(mock_script), "--interval", "300", "--throttle", "45"]) + + plist_path = get_plist_path(mock_dirs) + tree = ET.parse(plist_path) + root = tree.getroot() + dict_elem = root.find("dict") + assert dict_elem is not None + + # Find ThrottleInterval key and verify it's an integer + children = list(dict_elem) + for i, child in enumerate(children): + if child.tag == "key" and child.text == "ThrottleInterval": + value_elem = children[i + 1] + assert value_elem.tag == "integer" + assert value_elem.text == "45" + break + else: + pytest.fail("ThrottleInterval key not found") + + def test_show_displays_throttle(self, mock_dirs, mock_script, mock_launchctl): + """Test that show command displays ThrottleInterval.""" + runner.invoke(app, ["create", str(mock_script), "--interval", "300", "--throttle", "60"]) + + result = runner.invoke(app, ["show", str(mock_script)]) + assert result.exit_code == 0 + assert "ThrottleInterval: 60" in result.stdout + + +class TestStartCalendarInterval: + """Tests for StartCalendarInterval scheduling.""" + + def test_create_with_calendar_simple(self, mock_dirs, mock_script, mock_launchctl): + """Test creating agent with simple calendar interval (every day at 2 AM).""" + result = runner.invoke(app, ["create", str(mock_script), "--calendar", "Hour=2,Minute=0"]) + assert result.exit_code == 0 + + plist_path = get_plist_path(mock_dirs) + with open(plist_path, "rb") as f: + plist = plistlib.load(f) + + assert "StartCalendarInterval" in plist + calendar = plist["StartCalendarInterval"] + assert isinstance(calendar, dict) + assert calendar.get("Hour") == 2 + assert calendar.get("Minute") == 0 + assert plist.get("KeepAlive") is False + assert plist.get("RunAtLoad") is True + + def test_create_with_calendar_all_fields(self, mock_dirs, mock_script, mock_launchctl): + """Test calendar with all possible fields.""" + result = runner.invoke( + app, + ["create", str(mock_script), "--calendar", "Month=12,Day=25,Hour=9,Minute=30,Weekday=1"] + ) + assert result.exit_code == 0 + + plist_path = get_plist_path(mock_dirs) + with open(plist_path, "rb") as f: + plist = plistlib.load(f) + + calendar = plist["StartCalendarInterval"] + assert calendar.get("Month") == 12 + assert calendar.get("Day") == 25 + assert calendar.get("Hour") == 9 + assert calendar.get("Minute") == 30 + assert calendar.get("Weekday") == 1 + + def test_create_with_multiple_calendar_entries(self, mock_dirs, mock_script, mock_launchctl): + """Test creating agent with multiple calendar intervals.""" + result = runner.invoke( + app, + ["create", str(mock_script), "--calendar", "Weekday=1,Hour=9,Minute=0", "--calendar", "Weekday=5,Hour=9,Minute=0"] + ) + assert result.exit_code == 0 + + plist_path = get_plist_path(mock_dirs) + with open(plist_path, "rb") as f: + plist = plistlib.load(f) + + calendar = plist["StartCalendarInterval"] + assert isinstance(calendar, list) + assert len(calendar) == 2 + assert calendar[0] == {"Weekday": 1, "Hour": 9, "Minute": 0} + assert calendar[1] == {"Weekday": 5, "Hour": 9, "Minute": 0} + + def test_calendar_with_script_args(self, mock_dirs, mock_script, mock_launchctl): + """Test calendar interval combined with script arguments.""" + result = runner.invoke( + app, + ["create", str(mock_script), "--calendar", "Hour=3,Minute=0", "--", "--config", "test.json"] + ) + assert result.exit_code == 0 + + plist_path = get_plist_path(mock_dirs) + with open(plist_path, "rb") as f: + plist = plistlib.load(f) + + program_args = plist.get("ProgramArguments", []) + assert program_args[1:] == ["--config", "test.json"] + assert plist["StartCalendarInterval"]["Hour"] == 3 + + def test_calendar_invalid_format(self, mock_dirs, mock_script, mock_launchctl): + """Test that invalid calendar format is rejected.""" + # Missing equals sign + result = runner.invoke(app, ["create", str(mock_script), "--calendar", "Hour2"]) + assert result.exit_code != 0 + + # Invalid key + result = runner.invoke(app, ["create", str(mock_script), "--calendar", "InvalidKey=5"]) + assert result.exit_code != 0 + + # Non-integer value + result = runner.invoke(app, ["create", str(mock_script), "--calendar", "Hour=abc"]) + assert result.exit_code != 0 + + def test_calendar_invalid_values(self, mock_dirs, mock_script, mock_launchctl): + """Test that invalid calendar values are rejected.""" + # Hour out of range + result = runner.invoke(app, ["create", str(mock_script), "--calendar", "Hour=25"]) + assert result.exit_code != 0 + + # Minute out of range + result = runner.invoke(app, ["create", str(mock_script), "--calendar", "Minute=60"]) + assert result.exit_code != 0 + + # Month out of range + result = runner.invoke(app, ["create", str(mock_script), "--calendar", "Month=13"]) + assert result.exit_code != 0 + + # Day out of range + result = runner.invoke(app, ["create", str(mock_script), "--calendar", "Day=32"]) + assert result.exit_code != 0 + + # Weekday out of range + result = runner.invoke(app, ["create", str(mock_script), "--calendar", "Weekday=8"]) + assert result.exit_code != 0 + + def test_calendar_xml_format_single(self, mock_dirs, mock_script, mock_launchctl): + """Verify single calendar entry is correctly formatted in XML.""" + runner.invoke(app, ["create", str(mock_script), "--calendar", "Hour=14,Minute=30"]) + + plist_path = get_plist_path(mock_dirs) + tree = ET.parse(plist_path) + root = tree.getroot() + dict_elem = root.find("dict") + assert dict_elem is not None + + # Find StartCalendarInterval key + children = list(dict_elem) + for i, child in enumerate(children): + if child.tag == "key" and child.text == "StartCalendarInterval": + value_elem = children[i + 1] + # Single entry should be a dict + assert value_elem.tag == "dict" + break + else: + pytest.fail("StartCalendarInterval key not found") + + def test_calendar_xml_format_multiple(self, mock_dirs, mock_script, mock_launchctl): + """Verify multiple calendar entries are correctly formatted in XML as array.""" + runner.invoke( + app, + ["create", str(mock_script), "--calendar", "Hour=9,Minute=0", "--calendar", "Hour=17,Minute=0"] + ) + + plist_path = get_plist_path(mock_dirs) + tree = ET.parse(plist_path) + root = tree.getroot() + dict_elem = root.find("dict") + assert dict_elem is not None + + # Find StartCalendarInterval key + children = list(dict_elem) + for i, child in enumerate(children): + if child.tag == "key" and child.text == "StartCalendarInterval": + value_elem = children[i + 1] + # Multiple entries should be an array + assert value_elem.tag == "array" + dicts_in_array = value_elem.findall("dict") + assert len(dicts_in_array) == 2 + break + else: + pytest.fail("StartCalendarInterval key not found") + + def test_show_displays_calendar(self, mock_dirs, mock_script, mock_launchctl): + """Test that show command displays StartCalendarInterval.""" + runner.invoke(app, ["create", str(mock_script), "--calendar", "Hour=2,Minute=0"]) + + result = runner.invoke(app, ["show", str(mock_script)]) + assert result.exit_code == 0 + assert "StartCalendarInterval" in result.stdout + + def test_calendar_and_interval_mutually_exclusive(self, mock_dirs, mock_script, mock_launchctl): + """Test that --calendar and --interval cannot be used together.""" + result = runner.invoke( + app, + ["create", str(mock_script), "--interval", "300", "--calendar", "Hour=2,Minute=0"] + ) + assert result.exit_code != 0 + assert "mutually exclusive" in result.output.lower() or "cannot" in result.output.lower()