ai-hackaton-backend/app/services/email_service.py
2026-01-15 20:41:24 +05:00

87 lines
2.1 KiB
Python

import asyncio
import logging
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from app.core.config import settings
logger = logging.getLogger(__name__)
def _send_email_sync(
subject: str,
body: str,
to_email: str,
from_email: str | None = None,
) -> bool:
"""Synchronous email sending using smtplib."""
if not settings.email_host_user or not settings.email_host_password:
logger.warning("Email credentials not configured, skipping email send")
return False
from_email = from_email or settings.default_from_email
msg = MIMEMultipart()
msg["From"] = from_email
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain", "utf-8"))
try:
if settings.email_use_tls:
server = smtplib.SMTP(settings.email_host, settings.email_port)
server.starttls()
else:
server = smtplib.SMTP_SSL(settings.email_host, settings.email_port)
server.login(settings.email_host_user, settings.email_host_password)
server.send_message(msg)
server.quit()
return True
except Exception as e:
logger.error(f"Failed to send email: {e}")
raise
async def send_email(
subject: str,
body: str,
to_email: str,
from_email: str | None = None,
) -> bool:
"""Async wrapper for sending emails."""
return await asyncio.to_thread(
_send_email_sync,
subject,
body,
to_email,
from_email,
)
async def send_contact_request_email(
name: str,
email: str,
phone: str | None = None,
company: str | None = None,
message: str | None = None,
) -> bool:
"""Send contact request notification email."""
subject = f"New Contact Request from {name}"
body = f"""New contact request received:
Name: {name}
Email: {email}
Phone: {phone or 'Not specified'}
Company: {company or 'Not specified'}
Message:
{message or 'No additional message'}
"""
return await send_email(
subject=subject,
body=body,
to_email=settings.contact_request_email,
)