Aller au contenu principal

Sécurité

La sécurité est une priorité absolue lors de l'intégration de l'API YODI. Ce guide couvre les meilleures pratiques de sécurité pour protéger vos clés API, vos données et vos utilisateurs.

Gestion des clés API

Stockage sécurisé

Bonnes pratiques

# Variables d'environnement
export YODI_API_KEY="sk-your-secure-key"
export YODI_ORG_ID="org-your-organization"

# Fichier .env (à exclure du contrôle de version)
YODI_API_KEY=sk-your-secure-key
YODI_ORG_ID=org-your-organization
YODI_BASE_URL=https://api.yodi.tg/v1
# Configuration sécurisée avec validation
import os
from cryptography.fernet import Fernet

class SecureConfig:
def __init__(self):
self.api_key = self._get_secure_api_key()
self.validate_key()

def _get_secure_api_key(self):
"""Récupère la clé API de manière sécurisée"""
# Priorité 1: Variable d'environnement
key = os.getenv("YODI_API_KEY")
if key:
return key

# Priorité 2: Fichier chiffré
try:
return self._decrypt_key_from_file()
except:
pass

# Priorité 3: Service de secrets (AWS Secrets Manager, etc.)
try:
return self._get_key_from_secrets_manager()
except:
pass

raise ValueError("Aucune clé API trouvée dans les sources sécurisées")

def validate_key(self):
"""Valide le format de la clé API"""
if not self.api_key:
raise ValueError("Clé API manquante")

if not self.api_key.startswith("sk-"):
raise ValueError("Format de clé API invalide")

if len(self.api_key) < 20:
raise ValueError("Clé API trop courte")

def _decrypt_key_from_file(self):
"""Déchiffre la clé depuis un fichier sécurisé"""
# Implémentation exemple avec Fernet
with open("key.enc", "rb") as f:
encrypted_key = f.read()

fernet_key = os.getenv("FERNET_KEY")
if not fernet_key:
raise ValueError("Clé de chiffrement manquante")

fernet = Fernet(fernet_key.encode())
return fernet.decrypt(encrypted_key).decode()

def _get_key_from_secrets_manager(self):
"""Récupère la clé depuis AWS Secrets Manager"""
import boto3

client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId='yodi-api-key')
return response['SecretString']

config = SecureConfig()

Pratiques à éviter

# JAMAIS faire ceci
api_key = "sk-your-actual-key-here" # Clé en dur dans le code

# JAMAIS faire ceci
with open("config.json", "w") as f:
json.dump({"api_key": "sk-your-key"}, f) # Clé en clair dans un fichier

# JAMAIS faire ceci
print(f"Using API key: {api_key}") # Log de la clé

# JAMAIS faire ceci
requests.get(f"https://api.yodi.tg/v1/models?key={api_key}") # Clé dans l'URL

Rotation des clés

import time
import logging
from datetime import datetime, timedelta

class KeyRotationManager:
def __init__(self, primary_key, secondary_key=None, rotation_interval_days=90):
self.primary_key = primary_key
self.secondary_key = secondary_key
self.rotation_interval = timedelta(days=rotation_interval_days)
self.last_rotation = datetime.now()
self.logger = logging.getLogger(__name__)

def get_current_key(self):
"""Retourne la clé actuellement active"""
if self.needs_rotation():
self.logger.warning("Rotation de clé nécessaire")

return self.primary_key

def needs_rotation(self):
"""Vérifie si une rotation est nécessaire"""
return datetime.now() - self.last_rotation > self.rotation_interval

def rotate_keys(self, new_key):
"""Effectue la rotation des clés"""
self.logger.info("Début de la rotation des clés")

# Test de la nouvelle clé
if not self._test_key(new_key):
raise ValueError("La nouvelle clé est invalide")

# Rotation
self.secondary_key = self.primary_key
self.primary_key = new_key
self.last_rotation = datetime.now()

self.logger.info("Rotation des clés terminée avec succès")

def _test_key(self, key):
"""Teste la validité d'une clé"""
try:
test_client = Client(api_key=key)
test_client.models.list()
return True
except:
return False

def emergency_fallback(self):
"""Bascule sur la clé de secours en cas d'urgence"""
if self.secondary_key:
self.logger.warning("Basculement vers la clé de secours")
self.primary_key, self.secondary_key = self.secondary_key, self.primary_key
return True
return False

# Utilisation
key_manager = KeyRotationManager(
primary_key=os.getenv("YODI_API_KEY"),
secondary_key=os.getenv("YODI_API_KEY_BACKUP")
)

Sécurisation des communications

HTTPS et certificats

import ssl
import certifi
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class SecureHTTPAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
context = ssl.create_default_context(cafile=certifi.where())
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
context.minimum_version = ssl.TLSVersion.TLSv1_2

kwargs['ssl_context'] = context
return super().init_poolmanager(*args, **kwargs)

def create_secure_session():
"""Crée une session HTTP sécurisée"""
session = requests.Session()

# Configuration SSL stricte
session.mount('https://', SecureHTTPAdapter())

# Retry strategy
retry_strategy = Retry(
total=3,
status_forcelist=[429, 500, 502, 503, 504],
backoff_factor=1,
allowed_methods=["HEAD", "GET", "POST"]
)

adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)

# Headers de sécurité
session.headers.update({
'User-Agent': 'YourApp/1.0 (Security-Enhanced)',
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY'
})

return session

# Utilisation avec le client YODI
class SecureYodiClient:
def __init__(self, api_key):
self.api_key = api_key
self.session = create_secure_session()
self.base_url = "https://api.yodi.tg/v1"

def _make_request(self, method, endpoint, **kwargs):
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}

url = f"{self.base_url}/{endpoint}"

response = self.session.request(
method=method,
url=url,
headers=headers,
timeout=30,
**kwargs
)

response.raise_for_status()
return response.json()

Validation et sanitisation des entrées

import re
import html
from typing import List, Dict, Any

class InputValidator:
def __init__(self):
# Patterns de contenu potentiellement dangereux
self.dangerous_patterns = [
r'<script.*?>.*?</script>', # Scripts
r'javascript:', # JavaScript URLs
r'data:text/html', # Data URLs HTML
r'vbscript:', # VBScript
r'onload=', # Event handlers
r'onerror=',
r'onclick=',
]

# Limites de sécurité
self.max_message_length = 10000
self.max_messages = 50
self.max_total_length = 50000

def validate_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Valide et nettoie les messages"""
if not isinstance(messages, list):
raise ValueError("Messages doit être une liste")

if len(messages) > self.max_messages:
raise ValueError(f"Trop de messages (max: {self.max_messages})")

validated_messages = []
total_length = 0

for message in messages:
validated_msg = self._validate_message(message)
validated_messages.append(validated_msg)
total_length += len(validated_msg['content'])

if total_length > self.max_total_length:
raise ValueError(f"Contenu total trop long (max: {self.max_total_length})")

return validated_messages

def _validate_message(self, message: Dict[str, str]) -> Dict[str, str]:
"""Valide un message individuel"""
if not isinstance(message, dict):
raise ValueError("Message doit être un dictionnaire")

required_fields = {'role', 'content'}
if not required_fields.issubset(message.keys()):
raise ValueError(f"Champs requis: {required_fields}")

# Valider le rôle
valid_roles = {'system', 'user', 'assistant'}
if message['role'] not in valid_roles:
raise ValueError(f"Rôle invalide. Valides: {valid_roles}")

# Valider et nettoyer le contenu
content = self._sanitize_content(message['content'])

return {
'role': message['role'],
'content': content
}

def _sanitize_content(self, content: str) -> str:
"""Nettoie le contenu des éléments dangereux"""
if not isinstance(content, str):
raise ValueError("Le contenu doit être une chaîne")

if len(content) > self.max_message_length:
raise ValueError(f"Message trop long (max: {self.max_message_length})")

# Échapper le HTML
content = html.escape(content)

# Vérifier les patterns dangereux
for pattern in self.dangerous_patterns:
if re.search(pattern, content, re.IGNORECASE):
raise ValueError(f"Contenu potentiellement dangereux détecté")

# Nettoyer les caractères de contrôle
content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', content)

return content.strip()

def validate_parameters(self, **kwargs) -> Dict[str, Any]:
"""Valide les paramètres de l'API"""
validated = {}

# Temperature
if 'temperature' in kwargs:
temp = kwargs['temperature']
if not isinstance(temp, (int, float)) or not 0 <= temp <= 2:
raise ValueError("Temperature doit être entre 0 et 2")
validated['temperature'] = temp

# Max tokens
if 'max_tokens' in kwargs:
max_tokens = kwargs['max_tokens']
if not isinstance(max_tokens, int) or max_tokens < 1 or max_tokens > 4000:
raise ValueError("max_tokens doit être entre 1 et 4000")
validated['max_tokens'] = max_tokens

# Top_p
if 'top_p' in kwargs:
top_p = kwargs['top_p']
if not isinstance(top_p, (int, float)) or not 0 <= top_p <= 1:
raise ValueError("top_p doit être entre 0 et 1")
validated['top_p'] = top_p

return validated

validator = InputValidator()

def secure_chat_completion(messages, **kwargs):
"""Completion sécurisée avec validation"""
try:
# Valider les entrées
clean_messages = validator.validate_messages(messages)
clean_params = validator.validate_parameters(**kwargs)

# Faire l'appel API
response = client.chat.completions.create(
messages=clean_messages,
**clean_params
)

return response

except ValueError as e:
logging.warning(f"Validation failed: {e}")
raise

Filtrage de contenu

Détection de contenu inapproprié

import re
from typing import List, Tuple

class ContentFilter:
def __init__(self):
# Mots-clés et patterns à filtrer
self.blocked_keywords = [
'spam', 'abuse', 'harassment', 'hate', 'violence',
'illegal', 'harmful', 'toxic', 'malicious'
]

# Patterns de PII (Personally Identifiable Information)
self.pii_patterns = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
}

def scan_content(self, content: str) -> Dict[str, List[str]]:
"""Scanne le contenu pour détecter les problèmes"""
issues = {
'blocked_keywords': [],
'pii_detected': [],
'suspicious_patterns': []
}

content_lower = content.lower()

# Vérifier les mots-clés bloqués
for keyword in self.blocked_keywords:
if keyword in content_lower:
issues['blocked_keywords'].append(keyword)

# Vérifier les PII
for pii_type, pattern in self.pii_patterns.items():
matches = re.findall(pattern, content)
if matches:
issues['pii_detected'].extend([f"{pii_type}: {match}" for match in matches])

# Vérifier les patterns suspects
suspicious_patterns = [
r'password\s*[:=]\s*\w+',
r'api[_\s]*key\s*[:=]\s*\w+',
r'secret\s*[:=]\s*\w+',
r'token\s*[:=]\s*\w+'
]

for pattern in suspicious_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
issues['suspicious_patterns'].extend(matches)

return issues

def is_content_safe(self, content: str) -> Tuple[bool, Dict[str, List[str]]]:
"""Détermine si le contenu est sûr"""
issues = self.scan_content(content)

# Le contenu est unsafe s'il y a des problèmes
is_safe = not any(issues.values())

return is_safe, issues

def sanitize_content(self, content: str) -> str:
"""Nettoie le contenu en masquant les informations sensibles"""
sanitized = content

# Masquer les PII
for pii_type, pattern in self.pii_patterns.items():
if pii_type == 'email':
sanitized = re.sub(pattern, '[EMAIL_REDACTED]', sanitized)
elif pii_type == 'phone':
sanitized = re.sub(pattern, '[PHONE_REDACTED]', sanitized)
elif pii_type == 'ssn':
sanitized = re.sub(pattern, '[SSN_REDACTED]', sanitized)
elif pii_type == 'credit_card':
sanitized = re.sub(pattern, '[CARD_REDACTED]', sanitized)
elif pii_type == 'ip_address':
sanitized = re.sub(pattern, '[IP_REDACTED]', sanitized)

return sanitized

content_filter = ContentFilter()

def safe_chat_completion(messages, **kwargs):
"""Completion avec filtrage de contenu"""
filtered_messages = []

for message in messages:
content = message['content']

# Vérifier la sécurité du contenu
is_safe, issues = content_filter.is_content_safe(content)

if not is_safe:
logging.warning(f"Contenu unsafe détecté: {issues}")
# Option 1: Rejeter la requête
raise ValueError(f"Contenu inapproprié détecté: {list(issues.keys())}")

# Option 2: Nettoyer le contenu
# content = content_filter.sanitize_content(content)

filtered_messages.append({
'role': message['role'],
'content': content
})

return client.chat.completions.create(
messages=filtered_messages,
**kwargs
)

Audit et logging

Logging sécurisé

import logging
import json
import hashlib
from datetime import datetime
from typing import Dict, Any

class SecurityLogger:
def __init__(self, log_file="security.log"):
self.logger = logging.getLogger("security")
self.logger.setLevel(logging.INFO)

# Handler pour fichier avec rotation
from logging.handlers import RotatingFileHandler
file_handler = RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5
)

# Format JSON structuré
formatter = logging.Formatter('%(message)s')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)

def log_api_call(self,
user_id: str,
endpoint: str,
success: bool,
request_data: Dict[str, Any] = None,
error: str = None):
"""Log d'appel API avec informations de sécurité"""

# Hasher les données sensibles
request_hash = None
if request_data:
request_str = json.dumps(request_data, sort_keys=True)
request_hash = hashlib.sha256(request_str.encode()).hexdigest()[:16]

log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': 'api_call',
'user_id': self._hash_user_id(user_id),
'endpoint': endpoint,
'success': success,
'request_hash': request_hash,
'error': error
}

self.logger.info(json.dumps(log_entry))

def log_security_event(self,
event_type: str,
user_id: str = None,
details: Dict[str, Any] = None):
"""Log d'événement de sécurité"""

log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': f'security_{event_type}',
'user_id': self._hash_user_id(user_id) if user_id else None,
'details': details or {}
}

self.logger.warning(json.dumps(log_entry))

def _hash_user_id(self, user_id: str) -> str:
"""Hash l'ID utilisateur pour la confidentialité"""
return hashlib.sha256(user_id.encode()).hexdigest()[:16]

security_logger = SecurityLogger()

def logged_secure_completion(user_id: str, messages: List[Dict], **kwargs):
"""Completion avec logging de sécurité complet"""
start_time = datetime.utcnow()

try:
# Valider les entrées
clean_messages = validator.validate_messages(messages)
clean_params = validator.validate_parameters(**kwargs)

# Vérifier le contenu
for message in clean_messages:
is_safe, issues = content_filter.is_content_safe(message['content'])
if not is_safe:
security_logger.log_security_event(
'content_violation',
user_id=user_id,
details={'issues': issues}
)
raise ValueError("Contenu inapproprié détecté")

# Faire l'appel API
response = client.chat.completions.create(
messages=clean_messages,
**clean_params
)

# Log de succès
security_logger.log_api_call(
user_id=user_id,
endpoint='chat/completions',
success=True,
request_data={'message_count': len(clean_messages)}
)

return response

except Exception as e:
# Log d'erreur
security_logger.log_api_call(
user_id=user_id,
endpoint='chat/completions',
success=False,
error=str(e)
)
raise

Surveillance des anomalies

from collections import defaultdict, deque
from datetime import datetime, timedelta
import statistics

class AnomalyDetector:
def __init__(self, window_minutes=60):
self.window_minutes = window_minutes
self.user_activity = defaultdict(lambda: {
'requests': deque(),
'errors': deque(),
'blocked_content': deque()
})

def record_activity(self, user_id: str, activity_type: str, metadata: Dict = None):
"""Enregistre l'activité d'un utilisateur"""
timestamp = datetime.utcnow()

user_data = self.user_activity[user_id]
user_data[activity_type].append({
'timestamp': timestamp,
'metadata': metadata or {}
})

# Nettoyer les anciens enregistrements
self._cleanup_old_records(user_id)

# Détecter les anomalies
anomalies = self._detect_anomalies(user_id)
if anomalies:
self._handle_anomalies(user_id, anomalies)

def _cleanup_old_records(self, user_id: str):
"""Supprime les enregistrements anciens"""
cutoff = datetime.utcnow() - timedelta(minutes=self.window_minutes)

user_data = self.user_activity[user_id]
for activity_type in user_data:
while (user_data[activity_type] and
user_data[activity_type][0]['timestamp'] < cutoff):
user_data[activity_type].popleft()

def _detect_anomalies(self, user_id: str) -> List[str]:
"""Détecte les anomalies pour un utilisateur"""
user_data = self.user_activity[user_id]
anomalies = []

# Trop de requêtes
request_count = len(user_data['requests'])
if request_count > 100: # 100 requêtes par heure
anomalies.append(f"high_request_volume: {request_count}")

# Taux d'erreur élevé
error_count = len(user_data['errors'])
if error_count > 0 and request_count > 0:
error_rate = error_count / request_count
if error_rate > 0.3: # Plus de 30% d'erreurs
anomalies.append(f"high_error_rate: {error_rate:.2%}")

# Contenu bloqué répétitif
blocked_count = len(user_data['blocked_content'])
if blocked_count > 5: # Plus de 5 contenus bloqués
anomalies.append(f"repeated_violations: {blocked_count}")

return anomalies

def _handle_anomalies(self, user_id: str, anomalies: List[str]):
"""Gère les anomalies détectées"""
security_logger.log_security_event(
'anomaly_detected',
user_id=user_id,
details={'anomalies': anomalies}
)

# Actions possibles :
# - Limitation temporaire
# - Alerte administrateur
# - Blocage automatique

print(f"� Anomalies détectées pour {user_id}: {anomalies}")

anomaly_detector = AnomalyDetector()

def monitored_completion(user_id: str, messages: List[Dict], **kwargs):
"""Completion avec détection d'anomalies"""
try:
# Enregistrer la requête
anomaly_detector.record_activity(user_id, 'requests')

response = logged_secure_completion(user_id, messages, **kwargs)
return response

except ValueError as e:
# Enregistrer les violations de contenu
if "inapproprié" in str(e):
anomaly_detector.record_activity(user_id, 'blocked_content')

# Enregistrer l'erreur
anomaly_detector.record_activity(user_id, 'errors', {'error': str(e)})
raise

except Exception as e:
# Enregistrer l'erreur générale
anomaly_detector.record_activity(user_id, 'errors', {'error': str(e)})
raise

Chiffrement des données

Chiffrement des logs sensibles

from cryptography.fernet import Fernet
import base64
import os

class EncryptedLogger:
def __init__(self, encryption_key=None):
if encryption_key:
self.fernet = Fernet(encryption_key)
else:
# Générer une nouvelle clé (à sauvegarder de manière sécurisée)
key = Fernet.generate_key()
self.fernet = Fernet(key)
print(f"Nouvelle clé de chiffrement générée: {key.decode()}")

def encrypt_log_data(self, data: Dict[str, Any]) -> str:
"""Chiffre les données de log"""
json_str = json.dumps(data)
encrypted_data = self.fernet.encrypt(json_str.encode())
return base64.b64encode(encrypted_data).decode()

def decrypt_log_data(self, encrypted_data: str) -> Dict[str, Any]:
"""Déchiffre les données de log"""
encrypted_bytes = base64.b64decode(encrypted_data.encode())
decrypted_data = self.fernet.decrypt(encrypted_bytes)
return json.loads(decrypted_data.decode())

def log_sensitive_data(self, data: Dict[str, Any]):
"""Log des données sensibles de manière chiffrée"""
encrypted = self.encrypt_log_data(data)

log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'encrypted_data': encrypted
}

logging.info(json.dumps(log_entry))

# Utilisation
encryption_key = os.getenv("LOG_ENCRYPTION_KEY")
encrypted_logger = EncryptedLogger(encryption_key.encode() if encryption_key else None)

Checklist de sécurité

Avant le déploiement

## Checklist de sécurité YODI

### Gestion des clés

- [ ] Clés API stockées dans des variables d'environnement
- [ ] Aucune clé en dur dans le code
- [ ] Fichiers de configuration exclus du contrôle de version
- [ ] Rotation des clés configurée
- [ ] Clés de secours disponibles

### Communication

- [ ] HTTPS obligatoire pour toutes les communications
- [ ] Certificats SSL valides et à jour
- [ ] Headers de sécurité configurés
- [ ] Timeout appropriés configurés

### Validation des entrées

- [ ] Validation de tous les paramètres utilisateur
- [ ] Sanitisation du contenu
- [ ] Limites de taille implémentées
- [ ] Filtrage de contenu inapproprié

### Monitoring et audit

- [ ] Logs de sécurité configurés
- [ ] Détection d'anomalies active
- [ ] Alertes de sécurité configurées
- [ ] Chiffrement des logs sensibles

### � Architecture

- [ ] API proxy sécurisé déployé
- [ ] Rate limiting implémenté
- [ ] Authentification utilisateur
- [ ] Principe du moindre privilège appliqué

### � Réponse aux incidents

- [ ] Plan de réponse aux incidents documenté
- [ ] Procédure de révocation des clés
- [ ] Contacts d'urgence définis
- [ ] Tests de sécurité effectués

Prochaines étapes