Security
Security is a top priority when integrating the YODI API. This guide covers security best practices to protect your API keys, data, and users.
đ API key managementâ
Stockage sĂ©curisĂ©â
Bonnes pratiquesâ
# Variables d'environnement
export YODI_API_KEY="sk-your-secure-key"
export YODI_ORG_ID="org-your-organization"
# Fichier .env (Ă exclure du contrĂŽle de version)
YODI_API_KEY=sk-your-secure-key
YODI_ORG_ID=org-your-organization
YODI_BASE_URL=https://api.yodi.tg/v1
# Configuration sécurisée avec validation
import os
from cryptography.fernet import Fernet
class SecureConfig:
def __init__(self):
self.api_key = self._get_secure_api_key()
self.validate_key()
def _get_secure_api_key(self):
"""RécupÚre la clé API de maniÚre sécurisée"""
# Priorité 1: Variable d'environnement
key = os.getenv("YODI_API_KEY")
if key:
return key
# Priorité 2: Fichier chiffré
try:
return self._decrypt_key_from_file()
except:
pass
# Priorité 3: Service de secrets (AWS Secrets Manager, etc.)
try:
return self._get_key_from_secrets_manager()
except:
pass
raise ValueError("Aucune clé API trouvée dans les sources sécurisées")
def validate_key(self):
"""Valide le format de la clé API"""
if not self.api_key:
raise ValueError("Clé API manquante")
if not self.api_key.startswith("sk-"):
raise ValueError("Format de clé API invalide")
if len(self.api_key) < 20:
raise ValueError("Clé API trop courte")
def _decrypt_key_from_file(self):
"""Déchiffre la clé depuis un fichier sécurisé"""
# Implémentation exemple avec Fernet
with open("key.enc", "rb") as f:
encrypted_key = f.read()
fernet_key = os.getenv("FERNET_KEY")
if not fernet_key:
raise ValueError("Clé de chiffrement manquante")
fernet = Fernet(fernet_key.encode())
return fernet.decrypt(encrypted_key).decode()
def _get_key_from_secrets_manager(self):
"""RécupÚre la clé depuis AWS Secrets Manager"""
import boto3
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId='yodi-api-key')
return response['SecretString']
config = SecureConfig()
Pratiques Ă Ă©viterâ
# JAMAIS faire ceci
api_key = "sk-your-actual-key-here" # Clé en dur dans le code
# JAMAIS faire ceci
with open("config.json", "w") as f:
json.dump({"api_key": "sk-your-key"}, f) # Clé en clair dans un fichier
# JAMAIS faire ceci
print(f"Using API key: {api_key}") # Log de la clé
# JAMAIS faire ceci
requests.get(f"https://api.yodi.tg/v1/models?key={api_key}") # Clé dans l'URL
Rotation des clĂ©sâ
import time
import logging
from datetime import datetime, timedelta
class KeyRotationManager:
def __init__(self, primary_key, secondary_key=None, rotation_interval_days=90):
self.primary_key = primary_key
self.secondary_key = secondary_key
self.rotation_interval = timedelta(days=rotation_interval_days)
self.last_rotation = datetime.now()
self.logger = logging.getLogger(__name__)
def get_current_key(self):
"""Retourne la clé actuellement active"""
if self.needs_rotation():
self.logger.warning("Rotation de clé nécessaire")
return self.primary_key
def needs_rotation(self):
"""Vérifie si une rotation est nécessaire"""
return datetime.now() - self.last_rotation > self.rotation_interval
def rotate_keys(self, new_key):
"""Effectue la rotation des clés"""
self.logger.info("Début de la rotation des clés")
# Test de la nouvelle clé
if not self._test_key(new_key):
raise ValueError("La nouvelle clé est invalide")
# Rotation
self.secondary_key = self.primary_key
self.primary_key = new_key
self.last_rotation = datetime.now()
self.logger.info("Rotation des clés terminée avec succÚs")
def _test_key(self, key):
"""Teste la validité d'une clé"""
try:
test_client = Client(api_key=key)
test_client.models.list()
return True
except:
return False
def emergency_fallback(self):
"""Bascule sur la clé de secours en cas d'urgence"""
if self.secondary_key:
self.logger.warning("Basculement vers la clé de secours")
self.primary_key, self.secondary_key = self.secondary_key, self.primary_key
return True
return False
# Utilisation
key_manager = KeyRotationManager(
primary_key=os.getenv("YODI_API_KEY"),
secondary_key=os.getenv("YODI_API_KEY_BACKUP")
)
đĄïž SĂ©curisation des communicationsâ
HTTPS et certificatsâ
import ssl
import certifi
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class SecureHTTPAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
context = ssl.create_default_context(cafile=certifi.where())
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
context.minimum_version = ssl.TLSVersion.TLSv1_2
kwargs['ssl_context'] = context
return super().init_poolmanager(*args, **kwargs)
def create_secure_session():
"""Crée une session HTTP sécurisée"""
session = requests.Session()
# Configuration SSL stricte
session.mount('https://', SecureHTTPAdapter())
# Retry strategy
retry_strategy = Retry(
total=3,
status_forcelist=[429, 500, 502, 503, 504],
backoff_factor=1,
allowed_methods=["HEAD", "GET", "POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
# Headers de sécurité
session.headers.update({
'User-Agent': 'YourApp/1.0 (Security-Enhanced)',
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY'
})
return session
# Utilisation avec le client YODI
class SecureYodiClient:
def __init__(self, api_key):
self.api_key = api_key
self.session = create_secure_session()
self.base_url = "https://api.yodi.tg/v1"
def _make_request(self, method, endpoint, **kwargs):
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
url = f"{self.base_url}/{endpoint}"
response = self.session.request(
method=method,
url=url,
headers=headers,
timeout=30,
**kwargs
)
response.raise_for_status()
return response.json()
Validation et sanitisation des entrĂ©esâ
import re
import html
from typing import List, Dict, Any
class InputValidator:
def __init__(self):
# Patterns de contenu potentiellement dangereux
self.dangerous_patterns = [
r'<script.*?>.*?</script>', # Scripts
r'javascript:', # JavaScript URLs
r'data:text/html', # Data URLs HTML
r'vbscript:', # VBScript
r'onload=', # Event handlers
r'onerror=',
r'onclick=',
]
# Limites de sécurité
self.max_message_length = 10000
self.max_messages = 50
self.max_total_length = 50000
def validate_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Valide et nettoie les messages"""
if not isinstance(messages, list):
raise ValueError("Messages doit ĂȘtre une liste")
if len(messages) > self.max_messages:
raise ValueError(f"Trop de messages (max: {self.max_messages})")
validated_messages = []
total_length = 0
for message in messages:
validated_msg = self._validate_message(message)
validated_messages.append(validated_msg)
total_length += len(validated_msg['content'])
if total_length > self.max_total_length:
raise ValueError(f"Contenu total trop long (max: {self.max_total_length})")
return validated_messages
def _validate_message(self, message: Dict[str, str]) -> Dict[str, str]:
"""Valide un message individuel"""
if not isinstance(message, dict):
raise ValueError("Message doit ĂȘtre un dictionnaire")
required_fields = {'role', 'content'}
if not required_fields.issubset(message.keys()):
raise ValueError(f"Champs requis: {required_fields}")
# Valider le rĂŽle
valid_roles = {'system', 'user', 'assistant'}
if message['role'] not in valid_roles:
raise ValueError(f"RĂŽle invalide. Valides: {valid_roles}")
# Valider et nettoyer le contenu
content = self._sanitize_content(message['content'])
return {
'role': message['role'],
'content': content
}
def _sanitize_content(self, content: str) -> str:
"""Nettoie le contenu des éléments dangereux"""
if not isinstance(content, str):
raise ValueError("Le contenu doit ĂȘtre une chaĂźne")
if len(content) > self.max_message_length:
raise ValueError(f"Message trop long (max: {self.max_message_length})")
# Ăchapper le HTML
content = html.escape(content)
# Vérifier les patterns dangereux
for pattern in self.dangerous_patterns:
if re.search(pattern, content, re.IGNORECASE):
raise ValueError(f"Contenu potentiellement dangereux détecté")
# Nettoyer les caractĂšres de contrĂŽle
content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', content)
return content.strip()
def validate_parameters(self, **kwargs) -> Dict[str, Any]:
"""Valide les paramĂštres de l'API"""
validated = {}
# Temperature
if 'temperature' in kwargs:
temp = kwargs['temperature']
if not isinstance(temp, (int, float)) or not 0 <= temp <= 2:
raise ValueError("Temperature doit ĂȘtre entre 0 et 2")
validated['temperature'] = temp
# Max tokens
if 'max_tokens' in kwargs:
max_tokens = kwargs['max_tokens']
if not isinstance(max_tokens, int) or max_tokens < 1 or max_tokens > 4000:
raise ValueError("max_tokens doit ĂȘtre entre 1 et 4000")
validated['max_tokens'] = max_tokens
# Top_p
if 'top_p' in kwargs:
top_p = kwargs['top_p']
if not isinstance(top_p, (int, float)) or not 0 <= top_p <= 1:
raise ValueError("top_p doit ĂȘtre entre 0 et 1")
validated['top_p'] = top_p
return validated
validator = InputValidator()
def secure_chat_completion(messages, **kwargs):
"""Completion sécurisée avec validation"""
try:
# Valider les entrées
clean_messages = validator.validate_messages(messages)
clean_params = validator.validate_parameters(**kwargs)
# Faire l'appel API
response = client.chat.completions.create(
messages=clean_messages,
**clean_params
)
return response
except ValueError as e:
logging.warning(f"Validation failed: {e}")
raise
đ« Filtrage de contenuâ
DĂ©tection de contenu inappropriĂ©â
import re
from typing import List, Tuple
class ContentFilter:
def __init__(self):
# Mots-clés et patterns à filtrer
self.blocked_keywords = [
'spam', 'abuse', 'harassment', 'hate', 'violence',
'illegal', 'harmful', 'toxic', 'malicious'
]
# Patterns de PII (Personally Identifiable Information)
self.pii_patterns = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
}
def scan_content(self, content: str) -> Dict[str, List[str]]:
"""Scanne le contenu pour détecter les problÚmes"""
issues = {
'blocked_keywords': [],
'pii_detected': [],
'suspicious_patterns': []
}
content_lower = content.lower()
# Vérifier les mots-clés bloqués
for keyword in self.blocked_keywords:
if keyword in content_lower:
issues['blocked_keywords'].append(keyword)
# Vérifier les PII
for pii_type, pattern in self.pii_patterns.items():
matches = re.findall(pattern, content)
if matches:
issues['pii_detected'].extend([f"{pii_type}: {match}" for match in matches])
# Vérifier les patterns suspects
suspicious_patterns = [
r'password\s*[:=]\s*\w+',
r'api[_\s]*key\s*[:=]\s*\w+',
r'secret\s*[:=]\s*\w+',
r'token\s*[:=]\s*\w+'
]
for pattern in suspicious_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
issues['suspicious_patterns'].extend(matches)
return issues
def is_content_safe(self, content: str) -> Tuple[bool, Dict[str, List[str]]]:
"""Détermine si le contenu est sûr"""
issues = self.scan_content(content)
# Le contenu est unsafe s'il y a des problĂšmes
is_safe = not any(issues.values())
return is_safe, issues
def sanitize_content(self, content: str) -> str:
"""Nettoie le contenu en masquant les informations sensibles"""
sanitized = content
# Masquer les PII
for pii_type, pattern in self.pii_patterns.items():
if pii_type == 'email':
sanitized = re.sub(pattern, '[EMAIL_REDACTED]', sanitized)
elif pii_type == 'phone':
sanitized = re.sub(pattern, '[PHONE_REDACTED]', sanitized)
elif pii_type == 'ssn':
sanitized = re.sub(pattern, '[SSN_REDACTED]', sanitized)
elif pii_type == 'credit_card':
sanitized = re.sub(pattern, '[CARD_REDACTED]', sanitized)
elif pii_type == 'ip_address':
sanitized = re.sub(pattern, '[IP_REDACTED]', sanitized)
return sanitized
content_filter = ContentFilter()
def safe_chat_completion(messages, **kwargs):
"""Completion avec filtrage de contenu"""
filtered_messages = []
for message in messages:
content = message['content']
# Vérifier la sécurité du contenu
is_safe, issues = content_filter.is_content_safe(content)
if not is_safe:
logging.warning(f"Contenu unsafe détecté: {issues}")
# Option 1: Rejeter la requĂȘte
raise ValueError(f"Contenu inapproprié détecté: {list(issues.keys())}")
# Option 2: Nettoyer le contenu
# content = content_filter.sanitize_content(content)
filtered_messages.append({
'role': message['role'],
'content': content
})
return client.chat.completions.create(
messages=filtered_messages,
**kwargs
)
đ Audit et loggingâ
Logging sĂ©curisĂ©â
import logging
import json
import hashlib
from datetime import datetime
from typing import Dict, Any
class SecurityLogger:
def __init__(self, log_file="security.log"):
self.logger = logging.getLogger("security")
self.logger.setLevel(logging.INFO)
# Handler pour fichier avec rotation
from logging.handlers import RotatingFileHandler
file_handler = RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5
)
# Format JSON structuré
formatter = logging.Formatter('%(message)s')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def log_api_call(self,
user_id: str,
endpoint: str,
success: bool,
request_data: Dict[str, Any] = None,
error: str = None):
"""Log d'appel API avec informations de sécurité"""
# Hasher les données sensibles
request_hash = None
if request_data:
request_str = json.dumps(request_data, sort_keys=True)
request_hash = hashlib.sha256(request_str.encode()).hexdigest()[:16]
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': 'api_call',
'user_id': self._hash_user_id(user_id),
'endpoint': endpoint,
'success': success,
'request_hash': request_hash,
'error': error
}
self.logger.info(json.dumps(log_entry))
def log_security_event(self,
event_type: str,
user_id: str = None,
details: Dict[str, Any] = None):
"""Log d'événement de sécurité"""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': f'security_{event_type}',
'user_id': self._hash_user_id(user_id) if user_id else None,
'details': details or {}
}
self.logger.warning(json.dumps(log_entry))
def _hash_user_id(self, user_id: str) -> str:
"""Hash l'ID utilisateur pour la confidentialité"""
return hashlib.sha256(user_id.encode()).hexdigest()[:16]
security_logger = SecurityLogger()
def logged_secure_completion(user_id: str, messages: List[Dict], **kwargs):
"""Completion avec logging de sécurité complet"""
start_time = datetime.utcnow()
try:
# Valider les entrées
clean_messages = validator.validate_messages(messages)
clean_params = validator.validate_parameters(**kwargs)
# Vérifier le contenu
for message in clean_messages:
is_safe, issues = content_filter.is_content_safe(message['content'])
if not is_safe:
security_logger.log_security_event(
'content_violation',
user_id=user_id,
details={'issues': issues}
)
raise ValueError("Contenu inapproprié détecté")
# Faire l'appel API
response = client.chat.completions.create(
messages=clean_messages,
**clean_params
)
# Log de succĂšs
security_logger.log_api_call(
user_id=user_id,
endpoint='chat/completions',
success=True,
request_data={'message_count': len(clean_messages)}
)
return response
except Exception as e:
# Log d'erreur
security_logger.log_api_call(
user_id=user_id,
endpoint='chat/completions',
success=False,
error=str(e)
)
raise
Surveillance des anomaliesâ
from collections import defaultdict, deque
from datetime import datetime, timedelta
import statistics
class AnomalyDetector:
def __init__(self, window_minutes=60):
self.window_minutes = window_minutes
self.user_activity = defaultdict(lambda: {
'requests': deque(),
'errors': deque(),
'blocked_content': deque()
})
def record_activity(self, user_id: str, activity_type: str, metadata: Dict = None):
"""Enregistre l'activité d'un utilisateur"""
timestamp = datetime.utcnow()
user_data = self.user_activity[user_id]
user_data[activity_type].append({
'timestamp': timestamp,
'metadata': metadata or {}
})
# Nettoyer les anciens enregistrements
self._cleanup_old_records(user_id)
# Détecter les anomalies
anomalies = self._detect_anomalies(user_id)
if anomalies:
self._handle_anomalies(user_id, anomalies)
def _cleanup_old_records(self, user_id: str):
"""Supprime les enregistrements anciens"""
cutoff = datetime.utcnow() - timedelta(minutes=self.window_minutes)
user_data = self.user_activity[user_id]
for activity_type in user_data:
while (user_data[activity_type] and
user_data[activity_type][0]['timestamp'] < cutoff):
user_data[activity_type].popleft()
def _detect_anomalies(self, user_id: str) -> List[str]:
"""Détecte les anomalies pour un utilisateur"""
user_data = self.user_activity[user_id]
anomalies = []
# Trop de requĂȘtes
request_count = len(user_data['requests'])
if request_count > 100: # 100 requĂȘtes par heure
anomalies.append(f"high_request_volume: {request_count}")
# Taux d'erreur élevé
error_count = len(user_data['errors'])
if error_count > 0 and request_count > 0:
error_rate = error_count / request_count
if error_rate > 0.3: # Plus de 30% d'erreurs
anomalies.append(f"high_error_rate: {error_rate:.2%}")
# Contenu bloqué répétitif
blocked_count = len(user_data['blocked_content'])
if blocked_count > 5: # Plus de 5 contenus bloqués
anomalies.append(f"repeated_violations: {blocked_count}")
return anomalies
def _handle_anomalies(self, user_id: str, anomalies: List[str]):
"""GÚre les anomalies détectées"""
security_logger.log_security_event(
'anomaly_detected',
user_id=user_id,
details={'anomalies': anomalies}
)
# Actions possibles :
# - Limitation temporaire
# - Alerte administrateur
# - Blocage automatique
print(f"đš Anomalies dĂ©tectĂ©es pour {user_id}: {anomalies}")
anomaly_detector = AnomalyDetector()
def monitored_completion(user_id: str, messages: List[Dict], **kwargs):
"""Completion avec détection d'anomalies"""
try:
# Enregistrer la requĂȘte
anomaly_detector.record_activity(user_id, 'requests')
response = logged_secure_completion(user_id, messages, **kwargs)
return response
except ValueError as e:
# Enregistrer les violations de contenu
if "inapproprié" in str(e):
anomaly_detector.record_activity(user_id, 'blocked_content')
# Enregistrer l'erreur
anomaly_detector.record_activity(user_id, 'errors', {'error': str(e)})
raise
except Exception as e:
# Enregistrer l'erreur générale
anomaly_detector.record_activity(user_id, 'errors', {'error': str(e)})
raise
đ Chiffrement des donnĂ©esâ
Chiffrement des logs sensiblesâ
from cryptography.fernet import Fernet
import base64
import os
class EncryptedLogger:
def __init__(self, encryption_key=None):
if encryption_key:
self.fernet = Fernet(encryption_key)
else:
# Générer une nouvelle clé (à sauvegarder de maniÚre sécurisée)
key = Fernet.generate_key()
self.fernet = Fernet(key)
print(f"Nouvelle clé de chiffrement générée: {key.decode()}")
def encrypt_log_data(self, data: Dict[str, Any]) -> str:
"""Chiffre les données de log"""
json_str = json.dumps(data)
encrypted_data = self.fernet.encrypt(json_str.encode())
return base64.b64encode(encrypted_data).decode()
def decrypt_log_data(self, encrypted_data: str) -> Dict[str, Any]:
"""Déchiffre les données de log"""
encrypted_bytes = base64.b64decode(encrypted_data.encode())
decrypted_data = self.fernet.decrypt(encrypted_bytes)
return json.loads(decrypted_data.decode())
def log_sensitive_data(self, data: Dict[str, Any]):
"""Log des données sensibles de maniÚre chiffrée"""
encrypted = self.encrypt_log_data(data)
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'encrypted_data': encrypted
}
logging.info(json.dumps(log_entry))
# Utilisation
encryption_key = os.getenv("LOG_ENCRYPTION_KEY")
encrypted_logger = EncryptedLogger(encryption_key.encode() if encryption_key else None)
đ Checklist de sĂ©curitĂ©â
Avant le dĂ©ploiementâ
## Checklist de sécurité YODI
### đ Gestion des clĂ©s
- [ ] Clés API stockées dans des variables d'environnement
- [ ] Aucune clé en dur dans le code
- [ ] Fichiers de configuration exclus du contrĂŽle de version
- [ ] Rotation des clés configurée
- [ ] Clés de secours disponibles
### đĄïž Communication
- [ ] HTTPS obligatoire pour toutes les communications
- [ ] Certificats SSL valides et Ă jour
- [ ] Headers de sécurité configurés
- [ ] Timeout appropriés configurés
### đ« Validation des entrĂ©es
- [ ] Validation de tous les paramĂštres utilisateur
- [ ] Sanitisation du contenu
- [ ] Limites de taille implémentées
- [ ] Filtrage de contenu inapproprié
### đ Monitoring et audit
- [ ] Logs de sécurité configurés
- [ ] Détection d'anomalies active
- [ ] Alertes de sécurité configurées
- [ ] Chiffrement des logs sensibles
### đ Architecture
- [ ] API proxy sécurisé déployé
- [ ] Rate limiting implémenté
- [ ] Authentification utilisateur
- [ ] Principe du moindre privilÚge appliqué
### đš RĂ©ponse aux incidents
- [ ] Plan de réponse aux incidents documenté
- [ ] Procédure de révocation des clés
- [ ] Contacts d'urgence définis
- [ ] Tests de sécurité effectués