From 542455109de366e6ea1c4e0fa99f691a686d09eb Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Fri, 20 Mar 2026 18:47:54 +0000 Subject: [PATCH] fix(email): preserve fetched messages across IMAP retry Keep messages already collected in the current poll cycle when a stale IMAP connection dies mid-fetch, so retrying once does not drop emails that were already parsed and marked seen. Add a regression test covering a mid-cycle disconnect after the first message succeeds. Made-with: Cursor --- nanobot/channels/email.py | 38 ++++++++++++++++++++++---------- tests/test_email_channel.py | 43 +++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 11 deletions(-) diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index e0ce289..be3cb3e 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -282,13 +282,26 @@ class EmailChannel(BaseChannel): dedupe: bool, limit: int, ) -> list[dict[str, Any]]: - try: - return self._fetch_messages_once(search_criteria, mark_seen, dedupe, limit) - except Exception as exc: - if not self._is_stale_imap_error(exc): - raise - logger.warning("Email IMAP connection went stale, retrying once: {}", exc) - return self._fetch_messages_once(search_criteria, mark_seen, dedupe, limit) + messages: list[dict[str, Any]] = [] + cycle_uids: set[str] = set() + + for attempt in range(2): + try: + self._fetch_messages_once( + search_criteria, + mark_seen, + dedupe, + limit, + messages, + cycle_uids, + ) + return messages + except Exception as exc: + if attempt == 1 or not self._is_stale_imap_error(exc): + raise + logger.warning("Email IMAP connection went stale, retrying once: {}", exc) + + return messages def _fetch_messages_once( self, @@ -296,9 +309,10 @@ class EmailChannel(BaseChannel): mark_seen: bool, dedupe: bool, limit: int, - ) -> list[dict[str, Any]]: + messages: list[dict[str, Any]], + cycle_uids: set[str], + ) -> None: """Fetch messages by arbitrary IMAP search criteria.""" - messages: list[dict[str, Any]] = [] mailbox = self.config.imap_mailbox or "INBOX" if self.config.imap_use_ssl: @@ -336,6 +350,8 @@ class EmailChannel(BaseChannel): continue uid = self._extract_uid(fetched) + if uid and uid in cycle_uids: + continue if dedupe and uid and uid in self._processed_uids: continue @@ -378,6 +394,8 @@ class EmailChannel(BaseChannel): } ) + if uid: + cycle_uids.add(uid) if dedupe and uid: self._processed_uids.add(uid) # mark_seen is the primary dedup; this set is a safety net @@ -393,8 +411,6 @@ class EmailChannel(BaseChannel): except Exception: pass - return messages - @classmethod def _is_stale_imap_error(cls, exc: Exception) -> bool: message = str(exc).lower() diff --git a/tests/test_email_channel.py b/tests/test_email_channel.py index 63203ed..23d3ea7 100644 --- a/tests/test_email_channel.py +++ b/tests/test_email_channel.py @@ -133,6 +133,49 @@ def test_fetch_new_messages_retries_once_when_imap_connection_goes_stale(monkeyp assert fake_instances[1].search_calls == 1 +def test_fetch_new_messages_keeps_messages_collected_before_stale_retry(monkeypatch) -> None: + raw_first = _make_raw_email(subject="First", body="First body") + raw_second = _make_raw_email(subject="Second", body="Second body") + mailbox_state = { + b"1": {"uid": b"123", "raw": raw_first, "seen": False}, + b"2": {"uid": b"124", "raw": raw_second, "seen": False}, + } + fail_once = {"pending": True} + + class FlakyIMAP: + def login(self, _user: str, _pw: str): + return "OK", [b"logged in"] + + def select(self, _mailbox: str): + return "OK", [b"2"] + + def search(self, *_args): + unseen_ids = [imap_id for imap_id, item in mailbox_state.items() if not item["seen"]] + return "OK", [b" ".join(unseen_ids)] + + def fetch(self, imap_id: bytes, _parts: str): + if imap_id == b"2" and fail_once["pending"]: + fail_once["pending"] = False + raise imaplib.IMAP4.abort("socket error") + item = mailbox_state[imap_id] + header = b"%s (UID %s BODY[] {200})" % (imap_id, item["uid"]) + return "OK", [(header, item["raw"]), b")"] + + def store(self, imap_id: bytes, _op: str, _flags: str): + mailbox_state[imap_id]["seen"] = True + return "OK", [b""] + + def logout(self): + return "BYE", [b""] + + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: FlakyIMAP()) + + channel = EmailChannel(_make_config(), MessageBus()) + items = channel._fetch_new_messages() + + assert [item["subject"] for item in items] == ["First", "Second"] + + def test_fetch_new_messages_skips_missing_mailbox(monkeypatch) -> None: class MissingMailboxIMAP: def login(self, _user: str, _pw: str):