#45 — Logs SHA-256 Immuables
PLANIFIÉ
Priorité: 🔴 CRITIQUE · Type: TYPE B · Conteneur: rgz-api · Code: app/services/immutable_logs.pyDépendances: #4 rgz-db, #40 elk-stack
Description
Système de logs immuables avec chaîne SHA-256. Chaque événement est hashé avec la donnée précédente, créant une chaîne non-répudiable : hash = SHA256(timestamp + event_data + prev_hash). Cette approche empêche toute modification rétroactive des logs sans détection immédiate. Les événements loggés incluent : toutes les connexions (auth), tous les paiements (montant, MSISDN, timestamp), accès données personnelles (par qui, quand), modifications de configuration, changement de statut abonné, etc.
Les logs sont stockés dans PostgreSQL dans une table append-only (rôle DB sans UPDATE/DELETE), répliquées en temps réel vers Elasticsearch (#40) pour recherche/analyse. Une vérification d'intégrité est exécutée mensuellement : on rejoue tous les hash depuis le début pour s'assurer qu'aucune donnée n'a été altérée. L'audit trail (#48) complète ce système en loggant les actions utilisateur (who/when/what), tandis que les logs immuables garantissent la non-répudiation (ce qui s'est vraiment passé).
Rétention : 12 mois obligatoire pour ARCEP. Archive annuelle vers S3 ou disque (immutable storage).
Architecture Interne
1. Événement système (ex: paiement):
└─> app/api/v1/endpoints/payments.py
├─> POST /api/v1/payments/webhooks/kkiapay
└─> ImmutableLogService.log_event(
event_type='payment',
event_data={
'subscriber_id': 'abcd-1234',
'amount': 5000, # FCFA entier
'provider': 'MTN_MOMO',
'kkiapay_txn_id': 'txn_xyz123',
'timestamp': '2026-02-21T14:30:45Z'
}
)
2. Calcul du hash:
├─> prev_log = db.session.query(ImmutableLog).order_by(id DESC).first()
├─> prev_hash = prev_log.current_hash if prev_log else '0' * 64
├─> event_str = json.dumps(event_data, sort_keys=True)
├─> input = timestamp + event_str + prev_hash
├─> current_hash = hashlib.sha256(input.encode()).hexdigest()
└─> INSERT immutable_logs(event_type, event_data, prev_hash, current_hash)
3. Réplication ELK:
└─> Logstash @ config/logstash/pipelines/:
├─> Input: PostgreSQL jdbc (poll toutes les 5min)
├─> Filter: parse event_data JSON
├─> Output: Elasticsearch immutable-logs-YYYY-MM (rolling index)
└─> Kibana: dashboards pour recherche/audit par date/type
4. Vérification d'intégrité (monthly, Celery task):
└─> app/tasks/compliance.py → function: verify_immutable_logs()
├─> SELECT ALL FROM immutable_logs ORDER BY created_at ASC
├─> Replay: pour chaque log, recalcul hash = SHA256(ts + data + prev)
├─> Comparaison: calculated_hash vs stored_hash
├─> Résultat:
│ ├─ TOUS OK → log "integrity verified" → immutable_logs_verification table
│ └─ MISMATCH → ALERT CRITICAL, disable API auth, appel admin
└─> Report: INSERT integrity_verification_report
{date, total_logs, verified_count, failures: []}
5. Endpoints pour audit:
└─> GET /api/v1/logs/immutable?from=2026-02-01&to=2026-02-21&type=payment
├─> SELECT * FROM immutable_logs
│ WHERE created_at BETWEEN ? AND ? AND event_type = ?
├─> ORDER BY created_at DESC
├─> Pagination: LIMIT 50 OFFSET
└─> Retour JSON avec hashes (transparence)
└─> GET /api/v1/logs/immutable/{log_id}/verify
├─> Récupérer log + prev_log
├─> Recalcul hash: expected = SHA256(ts + data + prev_hash)
├─> Comparaison: expected vs stored
└─> JSON: {valid: true/false, hash: stored, expected: expected, mismatch: false/true}
6. Rétention 12 mois:
└─> Celery task: rgz.logs.archive_expired (monthly, first day)
├─> SELECT WHERE created_at < (TODAY - 365 DAYS)
├─> Dump to CSV/JSON → s3://rgz-backups/immutable-logs-2025-01.tar.gz
├─> DELETE old rows (ou TRUNCATE partition)
└─> Verify backup integrity via separate restoration test
7. Audit du système lui-même:
└─> Qui a accédé les logs immuables?
└─> Ça, c'est loggé dans #48 audit_trail
{user_id, action=read_immutable_log, log_id, timestamp, ip}Configuration
Variables d'environnement
IMMUTABLE_LOG_ENABLED=true # Activation
IMMUTABLE_LOG_RETENTION_DAYS=365 # Rétention (12 mois)
IMMUTABLE_LOG_ARCHIVE_BACKEND=s3 # s3, gcs, local_disk
IMMUTABLE_LOG_ARCHIVE_BUCKET=rgz-backups # S3 bucket
IMMUTABLE_LOG_VERIFY_FREQUENCY=monthly # monthly, weekly
IMMUTABLE_LOG_HASH_ALGORITHM=sha256 # sha256 (uniquement)Model SQLAlchemy
# app/models/compliance.py
class ImmutableLog(Base):
__tablename__ = "immutable_logs"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
event_type = Column(String(50), nullable=False,
CheckConstraint("event_type IN ('subscription', 'auth', 'payment', 'refund', "
"'access_data', 'config_change', 'anomaly', 'incident', 'other')"))
event_timestamp = Column(DateTime, nullable=False) # Timestamp when event occurred
event_data = Column(JSON, nullable=False) # {msisdn, amount, nas_id, ip, ...}
# Hash chain
prev_hash = Column(String(64), nullable=True) # SHA-256 hex (NULL if first)
current_hash = Column(String(64), nullable=False, unique=True) # SHA-256 hex
# Metadata
created_at = Column(DateTime, default=utcnow, nullable=False, index=True)
updated_at = Column(DateTime, default=utcnow, onupdate=utcnow)
# Verification audit
is_verified = Column(Boolean, default=False, nullable=False)
last_verification_at = Column(DateTime, nullable=True)
# Index pour requêtes rapides
__table_args__ = (
Index('ix_event_type_created', 'event_type', 'created_at'),
Index('ix_created_at', 'created_at'),
)
class IntegrityVerificationReport(Base):
__tablename__ = "integrity_verification_reports"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
verification_date = Column(DateTime, nullable=False)
total_logs = Column(Integer, nullable=False)
verified_count = Column(Integer, nullable=False)
failure_count = Column(Integer, nullable=False, default=0)
failures = Column(JSON, nullable=True) # [{log_id, expected_hash, actual_hash, reason}]
status = Column(String(20), nullable=False) # success, partial_failure, failure
created_at = Column(DateTime, default=utcnow, nullable=False)Rôle PostgreSQL (append-only)
-- Créer rôle pour rgz-api
CREATE ROLE rgz_immutable_logs LOGIN PASSWORD '...';
-- Grant: SELECT sur immutable_logs (lecture audit)
GRANT SELECT ON immutable_logs TO rgz_immutable_logs;
-- Grant: INSERT seulement (pas UPDATE/DELETE)
GRANT INSERT ON immutable_logs TO rgz_immutable_logs;
-- Grant: SELECT sur integrity_verification_reports
GRANT SELECT ON integrity_verification_reports TO rgz_immutable_logs;
GRANT INSERT ON integrity_verification_reports TO rgz_immutable_logs;
-- Désactiver explicitement UPDATE/DELETE
REVOKE UPDATE, DELETE ON immutable_logs FROM rgz_immutable_logs;
REVOKE TRUNCATE ON immutable_logs FROM rgz_immutable_logs;Endpoints API
| Méthode | Route | Réponse | Auth |
|---|---|---|---|
| GET | /api/v1/logs/immutable?from=&to=&type=&page=1 | 200 {items: [...], total, page, pages} | Admin |
| GET | /api/v1/logs/immutable/{log_id} | 200 log detail | Admin |
| GET | /api/v1/logs/immutable/{log_id}/verify | 200 {valid, hash, expected, mismatch} | Admin |
| GET | /api/v1/logs/integrity/reports | 200 {items: [...], total} | Admin |
| POST | /api/v1/logs/immutable/manual-verify | 202 {job_id, status} | Admin |
Schémas Pydantic
# app/schemas/compliance.py
class ImmutableLogSchema(BaseModel):
id: UUID
event_type: str
event_timestamp: datetime
event_data: dict
current_hash: str
created_at: datetime
is_verified: bool
last_verification_at: Optional[datetime]
class ImmutableLogVerifyResponse(BaseModel):
log_id: UUID
valid: bool
hash_stored: str
hash_expected: str
mismatch: bool
message: str
class VerificationReportSchema(BaseModel):
verification_date: datetime
total_logs: int
verified_count: int
failure_count: int
status: str
failures: List[dict]Commandes Utiles
# Insérer événement manuellement (CLI test)
python3 -c "
from app.services.immutable_logs import ImmutableLogService
from app.database import SessionLocal
with SessionLocal() as db:
service = ImmutableLogService(db)
service.log_event(
event_type='test',
event_data={'msg': 'hello', 'value': 42}
)
print('Log inserido com sucesso')
"
# Requête DB : voir logs récents
psql -h localhost -U rgz_admin -d rgz_db -c \
"SELECT id, event_type, current_hash, created_at FROM immutable_logs ORDER BY created_at DESC LIMIT 10;"
# Voir hash d'un log spécifique
psql -h localhost -U rgz_admin -d rgz_db -c \
"SELECT current_hash, prev_hash, event_data FROM immutable_logs WHERE id = 'xxx';"
# Tester endpoint API
curl -X GET http://localhost:8000/api/v1/logs/immutable?type=payment&from=2026-02-01 \
-H "Authorization: Bearer $TOKEN" | jq
# Vérifier intégrité un log (endpoint)
curl -X GET http://localhost:8000/api/v1/logs/immutable/xxx/verify \
-H "Authorization: Bearer $TOKEN" | jq
# Lancer vérification manuelle
curl -X POST http://localhost:8000/api/v1/logs/immutable/manual-verify \
-H "Authorization: Bearer $TOKEN"
# Voir rapports de vérification
curl -X GET http://localhost:8000/api/v1/logs/integrity/reports \
-H "Authorization: Bearer $TOKEN" | jq
# Voir logs dans Kibana (si ELK intégré)
# URL: http://kibana:5601/app/discover?index=immutable-logs-*Implémentation TODO
- [ ] Classe
ImmutableLogServiceaveclog_event(event_type, event_data) - [ ] Fonction calcul hash: SHA256(timestamp + json.dumps(data) + prev_hash)
- [ ] Fonction insertion DB: transactionnel, vérifier prev_hash intégrité
- [ ] Endpoint GET /api/v1/logs/immutable avec filtres type/from/to/page
- [ ] Endpoint GET /api/v1/logs/immutable/{id}/verify pour vérifier log spécifique
- [ ] Endpoint POST /api/v1/logs/immutable/manual-verify pour lancer job Celery
- [ ] Celery task: monthly integrity verification (replay tous les hash)
- [ ] Celery task: monthly archive (export + delete old rows)
- [ ] Rôle PostgreSQL append-only (INSERT only, no UPDATE/DELETE)
- [ ] Intégration Logstash → Elasticsearch (5min polling)
- [ ] Monitoring: alerter si vérification intégrité échoue
- [ ] Tests: log event, verify hash, replay chain, detect tampering
- [ ] Documentation: "comment le système prouve la non-répudiation"
- [ ] Exemple: événement paiement loggé, hash chain vérifiée, données intactes
Dernière mise à jour: 2026-02-21