import asyncio import json import pytest from nanobot.cron.service import CronService from nanobot.cron.types import CronSchedule def test_add_job_rejects_unknown_timezone(tmp_path) -> None: service = CronService(tmp_path / "cron" / "jobs.json") with pytest.raises(ValueError, match="unknown timezone 'America/Vancovuer'"): service.add_job( name="tz typo", schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="America/Vancovuer"), message="hello", ) assert service.list_jobs(include_disabled=True) == [] def test_add_job_accepts_valid_timezone(tmp_path) -> None: service = CronService(tmp_path / "cron" / "jobs.json") job = service.add_job( name="tz ok", schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="America/Vancouver"), message="hello", ) assert job.schedule.tz == "America/Vancouver" assert job.state.next_run_at_ms is not None @pytest.mark.asyncio async def test_execute_job_records_run_history(tmp_path) -> None: store_path = tmp_path / "cron" / "jobs.json" service = CronService(store_path, on_job=lambda _: asyncio.sleep(0)) job = service.add_job( name="hist", schedule=CronSchedule(kind="every", every_ms=60_000), message="hello", ) await service.run_job(job.id) loaded = service.get_job(job.id) assert loaded is not None assert len(loaded.state.run_history) == 1 rec = loaded.state.run_history[0] assert rec.status == "ok" assert rec.duration_ms >= 0 assert rec.error is None @pytest.mark.asyncio async def test_run_history_records_errors(tmp_path) -> None: store_path = tmp_path / "cron" / "jobs.json" async def fail(_): raise RuntimeError("boom") service = CronService(store_path, on_job=fail) job = service.add_job( name="fail", schedule=CronSchedule(kind="every", every_ms=60_000), message="hello", ) await service.run_job(job.id) loaded = service.get_job(job.id) assert len(loaded.state.run_history) == 1 assert loaded.state.run_history[0].status == "error" assert loaded.state.run_history[0].error == "boom" @pytest.mark.asyncio async def test_run_history_trimmed_to_max(tmp_path) -> None: store_path = tmp_path / "cron" / "jobs.json" service = CronService(store_path, on_job=lambda _: asyncio.sleep(0)) job = service.add_job( name="trim", schedule=CronSchedule(kind="every", every_ms=60_000), message="hello", ) for _ in range(25): await service.run_job(job.id) loaded = service.get_job(job.id) assert len(loaded.state.run_history) == CronService._MAX_RUN_HISTORY @pytest.mark.asyncio async def test_run_history_persisted_to_disk(tmp_path) -> None: store_path = tmp_path / "cron" / "jobs.json" service = CronService(store_path, on_job=lambda _: asyncio.sleep(0)) job = service.add_job( name="persist", schedule=CronSchedule(kind="every", every_ms=60_000), message="hello", ) await service.run_job(job.id) raw = json.loads(store_path.read_text()) history = raw["jobs"][0]["state"]["runHistory"] assert len(history) == 1 assert history[0]["status"] == "ok" assert "runAtMs" in history[0] assert "durationMs" in history[0] fresh = CronService(store_path) loaded = fresh.get_job(job.id) assert len(loaded.state.run_history) == 1 assert loaded.state.run_history[0].status == "ok" @pytest.mark.asyncio async def test_running_service_honors_external_disable(tmp_path) -> None: store_path = tmp_path / "cron" / "jobs.json" called: list[str] = [] async def on_job(job) -> None: called.append(job.id) service = CronService(store_path, on_job=on_job) job = service.add_job( name="external-disable", schedule=CronSchedule(kind="every", every_ms=200), message="hello", ) await service.start() try: # Wait slightly to ensure file mtime is definitively different await asyncio.sleep(0.05) external = CronService(store_path) updated = external.enable_job(job.id, enabled=False) assert updated is not None assert updated.enabled is False await asyncio.sleep(0.35) assert called == [] finally: service.stop()