fix(email): preserve fetched messages across IMAP retry
Keep messages already collected in the current poll cycle when a stale IMAP connection dies mid-fetch, so retrying once does not drop emails that were already parsed and marked seen. Add a regression test covering a mid-cycle disconnect after the first message succeeds. Made-with: Cursor
This commit is contained in:
@@ -282,13 +282,26 @@ class EmailChannel(BaseChannel):
|
|||||||
dedupe: bool,
|
dedupe: bool,
|
||||||
limit: int,
|
limit: int,
|
||||||
) -> list[dict[str, Any]]:
|
) -> list[dict[str, Any]]:
|
||||||
|
messages: list[dict[str, Any]] = []
|
||||||
|
cycle_uids: set[str] = set()
|
||||||
|
|
||||||
|
for attempt in range(2):
|
||||||
try:
|
try:
|
||||||
return self._fetch_messages_once(search_criteria, mark_seen, dedupe, limit)
|
self._fetch_messages_once(
|
||||||
|
search_criteria,
|
||||||
|
mark_seen,
|
||||||
|
dedupe,
|
||||||
|
limit,
|
||||||
|
messages,
|
||||||
|
cycle_uids,
|
||||||
|
)
|
||||||
|
return messages
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
if not self._is_stale_imap_error(exc):
|
if attempt == 1 or not self._is_stale_imap_error(exc):
|
||||||
raise
|
raise
|
||||||
logger.warning("Email IMAP connection went stale, retrying once: {}", exc)
|
logger.warning("Email IMAP connection went stale, retrying once: {}", exc)
|
||||||
return self._fetch_messages_once(search_criteria, mark_seen, dedupe, limit)
|
|
||||||
|
return messages
|
||||||
|
|
||||||
def _fetch_messages_once(
|
def _fetch_messages_once(
|
||||||
self,
|
self,
|
||||||
@@ -296,9 +309,10 @@ class EmailChannel(BaseChannel):
|
|||||||
mark_seen: bool,
|
mark_seen: bool,
|
||||||
dedupe: bool,
|
dedupe: bool,
|
||||||
limit: int,
|
limit: int,
|
||||||
) -> list[dict[str, Any]]:
|
messages: list[dict[str, Any]],
|
||||||
|
cycle_uids: set[str],
|
||||||
|
) -> None:
|
||||||
"""Fetch messages by arbitrary IMAP search criteria."""
|
"""Fetch messages by arbitrary IMAP search criteria."""
|
||||||
messages: list[dict[str, Any]] = []
|
|
||||||
mailbox = self.config.imap_mailbox or "INBOX"
|
mailbox = self.config.imap_mailbox or "INBOX"
|
||||||
|
|
||||||
if self.config.imap_use_ssl:
|
if self.config.imap_use_ssl:
|
||||||
@@ -336,6 +350,8 @@ class EmailChannel(BaseChannel):
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
uid = self._extract_uid(fetched)
|
uid = self._extract_uid(fetched)
|
||||||
|
if uid and uid in cycle_uids:
|
||||||
|
continue
|
||||||
if dedupe and uid and uid in self._processed_uids:
|
if dedupe and uid and uid in self._processed_uids:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -378,6 +394,8 @@ class EmailChannel(BaseChannel):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if uid:
|
||||||
|
cycle_uids.add(uid)
|
||||||
if dedupe and uid:
|
if dedupe and uid:
|
||||||
self._processed_uids.add(uid)
|
self._processed_uids.add(uid)
|
||||||
# mark_seen is the primary dedup; this set is a safety net
|
# mark_seen is the primary dedup; this set is a safety net
|
||||||
@@ -393,8 +411,6 @@ class EmailChannel(BaseChannel):
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
return messages
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _is_stale_imap_error(cls, exc: Exception) -> bool:
|
def _is_stale_imap_error(cls, exc: Exception) -> bool:
|
||||||
message = str(exc).lower()
|
message = str(exc).lower()
|
||||||
|
|||||||
@@ -133,6 +133,49 @@ def test_fetch_new_messages_retries_once_when_imap_connection_goes_stale(monkeyp
|
|||||||
assert fake_instances[1].search_calls == 1
|
assert fake_instances[1].search_calls == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_new_messages_keeps_messages_collected_before_stale_retry(monkeypatch) -> None:
|
||||||
|
raw_first = _make_raw_email(subject="First", body="First body")
|
||||||
|
raw_second = _make_raw_email(subject="Second", body="Second body")
|
||||||
|
mailbox_state = {
|
||||||
|
b"1": {"uid": b"123", "raw": raw_first, "seen": False},
|
||||||
|
b"2": {"uid": b"124", "raw": raw_second, "seen": False},
|
||||||
|
}
|
||||||
|
fail_once = {"pending": True}
|
||||||
|
|
||||||
|
class FlakyIMAP:
|
||||||
|
def login(self, _user: str, _pw: str):
|
||||||
|
return "OK", [b"logged in"]
|
||||||
|
|
||||||
|
def select(self, _mailbox: str):
|
||||||
|
return "OK", [b"2"]
|
||||||
|
|
||||||
|
def search(self, *_args):
|
||||||
|
unseen_ids = [imap_id for imap_id, item in mailbox_state.items() if not item["seen"]]
|
||||||
|
return "OK", [b" ".join(unseen_ids)]
|
||||||
|
|
||||||
|
def fetch(self, imap_id: bytes, _parts: str):
|
||||||
|
if imap_id == b"2" and fail_once["pending"]:
|
||||||
|
fail_once["pending"] = False
|
||||||
|
raise imaplib.IMAP4.abort("socket error")
|
||||||
|
item = mailbox_state[imap_id]
|
||||||
|
header = b"%s (UID %s BODY[] {200})" % (imap_id, item["uid"])
|
||||||
|
return "OK", [(header, item["raw"]), b")"]
|
||||||
|
|
||||||
|
def store(self, imap_id: bytes, _op: str, _flags: str):
|
||||||
|
mailbox_state[imap_id]["seen"] = True
|
||||||
|
return "OK", [b""]
|
||||||
|
|
||||||
|
def logout(self):
|
||||||
|
return "BYE", [b""]
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: FlakyIMAP())
|
||||||
|
|
||||||
|
channel = EmailChannel(_make_config(), MessageBus())
|
||||||
|
items = channel._fetch_new_messages()
|
||||||
|
|
||||||
|
assert [item["subject"] for item in items] == ["First", "Second"]
|
||||||
|
|
||||||
|
|
||||||
def test_fetch_new_messages_skips_missing_mailbox(monkeypatch) -> None:
|
def test_fetch_new_messages_skips_missing_mailbox(monkeypatch) -> None:
|
||||||
class MissingMailboxIMAP:
|
class MissingMailboxIMAP:
|
||||||
def login(self, _user: str, _pw: str):
|
def login(self, _user: str, _pw: str):
|
||||||
|
|||||||
Reference in New Issue
Block a user