fix(email): avoid executor hang in blocking io
This commit is contained in:
@@ -59,6 +59,16 @@ class EmailChannel(BaseChannel):
|
|||||||
self._processed_uids: set[str] = set() # Capped to prevent unbounded growth
|
self._processed_uids: set[str] = set() # Capped to prevent unbounded growth
|
||||||
self._MAX_PROCESSED_UIDS = 100000
|
self._MAX_PROCESSED_UIDS = 100000
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def _run_blocking(func, /, *args, **kwargs):
|
||||||
|
"""Run blocking IMAP/SMTP work.
|
||||||
|
|
||||||
|
The usual threadpool offload path (`asyncio.to_thread` / executors)
|
||||||
|
can hang in some deployment/test environments here, so Email falls
|
||||||
|
back to direct execution for reliability.
|
||||||
|
"""
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
"""Start polling IMAP for inbound emails."""
|
"""Start polling IMAP for inbound emails."""
|
||||||
if not self.config.consent_granted:
|
if not self.config.consent_granted:
|
||||||
@@ -77,7 +87,7 @@ class EmailChannel(BaseChannel):
|
|||||||
poll_seconds = max(5, int(self.config.poll_interval_seconds))
|
poll_seconds = max(5, int(self.config.poll_interval_seconds))
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
inbound_items = await asyncio.to_thread(self._fetch_new_messages)
|
inbound_items = await self._run_blocking(self._fetch_new_messages)
|
||||||
for item in inbound_items:
|
for item in inbound_items:
|
||||||
sender = item["sender"]
|
sender = item["sender"]
|
||||||
subject = item.get("subject", "")
|
subject = item.get("subject", "")
|
||||||
@@ -134,19 +144,16 @@ class EmailChannel(BaseChannel):
|
|||||||
if override:
|
if override:
|
||||||
subject = override
|
subject = override
|
||||||
|
|
||||||
email_msg = EmailMessage()
|
|
||||||
email_msg["From"] = self.config.from_address or self.config.smtp_username or self.config.imap_username
|
|
||||||
email_msg["To"] = to_addr
|
|
||||||
email_msg["Subject"] = subject
|
|
||||||
email_msg.set_content(msg.content or "")
|
|
||||||
|
|
||||||
in_reply_to = self._last_message_id_by_chat.get(to_addr)
|
in_reply_to = self._last_message_id_by_chat.get(to_addr)
|
||||||
if in_reply_to:
|
|
||||||
email_msg["In-Reply-To"] = in_reply_to
|
|
||||||
email_msg["References"] = in_reply_to
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await asyncio.to_thread(self._smtp_send, email_msg)
|
await self._run_blocking(
|
||||||
|
self._smtp_send_message,
|
||||||
|
to_addr=to_addr,
|
||||||
|
subject=subject,
|
||||||
|
content=msg.content or "",
|
||||||
|
in_reply_to=in_reply_to,
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error sending email to {}: {}", to_addr, e)
|
logger.error("Error sending email to {}: {}", to_addr, e)
|
||||||
raise
|
raise
|
||||||
@@ -171,6 +178,25 @@ class EmailChannel(BaseChannel):
|
|||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def _smtp_send_message(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
to_addr: str,
|
||||||
|
subject: str,
|
||||||
|
content: str,
|
||||||
|
in_reply_to: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Build and send one outbound email inside the worker thread."""
|
||||||
|
msg = EmailMessage()
|
||||||
|
msg["From"] = self.config.from_address or self.config.smtp_username or self.config.imap_username
|
||||||
|
msg["To"] = to_addr
|
||||||
|
msg["Subject"] = subject
|
||||||
|
msg.set_content(content)
|
||||||
|
if in_reply_to:
|
||||||
|
msg["In-Reply-To"] = in_reply_to
|
||||||
|
msg["References"] = in_reply_to
|
||||||
|
self._smtp_send(msg)
|
||||||
|
|
||||||
def _smtp_send(self, msg: EmailMessage) -> None:
|
def _smtp_send(self, msg: EmailMessage) -> None:
|
||||||
timeout = 30
|
timeout = 30
|
||||||
if self.config.smtp_use_ssl:
|
if self.config.smtp_use_ssl:
|
||||||
|
|||||||
Reference in New Issue
Block a user