Skip to content

#48 — Audit Trail Middleware

PLANIFIÉ

Priorité: 🔴 CRITIQUE · Type: TYPE B · Conteneur: rgz-api · Code: app/middleware/audit.pyDépendances: #1 rgz-api, #4 rgz-db


Description

Middleware FastAPI qui enregistre chaque requête HTTP vers /api/v1/* dans une table audit_logs. Chaque log contient : qui (user_id, user_ref, reseller_id), quoi (method, path, resource_id), quand (timestamp), d'où (IP, User-Agent), et la réponse (status_code, response_time). Aucune donnée sensible dans le log (MSISDN, montants) — seulement l'ID et l'action.

Le middleware s'active sur TOUTES les routes /api/v1/ SAUF /api/v1/health (health check) et /api/v1/docs (Swagger). Les requêtes GET vers endpoints de lecture sont loggées (audit), les POST/PUT/DELETE sont loggés (modification). Les erreurs (400, 403, 500) sont loggées aussi pour traçabilité fraude.

Les logs sont disponibles via endpoint GET /api/v1/audit avec filtrage par user/action/date/ressource. Intégration ELK (#40) pour searchabilité long-term (12 mois).

Architecture Interne

1. Cycle de requête avec middleware audit:

   Client → FastAPI:
   ├─> Middleware reçoit request: GET /api/v1/subscribers/mine
   ├─> Extract metadata:
   │   ├─ request.method = "GET"
   │   ├─ request.url.path = "/api/v1/subscribers/mine"
   │   ├─ request.client.host = "192.168.1.100"
   │   ├─ request.headers.get('user-agent') = "curl/7.85.0"
   │   ├─ token decode → current_user.id = "sub_xxx", current_user.reseller_id = "res_yyy"
   │   └─ extract path params, query params
   ├─> Appelle endpoint → réponse (avant log)
   ├─> Collecte réponse:
   │   ├─ response.status_code = 200
   │   ├─ response_time_ms = 125
   │   ├─ response size = 2048 bytes
   │   └─ Parse response pour extract resource_id si possible
   └─> INSERT audit_logs:
       ├─ user_id = "sub_xxx"
       ├─ resource_type = "subscribers"
       ├─ resource_id = None (query, pas spécifique)
       ├─ action = "read"
       ├─ request_method = "GET"
       ├─ request_path = "/api/v1/subscribers/mine"
       ├─ response_status = 200
       ├─ response_time_ms = 125
       ├─ ip_address = "192.168.1.100"
       └─ created_at = NOW()

2. Logique extraction resource type + action:

   GET   /api/v1/subscribers/{id}      → resource_type=subscribers, resource_id=id, action=read
   GET   /api/v1/subscribers           → resource_type=subscribers, resource_id=NULL, action=list
   POST  /api/v1/subscribers           → resource_type=subscribers, resource_id=NULL, action=create
   PUT   /api/v1/subscribers/{id}      → resource_type=subscribers, resource_id=id, action=update
   DELETE /api/v1/subscribers/{id}     → resource_type=subscribers, resource_id=id, action=delete
   POST  /api/v1/payments/webhook      → resource_type=payments, resource_id=NULL, action=webhook

3. Skiplist (routes sans audit):
   └─> /api/v1/health           → health check, trop de logs
   └─> /api/v1/docs             → Swagger UI
   └─> /api/v1/openapi.json     → OpenAPI schema
   └─> /api/v1/auth/refresh     → token refresh (loggé mais pas logguer token!)
   └─> /.well-known/            → ACME challenge

4. Sensitive data masking:
   ├─> Request body: ne pas logguer MSISDN, montants, IDs documents
   │   Exemple: {amount: "***", msisdn: "MASKED"}
   ├─> Response: ne pas logguer MSISDN, token de session
   │   Exemple: {subscriber_id: "xxx", msisdn: "MASKED"}
   └─> Headers: ne pas logguer Authorization header

5. Audit trail pour données personnelles:
   └─> Requête: GET /api/v1/subscribers/by-msisdn/0197979964
       ├─> user_id = "res_admin_yyy" (admin revendeur)
       ├─> action = "read_personal_data"
       ├─> resource_type = "subscribers"
       ├─> resource_id = "sub_xxx"  (extrait de la réponse)
       ├─> request_path = "/api/v1/subscribers/by-msisdn/{msisdn_hash}"  (MSISDN pas en clair)
       └─> Cet accès est loggé pour #46 APDP (accès données)

6. Intégration ELK:
   └─> Logstash @ config/logstash/pipelines/:
       ├─> Input: PostgreSQL jdbc (poll toutes les 5min)
       ├─> Filter: parse user_id, action, status
       ├─> Output: Elasticsearch audit-logs-YYYY-MM (rolling index)
       └─> Kibana: dashboards pour trending, alertes
           ├─ "Top 10 users par requête"
           ├─ "Requêtes 403/404 (unauthorized)"
           ├─ "Modifications de configuration par admin"
           └─ "Export données personnelles (APDP audit)"

7. Alertes basées audit trail:
   └─> Si >10 GET 403 du même user en 5min → suspension compte
   └─> Si DELETE ressource critique → SMS admin
   └─> Si accès données personnelles en masse (>100 subscribers) → flag étrange
   └─> Si admin exporte >1000 rows CSV → log spécial

Configuration

Variables d'environnement

env
AUDIT_ENABLED=true                    # Activation middleware
AUDIT_SKIP_PATHS=/health,/docs        # Comma-separated
AUDIT_MASK_SENSITIVE=true             # Masquer MSISDN, montants
AUDIT_LOG_REQUEST_BODY=true           # Logger request body (sauf password)
AUDIT_LOG_RESPONSE_BODY=false         # Logger response body (dangereux pour montants)
AUDIT_RETENTION_DAYS=365              # 12 mois
AUDIT_ELASTICSEARCH_ENABLED=true      # Envoyer vers ELK

Model SQLAlchemy

python
# app/models/compliance.py

class AuditLog(Base):
    __tablename__ = "audit_logs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)

    # Qui
    user_id = Column(UUID(as_uuid=True), nullable=True)  # NULL = system/webhook
    user_ref = Column(String(50), nullable=True)  # "RGZ-0197979964" ou "admin_xyz"
    reseller_id = Column(UUID(as_uuid=True), nullable=True)  # Filtered data per reseller

    # Quoi
    resource_type = Column(String(50), nullable=False)  # subscribers, payments, vlans, etc.
    resource_id = Column(UUID(as_uuid=True), nullable=True)  # Spécifique resource
    action = Column(String(50), nullable=False,
        CheckConstraint("action IN ('read', 'list', 'create', 'update', 'delete', 'webhook', 'export', 'other')"))

    # Requête
    request_method = Column(String(10), nullable=False)  # GET, POST, PUT, DELETE
    request_path = Column(String(500), nullable=False)  # /api/v1/...
    request_query = Column(String(1000), nullable=True)  # Query string (masqué)
    request_body = Column(JSON, nullable=True)  # Masked sensitive fields

    # Réponse
    response_status = Column(Integer, nullable=False)  # 200, 201, 400, 403, 500
    response_time_ms = Column(Integer, nullable=False)  # Latency
    response_body_size = Column(Integer, nullable=True)  # Bytes

    # Réseau
    ip_address = Column(String(45), nullable=False)  # IPv4 ou IPv6
    user_agent = Column(String(255), nullable=True)

    # Audit
    created_at = Column(DateTime, default=utcnow, nullable=False, index=True)

    __table_args__ = (
        Index('ix_user_id_created', 'user_id', 'created_at'),
        Index('ix_resource_type_created', 'resource_type', 'created_at'),
        Index('ix_action_created', 'action', 'created_at'),
        Index('ix_response_status_created', 'response_status', 'created_at'),
    )

Endpoints API

MéthodeRouteRéponseAuth
GET/api/v1/audit?user_id=&action=&from=&to=&page=1200 {items: [...], total, pages}Admin
GET/api/v1/audit/{log_id}200 détail logAdmin
GET/api/v1/audit/stats200 {top_users, top_actions, errors_24h}Admin
GET/api/v1/audit/export?from=&to=200 CSVAdmin

Schémas Pydantic

python
# app/schemas/compliance.py

class AuditLogSchema(BaseModel):
    id: UUID
    user_id: Optional[UUID]
    user_ref: Optional[str]
    resource_type: str
    resource_id: Optional[UUID]
    action: str
    request_method: str
    request_path: str
    response_status: int
    response_time_ms: int
    ip_address: str
    created_at: datetime

class AuditFilterRequest(BaseModel):
    user_id: Optional[UUID] = None
    action: Optional[str] = None
    resource_type: Optional[str] = None
    from_date: Optional[datetime] = None
    to_date: Optional[datetime] = None
    response_status: Optional[int] = None
    page: int = 1
    limit: int = 50

Commandes Utiles

bash
# Voir tous les logs audit (dernières 24h)
curl -X GET 'http://localhost:8000/api/v1/audit?from=2026-02-20&to=2026-02-21' \
  -H "Authorization: Bearer $ADMIN_TOKEN" | jq

# Filtrer par user
curl -X GET 'http://localhost:8000/api/v1/audit?user_id=sub_xxx&from=2026-02-20' \
  -H "Authorization: Bearer $ADMIN_TOKEN" | jq

# Filtrer par action (modifications)
curl -X GET 'http://localhost:8000/api/v1/audit?action=update&action=delete&from=2026-02-20' \
  -H "Authorization: Bearer $ADMIN_TOKEN" | jq

# Filtrer par erreurs
curl -X GET 'http://localhost:8000/api/v1/audit?response_status=403&from=2026-02-20' \
  -H "Authorization: Bearer $ADMIN_TOKEN" | jq

# Voir stats
curl -X GET 'http://localhost:8000/api/v1/audit/stats' \
  -H "Authorization: Bearer $ADMIN_TOKEN" | jq

# Exporter CSV (audit externe)
curl -X GET 'http://localhost:8000/api/v1/audit/export?from=2026-02-01&to=2026-02-28' \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  --output audit_feb_2026.csv

# Requête DB: 10 logs récents
psql -h localhost -U rgz_admin -d rgz_db -c \
  "SELECT user_id, action, request_path, response_status, created_at
   FROM audit_logs ORDER BY created_at DESC LIMIT 10;"

# Requête DB: filtrer par user_id
psql -h localhost -U rgz_admin -d rgz_db -c \
  "SELECT action, request_path, response_status, created_at
   FROM audit_logs WHERE user_id = 'sub_xxx' ORDER BY created_at DESC LIMIT 20;"

Implémentation TODO

  • [ ] Classe middleware AuditMiddleware dans app/middleware/audit.py
  • [ ] Fonction extract_request_info() (method, path, IP, user)
  • [ ] Fonction extract_response_info() (status, time, size)
  • [ ] Fonction parse_resource_from_path() (identifier resource_type + resource_id)
  • [ ] Fonction mask_sensitive_data() (masquer MSISDN, montants)
  • [ ] Middleware registration dans app/main.py
  • [ ] Skiplist routes sans audit (/health, /docs, etc.)
  • [ ] Endpoint GET /api/v1/audit avec filtres
  • [ ] Endpoint GET /api/v1/audit/stats pour dashboards
  • [ ] Endpoint GET /api/v1/audit/export pour CSV audit
  • [ ] Intégration Logstash → Elasticsearch (5min polling)
  • [ ] Tests: vérifier logs créés, filtrages, masking données sensibles
  • [ ] Documentation: "que fait le middleware, quoi on peut voir, confidentialité"
  • [ ] Alertes Kibana: 403 spike, delete mass, export masse
  • [ ] Intégration #45 logs immuables : ne pas dupliquer immuable_logs

Dernière mise à jour: 2026-02-21

PROJET MOSAÏQUE — 81 outils, 22 conteneurs, 500+ revendeurs WiFi Zone