diff --git a/ai_interviewer_agent.py b/ai_interviewer_agent.py index b95472f..6b7ad1e 100644 --- a/ai_interviewer_agent.py +++ b/ai_interviewer_agent.py @@ -2,6 +2,7 @@ import asyncio import json import logging import os +import time from datetime import datetime # Принудительно устанавливаем UTF-8 для Windows @@ -23,6 +24,7 @@ from app.core.database import get_session from app.repositories.interview_repository import InterviewRepository from app.repositories.resume_repository import ResumeRepository from app.services.interview_finalization_service import InterviewFinalizationService +from app.services.interview_service import InterviewRoomService from rag.settings import settings logger = logging.getLogger("ai-interviewer") @@ -61,7 +63,7 @@ class InterviewAgent: self.last_user_response = None self.intro_done = False # Новый флаг — произнесено ли приветствие self.interview_finalized = False # Флаг завершения интервью - + # Трекинг времени интервью import time @@ -160,9 +162,35 @@ class InterviewAgent: - Опыт работы: {candidate_years} лет - Ключевые навыки: {candidate_skills} -ПЛАН ИНТЕРВЬЮ (используй как руководство, но адаптируйся): +ЦЕЛЬ ИНТЕРВЬЮ: + +Найти кандидата, который не только подходит по техническим навыкам, но и силён по мягким навыкам, культуре и потенциалу. +Задачи интервью: +- Выявить сильные и слабые стороны кандидата. +- Понять, насколько он подходит к вакансии и соответствует интервью. +- Проверить мышление, мотивацию и способность адаптироваться. + +ПОКАЗАТЕЛИ "ДОСТОЙНОГО КАНДИДАТА": +- Глубокое понимание ключевых технологий ({candidate_skills}). +- Умение решать проблемы, а не просто отвечать на вопросы. +- Чёткая и логичная коммуникация. +- Способность учиться и адаптироваться. +- Совпадение ценностей и принципов с командой и компанией. + +ПЛАН ИНТЕРВЬЮ (как руководство, адаптируйся по ситуации) + {sections_info} +ТИПЫ ВОПРОСОВ: +Поведенческие (30%) — выяснить, как кандидат действовал в реальных ситуациях. +Пример: "Расскажи про ситуацию, когда ты столкнулся с трудной задачей на проекте. Что ты сделал?" + +Технические (50%) — проверить глубину знаний и практические навыки. +Пример: "Как бы ты реализовал X?" или "Объясни разницу между A и B." + +Проблемные / кейсы (20%) — проверить мышление и подход к решению. +Пример: "У нас есть система, которая падает раз в неделю. Как бы ты подошёл к диагностике проблемы?" + ВРЕМЯ ИНТЕРВЬЮ: - Запланированная длительность: {self.duration_minutes} минут - Прошло времени: {elapsed_minutes:.1f} минут ({time_percentage:.0f}%) @@ -172,26 +200,36 @@ class InterviewAgent: ФОКУС-ОБЛАСТИ: {focus_areas_str} КЛЮЧЕВЫЕ ОЦЕНОЧНЫЕ ТОЧКИ: {evaluation_points_str} +КРАСНЫЕ ФЛАГИ: +Во время интервью отмечай следующие негативные сигналы: +- Не может объяснить собственные решения. +- Противоречит сам себе или врёт. +- Агрессивная, пассивная или неуважительная коммуникация. +- Нет желания учиться или интереса к проекту. +- Перекладывает ответственность на других, не признаёт ошибок. + ИНСТРУКЦИИ: 1. Начни с приветствия: {greeting} 2. Адаптируй вопросы под ответы кандидата +3. Не повторяй то, что клиент тебе сказал, лучше показывай, что понял, услышал и иди дальше. Лишний раз его не хвали 3. Следи за временем - при превышении 80% времени начинай завершать интервью 4. Оценивай качество и глубину ответов кандидата -5. Завершай интервью если: +5. Если получаешь сообщение "[СИСТЕМА] Клиент молчит..." - это означает проблемы со связью или кандидат растерялся. Скажи что-то вроде "Приём! Ты меня слышишь?" или "Всё в порядке? Связь не пропала?" +6. Завершай интервью если: - Получил достаточно информации для оценки - Время почти истекло (>90% от запланированного) - Кандидат дал исчерпывающие ответы -6. При завершении спроси о вопросах кандидата и поблагодари + - Получаешь сообщение "[СИСТЕМА] Похоже клиент отключился" +7. При завершении спроси о вопросах кандидата и поблагодари ВАЖНО: Отвечай естественно и разговорно, как живой интервьюер! ЗАВЕРШЕНИЕ ИНТЕРВЬЮ: Когда нужно завершить интервью (время истекло, получена достаточная информация), -используй одну из этих ключевых фраз: -- "Спасибо за интересную беседу! У тебя есть вопросы ко мне?" -- "Это всё, что я хотел узнать. Есть ли у вас вопросы?" -- "Интервью подходит к концу. У тебя есть вопросы ко мне?" -ФИНАЛЬНАЯ ФРАЗА после которой ничего не будет: +используй фразу типа: +- "Спасибо за интересную беседу! Интервью подходит к концу. У тебя есть вопросы ко мне?" + +ФИНАЛЬНАЯ ФРАЗА после которой конец интервью: - До скорой встречи! ЗАВЕРШАЙ ИНТЕРВЬЮ, если: @@ -288,8 +326,8 @@ async def entrypoint(ctx: JobContext): logger.info("[INIT] Using default interview plan") interview_plan = { "interview_structure": { - "duration_minutes": 2, # ТЕСТОВЫЙ РЕЖИМ - 2 минуты - "greeting": "Привет! Это быстрое тестовое интервью на 2 минуты. Готов?", + "duration_minutes": 5, # ТЕСТОВЫЙ РЕЖИМ - 5 минут + "greeting": "Привет! Это быстрое тестовое интервью на 5 минут. Готов?", "sections": [ { "name": "Знакомство", @@ -348,8 +386,14 @@ async def entrypoint(ctx: JobContext): # Создаем обычный Agent и Session agent = Agent(instructions=interviewer.get_system_instructions()) - # Создаем AgentSession с обычным TTS - session = AgentSession(vad=silero.VAD.load(), stt=stt, llm=llm, tts=tts) + # Создаем AgentSession с обычным TTS и детекцией неактивности пользователя + session = AgentSession( + vad=silero.VAD.load(), + stt=stt, + llm=llm, + tts=tts, + user_away_timeout=7.0 # 7 секунд неактивности для срабатывания away + ) # --- Сохранение диалога в БД --- async def save_dialogue_to_db(room_name: str, dialogue_history: list): @@ -446,7 +490,20 @@ async def entrypoint(ctx: JobContext): if not interviewer.interview_finalized: # Запускаем полную цепочку завершения интервью - await complete_interview_sequence(ctx.room.name, interviewer) + try: + session_generator = get_session() + db = await anext(session_generator) + try: + interview_repo = InterviewRepository(db) + resume_repo = ResumeRepository(db) + interview_service = InterviewRoomService( + interview_repo, resume_repo + ) + await interview_service.end_interview_session(session_id) + finally: + await session_generator.aclose() + except Exception as e: + logger.error(f"[FINALIZE] Error finalizing interview: {str(e)}") return True break @@ -477,7 +534,7 @@ async def entrypoint(ctx: JobContext): ) break - await asyncio.sleep(2) # Проверяем каждые 2 секунды + await asyncio.sleep(1) # Проверяем каждые 1 секунды except Exception as e: logger.error(f"[COMMAND] Error monitoring commands: {str(e)}") @@ -485,6 +542,22 @@ async def entrypoint(ctx: JobContext): # Запускаем мониторинг команд в фоне asyncio.create_task(monitor_end_commands()) + + # --- Обработчик состояния пользователя (замена мониторинга тишины) --- + @session.on("user_state_changed") + def on_user_state_changed(event): + """Обработчик изменения состояния пользователя (активен/неактивен)""" + async def on_change(): + logger.info(f"[USER_STATE] User state changed to: {event.new_state}") + + if event.new_state == "away" and interviewer.intro_done: + logger.info("[USER_STATE] User went away, generating response...") + + # Генерируем ответ через LLM с инструкциями + await session.generate_reply( + instructions="Клиент молчит уже больше 10 секунд. Проверь связь фразой вроде 'Приём! Ты меня слышишь?' или 'Связь не пропала?'" + ) + asyncio.create_task(on_change()) # --- Полная цепочка завершения интервью --- async def complete_interview_sequence(room_name: str, interviewer_instance): @@ -494,47 +567,31 @@ async def entrypoint(ctx: JobContext): 2. Закрытие комнаты LiveKit 3. Завершение процесса агента """ + logger.info("[SEQUENCE] Starting interview completion sequence") + + # Шаг 1: Финализируем интервью в БД + logger.info("[SEQUENCE] Step 1: Finalizing interview in database") + await finalize_interview(room_name, interviewer_instance) + logger.info("[SEQUENCE] Step 1: Database finalization completed") + + # Даём время на завершение всех DB операций + await asyncio.sleep(1) + + # Шаг 2: Закрываем комнату LiveKit + logger.info("[SEQUENCE] Step 2: Closing LiveKit room") try: - logger.info("[SEQUENCE] Starting interview completion sequence") - - # Шаг 1: Финализируем интервью в БД - logger.info("[SEQUENCE] Step 1: Finalizing interview in database") - await finalize_interview(room_name, interviewer_instance) - logger.info("[SEQUENCE] Step 1: Database finalization completed") - - # Даём время на завершение всех DB операций - await asyncio.sleep(1) - - # Шаг 2: Закрываем комнату LiveKit - logger.info("[SEQUENCE] Step 2: Closing LiveKit room") - try: - await close_room(room_name) - logger.info(f"[SEQUENCE] Step 2: Room {room_name} closed successfully") - except Exception as e: - logger.error(f"[SEQUENCE] Step 2: Failed to close room: {str(e)}") - logger.info( - "[SEQUENCE] Step 2: Room closure failed, but continuing sequence" - ) - - # Шаг 3: Завершаем процесс агента - logger.info("[SEQUENCE] Step 3: Terminating agent process") - await asyncio.sleep(2) # Даём время на завершение всех операций - logger.info("[SEQUENCE] Step 3: Force terminating agent process") - import os - - os._exit(0) # Принудительное завершение процесса - + await close_room(room_name) + logger.info(f"[SEQUENCE] Step 2: Room {room_name} closed successfully") except Exception as e: - logger.error(f"[SEQUENCE] Error in interview completion sequence: {str(e)}") - # Fallback: принудительно завершаем процесс даже при ошибках - logger.info("[SEQUENCE] Fallback: Force terminating process") - await asyncio.sleep(1) - import os - - os._exit(1) + logger.error(f"[SEQUENCE] Step 2: Failed to close room: {str(e)}") + logger.info( + "[SEQUENCE] Step 2: Room closure failed, but continuing sequence" + ) + # --- Упрощенная логика обработки пользовательского ответа --- async def handle_user_input(user_response: str): + current_section = interviewer.get_current_section() # Сохраняем ответ пользователя @@ -575,6 +632,7 @@ async def entrypoint(ctx: JobContext): if role == "user": asyncio.create_task(handle_user_input(text)) elif role == "assistant": + # Сохраняем ответ агента в историю диалога current_section = interviewer.get_current_section() interviewer.conversation_history.append( diff --git a/app/services/ai_interviewer_service.py b/app/services/ai_interviewer_service.py deleted file mode 100644 index 97f0121..0000000 --- a/app/services/ai_interviewer_service.py +++ /dev/null @@ -1,300 +0,0 @@ -import asyncio -import json -import logging -from datetime import datetime - -from livekit import rtc - -from rag.settings import settings - -logger = logging.getLogger(__name__) - - -class AIInterviewerService: - """Сервис AI интервьюера, который подключается к LiveKit комнате как участник""" - - def __init__(self, interview_session_id: int, resume_data: dict): - self.interview_session_id = interview_session_id - self.resume_data = resume_data - self.room: rtc.Room | None = None - self.audio_source: rtc.AudioSource | None = None - self.conversation_history: list[dict] = [] - self.current_question_index = 0 - self.interview_questions = [] - - async def connect_to_room(self, room_name: str, token: str): - """Подключение AI агента к LiveKit комнате""" - try: - self.room = rtc.Room() - - # Настройка обработчиков событий - self.room.on("participant_connected", self.on_participant_connected) - self.room.on("track_subscribed", self.on_track_subscribed) - self.room.on("data_received", self.on_data_received) - - # Подключение к комнате - await self.room.connect(settings.livekit_url, token) - logger.info(f"AI agent connected to room: {room_name}") - - # Создание аудио источника для TTS - self.audio_source = rtc.AudioSource(sample_rate=16000, num_channels=1) - track = rtc.LocalAudioTrack.create_audio_track( - "ai_voice", self.audio_source - ) - - # Публикация аудио трека - await self.room.local_participant.publish_track( - track, rtc.TrackPublishOptions() - ) - - # Генерация первого вопроса - await self.generate_interview_questions() - await self.start_interview() - - except Exception as e: - logger.error(f"Error connecting to room: {str(e)}") - raise - - async def on_participant_connected(self, participant: rtc.RemoteParticipant): - """Обработка подключения пользователя""" - logger.info(f"Participant connected: {participant.identity}") - # Можем отправить приветственное сообщение - await self.send_message({"type": "ai_speaking_start"}) - - async def on_track_subscribed( - self, - track: rtc.Track, - publication: rtc.RemoteTrackPublication, - participant: rtc.RemoteParticipant, - ): - """Обработка получения аудио трека от пользователя""" - if track.kind == rtc.TrackKind.KIND_AUDIO: - logger.info("Subscribed to user audio track") - # Настройка обработки аудио для STT - audio_stream = rtc.AudioStream(track) - asyncio.create_task(self.process_user_audio(audio_stream)) - - async def on_data_received(self, data: bytes, participant: rtc.RemoteParticipant): - """Обработка сообщений от фронтенда""" - try: - message = json.loads(data.decode()) - await self.handle_frontend_message(message) - except Exception as e: - logger.error(f"Error processing data message: {str(e)}") - - async def handle_frontend_message(self, message: dict): - """Обработка сообщений от фронтенда""" - msg_type = message.get("type") - - if msg_type == "start_interview": - await self.start_interview() - elif msg_type == "end_interview": - await self.end_interview() - elif msg_type == "user_finished_speaking": - # Пользователь закончил говорить, можем обрабатывать его ответ - pass - - async def process_user_audio(self, audio_stream: rtc.AudioStream): - """Обработка аудио от пользователя через STT""" - try: - # Здесь будет интеграция с STT сервисом - # Пока заглушка - async for audio_frame in audio_stream: - # TODO: Отправить аудио в STT (Whisper API) - # user_text = await self.speech_to_text(audio_frame) - # if user_text: - # await self.process_user_response(user_text) - pass - except Exception as e: - logger.error(f"Error processing user audio: {str(e)}") - - async def generate_interview_questions(self): - """Генерация вопросов для интервью на основе резюме""" - try: - from rag.registry import registry - - chat_model = registry.get_chat_model() - - # Используем существующую логику генерации вопросов - questions_prompt = f""" - Сгенерируй 8 вопросов для голосового собеседования кандидата. - - РЕЗЮМЕ КАНДИДАТА: - Имя: {self.resume_data.get("name", "Не указано")} - Навыки: {", ".join(self.resume_data.get("skills", []))} - Опыт работы: {self.resume_data.get("total_years", 0)} лет - Образование: {self.resume_data.get("education", "Не указано")} - - ВАЖНО: - 1. Вопросы должны быть короткими и ясными для голосового формата - 2. Начни с простого приветствия и представления - 3. Каждый вопрос должен занимать не более 2-3 предложений - 4. Используй естественную разговорную речь - - Верни только JSON массив строк с вопросами: - ["Привет! Расскажи немного о себе", "Какой у тебя опыт в...", ...] - """ - - from langchain.schema import HumanMessage, SystemMessage - - messages = [ - SystemMessage( - content="Ты HR интервьюер. Говори естественно и дружелюбно." - ), - HumanMessage(content=questions_prompt), - ] - - response = chat_model.get_llm().invoke(messages) - response_text = response.content.strip() - - # Парсим JSON ответ - if response_text.startswith("[") and response_text.endswith("]"): - self.interview_questions = json.loads(response_text) - else: - # Fallback вопросы - self.interview_questions = [ - "Привет! Расскажи немного о себе и своем опыте", - "Что тебя привлекает в этой позиции?", - "Расскажи о своем самом значимом проекте", - "Какие технологии ты используешь в работе?", - "Как ты решаешь сложные задачи?", - "Есть ли у тебя вопросы ко мне?", - ] - - logger.info( - f"Generated {len(self.interview_questions)} interview questions" - ) - - except Exception as e: - logger.error(f"Error generating questions: {str(e)}") - # Используем базовые вопросы - self.interview_questions = [ - "Привет! Расскажи о своем опыте", - "Что тебя интересует в этой позиции?", - "Есть ли у тебя вопросы?", - ] - - async def start_interview(self): - """Начало интервью""" - if not self.interview_questions: - await self.generate_interview_questions() - - # Отправляем первый вопрос - await self.ask_next_question() - - async def ask_next_question(self): - """Задать следующий вопрос""" - if self.current_question_index >= len(self.interview_questions): - await self.end_interview() - return - - question = self.interview_questions[self.current_question_index] - - # Отправляем сообщение фронтенду - await self.send_message( - { - "type": "question", - "text": question, - "questionNumber": self.current_question_index + 1, - } - ) - - # Конвертируем в речь и воспроизводим - # TODO: Реализовать TTS - # audio_data = await self.text_to_speech(question) - # await self.play_audio(audio_data) - - self.current_question_index += 1 - logger.info(f"Asked question {self.current_question_index}: {question}") - - async def process_user_response(self, user_text: str): - """Обработка ответа пользователя""" - # Сохраняем ответ в историю - self.conversation_history.append( - { - "type": "user_response", - "text": user_text, - "timestamp": datetime.utcnow().isoformat(), - "question_index": self.current_question_index - 1, - } - ) - - # Можем добавить анализ ответа через LLM - # И решить - задать уточняющий вопрос или перейти к следующему - - # Пока просто переходим к следующему вопросу - await asyncio.sleep(1) # Небольшая пауза - await self.ask_next_question() - - async def send_message(self, message: dict): - """Отправка сообщения фронтенду""" - if self.room: - data = json.dumps(message).encode() - await self.room.local_participant.publish_data(data) - - async def play_audio(self, audio_data: bytes): - """Воспроизведение аудио через LiveKit""" - if self.audio_source: - # TODO: Конвертировать audio_data в нужный формат и отправить - pass - - async def end_interview(self): - """Завершение интервью""" - await self.send_message( - { - "type": "interview_complete", - "summary": f"Interview completed with {len(self.conversation_history)} responses", - } - ) - - # Сохраняем транскрипт в базу данных - transcript = json.dumps(self.conversation_history, ensure_ascii=False, indent=2) - - # TODO: Обновить interview_session в БД с транскриптом - - logger.info("Interview completed") - - # Отключение от комнаты - if self.room: - await self.room.disconnect() - - -class AIInterviewerManager: - """Менеджер для управления AI интервьюерами""" - - def __init__(self): - self.active_sessions: dict[int, AIInterviewerService] = {} - - async def start_interview_session( - self, interview_session_id: int, room_name: str, resume_data: dict - ): - """Запуск AI интервьюера для сессии""" - try: - # Создаем токен для AI агента - # Нужно создать специальный токен для AI агента - - ai_interviewer = AIInterviewerService(interview_session_id, resume_data) - - # TODO: Генерировать токен для AI агента - # ai_token = generate_ai_agent_token(room_name) - # await ai_interviewer.connect_to_room(room_name, ai_token) - - self.active_sessions[interview_session_id] = ai_interviewer - - logger.info(f"Started AI interviewer for session: {interview_session_id}") - - except Exception as e: - logger.error(f"Error starting AI interviewer: {str(e)}") - raise - - async def stop_interview_session(self, interview_session_id: int): - """Остановка AI интервьюера""" - if interview_session_id in self.active_sessions: - ai_interviewer = self.active_sessions[interview_session_id] - await ai_interviewer.end_interview() - del self.active_sessions[interview_session_id] - logger.info(f"Stopped AI interviewer for session: {interview_session_id}") - - -# Глобальный менеджер -ai_interviewer_manager = AIInterviewerManager() diff --git a/celery_worker/celery_app.py b/celery_worker/celery_app.py index 1a86b33..5bf945e 100644 --- a/celery_worker/celery_app.py +++ b/celery_worker/celery_app.py @@ -6,7 +6,7 @@ celery_app = Celery( "hr_ai_backend", broker=f"redis://{settings.redis_cache_url}:{settings.redis_cache_port}/{settings.redis_cache_db}", backend=f"redis://{settings.redis_cache_url}:{settings.redis_cache_port}/{settings.redis_cache_db}", - include=["celery_worker.tasks"], + include=["celery_worker.tasks", "celery_worker.interview_analysis_task"], ) celery_app.conf.update( diff --git a/celery_worker/interview_analysis_task.py b/celery_worker/interview_analysis_task.py index 6cb9e45..81a7586 100644 --- a/celery_worker/interview_analysis_task.py +++ b/celery_worker/interview_analysis_task.py @@ -51,15 +51,30 @@ def generate_interview_report(resume_id: int): # Получаем историю интервью interview_session = _get_interview_session(db, resume_id) + logger.info(f"[INTERVIEW_ANALYSIS] Found interview_session: {interview_session is not None}") + + if interview_session: + logger.info(f"[INTERVIEW_ANALYSIS] Session ID: {interview_session.id}, dialogue_history length: {len(interview_session.dialogue_history) if interview_session.dialogue_history else 0}") + else: + logger.warning(f"[INTERVIEW_ANALYSIS] No interview session found for resume_id: {resume_id}") # Парсим JSON данные parsed_resume = _parse_json_field(resume.parsed_data) interview_plan = _parse_json_field(resume.interview_plan) - dialogue_history = ( - _parse_json_field(interview_session.dialogue_history) - if interview_session - else [] - ) + # Парсим dialogue_history отдельно (это список, а не словарь) + dialogue_history = [] + if interview_session and interview_session.dialogue_history: + if isinstance(interview_session.dialogue_history, list): + dialogue_history = interview_session.dialogue_history + elif isinstance(interview_session.dialogue_history, str): + try: + dialogue_history = json.loads(interview_session.dialogue_history) + if not isinstance(dialogue_history, list): + dialogue_history = [] + except (json.JSONDecodeError, TypeError): + dialogue_history = [] + + logger.info(f"[INTERVIEW_ANALYSIS] Parsed dialogue_history length: {len(dialogue_history)}") # Генерируем отчет report = _generate_comprehensive_report( @@ -140,11 +155,25 @@ def _get_interview_session(db, resume_id: int): try: from app.models.interview import InterviewSession - return ( + logger.info(f"[GET_SESSION] Looking for interview session with resume_id: {resume_id}") + + session = ( db.query(InterviewSession) .filter(InterviewSession.resume_id == resume_id) .first() ) + + if session: + logger.info(f"[GET_SESSION] Found session {session.id} for resume {resume_id}") + logger.info(f"[GET_SESSION] Session status: {session.status}") + logger.info(f"[GET_SESSION] Dialogue history type: {type(session.dialogue_history)}") + if session.dialogue_history: + logger.info(f"[GET_SESSION] Raw dialogue_history preview: {str(session.dialogue_history)[:200]}...") + else: + logger.warning(f"[GET_SESSION] No session found for resume_id: {resume_id}") + + return session + except Exception as e: logger.error(f"Error getting interview session: {e}") return None diff --git a/celery_worker/process_cleanup_task.py b/celery_worker/process_cleanup_task.py deleted file mode 100644 index 0a13709..0000000 --- a/celery_worker/process_cleanup_task.py +++ /dev/null @@ -1,202 +0,0 @@ -import psutil - -from celery_worker.celery_app import celery_app -from celery_worker.database import get_sync_session - - -@celery_app.task(bind=True) -def cleanup_interview_processes_task(self): - """ - Периодическая задача очистки мертвых AI процессов - """ - try: - self.update_state( - state="PROGRESS", - meta={"status": "Checking for dead AI processes...", "progress": 10}, - ) - - # Используем синхронный подход для Celery - with get_sync_session() as session: - # Получаем все "активные" сессии из БД - from app.models.interview import InterviewSession - - active_sessions = ( - session.query(InterviewSession) - .filter(InterviewSession.ai_agent_status == "running") - .all() - ) - - cleaned_count = 0 - total_sessions = len(active_sessions) - - self.update_state( - state="PROGRESS", - meta={ - "status": f"Found {total_sessions} potentially active sessions...", - "progress": 30, - }, - ) - - for i, interview_session in enumerate(active_sessions): - if interview_session.ai_agent_pid: - try: - # Проверяем, жив ли процесс - process = psutil.Process(interview_session.ai_agent_pid) - - if not process.is_running(): - # Процесс мертв, обновляем статус - interview_session.ai_agent_pid = None - interview_session.ai_agent_status = "stopped" - session.add(interview_session) - cleaned_count += 1 - - except psutil.NoSuchProcess: - # Процесс не существует - interview_session.ai_agent_pid = None - interview_session.ai_agent_status = "stopped" - session.add(interview_session) - cleaned_count += 1 - - except Exception as e: - print( - f"Error checking process {interview_session.ai_agent_pid}: {str(e)}" - ) - - # Обновляем прогресс - progress = 30 + (i + 1) / total_sessions * 60 - self.update_state( - state="PROGRESS", - meta={ - "status": f"Processed {i + 1}/{total_sessions} sessions...", - "progress": progress, - }, - ) - - # Сохраняем изменения - session.commit() - - self.update_state( - state="SUCCESS", - meta={ - "status": f"Cleanup completed. Cleaned {cleaned_count} dead processes.", - "progress": 100, - "cleaned_count": cleaned_count, - "total_checked": total_sessions, - }, - ) - - return { - "status": "completed", - "cleaned_count": cleaned_count, - "total_checked": total_sessions, - } - - except Exception as e: - self.update_state( - state="FAILURE", - meta={ - "status": f"Error during cleanup: {str(e)}", - "progress": 0, - "error": str(e), - }, - ) - raise - - -@celery_app.task(bind=True) -def force_kill_interview_process_task(self, session_id: int): - """ - Принудительное завершение AI процесса для сессии - """ - try: - self.update_state( - state="PROGRESS", - meta={"status": f"Looking for session {session_id}...", "progress": 20}, - ) - - with get_sync_session() as session: - from app.models.interview import InterviewSession - - interview_session = ( - session.query(InterviewSession) - .filter(InterviewSession.id == session_id) - .first() - ) - - if not interview_session: - return { - "status": "not_found", - "message": f"Session {session_id} not found", - } - - if not interview_session.ai_agent_pid: - return { - "status": "no_process", - "message": f"No AI process found for session {session_id}", - } - - self.update_state( - state="PROGRESS", - meta={ - "status": f"Terminating process {interview_session.ai_agent_pid}...", - "progress": 50, - }, - ) - - try: - process = psutil.Process(interview_session.ai_agent_pid) - - # Graceful terminate - process.terminate() - - # Ждем до 5 секунд - import time - - for _ in range(50): - if not process.is_running(): - break - time.sleep(0.1) - - # Если не помогло, убиваем принудительно - if process.is_running(): - process.kill() - time.sleep(0.5) # Даем время на завершение - - # Обновляем статус в БД - interview_session.ai_agent_pid = None - interview_session.ai_agent_status = "stopped" - session.add(interview_session) - session.commit() - - self.update_state( - state="SUCCESS", - meta={"status": "Process terminated successfully", "progress": 100}, - ) - - return { - "status": "terminated", - "message": f"AI process for session {session_id} terminated successfully", - } - - except psutil.NoSuchProcess: - # Процесс уже не существует - interview_session.ai_agent_pid = None - interview_session.ai_agent_status = "stopped" - session.add(interview_session) - session.commit() - - return { - "status": "already_dead", - "message": "Process was already dead, cleaned up database", - } - - except Exception as e: - self.update_state( - state="FAILURE", - meta={ - "status": f"Error terminating process: {str(e)}", - "progress": 0, - "error": str(e), - }, - ) - raise