#48 — Audit Trail Middleware
PLANIFIÉ
Priorité: 🔴 CRITIQUE · Type: TYPE B · Conteneur: rgz-api · Code: app/middleware/audit.pyDépendances: #1 rgz-api, #4 rgz-db
Description
Middleware FastAPI qui enregistre chaque requête HTTP vers /api/v1/* dans une table audit_logs. Chaque log contient : qui (user_id, user_ref, reseller_id), quoi (method, path, resource_id), quand (timestamp), d'où (IP, User-Agent), et la réponse (status_code, response_time). Aucune donnée sensible dans le log (MSISDN, montants) — seulement l'ID et l'action.
Le middleware s'active sur TOUTES les routes /api/v1/ SAUF /api/v1/health (health check) et /api/v1/docs (Swagger). Les requêtes GET vers endpoints de lecture sont loggées (audit), les POST/PUT/DELETE sont loggés (modification). Les erreurs (400, 403, 500) sont loggées aussi pour traçabilité fraude.
Les logs sont disponibles via endpoint GET /api/v1/audit avec filtrage par user/action/date/ressource. Intégration ELK (#40) pour searchabilité long-term (12 mois).
Architecture Interne
1. Cycle de requête avec middleware audit:
Client → FastAPI:
├─> Middleware reçoit request: GET /api/v1/subscribers/mine
├─> Extract metadata:
│ ├─ request.method = "GET"
│ ├─ request.url.path = "/api/v1/subscribers/mine"
│ ├─ request.client.host = "192.168.1.100"
│ ├─ request.headers.get('user-agent') = "curl/7.85.0"
│ ├─ token decode → current_user.id = "sub_xxx", current_user.reseller_id = "res_yyy"
│ └─ extract path params, query params
├─> Appelle endpoint → réponse (avant log)
├─> Collecte réponse:
│ ├─ response.status_code = 200
│ ├─ response_time_ms = 125
│ ├─ response size = 2048 bytes
│ └─ Parse response pour extract resource_id si possible
└─> INSERT audit_logs:
├─ user_id = "sub_xxx"
├─ resource_type = "subscribers"
├─ resource_id = None (query, pas spécifique)
├─ action = "read"
├─ request_method = "GET"
├─ request_path = "/api/v1/subscribers/mine"
├─ response_status = 200
├─ response_time_ms = 125
├─ ip_address = "192.168.1.100"
└─ created_at = NOW()
2. Logique extraction resource type + action:
GET /api/v1/subscribers/{id} → resource_type=subscribers, resource_id=id, action=read
GET /api/v1/subscribers → resource_type=subscribers, resource_id=NULL, action=list
POST /api/v1/subscribers → resource_type=subscribers, resource_id=NULL, action=create
PUT /api/v1/subscribers/{id} → resource_type=subscribers, resource_id=id, action=update
DELETE /api/v1/subscribers/{id} → resource_type=subscribers, resource_id=id, action=delete
POST /api/v1/payments/webhook → resource_type=payments, resource_id=NULL, action=webhook
3. Skiplist (routes sans audit):
└─> /api/v1/health → health check, trop de logs
└─> /api/v1/docs → Swagger UI
└─> /api/v1/openapi.json → OpenAPI schema
└─> /api/v1/auth/refresh → token refresh (loggé mais pas logguer token!)
└─> /.well-known/ → ACME challenge
4. Sensitive data masking:
├─> Request body: ne pas logguer MSISDN, montants, IDs documents
│ Exemple: {amount: "***", msisdn: "MASKED"}
├─> Response: ne pas logguer MSISDN, token de session
│ Exemple: {subscriber_id: "xxx", msisdn: "MASKED"}
└─> Headers: ne pas logguer Authorization header
5. Audit trail pour données personnelles:
└─> Requête: GET /api/v1/subscribers/by-msisdn/0197979964
├─> user_id = "res_admin_yyy" (admin revendeur)
├─> action = "read_personal_data"
├─> resource_type = "subscribers"
├─> resource_id = "sub_xxx" (extrait de la réponse)
├─> request_path = "/api/v1/subscribers/by-msisdn/{msisdn_hash}" (MSISDN pas en clair)
└─> Cet accès est loggé pour #46 APDP (accès données)
6. Intégration ELK:
└─> Logstash @ config/logstash/pipelines/:
├─> Input: PostgreSQL jdbc (poll toutes les 5min)
├─> Filter: parse user_id, action, status
├─> Output: Elasticsearch audit-logs-YYYY-MM (rolling index)
└─> Kibana: dashboards pour trending, alertes
├─ "Top 10 users par requête"
├─ "Requêtes 403/404 (unauthorized)"
├─ "Modifications de configuration par admin"
└─ "Export données personnelles (APDP audit)"
7. Alertes basées audit trail:
└─> Si >10 GET 403 du même user en 5min → suspension compte
└─> Si DELETE ressource critique → SMS admin
└─> Si accès données personnelles en masse (>100 subscribers) → flag étrange
└─> Si admin exporte >1000 rows CSV → log spécialConfiguration
Variables d'environnement
AUDIT_ENABLED=true # Activation middleware
AUDIT_SKIP_PATHS=/health,/docs # Comma-separated
AUDIT_MASK_SENSITIVE=true # Masquer MSISDN, montants
AUDIT_LOG_REQUEST_BODY=true # Logger request body (sauf password)
AUDIT_LOG_RESPONSE_BODY=false # Logger response body (dangereux pour montants)
AUDIT_RETENTION_DAYS=365 # 12 mois
AUDIT_ELASTICSEARCH_ENABLED=true # Envoyer vers ELKModel SQLAlchemy
# app/models/compliance.py
class AuditLog(Base):
__tablename__ = "audit_logs"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
# Qui
user_id = Column(UUID(as_uuid=True), nullable=True) # NULL = system/webhook
user_ref = Column(String(50), nullable=True) # "RGZ-0197979964" ou "admin_xyz"
reseller_id = Column(UUID(as_uuid=True), nullable=True) # Filtered data per reseller
# Quoi
resource_type = Column(String(50), nullable=False) # subscribers, payments, vlans, etc.
resource_id = Column(UUID(as_uuid=True), nullable=True) # Spécifique resource
action = Column(String(50), nullable=False,
CheckConstraint("action IN ('read', 'list', 'create', 'update', 'delete', 'webhook', 'export', 'other')"))
# Requête
request_method = Column(String(10), nullable=False) # GET, POST, PUT, DELETE
request_path = Column(String(500), nullable=False) # /api/v1/...
request_query = Column(String(1000), nullable=True) # Query string (masqué)
request_body = Column(JSON, nullable=True) # Masked sensitive fields
# Réponse
response_status = Column(Integer, nullable=False) # 200, 201, 400, 403, 500
response_time_ms = Column(Integer, nullable=False) # Latency
response_body_size = Column(Integer, nullable=True) # Bytes
# Réseau
ip_address = Column(String(45), nullable=False) # IPv4 ou IPv6
user_agent = Column(String(255), nullable=True)
# Audit
created_at = Column(DateTime, default=utcnow, nullable=False, index=True)
__table_args__ = (
Index('ix_user_id_created', 'user_id', 'created_at'),
Index('ix_resource_type_created', 'resource_type', 'created_at'),
Index('ix_action_created', 'action', 'created_at'),
Index('ix_response_status_created', 'response_status', 'created_at'),
)Endpoints API
| Méthode | Route | Réponse | Auth |
|---|---|---|---|
| GET | /api/v1/audit?user_id=&action=&from=&to=&page=1 | 200 {items: [...], total, pages} | Admin |
| GET | /api/v1/audit/{log_id} | 200 détail log | Admin |
| GET | /api/v1/audit/stats | 200 {top_users, top_actions, errors_24h} | Admin |
| GET | /api/v1/audit/export?from=&to= | 200 CSV | Admin |
Schémas Pydantic
# app/schemas/compliance.py
class AuditLogSchema(BaseModel):
id: UUID
user_id: Optional[UUID]
user_ref: Optional[str]
resource_type: str
resource_id: Optional[UUID]
action: str
request_method: str
request_path: str
response_status: int
response_time_ms: int
ip_address: str
created_at: datetime
class AuditFilterRequest(BaseModel):
user_id: Optional[UUID] = None
action: Optional[str] = None
resource_type: Optional[str] = None
from_date: Optional[datetime] = None
to_date: Optional[datetime] = None
response_status: Optional[int] = None
page: int = 1
limit: int = 50Commandes Utiles
# Voir tous les logs audit (dernières 24h)
curl -X GET 'http://localhost:8000/api/v1/audit?from=2026-02-20&to=2026-02-21' \
-H "Authorization: Bearer $ADMIN_TOKEN" | jq
# Filtrer par user
curl -X GET 'http://localhost:8000/api/v1/audit?user_id=sub_xxx&from=2026-02-20' \
-H "Authorization: Bearer $ADMIN_TOKEN" | jq
# Filtrer par action (modifications)
curl -X GET 'http://localhost:8000/api/v1/audit?action=update&action=delete&from=2026-02-20' \
-H "Authorization: Bearer $ADMIN_TOKEN" | jq
# Filtrer par erreurs
curl -X GET 'http://localhost:8000/api/v1/audit?response_status=403&from=2026-02-20' \
-H "Authorization: Bearer $ADMIN_TOKEN" | jq
# Voir stats
curl -X GET 'http://localhost:8000/api/v1/audit/stats' \
-H "Authorization: Bearer $ADMIN_TOKEN" | jq
# Exporter CSV (audit externe)
curl -X GET 'http://localhost:8000/api/v1/audit/export?from=2026-02-01&to=2026-02-28' \
-H "Authorization: Bearer $ADMIN_TOKEN" \
--output audit_feb_2026.csv
# Requête DB: 10 logs récents
psql -h localhost -U rgz_admin -d rgz_db -c \
"SELECT user_id, action, request_path, response_status, created_at
FROM audit_logs ORDER BY created_at DESC LIMIT 10;"
# Requête DB: filtrer par user_id
psql -h localhost -U rgz_admin -d rgz_db -c \
"SELECT action, request_path, response_status, created_at
FROM audit_logs WHERE user_id = 'sub_xxx' ORDER BY created_at DESC LIMIT 20;"Implémentation TODO
- [ ] Classe middleware
AuditMiddlewaredansapp/middleware/audit.py - [ ] Fonction
extract_request_info()(method, path, IP, user) - [ ] Fonction
extract_response_info()(status, time, size) - [ ] Fonction
parse_resource_from_path()(identifier resource_type + resource_id) - [ ] Fonction
mask_sensitive_data()(masquer MSISDN, montants) - [ ] Middleware registration dans
app/main.py - [ ] Skiplist routes sans audit (/health, /docs, etc.)
- [ ] Endpoint GET /api/v1/audit avec filtres
- [ ] Endpoint GET /api/v1/audit/stats pour dashboards
- [ ] Endpoint GET /api/v1/audit/export pour CSV audit
- [ ] Intégration Logstash → Elasticsearch (5min polling)
- [ ] Tests: vérifier logs créés, filtrages, masking données sensibles
- [ ] Documentation: "que fait le middleware, quoi on peut voir, confidentialité"
- [ ] Alertes Kibana: 403 spike, delete mass, export masse
- [ ] Intégration #45 logs immuables : ne pas dupliquer immuable_logs
Dernière mise à jour: 2026-02-21