Skip to content

#45 — Logs SHA-256 Immuables

PLANIFIÉ

Priorité: 🔴 CRITIQUE · Type: TYPE B · Conteneur: rgz-api · Code: app/services/immutable_logs.pyDépendances: #4 rgz-db, #40 elk-stack


Description

Système de logs immuables avec chaîne SHA-256. Chaque événement est hashé avec la donnée précédente, créant une chaîne non-répudiable : hash = SHA256(timestamp + event_data + prev_hash). Cette approche empêche toute modification rétroactive des logs sans détection immédiate. Les événements loggés incluent : toutes les connexions (auth), tous les paiements (montant, MSISDN, timestamp), accès données personnelles (par qui, quand), modifications de configuration, changement de statut abonné, etc.

Les logs sont stockés dans PostgreSQL dans une table append-only (rôle DB sans UPDATE/DELETE), répliquées en temps réel vers Elasticsearch (#40) pour recherche/analyse. Une vérification d'intégrité est exécutée mensuellement : on rejoue tous les hash depuis le début pour s'assurer qu'aucune donnée n'a été altérée. L'audit trail (#48) complète ce système en loggant les actions utilisateur (who/when/what), tandis que les logs immuables garantissent la non-répudiation (ce qui s'est vraiment passé).

Rétention : 12 mois obligatoire pour ARCEP. Archive annuelle vers S3 ou disque (immutable storage).

Architecture Interne

1. Événement système (ex: paiement):
   └─> app/api/v1/endpoints/payments.py
       ├─> POST /api/v1/payments/webhooks/kkiapay
       └─> ImmutableLogService.log_event(
           event_type='payment',
           event_data={
             'subscriber_id': 'abcd-1234',
             'amount': 5000,  # FCFA entier
             'provider': 'MTN_MOMO',
             'kkiapay_txn_id': 'txn_xyz123',
             'timestamp': '2026-02-21T14:30:45Z'
           }
       )

2. Calcul du hash:
   ├─> prev_log = db.session.query(ImmutableLog).order_by(id DESC).first()
   ├─> prev_hash = prev_log.current_hash if prev_log else '0' * 64
   ├─> event_str = json.dumps(event_data, sort_keys=True)
   ├─> input = timestamp + event_str + prev_hash
   ├─> current_hash = hashlib.sha256(input.encode()).hexdigest()
   └─> INSERT immutable_logs(event_type, event_data, prev_hash, current_hash)

3. Réplication ELK:
   └─> Logstash @ config/logstash/pipelines/:
       ├─> Input: PostgreSQL jdbc (poll toutes les 5min)
       ├─> Filter: parse event_data JSON
       ├─> Output: Elasticsearch immutable-logs-YYYY-MM (rolling index)
       └─> Kibana: dashboards pour recherche/audit par date/type

4. Vérification d'intégrité (monthly, Celery task):
   └─> app/tasks/compliance.py → function: verify_immutable_logs()
       ├─> SELECT ALL FROM immutable_logs ORDER BY created_at ASC
       ├─> Replay: pour chaque log, recalcul hash = SHA256(ts + data + prev)
       ├─> Comparaison: calculated_hash vs stored_hash
       ├─> Résultat:
       │   ├─ TOUS OK → log "integrity verified" → immutable_logs_verification table
       │   └─ MISMATCH → ALERT CRITICAL, disable API auth, appel admin
       └─> Report: INSERT integrity_verification_report
           {date, total_logs, verified_count, failures: []}

5. Endpoints pour audit:
   └─> GET /api/v1/logs/immutable?from=2026-02-01&to=2026-02-21&type=payment
       ├─> SELECT * FROM immutable_logs
       │   WHERE created_at BETWEEN ? AND ? AND event_type = ?
       ├─> ORDER BY created_at DESC
       ├─> Pagination: LIMIT 50 OFFSET
       └─> Retour JSON avec hashes (transparence)

   └─> GET /api/v1/logs/immutable/{log_id}/verify
       ├─> Récupérer log + prev_log
       ├─> Recalcul hash: expected = SHA256(ts + data + prev_hash)
       ├─> Comparaison: expected vs stored
       └─> JSON: {valid: true/false, hash: stored, expected: expected, mismatch: false/true}

6. Rétention 12 mois:
   └─> Celery task: rgz.logs.archive_expired (monthly, first day)
       ├─> SELECT WHERE created_at < (TODAY - 365 DAYS)
       ├─> Dump to CSV/JSON → s3://rgz-backups/immutable-logs-2025-01.tar.gz
       ├─> DELETE old rows (ou TRUNCATE partition)
       └─> Verify backup integrity via separate restoration test

7. Audit du système lui-même:
   └─> Qui a accédé les logs immuables?
       └─> Ça, c'est loggé dans #48 audit_trail
           {user_id, action=read_immutable_log, log_id, timestamp, ip}

Configuration

Variables d'environnement

env
IMMUTABLE_LOG_ENABLED=true            # Activation
IMMUTABLE_LOG_RETENTION_DAYS=365      # Rétention (12 mois)
IMMUTABLE_LOG_ARCHIVE_BACKEND=s3      # s3, gcs, local_disk
IMMUTABLE_LOG_ARCHIVE_BUCKET=rgz-backups  # S3 bucket
IMMUTABLE_LOG_VERIFY_FREQUENCY=monthly    # monthly, weekly
IMMUTABLE_LOG_HASH_ALGORITHM=sha256   # sha256 (uniquement)

Model SQLAlchemy

python
# app/models/compliance.py

class ImmutableLog(Base):
    __tablename__ = "immutable_logs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    event_type = Column(String(50), nullable=False,
        CheckConstraint("event_type IN ('subscription', 'auth', 'payment', 'refund', "
                       "'access_data', 'config_change', 'anomaly', 'incident', 'other')"))
    event_timestamp = Column(DateTime, nullable=False)  # Timestamp when event occurred
    event_data = Column(JSON, nullable=False)  # {msisdn, amount, nas_id, ip, ...}

    # Hash chain
    prev_hash = Column(String(64), nullable=True)  # SHA-256 hex (NULL if first)
    current_hash = Column(String(64), nullable=False, unique=True)  # SHA-256 hex

    # Metadata
    created_at = Column(DateTime, default=utcnow, nullable=False, index=True)
    updated_at = Column(DateTime, default=utcnow, onupdate=utcnow)

    # Verification audit
    is_verified = Column(Boolean, default=False, nullable=False)
    last_verification_at = Column(DateTime, nullable=True)

    # Index pour requêtes rapides
    __table_args__ = (
        Index('ix_event_type_created', 'event_type', 'created_at'),
        Index('ix_created_at', 'created_at'),
    )

class IntegrityVerificationReport(Base):
    __tablename__ = "integrity_verification_reports"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    verification_date = Column(DateTime, nullable=False)
    total_logs = Column(Integer, nullable=False)
    verified_count = Column(Integer, nullable=False)
    failure_count = Column(Integer, nullable=False, default=0)
    failures = Column(JSON, nullable=True)  # [{log_id, expected_hash, actual_hash, reason}]
    status = Column(String(20), nullable=False)  # success, partial_failure, failure
    created_at = Column(DateTime, default=utcnow, nullable=False)

Rôle PostgreSQL (append-only)

sql
-- Créer rôle pour rgz-api
CREATE ROLE rgz_immutable_logs LOGIN PASSWORD '...';

-- Grant: SELECT sur immutable_logs (lecture audit)
GRANT SELECT ON immutable_logs TO rgz_immutable_logs;

-- Grant: INSERT seulement (pas UPDATE/DELETE)
GRANT INSERT ON immutable_logs TO rgz_immutable_logs;

-- Grant: SELECT sur integrity_verification_reports
GRANT SELECT ON integrity_verification_reports TO rgz_immutable_logs;
GRANT INSERT ON integrity_verification_reports TO rgz_immutable_logs;

-- Désactiver explicitement UPDATE/DELETE
REVOKE UPDATE, DELETE ON immutable_logs FROM rgz_immutable_logs;
REVOKE TRUNCATE ON immutable_logs FROM rgz_immutable_logs;

Endpoints API

MéthodeRouteRéponseAuth
GET/api/v1/logs/immutable?from=&to=&type=&page=1200 {items: [...], total, page, pages}Admin
GET/api/v1/logs/immutable/{log_id}200 log detailAdmin
GET/api/v1/logs/immutable/{log_id}/verify200 {valid, hash, expected, mismatch}Admin
GET/api/v1/logs/integrity/reports200 {items: [...], total}Admin
POST/api/v1/logs/immutable/manual-verify202 {job_id, status}Admin

Schémas Pydantic

python
# app/schemas/compliance.py

class ImmutableLogSchema(BaseModel):
    id: UUID
    event_type: str
    event_timestamp: datetime
    event_data: dict
    current_hash: str
    created_at: datetime
    is_verified: bool
    last_verification_at: Optional[datetime]

class ImmutableLogVerifyResponse(BaseModel):
    log_id: UUID
    valid: bool
    hash_stored: str
    hash_expected: str
    mismatch: bool
    message: str

class VerificationReportSchema(BaseModel):
    verification_date: datetime
    total_logs: int
    verified_count: int
    failure_count: int
    status: str
    failures: List[dict]

Commandes Utiles

bash
# Insérer événement manuellement (CLI test)
python3 -c "
from app.services.immutable_logs import ImmutableLogService
from app.database import SessionLocal

with SessionLocal() as db:
    service = ImmutableLogService(db)
    service.log_event(
        event_type='test',
        event_data={'msg': 'hello', 'value': 42}
    )
    print('Log inserido com sucesso')
"

# Requête DB : voir logs récents
psql -h localhost -U rgz_admin -d rgz_db -c \
  "SELECT id, event_type, current_hash, created_at FROM immutable_logs ORDER BY created_at DESC LIMIT 10;"

# Voir hash d'un log spécifique
psql -h localhost -U rgz_admin -d rgz_db -c \
  "SELECT current_hash, prev_hash, event_data FROM immutable_logs WHERE id = 'xxx';"

# Tester endpoint API
curl -X GET http://localhost:8000/api/v1/logs/immutable?type=payment&from=2026-02-01 \
  -H "Authorization: Bearer $TOKEN" | jq

# Vérifier intégrité un log (endpoint)
curl -X GET http://localhost:8000/api/v1/logs/immutable/xxx/verify \
  -H "Authorization: Bearer $TOKEN" | jq

# Lancer vérification manuelle
curl -X POST http://localhost:8000/api/v1/logs/immutable/manual-verify \
  -H "Authorization: Bearer $TOKEN"

# Voir rapports de vérification
curl -X GET http://localhost:8000/api/v1/logs/integrity/reports \
  -H "Authorization: Bearer $TOKEN" | jq

# Voir logs dans Kibana (si ELK intégré)
# URL: http://kibana:5601/app/discover?index=immutable-logs-*

Implémentation TODO

  • [ ] Classe ImmutableLogService avec log_event(event_type, event_data)
  • [ ] Fonction calcul hash: SHA256(timestamp + json.dumps(data) + prev_hash)
  • [ ] Fonction insertion DB: transactionnel, vérifier prev_hash intégrité
  • [ ] Endpoint GET /api/v1/logs/immutable avec filtres type/from/to/page
  • [ ] Endpoint GET /api/v1/logs/immutable/{id}/verify pour vérifier log spécifique
  • [ ] Endpoint POST /api/v1/logs/immutable/manual-verify pour lancer job Celery
  • [ ] Celery task: monthly integrity verification (replay tous les hash)
  • [ ] Celery task: monthly archive (export + delete old rows)
  • [ ] Rôle PostgreSQL append-only (INSERT only, no UPDATE/DELETE)
  • [ ] Intégration Logstash → Elasticsearch (5min polling)
  • [ ] Monitoring: alerter si vérification intégrité échoue
  • [ ] Tests: log event, verify hash, replay chain, detect tampering
  • [ ] Documentation: "comment le système prouve la non-répudiation"
  • [ ] Exemple: événement paiement loggé, hash chain vérifiée, données intactes

Dernière mise à jour: 2026-02-21

PROJET MOSAÏQUE — 81 outils, 22 conteneurs, 500+ revendeurs WiFi Zone