Harden email IMAP polling retries
This commit is contained in:
@@ -80,6 +80,21 @@ class EmailChannel(BaseChannel):
|
|||||||
"Nov",
|
"Nov",
|
||||||
"Dec",
|
"Dec",
|
||||||
)
|
)
|
||||||
|
_IMAP_RECONNECT_MARKERS = (
|
||||||
|
"disconnected for inactivity",
|
||||||
|
"eof occurred in violation of protocol",
|
||||||
|
"socket error",
|
||||||
|
"connection reset",
|
||||||
|
"broken pipe",
|
||||||
|
"bye",
|
||||||
|
)
|
||||||
|
_IMAP_MISSING_MAILBOX_MARKERS = (
|
||||||
|
"mailbox doesn't exist",
|
||||||
|
"select failed",
|
||||||
|
"no such mailbox",
|
||||||
|
"can't open mailbox",
|
||||||
|
"does not exist",
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def default_config(cls) -> dict[str, Any]:
|
def default_config(cls) -> dict[str, Any]:
|
||||||
@@ -266,6 +281,21 @@ class EmailChannel(BaseChannel):
|
|||||||
mark_seen: bool,
|
mark_seen: bool,
|
||||||
dedupe: bool,
|
dedupe: bool,
|
||||||
limit: int,
|
limit: int,
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
try:
|
||||||
|
return self._fetch_messages_once(search_criteria, mark_seen, dedupe, limit)
|
||||||
|
except Exception as exc:
|
||||||
|
if not self._is_stale_imap_error(exc):
|
||||||
|
raise
|
||||||
|
logger.warning("Email IMAP connection went stale, retrying once: {}", exc)
|
||||||
|
return self._fetch_messages_once(search_criteria, mark_seen, dedupe, limit)
|
||||||
|
|
||||||
|
def _fetch_messages_once(
|
||||||
|
self,
|
||||||
|
search_criteria: tuple[str, ...],
|
||||||
|
mark_seen: bool,
|
||||||
|
dedupe: bool,
|
||||||
|
limit: int,
|
||||||
) -> list[dict[str, Any]]:
|
) -> list[dict[str, Any]]:
|
||||||
"""Fetch messages by arbitrary IMAP search criteria."""
|
"""Fetch messages by arbitrary IMAP search criteria."""
|
||||||
messages: list[dict[str, Any]] = []
|
messages: list[dict[str, Any]] = []
|
||||||
@@ -278,8 +308,15 @@ class EmailChannel(BaseChannel):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
client.login(self.config.imap_username, self.config.imap_password)
|
client.login(self.config.imap_username, self.config.imap_password)
|
||||||
status, _ = client.select(mailbox)
|
try:
|
||||||
|
status, _ = client.select(mailbox)
|
||||||
|
except Exception as exc:
|
||||||
|
if self._is_missing_mailbox_error(exc):
|
||||||
|
logger.warning("Email mailbox unavailable, skipping poll for {}: {}", mailbox, exc)
|
||||||
|
return messages
|
||||||
|
raise
|
||||||
if status != "OK":
|
if status != "OK":
|
||||||
|
logger.warning("Email mailbox select returned {}, skipping poll for {}", status, mailbox)
|
||||||
return messages
|
return messages
|
||||||
|
|
||||||
status, data = client.search(None, *search_criteria)
|
status, data = client.search(None, *search_criteria)
|
||||||
@@ -358,6 +395,16 @@ class EmailChannel(BaseChannel):
|
|||||||
|
|
||||||
return messages
|
return messages
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _is_stale_imap_error(cls, exc: Exception) -> bool:
|
||||||
|
message = str(exc).lower()
|
||||||
|
return any(marker in message for marker in cls._IMAP_RECONNECT_MARKERS)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _is_missing_mailbox_error(cls, exc: Exception) -> bool:
|
||||||
|
message = str(exc).lower()
|
||||||
|
return any(marker in message for marker in cls._IMAP_MISSING_MAILBOX_MARKERS)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _format_imap_date(cls, value: date) -> str:
|
def _format_imap_date(cls, value: date) -> str:
|
||||||
"""Format date for IMAP search (always English month abbreviations)."""
|
"""Format date for IMAP search (always English month abbreviations)."""
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
from email.message import EmailMessage
|
from email.message import EmailMessage
|
||||||
from datetime import date
|
from datetime import date
|
||||||
|
import imaplib
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@@ -82,6 +83,77 @@ def test_fetch_new_messages_parses_unseen_and_marks_seen(monkeypatch) -> None:
|
|||||||
assert items_again == []
|
assert items_again == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_new_messages_retries_once_when_imap_connection_goes_stale(monkeypatch) -> None:
|
||||||
|
raw = _make_raw_email(subject="Invoice", body="Please pay")
|
||||||
|
fail_once = {"pending": True}
|
||||||
|
|
||||||
|
class FlakyIMAP:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.store_calls: list[tuple[bytes, str, str]] = []
|
||||||
|
self.search_calls = 0
|
||||||
|
|
||||||
|
def login(self, _user: str, _pw: str):
|
||||||
|
return "OK", [b"logged in"]
|
||||||
|
|
||||||
|
def select(self, _mailbox: str):
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def search(self, *_args):
|
||||||
|
self.search_calls += 1
|
||||||
|
if fail_once["pending"]:
|
||||||
|
fail_once["pending"] = False
|
||||||
|
raise imaplib.IMAP4.abort("socket error")
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def fetch(self, _imap_id: bytes, _parts: str):
|
||||||
|
return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]
|
||||||
|
|
||||||
|
def store(self, imap_id: bytes, op: str, flags: str):
|
||||||
|
self.store_calls.append((imap_id, op, flags))
|
||||||
|
return "OK", [b""]
|
||||||
|
|
||||||
|
def logout(self):
|
||||||
|
return "BYE", [b""]
|
||||||
|
|
||||||
|
fake_instances: list[FlakyIMAP] = []
|
||||||
|
|
||||||
|
def _factory(_host: str, _port: int):
|
||||||
|
instance = FlakyIMAP()
|
||||||
|
fake_instances.append(instance)
|
||||||
|
return instance
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", _factory)
|
||||||
|
|
||||||
|
channel = EmailChannel(_make_config(), MessageBus())
|
||||||
|
items = channel._fetch_new_messages()
|
||||||
|
|
||||||
|
assert len(items) == 1
|
||||||
|
assert len(fake_instances) == 2
|
||||||
|
assert fake_instances[0].search_calls == 1
|
||||||
|
assert fake_instances[1].search_calls == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_new_messages_skips_missing_mailbox(monkeypatch) -> None:
|
||||||
|
class MissingMailboxIMAP:
|
||||||
|
def login(self, _user: str, _pw: str):
|
||||||
|
return "OK", [b"logged in"]
|
||||||
|
|
||||||
|
def select(self, _mailbox: str):
|
||||||
|
raise imaplib.IMAP4.error("Mailbox doesn't exist")
|
||||||
|
|
||||||
|
def logout(self):
|
||||||
|
return "BYE", [b""]
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"nanobot.channels.email.imaplib.IMAP4_SSL",
|
||||||
|
lambda _h, _p: MissingMailboxIMAP(),
|
||||||
|
)
|
||||||
|
|
||||||
|
channel = EmailChannel(_make_config(), MessageBus())
|
||||||
|
|
||||||
|
assert channel._fetch_new_messages() == []
|
||||||
|
|
||||||
|
|
||||||
def test_extract_text_body_falls_back_to_html() -> None:
|
def test_extract_text_body_falls_back_to_html() -> None:
|
||||||
msg = EmailMessage()
|
msg = EmailMessage()
|
||||||
msg["From"] = "alice@example.com"
|
msg["From"] = "alice@example.com"
|
||||||
|
|||||||
Reference in New Issue
Block a user