ai-hackaton-backend/app/services/agent_manager.py
2025-09-09 20:26:14 +05:00

400 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import os
import subprocess
from dataclasses import dataclass
from datetime import UTC, datetime
import psutil
from app.core.config import settings
logger = logging.getLogger(__name__)
@dataclass
class AgentProcess:
pid: int
session_id: int | None
room_name: str | None
started_at: datetime
status: str # "idle", "active", "stopping"
class AgentManager:
"""Singleton менеджер для управления AI агентом интервьюера"""
_instance: "AgentManager | None" = None
_agent_process: AgentProcess | None = None
_lock = asyncio.Lock()
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, "_initialized"):
self._initialized = True
self.livekit_url = settings.livekit_url or "ws://localhost:7880"
self.api_key = settings.livekit_api_key or "devkey"
self.api_secret = (
settings.livekit_api_secret or "devkey_secret_32chars_minimum_length"
)
self._monitoring_task = None
async def start_agent(self) -> bool:
"""Запускает AI агента в режиме ожидания (без конкретной сессии)"""
async with self._lock:
if self._agent_process and self._is_process_alive(self._agent_process.pid):
logger.info(f"Agent already running with PID {self._agent_process.pid}")
return True
try:
# Запускаем агента в режиме worker (будет ждать подключения к комнатам)
agent_cmd = [
"uv",
"run",
"ai_interviewer_agent.py",
"start",
"--url",
self.livekit_url,
"--api-key",
self.api_key,
"--api-secret",
self.api_secret,
]
# Настройка окружения
env = os.environ.copy()
env.update(
{
"OPENAI_API_KEY": settings.openai_api_key or "",
"DEEPGRAM_API_KEY": settings.deepgram_api_key or "",
"CARTESIA_API_KEY": settings.cartesia_api_key or "",
"PYTHONIOENCODING": "utf-8",
}
)
# Запуск процесса
with open("ai_agent.log", "w") as log_file:
process = subprocess.Popen(
agent_cmd,
env=env,
stdout=log_file,
stderr=subprocess.STDOUT,
cwd=".",
)
self._agent_process = AgentProcess(
pid=process.pid,
session_id=None,
room_name=None,
started_at=datetime.now(UTC),
status="idle",
)
logger.info(f"AI Agent started with PID {process.pid}")
# Запускаем мониторинг команд
if not self._monitoring_task:
self._monitoring_task = asyncio.create_task(
self._monitor_commands()
)
return True
except Exception as e:
logger.error(f"Failed to start AI agent: {e}")
return False
async def stop_agent(self) -> bool:
"""Останавливает AI агента"""
async with self._lock:
if not self._agent_process:
return True
try:
if self._is_process_alive(self._agent_process.pid):
process = psutil.Process(self._agent_process.pid)
# Сначала пытаемся graceful shutdown
process.terminate()
# Ждем до 10 секунд
for _ in range(100):
if not process.is_running():
break
await asyncio.sleep(0.1)
# Если не завершился, убиваем принудительно
if process.is_running():
process.kill()
logger.info(f"AI Agent with PID {self._agent_process.pid} stopped")
self._agent_process = None
# Останавливаем мониторинг команд
if self._monitoring_task:
self._monitoring_task.cancel()
self._monitoring_task = None
return True
except Exception as e:
logger.error(f"Error stopping AI agent: {e}")
self._agent_process = None
return False
async def assign_session(
self,
session_id: int,
room_name: str,
interview_plan: dict,
vacancy_data: dict = None,
) -> bool:
"""Назначает агенту конкретную сессию интервью"""
async with self._lock:
if not self._agent_process or not self._is_process_alive(
self._agent_process.pid
):
logger.error("No active agent to assign session to")
return False
if self._agent_process.status == "active":
logger.error(
f"Agent is busy with session {self._agent_process.session_id}"
)
return False
try:
# Создаем файл метаданных для сессии
metadata_file = f"session_metadata_{session_id}.json"
metadata = {
"session_id": session_id,
"room_name": room_name,
"interview_plan": interview_plan,
"command": "start_interview",
}
# Добавляем данные вакансии если они переданы
if vacancy_data:
metadata["vacancy_data"] = vacancy_data
with open(metadata_file, "w", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
# Отправляем сигнал агенту через файл команд
command_file = "agent_commands.json"
with open(command_file, "w", encoding="utf-8") as f:
json.dump(
{
"action": "start_session",
"session_id": session_id,
"room_name": room_name,
"metadata_file": metadata_file,
"timestamp": datetime.now(UTC).isoformat(),
},
f,
ensure_ascii=False,
indent=2,
)
# Обновляем статус агента
self._agent_process.session_id = session_id
self._agent_process.room_name = room_name
self._agent_process.status = "active"
logger.info(
f"Assigned session {session_id} to agent PID {self._agent_process.pid}"
)
return True
except Exception as e:
logger.error(f"Error assigning session to agent: {e}")
return False
async def release_session(self) -> bool:
"""Освобождает агента от текущей сессии"""
async with self._lock:
if not self._agent_process:
return True
try:
# Отправляем команду завершения сессии
command_file = "agent_commands.json"
with open(command_file, "w", encoding="utf-8") as f:
json.dump(
{
"action": "end_session",
"session_id": self._agent_process.session_id,
"timestamp": datetime.now(UTC).isoformat(),
},
f,
ensure_ascii=False,
indent=2,
)
# Очищаем файлы метаданных
if self._agent_process.session_id:
try:
os.remove(
f"session_metadata_{self._agent_process.session_id}.json"
)
except FileNotFoundError:
pass
# Возвращаем агента в режим ожидания
self._agent_process.session_id = None
self._agent_process.room_name = None
self._agent_process.status = "idle"
logger.info("Released agent from current session")
return True
except Exception as e:
logger.error(f"Error releasing agent session: {e}")
return False
async def handle_session_completed(self, session_id: int, room_name: str) -> bool:
"""Обрабатывает сигнал о завершении сессии от агента"""
async with self._lock:
if not self._agent_process:
logger.warning(
f"No agent process to handle session_completed for {session_id}"
)
return False
if self._agent_process.session_id != session_id:
logger.warning(
f"Session mismatch: expected {self._agent_process.session_id}, got {session_id}"
)
return False
try:
# Очищаем файлы метаданных
try:
os.remove(f"session_metadata_{session_id}.json")
except FileNotFoundError:
pass
# Возвращаем агента в режим ожидания
old_session_id = self._agent_process.session_id
self._agent_process.session_id = None
self._agent_process.room_name = None
self._agent_process.status = "idle"
logger.info(
f"Agent automatically released from session {old_session_id}"
)
return True
except Exception as e:
logger.error(f"Error handling session_completed: {e}")
return False
def get_status(self) -> dict:
"""Возвращает текущий статус агента"""
if not self._agent_process:
return {
"status": "stopped",
"pid": None,
"session_id": None,
"room_name": None,
"uptime": None,
}
is_alive = self._is_process_alive(self._agent_process.pid)
if not is_alive:
self._agent_process = None
return {
"status": "dead",
"pid": None,
"session_id": None,
"room_name": None,
"uptime": None,
}
uptime = datetime.now(UTC) - self._agent_process.started_at
return {
"status": self._agent_process.status,
"pid": self._agent_process.pid,
"session_id": self._agent_process.session_id,
"room_name": self._agent_process.room_name,
"uptime": str(uptime),
"started_at": self._agent_process.started_at.isoformat(),
}
def is_available(self) -> bool:
"""Проверяет, доступен ли агент для новой сессии"""
if not self._agent_process:
return False
if not self._is_process_alive(self._agent_process.pid):
self._agent_process = None
return False
return self._agent_process.status == "idle"
def _is_process_alive(self, pid: int) -> bool:
"""Проверяет, жив ли процесс"""
try:
process = psutil.Process(pid)
return process.is_running()
except psutil.NoSuchProcess:
return False
except Exception:
return False
async def _monitor_commands(self):
"""Мониторит файл команд для обработки сигналов от агента"""
command_file = "agent_commands.json"
last_processed_timestamp = None
logger.info("[MONITOR] Starting command monitoring")
try:
while True:
try:
if os.path.exists(command_file):
with open(command_file, encoding="utf-8") as f:
command = json.load(f)
# Проверяем timestamp чтобы избежать повторной обработки
command_timestamp = command.get("timestamp")
if (
command_timestamp
and command_timestamp != last_processed_timestamp
):
action = command.get("action")
if action == "session_completed":
session_id = command.get("session_id")
room_name = command.get("room_name")
logger.info(
f"[MONITOR] Processing session_completed for {session_id}"
)
await self.handle_session_completed(
session_id, room_name
)
last_processed_timestamp = command_timestamp
await asyncio.sleep(2) # Проверяем каждые 2 секунды
except Exception as e:
logger.error(f"[MONITOR] Error processing command: {e}")
await asyncio.sleep(5) # Больший интервал при ошибке
except asyncio.CancelledError:
logger.info("[MONITOR] Command monitoring stopped")
except Exception as e:
logger.error(f"[MONITOR] Command monitoring failed: {e}")
# Глобальный экземпляр менеджера
agent_manager = AgentManager()