Files

107 lines
3.6 KiB
Python

"""Endpoints для Skill Tests (сертификация навыков)."""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.skill_test import SkillTest, SkillTestResult
from app.models.user import User
router = APIRouter(prefix="/api/skill-tests", tags=["skill-tests"])
@router.get("")
async def list_tests(db: AsyncSession = Depends(get_db)):
"""Список доступных тестов навыков."""
result = await db.execute(select(SkillTest))
tests = result.scalars().all()
return [
{
"id": str(t.id),
"name": t.name,
"category": t.category,
"questions_count": t.questions_count,
"passing_score": t.passing_score,
} for t in tests
]
@router.post("/take/{test_id}")
async def take_test(test_id: str, user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
"""Начать тест навыков."""
result = await db.execute(select(SkillTest).where(SkillTest.id == test_id))
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
# Генерация вопросов (placeholder — в продакшене реальные вопросы)
questions = [
{"id": i + 1, "question": f"Вопрос {i+1} по теме '{test.name}'", "options": ["A", "B", "C", "D"], "correct": 0}
for i in range(test.questions_count)
]
return {"test_id": str(test.id), "questions": questions, "time_limit_minutes": test.questions_count * 2}
@router.post("/submit/{test_id}")
async def submit_test(test_id: str, answers: dict, user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
"""Отправить ответы на тест."""
result = await db.execute(select(SkillTest).where(SkillTest.id == test_id))
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
# Подсчёт баллов (placeholder)
score = 75.0 # В продакшене реальный подсчёт
passed = score >= test.passing_score
result2 = await db.execute(select(SkillTestResult).where(
SkillTestResult.user_id == user["id"],
SkillTestResult.skill_test_id == test_id,
))
existing = result2.scalar_one_or_none()
if existing:
existing.score = score
existing.passed = passed
existing.completed_at = None # TODO: datetime.now(timezone.utc)
else:
new_result = SkillTestResult(
user_id=user["id"],
skill_test_id=test_id,
score=score,
passed=passed,
completed_at=None,
)
db.add(new_result)
await db.commit()
return {"score": score, "passed": passed}
@router.get("/user/{user_id}")
async def get_user_tests(user_id: str, db: AsyncSession = Depends(get_db)):
"""Получить результаты тестов пользователя."""
result = await db.execute(select(SkillTestResult).where(SkillTestResult.user_id == user_id))
results = result.scalars().all()
return [
{
"test_name": r.skill_test.name if hasattr(r, 'skill_test') else "Unknown",
"score": r.score,
"passed": r.passed,
"completed_at": str(r.completed_at) if r.completed_at else None,
} for r in results
]