diff --git a/ai_interviewer_agent.py b/ai_interviewer_agent.py index a20ba22..a23f6e7 100644 --- a/ai_interviewer_agent.py +++ b/ai_interviewer_agent.py @@ -3,7 +3,7 @@ import asyncio import json import logging import os -from typing import Dict, List +from typing import Dict, List, Optional from datetime import datetime # Принудительно устанавливаем UTF-8 для Windows @@ -12,26 +12,51 @@ if os.name == 'nt': # Windows if hasattr(sys, 'stdout') and hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8', errors='replace') sys.stderr.reconfigure(encoding='utf-8', errors='replace') + + # Устанавливаем переменную окружения для Python + os.environ.setdefault('PYTHONIOENCODING', 'utf-8') + from livekit.agents import ( Agent, AgentSession, JobContext, WorkerOptions, cli, + NotGiven ) -from livekit.plugins import openai, deepgram, cartesia, silero, resemble +from livekit.plugins import openai, deepgram, cartesia, silero +from livekit.api import LiveKitAPI, DeleteRoomRequest from rag.settings import settings +from app.core.database import get_session +from app.repositories.interview_repository import InterviewRepository +from app.repositories.resume_repository import ResumeRepository +from app.services.interview_finalization_service import InterviewFinalizationService logger = logging.getLogger("ai-interviewer") logger.setLevel(logging.INFO) + +async def close_room(room_name: str): + """Закрывает LiveKit комнату полностью (отключает всех участников)""" + try: + api = LiveKitAPI(settings.livekit_url, settings.livekit_api_key, settings.livekit_api_secret) + # Создаем RoomService для управления комнатами + await api.room.delete_room(delete=DeleteRoomRequest(room=room_name)) + + logger.info(f"[ROOM_MANAGEMENT] Room {room_name} deleted successfully") + + except Exception as e: + logger.error(f"[ROOM_MANAGEMENT] Failed to delete room {room_name}: {str(e)}") + raise + + class InterviewAgent: """AI Agent для проведения собеседований с управлением диалогом""" - + def __init__(self, interview_plan: Dict): self.interview_plan = interview_plan self.conversation_history = [] - + # Состояние диалога self.current_section = 0 self.current_question_in_section = 0 @@ -39,378 +64,416 @@ class InterviewAgent: self.waiting_for_response = False self.last_question = None self.last_user_response = None + self.intro_done = False # Новый флаг — произнесено ли приветствие + self.interview_finalized = False # Флаг завершения интервью + + # Трекинг времени интервью + import time + self.interview_start_time = time.time() + self.duration_minutes = interview_plan.get('interview_structure', {}).get('duration_minutes', 10) - # Извлекаем структуру интервью self.sections = self.interview_plan.get('interview_structure', {}).get('sections', []) self.total_sections = len(self.sections) - + + logger.info(f"[TIME] Interview started at {time.strftime('%H:%M:%S')}, duration: {self.duration_minutes} min") + def get_current_section(self) -> Dict: """Получить текущую секцию интервью""" if self.current_section < len(self.sections): return self.sections[self.current_section] return {} - + def get_next_question(self) -> str: - """Получить следующий вопрос из текущей секции""" + """Получить следующий вопрос""" section = self.get_current_section() questions = section.get('questions', []) - if self.current_question_in_section < len(questions): return questions[self.current_question_in_section] return None - + def move_to_next_question(self): """Переход к следующему вопросу""" - section = self.get_current_section() - questions = section.get('questions', []) - self.current_question_in_section += 1 self.questions_asked_total += 1 - - # Если вопросы в секции закончились, переходим к следующей - if self.current_question_in_section >= len(questions): + + section = self.get_current_section() + if self.current_question_in_section >= len(section.get('questions', [])): self.move_to_next_section() - + def move_to_next_section(self): """Переход к следующей секции""" self.current_section += 1 self.current_question_in_section = 0 - if self.current_section < len(self.sections): logger.info(f"Переход к секции: {self.sections[self.current_section].get('name', 'Unnamed')}") - + def is_interview_complete(self) -> bool: - """Проверяет, завершено ли интервью""" - return self.current_section >= len(self.sections) - - async def analyze_user_response(self, response: str, chat_model) -> Dict[str, str]: - """Анализирует ответ пользователя и решает следующий шаг""" - try: - from rag.registry import registry - - analysis_prompt = f""" - Проанализируй ответ кандидата на интервью и определи следующий шаг. - - КОНТЕКСТ: - - Текущая секция: {self.get_current_section().get('name', 'Unknown')} - - Заданный вопрос: {self.last_question} - - Ответ кандидата: {response} - - Оцени ответ и определи действие: - 1. "continue" - ответ полный, переходим к следующему вопросу - 2. "clarify" - нужно уточнить или углубить ответ - 3. "redirect" - нужно перенаправить на тему - - Ответь в JSON формате: - {{ - "action": "continue|clarify|redirect", - "reason": "Объяснение решения", - "follow_up_question": "Уточняющий вопрос если action=clarify или redirect" - }} - """ - - from langchain.schema import HumanMessage, SystemMessage - messages = [ - SystemMessage(content="Ты эксперт-аналитик интервью. Анализируй ответы объективно."), - HumanMessage(content=analysis_prompt) - ] - - response_analysis = chat_model.chat(messages) - response_text = response_analysis.content.strip() - - # Парсим JSON ответ - if response_text.startswith('{') and response_text.endswith('}'): - return json.loads(response_text) - else: - # Fallback - return { - "action": "continue", - "reason": "Не удалось проанализировать ответ", - "follow_up_question": "" - } - - except Exception as e: - logger.error(f"Ошибка анализа ответа: {str(e)}") - return { - "action": "continue", - "reason": "Ошибка анализа", - "follow_up_question": "" - } - - def _extract_questions_from_plan(self) -> List[str]: - """Извлечение вопросов из готового плана интервью""" - questions = [] - - try: - # Начинаем с приветствия из плана - greeting = self.interview_plan.get('interview_structure', {}).get('greeting', 'Привет! Готов к интервью?') - questions.append(greeting) - - # Извлекаем вопросы из секций - sections = self.interview_plan.get('interview_structure', {}).get('sections', []) - - for section in sections: - section_questions = section.get('questions', []) - questions.extend(section_questions) - - return questions - - except Exception as e: - logger.error(f"Ошибка извлечения вопросов из плана: {str(e)}") - # Fallback вопросы - return [ - "Привет! Расскажи немного о себе", - "Какой у тебя опыт работы?", - "Что тебя привлекает в этой позиции?", - "Есть ли у тебя вопросы ко мне?" - ] - + """Интервью завершается только по решению LLM через ключевые фразы""" + return False # LLM теперь решает через ключевые фразы + def get_system_instructions(self) -> str: - """Системные инструкции для AI агента""" + """Системные инструкции для AI агента с ключевыми фразами для завершения""" candidate_info = self.interview_plan.get('candidate_info', {}) interview_structure = self.interview_plan.get('interview_structure', {}) - focus_areas = self.interview_plan.get('focus_areas', []) - greeting = interview_structure.get('greeting', 'Привет! Готов к интервью?') + focus_areas = self.interview_plan.get('focus_areas', []) + key_evaluation_points = self.interview_plan.get('key_evaluation_points', []) - current_section = self.get_current_section() - current_section_name = current_section.get('name', 'Неизвестно') - progress = f"{self.current_section + 1}/{len(self.sections)}" + # Вычисляем текущее время интервью + import time + elapsed_minutes = (time.time() - self.interview_start_time) / 60 + remaining_minutes = max(0, self.duration_minutes - elapsed_minutes) + time_percentage = min(100, (elapsed_minutes / self.duration_minutes) * 100) - return f"""Ты опытный HR-интервьюер, который проводит структурированное голосовое собеседование. + # Формируем план интервью для агента + sections_info = "\n".join([ + f"- {section.get('name', 'Секция')}: {', '.join(section.get('questions', []))}" + for section in self.sections + ]) + + # Безопасно формируем строки для избежания конфликтов с кавычками + candidate_name = candidate_info.get('name', 'Кандидат') + candidate_years = candidate_info.get('total_years', 0) + candidate_skills = ', '.join(candidate_info.get('skills', [])) + focus_areas_str = ', '.join(focus_areas) + evaluation_points_str = ', '.join(key_evaluation_points) + + # Статус времени + if time_percentage > 90: + time_status = 'СРОЧНО ЗАВЕРШАТЬ' + elif time_percentage > 75: + time_status = 'ВРЕМЯ ЗАКАНЧИВАЕТСЯ' + else: + time_status = 'НОРМАЛЬНО' + + return f"""Ты опытный HR-интервьюер, который проводит адаптивное голосовое собеседование. ИНФОРМАЦИЯ О КАНДИДАТЕ: -- Имя: {candidate_info.get('name', 'Кандидат')} -- Опыт работы: {candidate_info.get('total_years', 0)} лет -- Ключевые навыки: {', '.join(candidate_info.get('skills', []))} +- Имя: {candidate_name} +- Опыт работы: {candidate_years} лет +- Ключевые навыки: {candidate_skills} -ТЕКУЩЕЕ СОСТОЯНИЕ ИНТЕРВЬЮ: -- Прогресс: {progress} секций -- Текущая секция: {current_section_name} -- Вопросов задано: {self.questions_asked_total} +ПЛАН ИНТЕРВЬЮ (используй как руководство, но адаптируйся): +{sections_info} -ПЛАН ИНТЕРВЬЮ: -{json.dumps(interview_structure.get('sections', []), ensure_ascii=False, indent=2)} +ВРЕМЯ ИНТЕРВЬЮ: +- Запланированная длительность: {self.duration_minutes} минут +- Прошло времени: {elapsed_minutes:.1f} минут ({time_percentage:.0f}%) +- Осталось времени: {remaining_minutes:.1f} минут +- Статус времени: {time_status} -ТВОЯ ЗАДАЧА: -1. Веди живое интерактивное интервью -2. Анализируй каждый ответ кандидата -3. Принимай решения: - - Если ответ полный и достаточный → переходи к следующему вопросу - - Если ответ поверхностный → задавай уточняющие вопросы - - Если кандидат ушел от темы → мягко возвращай к вопросу -4. Поддерживай естественный диалог +ФОКУС-ОБЛАСТИ: {focus_areas_str} +КЛЮЧЕВЫЕ ОЦЕНОЧНЫЕ ТОЧКИ: {evaluation_points_str} -ПРАВИЛА ВЕДЕНИЯ ДИАЛОГА: -✅ Говори только на русском языке -✅ Задавай один вопрос за раз и жди ответа -✅ Анализируй качество и полноту каждого ответа -✅ Адаптируй следующие вопросы под полученные ответы -✅ Показывай искреннюю заинтересованность -✅ Если ответ неполный - углубляйся: "Расскажи подробнее...", "А как именно ты..." -✅ При переходе между секциями делай плавные переходы -✅ Завершай интервью благодарностью и следующими шагами +ИНСТРУКЦИИ: +1. Начни с приветствия: {greeting} +2. Адаптируй вопросы под ответы кандидата +3. Следи за временем - при превышении 80% времени начинай завершать интервью +4. Оценивай качество и глубину ответов кандидата +5. Завершай интервью если: + - Получил достаточно информации для оценки + - Время почти истекло (>90% от запланированного) + - Кандидат дал исчерпывающие ответы +6. При завершении спроси о вопросах кандидата и поблагодари -ПРИМЕРЫ РЕАКЦИЙ НА ОТВЕТЫ: -- Короткий ответ: "Интересно! А можешь рассказать конкретный пример?" -- Хороший ответ: "Отлично! Давай перейдем к следующему вопросу..." -- Уход от темы: "Понимаю, но давай вернемся к..." +ВАЖНО: Отвечай естественно и разговорно, как живой интервьюер! -НАЧНИ С ПРИВЕТСТВИЯ: {greeting} +ЗАВЕРШЕНИЕ ИНТЕРВЬЮ: +Когда нужно завершить интервью (время истекло, получена достаточная информация), +используй одну из этих ключевых фраз: +- "Спасибо за интересную беседу! У тебя есть вопросы ко мне?" +- "Это всё, что я хотел узнать. Есть ли у вас вопросы?" +- "Интервью подходит к концу. У тебя есть вопросы ко мне?" +ФИНАЛЬНАЯ ФРАЗА после которой ничего не будет: +- До скорой встречи! + +ЗАВЕРШАЙ ИНТЕРВЬЮ, если: +- Прошло >80% времени И получил основную информацию +- Кандидат дал полные ответы по всем ключевым областям +- Возникли технические проблемы или кандидат просит завершить + +СТИЛЬ: Дружелюбный, профессиональный, заинтересованный в кандидате. """ + def get_time_info(self) -> Dict[str, float]: + """Получает информацию о времени интервью""" + import time + elapsed_minutes = (time.time() - self.interview_start_time) / 60 + remaining_minutes = max(0.0, self.duration_minutes - elapsed_minutes) + time_percentage = min(100.0, (elapsed_minutes / self.duration_minutes) * 100) + + return { + "elapsed_minutes": elapsed_minutes, + "remaining_minutes": remaining_minutes, + "time_percentage": time_percentage, + "duration_minutes": self.duration_minutes + } + + async def track_interview_progress(self, user_response: str) -> Dict[str, any]: + """Трекает прогресс интервью для логирования""" + current_section = self.get_current_section() + time_info = self.get_time_info() + + return { + "section": current_section.get('name', 'Unknown'), + "questions_asked": self.questions_asked_total, + "section_progress": f"{self.current_section + 1}/{len(self.sections)}", + "user_response_length": len(user_response), + "elapsed_minutes": f"{time_info['elapsed_minutes']:.1f}", + "remaining_minutes": f"{time_info['remaining_minutes']:.1f}", + "time_percentage": f"{time_info['time_percentage']:.0f}%" + } + async def entrypoint(ctx: JobContext): """Точка входа для AI агента""" - logger.info("Starting AI Interviewer Agent") + logger.info("[INIT] Starting AI Interviewer Agent") + logger.info(f"[INIT] Room: {ctx.room.name}") + + # План интервью - получаем из переменной окружения + room_metadata = os.environ.get("LIVEKIT_ROOM_METADATA", ctx.room.metadata or "{}") - # Получаем данные о резюме из метаданных комнаты - room_metadata = ctx.room.metadata if ctx.room.metadata else "{}" try: metadata = json.loads(room_metadata) interview_plan = metadata.get("interview_plan", {}) - if not hasattr(interview_plan, 'interview_structure'): - raise ValueError - except: - # Fallback план для тестирования + if not interview_plan: + interview_plan = {} + except Exception as e: + logger.warning(f"[INIT] Failed to parse metadata: {str(e)}") + interview_plan = {} + + # Используем дефолтный план если план пустой или нет секций + if not interview_plan or not interview_plan.get("interview_structure", {}).get("sections"): + logger.info(f"[INIT] Using default interview plan") interview_plan = { "interview_structure": { - "duration_minutes": 30, - "greeting": "Привет! Готов к тестовому интервью?", + "duration_minutes": 2, # ТЕСТОВЫЙ РЕЖИМ - 2 минуты + "greeting": "Привет! Это быстрое тестовое интервью на 2 минуты. Готов?", "sections": [ - { - "name": "Знакомство", - "duration_minutes": 10, - "questions": ["Расскажи о себе", "Что тебя привлекло в этой позиции?"] - }, - { - "name": "Технические навыки", - "duration_minutes": 15, - "questions": ["Расскажи о своем опыте с Python", "Какие проекты разрабатывал?"] - }, - { - "name": "Вопросы кандидата", - "duration_minutes": 5, - "questions": ["Есть ли у тебя вопросы ко мне?"] - } + {"name": "Знакомство", "duration_minutes": 1, "questions": ["Расскажи кратко о себе одним предложением"]}, + {"name": "Завершение", "duration_minutes": 1, "questions": ["Спасибо! Есть вопросы ко мне?"]} ] }, - "focus_areas": ["technical_skills", "experience"], - "candidate_info": { - "name": "Тестовый кандидат", - "skills": ["Python", "React", "PostgreSQL"], - "total_years": 3, - "education": "Высшее техническое" - } + "candidate_info": {"name": "Тестовый кандидат", "skills": ["Python", "React"], "total_years": 3}, + "focus_areas": ["quick_test"], + "key_evaluation_points": ["Коммуникация"] } - - logger.info(f"Interview plan: {interview_plan}") - - # Создаем интервьюера с планом + interviewer = InterviewAgent(interview_plan) - - # Настройка STT (Speech-to-Text) - if hasattr(settings, 'deepgram_api_key') and settings.deepgram_api_key: - stt = deepgram.STT( - model="nova-2-general", - language="ru", # Русский язык - api_key=settings.deepgram_api_key - ) - else: - # Fallback на OpenAI Whisper - stt = openai.STT( - model="whisper-1", - language="ru", - api_key=settings.openai_api_key - ) - - # Настройка LLM - llm = openai.LLM( - model="gpt-4o-mini", - api_key=settings.openai_api_key, - temperature=0.7, - ) - - # Настройка TTS (Text-to-Speech) - if hasattr(settings, 'resemble_api_key') and settings.resemble_api_key: - tts = resemble.TTS( - voice_uuid="55592656", - api_key=settings.resemble_api_key - ) - else: - # Fallback на локальный TTS - tts = silero.TTS( - language="ru", - model="v4_ru" - ) - - # Создание агента с системными инструкциями - agent = Agent( - instructions=interviewer.get_system_instructions() - ) - - # Создание сессии агента - session = AgentSession( - vad=silero.VAD.load(), # Voice Activity Detection - stt=stt, - llm=llm, - tts=tts, - ) - - # Добавляем обработчики событий с управлением диалогом - @session.on("user_speech_committed") - def on_user_speech(msg): - """Синхронный callback. Внутри создаётся async-задача.""" + logger.info(f"[INIT] InterviewAgent created with {len(interviewer.sections)} sections") - async def handler(): - user_response = msg.content - logger.info(f"User said: {user_response}") + # STT + stt = deepgram.STT(model="nova-2-general", language="ru", api_key=settings.deepgram_api_key) \ + if settings.deepgram_api_key else openai.STT(model="whisper-1", language="ru", api_key=settings.openai_api_key) - # Сохраняем историю - interviewer.conversation_history.append({ - "role": "user", - "content": user_response, - "timestamp": datetime.utcnow().isoformat(), - "section": interviewer.get_current_section().get('name', 'Unknown') - }) + # LLM + llm = openai.LLM(model="gpt-4o-mini", api_key=settings.openai_api_key, temperature=0.7) - interviewer.last_user_response = user_response - interviewer.waiting_for_response = False + # TTS + tts = cartesia.TTS(model="sonic-turbo", language="ru", voice='da05e96d-ca10-4220-9042-d8acef654fa9', + api_key=settings.cartesia_api_key) if settings.cartesia_api_key else silero.TTS(language="ru", model="v4_ru") + # Создаем обычный Agent и Session + agent = Agent(instructions=interviewer.get_system_instructions()) + + # Создаем AgentSession с обычным TTS + session = AgentSession(vad=silero.VAD.load(), stt=stt, llm=llm, tts=tts) + + # --- Сохранение диалога в БД --- + async def save_dialogue_to_db(room_name: str, dialogue_history: list): + try: + session_generator = get_session() + db = await anext(session_generator) try: - # Анализ ответа - analysis = await interviewer.analyze_user_response(user_response, llm) - action = analysis.get("action", "continue") + interview_repo = InterviewRepository(db) + resume_repo = ResumeRepository(db) + finalization_service = InterviewFinalizationService(interview_repo, resume_repo) + success = await finalization_service.save_dialogue_to_session(room_name, dialogue_history) + if not success: + logger.warning(f"[DB] Failed to save dialogue for room: {room_name}") + finally: + await session_generator.aclose() + except Exception as e: + logger.error(f"[DB] Error saving dialogue: {str(e)}") - logger.info(f"Response analysis: {action} - {analysis.get('reason', 'No reason')}") + # --- Логика завершения интервью --- + async def finalize_interview(room_name: str, interviewer_instance): + """Завершение интервью и запуск анализа""" + + # Проверяем, не завершено ли уже интервью + if interviewer_instance.interview_finalized: + logger.info(f"[FINALIZE] Interview already finalized for room: {room_name}") + return + + interviewer_instance.interview_finalized = True + + try: + logger.info(f"[FINALIZE] Starting interview finalization for room: {room_name}") + + # Собираем метрики интервью + time_info = interviewer_instance.get_time_info() + interview_metrics = { + "total_messages": interviewer_instance.questions_asked_total, + "dialogue_length": len(interviewer_instance.conversation_history), + "elapsed_minutes": time_info['elapsed_minutes'], + "planned_duration": time_info['duration_minutes'], + "time_percentage": time_info['time_percentage'] + } + + session_generator = get_session() + db = await anext(session_generator) + try: + interview_repo = InterviewRepository(db) + resume_repo = ResumeRepository(db) + finalization_service = InterviewFinalizationService(interview_repo, resume_repo) + + # Используем сервис для завершения интервью + result = await finalization_service.finalize_interview( + room_name=room_name, + dialogue_history=interviewer_instance.conversation_history, + interview_metrics=interview_metrics + ) + + if result: + logger.info(f"[FINALIZE] Interview successfully finalized: session_id={result['session_id']}, task_id={result['analysis_task_id']}") - if action == "continue": - interviewer.move_to_next_question() - - if not interviewer.is_interview_complete(): - next_question = interviewer.get_next_question() - if next_question: - await session.say(next_question) - interviewer.last_question = next_question - interviewer.waiting_for_response = True - else: - await session.say( - "Спасибо за интервью! Это все вопросы, которые я хотел задать. " - "В ближайшее время мы свяжемся с тобой по результатам." - ) - - elif action in ["clarify", "redirect"]: - follow_up = analysis.get("follow_up_question", "Можешь рассказать подробнее?") - await session.say(follow_up) - interviewer.waiting_for_response = True + else: + logger.error(f"[FINALIZE] Failed to finalize interview for room: {room_name}") + finally: + await session_generator.aclose() + except Exception as e: + logger.error(f"[FINALIZE] Error finalizing interview: {str(e)}") + # --- Проверка завершения интервью по ключевым фразам --- + async def check_interview_completion_by_keywords(agent_text: str): + """Проверяет завершение интервью по ключевым фразам""" + # Ключевые фразы для завершения интервью + ending_keywords = [ + "До скорой встречи" + ] + + text_lower = agent_text.lower() + + for keyword in ending_keywords: + if keyword.lower() in text_lower: + logger.info(f"[KEYWORD_DETECTION] Found ending keyword: '{keyword}' in agent response") + + if not interviewer.interview_finalized: + # Запускаем полную цепочку завершения интервью + await complete_interview_sequence(ctx.room.name, interviewer) + return True + break + + return False + + # --- Полная цепочка завершения интервью --- + async def complete_interview_sequence(room_name: str, interviewer_instance): + """ + Полная цепочка завершения интервью: + 1. Финализация диалога в БД + 2. Закрытие комнаты LiveKit + 3. Завершение процесса агента + """ + try: + logger.info("[SEQUENCE] Starting interview completion sequence") + + # Шаг 1: Финализируем интервью в БД + logger.info("[SEQUENCE] Step 1: Finalizing interview in database") + await finalize_interview(room_name, interviewer_instance) + logger.info("[SEQUENCE] Step 1: Database finalization completed") + + # Даём время на завершение всех DB операций + await asyncio.sleep(1) + + # Шаг 2: Закрываем комнату LiveKit + logger.info("[SEQUENCE] Step 2: Closing LiveKit room") + try: + await close_room(room_name) + logger.info(f"[SEQUENCE] Step 2: Room {room_name} closed successfully") except Exception as e: - logger.error(f"Ошибка обработки ответа пользователя: {str(e)}") - interviewer.move_to_next_question() + logger.error(f"[SEQUENCE] Step 2: Failed to close room: {str(e)}") + logger.info("[SEQUENCE] Step 2: Room closure failed, but continuing sequence") + + # Шаг 3: Завершаем процесс агента + logger.info("[SEQUENCE] Step 3: Terminating agent process") + await asyncio.sleep(2) # Даём время на завершение всех операций + logger.info("[SEQUENCE] Step 3: Force terminating agent process") + import os + os._exit(0) # Принудительное завершение процесса + + except Exception as e: + logger.error(f"[SEQUENCE] Error in interview completion sequence: {str(e)}") + # Fallback: принудительно завершаем процесс даже при ошибках + logger.info("[SEQUENCE] Fallback: Force terminating process") + await asyncio.sleep(1) + import os + os._exit(1) - # запускаем асинхронный обработчик - asyncio.create_task(handler()) - - @session.on("agent_speech_committed") - def on_agent_speech(msg): - """Обработка речи агента""" - agent_response = msg.content - logger.info(f"Agent said: {agent_response}") + + # --- Упрощенная логика обработки пользовательского ответа --- + async def handle_user_input(user_response: str): + current_section = interviewer.get_current_section() - # Сохраняем в историю - interviewer.conversation_history.append({ - "role": "assistant", - "content": agent_response, + # Сохраняем ответ пользователя + dialogue_message = { + "role": "user", + "content": str(user_response).encode('utf-8').decode('utf-8'), # Принудительное UTF-8 "timestamp": datetime.utcnow().isoformat(), - "section": interviewer.get_current_section().get('name', 'Unknown') - }) + "section": current_section.get('name', 'Unknown') + } + interviewer.conversation_history.append(dialogue_message) + await save_dialogue_to_db(ctx.room.name, interviewer.conversation_history) - # Если это вопрос, обновляем состояние - if "?" in agent_response: - interviewer.last_question = agent_response - interviewer.waiting_for_response = True - - # Запускаем сессию агента + # Обновляем прогресс интервью + if not interviewer.intro_done: + interviewer.intro_done = True + + # Обновляем счетчик сообщений и треким время + interviewer.questions_asked_total += 1 + progress_info = await interviewer.track_interview_progress(user_response) + logger.info(f"[PROGRESS] Messages: {progress_info['questions_asked']}, Time: {progress_info['elapsed_minutes']}min/{progress_info['time_percentage']}") + + + # Обновляем инструкции агента с текущим прогрессом + try: + updated_instructions = interviewer.get_system_instructions() + await agent.update_instructions(updated_instructions) + except Exception as e: + logger.error(f"[ERROR] Failed to update instructions: {str(e)}") + + @session.on("conversation_item_added") + def on_conversation_item(event): + role = event.item.role + text = event.item.text_content + + if role == "user": + asyncio.create_task(handle_user_input(text)) + elif role == "assistant": + # Сохраняем ответ агента в историю диалога + current_section = interviewer.get_current_section() + interviewer.conversation_history.append({ + "role": "assistant", + "content": str(text).encode('utf-8').decode('utf-8'), # Принудительное UTF-8 + "timestamp": datetime.utcnow().isoformat(), + "section": current_section.get('name', 'Unknown') + }) + + # Сохраняем диалог в БД + asyncio.create_task(save_dialogue_to_db(ctx.room.name, interviewer.conversation_history)) + + # Проверяем ключевые фразы для завершения интервью + asyncio.create_task(check_interview_completion_by_keywords(text)) + + await session.start(agent=agent, room=ctx.room) - - # Приветственное сообщение - # В новой версии приветствие будет автоматически отправлено из системных инструкций - - logger.info("AI Interviewer started successfully") + logger.info("[INIT] AI Interviewer started") def main(): - """Запуск агента""" logging.basicConfig(level=logging.INFO) - - # Настройки воркера - worker_options = WorkerOptions( - entrypoint_fnc=entrypoint, - ) - - # Запуск через CLI - cli.run_app(worker_options) + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # фикс для Windows + cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint)) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/app/core/database.py b/app/core/database.py index 8321fc0..eccf075 100644 --- a/app/core/database.py +++ b/app/core/database.py @@ -1,25 +1,60 @@ -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from typing import AsyncGenerator, Generator +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy import create_engine from sqlmodel import SQLModel + from .config import settings -engine = create_async_engine( +# Async engine для FastAPI +async_engine = create_async_engine( settings.database_url, echo=settings.debug, - future=True + future=True, + pool_size=10, + max_overflow=20, + pool_timeout=30, + pool_pre_ping=True, ) -async_session_maker = async_sessionmaker( - bind=engine, +async_session = sessionmaker( + async_engine, + expire_on_commit=False, class_=AsyncSession, - expire_on_commit=False +) + +# Sync engine для Celery и других синхронных операций +sync_engine = create_engine( + settings.database_url.replace("asyncpg", "psycopg2"), + echo=settings.debug, + future=True, + pool_size=10, + max_overflow=20, + pool_timeout=30, + pool_pre_ping=True, + pool_recycle=3600, +) + +sync_session = sessionmaker( + bind=sync_engine, + expire_on_commit=False, + class_=Session, ) -async def get_session() -> AsyncSession: - async with async_session_maker() as session: +async def get_session() -> AsyncGenerator[AsyncSession, None]: + """Async session для FastAPI DI""" + async with async_session() as session: + yield session + + +def get_sync_session() -> Generator[Session, None, None]: + """Sync session для Celery и других синхронных операций""" + with sync_session() as session: yield session async def create_db_and_tables(): - async with engine.begin() as conn: - await conn.run_sync(SQLModel.metadata.create_all) + """Создать таблицы в БД""" + async with async_engine.begin() as conn: + await conn.run_sync(SQLModel.metadata.create_all) \ No newline at end of file diff --git a/app/core/session_middleware.py b/app/core/session_middleware.py index 6dec629..361069e 100644 --- a/app/core/session_middleware.py +++ b/app/core/session_middleware.py @@ -3,7 +3,7 @@ from fastapi.responses import JSONResponse from starlette.middleware.base import BaseHTTPMiddleware from starlette.types import ASGIApp from sqlalchemy.ext.asyncio import AsyncSession -from app.core.database import async_session_maker +from app.core.database import get_session from app.repositories.session_repository import SessionRepository from app.models.session import Session import logging @@ -32,7 +32,7 @@ class SessionMiddleware(BaseHTTPMiddleware): try: # Работаем с БД в рамках одной async сессии - async with async_session_maker() as db_session: + async for db_session in get_session(): session_repo = SessionRepository(db_session) # Проверяем существующую сессию @@ -56,6 +56,7 @@ class SessionMiddleware(BaseHTTPMiddleware): # Добавляем сессию в контекст запроса request.state.session = session_obj + break except Exception as e: logger.error(f"Session middleware error: {e}") @@ -83,10 +84,4 @@ class SessionMiddleware(BaseHTTPMiddleware): async def get_current_session(request: Request) -> Session: """Получить текущую сессию из контекста запроса""" - return getattr(request.state, 'session', None) - - -async def get_db_session() -> AsyncSession: - """Получить новую сессию БД для использования в эндпоинтах""" - async with async_session_maker() as session: - yield session \ No newline at end of file + return getattr(request.state, 'session', None) \ No newline at end of file diff --git a/app/models/__init__.py b/app/models/__init__.py index 74165fb..f482d14 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -1,6 +1,21 @@ from .vacancy import Vacancy, VacancyCreate, VacancyUpdate, VacancyRead from .resume import Resume, ResumeCreate, ResumeUpdate, ResumeRead from .session import Session, SessionCreate, SessionRead +from .interview import ( + InterviewSession, + InterviewSessionCreate, + InterviewSessionUpdate, + InterviewSessionRead, + InterviewStatus +) +from .interview_report import ( + InterviewReport, + InterviewReportCreate, + InterviewReportUpdate, + InterviewReportRead, + InterviewReportSummary, + RecommendationType +) __all__ = [ "Vacancy", @@ -14,4 +29,15 @@ __all__ = [ "Session", "SessionCreate", "SessionRead", + "InterviewSession", + "InterviewSessionCreate", + "InterviewSessionUpdate", + "InterviewSessionRead", + "InterviewStatus", + "InterviewReport", + "InterviewReportCreate", + "InterviewReportUpdate", + "InterviewReportRead", + "InterviewReportSummary", + "RecommendationType", ] \ No newline at end of file diff --git a/app/models/interview.py b/app/models/interview.py index 943c8f4..eda05da 100644 --- a/app/models/interview.py +++ b/app/models/interview.py @@ -1,7 +1,7 @@ -from sqlmodel import SQLModel, Field, Column -from sqlalchemy import Enum as SQLEnum, String +from sqlmodel import SQLModel, Field, Column, Relationship +from sqlalchemy import Enum as SQLEnum, JSON from datetime import datetime -from typing import Optional +from typing import Optional, List, Dict, Any from enum import Enum @@ -18,12 +18,10 @@ class InterviewStatus(str, Enum): class InterviewSessionBase(SQLModel): resume_id: int = Field(foreign_key="resume.id") room_name: str = Field(max_length=255, unique=True) - status: str = Field( - default="created", - sa_column=Column(SQLEnum('created', 'active', 'completed', 'failed', name="interviewstatus")) - ) + status: str = Field(default="created", max_length=50) transcript: Optional[str] = None ai_feedback: Optional[str] = None + dialogue_history: Optional[List[Dict[str, Any]]] = Field(default=None, sa_column=Column(JSON)) # Добавляем отслеживание AI процесса ai_agent_pid: Optional[int] = None ai_agent_status: str = Field(default="not_started") # not_started, running, stopped, failed @@ -35,6 +33,10 @@ class InterviewSession(InterviewSessionBase, table=True): id: Optional[int] = Field(default=None, primary_key=True) started_at: datetime = Field(default_factory=datetime.utcnow) completed_at: Optional[datetime] = None + + # Связь с отчетом (один к одному) + report: Optional["InterviewReport"] = Relationship(back_populates="interview_session") + class InterviewSessionCreate(SQLModel): @@ -47,6 +49,7 @@ class InterviewSessionUpdate(SQLModel): completed_at: Optional[datetime] = None transcript: Optional[str] = None ai_feedback: Optional[str] = None + dialogue_history: Optional[List[Dict[str, Any]]] = None class InterviewSessionRead(InterviewSessionBase): diff --git a/app/models/interview_report.py b/app/models/interview_report.py new file mode 100644 index 0000000..6a2a6e5 --- /dev/null +++ b/app/models/interview_report.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +from sqlmodel import SQLModel, Field, Column, Relationship +from sqlalchemy import JSON, String, Integer, Float, Text +from datetime import datetime +from typing import Optional, List, Dict, Any +from enum import Enum + + +class RecommendationType(str, Enum): + STRONGLY_RECOMMEND = "strongly_recommend" + RECOMMEND = "recommend" + CONSIDER = "consider" + REJECT = "reject" + + def __str__(self): + return self.value + + +class InterviewReportBase(SQLModel): + """Базовая модель отчета по интервью""" + interview_session_id: int = Field(foreign_key="interview_sessions.id", unique=True) + + # Основные критерии оценки (0-100) + technical_skills_score: int = Field(ge=0, le=100) + technical_skills_justification: Optional[str] = Field(default=None, max_length=1000) + technical_skills_concerns: Optional[str] = Field(default=None, max_length=500) + + experience_relevance_score: int = Field(ge=0, le=100) + experience_relevance_justification: Optional[str] = Field(default=None, max_length=1000) + experience_relevance_concerns: Optional[str] = Field(default=None, max_length=500) + + communication_score: int = Field(ge=0, le=100) + communication_justification: Optional[str] = Field(default=None, max_length=1000) + communication_concerns: Optional[str] = Field(default=None, max_length=500) + + problem_solving_score: int = Field(ge=0, le=100) + problem_solving_justification: Optional[str] = Field(default=None, max_length=1000) + problem_solving_concerns: Optional[str] = Field(default=None, max_length=500) + + cultural_fit_score: int = Field(ge=0, le=100) + cultural_fit_justification: Optional[str] = Field(default=None, max_length=1000) + cultural_fit_concerns: Optional[str] = Field(default=None, max_length=500) + + # Агрегированные поля + overall_score: int = Field(ge=0, le=100) + recommendation: RecommendationType + + # Дополнительные поля для анализа + strengths: Optional[List[str]] = Field(default=None, sa_column=Column(JSON)) + weaknesses: Optional[List[str]] = Field(default=None, sa_column=Column(JSON)) + red_flags: Optional[List[str]] = Field(default=None, sa_column=Column(JSON)) + + # Метрики интервью + questions_quality_score: Optional[float] = Field(default=None, ge=0, le=10) # Средняя оценка ответов + interview_duration_minutes: Optional[int] = Field(default=None, ge=0) + response_count: Optional[int] = Field(default=None, ge=0) + dialogue_messages_count: Optional[int] = Field(default=None, ge=0) + + # Дополнительная информация + next_steps: Optional[str] = Field(default=None, max_length=1000) + interviewer_notes: Optional[str] = Field(default=None, sa_column=Column(Text)) + + # Детальный анализ вопросов (JSON) + questions_analysis: Optional[List[Dict[str, Any]]] = Field(default=None, sa_column=Column(JSON)) + + # Метаданные анализа + analysis_method: Optional[str] = Field(default="openai_gpt4", max_length=50) # openai_gpt4, fallback_heuristic + llm_model_used: Optional[str] = Field(default=None, max_length=100) + analysis_duration_seconds: Optional[int] = Field(default=None, ge=0) + + +class InterviewReport(InterviewReportBase, table=True): + """Полный отчет по интервью с ID и временными метками""" + __tablename__ = "interview_reports" + + id: Optional[int] = Field(default=None, primary_key=True) + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + + # Связь с сессией интервью + interview_session: Optional["InterviewSession"] = Relationship(back_populates="report") + + +class InterviewReportCreate(SQLModel): + """Модель для создания отчета""" + interview_session_id: int + + technical_skills_score: int = Field(ge=0, le=100) + technical_skills_justification: Optional[str] = None + technical_skills_concerns: Optional[str] = None + + experience_relevance_score: int = Field(ge=0, le=100) + experience_relevance_justification: Optional[str] = None + experience_relevance_concerns: Optional[str] = None + + communication_score: int = Field(ge=0, le=100) + communication_justification: Optional[str] = None + communication_concerns: Optional[str] = None + + problem_solving_score: int = Field(ge=0, le=100) + problem_solving_justification: Optional[str] = None + problem_solving_concerns: Optional[str] = None + + cultural_fit_score: int = Field(ge=0, le=100) + cultural_fit_justification: Optional[str] = None + cultural_fit_concerns: Optional[str] = None + + overall_score: int = Field(ge=0, le=100) + recommendation: RecommendationType + + strengths: Optional[List[str]] = None + weaknesses: Optional[List[str]] = None + red_flags: Optional[List[str]] = None + + questions_quality_score: Optional[float] = None + interview_duration_minutes: Optional[int] = None + response_count: Optional[int] = None + dialogue_messages_count: Optional[int] = None + + next_steps: Optional[str] = None + interviewer_notes: Optional[str] = None + questions_analysis: Optional[List[Dict[str, Any]]] = None + + analysis_method: Optional[str] = "openai_gpt4" + llm_model_used: Optional[str] = None + analysis_duration_seconds: Optional[int] = None + + +class InterviewReportUpdate(SQLModel): + """Модель для обновления отчета""" + technical_skills_score: Optional[int] = Field(default=None, ge=0, le=100) + technical_skills_justification: Optional[str] = None + technical_skills_concerns: Optional[str] = None + + experience_relevance_score: Optional[int] = Field(default=None, ge=0, le=100) + experience_relevance_justification: Optional[str] = None + experience_relevance_concerns: Optional[str] = None + + communication_score: Optional[int] = Field(default=None, ge=0, le=100) + communication_justification: Optional[str] = None + communication_concerns: Optional[str] = None + + problem_solving_score: Optional[int] = Field(default=None, ge=0, le=100) + problem_solving_justification: Optional[str] = None + problem_solving_concerns: Optional[str] = None + + cultural_fit_score: Optional[int] = Field(default=None, ge=0, le=100) + cultural_fit_justification: Optional[str] = None + cultural_fit_concerns: Optional[str] = None + + overall_score: Optional[int] = Field(default=None, ge=0, le=100) + recommendation: Optional[RecommendationType] = None + + strengths: Optional[List[str]] = None + weaknesses: Optional[List[str]] = None + red_flags: Optional[List[str]] = None + + questions_quality_score: Optional[float] = None + interview_duration_minutes: Optional[int] = None + response_count: Optional[int] = None + dialogue_messages_count: Optional[int] = None + + next_steps: Optional[str] = None + interviewer_notes: Optional[str] = None + questions_analysis: Optional[List[Dict[str, Any]]] = None + + analysis_method: Optional[str] = None + llm_model_used: Optional[str] = None + analysis_duration_seconds: Optional[int] = None + + +class InterviewReportRead(InterviewReportBase): + """Модель для чтения отчета с ID и временными метками""" + id: int + created_at: datetime + updated_at: datetime + + +class InterviewReportSummary(SQLModel): + """Краткая сводка отчета для списков""" + id: int + interview_session_id: int + overall_score: int + recommendation: RecommendationType + created_at: datetime + + # Основные баллы + technical_skills_score: int + experience_relevance_score: int + communication_score: int + problem_solving_score: int + cultural_fit_score: int + + # Краткие выводы + strengths: Optional[List[str]] = None + red_flags: Optional[List[str]] = None + + +# Индексы для эффективных запросов по скорингу +""" +Полезные SQL индексы: +CREATE INDEX idx_interview_reports_overall_score ON interview_reports (overall_score DESC); +CREATE INDEX idx_interview_reports_recommendation ON interview_reports (recommendation); +CREATE INDEX idx_interview_reports_technical_skills ON interview_reports (technical_skills_score DESC); +CREATE INDEX idx_interview_reports_communication ON interview_reports (communication_score DESC); +CREATE INDEX idx_interview_reports_session_id ON interview_reports (interview_session_id); +""" \ No newline at end of file diff --git a/app/models/resume.py b/app/models/resume.py index b21442c..69ec59a 100644 --- a/app/models/resume.py +++ b/app/models/resume.py @@ -56,8 +56,8 @@ class ResumeUpdate(SQLModel): status: Optional[ResumeStatus] = None interview_report_url: Optional[str] = None notes: Optional[str] = None - parsed_data: Optional[dict] = Field(default=None, sa_column=Column(JSON)) - interview_plan: Optional[dict] = Field(default=None, sa_column=Column(JSON)) + parsed_data: Optional[dict] = None + interview_plan: Optional[dict] = None parse_error: Optional[str] = None diff --git a/app/repositories/__init__.py b/app/repositories/__init__.py index a4e76c0..2daa018 100644 --- a/app/repositories/__init__.py +++ b/app/repositories/__init__.py @@ -1,4 +1,5 @@ from .vacancy_repository import VacancyRepository from .resume_repository import ResumeRepository +from .interview_repository import InterviewRepository -__all__ = ["VacancyRepository", "ResumeRepository"] \ No newline at end of file +__all__ = ["VacancyRepository", "ResumeRepository", "InterviewRepository"] \ No newline at end of file diff --git a/app/repositories/base_repository.py b/app/repositories/base_repository.py index daee58a..adab572 100644 --- a/app/repositories/base_repository.py +++ b/app/repositories/base_repository.py @@ -1,49 +1,54 @@ -from typing import TypeVar, Generic, Optional, List, Type +from typing import TypeVar, Generic, Optional, List, Type, Annotated from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update, delete from sqlmodel import SQLModel +from fastapi import Depends +from app.core.database import get_session ModelType = TypeVar("ModelType", bound=SQLModel) class BaseRepository(Generic[ModelType]): - def __init__(self, model: Type[ModelType], session: AsyncSession): + def __init__(self, model: Type[ModelType], session: Annotated[AsyncSession, Depends(get_session)]): self.model = model - self.session = session + self._session = session async def create(self, obj_in: ModelType) -> ModelType: db_obj = self.model.model_validate(obj_in) - self.session.add(db_obj) - await self.session.commit() - await self.session.refresh(db_obj) + self._session.add(db_obj) + await self._session.commit() + await self._session.refresh(db_obj) return db_obj async def get(self, id: int) -> Optional[ModelType]: statement = select(self.model).where(self.model.id == id) - result = await self.session.execute(statement) + result = await self._session.execute(statement) return result.scalar_one_or_none() async def get_all(self, skip: int = 0, limit: int = 100) -> List[ModelType]: statement = select(self.model).offset(skip).limit(limit) - result = await self.session.execute(statement) + result = await self._session.execute(statement) return result.scalars().all() async def update(self, id: int, obj_in: dict) -> Optional[ModelType]: - statement = ( - update(self.model) - .where(self.model.id == id) - .values(**obj_in) - .returning(self.model) + # Получаем объект и обновляем его напрямую + result = await self._session.execute( + select(self.model).where(self.model.id == id) ) - result = await self.session.execute(statement) db_obj = result.scalar_one_or_none() - if db_obj: - await self.session.commit() - await self.session.refresh(db_obj) + + if not db_obj: + return None + + for key, value in obj_in.items(): + setattr(db_obj, key, value) + + await self._session.commit() + await self._session.refresh(db_obj) return db_obj async def delete(self, id: int) -> bool: statement = delete(self.model).where(self.model.id == id) - result = await self.session.execute(statement) - await self.session.commit() + result = await self._session.execute(statement) + await self._session.commit() return result.rowcount > 0 \ No newline at end of file diff --git a/app/repositories/interview_repository.py b/app/repositories/interview_repository.py new file mode 100644 index 0000000..733cc2a --- /dev/null +++ b/app/repositories/interview_repository.py @@ -0,0 +1,115 @@ +from typing import Optional, List, Annotated +from datetime import datetime +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, update, desc +from fastapi import Depends +from app.core.database import get_session +from app.models.interview import InterviewSession, InterviewStatus +from app.repositories.base_repository import BaseRepository + + +class InterviewRepository(BaseRepository[InterviewSession]): + def __init__(self, session: Annotated[AsyncSession, Depends(get_session)]): + super().__init__(InterviewSession, session) + + async def get_by_room_name(self, room_name: str) -> Optional[InterviewSession]: + """Получить сессию интервью по имени комнаты""" + statement = select(InterviewSession).where(InterviewSession.room_name == room_name) + result = await self._session.execute(statement) + return result.scalar_one_or_none() + + async def update_status(self, session_id: int, status: str, completed_at: Optional[datetime] = None) -> bool: + """Обновить статус сессии""" + try: + # Получаем объект и обновляем его напрямую + result = await self._session.execute( + select(InterviewSession).where(InterviewSession.id == session_id) + ) + session_obj = result.scalar_one_or_none() + + if not session_obj: + return False + + session_obj.status = status + if completed_at: + session_obj.completed_at = completed_at + + await self._session.commit() + return True + except Exception: + await self._session.rollback() + return False + + async def update_dialogue_history(self, room_name: str, dialogue_history: list) -> bool: + """Обновить историю диалога для сессии""" + try: + # Получаем объект и обновляем его напрямую + result = await self._session.execute( + select(InterviewSession).where(InterviewSession.room_name == room_name) + ) + session_obj = result.scalar_one_or_none() + + if not session_obj: + return False + + session_obj.dialogue_history = dialogue_history + await self._session.commit() + return True + except Exception: + await self._session.rollback() + return False + + async def update_ai_agent_status(self, session_id: int, pid: Optional[int] = None, status: str = "not_started") -> bool: + """Обновить статус AI агента""" + try: + # Получаем объект и обновляем его напрямую + result = await self._session.execute( + select(InterviewSession).where(InterviewSession.id == session_id) + ) + session_obj = result.scalar_one_or_none() + + if not session_obj: + return False + + session_obj.ai_agent_pid = pid + session_obj.ai_agent_status = status + await self._session.commit() + return True + except Exception: + await self._session.rollback() + return False + + async def get_sessions_with_running_agents(self) -> List[InterviewSession]: + """Получить сессии с запущенными AI агентами""" + statement = select(InterviewSession).where( + InterviewSession.ai_agent_status == "running" + ) + result = await self._session.execute(statement) + return result.scalars().all() + + async def get_active_session_by_resume_id(self, resume_id: int) -> Optional[InterviewSession]: + """Получить активную сессию собеседования для резюме""" + statement = ( + select(InterviewSession) + .where(InterviewSession.resume_id == resume_id) + .where(InterviewSession.status.in_(["created", "active"])) + .order_by(InterviewSession.started_at.desc()) + ) + result = await self._session.execute(statement) + return result.scalar_one_or_none() + + async def create_interview_session(self, resume_id: int, room_name: str) -> InterviewSession: + """Создать новую сессию интервью""" + from app.models.interview import InterviewSessionCreate + session_data = InterviewSessionCreate( + resume_id=resume_id, + room_name=room_name + ) + return await self.create(session_data.model_dump()) + + async def update_session_status(self, session_id: int, status: str) -> bool: + """Обновить статус сессии (алиас для update_status)""" + completed_at = None + if status == "completed": + completed_at = datetime.utcnow() + return await self.update_status(session_id, status, completed_at) \ No newline at end of file diff --git a/app/repositories/resume_repository.py b/app/repositories/resume_repository.py index e4050a7..f63248a 100644 --- a/app/repositories/resume_repository.py +++ b/app/repositories/resume_repository.py @@ -1,55 +1,55 @@ -from typing import List, Optional +from typing import List, Optional, Annotated from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select +from fastapi import Depends +from app.core.database import get_session from app.models.resume import Resume, ResumeStatus from .base_repository import BaseRepository class ResumeRepository(BaseRepository[Resume]): - def __init__(self, session: AsyncSession): + def __init__(self, session: Annotated[AsyncSession, Depends(get_session)]): super().__init__(Resume, session) async def get_by_vacancy_id(self, vacancy_id: int) -> List[Resume]: statement = select(Resume).where(Resume.vacancy_id == vacancy_id) - result = await self.session.execute(statement) - return result.scalars().all() - - async def get_by_session_id(self, session_id: int) -> List[Resume]: - """Получить все резюме пользователя по session_id""" - statement = select(Resume).where(Resume.session_id == session_id) - result = await self.session.execute(statement) - return result.scalars().all() - - async def get_by_vacancy_and_session(self, vacancy_id: int, session_id: int) -> List[Resume]: - """Получить резюме пользователя для конкретной вакансии""" - statement = select(Resume).where( - Resume.vacancy_id == vacancy_id, - Resume.session_id == session_id - ) - result = await self.session.execute(statement) + result = await self._session.execute(statement) return result.scalars().all() async def get_by_status(self, status: ResumeStatus) -> List[Resume]: statement = select(Resume).where(Resume.status == status) - result = await self.session.execute(statement) + result = await self._session.execute(statement) return result.scalars().all() - async def get_by_applicant_email(self, email: str) -> List[Resume]: - statement = select(Resume).where(Resume.applicant_email == email) - result = await self.session.execute(statement) - return result.scalars().all() + async def get_by_id(self, resume_id: int) -> Optional[Resume]: + """Получить резюме по ID""" + return await self.get(resume_id) - async def create_with_session(self, resume_data: dict, session_id: int) -> Resume: + async def create_with_session(self, resume_dict: dict, session_id: int) -> Resume: """Создать резюме с привязкой к сессии""" - resume_data['session_id'] = session_id - new_resume = Resume(**resume_data) - return await self.create(new_resume) + resume_dict['session_id'] = session_id + return await self.create(resume_dict) + + async def get_by_session_id(self, session_id: int) -> List[Resume]: + """Получить резюме по session_id""" + statement = select(Resume).where(Resume.session_id == session_id) + result = await self._session.execute(statement) + return result.scalars().all() + + async def get_by_vacancy_and_session(self, vacancy_id: int, session_id: int) -> List[Resume]: + """Получить резюме по vacancy_id и session_id""" + statement = select(Resume).where( + Resume.vacancy_id == vacancy_id, + Resume.session_id == session_id + ) + result = await self._session.execute(statement) + return result.scalars().all() async def update_status(self, resume_id: int, status: ResumeStatus) -> Optional[Resume]: + """Обновить статус резюме""" return await self.update(resume_id, {"status": status}) async def add_interview_report(self, resume_id: int, report_url: str) -> Optional[Resume]: - return await self.update(resume_id, { - "interview_report_url": report_url, - "status": ResumeStatus.INTERVIEWED - }) \ No newline at end of file + """Добавить ссылку на отчет интервью""" + return await self.update(resume_id, {"interview_report_url": report_url}) + diff --git a/app/repositories/session_repository.py b/app/repositories/session_repository.py index 67ab358..b29f431 100644 --- a/app/repositories/session_repository.py +++ b/app/repositories/session_repository.py @@ -1,13 +1,15 @@ -from typing import Optional +from typing import Optional, Annotated from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select +from fastapi import Depends +from app.core.database import get_session from app.models.session import Session from app.repositories.base_repository import BaseRepository from datetime import datetime class SessionRepository(BaseRepository[Session]): - def __init__(self, session: AsyncSession): + def __init__(self, session: Annotated[AsyncSession, Depends(get_session)]): super().__init__(Session, session) async def get_by_session_id(self, session_id: str) -> Optional[Session]: @@ -17,7 +19,7 @@ class SessionRepository(BaseRepository[Session]): Session.is_active == True, Session.expires_at > datetime.utcnow() ) - result = await self.session.execute(statement) + result = await self._session.execute(statement) return result.scalar_one_or_none() async def create_session(self, user_agent: Optional[str] = None, ip_address: Optional[str] = None) -> Session: @@ -31,9 +33,9 @@ class SessionRepository(BaseRepository[Session]): if session: session.is_active = False session.updated_at = datetime.utcnow() - self.session.add(session) - await self.session.commit() - await self.session.refresh(session) + self._session.add(session) + await self._session.commit() + await self._session.refresh(session) return True return False @@ -43,22 +45,22 @@ class SessionRepository(BaseRepository[Session]): if session: session.last_activity = datetime.utcnow() session.updated_at = datetime.utcnow() - self.session.add(session) - await self.session.commit() - await self.session.refresh(session) + self._session.add(session) + await self._session.commit() + await self._session.refresh(session) return True return False async def cleanup_expired_sessions(self) -> int: """Remove expired sessions""" statement = select(Session).where(Session.expires_at < datetime.utcnow()) - result = await self.session.execute(statement) + result = await self._session.execute(statement) expired_sessions = result.scalars().all() count = 0 for session in expired_sessions: - await self.session.delete(session) + await self._session.delete(session) count += 1 - await self.session.commit() + await self._session.commit() return count \ No newline at end of file diff --git a/app/repositories/vacancy_repository.py b/app/repositories/vacancy_repository.py index 171c56b..f003f6e 100644 --- a/app/repositories/vacancy_repository.py +++ b/app/repositories/vacancy_repository.py @@ -1,34 +1,62 @@ -from typing import List, Optional +from typing import List, Optional, Annotated from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_ +from fastapi import Depends +from app.core.database import get_session from app.models.vacancy import Vacancy, VacancyCreate, VacancyUpdate from .base_repository import BaseRepository class VacancyRepository(BaseRepository[Vacancy]): - def __init__(self, session: AsyncSession): + def __init__(self, session: Annotated[AsyncSession, Depends(get_session)]): super().__init__(Vacancy, session) async def get_by_company(self, company_name: str) -> List[Vacancy]: statement = select(Vacancy).where(Vacancy.company_name == company_name) - result = await self.session.execute(statement) + result = await self._session.execute(statement) return result.scalars().all() - async def get_by_area(self, area_name: str) -> List[Vacancy]: - statement = select(Vacancy).where(Vacancy.area_name == area_name) - result = await self.session.execute(statement) - return result.scalars().all() - - async def get_active_vacancies(self, skip: int = 0, limit: int = 100) -> List[Vacancy]: + async def get_active(self, skip: int = 0, limit: int = 100) -> List[Vacancy]: statement = ( select(Vacancy) .where(Vacancy.is_archived == False) .offset(skip) .limit(limit) ) - result = await self.session.execute(statement) + result = await self._session.execute(statement) return result.scalars().all() + async def search( + self, + title: Optional[str] = None, + company_name: Optional[str] = None, + area_name: Optional[str] = None, + skip: int = 0, + limit: int = 100 + ) -> List[Vacancy]: + """Поиск вакансий по критериям""" + statement = select(Vacancy) + conditions = [] + + if title: + conditions.append(Vacancy.title.ilike(f"%{title}%")) + if company_name: + conditions.append(Vacancy.company_name.ilike(f"%{company_name}%")) + if area_name: + conditions.append(Vacancy.area_name.ilike(f"%{area_name}%")) + + if conditions: + statement = statement.where(and_(*conditions)) + + statement = statement.offset(skip).limit(limit) + result = await self._session.execute(statement) + return result.scalars().all() + + + async def get_active_vacancies(self, skip: int = 0, limit: int = 100) -> List[Vacancy]: + """Получить активные вакансии (алиас для get_active)""" + return await self.get_active(skip=skip, limit=limit) + async def search_vacancies( self, title: Optional[str] = None, @@ -37,19 +65,15 @@ class VacancyRepository(BaseRepository[Vacancy]): skip: int = 0, limit: int = 100 ) -> List[Vacancy]: - conditions = [] - - if title: - conditions.append(Vacancy.title.ilike(f"%{title}%")) - if company_name: - conditions.append(Vacancy.company_name.ilike(f"%{company_name}%")) - if area_name: - conditions.append(Vacancy.area_name.ilike(f"%{area_name}%")) - - statement = select(Vacancy) - if conditions: - statement = statement.where(and_(*conditions)) - - statement = statement.offset(skip).limit(limit) - result = await self.session.execute(statement) - return result.scalars().all() \ No newline at end of file + """Поиск вакансий (алиас для search)""" + return await self.search( + title=title, + company_name=company_name, + area_name=area_name, + skip=skip, + limit=limit + ) + + async def archive(self, vacancy_id: int) -> Optional[Vacancy]: + """Архивировать вакансию""" + return await self.update(vacancy_id, {"is_active": False}) \ No newline at end of file diff --git a/app/routers/admin_router.py b/app/routers/admin_router.py index 7b03266..01acd40 100644 --- a/app/routers/admin_router.py +++ b/app/routers/admin_router.py @@ -1,117 +1,79 @@ from fastapi import APIRouter, Depends, HTTPException -from sqlalchemy.ext.asyncio import AsyncSession -from app.core.database import get_db -from app.services.interview_service import InterviewRoomService -from typing import List, Dict -import psutil +from app.services.admin_service import AdminService +from typing import Dict router = APIRouter(prefix="/admin", tags=["Admin"]) @router.get("/interview-processes") -async def list_active_interview_processes(db: AsyncSession = Depends(get_db)) -> Dict: +async def list_active_interview_processes( + admin_service: AdminService = Depends(AdminService) +) -> Dict: """Список всех активных AI процессов интервью""" - interview_service = InterviewRoomService(db) - - active_sessions = await interview_service.get_active_agent_processes() - - processes_info = [] - for session in active_sessions: - process_info = { - "session_id": session.id, - "resume_id": session.resume_id, - "room_name": session.room_name, - "pid": session.ai_agent_pid, - "status": session.ai_agent_status, - "started_at": session.started_at.isoformat() if session.started_at else None, - "is_running": False, - "memory_mb": 0, - "cpu_percent": 0 - } - - # Проверяем реальное состояние процесса - if session.ai_agent_pid: - try: - process = psutil.Process(session.ai_agent_pid) - if process.is_running(): - process_info["is_running"] = True - process_info["memory_mb"] = round(process.memory_info().rss / 1024 / 1024, 1) - process_info["cpu_percent"] = round(process.cpu_percent(), 1) - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - - processes_info.append(process_info) - - return { - "total_active_sessions": len(active_sessions), - "processes": processes_info - } + return await admin_service.get_active_interview_processes() @router.post("/interview-processes/{session_id}/stop") -async def stop_interview_process(session_id: int, db: AsyncSession = Depends(get_db)) -> Dict: +async def stop_interview_process( + session_id: int, + admin_service: AdminService = Depends(AdminService) +) -> Dict: """Остановить AI процесс для конкретного интервью""" - interview_service = InterviewRoomService(db) + result = await admin_service.stop_interview_process(session_id) - success = await interview_service.stop_agent_process(session_id) + if not result["success"]: + raise HTTPException(status_code=404, detail=result["message"]) - if success: - return {"message": f"AI process for session {session_id} stopped successfully"} - else: - raise HTTPException(status_code=404, detail=f"Session {session_id} not found or process not running") + return result @router.post("/interview-processes/cleanup") -async def cleanup_dead_processes(db: AsyncSession = Depends(get_db)) -> Dict: +async def cleanup_dead_processes( + admin_service: AdminService = Depends(AdminService) +) -> Dict: """Очистка мертвых процессов""" - interview_service = InterviewRoomService(db) - - cleaned_count = await interview_service.cleanup_dead_processes() - - return { - "message": f"Cleaned up {cleaned_count} dead processes" - } + return await admin_service.cleanup_dead_processes() @router.get("/system-stats") -async def get_system_stats() -> Dict: +async def get_system_stats( + admin_service: AdminService = Depends(AdminService) +) -> Dict: """Общая статистика системы""" - try: - # Общая информация о системе - cpu_percent = psutil.cpu_percent(interval=1) - memory = psutil.virtual_memory() - disk = psutil.disk_usage('/') - - # Поиск всех Python процессов (потенциальные AI агенты) - python_processes = [] - for proc in psutil.process_iter(['pid', 'name', 'memory_info', 'cpu_percent', 'cmdline']): - try: - if proc.info['name'] and 'python' in proc.info['name'].lower(): - cmdline = ' '.join(proc.info['cmdline']) if proc.info['cmdline'] else '' - if 'ai_interviewer_agent' in cmdline: - python_processes.append({ - 'pid': proc.info['pid'], - 'memory_mb': round(proc.info['memory_info'].rss / 1024 / 1024, 1), - 'cpu_percent': proc.info['cpu_percent'] or 0, - 'cmdline': cmdline - }) - except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): - pass - - return { - "system": { - "cpu_percent": cpu_percent, - "memory_percent": memory.percent, - "memory_available_gb": round(memory.available / 1024 / 1024 / 1024, 1), - "disk_percent": disk.percent, - "disk_free_gb": round(disk.free / 1024 / 1024 / 1024, 1) - }, - "ai_agents": { - "count": len(python_processes), - "total_memory_mb": sum(p['memory_mb'] for p in python_processes), - "processes": python_processes - } - } - - except Exception as e: - raise HTTPException(status_code=500, detail=f"Error getting system stats: {str(e)}") \ No newline at end of file + result = await admin_service.get_system_stats() + + if "error" in result: + raise HTTPException(status_code=500, detail=result["error"]) + + return result + + +@router.get("/analytics/dashboard") +async def get_analytics_dashboard( + admin_service: AdminService = Depends(AdminService) +) -> Dict: + """Основная аналитическая панель""" + return await admin_service.get_analytics_dashboard() + + +@router.get("/analytics/candidates/{vacancy_id}") +async def get_vacancy_analytics( + vacancy_id: int, + admin_service: AdminService = Depends(AdminService) +) -> Dict: + """Аналитика кандидатов по конкретной вакансии""" + return await admin_service.get_vacancy_analytics(vacancy_id) + + +@router.post("/analytics/generate-reports/{vacancy_id}") +async def generate_reports_for_vacancy( + vacancy_id: int, + admin_service: AdminService = Depends(AdminService) +) -> Dict: + """Запустить генерацию отчетов для всех кандидатов вакансии""" + result = await admin_service.generate_reports_for_vacancy(vacancy_id) + + if "error" in result: + raise HTTPException(status_code=404, detail=result["error"]) + + return result \ No newline at end of file diff --git a/app/routers/analysis_router.py b/app/routers/analysis_router.py new file mode 100644 index 0000000..54553f5 --- /dev/null +++ b/app/routers/analysis_router.py @@ -0,0 +1,225 @@ +# -*- coding: utf-8 -*- +from typing import List +from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks +from pydantic import BaseModel + +from app.repositories.resume_repository import ResumeRepository +from celery_worker.interview_analysis_task import generate_interview_report, analyze_multiple_candidates + + +router = APIRouter( + prefix="/analysis", + tags=["analysis"] +) + + +class AnalysisResponse(BaseModel): + """Ответ запуска задачи анализа""" + message: str + resume_id: int + task_id: str + + +class BulkAnalysisRequest(BaseModel): + """Запрос массового анализа""" + resume_ids: List[int] + + +class BulkAnalysisResponse(BaseModel): + """Ответ массового анализа""" + message: str + resume_count: int + task_id: str + + +class CandidateRanking(BaseModel): + """Рейтинг кандидата""" + resume_id: int + candidate_name: str + overall_score: int + recommendation: str + position: str + + +@router.post("/interview-report/{resume_id}", response_model=AnalysisResponse) +async def start_interview_analysis( + resume_id: int, + background_tasks: BackgroundTasks, + resume_repo: ResumeRepository = Depends(ResumeRepository) +): + """ + Запускает анализ интервью для конкретного кандидата + + Анализирует: + - Соответствие резюме вакансии + - Качество ответов в диалоге интервью + - Технические навыки и опыт + - Коммуникативные способности + - Общую рекомендацию и рейтинг + """ + + # Проверяем, существует ли резюме + resume = await resume_repo.get_by_id(resume_id) + + if not resume: + raise HTTPException(status_code=404, detail="Resume not found") + + # Запускаем задачу анализа + task = generate_interview_report.delay(resume_id) + + return AnalysisResponse( + message="Interview analysis started", + resume_id=resume_id, + task_id=task.id + ) + + +@router.post("/bulk-analysis", response_model=BulkAnalysisResponse) +async def start_bulk_analysis( + request: BulkAnalysisRequest, + background_tasks: BackgroundTasks, + resume_repo: ResumeRepository = Depends(ResumeRepository) +): + """ + Запускает массовый анализ нескольких кандидатов + + Возвращает ранжированный список кандидатов по общему баллу + Полезно для сравнения кандидатов на одну позицию + """ + + # Проверяем, что все резюме существуют + existing_resumes = [] + + for resume_id in request.resume_ids: + resume = await resume_repo.get_by_id(resume_id) + if resume: + existing_resumes.append(resume_id) + + if not existing_resumes: + raise HTTPException(status_code=404, detail="No valid resumes found") + + # Запускаем задачу массового анализа + task = analyze_multiple_candidates.delay(existing_resumes) + + return BulkAnalysisResponse( + message="Bulk analysis started", + resume_count=len(existing_resumes), + task_id=task.id + ) + + +@router.get("/ranking/{vacancy_id}") +async def get_candidates_ranking( + vacancy_id: int, + resume_repo: ResumeRepository = Depends(ResumeRepository) +): + """ + Получить ранжированный список кандидатов для вакансии + + Сортирует кандидатов по результатам анализа интервью + Показывает только тех, кто прошел интервью + """ + + # Получаем все резюме для вакансии со статусом "interviewed" + resumes = await resume_repo.get_by_vacancy_id(vacancy_id) + interviewed_resumes = [r for r in resumes if r.status in ["interviewed"]] + + if not interviewed_resumes: + return { + "vacancy_id": vacancy_id, + "candidates": [], + "message": "No interviewed candidates found" + } + + # Запускаем массовый анализ если еще не было + resume_ids = [r.id for r in interviewed_resumes] + task = analyze_multiple_candidates.delay(resume_ids) + + # В реальности здесь нужно дождаться выполнения или получить из кэша + # Пока возвращаем информацию о запущенной задаче + return { + "vacancy_id": vacancy_id, + "task_id": task.id, + "message": f"Analysis started for {len(resume_ids)} candidates", + "resume_ids": resume_ids + } + + +@router.get("/report/{resume_id}") +async def get_interview_report( + resume_id: int, + resume_repo: ResumeRepository = Depends(ResumeRepository) +): + """ + Получить готовый отчет анализа интервью + + Если отчет еще не готов - запускает анализ + """ + + resume = await resume_repo.get_by_id(resume_id) + + if not resume: + raise HTTPException(status_code=404, detail="Resume not found") + + # Проверяем, есть ли уже готовый отчет в notes + if resume.notes and "ОЦЕНКА КАНДИДАТА" in resume.notes: + return { + "resume_id": resume_id, + "candidate_name": resume.applicant_name, + "status": "completed", + "report_summary": resume.notes, + "message": "Report available" + } + + # Если отчета нет - запускаем анализ + task = generate_interview_report.delay(resume_id) + + return { + "resume_id": resume_id, + "candidate_name": resume.applicant_name, + "status": "in_progress", + "task_id": task.id, + "message": "Analysis started, check back later" + } + + +@router.get("/statistics/{vacancy_id}") +async def get_analysis_statistics( + vacancy_id: int, + resume_repo: ResumeRepository = Depends(ResumeRepository) +): + """ + Получить статистику анализа кандидатов по вакансии + """ + + resumes = await resume_repo.get_by_vacancy_id(vacancy_id) + + total_candidates = len(resumes) + interviewed = len([r for r in resumes if r.status == "interviewed"]) + with_reports = len([r for r in resumes if r.notes and "ОЦЕНКА КАНДИДАТА" in r.notes]) + + # Подсчитываем рекомендации из notes (упрощенно) + recommendations = {"strongly_recommend": 0, "recommend": 0, "consider": 0, "reject": 0} + + for resume in resumes: + if resume.notes and "ОЦЕНКА КАНДИДАТА" in resume.notes: + notes = resume.notes.lower() + if "strongly_recommend" in notes: + recommendations["strongly_recommend"] += 1 + elif "recommend" in notes and "strongly_recommend" not in notes: + recommendations["recommend"] += 1 + elif "consider" in notes: + recommendations["consider"] += 1 + elif "reject" in notes: + recommendations["reject"] += 1 + + return { + "vacancy_id": vacancy_id, + "statistics": { + "total_candidates": total_candidates, + "interviewed_candidates": interviewed, + "analyzed_candidates": with_reports, + "recommendations": recommendations, + "analysis_completion": round((with_reports / max(interviewed, 1)) * 100, 1) if interviewed > 0 else 0 + } + } \ No newline at end of file diff --git a/app/routers/interview_router.py b/app/routers/interview_router.py index 76ebd2b..7d6be16 100644 --- a/app/routers/interview_router.py +++ b/app/routers/interview_router.py @@ -1,6 +1,5 @@ from fastapi import APIRouter, Depends, HTTPException, Request -from sqlalchemy.ext.asyncio import AsyncSession -from app.core.session_middleware import get_current_session, get_db_session +from app.core.session_middleware import get_current_session from app.models.session import Session from app.models.interview import InterviewValidationResponse, LiveKitTokenResponse, InterviewStatus from app.services.interview_service import InterviewRoomService @@ -13,14 +12,12 @@ async def validate_interview( request: Request, resume_id: int, current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + interview_service: InterviewRoomService = Depends(InterviewRoomService) ): """Валидация резюме для проведения собеседования""" if not current_session: raise HTTPException(status_code=401, detail="No active session") - interview_service = InterviewRoomService(db_session) - # Проверяем валидность резюме для собеседования validation_result = await interview_service.validate_resume_for_interview(resume_id) @@ -40,14 +37,12 @@ async def get_interview_token( request: Request, resume_id: int, current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + interview_service: InterviewRoomService = Depends(InterviewRoomService) ): """Получение токена для LiveKit собеседования""" if not current_session: raise HTTPException(status_code=401, detail="No active session") - interview_service = InterviewRoomService(db_session) - # Получаем токен для LiveKit token_response = await interview_service.get_livekit_token(resume_id) @@ -65,14 +60,12 @@ async def end_interview( request: Request, resume_id: int, current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + interview_service: InterviewRoomService = Depends(InterviewRoomService) ): """Завершение собеседования""" if not current_session: raise HTTPException(status_code=401, detail="No active session") - interview_service = InterviewRoomService(db_session) - # Получаем активную сессию собеседования interview_session = await interview_service.get_interview_session(resume_id) diff --git a/app/routers/resume_router.py b/app/routers/resume_router.py index ebbee7d..d028ddd 100644 --- a/app/routers/resume_router.py +++ b/app/routers/resume_router.py @@ -1,7 +1,6 @@ from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File, Form, Request -from sqlalchemy.ext.asyncio import AsyncSession from typing import List, Optional -from app.core.session_middleware import get_current_session, get_db_session +from app.core.session_middleware import get_current_session from app.models.resume import ResumeCreate, ResumeUpdate, ResumeRead, ResumeStatus from app.models.session import Session from app.services.resume_service import ResumeService @@ -22,13 +21,12 @@ async def create_resume( cover_letter: Optional[str] = Form(None), resume_file: UploadFile = File(...), current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + resume_service: ResumeService = Depends(ResumeService) ): if not current_session: raise HTTPException(status_code=401, detail="No active session") file_service = FileService() - resume_service = ResumeService(db_session) upload_result = await file_service.upload_resume_file(resume_file) if not upload_result: @@ -74,13 +72,11 @@ async def get_resumes( vacancy_id: Optional[int] = Query(None), status: Optional[ResumeStatus] = Query(None), current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + service: ResumeService = Depends(ResumeService) ): if not current_session: raise HTTPException(status_code=401, detail="No active session") - service = ResumeService(db_session) - # Получаем только резюме текущего пользователя if vacancy_id: return await service.get_resumes_by_vacancy_and_session(vacancy_id, current_session.id) @@ -93,12 +89,10 @@ async def get_resume( request: Request, resume_id: int, current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + service: ResumeService = Depends(ResumeService) ): if not current_session: raise HTTPException(status_code=401, detail="No active session") - - service = ResumeService(db_session) resume = await service.get_resume(resume_id) if not resume: @@ -117,12 +111,10 @@ async def update_resume( resume_id: int, resume: ResumeUpdate, current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + service: ResumeService = Depends(ResumeService) ): if not current_session: raise HTTPException(status_code=401, detail="No active session") - - service = ResumeService(db_session) existing_resume = await service.get_resume(resume_id) if not existing_resume: @@ -142,12 +134,10 @@ async def update_resume_status( resume_id: int, status: ResumeStatus, current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + service: ResumeService = Depends(ResumeService) ): if not current_session: raise HTTPException(status_code=401, detail="No active session") - - service = ResumeService(db_session) existing_resume = await service.get_resume(resume_id) if not existing_resume: @@ -167,13 +157,12 @@ async def upload_interview_report( resume_id: int, report_file: UploadFile = File(...), current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + resume_service: ResumeService = Depends(ResumeService) ): if not current_session: raise HTTPException(status_code=401, detail="No active session") file_service = FileService() - resume_service = ResumeService(db_session) existing_resume = await resume_service.get_resume(resume_id) if not existing_resume: @@ -198,14 +187,13 @@ async def get_parsing_status( resume_id: int, task_id: str = Query(..., description="Task ID from resume upload response"), current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + service: ResumeService = Depends(ResumeService) ): """Получить статус парсинга резюме по task_id""" if not current_session: raise HTTPException(status_code=401, detail="No active session") # Проверяем доступ к резюме - service = ResumeService(db_session) resume = await service.get_resume(resume_id) if not resume: @@ -263,12 +251,10 @@ async def delete_resume( request: Request, resume_id: int, current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + service: ResumeService = Depends(ResumeService) ): if not current_session: raise HTTPException(status_code=401, detail="No active session") - - service = ResumeService(db_session) existing_resume = await service.get_resume(resume_id) if not existing_resume: diff --git a/app/routers/session_router.py b/app/routers/session_router.py index ad46c93..55d037d 100644 --- a/app/routers/session_router.py +++ b/app/routers/session_router.py @@ -1,7 +1,6 @@ from fastapi import APIRouter, Depends, Request, HTTPException from fastapi.responses import JSONResponse -from sqlalchemy.ext.asyncio import AsyncSession -from app.core.session_middleware import get_current_session, get_db_session +from app.core.session_middleware import get_current_session from app.repositories.session_repository import SessionRepository from app.models.session import Session, SessionRead from typing import Optional @@ -38,18 +37,15 @@ async def get_current_session_info( async def refresh_session( request: Request, current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + session_repo: SessionRepository = Depends(SessionRepository) ): """Продлить сессию на 30 дней""" if not current_session: raise HTTPException(status_code=401, detail="No active session") - - session_repo = SessionRepository(db_session) current_session.extend_session(days=30) - db_session.add(current_session) - db_session.commit() - db_session.refresh(current_session) + # Обновляем через репозиторий + await session_repo.update_last_activity(current_session.session_id) logger.info(f"Extended session {current_session.session_id}") @@ -64,13 +60,11 @@ async def refresh_session( async def logout( request: Request, current_session: Session = Depends(get_current_session), - db_session: AsyncSession = Depends(get_db_session) + session_repo: SessionRepository = Depends(SessionRepository) ): """Завершить текущую сессию""" if not current_session: raise HTTPException(status_code=401, detail="No active session") - - session_repo = SessionRepository(db_session) deactivated = await session_repo.deactivate_session(current_session.session_id) if deactivated: diff --git a/app/routers/vacancy_router.py b/app/routers/vacancy_router.py index 21de339..d69d1eb 100644 --- a/app/routers/vacancy_router.py +++ b/app/routers/vacancy_router.py @@ -1,9 +1,8 @@ from fastapi import APIRouter, Depends, HTTPException, Query -from sqlalchemy.ext.asyncio import AsyncSession from typing import List, Optional -from app.core.database import get_session -from app.models.vacancy import VacancyCreate, VacancyUpdate, VacancyRead + from app.services.vacancy_service import VacancyService +from app.models.vacancy import VacancyCreate, VacancyUpdate, VacancyRead router = APIRouter(prefix="/vacancies", tags=["vacancies"]) @@ -11,10 +10,9 @@ router = APIRouter(prefix="/vacancies", tags=["vacancies"]) @router.post("/", response_model=VacancyRead) async def create_vacancy( vacancy: VacancyCreate, - session: AsyncSession = Depends(get_session) + vacancy_service: VacancyService = Depends(VacancyService) ): - service = VacancyService(session) - return await service.create_vacancy(vacancy) + return await vacancy_service.create_vacancy(vacancy) @router.get("/", response_model=List[VacancyRead]) @@ -25,12 +23,10 @@ async def get_vacancies( title: Optional[str] = Query(None), company_name: Optional[str] = Query(None), area_name: Optional[str] = Query(None), - session: AsyncSession = Depends(get_session) + vacancy_service: VacancyService = Depends(VacancyService) ): - service = VacancyService(session) - if any([title, company_name, area_name]): - return await service.search_vacancies( + return await vacancy_service.search_vacancies( title=title, company_name=company_name, area_name=area_name, @@ -39,18 +35,17 @@ async def get_vacancies( ) if active_only: - return await service.get_active_vacancies(skip=skip, limit=limit) + return await vacancy_service.get_active_vacancies(skip=skip, limit=limit) - return await service.get_all_vacancies(skip=skip, limit=limit) + return await vacancy_service.get_all_vacancies(skip=skip, limit=limit) @router.get("/{vacancy_id}", response_model=VacancyRead) async def get_vacancy( vacancy_id: int, - session: AsyncSession = Depends(get_session) + vacancy_service: VacancyService = Depends(VacancyService) ): - service = VacancyService(session) - vacancy = await service.get_vacancy(vacancy_id) + vacancy = await vacancy_service.get_vacancy(vacancy_id) if not vacancy: raise HTTPException(status_code=404, detail="Vacancy not found") return vacancy @@ -60,10 +55,9 @@ async def get_vacancy( async def update_vacancy( vacancy_id: int, vacancy: VacancyUpdate, - session: AsyncSession = Depends(get_session) + vacancy_service: VacancyService = Depends(VacancyService) ): - service = VacancyService(session) - updated_vacancy = await service.update_vacancy(vacancy_id, vacancy) + updated_vacancy = await vacancy_service.update_vacancy(vacancy_id, vacancy) if not updated_vacancy: raise HTTPException(status_code=404, detail="Vacancy not found") return updated_vacancy @@ -72,10 +66,9 @@ async def update_vacancy( @router.delete("/{vacancy_id}") async def delete_vacancy( vacancy_id: int, - session: AsyncSession = Depends(get_session) + vacancy_service: VacancyService = Depends(VacancyService) ): - service = VacancyService(session) - success = await service.delete_vacancy(vacancy_id) + success = await vacancy_service.delete_vacancy(vacancy_id) if not success: raise HTTPException(status_code=404, detail="Vacancy not found") return {"message": "Vacancy deleted successfully"} @@ -84,10 +77,9 @@ async def delete_vacancy( @router.patch("/{vacancy_id}/archive", response_model=VacancyRead) async def archive_vacancy( vacancy_id: int, - session: AsyncSession = Depends(get_session) + vacancy_service: VacancyService = Depends(VacancyService) ): - service = VacancyService(session) - archived_vacancy = await service.archive_vacancy(vacancy_id) + archived_vacancy = await vacancy_service.archive_vacancy(vacancy_id) if not archived_vacancy: raise HTTPException(status_code=404, detail="Vacancy not found") return archived_vacancy \ No newline at end of file diff --git a/app/services/admin_service.py b/app/services/admin_service.py new file mode 100644 index 0000000..03e2d15 --- /dev/null +++ b/app/services/admin_service.py @@ -0,0 +1,257 @@ +from typing import Dict, List, Annotated +from fastapi import Depends + +from app.repositories.interview_repository import InterviewRepository +from app.repositories.resume_repository import ResumeRepository +from app.services.interview_service import InterviewRoomService +from app.services.interview_finalization_service import InterviewFinalizationService + + +class AdminService: + def __init__( + self, + interview_repo: Annotated[InterviewRepository, Depends(InterviewRepository)], + resume_repo: Annotated[ResumeRepository, Depends(ResumeRepository)], + interview_service: Annotated[InterviewRoomService, Depends(InterviewRoomService)], + finalization_service: Annotated[InterviewFinalizationService, Depends(InterviewFinalizationService)] + ): + self.interview_repo = interview_repo + self.resume_repo = resume_repo + self.interview_service = interview_service + self.finalization_service = finalization_service + + async def get_active_interview_processes(self): + """Получить список активных AI процессов""" + active_sessions = await self.interview_service.get_active_agent_processes() + + import psutil + processes_info = [] + + for session in active_sessions: + process_info = { + "session_id": session.id, + "resume_id": session.resume_id, + "room_name": session.room_name, + "pid": session.ai_agent_pid, + "status": session.ai_agent_status, + "started_at": session.started_at.isoformat() if session.started_at else None, + "is_running": False, + "memory_mb": 0, + "cpu_percent": 0 + } + + if session.ai_agent_pid: + try: + process = psutil.Process(session.ai_agent_pid) + if process.is_running(): + process_info["is_running"] = True + process_info["memory_mb"] = round(process.memory_info().rss / 1024 / 1024, 1) + process_info["cpu_percent"] = round(process.cpu_percent(interval=0.1), 1) + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + processes_info.append(process_info) + + return { + "active_processes": len([p for p in processes_info if p["is_running"]]), + "total_sessions": len(processes_info), + "processes": processes_info + } + + async def stop_interview_process(self, session_id: int): + """Остановить AI процесс интервью""" + success = await self.interview_service.stop_agent_process(session_id) + + return { + "success": success, + "message": f"Process for session {session_id} {'stopped' if success else 'failed to stop'}" + } + + async def cleanup_dead_processes(self): + """Очистить информацию о мертвых процессах""" + cleaned_count = await self.finalization_service.cleanup_dead_processes() + + return { + "cleaned_processes": cleaned_count, + "message": f"Cleaned up {cleaned_count} dead processes" + } + + async def get_analytics_dashboard(self) -> Dict: + """Основная аналитическая панель""" + + all_resumes = await self.resume_repo.get_all() + + status_stats = {} + for resume in all_resumes: + status = resume.status.value if hasattr(resume.status, 'value') else str(resume.status) + status_stats[status] = status_stats.get(status, 0) + 1 + + analyzed_count = 0 + recommendation_stats = {"strongly_recommend": 0, "recommend": 0, "consider": 0, "reject": 0} + + for resume in all_resumes: + if resume.notes and "ОЦЕНКА КАНДИДАТА" in resume.notes: + analyzed_count += 1 + notes = resume.notes.lower() + + if "strongly_recommend" in notes: + recommendation_stats["strongly_recommend"] += 1 + elif "recommend" in notes and "strongly_recommend" not in notes: + recommendation_stats["recommend"] += 1 + elif "consider" in notes: + recommendation_stats["consider"] += 1 + elif "reject" in notes: + recommendation_stats["reject"] += 1 + + recent_resumes = sorted(all_resumes, key=lambda x: x.updated_at, reverse=True)[:10] + recent_activity = [] + + for resume in recent_resumes: + activity_item = { + "resume_id": resume.id, + "candidate_name": resume.applicant_name, + "status": resume.status.value if hasattr(resume.status, 'value') else str(resume.status), + "updated_at": resume.updated_at.isoformat() if resume.updated_at else None, + "has_analysis": resume.notes and "ОЦЕНКА КАНДИДАТА" in resume.notes + } + recent_activity.append(activity_item) + + return { + "summary": { + "total_candidates": len(all_resumes), + "interviewed_candidates": status_stats.get("interviewed", 0), + "analyzed_candidates": analyzed_count, + "analysis_completion_rate": round((analyzed_count / max(len(all_resumes), 1)) * 100, 1) + }, + "status_distribution": status_stats, + "recommendation_distribution": recommendation_stats, + "recent_activity": recent_activity + } + + async def get_vacancy_analytics(self, vacancy_id: int) -> Dict: + """Аналитика кандидатов по конкретной вакансии""" + + vacancy_resumes = await self.resume_repo.get_by_vacancy_id(vacancy_id) + + if not vacancy_resumes: + return { + "vacancy_id": vacancy_id, + "message": "No candidates found for this vacancy", + "candidates": [] + } + + candidates_info = [] + + for resume in vacancy_resumes: + overall_score = None + recommendation = None + + if resume.notes and "ОЦЕНКА КАНДИДАТА" in resume.notes: + notes = resume.notes + if "Общий балл:" in notes: + try: + score_line = [line for line in notes.split('\n') if "Общий балл:" in line][0] + overall_score = int(score_line.split("Общий балл:")[1].split("/")[0].strip()) + except: + pass + + if "Рекомендация:" in notes: + try: + rec_line = [line for line in notes.split('\n') if "Рекомендация:" in line][0] + recommendation = rec_line.split("Рекомендация:")[1].strip() + except: + pass + + candidate_info = { + "resume_id": resume.id, + "candidate_name": resume.applicant_name, + "email": resume.applicant_email, + "status": resume.status.value if hasattr(resume.status, 'value') else str(resume.status), + "created_at": resume.created_at.isoformat() if resume.created_at else None, + "updated_at": resume.updated_at.isoformat() if resume.updated_at else None, + "has_analysis": resume.notes and "ОЦЕНКА КАНДИДАТА" in resume.notes, + "overall_score": overall_score, + "recommendation": recommendation, + "has_parsed_data": bool(resume.parsed_data), + "has_interview_plan": bool(resume.interview_plan) + } + + candidates_info.append(candidate_info) + + candidates_info.sort(key=lambda x: (x['overall_score'] or 0, x['updated_at'] or ''), reverse=True) + + return { + "vacancy_id": vacancy_id, + "total_candidates": len(candidates_info), + "candidates": candidates_info + } + + async def generate_reports_for_vacancy(self, vacancy_id: int) -> Dict: + """Запустить генерацию отчетов для всех кандидатов вакансии""" + + from celery_worker.interview_analysis_task import analyze_multiple_candidates + + vacancy_resumes = await self.resume_repo.get_by_vacancy_id(vacancy_id) + + interviewed_resumes = [r for r in vacancy_resumes if r.status in ["interviewed"]] + + if not interviewed_resumes: + return { + "error": "No interviewed candidates found for this vacancy", + "vacancy_id": vacancy_id + } + + resume_ids = [r.id for r in interviewed_resumes] + + task = analyze_multiple_candidates.delay(resume_ids) + + return { + "vacancy_id": vacancy_id, + "task_id": task.id, + "message": f"Analysis started for {len(resume_ids)} candidates", + "resume_ids": resume_ids + } + + async def get_system_stats(self) -> Dict: + """Общая статистика системы""" + import psutil + + try: + cpu_percent = psutil.cpu_percent(interval=1) + memory = psutil.virtual_memory() + disk = psutil.disk_usage('/') + + python_processes = [] + for proc in psutil.process_iter(['pid', 'name', 'memory_info', 'cpu_percent', 'cmdline']): + try: + if proc.info['name'] and 'python' in proc.info['name'].lower(): + cmdline = ' '.join(proc.info['cmdline']) if proc.info['cmdline'] else '' + if 'ai_interviewer_agent' in cmdline: + python_processes.append({ + 'pid': proc.info['pid'], + 'memory_mb': round(proc.info['memory_info'].rss / 1024 / 1024, 1), + 'cpu_percent': proc.info['cpu_percent'] or 0, + 'cmdline': cmdline + }) + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + + return { + "system": { + "cpu_percent": cpu_percent, + "memory_percent": memory.percent, + "memory_available_gb": round(memory.available / 1024 / 1024 / 1024, 1), + "disk_percent": disk.percent, + "disk_free_gb": round(disk.free / 1024 / 1024 / 1024, 1) + }, + "ai_agents": { + "count": len(python_processes), + "total_memory_mb": sum(p['memory_mb'] for p in python_processes), + "processes": python_processes + } + } + + except Exception as e: + return { + "error": f"Error getting system stats: {str(e)}" + } \ No newline at end of file diff --git a/app/services/ai_interviewer_service.py b/app/services/ai_interviewer_service.py index 76f8e80..e461120 100644 --- a/app/services/ai_interviewer_service.py +++ b/app/services/ai_interviewer_service.py @@ -104,16 +104,7 @@ class AIInterviewerService: pass except Exception as e: logger.error(f"Error processing user audio: {str(e)}") - - async def speech_to_text(self, audio_data: bytes) -> Optional[str]: - """Конвертация речи в текст через Whisper API""" - # TODO: Интеграция с OpenAI Whisper или другим STT сервисом - pass - - async def text_to_speech(self, text: str) -> bytes: - """Конвертация текста в речь через TTS сервис""" - # TODO: Интеграция с ElevenLabs, OpenAI TTS или другим сервисом - pass + async def generate_interview_questions(self): """Генерация вопросов для интервью на основе резюме""" diff --git a/app/services/interview_finalization_service.py b/app/services/interview_finalization_service.py new file mode 100644 index 0000000..a888a9a --- /dev/null +++ b/app/services/interview_finalization_service.py @@ -0,0 +1,197 @@ +from typing import Optional, Annotated +from datetime import datetime +import logging +from fastapi import Depends + +from app.repositories.interview_repository import InterviewRepository +from app.repositories.resume_repository import ResumeRepository +from app.models.resume import ResumeStatus + +logger = logging.getLogger("interview-finalization") + + +class InterviewFinalizationService: + """Сервис для завершения интервью и запуска анализа""" + + def __init__( + self, + interview_repo: Annotated[InterviewRepository, Depends(InterviewRepository)], + resume_repo: Annotated[ResumeRepository, Depends(ResumeRepository)] + ): + self.interview_repo = interview_repo + self.resume_repo = resume_repo + + async def finalize_interview( + self, + room_name: str, + dialogue_history: list, + interview_metrics: dict = None + ) -> Optional[dict]: + """ + Завершает интервью и запускает анализ + + Args: + room_name: Имя комнаты LiveKit + dialogue_history: История диалога + interview_metrics: Метрики интервью (количество вопросов, время и т.д.) + + Returns: + dict с информацией о завершенном интервью или None если ошибка + """ + try: + logger.info(f"[FINALIZE] Starting finalization for room: {room_name}") + + # 1. Находим сессию интервью + interview_session = await self.interview_repo.get_by_room_name(room_name) + if not interview_session: + logger.error(f"[FINALIZE] Interview session not found for room: {room_name}") + return None + + # 2. Обновляем статус сессии интервью на "completed" + success = await self.interview_repo.update_status( + interview_session.id, + "completed", + datetime.utcnow() + ) + + if not success: + logger.error(f"[FINALIZE] Failed to update session status for {interview_session.id}") + return None + + resume_id = interview_session.resume_id + logger.info(f"[FINALIZE] Interview session {interview_session.id} marked as completed") + + # 3. Обновляем статус резюме на "INTERVIEWED" + resume = await self.resume_repo.get(resume_id) + if resume: + await self.resume_repo.update(resume_id, { + "status": ResumeStatus.INTERVIEWED, + "updated_at": datetime.utcnow() + }) + logger.info(f"[FINALIZE] Resume {resume_id} status updated to INTERVIEWED") + else: + logger.warning(f"[FINALIZE] Resume {resume_id} not found") + + # 4. Сохраняем финальную историю диалога + await self.interview_repo.update_dialogue_history(room_name, dialogue_history) + logger.info(f"[FINALIZE] Saved final dialogue ({len(dialogue_history)} messages)") + + # 5. Обновляем статус AI агента + await self.interview_repo.update_ai_agent_status(interview_session.id, None, "stopped") + + # 6. Запускаем анализ интервью через Celery + analysis_task = await self._start_interview_analysis(resume_id) + + # 7. Собираем итоговые метрики + finalization_result = { + "session_id": interview_session.id, + "resume_id": resume_id, + "room_name": room_name, + "total_messages": len(dialogue_history), + "analysis_task_id": analysis_task.get('task_id') if analysis_task else None, + "completed_at": datetime.utcnow().isoformat(), + "metrics": interview_metrics or {} + } + + logger.info(f"[FINALIZE] Interview successfully finalized: {finalization_result}") + return finalization_result + + except Exception as e: + logger.error(f"[FINALIZE] Error finalizing interview for room {room_name}: {str(e)}") + return None + + async def _start_interview_analysis(self, resume_id: int): + """Запускает анализ интервью через Celery""" + # try: + logger.info(f"[FINALIZE] Attempting to start analysis task for resume_id: {resume_id}") + + # Импортируем задачу + # from celery_worker.interview_analysis_task import generate_interview_report + # logger.debug(f"[FINALIZE] Successfully imported generate_interview_report task") + # + # # Запускаем задачу + # task = generate_interview_report.delay(resume_id) + # logger.info(f"[FINALIZE] Analysis task started: {task.id} for resume_id: {resume_id}") + # return task + # + # except ImportError as e: + # logger.error(f"[FINALIZE] Import error for analysis task: {str(e)}") + # return None + # except ConnectionError as e: + # logger.error(f"[FINALIZE] Connection error starting analysis task for resume {resume_id}: {str(e)}") + # logger.warning(f"[FINALIZE] This may indicate Redis/Celery broker is not accessible from AI agent process") + # + # # Fallback: попытка запуска анализа через HTTP API + # return await self._start_analysis_via_http(resume_id) + # except Exception as e: + # logger.error(f"[FINALIZE] Failed to start analysis task for resume {resume_id}: {str(e)}") + # logger.debug(f"[FINALIZE] Exception type: {type(e).__name__}") + + # Fallback: попытка запуска анализа через HTTP API для любых других ошибок + return await self._start_analysis_via_http(resume_id) + + async def _start_analysis_via_http(self, resume_id: int): + """Fallback: запуск анализа через HTTP API (когда Celery недоступен из AI агента)""" + try: + import httpx + + url = f"http://localhost:8000/api/v1/analysis/interview-report/{resume_id}" + logger.info(f"[FINALIZE] Attempting HTTP fallback to URL: {url}") + + # Попробуем отправить HTTP запрос на локальный API для запуска анализа + async with httpx.AsyncClient() as client: + response = await client.post(url, timeout=5.0) + + if response.status_code == 200: + result = response.json() + logger.info(f"[FINALIZE] Analysis started via HTTP API for resume_id: {resume_id}, task_id: {result.get('task_id', 'unknown')}") + return result + else: + logger.error(f"[FINALIZE] HTTP API returned {response.status_code} for resume_id: {resume_id}") + logger.debug(f"[FINALIZE] Response body: {response.text[:200]}") + return None + + except Exception as e: + logger.error(f"[FINALIZE] HTTP fallback failed for resume {resume_id}: {str(e)}") + return None + + async def save_dialogue_to_session(self, room_name: str, dialogue_history: list) -> bool: + """Сохраняет диалог в сессию (для промежуточных сохранений)""" + try: + success = await self.interview_repo.update_dialogue_history(room_name, dialogue_history) + if success: + logger.info(f"[DIALOGUE] Saved {len(dialogue_history)} messages for room: {room_name}") + return success + except Exception as e: + logger.error(f"[DIALOGUE] Error saving dialogue for room {room_name}: {str(e)}") + return False + + async def cleanup_dead_processes(self) -> int: + """Очищает информацию о мертвых AI процессах""" + try: + import psutil + + active_sessions = await self.interview_repo.get_sessions_with_running_agents() + cleaned_count = 0 + + for session in active_sessions: + if session.ai_agent_pid: + try: + process = psutil.Process(session.ai_agent_pid) + if not process.is_running(): + await self.interview_repo.update_ai_agent_status( + session.id, None, "stopped" + ) + cleaned_count += 1 + except psutil.NoSuchProcess: + await self.interview_repo.update_ai_agent_status( + session.id, None, "stopped" + ) + cleaned_count += 1 + + logger.info(f"[CLEANUP] Cleaned up {cleaned_count} dead processes") + return cleaned_count + + except Exception as e: + logger.error(f"[CLEANUP] Error cleaning up processes: {str(e)}") + return 0 \ No newline at end of file diff --git a/app/services/interview_service.py b/app/services/interview_service.py index 1d525d5..7c92302 100644 --- a/app/services/interview_service.py +++ b/app/services/interview_service.py @@ -3,27 +3,30 @@ import time import uuid import json import subprocess -from typing import Optional -from datetime import datetime, timedelta +from typing import Optional, Annotated +from datetime import datetime from livekit.api import AccessToken, VideoGrants -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select +from fastapi import Depends +from app.repositories.interview_repository import InterviewRepository +from app.repositories.resume_repository import ResumeRepository from app.models.interview import ( InterviewSession, - InterviewSessionCreate, - InterviewSessionUpdate, - InterviewStatus, + InterviewSessionCreate, InterviewValidationResponse, LiveKitTokenResponse ) from app.models.resume import Resume, ResumeStatus -from app.models.vacancy import Vacancy from rag.settings import settings class InterviewRoomService: - def __init__(self, db_session: AsyncSession): - self.db = db_session + def __init__( + self, + interview_repo: Annotated[InterviewRepository, Depends(InterviewRepository)], + resume_repo: Annotated[ResumeRepository, Depends(ResumeRepository)] + ): + self.interview_repo = interview_repo + self.resume_repo = resume_repo self.livekit_url = settings.livekit_url or "ws://localhost:7880" self.api_key = settings.livekit_api_key or "devkey" self.api_secret = settings.livekit_api_secret or "secret" @@ -32,8 +35,7 @@ class InterviewRoomService: """Проверяет, можно ли проводить собеседование для данного резюме""" try: # Получаем резюме - result = await self.db.execute(select(Resume).where(Resume.id == resume_id)) - resume = result.scalar_one_or_none() + resume = await self.resume_repo.get(resume_id) if not resume: return InterviewValidationResponse( @@ -49,12 +51,7 @@ class InterviewRoomService: ) # Проверяем активную сессию только для информации (не блокируем) - result = await self.db.execute( - select(InterviewSession) - .where(InterviewSession.resume_id == resume_id) - .where(InterviewSession.status == "active") - ) - active_session = result.scalar_one_or_none() + active_session = await self.interview_repo.get_active_session_by_resume_id(resume_id) message = "Resume is ready for interview" if active_session: @@ -79,21 +76,12 @@ class InterviewRoomService: timestamp = int(time.time()) room_name = f"interview_{resume_id}_{timestamp}_{unique_id}" - # Создаем сессию в БД - session_data = InterviewSessionCreate( - resume_id=resume_id, - room_name=room_name - ) - - interview_session = InterviewSession(**session_data.model_dump()) - self.db.add(interview_session) - await self.db.commit() - await self.db.refresh(interview_session) + # Создаем сессию в БД через репозиторий + interview_session = await self.interview_repo.create_interview_session(resume_id, room_name) return interview_session except Exception as e: - await self.db.rollback() print(f"Error creating interview session: {str(e)}") return None @@ -125,7 +113,7 @@ class InterviewRoomService: return None # Проверяем, есть ли уже созданная сессия для этого резюме - existing_session = await self.get_interview_session(resume_id) + existing_session = await self.interview_repo.get_active_session_by_resume_id(resume_id) if existing_session: # Используем существующую сессию interview_session = existing_session @@ -148,7 +136,7 @@ class InterviewRoomService: interview_plan = await self.get_resume_data_for_interview(resume_id) # Обновляем статус сессии на ACTIVE - await self.update_session_status(interview_session.id, "active") + await self.interview_repo.update_session_status(interview_session.id, "active") # Запускаем AI агента для этой сессии await self.start_ai_interviewer(interview_session, interview_plan) @@ -165,41 +153,11 @@ class InterviewRoomService: async def update_session_status(self, session_id: int, status: str) -> bool: """Обновляет статус сессии собеседования""" - try: - result = await self.db.execute( - select(InterviewSession).where(InterviewSession.id == session_id) - ) - session = result.scalar_one_or_none() - - if not session: - return False - - session.status = status - if status == "completed": - session.completed_at = datetime.utcnow() - - await self.db.commit() - return True - - except Exception as e: - await self.db.rollback() - print(f"Error updating session status: {str(e)}") - return False + return await self.interview_repo.update_session_status(session_id, status) async def get_interview_session(self, resume_id: int) -> Optional[InterviewSession]: """Получает активную сессию собеседования для резюме""" - try: - result = await self.db.execute( - select(InterviewSession) - .where(InterviewSession.resume_id == resume_id) - .where(InterviewSession.status.in_(["created", "active"])) - .order_by(InterviewSession.started_at.desc()) - ) - return result.scalar_one_or_none() - - except Exception as e: - print(f"Error getting interview session: {str(e)}") - return None + return await self.interview_repo.get_active_session_by_resume_id(resume_id) async def start_ai_interviewer(self, interview_session: InterviewSession, interview_plan: dict): """Запускает AI интервьюера для сессии""" @@ -210,11 +168,14 @@ class InterviewRoomService: f"ai_interviewer_{interview_session.id}" ) - # Подготавливаем метаданные с планом интервью - room_metadata = json.dumps({ - "interview_plan": interview_plan, - "session_id": interview_session.id - }) + # Сохраняем метаданные во временный файл для избежания проблем с кодировкой + import tempfile + metadata_file = f"interview_metadata_{interview_session.id}.json" + with open(metadata_file, 'w', encoding='utf-8') as f: + json.dump({ + "interview_plan": interview_plan, + "session_id": interview_session.id + }, f, ensure_ascii=False, indent=2) # Запускаем AI агента в отдельном процессе agent_cmd = [ @@ -231,10 +192,11 @@ class InterviewRoomService: # Устанавливаем переменные окружения env = os.environ.copy() env.update({ - "LIVEKIT_ROOM_METADATA": room_metadata, + "INTERVIEW_METADATA_FILE": metadata_file, "OPENAI_API_KEY": settings.openai_api_key or "", "DEEPGRAM_API_KEY": settings.deepgram_api_key or "", "CARTESIA_API_KEY": settings.cartesia_api_key or "", + "PYTHONIOENCODING": "utf-8", }) # Запускаем процесс в фоне @@ -251,7 +213,7 @@ class InterviewRoomService: print(f"[DEBUG] Started AI interviewer process {process.pid} for session {interview_session.id}") # Сохраняем PID процесса в БД для управления - await self.update_agent_process_info( + await self.interview_repo.update_ai_agent_status( interview_session.id, process.pid, "running" @@ -260,7 +222,7 @@ class InterviewRoomService: except Exception as e: print(f"Error starting AI interviewer: {str(e)}") # Обновляем статус на failed - await self.update_agent_process_info( + await self.interview_repo.update_ai_agent_status( interview_session.id, None, "failed" @@ -270,10 +232,7 @@ class InterviewRoomService: """Получает готовый план интервью из базы данных""" try: # Получаем резюме с готовым планом интервью - result = await self.db.execute( - select(Resume).where(Resume.id == resume_id) - ) - resume = result.scalar_one_or_none() + resume = await self.resume_repo.get(resume_id) if not resume: return self._get_fallback_interview_plan() @@ -361,46 +320,16 @@ class InterviewRoomService: async def update_agent_process_info(self, session_id: int, pid: int = None, status: str = "not_started") -> bool: """Обновляет информацию о процессе AI агента""" - try: - result = await self.db.execute( - select(InterviewSession).where(InterviewSession.id == session_id) - ) - session = result.scalar_one_or_none() - - if not session: - return False - - session.ai_agent_pid = pid - session.ai_agent_status = status - - await self.db.commit() - return True - - except Exception as e: - await self.db.rollback() - print(f"Error updating agent process info: {str(e)}") - return False + return await self.interview_repo.update_ai_agent_status(session_id, pid, status) async def get_active_agent_processes(self) -> list: """Получает список активных AI процессов""" - try: - result = await self.db.execute( - select(InterviewSession) - .where(InterviewSession.ai_agent_status == "running") - ) - return result.scalars().all() - - except Exception as e: - print(f"Error getting active processes: {str(e)}") - return [] + return await self.interview_repo.get_sessions_with_running_agents() async def stop_agent_process(self, session_id: int) -> bool: """Останавливает AI процесс для сессии""" try: - result = await self.db.execute( - select(InterviewSession).where(InterviewSession.id == session_id) - ) - session = result.scalar_one_or_none() + session = await self.interview_repo.get(session_id) if not session or not session.ai_agent_pid: return False @@ -423,14 +352,14 @@ class InterviewRoomService: process.kill() # Обновляем статус в БД - await self.update_agent_process_info(session_id, None, "stopped") + await self.interview_repo.update_ai_agent_status(session_id, None, "stopped") print(f"Stopped AI agent process {session.ai_agent_pid} for session {session_id}") return True except (psutil.NoSuchProcess, psutil.AccessDenied): # Процесс уже не существует - await self.update_agent_process_info(session_id, None, "stopped") + await self.interview_repo.update_ai_agent_status(session_id, None, "stopped") return True except Exception as e: @@ -450,10 +379,10 @@ class InterviewRoomService: try: process = psutil.Process(session.ai_agent_pid) if not process.is_running(): - await self.update_agent_process_info(session.id, None, "stopped") + await self.interview_repo.update_ai_agent_status(session.id, None, "stopped") cleaned_count += 1 except psutil.NoSuchProcess: - await self.update_agent_process_info(session.id, None, "stopped") + await self.interview_repo.update_ai_agent_status(session.id, None, "stopped") cleaned_count += 1 print(f"Cleaned up {cleaned_count} dead processes") diff --git a/app/services/resume_service.py b/app/services/resume_service.py index 0b3a3a3..2b443bc 100644 --- a/app/services/resume_service.py +++ b/app/services/resume_service.py @@ -1,12 +1,12 @@ -from typing import List, Optional -from sqlalchemy.ext.asyncio import AsyncSession +from typing import List, Optional, Annotated +from fastapi import Depends from app.models.resume import Resume, ResumeCreate, ResumeUpdate, ResumeStatus from app.repositories.resume_repository import ResumeRepository class ResumeService: - def __init__(self, session: AsyncSession): - self.repository = ResumeRepository(session) + def __init__(self, repository: Annotated[ResumeRepository, Depends(ResumeRepository)]): + self.repository = repository async def create_resume(self, resume_data: ResumeCreate) -> Resume: resume = Resume.model_validate(resume_data) diff --git a/app/services/vacancy_service.py b/app/services/vacancy_service.py index 94f0404..d332f35 100644 --- a/app/services/vacancy_service.py +++ b/app/services/vacancy_service.py @@ -1,12 +1,12 @@ -from typing import List, Optional -from sqlalchemy.ext.asyncio import AsyncSession +from typing import List, Optional, Annotated +from fastapi import Depends from app.models.vacancy import Vacancy, VacancyCreate, VacancyUpdate from app.repositories.vacancy_repository import VacancyRepository class VacancyService: - def __init__(self, session: AsyncSession): - self.repository = VacancyRepository(session) + def __init__(self, repository: Annotated[VacancyRepository, Depends(VacancyRepository)]): + self.repository = repository async def create_vacancy(self, vacancy_data: VacancyCreate) -> Vacancy: vacancy = Vacancy.model_validate(vacancy_data) diff --git a/celery_worker/database.py b/celery_worker/database.py index e36bbfd..24dec71 100644 --- a/celery_worker/database.py +++ b/celery_worker/database.py @@ -8,7 +8,8 @@ from rag.settings import settings sync_engine = create_engine( settings.database_url.replace("asyncpg", "psycopg2"), # Убираем asyncpg для синхронного подключения echo=False, - future=True + future=True, + connect_args={"client_encoding": "utf8"} # Принудительно UTF-8 ) # Создаем синхронный session maker @@ -59,7 +60,6 @@ class SyncResumeRepository: resume.status = ResumeStatus.PARSED if parsed_data: resume.parsed_data = parsed_data - # НЕ перезаписываем контактные данные из формы - они уже правильные elif status == 'failed': resume.status = ResumeStatus.PARSE_FAILED if error_message: @@ -84,4 +84,30 @@ class SyncResumeRepository: self.session.add(resume) return resume - return None \ No newline at end of file + return None + + def _normalize_utf8_dict(self, data): + """Нормализует UTF-8 в словаре рекурсивно""" + import json + + # Сериализуем в JSON с ensure_ascii=False, потом парсим обратно + # Это принудительно конвертирует все unicode escape sequences в нормальные символы + try: + json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':')) + return json.loads(json_str) + except (TypeError, ValueError): + # Fallback - рекурсивная обработка + if isinstance(data, dict): + return {key: self._normalize_utf8_dict(value) for key, value in data.items()} + elif isinstance(data, list): + return [self._normalize_utf8_dict(item) for item in data] + elif isinstance(data, str): + try: + # Пытаемся декодировать unicode escape sequences + if '\\u' in data: + return data.encode().decode('unicode_escape') + return data + except (UnicodeDecodeError, UnicodeEncodeError): + return data + else: + return data \ No newline at end of file diff --git a/celery_worker/interview_analysis_task.py b/celery_worker/interview_analysis_task.py new file mode 100644 index 0000000..3cf034c --- /dev/null +++ b/celery_worker/interview_analysis_task.py @@ -0,0 +1,633 @@ +# -*- coding: utf-8 -*- +import json +import logging +from datetime import datetime +from typing import Dict, Any, List, Optional + +from celery import shared_task +from rag.settings import settings +from celery_worker.database import get_sync_session, SyncResumeRepository + +logger = logging.getLogger(__name__) + + +@shared_task +def generate_interview_report(resume_id: int): + """ + Комплексная оценка кандидата на основе резюме, вакансии и диалога интервью + + Args: + resume_id: ID резюме для анализа + + Returns: + dict: Полный отчет с оценками и рекомендациями + """ + logger.info(f"[INTERVIEW_ANALYSIS] Starting analysis for resume_id: {resume_id}") + + try: + with get_sync_session() as db: + repo = SyncResumeRepository(db) + + # Получаем данные резюме + resume = repo.get_by_id(resume_id) + if not resume: + logger.error(f"[INTERVIEW_ANALYSIS] Resume {resume_id} not found") + return {"error": "Resume not found"} + + # Получаем данные вакансии (если нет - используем пустые данные) + vacancy = _get_vacancy_data(db, resume.vacancy_id) + if not vacancy: + logger.warning(f"[INTERVIEW_ANALYSIS] Vacancy {resume.vacancy_id} not found, using empty vacancy data") + vacancy = { + 'id': resume.vacancy_id, + 'title': 'Неизвестная позиция', + 'description': 'Описание недоступно', + 'requirements': [], + 'skills_required': [], + 'experience_level': 'middle' + } + + # Получаем историю интервью + interview_session = _get_interview_session(db, resume_id) + + # Парсим JSON данные + parsed_resume = _parse_json_field(resume.parsed_data) + interview_plan = _parse_json_field(resume.interview_plan) + dialogue_history = _parse_json_field(interview_session.dialogue_history) if interview_session else [] + + # Генерируем отчет + report = _generate_comprehensive_report( + resume_id=resume_id, + candidate_name=resume.applicant_name, + vacancy=vacancy, + parsed_resume=parsed_resume, + interview_plan=interview_plan, + dialogue_history=dialogue_history + ) + + # Сохраняем отчет в БД + _save_report_to_db(db, resume_id, report) + + logger.info(f"[INTERVIEW_ANALYSIS] Analysis completed for resume_id: {resume_id}, score: {report['overall_score']}") + return report + + except Exception as e: + logger.error(f"[INTERVIEW_ANALYSIS] Error analyzing resume {resume_id}: {str(e)}") + return {"error": str(e)} + + +def _get_vacancy_data(db, vacancy_id: int) -> Optional[Dict]: + """Получить данные вакансии""" + try: + from app.models.vacancy import Vacancy + vacancy = db.query(Vacancy).filter(Vacancy.id == vacancy_id).first() + if vacancy: + # Парсим key_skills в список, если это строка + key_skills = [] + if vacancy.key_skills: + if isinstance(vacancy.key_skills, str): + # Разделяем по запятым и очищаем от пробелов + key_skills = [skill.strip() for skill in vacancy.key_skills.split(',') if skill.strip()] + elif isinstance(vacancy.key_skills, list): + key_skills = vacancy.key_skills + + # Маппинг Experience enum в строку уровня опыта + experience_mapping = { + 'noExperience': 'junior', + 'between1And3': 'junior', + 'between3And6': 'middle', + 'moreThan6': 'senior' + } + experience_level = experience_mapping.get(vacancy.experience, 'middle') + + return { + 'id': vacancy.id, + 'title': vacancy.title, + 'description': vacancy.description, + 'requirements': [vacancy.description] if vacancy.description else [], # Используем описание как требования + 'skills_required': key_skills, + 'experience_level': experience_level, + 'employment_type': vacancy.employment_type, + 'salary_range': f"{vacancy.salary_from or 0}-{vacancy.salary_to or 0}" if vacancy.salary_from or vacancy.salary_to else None + } + return None + except Exception as e: + logger.error(f"Error getting vacancy data: {e}") + return None + + +def _get_interview_session(db, resume_id: int): + """Получить сессию интервью""" + try: + from app.models.interview import InterviewSession + return db.query(InterviewSession).filter(InterviewSession.resume_id == resume_id).first() + except Exception as e: + logger.error(f"Error getting interview session: {e}") + return None + + +def _parse_json_field(field_data) -> Dict: + """Безопасный парсинг JSON поля""" + if field_data is None: + return {} + if isinstance(field_data, dict): + return field_data + if isinstance(field_data, str): + try: + return json.loads(field_data) + except (json.JSONDecodeError, TypeError): + return {} + return {} + + +def _generate_comprehensive_report( + resume_id: int, + candidate_name: str, + vacancy: Dict, + parsed_resume: Dict, + interview_plan: Dict, + dialogue_history: List[Dict] +) -> Dict[str, Any]: + """ + Генерирует комплексный отчет о кандидате с использованием LLM + """ + + # Подготавливаем контекст для анализа + context = _prepare_analysis_context( + vacancy=vacancy, + parsed_resume=parsed_resume, + interview_plan=interview_plan, + dialogue_history=dialogue_history + ) + + # Генерируем оценку через OpenAI + evaluation = _call_openai_for_evaluation(context) + + # Формируем финальный отчет + report = { + "resume_id": resume_id, + "candidate_name": candidate_name, + "position": vacancy.get('title', 'Unknown Position'), + "interview_date": datetime.utcnow().isoformat(), + "analysis_context": { + "has_parsed_resume": bool(parsed_resume), + "has_interview_plan": bool(interview_plan), + "dialogue_messages_count": len(dialogue_history), + "vacancy_requirements_count": len(vacancy.get('requirements', [])) + } + } + + # Добавляем результаты оценки + if evaluation: + # Убеждаемся, что есть overall_score + if 'overall_score' not in evaluation: + evaluation['overall_score'] = _calculate_overall_score(evaluation) + + report.update(evaluation) + else: + # Fallback оценка, если LLM не сработал + report.update(_generate_fallback_evaluation( + parsed_resume, vacancy, dialogue_history + )) + + return report + + +def _calculate_overall_score(evaluation: Dict) -> int: + """Вычисляет общий балл как среднее арифметическое всех критериев""" + try: + scores = evaluation.get('scores', {}) + if not scores: + return 50 # Default score + + total_score = 0 + count = 0 + + for criterion_name, criterion_data in scores.items(): + if isinstance(criterion_data, dict) and 'score' in criterion_data: + total_score += criterion_data['score'] + count += 1 + + if count == 0: + return 50 # Default if no valid scores + + overall = int(total_score / count) + return max(0, min(100, overall)) # Ensure 0-100 range + + except Exception: + return 50 # Safe fallback + + +def _prepare_analysis_context( + vacancy: Dict, + parsed_resume: Dict, + interview_plan: Dict, + dialogue_history: List[Dict] +) -> str: + """Подготавливает контекст для анализа LLM""" + + # Собираем диалог интервью + dialogue_text = "" + if dialogue_history: + dialogue_messages = [] + for msg in dialogue_history[-20:]: # Последние 20 сообщений + role = msg.get('role', 'unknown') + content = msg.get('content', '') + dialogue_messages.append(f"{role.upper()}: {content}") + dialogue_text = "\n".join(dialogue_messages) + + # Формируем контекст + context = f""" +АНАЛИЗ КАНДИДАТА НА СОБЕСЕДОВАНИЕ + +ВАКАНСИЯ: +- Позиция: {vacancy.get('title', 'Не указана')} +- Описание: {vacancy.get('description', 'Не указано')[:500]} +- Требования: {', '.join(vacancy.get('requirements', []))} +- Требуемые навыки: {', '.join(vacancy.get('skills_required', []))} +- Уровень опыта: {vacancy.get('experience_level', 'middle')} + +РЕЗЮМЕ КАНДИДАТА: +- Имя: {parsed_resume.get('name', 'Не указано')} +- Опыт работы: {parsed_resume.get('total_years', 'Не указано')} лет +- Навыки: {', '.join(parsed_resume.get('skills', []))} +- Образование: {parsed_resume.get('education', 'Не указано')} +- Предыдущие позиции: {'; '.join([pos.get('title', '') + ' в ' + pos.get('company', '') for pos in parsed_resume.get('work_experience', [])])} + +ПЛАН ИНТЕРВЬЮ: +{json.dumps(interview_plan, ensure_ascii=False, indent=2) if interview_plan else 'План интервью не найден'} + +ДИАЛОГ ИНТЕРВЬЮ: +{dialogue_text if dialogue_text else 'Диалог интервью не найден или пуст'} +""" + + return context + + +def _call_openai_for_evaluation(context: str) -> Optional[Dict]: + """Вызывает OpenAI для генерации оценки""" + + if not settings.openai_api_key: + logger.warning("OpenAI API key not configured, skipping LLM evaluation") + return None + + try: + import openai + openai.api_key = settings.openai_api_key + + evaluation_prompt = f""" +{context} + +ЗАДАЧА: +Проанализируй кандидата и дай оценку по критериям (0-100): +1. technical_skills: Соответствие техническим требованиям +2. experience_relevance: Релевантность опыта +3. communication: Коммуникативные навыки (на основе диалога) +4. problem_solving: Навыки решения задач +5. cultural_fit: Соответствие корпоративной культуре + +Для каждого критерия: +- score: оценка 0-100 +- justification: обоснование с примерами из резюме/интервью +- concerns: возможные риски + +Дай итоговую рекомендацию: +- strongly_recommend (90-100) +- recommend (70-89) +- consider (50-69) +- reject (0-49) + +Вычисли ОБЩИЙ БАЛЛ (overall_score) от 0 до 100 как среднее арифметическое всех 5 критериев. + +И топ 3 сильные/слабые стороны. + +ОТВЕТЬ СТРОГО В JSON ФОРМАТЕ с обязательными полями: +- scores: объект с 5 критериями, каждый содержит score, justification, concerns +- overall_score: число от 0 до 100 (среднее арифметическое всех scores) +- recommendation: одно из 4 значений выше +- strengths: массив из 3 сильных сторон +- weaknesses: массив из 3 слабых сторон +""" + + response = openai.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": evaluation_prompt}], + response_format={"type": "json_object"}, + temperature=0.3 + ) + + evaluation = json.loads(response.choices[0].message.content) + logger.info(f"[INTERVIEW_ANALYSIS] OpenAI evaluation completed") + return evaluation + + except Exception as e: + logger.error(f"[INTERVIEW_ANALYSIS] Error calling OpenAI: {str(e)}") + return None + + +def _generate_fallback_evaluation( + parsed_resume: Dict, + vacancy: Dict, + dialogue_history: List[Dict] +) -> Dict[str, Any]: + """Генерирует базовую оценку без LLM""" + + # Простая эвристическая оценка + technical_score = _calculate_technical_match(parsed_resume, vacancy) + experience_score = _calculate_experience_score(parsed_resume, vacancy) + communication_score = 70 # Средняя оценка, если нет диалога + + if dialogue_history: + communication_score = min(90, 50 + len(dialogue_history) * 2) # Больше диалога = лучше коммуникация + + overall_score = (technical_score + experience_score + communication_score) // 3 + + # Определяем рекомендацию + if overall_score >= 90: + recommendation = "strongly_recommend" + elif overall_score >= 70: + recommendation = "recommend" + elif overall_score >= 50: + recommendation = "consider" + else: + recommendation = "reject" + + return { + "scores": { + "technical_skills": { + "score": technical_score, + "justification": f"Соответствие по навыкам: {technical_score}%", + "concerns": "Автоматическая оценка без анализа LLM" + }, + "experience_relevance": { + "score": experience_score, + "justification": f"Опыт работы: {parsed_resume.get('total_years', 0)} лет", + "concerns": "Требуется ручная проверка релевантности опыта" + }, + "communication": { + "score": communication_score, + "justification": f"Активность в диалоге: {len(dialogue_history)} сообщений", + "concerns": "Оценка основана на количестве сообщений" + }, + "problem_solving": { + "score": 60, + "justification": "Средняя оценка (нет данных для анализа)", + "concerns": "Требуется техническое интервью" + }, + "cultural_fit": { + "score": 65, + "justification": "Средняя оценка (нет данных для анализа)", + "concerns": "Требуется личная встреча с командой" + } + }, + "overall_score": overall_score, + "recommendation": recommendation, + "strengths": [ + f"Опыт работы: {parsed_resume.get('total_years', 0)} лет", + f"Технические навыки: {len(parsed_resume.get('skills', []))} навыков", + f"Участие в интервью: {len(dialogue_history)} сообщений" + ], + "weaknesses": [ + "Автоматическая оценка без LLM анализа", + "Требуется дополнительное техническое интервью", + "Нет глубокого анализа ответов на вопросы" + ], + "red_flags": [], + "next_steps": "Рекомендуется провести техническое интервью с тимлидом для более точной оценки.", + "analysis_method": "fallback_heuristic" + } + + +def _calculate_technical_match(parsed_resume: Dict, vacancy: Dict) -> int: + """Вычисляет соответствие технических навыков""" + + resume_skills = set([skill.lower() for skill in parsed_resume.get('skills', [])]) + required_skills = set([skill.lower() for skill in vacancy.get('skills_required', [])]) + + if not required_skills: + return 70 # Если требования не указаны + + matching_skills = resume_skills.intersection(required_skills) + match_percentage = (len(matching_skills) / len(required_skills)) * 100 + + return min(100, int(match_percentage)) + + +def _calculate_experience_score(parsed_resume: Dict, vacancy: Dict) -> int: + """Вычисляет оценку релевантности опыта""" + + years_experience = parsed_resume.get('total_years', 0) + required_level = vacancy.get('experience_level', 'middle') + + # Маппинг уровней на годы опыта + level_mapping = { + 'junior': (0, 2), + 'middle': (2, 5), + 'senior': (5, 10), + 'lead': (8, 15) + } + + min_years, max_years = level_mapping.get(required_level, (2, 5)) + + if years_experience < min_years: + # Недостаток опыта + return max(30, int(70 * (years_experience / min_years))) + elif years_experience > max_years: + # Переквалификация + return max(60, int(90 - (years_experience - max_years) * 5)) + else: + # Подходящий опыт + return 90 + + +def _save_report_to_db(db, resume_id: int, report: Dict): + """Сохраняет отчет в базу данных в таблицу interview_reports""" + + try: + from app.models.interview import InterviewSession + from app.models.interview_report import InterviewReport, RecommendationType + + # Находим сессию интервью по resume_id + interview_session = db.query(InterviewSession).filter( + InterviewSession.resume_id == resume_id + ).first() + + if not interview_session: + logger.warning(f"[INTERVIEW_ANALYSIS] No interview session found for resume_id: {resume_id}") + return + + # Проверяем, есть ли уже отчет для этой сессии + existing_report = db.query(InterviewReport).filter( + InterviewReport.interview_session_id == interview_session.id + ).first() + + if existing_report: + logger.info(f"[INTERVIEW_ANALYSIS] Updating existing report for session: {interview_session.id}") + # Обновляем существующий отчет + _update_report_from_dict(existing_report, report) + existing_report.updated_at = datetime.utcnow() + db.add(existing_report) + else: + logger.info(f"[INTERVIEW_ANALYSIS] Creating new report for session: {interview_session.id}") + # Создаем новый отчет + new_report = _create_report_from_dict(interview_session.id, report) + db.add(new_report) + + logger.info(f"[INTERVIEW_ANALYSIS] Report saved for resume_id: {resume_id}, session: {interview_session.id}") + + except Exception as e: + logger.error(f"[INTERVIEW_ANALYSIS] Error saving report: {str(e)}") + + +def _create_report_from_dict(interview_session_id: int, report: Dict) -> 'InterviewReport': + """Создает объект InterviewReport из словаря отчета""" + from app.models.interview_report import InterviewReport, RecommendationType + + # Извлекаем баллы по критериям + scores = report.get('scores', {}) + + return InterviewReport( + interview_session_id=interview_session_id, + + # Основные критерии оценки + technical_skills_score=scores.get('technical_skills', {}).get('score', 0), + technical_skills_justification=scores.get('technical_skills', {}).get('justification', ''), + technical_skills_concerns=scores.get('technical_skills', {}).get('concerns', ''), + + experience_relevance_score=scores.get('experience_relevance', {}).get('score', 0), + experience_relevance_justification=scores.get('experience_relevance', {}).get('justification', ''), + experience_relevance_concerns=scores.get('experience_relevance', {}).get('concerns', ''), + + communication_score=scores.get('communication', {}).get('score', 0), + communication_justification=scores.get('communication', {}).get('justification', ''), + communication_concerns=scores.get('communication', {}).get('concerns', ''), + + problem_solving_score=scores.get('problem_solving', {}).get('score', 0), + problem_solving_justification=scores.get('problem_solving', {}).get('justification', ''), + problem_solving_concerns=scores.get('problem_solving', {}).get('concerns', ''), + + cultural_fit_score=scores.get('cultural_fit', {}).get('score', 0), + cultural_fit_justification=scores.get('cultural_fit', {}).get('justification', ''), + cultural_fit_concerns=scores.get('cultural_fit', {}).get('concerns', ''), + + # Агрегированные поля + overall_score=report.get('overall_score', 0), + recommendation=RecommendationType(report.get('recommendation', 'reject')), + + # Дополнительные поля + strengths=report.get('strengths', []), + weaknesses=report.get('weaknesses', []), + red_flags=report.get('red_flags', []), + + # Метрики интервью + dialogue_messages_count=report.get('analysis_context', {}).get('dialogue_messages_count', 0), + + # Дополнительная информация + next_steps=report.get('next_steps', ''), + questions_analysis=report.get('questions_analysis', []), + + # Метаданные анализа + analysis_method=report.get('analysis_method', 'openai_gpt4'), + ) + + +def _update_report_from_dict(existing_report, report: Dict): + """Обновляет существующий отчет данными из словаря""" + from app.models.interview_report import RecommendationType + + scores = report.get('scores', {}) + + # Основные критерии оценки + if 'technical_skills' in scores: + existing_report.technical_skills_score = scores['technical_skills'].get('score', 0) + existing_report.technical_skills_justification = scores['technical_skills'].get('justification', '') + existing_report.technical_skills_concerns = scores['technical_skills'].get('concerns', '') + + if 'experience_relevance' in scores: + existing_report.experience_relevance_score = scores['experience_relevance'].get('score', 0) + existing_report.experience_relevance_justification = scores['experience_relevance'].get('justification', '') + existing_report.experience_relevance_concerns = scores['experience_relevance'].get('concerns', '') + + if 'communication' in scores: + existing_report.communication_score = scores['communication'].get('score', 0) + existing_report.communication_justification = scores['communication'].get('justification', '') + existing_report.communication_concerns = scores['communication'].get('concerns', '') + + if 'problem_solving' in scores: + existing_report.problem_solving_score = scores['problem_solving'].get('score', 0) + existing_report.problem_solving_justification = scores['problem_solving'].get('justification', '') + existing_report.problem_solving_concerns = scores['problem_solving'].get('concerns', '') + + if 'cultural_fit' in scores: + existing_report.cultural_fit_score = scores['cultural_fit'].get('score', 0) + existing_report.cultural_fit_justification = scores['cultural_fit'].get('justification', '') + existing_report.cultural_fit_concerns = scores['cultural_fit'].get('concerns', '') + + # Агрегированные поля + if 'overall_score' in report: + existing_report.overall_score = report['overall_score'] + + if 'recommendation' in report: + existing_report.recommendation = RecommendationType(report['recommendation']) + + # Дополнительные поля + if 'strengths' in report: + existing_report.strengths = report['strengths'] + + if 'weaknesses' in report: + existing_report.weaknesses = report['weaknesses'] + + if 'red_flags' in report: + existing_report.red_flags = report['red_flags'] + + # Метрики интервью + if 'analysis_context' in report: + existing_report.dialogue_messages_count = report['analysis_context'].get('dialogue_messages_count', 0) + + # Дополнительная информация + if 'next_steps' in report: + existing_report.next_steps = report['next_steps'] + + if 'questions_analysis' in report: + existing_report.questions_analysis = report['questions_analysis'] + + # Метаданные анализа + if 'analysis_method' in report: + existing_report.analysis_method = report['analysis_method'] + + +# Дополнительная задача для массового анализа +@shared_task +def analyze_multiple_candidates(resume_ids: List[int]): + """ + Анализирует несколько кандидатов и возвращает их рейтинг + + Args: + resume_ids: Список ID резюме для анализа + + Returns: + List[Dict]: Список кандидатов с оценками, отсортированный по рейтингу + """ + logger.info(f"[MASS_ANALYSIS] Starting analysis for {len(resume_ids)} candidates") + + results = [] + + for resume_id in resume_ids: + try: + result = generate_interview_report(resume_id) + if 'error' not in result: + results.append({ + 'resume_id': resume_id, + 'candidate_name': result.get('candidate_name', 'Unknown'), + 'overall_score': result.get('overall_score', 0), + 'recommendation': result.get('recommendation', 'reject'), + 'position': result.get('position', 'Unknown') + }) + except Exception as e: + logger.error(f"[MASS_ANALYSIS] Error analyzing resume {resume_id}: {str(e)}") + + # Сортируем по общему баллу + results.sort(key=lambda x: x['overall_score'], reverse=True) + + logger.info(f"[MASS_ANALYSIS] Completed analysis for {len(results)} candidates") + return results \ No newline at end of file diff --git a/celery_worker/tasks.py b/celery_worker/tasks.py index c524a3b..592fe15 100644 --- a/celery_worker/tasks.py +++ b/celery_worker/tasks.py @@ -9,6 +9,9 @@ from celery_worker.database import get_sync_session, SyncResumeRepository from rag.llm.model import ResumeParser from rag.registry import registry +# Импортируем новые задачи анализа интервью +from celery_worker.interview_analysis_task import generate_interview_report, analyze_multiple_candidates + def generate_interview_plan(resume_id: int, combined_data: Dict[str, Any]) -> Dict[str, Any]: """Генерирует план интервью на основе резюме и вакансии""" diff --git a/main.py b/main.py index 350d0bd..2d00da3 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,8 @@ from app.core.session_middleware import SessionMiddleware from app.routers import vacancy_router, resume_router from app.routers.session_router import router as session_router from app.routers.interview_router import router as interview_router +from app.routers.analysis_router import router as analysis_router +from app.routers.admin_router import router as admin_router @asynccontextmanager @@ -34,6 +36,8 @@ app.include_router(vacancy_router, prefix="/api/v1") app.include_router(resume_router, prefix="/api/v1") app.include_router(session_router, prefix="/api/v1") app.include_router(interview_router, prefix="/api/v1") +app.include_router(analysis_router, prefix="/api/v1") +app.include_router(admin_router, prefix="/api/v1") @app.get("/") diff --git a/migrations/versions/772538626a9e_revert_json_fields_back_to_json_type.py b/migrations/versions/772538626a9e_revert_json_fields_back_to_json_type.py new file mode 100644 index 0000000..99cbfb8 --- /dev/null +++ b/migrations/versions/772538626a9e_revert_json_fields_back_to_json_type.py @@ -0,0 +1,48 @@ +"""revert json fields back to json type + +Revision ID: 772538626a9e +Revises: a816820baadb +Create Date: 2025-09-04 00:02:15.230498 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '772538626a9e' +down_revision: Union[str, Sequence[str], None] = 'a816820baadb' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Convert TEXT fields back to JSON for proper dict handling + op.execute(""" + ALTER TABLE resume + ALTER COLUMN parsed_data TYPE JSON USING parsed_data::JSON, + ALTER COLUMN interview_plan TYPE JSON USING interview_plan::JSON + """) + + op.execute(""" + ALTER TABLE interview_sessions + ALTER COLUMN dialogue_history TYPE JSON USING dialogue_history::JSON + """) + + +def downgrade() -> None: + """Downgrade schema.""" + # Convert JSON fields back to TEXT + op.execute(""" + ALTER TABLE resume + ALTER COLUMN parsed_data TYPE TEXT USING parsed_data::TEXT, + ALTER COLUMN interview_plan TYPE TEXT USING interview_plan::TEXT + """) + + op.execute(""" + ALTER TABLE interview_sessions + ALTER COLUMN dialogue_history TYPE TEXT USING dialogue_history::TEXT + """) diff --git a/migrations/versions/9c60c15f7846_add_interview_reports_table_with_.py b/migrations/versions/9c60c15f7846_add_interview_reports_table_with_.py new file mode 100644 index 0000000..20fe869 --- /dev/null +++ b/migrations/versions/9c60c15f7846_add_interview_reports_table_with_.py @@ -0,0 +1,99 @@ +"""add interview reports table with scoring fields + +Revision ID: 9c60c15f7846 +Revises: 772538626a9e +Create Date: 2025-09-04 12:16:56.495018 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '9c60c15f7846' +down_revision: Union[str, Sequence[str], None] = '772538626a9e' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create interview reports table with scoring fields.""" + + # Create enum type for recommendation + op.execute("CREATE TYPE recommendationtype AS ENUM ('STRONGLY_RECOMMEND', 'RECOMMEND', 'CONSIDER', 'REJECT')") + + # Create interview_reports table + op.execute(""" + CREATE TABLE interview_reports ( + id SERIAL PRIMARY KEY, + interview_session_id INTEGER NOT NULL UNIQUE, + + -- Core scoring criteria (0-100) + technical_skills_score INTEGER NOT NULL CHECK (technical_skills_score >= 0 AND technical_skills_score <= 100), + technical_skills_justification VARCHAR(1000), + technical_skills_concerns VARCHAR(500), + + experience_relevance_score INTEGER NOT NULL CHECK (experience_relevance_score >= 0 AND experience_relevance_score <= 100), + experience_relevance_justification VARCHAR(1000), + experience_relevance_concerns VARCHAR(500), + + communication_score INTEGER NOT NULL CHECK (communication_score >= 0 AND communication_score <= 100), + communication_justification VARCHAR(1000), + communication_concerns VARCHAR(500), + + problem_solving_score INTEGER NOT NULL CHECK (problem_solving_score >= 0 AND problem_solving_score <= 100), + problem_solving_justification VARCHAR(1000), + problem_solving_concerns VARCHAR(500), + + cultural_fit_score INTEGER NOT NULL CHECK (cultural_fit_score >= 0 AND cultural_fit_score <= 100), + cultural_fit_justification VARCHAR(1000), + cultural_fit_concerns VARCHAR(500), + + -- Aggregated fields + overall_score INTEGER NOT NULL CHECK (overall_score >= 0 AND overall_score <= 100), + recommendation recommendationtype NOT NULL, + + -- Analysis arrays + strengths JSON, + weaknesses JSON, + red_flags JSON, + + -- Interview metrics + questions_quality_score FLOAT CHECK (questions_quality_score >= 0 AND questions_quality_score <= 10), + interview_duration_minutes INTEGER CHECK (interview_duration_minutes >= 0), + response_count INTEGER CHECK (response_count >= 0), + dialogue_messages_count INTEGER CHECK (dialogue_messages_count >= 0), + + -- Additional info + next_steps VARCHAR(1000), + interviewer_notes TEXT, + questions_analysis JSON, + + -- Analysis metadata + analysis_method VARCHAR(50) DEFAULT 'openai_gpt4', + llm_model_used VARCHAR(100), + analysis_duration_seconds INTEGER CHECK (analysis_duration_seconds >= 0), + + -- Timestamps + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + + -- Foreign key + FOREIGN KEY (interview_session_id) REFERENCES interview_sessions(id) + ) + """) + + # Create useful indexes + op.execute("CREATE INDEX idx_interview_reports_overall_score ON interview_reports (overall_score DESC)") + op.execute("CREATE INDEX idx_interview_reports_recommendation ON interview_reports (recommendation)") + op.execute("CREATE INDEX idx_interview_reports_technical_skills ON interview_reports (technical_skills_score DESC)") + op.execute("CREATE INDEX idx_interview_reports_communication ON interview_reports (communication_score DESC)") + op.execute("CREATE INDEX idx_interview_reports_session_id ON interview_reports (interview_session_id)") + + +def downgrade() -> None: + """Drop interview reports table.""" + op.execute("DROP TABLE IF EXISTS interview_reports") + op.execute("DROP TYPE IF EXISTS recommendationtype") diff --git a/migrations/versions/9d415bf0ff2e_add_dialogue_history_column_only.py b/migrations/versions/9d415bf0ff2e_add_dialogue_history_column_only.py new file mode 100644 index 0000000..43d101b --- /dev/null +++ b/migrations/versions/9d415bf0ff2e_add_dialogue_history_column_only.py @@ -0,0 +1,45 @@ +"""add dialogue_history column only + +Revision ID: 9d415bf0ff2e +Revises: 53d8b753cb71 +Create Date: 2025-09-03 18:04:49.726882 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '9d415bf0ff2e' +down_revision: Union[str, Sequence[str], None] = 'c2d48b31ee30' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Сначала создаем таблицу interview_sessions (если была удалена) + op.create_table('interview_sessions', + sa.Column('resume_id', sa.Integer(), nullable=False), + sa.Column('room_name', sa.String(length=255), nullable=False), + sa.Column('status', sa.Enum('created', 'active', 'completed', 'failed', name='interviewstatus', create_type=False), nullable=True), + sa.Column('transcript', sa.Text(), nullable=True), + sa.Column('ai_feedback', sa.Text(), nullable=True), + sa.Column('dialogue_history', sa.JSON(), nullable=True), + sa.Column('ai_agent_pid', sa.Integer(), nullable=True), + sa.Column('ai_agent_status', sa.String(), nullable=False), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('started_at', sa.DateTime(), nullable=False), + sa.Column('completed_at', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['resume_id'], ['resume.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('room_name') + ) + + +def downgrade() -> None: + """Downgrade schema.""" + # Удаляем всю таблицу + op.drop_table('interview_sessions') diff --git a/migrations/versions/a816820baadb_change_json_fields_to_text_for_proper_.py b/migrations/versions/a816820baadb_change_json_fields_to_text_for_proper_.py new file mode 100644 index 0000000..1829c87 --- /dev/null +++ b/migrations/versions/a816820baadb_change_json_fields_to_text_for_proper_.py @@ -0,0 +1,59 @@ +"""change json fields to text for proper utf8 + +Revision ID: a816820baadb +Revises: c9bcdd2ddeeb +Create Date: 2025-09-03 23:45:13.221735 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'a816820baadb' +down_revision: Union[str, Sequence[str], None] = 'c9bcdd2ddeeb' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Change JSON fields to TEXT for proper UTF-8 handling.""" + # Convert JSON fields to TEXT for proper UTF-8 storage + op.execute(""" + ALTER TABLE resume + ALTER COLUMN parsed_data TYPE TEXT USING parsed_data::TEXT, + ALTER COLUMN interview_plan TYPE TEXT USING interview_plan::TEXT + """) + + op.execute(""" + ALTER TABLE interview_sessions + ALTER COLUMN dialogue_history TYPE TEXT USING dialogue_history::TEXT + """) + + # Also fix status column + op.alter_column('interview_sessions', 'status', + existing_type=sa.VARCHAR(length=50), + nullable=False, + existing_server_default=sa.text("'created'::character varying")) + + +def downgrade() -> None: + """Convert TEXT fields back to JSON.""" + # Convert TEXT fields back to JSON + op.execute(""" + ALTER TABLE resume + ALTER COLUMN parsed_data TYPE JSON USING parsed_data::JSON, + ALTER COLUMN interview_plan TYPE JSON USING interview_plan::JSON + """) + + op.execute(""" + ALTER TABLE interview_sessions + ALTER COLUMN dialogue_history TYPE JSON USING dialogue_history::JSON + """) + + op.alter_column('interview_sessions', 'status', + existing_type=sa.VARCHAR(length=50), + nullable=True, + existing_server_default=sa.text("'created'::character varying")) diff --git a/migrations/versions/c2d48b31ee30_add_interview_dialogue_and_analysis_.py b/migrations/versions/c2d48b31ee30_add_interview_dialogue_and_analysis_.py new file mode 100644 index 0000000..c2d8e4b --- /dev/null +++ b/migrations/versions/c2d48b31ee30_add_interview_dialogue_and_analysis_.py @@ -0,0 +1,48 @@ +"""add interview dialogue and analysis tables + +Revision ID: c2d48b31ee30 +Revises: de11b016b35a +Create Date: 2025-09-03 17:55:41.653125 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = 'c2d48b31ee30' +down_revision: Union[str, Sequence[str], None] = 'de11b016b35a' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_interview_sessions_id'), table_name='interview_sessions') + op.drop_table('interview_sessions') + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('interview_sessions', + sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False), + sa.Column('resume_id', sa.INTEGER(), autoincrement=False, nullable=False), + sa.Column('room_name', sa.VARCHAR(length=255), autoincrement=False, nullable=False), + sa.Column('status', postgresql.ENUM('created', 'active', 'completed', 'failed', name='interviewstatus'), autoincrement=False, nullable=False), + sa.Column('transcript', sa.TEXT(), autoincrement=False, nullable=True), + sa.Column('ai_feedback', sa.TEXT(), autoincrement=False, nullable=True), + sa.Column('started_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=False), + sa.Column('completed_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True), + sa.Column('ai_agent_pid', sa.INTEGER(), autoincrement=False, nullable=True), + sa.Column('ai_agent_status', sa.VARCHAR(), server_default=sa.text("'not_started'::character varying"), autoincrement=False, nullable=False), + sa.ForeignKeyConstraint(['resume_id'], ['resume.id'], name=op.f('interview_sessions_resume_id_fkey')), + sa.PrimaryKeyConstraint('id', name=op.f('interview_sessions_pkey')), + sa.UniqueConstraint('room_name', name=op.f('interview_sessions_room_name_key'), postgresql_include=[], postgresql_nulls_not_distinct=False) + ) + op.create_index(op.f('ix_interview_sessions_id'), 'interview_sessions', ['id'], unique=False) + # ### end Alembic commands ### diff --git a/migrations/versions/c9bcdd2ddeeb_recreate_interview_sessions_with_.py b/migrations/versions/c9bcdd2ddeeb_recreate_interview_sessions_with_.py new file mode 100644 index 0000000..aef54cd --- /dev/null +++ b/migrations/versions/c9bcdd2ddeeb_recreate_interview_sessions_with_.py @@ -0,0 +1,46 @@ +"""recreate interview_sessions with varchar status + +Revision ID: c9bcdd2ddeeb +Revises: 9d415bf0ff2e +Create Date: 2025-09-03 18:07:59.433986 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'c9bcdd2ddeeb' +down_revision: Union[str, Sequence[str], None] = '9d415bf0ff2e' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Создаем таблицу interview_sessions заново + op.execute("DROP TABLE IF EXISTS interview_sessions CASCADE") + + op.create_table('interview_sessions', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('resume_id', sa.Integer(), nullable=False), + sa.Column('room_name', sa.String(length=255), nullable=False), + sa.Column('status', sa.String(50), nullable=True, server_default='created'), + sa.Column('transcript', sa.Text(), nullable=True), + sa.Column('ai_feedback', sa.Text(), nullable=True), + sa.Column('dialogue_history', sa.JSON(), nullable=True), + sa.Column('ai_agent_pid', sa.Integer(), nullable=True), + sa.Column('ai_agent_status', sa.String(50), nullable=False, server_default='not_started'), + sa.Column('started_at', sa.DateTime(), nullable=False, server_default=sa.text('CURRENT_TIMESTAMP')), + sa.Column('completed_at', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['resume_id'], ['resume.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('room_name') + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_table('interview_sessions')