import logging
from datetime import datetime
from typing import Annotated

from fastapi import Depends

from app.models.resume import ResumeStatus
from app.repositories.interview_repository import InterviewRepository
from app.repositories.resume_repository import ResumeRepository

logger = logging.getLogger("interview-finalization")


class InterviewFinalizationService:
    """Service for finalizing an interview and triggering its analysis."""

    def __init__(
        self,
        interview_repo: Annotated[InterviewRepository, Depends(InterviewRepository)],
        resume_repo: Annotated[ResumeRepository, Depends(ResumeRepository)],
    ):
        self.interview_repo = interview_repo
        self.resume_repo = resume_repo

    async def finalize_interview(
        self, room_name: str, dialogue_history: list, interview_metrics: dict | None = None
    ) -> dict | None:
        """
        Finalizes the interview and triggers its analysis.

        Args:
            room_name: LiveKit room name
            dialogue_history: Dialogue history
            interview_metrics: Interview metrics (number of questions, duration, etc.)

        Returns:
            dict with information about the finalized interview, or None on error
        """
        try:
            logger.info(f"[FINALIZE] Starting finalization for room: {room_name}")

            # 1. Look up the interview session
            interview_session = await self.interview_repo.get_by_room_name(room_name)
            if not interview_session:
                logger.error(
                    f"[FINALIZE] Interview session not found for room: {room_name}"
                )
                return None

            # 2. Mark the interview session as "completed"
            success = await self.interview_repo.update_status(
                interview_session.id, "completed", datetime.utcnow()
            )

            if not success:
                logger.error(
                    f"[FINALIZE] Failed to update session status for {interview_session.id}"
                )
                return None

            resume_id = interview_session.resume_id
            logger.info(
                f"[FINALIZE] Interview session {interview_session.id} marked as completed"
            )

            # 3. Update the resume status to "INTERVIEWED"
            resume = await self.resume_repo.get(resume_id)
            if resume:
                await self.resume_repo.update(
                    resume_id,
                    {
                        "status": ResumeStatus.INTERVIEWED,
                        "updated_at": datetime.utcnow(),
                    },
                )
                logger.info(
                    f"[FINALIZE] Resume {resume_id} status updated to INTERVIEWED"
                )
            else:
                logger.warning(f"[FINALIZE] Resume {resume_id} not found")

            # 4. Save the final dialogue history
            await self.interview_repo.update_dialogue_history(
                room_name, dialogue_history
            )
            logger.info(
                f"[FINALIZE] Saved final dialogue ({len(dialogue_history)} messages)"
            )

            # 5. Update the AI agent status
            await self.interview_repo.update_ai_agent_status(
                interview_session.id, None, "stopped"
            )

            # 6. Start the interview analysis
            analysis_task = await self._start_interview_analysis(resume_id)

            # 7. Collect the final metrics
            finalization_result = {
                "session_id": interview_session.id,
                "resume_id": resume_id,
                "room_name": room_name,
                "total_messages": len(dialogue_history),
                "analysis_task_id": analysis_task.get("task_id")
                if analysis_task
                else None,
                "completed_at": datetime.utcnow().isoformat(),
                "metrics": interview_metrics or {},
            }

            logger.info(
                f"[FINALIZE] Interview successfully finalized: {finalization_result}"
            )
            return finalization_result

        except Exception as e:
            logger.error(
                f"[FINALIZE] Error finalizing interview for room {room_name}: {str(e)}"
            )
            return None

    async def _start_interview_analysis(self, resume_id: int):
        """Starts the interview analysis (the direct Celery dispatch below is
        currently disabled, so the HTTP API fallback is always used)."""
        # try:
        logger.info(
            f"[FINALIZE] Attempting to start analysis task for resume_id: {resume_id}"
        )

        # Import the task
        # from celery_worker.interview_analysis_task import generate_interview_report
        # logger.debug(f"[FINALIZE] Successfully imported generate_interview_report task")
        #
        # # Start the task
        # task = generate_interview_report.delay(resume_id)
        # logger.info(f"[FINALIZE] Analysis task started: {task.id} for resume_id: {resume_id}")
        # return task
        #
        # except ImportError as e:
        # logger.error(f"[FINALIZE] Import error for analysis task: {str(e)}")
        # return None
        # except ConnectionError as e:
        # logger.error(f"[FINALIZE] Connection error starting analysis task for resume {resume_id}: {str(e)}")
        # logger.warning(f"[FINALIZE] This may indicate Redis/Celery broker is not accessible from AI agent process")
        #
        # # Fallback: try to start the analysis via the HTTP API
        # return await self._start_analysis_via_http(resume_id)
        # except Exception as e:
        # logger.error(f"[FINALIZE] Failed to start analysis task for resume {resume_id}: {str(e)}")
        # logger.debug(f"[FINALIZE] Exception type: {type(e).__name__}")

        # Fallback: start the analysis via the HTTP API
        # (used unconditionally while the Celery dispatch above is disabled)
        return await self._start_analysis_via_http(resume_id)

    async def _start_analysis_via_http(self, resume_id: int):
        """Fallback: starts the analysis via the HTTP API (when Celery is not reachable from the AI agent)."""
        try:
            import httpx

            url = f"http://localhost:8000/api/v1/analysis/interview-report/{resume_id}"
            logger.info(f"[FINALIZE] Attempting HTTP fallback to URL: {url}")

            # Send an HTTP request to the local API to start the analysis
            async with httpx.AsyncClient() as client:
                response = await client.post(url, timeout=5.0)

                if response.status_code == 200:
                    result = response.json()
                    logger.info(
                        f"[FINALIZE] Analysis started via HTTP API for resume_id: {resume_id}, task_id: {result.get('task_id', 'unknown')}"
                    )
                    return result
                else:
                    logger.error(
                        f"[FINALIZE] HTTP API returned {response.status_code} for resume_id: {resume_id}"
                    )
                    logger.debug(f"[FINALIZE] Response body: {response.text[:200]}")
                    return None

        except Exception as e:
            logger.error(
                f"[FINALIZE] HTTP fallback failed for resume {resume_id}: {str(e)}"
            )
            return None

    async def save_dialogue_to_session(
        self, room_name: str, dialogue_history: list
    ) -> bool:
        """Saves the dialogue to the session (for intermediate saves)."""
        try:
            success = await self.interview_repo.update_dialogue_history(
                room_name, dialogue_history
            )
            if success:
                logger.info(
                    f"[DIALOGUE] Saved {len(dialogue_history)} messages for room: {room_name}"
                )
            return success
        except Exception as e:
            logger.error(
                f"[DIALOGUE] Error saving dialogue for room {room_name}: {str(e)}"
            )
            return False

    async def cleanup_dead_processes(self) -> int:
        """Cleans up information about dead AI agent processes."""
        try:
            import psutil

            active_sessions = (
                await self.interview_repo.get_sessions_with_running_agents()
            )
            cleaned_count = 0

            for session in active_sessions:
                if session.ai_agent_pid:
                    try:
                        process = psutil.Process(session.ai_agent_pid)
                        if not process.is_running():
                            await self.interview_repo.update_ai_agent_status(
                                session.id, None, "stopped"
                            )
                            cleaned_count += 1
                    except psutil.NoSuchProcess:
                        await self.interview_repo.update_ai_agent_status(
                            session.id, None, "stopped"
                        )
                        cleaned_count += 1

            logger.info(f"[CLEANUP] Cleaned up {cleaned_count} dead processes")
            return cleaned_count

        except Exception as e:
            logger.error(f"[CLEANUP] Error cleaning up processes: {str(e)}")
            return 0
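

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal, hypothetical example of driving finalization from an AI agent
# shutdown hook. Outside of FastAPI's dependency injection the repositories
# must be constructed manually; their constructor signatures are not shown in
# this file, so the two constructor calls below are assumptions to adapt.
#
#     async def on_agent_shutdown(room_name: str, history: list[dict]) -> None:
#         interview_repo = InterviewRepository()  # assumed construction
#         resume_repo = ResumeRepository()        # assumed construction
#         service = InterviewFinalizationService(interview_repo, resume_repo)
#         result = await service.finalize_interview(
#             room_name=room_name,
#             dialogue_history=history,
#             interview_metrics={"questions_asked": 12},  # hypothetical metric
#         )
#         if result is None:
#             logger.warning(f"[FINALIZE] Finalization failed for room: {room_name}")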