ai-hackaton-backend/celery_worker/database.py

174 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from rag.settings import settings
# Создаем синхронный engine для Celery (так как Celery работает в отдельных процессах)
sync_engine = create_engine(
settings.database_url.replace(
"asyncpg", "psycopg2"
), # Убираем asyncpg для синхронного подключения
echo=False,
future=True,
connect_args={"client_encoding": "utf8"}, # Принудительно UTF-8
)
# Создаем синхронный session maker
SyncSessionLocal = sessionmaker(bind=sync_engine, autocommit=False, autoflush=False)
@contextmanager
def get_sync_session() -> Session:
"""Получить синхронную сессию для использования в Celery tasks"""
session = SyncSessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
class SyncResumeRepository:
"""Синхронный repository для работы с Resume в Celery tasks"""
def __init__(self, session: Session):
self.session = session
def get_by_id(self, resume_id: int):
"""Получить резюме по ID"""
from app.models.resume import Resume
return self.session.query(Resume).filter(Resume.id == resume_id).first()
def update_status(
self,
resume_id: int,
status: str,
parsed_data: dict = None,
error_message: str = None,
rejection_reason: str = None,
):
"""Обновить статус резюме"""
from datetime import datetime
from app.models.resume import Resume, ResumeStatus
resume = self.session.query(Resume).filter(Resume.id == resume_id).first()
if resume:
# Обновляем статус
if status == "parsing":
resume.status = ResumeStatus.PARSING
elif status == "parsed":
resume.status = ResumeStatus.PARSED
if parsed_data:
resume.parsed_data = parsed_data
elif status == "failed":
resume.status = ResumeStatus.PARSE_FAILED
if error_message:
resume.parse_error = error_message
elif status == "rejected":
resume.status = ResumeStatus.REJECTED
if rejection_reason:
resume.notes = f"ОТКЛОНЕНО: {rejection_reason}"
resume.updated_at = datetime.utcnow()
self.session.add(resume)
return resume
return None
def update_interview_plan(self, resume_id: int, interview_plan: dict):
"""Обновить план интервью"""
from datetime import datetime
from app.models.resume import Resume
resume = self.session.query(Resume).filter(Resume.id == resume_id).first()
if resume:
resume.interview_plan = interview_plan
resume.updated_at = datetime.utcnow()
self.session.add(resume)
return resume
return None
def _normalize_utf8_dict(self, data):
"""Нормализует UTF-8 в словаре рекурсивно"""
import json
# Сериализуем в JSON с ensure_ascii=False, потом парсим обратно
# Это принудительно конвертирует все unicode escape sequences в нормальные символы
try:
json_str = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
return json.loads(json_str)
except (TypeError, ValueError):
# Fallback - рекурсивная обработка
if isinstance(data, dict):
return {
key: self._normalize_utf8_dict(value) for key, value in data.items()
}
elif isinstance(data, list):
return [self._normalize_utf8_dict(item) for item in data]
elif isinstance(data, str):
try:
# Пытаемся декодировать unicode escape sequences
if "\\u" in data:
return data.encode().decode("unicode_escape")
return data
except (UnicodeDecodeError, UnicodeEncodeError):
return data
else:
return data
class SyncVacancyRepository:
"""Синхронный repository для работы с Vacancy в Celery tasks"""
def __init__(self, session: Session):
self.session = session
def get_by_id(self, vacancy_id: int):
"""Получить вакансию по ID"""
from app.models.vacancy import Vacancy
return self.session.query(Vacancy).filter(Vacancy.id == vacancy_id).first()
def create_vacancy(self, vacancy_create):
"""Создать новую вакансию"""
from datetime import datetime
from app.models.vacancy import Vacancy
# Конвертируем VacancyCreate в dict
if hasattr(vacancy_create, 'dict'):
vacancy_data = vacancy_create.dict()
elif hasattr(vacancy_create, 'model_dump'):
vacancy_data = vacancy_create.model_dump()
else:
vacancy_data = vacancy_create
# Создаем новую вакансию
vacancy = Vacancy(
**vacancy_data,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
self.session.add(vacancy)
self.session.flush() # Получаем ID без коммита
self.session.refresh(vacancy) # Обновляем объект из БД
# Создаем простой объект с нужными данными для возврата
class VacancyResult:
def __init__(self, id, title):
self.id = id
self.title = title
return VacancyResult(vacancy.id, vacancy.title)