Files
AuditShield/backend/api/audits.py
Vincent 0fe1a1b751 feat: Phase 1 — socle backend FastAPI + frontend Next.js
Backend (FastAPI + SQLAlchemy):
- Modèles : User, Client, Audit, Cible, Vulnérabilité, Action
- Auth JWT (register/login/me) avec bcrypt
- Routes CRUD complètes : clients, audits, cibles, vulnérabilités, actions
- Schémas Pydantic v2, migrations Alembic configurées
- Rate limiting (slowapi), CORS, structure scanners/reports pour phase 2

Frontend (Next.js 14 App Router):
- shadcn/ui : Button, Input, Card, Badge, Label
- Page login avec gestion token JWT
- Dashboard avec stats temps réel
- Pages Clients (grille) et Audits (liste) avec recherche
- Layout avec sidebar navigation + protection auth
- Dockerfiles multi-stage (backend + frontend standalone)

Infrastructure:
- docker-compose.yml : postgres, redis, backend, frontend
- docker-compose.prod.yml avec labels Traefik
- .env.example complet
- .gitignore mis à jour

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-21 17:16:12 +01:00

192 lines
5.3 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session, selectinload
from backend.core.database import get_db
from backend.core.security import get_current_user
from backend.models.audit import Audit
from backend.models.target import Cible
from backend.models.vulnerability import Vulnerabilite
from backend.models.action import Action
from backend.models.user import User
from backend.schemas.audit import (
AuditCreate,
AuditUpdate,
AuditRead,
AuditDetail,
CibleCreate,
CibleRead,
VulnerabiliteCreate,
VulnerabiliteRead,
ActionCreate,
ActionUpdate,
ActionRead,
)
# Router collecting every endpoint defined in this module; all paths are
# served under the "/audits" prefix and tagged "audits".
router = APIRouter(prefix="/audits", tags=["audits"])
@router.get("/", response_model=list[AuditRead])
def list_audits(
    client_id: int | None = None,
    skip: int = 0,
    limit: int = 50,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> list[Audit]:
    """Return one page of audits, optionally restricted to a single client.

    Args:
        client_id: When provided, only audits whose ``client_id`` matches are
            returned. ``None`` disables the filter.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.

    Raises:
        HTTPException: 422 when ``skip`` or ``limit`` is negative.
    """
    # Negative OFFSET/LIMIT values are rejected up front instead of being
    # forwarded to the database, where they may fail at query time.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=422,
            detail="Les paramètres skip et limit doivent être positifs ou nuls",
        )

    query = db.query(Audit)
    # Compare against None explicitly so that client_id=0 is still treated as
    # a filter rather than silently returning every audit.
    if client_id is not None:
        query = query.filter(Audit.client_id == client_id)

    # A fixed ordering keeps OFFSET/LIMIT pagination deterministic between
    # successive requests.
    return query.order_by(Audit.id).offset(skip).limit(limit).all()
@router.post("/", response_model=AuditRead, status_code=status.HTTP_201_CREATED)
def create_audit(
    payload: AuditCreate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> Audit:
    """Create an audit from the request payload and return the stored row.

    Args:
        payload: Validated creation data; its dumped fields are passed as
            keyword arguments to the ``Audit`` constructor.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.
    """
    fields = payload.model_dump()
    new_audit = Audit(**fields)

    db.add(new_audit)
    db.commit()
    # Reload the instance's state from the database after the commit.
    db.refresh(new_audit)
    return new_audit
@router.get("/{audit_id}", response_model=AuditDetail)
def get_audit(
    audit_id: int,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> Audit:
    """Fetch a single audit together with its targets and vulnerabilities.

    Args:
        audit_id: Identifier of the audit to load.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.

    Raises:
        HTTPException: 404 when no audit has this identifier.
    """
    # Load both collections eagerly alongside the audit itself.
    eager_loads = (
        selectinload(Audit.cibles),
        selectinload(Audit.vulnerabilites),
    )
    lookup = db.query(Audit).options(*eager_loads).filter(Audit.id == audit_id)
    found = lookup.first()

    if found:
        return found
    raise HTTPException(status_code=404, detail="Audit introuvable")
@router.patch("/{audit_id}", response_model=AuditRead)
def update_audit(
    audit_id: int,
    payload: AuditUpdate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> Audit:
    """Apply a partial update to an audit and return the refreshed row.

    Args:
        audit_id: Identifier of the audit to modify.
        payload: Update data; only fields explicitly set by the client are
            applied.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.

    Raises:
        HTTPException: 404 when no audit has this identifier.
    """
    existing = db.get(Audit, audit_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Audit introuvable")

    # exclude_unset leaves out fields the client did not send, so they keep
    # their current values.
    changes = payload.model_dump(exclude_unset=True)
    for name in changes:
        setattr(existing, name, changes[name])

    db.commit()
    db.refresh(existing)
    return existing
@router.delete("/{audit_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_audit(
    audit_id: int,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> None:
    """Delete an audit.

    Args:
        audit_id: Identifier of the audit to remove.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.

    Raises:
        HTTPException: 404 when no audit has this identifier.
    """
    doomed = db.get(Audit, audit_id)
    if doomed:
        db.delete(doomed)
        db.commit()
        return None
    raise HTTPException(status_code=404, detail="Audit introuvable")
# --- Cibles ---
@router.post("/{audit_id}/cibles", response_model=CibleRead, status_code=status.HTTP_201_CREATED)
def add_cible(
    audit_id: int,
    payload: CibleCreate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> Cible:
    """Attach a new target (cible) to an existing audit.

    Args:
        audit_id: Identifier of the audit receiving the target.
        payload: Validated creation data for the target.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.

    Raises:
        HTTPException: 404 when the audit does not exist.
    """
    parent_audit = db.get(Audit, audit_id)
    if not parent_audit:
        raise HTTPException(status_code=404, detail="Audit introuvable")

    attrs = payload.model_dump()
    # The audit id comes from the URL path, not from the request body.
    new_cible = Cible(audit_id=audit_id, **attrs)

    db.add(new_cible)
    db.commit()
    db.refresh(new_cible)
    return new_cible
@router.patch("/{audit_id}/cibles/{cible_id}/valider", response_model=CibleRead)
def valider_cible(
    audit_id: int,
    cible_id: int,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> Cible:
    """Mark a target of the given audit as validated.

    Args:
        audit_id: Identifier of the audit the target must belong to.
        cible_id: Identifier of the target to validate.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.

    Raises:
        HTTPException: 404 when no target with this id exists in this audit.
    """
    # Match on both ids so a target cannot be reached through another audit.
    scoped = db.query(Cible).filter(Cible.id == cible_id, Cible.audit_id == audit_id)
    target = scoped.first()
    if not target:
        raise HTTPException(status_code=404, detail="Cible introuvable")

    target.validee = True
    db.commit()
    db.refresh(target)
    return target
# --- Vulnérabilités ---
@router.post("/{audit_id}/vulnerabilites", response_model=VulnerabiliteRead, status_code=status.HTTP_201_CREATED)
def add_vulnerabilite(
    audit_id: int,
    payload: VulnerabiliteCreate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> Vulnerabilite:
    """Record a new vulnerability on an existing audit.

    Args:
        audit_id: Identifier of the audit the vulnerability belongs to.
        payload: Validated creation data for the vulnerability.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.

    Raises:
        HTTPException: 404 when the audit does not exist.
    """
    parent_audit = db.get(Audit, audit_id)
    if not parent_audit:
        raise HTTPException(status_code=404, detail="Audit introuvable")

    attrs = payload.model_dump()
    # The audit id comes from the URL path, not from the request body.
    new_vuln = Vulnerabilite(audit_id=audit_id, **attrs)

    db.add(new_vuln)
    db.commit()
    db.refresh(new_vuln)
    return new_vuln
# --- Actions ---
@router.post("/{audit_id}/actions", response_model=ActionRead, status_code=status.HTTP_201_CREATED)
def add_action(
    audit_id: int,
    payload: ActionCreate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> Action:
    """Create an action for a vulnerability that belongs to the audit.

    Args:
        audit_id: Identifier of the audit named in the URL.
        payload: Validated creation data; ``vulnerabilite_id`` designates the
            vulnerability the action is attached to.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.

    Raises:
        HTTPException: 404 when the vulnerability does not exist or belongs
            to a different audit.
    """
    parent_vuln = db.get(Vulnerabilite, payload.vulnerabilite_id)
    # The vulnerability must exist and be part of the audit in the URL.
    in_this_audit = bool(parent_vuln) and parent_vuln.audit_id == audit_id
    if not in_this_audit:
        raise HTTPException(status_code=404, detail="Vulnérabilité introuvable dans cet audit")

    attrs = payload.model_dump()
    new_action = Action(**attrs)

    db.add(new_action)
    db.commit()
    db.refresh(new_action)
    return new_action
@router.patch("/{audit_id}/actions/{action_id}", response_model=ActionRead)
def update_action(
    audit_id: int,
    action_id: int,
    payload: ActionUpdate,
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> Action:
    """Apply a partial update to an action that belongs to the given audit.

    Args:
        audit_id: Identifier of the audit named in the URL; the action's
            vulnerability must belong to it.
        action_id: Identifier of the action to modify.
        payload: Update data; only fields explicitly set by the client are
            applied.
        db: Database session injected by FastAPI.
        _: Authenticated user; only required so the route is protected.

    Raises:
        HTTPException: 404 when the action does not exist or is not attached
            to a vulnerability of this audit.
    """
    action = db.get(Action, action_id)
    if not action:
        raise HTTPException(status_code=404, detail="Action introuvable")

    # Enforce the audit scope expressed by the URL: an action is linked to an
    # audit through its vulnerability, the same relation add_action checks
    # when the action is created. Without this check any action could be
    # edited through any audit's path.
    vuln_id = action.vulnerabilite_id
    vuln = db.get(Vulnerabilite, vuln_id) if vuln_id is not None else None
    if not vuln or vuln.audit_id != audit_id:
        # Same response as a missing action, so the endpoint does not reveal
        # that the action exists under another audit.
        raise HTTPException(status_code=404, detail="Action introuvable")

    # exclude_unset leaves out fields the client did not send, so they keep
    # their current values.
    for field, value in payload.model_dump(exclude_unset=True).items():
        setattr(action, field, value)

    db.commit()
    db.refresh(action)
    return action