feat(cron): add run history tracking for cron jobs

Record run_at_ms, status, duration_ms and error for each execution,
keeping the last 20 entries per job in jobs.json. Adds CronRunRecord
dataclass, get_job() lookup, and four regression tests covering
success, error, trimming and persistence.

Closes #1837

Made-with: Cursor
This commit is contained in:
Xubin Ren
2026-03-20 18:17:33 +00:00
committed by Xubin Ren
parent ec2e12b028
commit 09ad9a4673
3 changed files with 130 additions and 5 deletions

View File

@@ -10,7 +10,7 @@ from typing import Any, Callable, Coroutine
from loguru import logger from loguru import logger
from nanobot.cron.types import CronJob, CronJobState, CronPayload, CronSchedule, CronStore from nanobot.cron.types import CronJob, CronJobState, CronPayload, CronRunRecord, CronSchedule, CronStore
def _now_ms() -> int: def _now_ms() -> int:
@@ -63,10 +63,12 @@ def _validate_schedule_for_add(schedule: CronSchedule) -> None:
class CronService: class CronService:
"""Service for managing and executing scheduled jobs.""" """Service for managing and executing scheduled jobs."""
_MAX_RUN_HISTORY = 20
def __init__( def __init__(
self, self,
store_path: Path, store_path: Path,
on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None,
): ):
self.store_path = store_path self.store_path = store_path
self.on_job = on_job self.on_job = on_job
@@ -113,6 +115,15 @@ class CronService:
last_run_at_ms=j.get("state", {}).get("lastRunAtMs"), last_run_at_ms=j.get("state", {}).get("lastRunAtMs"),
last_status=j.get("state", {}).get("lastStatus"), last_status=j.get("state", {}).get("lastStatus"),
last_error=j.get("state", {}).get("lastError"), last_error=j.get("state", {}).get("lastError"),
run_history=[
CronRunRecord(
run_at_ms=r["runAtMs"],
status=r["status"],
duration_ms=r.get("durationMs", 0),
error=r.get("error"),
)
for r in j.get("state", {}).get("runHistory", [])
],
), ),
created_at_ms=j.get("createdAtMs", 0), created_at_ms=j.get("createdAtMs", 0),
updated_at_ms=j.get("updatedAtMs", 0), updated_at_ms=j.get("updatedAtMs", 0),
@@ -160,6 +171,15 @@ class CronService:
"lastRunAtMs": j.state.last_run_at_ms, "lastRunAtMs": j.state.last_run_at_ms,
"lastStatus": j.state.last_status, "lastStatus": j.state.last_status,
"lastError": j.state.last_error, "lastError": j.state.last_error,
"runHistory": [
{
"runAtMs": r.run_at_ms,
"status": r.status,
"durationMs": r.duration_ms,
"error": r.error,
}
for r in j.state.run_history
],
}, },
"createdAtMs": j.created_at_ms, "createdAtMs": j.created_at_ms,
"updatedAtMs": j.updated_at_ms, "updatedAtMs": j.updated_at_ms,
@@ -248,9 +268,8 @@ class CronService:
logger.info("Cron: executing job '{}' ({})", job.name, job.id) logger.info("Cron: executing job '{}' ({})", job.name, job.id)
try: try:
response = None
if self.on_job: if self.on_job:
response = await self.on_job(job) await self.on_job(job)
job.state.last_status = "ok" job.state.last_status = "ok"
job.state.last_error = None job.state.last_error = None
@@ -261,8 +280,17 @@ class CronService:
job.state.last_error = str(e) job.state.last_error = str(e)
logger.error("Cron: job '{}' failed: {}", job.name, e) logger.error("Cron: job '{}' failed: {}", job.name, e)
end_ms = _now_ms()
job.state.last_run_at_ms = start_ms job.state.last_run_at_ms = start_ms
job.updated_at_ms = _now_ms() job.updated_at_ms = end_ms
job.state.run_history.append(CronRunRecord(
run_at_ms=start_ms,
status=job.state.last_status,
duration_ms=end_ms - start_ms,
error=job.state.last_error,
))
job.state.run_history = job.state.run_history[-self._MAX_RUN_HISTORY:]
# Handle one-shot jobs # Handle one-shot jobs
if job.schedule.kind == "at": if job.schedule.kind == "at":
@@ -366,6 +394,11 @@ class CronService:
return True return True
return False return False
def get_job(self, job_id: str) -> CronJob | None:
"""Get a job by ID."""
store = self._load_store()
return next((j for j in store.jobs if j.id == job_id), None)
def status(self) -> dict: def status(self) -> dict:
"""Get service status.""" """Get service status."""
store = self._load_store() store = self._load_store()

View File

@@ -29,6 +29,15 @@ class CronPayload:
to: str | None = None # e.g. phone number to: str | None = None # e.g. phone number
@dataclass
class CronRunRecord:
"""A single execution record for a cron job."""
run_at_ms: int
status: Literal["ok", "error", "skipped"]
duration_ms: int = 0
error: str | None = None
@dataclass @dataclass
class CronJobState: class CronJobState:
"""Runtime state of a job.""" """Runtime state of a job."""
@@ -36,6 +45,7 @@ class CronJobState:
last_run_at_ms: int | None = None last_run_at_ms: int | None = None
last_status: Literal["ok", "error", "skipped"] | None = None last_status: Literal["ok", "error", "skipped"] | None = None
last_error: str | None = None last_error: str | None = None
run_history: list[CronRunRecord] = field(default_factory=list)
@dataclass @dataclass

View File

@@ -1,4 +1,5 @@
import asyncio import asyncio
import json
import pytest import pytest
@@ -32,6 +33,87 @@ def test_add_job_accepts_valid_timezone(tmp_path) -> None:
assert job.state.next_run_at_ms is not None assert job.state.next_run_at_ms is not None
@pytest.mark.asyncio
async def test_execute_job_records_run_history(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json"
service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
job = service.add_job(
name="hist",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello",
)
await service.run_job(job.id)
loaded = service.get_job(job.id)
assert loaded is not None
assert len(loaded.state.run_history) == 1
rec = loaded.state.run_history[0]
assert rec.status == "ok"
assert rec.duration_ms >= 0
assert rec.error is None
@pytest.mark.asyncio
async def test_run_history_records_errors(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json"
async def fail(_):
raise RuntimeError("boom")
service = CronService(store_path, on_job=fail)
job = service.add_job(
name="fail",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello",
)
await service.run_job(job.id)
loaded = service.get_job(job.id)
assert len(loaded.state.run_history) == 1
assert loaded.state.run_history[0].status == "error"
assert loaded.state.run_history[0].error == "boom"
@pytest.mark.asyncio
async def test_run_history_trimmed_to_max(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json"
service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
job = service.add_job(
name="trim",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello",
)
for _ in range(25):
await service.run_job(job.id)
loaded = service.get_job(job.id)
assert len(loaded.state.run_history) == CronService._MAX_RUN_HISTORY
@pytest.mark.asyncio
async def test_run_history_persisted_to_disk(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json"
service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
job = service.add_job(
name="persist",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello",
)
await service.run_job(job.id)
raw = json.loads(store_path.read_text())
history = raw["jobs"][0]["state"]["runHistory"]
assert len(history) == 1
assert history[0]["status"] == "ok"
assert "runAtMs" in history[0]
assert "durationMs" in history[0]
fresh = CronService(store_path)
loaded = fresh.get_job(job.id)
assert len(loaded.state.run_history) == 1
assert loaded.state.run_history[0].status == "ok"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_running_service_honors_external_disable(tmp_path) -> None: async def test_running_service_honors_external_disable(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json" store_path = tmp_path / "cron" / "jobs.json"