387 lines
15 KiB
Python
387 lines
15 KiB
Python
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import subprocess
|
||
from dataclasses import dataclass
|
||
from datetime import UTC, datetime
|
||
|
||
import psutil
|
||
|
||
from app.core.config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
|
||
class AgentProcess:
|
||
pid: int
|
||
session_id: int | None
|
||
room_name: str | None
|
||
started_at: datetime
|
||
status: str # "idle", "active", "stopping"
|
||
|
||
|
||
class AgentManager:
|
||
"""Singleton менеджер для управления AI агентом интервьюера"""
|
||
|
||
_instance: "AgentManager | None" = None
|
||
_agent_process: AgentProcess | None = None
|
||
_lock = asyncio.Lock()
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
if not hasattr(self, "_initialized"):
|
||
self._initialized = True
|
||
self.livekit_url = settings.livekit_url or "ws://localhost:7880"
|
||
self.api_key = settings.livekit_api_key or "devkey"
|
||
self.api_secret = (
|
||
settings.livekit_api_secret or "devkey_secret_32chars_minimum_length"
|
||
)
|
||
self._monitoring_task = None
|
||
|
||
async def start_agent(self) -> bool:
|
||
"""Запускает AI агента в режиме ожидания (без конкретной сессии)"""
|
||
async with self._lock:
|
||
if self._agent_process and self._is_process_alive(self._agent_process.pid):
|
||
logger.info(f"Agent already running with PID {self._agent_process.pid}")
|
||
return True
|
||
|
||
try:
|
||
# Запускаем агента в режиме worker (будет ждать подключения к комнатам)
|
||
agent_cmd = [
|
||
"uv",
|
||
"run",
|
||
"ai_interviewer_agent.py",
|
||
"start",
|
||
"--url",
|
||
self.livekit_url,
|
||
"--api-key",
|
||
self.api_key,
|
||
"--api-secret",
|
||
self.api_secret,
|
||
]
|
||
|
||
# Настройка окружения
|
||
env = os.environ.copy()
|
||
env.update(
|
||
{
|
||
"OPENAI_API_KEY": settings.openai_api_key or "",
|
||
"DEEPGRAM_API_KEY": settings.deepgram_api_key or "",
|
||
"CARTESIA_API_KEY": settings.cartesia_api_key or "",
|
||
"PYTHONIOENCODING": "utf-8",
|
||
}
|
||
)
|
||
|
||
# Запуск процесса
|
||
with open("ai_agent.log", "w") as log_file:
|
||
process = subprocess.Popen(
|
||
agent_cmd,
|
||
env=env,
|
||
stdout=log_file,
|
||
stderr=subprocess.STDOUT,
|
||
cwd=".",
|
||
)
|
||
|
||
self._agent_process = AgentProcess(
|
||
pid=process.pid,
|
||
session_id=None,
|
||
room_name=None,
|
||
started_at=datetime.now(UTC),
|
||
status="idle",
|
||
)
|
||
|
||
logger.info(f"AI Agent started with PID {process.pid}")
|
||
|
||
# Запускаем мониторинг команд
|
||
if not self._monitoring_task:
|
||
self._monitoring_task = asyncio.create_task(self._monitor_commands())
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to start AI agent: {e}")
|
||
return False
|
||
|
||
async def stop_agent(self) -> bool:
|
||
"""Останавливает AI агента"""
|
||
async with self._lock:
|
||
if not self._agent_process:
|
||
return True
|
||
|
||
try:
|
||
if self._is_process_alive(self._agent_process.pid):
|
||
process = psutil.Process(self._agent_process.pid)
|
||
|
||
# Сначала пытаемся graceful shutdown
|
||
process.terminate()
|
||
|
||
# Ждем до 10 секунд
|
||
for _ in range(100):
|
||
if not process.is_running():
|
||
break
|
||
await asyncio.sleep(0.1)
|
||
|
||
# Если не завершился, убиваем принудительно
|
||
if process.is_running():
|
||
process.kill()
|
||
|
||
logger.info(f"AI Agent with PID {self._agent_process.pid} stopped")
|
||
self._agent_process = None
|
||
|
||
# Останавливаем мониторинг команд
|
||
if self._monitoring_task:
|
||
self._monitoring_task.cancel()
|
||
self._monitoring_task = None
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error stopping AI agent: {e}")
|
||
self._agent_process = None
|
||
return False
|
||
|
||
async def assign_session(
|
||
self,
|
||
session_id: int,
|
||
room_name: str,
|
||
interview_plan: dict,
|
||
vacancy_data: dict = None,
|
||
) -> bool:
|
||
"""Назначает агенту конкретную сессию интервью"""
|
||
async with self._lock:
|
||
if not self._agent_process or not self._is_process_alive(
|
||
self._agent_process.pid
|
||
):
|
||
logger.error("No active agent to assign session to")
|
||
return False
|
||
|
||
if self._agent_process.status == "active":
|
||
logger.error(
|
||
f"Agent is busy with session {self._agent_process.session_id}"
|
||
)
|
||
return False
|
||
|
||
try:
|
||
# Создаем файл метаданных для сессии
|
||
metadata_file = f"session_metadata_{session_id}.json"
|
||
metadata = {
|
||
"session_id": session_id,
|
||
"room_name": room_name,
|
||
"interview_plan": interview_plan,
|
||
"command": "start_interview",
|
||
}
|
||
|
||
# Добавляем данные вакансии если они переданы
|
||
if vacancy_data:
|
||
metadata["vacancy_data"] = vacancy_data
|
||
|
||
with open(metadata_file, "w", encoding="utf-8") as f:
|
||
json.dump(metadata, f, ensure_ascii=False, indent=2)
|
||
|
||
# Отправляем сигнал агенту через файл команд
|
||
command_file = "agent_commands.json"
|
||
with open(command_file, "w", encoding="utf-8") as f:
|
||
json.dump(
|
||
{
|
||
"action": "start_session",
|
||
"session_id": session_id,
|
||
"room_name": room_name,
|
||
"metadata_file": metadata_file,
|
||
"timestamp": datetime.now(UTC).isoformat(),
|
||
},
|
||
f,
|
||
ensure_ascii=False,
|
||
indent=2,
|
||
)
|
||
|
||
# Обновляем статус агента
|
||
self._agent_process.session_id = session_id
|
||
self._agent_process.room_name = room_name
|
||
self._agent_process.status = "active"
|
||
|
||
logger.info(
|
||
f"Assigned session {session_id} to agent PID {self._agent_process.pid}"
|
||
)
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error assigning session to agent: {e}")
|
||
return False
|
||
|
||
async def release_session(self) -> bool:
|
||
"""Освобождает агента от текущей сессии"""
|
||
async with self._lock:
|
||
if not self._agent_process:
|
||
return True
|
||
|
||
try:
|
||
# Отправляем команду завершения сессии
|
||
command_file = "agent_commands.json"
|
||
with open(command_file, "w", encoding="utf-8") as f:
|
||
json.dump(
|
||
{
|
||
"action": "end_session",
|
||
"session_id": self._agent_process.session_id,
|
||
"timestamp": datetime.now(UTC).isoformat(),
|
||
},
|
||
f,
|
||
ensure_ascii=False,
|
||
indent=2,
|
||
)
|
||
|
||
# Очищаем файлы метаданных
|
||
if self._agent_process.session_id:
|
||
try:
|
||
os.remove(
|
||
f"session_metadata_{self._agent_process.session_id}.json"
|
||
)
|
||
except FileNotFoundError:
|
||
pass
|
||
|
||
# Возвращаем агента в режим ожидания
|
||
self._agent_process.session_id = None
|
||
self._agent_process.room_name = None
|
||
self._agent_process.status = "idle"
|
||
|
||
logger.info("Released agent from current session")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error releasing agent session: {e}")
|
||
return False
|
||
|
||
async def handle_session_completed(self, session_id: int, room_name: str) -> bool:
|
||
"""Обрабатывает сигнал о завершении сессии от агента"""
|
||
async with self._lock:
|
||
if not self._agent_process:
|
||
logger.warning(f"No agent process to handle session_completed for {session_id}")
|
||
return False
|
||
|
||
if self._agent_process.session_id != session_id:
|
||
logger.warning(
|
||
f"Session mismatch: expected {self._agent_process.session_id}, got {session_id}"
|
||
)
|
||
return False
|
||
|
||
try:
|
||
# Очищаем файлы метаданных
|
||
try:
|
||
os.remove(f"session_metadata_{session_id}.json")
|
||
except FileNotFoundError:
|
||
pass
|
||
|
||
# Возвращаем агента в режим ожидания
|
||
old_session_id = self._agent_process.session_id
|
||
self._agent_process.session_id = None
|
||
self._agent_process.room_name = None
|
||
self._agent_process.status = "idle"
|
||
|
||
logger.info(f"Agent automatically released from session {old_session_id}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error handling session_completed: {e}")
|
||
return False
|
||
|
||
def get_status(self) -> dict:
|
||
"""Возвращает текущий статус агента"""
|
||
if not self._agent_process:
|
||
return {
|
||
"status": "stopped",
|
||
"pid": None,
|
||
"session_id": None,
|
||
"room_name": None,
|
||
"uptime": None,
|
||
}
|
||
|
||
is_alive = self._is_process_alive(self._agent_process.pid)
|
||
if not is_alive:
|
||
self._agent_process = None
|
||
return {
|
||
"status": "dead",
|
||
"pid": None,
|
||
"session_id": None,
|
||
"room_name": None,
|
||
"uptime": None,
|
||
}
|
||
|
||
uptime = datetime.now(UTC) - self._agent_process.started_at
|
||
|
||
return {
|
||
"status": self._agent_process.status,
|
||
"pid": self._agent_process.pid,
|
||
"session_id": self._agent_process.session_id,
|
||
"room_name": self._agent_process.room_name,
|
||
"uptime": str(uptime),
|
||
"started_at": self._agent_process.started_at.isoformat(),
|
||
}
|
||
|
||
def is_available(self) -> bool:
|
||
"""Проверяет, доступен ли агент для новой сессии"""
|
||
if not self._agent_process:
|
||
return False
|
||
|
||
if not self._is_process_alive(self._agent_process.pid):
|
||
self._agent_process = None
|
||
return False
|
||
|
||
return self._agent_process.status == "idle"
|
||
|
||
def _is_process_alive(self, pid: int) -> bool:
|
||
"""Проверяет, жив ли процесс"""
|
||
try:
|
||
process = psutil.Process(pid)
|
||
return process.is_running()
|
||
except psutil.NoSuchProcess:
|
||
return False
|
||
except Exception:
|
||
return False
|
||
|
||
async def _monitor_commands(self):
|
||
"""Мониторит файл команд для обработки сигналов от агента"""
|
||
command_file = "agent_commands.json"
|
||
last_processed_timestamp = None
|
||
|
||
logger.info("[MONITOR] Starting command monitoring")
|
||
|
||
try:
|
||
while True:
|
||
try:
|
||
if os.path.exists(command_file):
|
||
with open(command_file, "r", encoding="utf-8") as f:
|
||
command = json.load(f)
|
||
|
||
# Проверяем timestamp чтобы избежать повторной обработки
|
||
command_timestamp = command.get("timestamp")
|
||
if command_timestamp and command_timestamp != last_processed_timestamp:
|
||
action = command.get("action")
|
||
|
||
if action == "session_completed":
|
||
session_id = command.get("session_id")
|
||
room_name = command.get("room_name")
|
||
|
||
logger.info(f"[MONITOR] Processing session_completed for {session_id}")
|
||
await self.handle_session_completed(session_id, room_name)
|
||
|
||
last_processed_timestamp = command_timestamp
|
||
|
||
await asyncio.sleep(2) # Проверяем каждые 2 секунды
|
||
|
||
except Exception as e:
|
||
logger.error(f"[MONITOR] Error processing command: {e}")
|
||
await asyncio.sleep(5) # Больший интервал при ошибке
|
||
|
||
except asyncio.CancelledError:
|
||
logger.info("[MONITOR] Command monitoring stopped")
|
||
except Exception as e:
|
||
logger.error(f"[MONITOR] Command monitoring failed: {e}")
|
||
|
||
|
||
# Глобальный экземпляр менеджера
|
||
agent_manager = AgentManager()
|