upd end session system

This commit is contained in:
Даниил Ивлев 2025-09-14 21:36:17 +05:00
parent af93986176
commit 9d5331cfa3
2 changed files with 140 additions and 78 deletions

View File

@ -566,61 +566,134 @@ async def entrypoint(ctx: JobContext):
)
if not interviewer.interview_finalized:
await complete_interview_sequence(ctx.room.name, interviewer)
# Записываем команду завершения для агента и менеджера
try:
command_file = "agent_commands.json"
end_command = {
"action": "end_session",
"session_id": session_id,
"room_name": ctx.room.name,
"timestamp": datetime.now(UTC).isoformat(),
"source": "natural"
}
with open(command_file, "w", encoding="utf-8") as f:
json.dump(end_command, f, ensure_ascii=False, indent=2)
logger.info(f"[KEYWORD_DETECTION] End session command sent for session {session_id}")
except Exception as e:
logger.error(f"[KEYWORD_DETECTION] Failed to send end session command: {str(e)}")
break
return False
# --- Мониторинг команд завершения ---
async def monitor_end_commands():
"""Мониторит команды завершения сессии и лимит времени"""
command_file = "agent_commands.json"
# --- Проверка превышения лимита времени ---
async def check_time_limit(current_session_id: int, current_room_name: str):
"""Проверяет превышение лимита времени и завершает интервью"""
TIME_LIMIT_MINUTES = 60 # Жесткий лимит времени интервью
while not interviewer.interview_finalized:
if interviewer.interview_start_time is None or interviewer.interview_finalized:
return False
time_info = interviewer.get_time_info()
if time_info["elapsed_minutes"] >= TIME_LIMIT_MINUTES:
logger.warning(
f"[TIME_LIMIT] Interview exceeded {TIME_LIMIT_MINUTES} minutes "
f"({time_info['elapsed_minutes']:.1f} min), triggering completion for session {current_session_id}"
)
try:
# Проверяем команды завершения
if os.path.exists(command_file):
with open(command_file, encoding="utf-8") as f:
command = json.load(f)
command_file = "agent_commands.json"
end_command = {
"action": "end_session",
"session_id": current_session_id,
"room_name": current_room_name,
"timestamp": datetime.now(UTC).isoformat(),
"source": "time_limit"
}
if (
command.get("action") == "end_session"
and command.get("session_id") == session_id
):
logger.info(
f"[COMMAND] Received end_session command for session {session_id}"
)
with open(command_file, "w", encoding="utf-8") as f:
json.dump(end_command, f, ensure_ascii=False, indent=2)
if not interviewer.interview_finalized:
await complete_interview_sequence(
ctx.room.name, interviewer
)
break
# Проверяем превышение лимита времени
if interviewer.interview_start_time is not None:
time_info = interviewer.get_time_info()
if time_info["elapsed_minutes"] >= TIME_LIMIT_MINUTES:
logger.warning(
f"[TIME_LIMIT] Interview exceeded {TIME_LIMIT_MINUTES} minutes "
f"({time_info['elapsed_minutes']:.1f} min), forcing completion"
)
if not interviewer.interview_finalized:
await complete_interview_sequence(
ctx.room.name, interviewer
)
break
await asyncio.sleep(2) # Проверяем каждые 5 секунд
logger.info(f"[TIME_LIMIT] End session command sent for session {current_session_id}")
return True
except Exception as e:
logger.error(f"[COMMAND] Error monitoring commands: {str(e)}")
await asyncio.sleep(2)
logger.error(f"[TIME_LIMIT] Failed to send end session command: {str(e)}")
return False
# Запускаем мониторинг команд в фоне
asyncio.create_task(monitor_end_commands())
return False
# --- Мониторинг лимита времени ---
async def monitor_time_limit():
"""Мониторит превышение лимита времени интервью"""
# Запоминаем session_id и room_name на момент запуска задачи
monitored_session_id = session_id
monitored_room_name = ctx.room.name
logger.info(f"[TIME_MONITOR] Started monitoring session {monitored_session_id} in room {monitored_room_name}")
try:
while not interviewer.interview_finalized:
try:
if await check_time_limit(monitored_session_id, monitored_room_name):
logger.info(f"[TIME_MONITOR] Time limit triggered for session {monitored_session_id}")
break # Время превышено, команда отправлена
await asyncio.sleep(10) # Проверяем каждые 10 секунд
except Exception as e:
logger.error(f"[TIME_MONITOR] Error checking time limit for session {monitored_session_id}: {str(e)}")
await asyncio.sleep(10)
except asyncio.CancelledError:
logger.info(f"[TIME_MONITOR] Task cancelled for session {monitored_session_id}")
raise
finally:
logger.info(f"[TIME_MONITOR] Stopped monitoring session {monitored_session_id}")
# --- Мониторинг команд завершения ---
async def monitor_end_commands():
"""Мониторит команды завершения сессии"""
command_file = "agent_commands.json"
try:
while not interviewer.interview_finalized:
try:
# Проверяем команды завершения
if os.path.exists(command_file):
with open(command_file, encoding="utf-8") as f:
command = json.load(f)
if (
command.get("action") == "end_session"
and command.get("session_id") == session_id
):
logger.info(
f"[COMMAND] Received end_session command for session {session_id}"
)
if not interviewer.interview_finalized:
await complete_interview_sequence(
ctx.room.name, interviewer
)
break
await asyncio.sleep(2) # Проверяем каждые 2 секунды
except Exception as e:
logger.error(f"[COMMAND] Error monitoring commands: {str(e)}")
await asyncio.sleep(2)
except asyncio.CancelledError:
logger.info("[COMMAND] End commands monitoring task cancelled")
raise
# Запускаем мониторинг команд и времени в фоне
end_commands_task = asyncio.create_task(monitor_end_commands())
time_limit_task = asyncio.create_task(monitor_time_limit())
@session.on("user_state_changed")
def on_user_state_changed(event):
@ -650,16 +723,19 @@ async def entrypoint(ctx: JobContext):
async def complete_interview_sequence(room_name: str, interviewer_instance):
"""
Полная цепочка завершения интервью:
1. Финализация диалога в БД
2. Закрытие комнаты LiveKit
3. Завершение процесса агента
1. Отмена фоновых задач
2. Финализация диалога в БД
3. Закрытие комнаты LiveKit
"""
logger.info("[SEQUENCE] Starting interview completion sequence")
# Шаг 1: Финализируем интервью в БД
logger.info("[SEQUENCE] Step 1: Finalizing interview in database")
# Шаг 1: Задачи мониторинга завершатся естественным образом через interviewer.interview_finalized
logger.info("[SEQUENCE] Step 1: Background monitoring tasks will finish naturally after finalize_interview sets the flag")
# Шаг 2: Финализируем интервью в БД
logger.info("[SEQUENCE] Step 2: Finalizing interview in database")
await finalize_interview(room_name, interviewer_instance)
logger.info("[SEQUENCE] Step 1: Database finalization completed")
logger.info("[SEQUENCE] Step 2: Database finalization completed")
# Даём время на завершение всех DB операций
await asyncio.sleep(1)
@ -670,30 +746,9 @@ async def entrypoint(ctx: JobContext):
await close_room(room_name)
logger.info(f"[SEQUENCE] Step 2: Room {room_name} closed successfully")
except Exception as e:
logger.error(f"[SEQUENCE] Step 2: Failed to close room: {str(e)}")
logger.info(
"[SEQUENCE] Step 2: Room closure failed, but continuing sequence"
)
# Шаг 3: освобождаем агента через файл команд
logger.info("[SEQUENCE] Step 3: Releasing agent session")
try:
# Сигнализируем менеджеру агентов о завершении сессии
command_file = "agent_commands.json"
release_command = {
"action": "session_completed",
"session_id": session_id,
"room_name": room_name,
"timestamp": datetime.now(UTC).isoformat(),
}
with open(command_file, "w", encoding="utf-8") as f:
json.dump(release_command, f, ensure_ascii=False, indent=2)
logger.info(f"[SEQUENCE] Step 3: Session {session_id} release signal sent")
except Exception as e:
logger.error(f"[SEQUENCE] Step 3: Failed to send release signal: {str(e)}")
logger.info("[SEQUENCE] Step 3: Continuing without release signal")
# Логируем как info, а не error - это ожидаемо при принудительном закрытии комнаты
logger.info(f"[SEQUENCE] Step 2: Room closure triggered expected disconnect: {str(e)}")
# Продолжаем работу - это не критичная ошибка
# --- Упрощенная логика обработки пользовательского ответа ---
async def handle_user_input(user_response: str):

View File

@ -370,17 +370,24 @@ class AgentManager:
):
action = command.get("action")
if action == "session_completed":
if action == "end_session":
session_id = command.get("session_id")
room_name = command.get("room_name")
logger.info(
f"[MONITOR] Processing session_completed for {session_id}"
)
await self.handle_session_completed(
session_id, room_name
f"[MONITOR] Processing end_session for session {session_id}"
)
# Обновляем статус агента только если он активен
if (self._agent_process and
self._agent_process.status == "active" and
self._agent_process.session_id == session_id):
await self.handle_session_completed(session_id, room_name)
logger.info(f"[MONITOR] Agent status updated to idle for session {session_id}")
else:
logger.info(f"[MONITOR] Agent already released or different session, skipping status update")
last_processed_timestamp = command_timestamp
await asyncio.sleep(2) # Проверяем каждые 2 секунды