diff --git a/ai_interviewer_agent.py b/ai_interviewer_agent.py index 1bf98db..8c9a5ae 100644 --- a/ai_interviewer_agent.py +++ b/ai_interviewer_agent.py @@ -566,61 +566,134 @@ async def entrypoint(ctx: JobContext): ) if not interviewer.interview_finalized: - await complete_interview_sequence(ctx.room.name, interviewer) + # Записываем команду завершения для агента и менеджера + try: + command_file = "agent_commands.json" + end_command = { + "action": "end_session", + "session_id": session_id, + "room_name": ctx.room.name, + "timestamp": datetime.now(UTC).isoformat(), + "source": "natural" + } + + with open(command_file, "w", encoding="utf-8") as f: + json.dump(end_command, f, ensure_ascii=False, indent=2) + + logger.info(f"[KEYWORD_DETECTION] End session command sent for session {session_id}") + + except Exception as e: + logger.error(f"[KEYWORD_DETECTION] Failed to send end session command: {str(e)}") + break return False - # --- Мониторинг команд завершения --- - async def monitor_end_commands(): - """Мониторит команды завершения сессии и лимит времени""" - command_file = "agent_commands.json" + # --- Проверка превышения лимита времени --- + async def check_time_limit(current_session_id: int, current_room_name: str): + """Проверяет превышение лимита времени и завершает интервью""" TIME_LIMIT_MINUTES = 60 # Жесткий лимит времени интервью - while not interviewer.interview_finalized: + if interviewer.interview_start_time is None or interviewer.interview_finalized: + return False + + time_info = interviewer.get_time_info() + if time_info["elapsed_minutes"] >= TIME_LIMIT_MINUTES: + logger.warning( + f"[TIME_LIMIT] Interview exceeded {TIME_LIMIT_MINUTES} minutes " + f"({time_info['elapsed_minutes']:.1f} min), triggering completion for session {current_session_id}" + ) + try: - # Проверяем команды завершения - if os.path.exists(command_file): - with open(command_file, encoding="utf-8") as f: - command = json.load(f) + command_file = "agent_commands.json" + end_command = { + "action": "end_session", + "session_id": current_session_id, + "room_name": current_room_name, + "timestamp": datetime.now(UTC).isoformat(), + "source": "time_limit" + } - if ( - command.get("action") == "end_session" - and command.get("session_id") == session_id - ): - logger.info( - f"[COMMAND] Received end_session command for session {session_id}" - ) + with open(command_file, "w", encoding="utf-8") as f: + json.dump(end_command, f, ensure_ascii=False, indent=2) - if not interviewer.interview_finalized: - await complete_interview_sequence( - ctx.room.name, interviewer - ) - break - - # Проверяем превышение лимита времени - if interviewer.interview_start_time is not None: - time_info = interviewer.get_time_info() - if time_info["elapsed_minutes"] >= TIME_LIMIT_MINUTES: - logger.warning( - f"[TIME_LIMIT] Interview exceeded {TIME_LIMIT_MINUTES} minutes " - f"({time_info['elapsed_minutes']:.1f} min), forcing completion" - ) - - if not interviewer.interview_finalized: - await complete_interview_sequence( - ctx.room.name, interviewer - ) - break - - await asyncio.sleep(2) # Проверяем каждые 5 секунд + logger.info(f"[TIME_LIMIT] End session command sent for session {current_session_id}") + return True except Exception as e: - logger.error(f"[COMMAND] Error monitoring commands: {str(e)}") - await asyncio.sleep(2) + logger.error(f"[TIME_LIMIT] Failed to send end session command: {str(e)}") + return False - # Запускаем мониторинг команд в фоне - asyncio.create_task(monitor_end_commands()) + return False + + # --- Мониторинг лимита времени --- + async def monitor_time_limit(): + """Мониторит превышение лимита времени интервью""" + # Запоминаем session_id и room_name на момент запуска задачи + monitored_session_id = session_id + monitored_room_name = ctx.room.name + + logger.info(f"[TIME_MONITOR] Started monitoring session {monitored_session_id} in room {monitored_room_name}") + + try: + while not interviewer.interview_finalized: + try: + if await check_time_limit(monitored_session_id, monitored_room_name): + logger.info(f"[TIME_MONITOR] Time limit triggered for session {monitored_session_id}") + break # Время превышено, команда отправлена + + await asyncio.sleep(10) # Проверяем каждые 10 секунд + + except Exception as e: + logger.error(f"[TIME_MONITOR] Error checking time limit for session {monitored_session_id}: {str(e)}") + await asyncio.sleep(10) + + except asyncio.CancelledError: + logger.info(f"[TIME_MONITOR] Task cancelled for session {monitored_session_id}") + raise + finally: + logger.info(f"[TIME_MONITOR] Stopped monitoring session {monitored_session_id}") + + # --- Мониторинг команд завершения --- + async def monitor_end_commands(): + """Мониторит команды завершения сессии""" + command_file = "agent_commands.json" + + try: + while not interviewer.interview_finalized: + try: + # Проверяем команды завершения + if os.path.exists(command_file): + with open(command_file, encoding="utf-8") as f: + command = json.load(f) + + if ( + command.get("action") == "end_session" + and command.get("session_id") == session_id + ): + logger.info( + f"[COMMAND] Received end_session command for session {session_id}" + ) + + if not interviewer.interview_finalized: + await complete_interview_sequence( + ctx.room.name, interviewer + ) + break + + await asyncio.sleep(2) # Проверяем каждые 2 секунды + + except Exception as e: + logger.error(f"[COMMAND] Error monitoring commands: {str(e)}") + await asyncio.sleep(2) + + except asyncio.CancelledError: + logger.info("[COMMAND] End commands monitoring task cancelled") + raise + + # Запускаем мониторинг команд и времени в фоне + end_commands_task = asyncio.create_task(monitor_end_commands()) + time_limit_task = asyncio.create_task(monitor_time_limit()) @session.on("user_state_changed") def on_user_state_changed(event): @@ -650,16 +723,19 @@ async def entrypoint(ctx: JobContext): async def complete_interview_sequence(room_name: str, interviewer_instance): """ Полная цепочка завершения интервью: - 1. Финализация диалога в БД - 2. Закрытие комнаты LiveKit - 3. Завершение процесса агента + 1. Отмена фоновых задач + 2. Финализация диалога в БД + 3. Закрытие комнаты LiveKit """ logger.info("[SEQUENCE] Starting interview completion sequence") - # Шаг 1: Финализируем интервью в БД - logger.info("[SEQUENCE] Step 1: Finalizing interview in database") + # Шаг 1: Задачи мониторинга завершатся естественным образом через interviewer.interview_finalized + logger.info("[SEQUENCE] Step 1: Background monitoring tasks will finish naturally after finalize_interview sets the flag") + + # Шаг 2: Финализируем интервью в БД + logger.info("[SEQUENCE] Step 2: Finalizing interview in database") await finalize_interview(room_name, interviewer_instance) - logger.info("[SEQUENCE] Step 1: Database finalization completed") + logger.info("[SEQUENCE] Step 2: Database finalization completed") # Даём время на завершение всех DB операций await asyncio.sleep(1) @@ -670,30 +746,9 @@ async def entrypoint(ctx: JobContext): await close_room(room_name) logger.info(f"[SEQUENCE] Step 2: Room {room_name} closed successfully") except Exception as e: - logger.error(f"[SEQUENCE] Step 2: Failed to close room: {str(e)}") - logger.info( - "[SEQUENCE] Step 2: Room closure failed, but continuing sequence" - ) - # Шаг 3: освобождаем агента через файл команд - logger.info("[SEQUENCE] Step 3: Releasing agent session") - try: - # Сигнализируем менеджеру агентов о завершении сессии - command_file = "agent_commands.json" - release_command = { - "action": "session_completed", - "session_id": session_id, - "room_name": room_name, - "timestamp": datetime.now(UTC).isoformat(), - } - - with open(command_file, "w", encoding="utf-8") as f: - json.dump(release_command, f, ensure_ascii=False, indent=2) - - logger.info(f"[SEQUENCE] Step 3: Session {session_id} release signal sent") - - except Exception as e: - logger.error(f"[SEQUENCE] Step 3: Failed to send release signal: {str(e)}") - logger.info("[SEQUENCE] Step 3: Continuing without release signal") + # Логируем как info, а не error - это ожидаемо при принудительном закрытии комнаты + logger.info(f"[SEQUENCE] Step 2: Room closure triggered expected disconnect: {str(e)}") + # Продолжаем работу - это не критичная ошибка # --- Упрощенная логика обработки пользовательского ответа --- async def handle_user_input(user_response: str): diff --git a/app/services/agent_manager.py b/app/services/agent_manager.py index 471bb1c..2f85fdf 100644 --- a/app/services/agent_manager.py +++ b/app/services/agent_manager.py @@ -370,17 +370,24 @@ class AgentManager: ): action = command.get("action") - if action == "session_completed": + if action == "end_session": session_id = command.get("session_id") room_name = command.get("room_name") logger.info( - f"[MONITOR] Processing session_completed for {session_id}" - ) - await self.handle_session_completed( - session_id, room_name + f"[MONITOR] Processing end_session for session {session_id}" ) + # Обновляем статус агента только если он активен + if (self._agent_process and + self._agent_process.status == "active" and + self._agent_process.session_id == session_id): + + await self.handle_session_completed(session_id, room_name) + logger.info(f"[MONITOR] Agent status updated to idle for session {session_id}") + else: + logger.info(f"[MONITOR] Agent already released or different session, skipping status update") + last_processed_timestamp = command_timestamp await asyncio.sleep(2) # Проверяем каждые 2 секунды