Aller au contenu principal

Meilleures pratiques

Ce guide présente les meilleures pratiques pour utiliser l'API YODI de manière efficace, sécurisée et optimale.

Sécurité

Gestion des clés API

Bonnes pratiques

# Use environment variables
export YODI_API_KEY="your_secure_api_key"

# .env file (add it to .gitignore)
YODI_API_KEY=your_secure_api_key
YODI_BASE_URL=https://api.yodi.tg/v1
# Secure configuration
import os
from yodi import Client

# Correct: the key is read from the YODI_API_KEY environment variable
client = Client(api_key=os.getenv("YODI_API_KEY"))

# Incorrect: key hard-coded in the source
# client = Client(api_key="sk-your-key-here")

Rotation des clés

# Implémentation avec rotation automatique
class SecureYodiClient:
    """Chat client that switches to a backup API key on an authentication error.

    The client starts on ``primary_key``. When a request raises an exception
    whose text contains "401", it switches to ``backup_key`` (if one was
    given) and retries that request once. The switch happens at most once:
    a 401 raised while already on the backup key is propagated unchanged
    instead of rebuilding the same client and resending the request.
    """

    def __init__(self, primary_key, backup_key=None):
        """Create the underlying client with the primary key.

        Args:
            primary_key: API key used for requests until a fallback occurs.
            backup_key: Optional key to switch to after a 401 on the primary.
        """
        self.primary_key = primary_key
        self.backup_key = backup_key
        self.current_client = Client(api_key=primary_key)
        self._using_backup = False

    def _fallback_to_backup(self):
        """Switch ``current_client`` to the backup key.

        Returns:
            True if this call performed the switch; False when no backup key
            is configured or the backup key is already in use.
        """
        if not self.backup_key or getattr(self, "_using_backup", False):
            return False
        self.current_client = Client(api_key=self.backup_key)
        self._using_backup = True
        return True

    def chat_completion(self, **kwargs):
        """Create a chat completion, retrying once on the backup key after a 401.

        Args:
            **kwargs: Forwarded unchanged to ``chat.completions.create``.

        Returns:
            Whatever ``chat.completions.create`` returns.

        Raises:
            Exception: The original error when it is not a 401 or no fallback
                is available; otherwise whatever the retry raises.
        """
        try:
            return self.current_client.chat.completions.create(**kwargs)
        except Exception as e:
            # The error is recognised by the "401" substring in its message.
            if "401" in str(e) and self._fallback_to_backup():
                return self.current_client.chat.completions.create(**kwargs)
            raise

Proxy sécurisé

// Example of a hardened proxy built with Express
const express = require("express");
const rateLimit = require("express-rate-limit");
const helmet = require("helmet");

const app = express();

// Baseline security headers
app.use(helmet());

// Parse JSON request bodies. Without a body parser `req.body` stays
// undefined and any middleware that reads it throws.
app.use(express.json());

// Rate limiting
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100, // per-IP limit
  message: "Trop de requêtes, réessayez plus tard",
});
app.use("/api/", limiter);

// Request validation middleware.
// Responds 400 when `messages` is missing or malformed, 413 when the
// serialized messages exceed 100000 characters, 400 when a message matches
// the keyword filter; otherwise hands over to the next middleware.
const validateRequest = (req, res, next) => {
  // `req.body` is undefined when no body parser ran or the body was empty.
  const { messages } = req.body || {};

  // Reject malformed payloads up front instead of throwing a TypeError below.
  const isWellFormed =
    Array.isArray(messages) &&
    messages.every((msg) => msg && typeof msg.content === "string");

  if (!isWellFormed) {
    return res.status(400).json({ error: "Format de messages invalide" });
  }

  // Check the payload size
  if (JSON.stringify(messages).length > 100000) {
    return res.status(413).json({ error: "Requête trop large" });
  }

  // Filter inappropriate content (simple keyword match)
  const hasInappropriateContent = messages.some((msg) =>
    /spam|abuse|harmful/.test(msg.content.toLowerCase()),
  );

  if (hasInappropriateContent) {
    return res.status(400).json({ error: "Contenu inapproprié détecté" });
  }

  next();
};

// Run the validation middleware for every request under /api/chat
app.use("/api/chat", validateRequest);

Performance

Optimisation des requêtes

Paramètres optimaux par cas d'usage

# Request parameter presets, keyed by use case. Each preset is spread as
# keyword arguments into a chat completion call.
OPTIMAL_CONFIGS = {
    "conversation": {
        "temperature": 0.7,
        "max_tokens": 150,
        "top_p": 0.9,
        "frequency_penalty": 0.2,
    },
    "creative_writing": {
        "temperature": 1.2,
        "max_tokens": 500,
        "top_p": 0.95,
        "presence_penalty": 0.3,
    },
    "factual_qa": {
        "temperature": 0.1,
        "max_tokens": 200,
        "top_p": 0.8,
        "frequency_penalty": 0.0,
    },
    "code_generation": {
        "temperature": 0.2,
        "max_tokens": 800,
        "top_p": 0.9,
        "stop": ["```", "---"],
    },
}

def create_optimized_completion(use_case, messages):
    """Create a "yodi-1" chat completion using the preset for ``use_case``.

    Args:
        use_case: Key into ``OPTIMAL_CONFIGS``; unknown keys fall back to
            the "conversation" preset.
        messages: Conversation messages forwarded to the API call.

    Returns:
        The result of ``client.chat.completions.create``.
    """
    fallback = OPTIMAL_CONFIGS["conversation"]
    preset = OPTIMAL_CONFIGS.get(use_case, fallback)
    request = {"model": "yodi-1", "messages": messages}
    return client.chat.completions.create(**request, **preset)

Cache intelligent

import hashlib
import json
import time
from functools import wraps

class ResponseCache:
    """In-memory response cache with a time-to-live per entry.

    Entries live in ``self.cache`` as ``key -> (response, stored_at)`` where
    ``stored_at`` is a ``time.time()`` timestamp. Expired entries are removed
    when they are looked up and, additionally, every ``set`` sweeps out all
    expired entries so keys that are never requested again do not accumulate.
    """

    def __init__(self, ttl=3600):
        """Create an empty cache.

        Args:
            ttl: Entry lifetime in seconds.
        """
        self.cache = {}
        self.ttl = ttl

    def _get_cache_key(self, *args, **kwargs):
        """Return an MD5 hex digest of the JSON form of the arguments.

        The arguments must be JSON-serializable; ``json.dumps`` raises
        ``TypeError`` otherwise. MD5 is used as a fingerprint only.
        """
        content = json.dumps((args, sorted(kwargs.items())), sort_keys=True)
        return hashlib.md5(content.encode()).hexdigest()

    def _is_expired(self, timestamp):
        """Return True when ``timestamp`` is more than ``ttl`` seconds old."""
        return time.time() - timestamp > self.ttl

    def _evict_expired(self):
        """Delete every entry whose TTL has elapsed."""
        # Collect first: a dict cannot be resized while iterating over it.
        stale = [
            key
            for key, (_, stored_at) in self.cache.items()
            if self._is_expired(stored_at)
        ]
        for key in stale:
            del self.cache[key]

    def get(self, *args, **kwargs):
        """Return the cached response for these arguments, or None.

        None is returned both when nothing is stored and when the stored
        entry has expired (the expired entry is deleted).
        """
        key = self._get_cache_key(*args, **kwargs)
        entry = self.cache.get(key)
        if entry is None:
            return None

        response, stored_at = entry
        if self._is_expired(stored_at):
            del self.cache[key]
            return None
        return response

    def set(self, response, *args, **kwargs):
        """Store ``response`` under the key derived from the arguments."""
        # This sweep is linear in the number of entries on every store.
        self._evict_expired()
        key = self._get_cache_key(*args, **kwargs)
        self.cache[key] = (response, time.time())

cache = ResponseCache(ttl=1800)  # 30 minutes


def cached_completion(messages, **kwargs):
    """Create a chat completion, reusing a cached response when possible.

    Requests whose ``temperature`` is above 0.5 bypass the cache entirely;
    a missing ``temperature`` is treated as 0. A cached value that is falsy
    is treated as a miss.

    Args:
        messages: Conversation messages; part of the cache key.
        **kwargs: Extra request parameters; part of the cache key and
            forwarded to the API call.

    Returns:
        The cached or freshly created response.
    """
    is_creative = kwargs.get('temperature', 0) > 0.5
    if is_creative:
        # Creative responses are never cached
        return client.chat.completions.create(messages=messages, **kwargs)

    hit = cache.get(messages, **kwargs)
    if hit:
        return hit

    fresh = client.chat.completions.create(messages=messages, **kwargs)
    cache.set(fresh, messages, **kwargs)
    return fresh

Gestion des tokens

Optimisation du contexte

def optimize_context(messages, max_tokens=6000):
    """Trim a conversation so its estimated size fits under ``max_tokens``.

    System messages are always kept and are returned first. The oldest
    non-system messages are dropped two at a time (a user/assistant pair)
    while the estimate exceeds ``max_tokens`` and more than two non-system
    messages remain, so the result may still be over the limit.

    Token usage is a rough estimate of 1.3 tokens per whitespace-separated
    word. A message whose ``content`` is missing or None counts as empty.

    Args:
        messages: List of dicts with a "role" key and an optional "content".
        max_tokens: Estimated token budget for the returned messages.

    Returns:
        A new list: the system messages followed by the retained
        conversation messages.
    """
    def word_count(msg):
        return len((msg.get("content") or "").split())

    system_messages = [msg for msg in messages if msg["role"] == "system"]
    conversation = [msg for msg in messages if msg["role"] != "system"]

    # Keep a running word total and subtract what is dropped, rather than
    # re-summing every message after each trim.
    words = sum(word_count(msg) for msg in messages)
    start = 0
    while words * 1.3 > max_tokens and len(conversation) - start > 2:
        words -= sum(word_count(msg) for msg in conversation[start:start + 2])
        start += 2

    return system_messages + conversation[start:]

def smart_completion(messages, **kwargs):
    """Create a chat completion with automatic context trimming.

    The messages are first trimmed with ``optimize_context`` at its default
    budget. If the API call raises an error whose text contains
    "context_length_exceeded", the original messages are trimmed again with
    ``max_tokens=4000`` and the call is retried once.

    Args:
        messages: Conversation messages.
        **kwargs: Forwarded to ``client.chat.completions.create``.

    Returns:
        The result of the first successful API call.
    """
    trimmed = optimize_context(messages)

    try:
        return client.chat.completions.create(messages=trimmed, **kwargs)
    except Exception as exc:
        if "context_length_exceeded" not in str(exc):
            raise
        # Shrink the context further and retry once
        tighter = optimize_context(messages, max_tokens=4000)
        return client.chat.completions.create(messages=tighter, **kwargs)

Compression de prompts

def compress_prompt(text, target_ratio=0.7):
    """Shorten ``text`` by keeping its first sentence and its last sentences.

    Sentences are split on ". ". Texts of three sentences or fewer are
    returned unchanged, as are texts for which ``target_ratio`` would keep
    every sentence. Otherwise the first sentence and the last
    ``keep_count - 1`` sentences are kept; sentences in between are dropped.

    Args:
        text: Prompt text to compress.
        target_ratio: Approximate fraction of sentences to keep; at least
            two sentences are always kept.

    Returns:
        The compressed text, ending with terminal punctuation.
    """
    sentences = text.split('. ')

    if len(sentences) <= 3:
        return text

    keep_count = max(2, int(len(sentences) * target_ratio))
    if keep_count >= len(sentences):
        # Nothing to drop; slicing here would duplicate the first sentence.
        return text

    kept = sentences[:1] + sentences[-(keep_count - 1):]
    compressed = '. '.join(kept)

    # The last sentence keeps its own punctuation after the split, so only
    # append a period when one is missing.
    if not compressed.endswith(('.', '!', '?')):
        compressed += '.'
    return compressed

def create_efficient_prompt(context, question):
    """Build a question-answering prompt around a compressed context.

    The context is passed through ``compress_prompt`` with
    ``target_ratio=0.6`` before being placed in the prompt.

    Args:
        context: Background text.
        question: Question to ask about the context.

    Returns:
        The prompt string: context, question and answer cue separated by
        blank lines.
    """
    summary = compress_prompt(context, target_ratio=0.6)
    sections = (
        f"Contexte: {summary}",
        f"Question: {question}",
        "Réponse concise basée sur le contexte:",
    )
    return "\n\n".join(sections)

Optimisation des coûts

Stratégies de réduction des coûts

class CostOptimizer:
    """Chooses a model and request parameters to keep request cost low."""

    def __init__(self):
        # Relative cost factor per model, used by estimate_cost
        self.model_costs = {
            "yodi-instruct": 1.0,  # cheapest
            "yodi-1": 2.0,
            "yodi-code": 2.0,
            "yodi-1-32k": 4.0,  # most expensive
        }

    def choose_optimal_model(self, task_type, context_length, creativity_needed):
        """Return the model name to use for a task.

        Args:
            task_type: Task category, e.g. "extraction" or "code".
            context_length: Size of the context, compared against the 4000
                and 8000 thresholds.
            creativity_needed: Accepted but not used by the selection.

        Returns:
            One of "yodi-instruct", "yodi-code", "yodi-1-32k" or "yodi-1".
        """
        simple_tasks = ("extraction", "classification", "simple_qa")

        if task_type in simple_tasks:
            model = "yodi-instruct"
        elif task_type == "code" and context_length < 4000:
            model = "yodi-code"
        elif context_length > 8000:
            model = "yodi-1-32k"
        else:
            model = "yodi-1"
        return model

    def estimate_cost(self, input_tokens, output_tokens, model):
        """Estimate the cost of one request.

        Unknown models use a cost factor of 2.0. The 0.001 multiplier is a
        placeholder price.
        """
        rate = self.model_costs.get(model, 2.0)
        return (input_tokens + output_tokens) * rate * 0.001  # fictitious price

    def optimize_request(self, messages, task_type='general'):
        """Return request parameters chosen to minimize cost.

        The context length is measured as the total number of
        whitespace-separated words across all message contents.

        Args:
            messages: List of dicts with a string "content".
            task_type: Task category; selects the model, ``max_tokens`` and
                ``temperature``.

        Returns:
            Dict with the keys "model", "max_tokens" and "temperature".
        """
        words_in_context = sum(len(msg["content"].split()) for msg in messages)

        model = self.choose_optimal_model(
            task_type,
            words_in_context,
            creativity_needed=False,
        )

        # Output budget depends on how long an answer the task needs
        if task_type in ("classification", "yes_no"):
            budget = 50
        elif task_type == "summary":
            budget = 200
        else:
            budget = 500

        temperature = 0.8 if task_type == "creative" else 0.3

        return {
            "model": model,
            "max_tokens": budget,
            "temperature": temperature,
        }

optimizer = CostOptimizer()


# Usage
def cost_effective_completion(messages, task_type='general'):
    """Create a chat completion with parameters chosen by ``optimizer``.

    Args:
        messages: Conversation messages.
        task_type: Task category passed to ``optimizer.optimize_request``.

    Returns:
        The result of ``client.chat.completions.create``.
    """
    settings = optimizer.optimize_request(messages, task_type)
    return client.chat.completions.create(messages=messages, **settings)

Batch processing

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def process_batch_requests(requests, batch_size=10):
    """Send chat completion requests in batches from a thread pool.

    Each request dict is spread as keyword arguments into
    ``client.chat.completions.create``. The requests of one batch run
    concurrently on up to five worker threads; the coroutine sleeps one
    second between batches to respect rate limits.

    Args:
        requests: List of keyword-argument dicts, one per API call.
        batch_size: Number of requests submitted together.

    Returns:
        A list with one item per request, in request order. A failed call
        contributes its exception object instead of a response.
    """
    def send(req):
        return client.chat.completions.create(**req)

    results = []
    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()

    # One pool for the whole run instead of a new one per batch.
    with ThreadPoolExecutor(max_workers=5) as executor:
        for i in range(0, len(requests), batch_size):
            batch = requests[i:i + batch_size]

            tasks = [loop.run_in_executor(executor, send, req) for req in batch]
            results.extend(await asyncio.gather(*tasks, return_exceptions=True))

            # Pause between batches, but not after the last one
            if i + batch_size < len(requests):
                await asyncio.sleep(1)

    return results

# Exemple d'utilisation
async def process_customer_queries(queries):
    """Answer customer queries in batches with the "yodi-instruct" model.

    Each query becomes a single-message user request with
    ``max_tokens=100`` and ``temperature=0.3``.

    Args:
        queries: Iterable of query strings.

    Returns:
        The result of ``process_batch_requests`` for the built requests.
    """
    def as_request(query):
        return {
            'model': 'yodi-instruct',
            'messages': [{'role': 'user', 'content': query}],
            'max_tokens': 100,
            'temperature': 0.3,
        }

    payloads = list(map(as_request, queries))
    return await process_batch_requests(payloads)

Robustesse et fiabilité

Gestion d'erreurs robuste

import logging
import time
import random
from functools import wraps

class RobustYodiClient:
    """Client wrapper that retries transient failures with exponential backoff."""

    # Substrings of the lower-cased error text that mark an error as transient.
    _RETRY_MARKERS = (
        "rate_limit",
        "429",
        "500",
        "502",
        "503",
        "timeout",
        "connection",
    )

    def __init__(self, api_key, max_retries=3, base_delay=1):
        """Create the wrapped client.

        Args:
            api_key: API key passed to ``Client``.
            max_retries: Total number of attempts per call. Values below 1
                are treated as 1 so the call is always made once.
            base_delay: Delay in seconds before the first retry; it doubles
                on each further attempt.
        """
        self.client = Client(api_key=api_key)
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.logger = logging.getLogger(__name__)

    def _should_retry(self, error):
        """Return True when the error text contains a transient-error marker."""
        error_str = str(error).lower()
        return any(marker in error_str for marker in self._RETRY_MARKERS)

    def _calculate_delay(self, attempt):
        """Return the backoff delay for a zero-based attempt index.

        The delay is ``base_delay * 2 ** attempt`` plus up to 10% random
        jitter.
        """
        backoff = self.base_delay * (2 ** attempt)
        jitter = random.uniform(0, backoff * 0.1)
        return backoff + jitter

    def _robust_call(self, func, *args, **kwargs):
        """Call ``func`` and retry it on transient errors.

        Args:
            func: Callable to execute.
            *args: Positional arguments for ``func``.
            **kwargs: Keyword arguments for ``func``.

        Returns:
            The return value of the first successful call.

        Raises:
            Exception: The error from ``func`` when it is not retryable or
                when the last attempt fails.
        """
        # Guarantee at least one attempt even if max_retries is 0 or negative.
        attempts = max(1, self.max_retries)

        for attempt in range(attempts):
            try:
                return func(*args, **kwargs)
            except Exception as error:
                is_last_attempt = attempt == attempts - 1
                if is_last_attempt or not self._should_retry(error):
                    self.logger.error(
                        "Erreur définitive après %d tentatives: %s",
                        attempt + 1,
                        error,
                    )
                    raise

                delay = self._calculate_delay(attempt)
                self.logger.warning(
                    "Tentative %d échouée, retry dans %.1fs: %s",
                    attempt + 1,
                    delay,
                    error,
                )
                time.sleep(delay)

    def chat_completion(self, **kwargs):
        """Create a chat completion with automatic retries."""
        return self._robust_call(self.client.chat.completions.create, **kwargs)

    def embeddings(self, **kwargs):
        """Create embeddings with automatic retries."""
        return self._robust_call(self.client.embeddings.create, **kwargs)

# Usage: build the retrying client from the environment and send one message
robust_client = RobustYodiClient(os.getenv("YODI_API_KEY"))

try:
    response = robust_client.chat_completion(
        model="yodi-1",
        messages=[{"role": "user", "content": "Hello"}],
    )
except Exception as exc:
    # Reached when the call fails for good
    print(f"Échec définitif: {exc}")

Circuit breaker pattern

import time
from enum import Enum

class CircuitState(Enum):
    """States of a CircuitBreaker."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Stops calling a failing function until a cool-down period has passed.

    The breaker opens once ``failure_count`` reaches ``failure_threshold``.
    While open, calls are rejected until more than ``timeout`` seconds have
    passed since the last failure; the next call is then let through in the
    HALF_OPEN state. Any successful call closes the breaker and resets the
    failure count.
    """

    def __init__(self, failure_threshold=5, timeout=60):
        """Create a closed breaker.

        Args:
            failure_threshold: Failure count at which the breaker opens.
            timeout: Seconds to wait after the last failure before a trial
                call is allowed.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Run ``func`` through the breaker.

        Returns:
            The return value of ``func``.

        Raises:
            Exception: "Circuit breaker is OPEN" while the breaker is open
                and the timeout has not elapsed; otherwise whatever ``func``
                raises.
        """
        if self.state == CircuitState.OPEN:
            cooled_down = time.time() - self.last_failure_time > self.timeout
            if not cooled_down:
                raise Exception("Circuit breaker is OPEN")
            self.state = CircuitState.HALF_OPEN

        try:
            outcome = func(*args, **kwargs)
            self._on_success()
            return outcome
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        """Close the breaker and clear the failure count."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0

    def _on_failure(self):
        """Record a failure and open the breaker at the threshold."""
        self.last_failure_time = time.time()
        self.failure_count += 1

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)


def protected_api_call(messages):
    """Create a "yodi-1" chat completion through the shared circuit breaker.

    Args:
        messages: Conversation messages.

    Returns:
        The result of ``client.chat.completions.create``.
    """
    request = {"model": "yodi-1", "messages": messages}
    return circuit_breaker.call(client.chat.completions.create, **request)

Monitoring et observabilité

Métriques personnalisées

import time
from collections import defaultdict, deque
from datetime import datetime, timedelta

class YodiMetrics:
    """Collects in-memory request metrics for API calls.

    Counters are cumulative for the lifetime of the instance. Response
    times and request timestamps keep the most recent 1000 requests; error
    records keep the most recent 100 errors.
    """

    def __init__(self):
        self.metrics = defaultdict(list)
        self.counters = defaultdict(int)
        self.response_times = deque(maxlen=1000)
        # One timestamp per recorded request, used for windowed error rates
        self.request_timestamps = deque(maxlen=1000)
        self.errors = deque(maxlen=100)

    def record_request(self, endpoint, model, response_time, tokens_used, error=None):
        """Record the outcome of one request.

        Args:
            endpoint: Name of the called endpoint.
            model: Model used for the request.
            response_time: Duration of the request in seconds.
            tokens_used: Tokens consumed by the request.
            error: The exception raised by the request, if any.
        """
        timestamp = datetime.now()

        self.response_times.append(response_time)
        self.request_timestamps.append(timestamp)
        self.counters[f"{endpoint}_{model}"] += 1
        # Dedicated total so requests to every model are counted
        self.counters["total_requests"] += 1
        self.counters["total_tokens"] += tokens_used

        if error:
            self.errors.append({
                'timestamp': timestamp,
                'endpoint': endpoint,
                'model': model,
                'error': str(error)
            })
            self.counters["errors"] += 1

    def get_stats(self, window_minutes=60):
        """Return a summary of the recorded metrics.

        Args:
            window_minutes: Size of the time window for error statistics.

        Returns:
            Dict with:
            - 'total_requests': all recorded requests (cumulative).
            - 'total_tokens': all recorded tokens (cumulative).
            - 'error_count': retained errors inside the window.
            - 'avg_response_time': mean over the last 100 requests,
              0 when none are recorded.
            - 'error_rate': retained errors divided by retained requests,
              both inside the window.
        """
        cutoff = datetime.now() - timedelta(minutes=window_minutes)

        recent_errors = [err for err in self.errors if err['timestamp'] > cutoff]
        recent_request_count = sum(1 for ts in self.request_timestamps if ts > cutoff)

        recent_response_times = list(self.response_times)[-100:]  # last 100 requests
        if recent_response_times:
            avg_response_time = sum(recent_response_times) / len(recent_response_times)
        else:
            avg_response_time = 0

        return {
            # .get avoids creating keys in the defaultdict on read
            'total_requests': self.counters.get("total_requests", 0),
            'total_tokens': self.counters.get("total_tokens", 0),
            'error_count': len(recent_errors),
            'avg_response_time': avg_response_time,
            'error_rate': len(recent_errors) / max(1, recent_request_count)
        }

metrics = YodiMetrics()


def monitored_completion(messages, **kwargs):
    """Create a chat completion and record its metrics.

    The request is recorded in ``metrics`` whether it succeeds or fails.
    The token count is taken from ``response.usage.total_tokens`` when usage
    data is present and is 0 otherwise, including on failure. The recorded
    model is ``kwargs['model']``, defaulting to "yodi-1".

    Args:
        messages: Conversation messages.
        **kwargs: Forwarded to ``client.chat.completions.create``.

    Returns:
        The API response.

    Raises:
        Exception: Whatever the API call raises, after it has been recorded.
    """
    started = time.time()
    failure = None
    tokens = 0

    try:
        response = client.chat.completions.create(messages=messages, **kwargs)
        tokens = response.usage.total_tokens if response.usage else 0
        return response
    except Exception as exc:
        failure = exc
        raise
    finally:
        # Runs on both the return and the raise path
        elapsed = time.time() - started
        metrics.record_request(
            endpoint="chat_completion",
            model=kwargs.get('model', 'yodi-1'),
            response_time=elapsed,
            tokens_used=tokens,
            error=failure,
        )

# Periodic report
def print_metrics_report():
    """Print a summary of ``metrics.get_stats()`` to standard output."""
    snapshot = metrics.get_stats()
    report = (
        "\n"
        "Métriques YODI (dernière heure):\n"
        f"- Requêtes totales: {snapshot['total_requests']}\n"
        f"- Tokens utilisés: {snapshot['total_tokens']:,}\n"
        f"- Taux d'erreur: {snapshot['error_rate']:.2%}\n"
        f"- Temps de réponse moyen: {snapshot['avg_response_time']:.2f}s\n"
    )
    print(report)

Alertes automatiques

class AlertManager:
    """Compares metric snapshots against thresholds and emits alerts."""

    def __init__(self, webhook_url=None):
        """Create the manager.

        Args:
            webhook_url: Stored on the instance; the current ``_send_alert``
                does not use it.
        """
        self.webhook_url = webhook_url
        self.thresholds = {
            'error_rate': 0.05,  # 5%
            'response_time': 10.0,  # 10 seconds
            'tokens_per_hour': 100000
        }

    def check_and_alert(self, stats):
        """Send one alert message listing every exceeded threshold.

        Args:
            stats: Dict with the keys 'error_rate', 'avg_response_time' and
                'total_tokens'.

        Returns:
            The list of alert lines that were raised; empty when every
            metric is within its threshold.
        """
        alerts = []

        # The alert texts carried mis-encoded characters (U+FFFD); they are
        # emitted as plain text here.
        if stats['error_rate'] > self.thresholds['error_rate']:
            alerts.append(f"Taux d'erreur élevé: {stats['error_rate']:.2%}")

        if stats['avg_response_time'] > self.thresholds['response_time']:
            alerts.append(f"Temps de réponse lent: {stats['avg_response_time']:.2f}s")

        if stats['total_tokens'] > self.thresholds['tokens_per_hour']:
            alerts.append(f"Usage élevé: {stats['total_tokens']:,} tokens")

        if alerts:
            self._send_alert('\n'.join(alerts))

        return alerts

    def _send_alert(self, message):
        """Print the alert; replace with Slack, email, etc. as needed."""
        print(f"ALERTE: {message}")
# Ici, vous pourriez envoyer à Slack, email, etc.

alert_manager = AlertManager()


# Periodic check (run it from a scheduler)
def periodic_health_check():
    """Pass the current metrics snapshot to the alert manager."""
    alert_manager.check_and_alert(metrics.get_stats())

Configuration et déploiement

Configuration par environnement

# config.py
import os
from enum import Enum

class Environment(Enum):
    """Deployment environments recognised by Config."""

    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"


class Config:
    """Application settings derived from environment variables.

    Reads ``ENVIRONMENT``, ``YODI_API_KEY`` and ``YODI_BASE_URL``.
    ``ENVIRONMENT`` is matched case-insensitively and ignoring surrounding
    whitespace; when unset or empty it defaults to "development".

    Raises:
        ValueError: If ``ENVIRONMENT`` does not name a known environment.
    """

    def __init__(self):
        # Normalise so values like "Production" or " staging " are accepted
        raw_env = os.getenv("ENVIRONMENT") or "development"
        self.env = Environment(raw_env.strip().lower())
        self.yodi_api_key = os.getenv("YODI_API_KEY")
        self.yodi_base_url = os.getenv("YODI_BASE_URL", "https://api.yodi.tg/v1")

        # Per-environment settings
        if self.env == Environment.DEVELOPMENT:
            self.default_model = "yodi-instruct"  # cheaper for development
            self.max_tokens = 100
            self.rate_limit = 10
            self.cache_ttl = 300  # 5 minutes

        elif self.env == Environment.STAGING:
            self.default_model = "yodi-1"
            self.max_tokens = 300
            self.rate_limit = 50
            self.cache_ttl = 1800  # 30 minutes

        else:  # PRODUCTION
            self.default_model = "yodi-1"
            self.max_tokens = 500
            self.rate_limit = 100
            self.cache_ttl = 3600  # 1 hour

    def get_client_config(self):
        """Return the client settings as a dict.

        Returns:
            Dict with 'api_key', 'base_url' and 'timeout'; the timeout is
            30 in production and 10 otherwise.
        """
        return {
            'api_key': self.yodi_api_key,
            'base_url': self.yodi_base_url,
            'timeout': 30 if self.env == Environment.PRODUCTION else 10
        }


config = Config()

Tests et validation

# test_yodi_integration.py
import pytest
import time
from unittest.mock import Mock, patch

class TestYodiIntegration:
    """Tests for the YODI client integration."""

    def setup_method(self):
        """Create a fresh client with a dummy key before each test."""
        self.client = Client(api_key="test_key")

    def test_basic_completion(self):
        """A mocked completion returns the mocked message content."""
        messages = [{"role": "user", "content": "Hello"}]

        with patch.object(self.client.chat.completions, 'create') as mock_create:
            mock_create.return_value = Mock(
                choices=[Mock(message=Mock(content="Hello!"))]
            )

            response = self.client.chat.completions.create(
                model="yodi-1",
                messages=messages
            )

            assert response.choices[0].message.content == "Hello!"

    def test_rate_limiting(self):
        """Several rapid requests should take at least one second in total."""
        start_time = time.time()

        # Simulate several rapid requests
        for _ in range(5):
            try:
                monitored_completion([{"role": "user", "content": "test"}])
            except Exception:
                # Request errors are deliberately ignored here. Catching
                # Exception rather than everything keeps Ctrl-C working.
                pass

        duration = time.time() - start_time
        # NOTE(review): this relies on wall-clock timing of real calls and
        # may be flaky - confirm the rate limiter is actually exercised.
        assert duration >= 1.0

    def test_error_handling(self):
        """An API error propagates out of monitored_completion."""
        # NOTE(review): this patches self.client, but monitored_completion
        # is called without it - confirm the mock targets the client that
        # monitored_completion really uses.
        with patch.object(self.client.chat.completions, 'create') as mock_create:
            mock_create.side_effect = Exception("API Error")

            with pytest.raises(Exception):
                monitored_completion([{"role": "user", "content": "test"}])

    @pytest.mark.integration
    def test_real_api_call(self):
        """Real integration test; skipped without YODI_API_KEY."""
        # Imported here because this test module does not import os itself.
        import os

        if not os.getenv("YODI_API_KEY"):
            pytest.skip("Pas de clé API pour les tests d'intégration")

        response = self.client.chat.completions.create(
            model="yodi-1",
            messages=[{"role": "user", "content": "Say 'test successful'"}],
            max_tokens=10
        )

        assert "test" in response.choices[0].message.content.lower()

# Exécution des tests
# pytest test_yodi_integration.py -v
# pytest test_yodi_integration.py -m integration # Tests d'intégration seulement

Documentation et maintenance

Documentation automatique

def generate_api_docs():
    """Build the documentation structure for the API wrapper.

    Returns:
        Dict with the keys 'endpoints', 'models' and 'examples'. Only the
        'chat' endpoint is documented; 'models' and 'examples' are empty.
    """
    chat_parameters = {
        'messages': 'Liste des messages de conversation',
        'model': 'Modèle à utiliser (défaut: yodi-1)',
        'temperature': 'Contrôle la créativité (0.0-2.0)',
        'max_tokens': 'Nombre maximum de tokens à générer',
    }

    chat_example = {
        'request': {
            'messages': [{'role': 'user', 'content': 'Hello'}],
            'model': 'yodi-1',
            'temperature': 0.7,
        },
        'response': {
            'choices': [{'message': {'content': 'Hello! How can I help you?'}}],
        },
    }

    # Endpoint documentation
    chat_endpoint = {
        'description': 'Crée une completion de chat',
        'parameters': chat_parameters,
        'example': chat_example,
    }

    return {
        'endpoints': {'chat': chat_endpoint},
        'models': {},
        'examples': {},
    }

Logs structurés

import json
import logging

class StructuredLogger:
    """Writes one JSON object per API call through a named logger."""

    def __init__(self, name):
        """Configure the named logger to emit bare messages at INFO level.

        Args:
            name: Name passed to ``logging.getLogger``.
        """
        self.logger = logging.getLogger(name)
        # getLogger returns the same object for the same name, so only add
        # a handler the first time; otherwise each new StructuredLogger
        # would duplicate every log line.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_api_call(self, endpoint, params, response_time, tokens, error=None):
        """Log one API call as a JSON line.

        Args:
            endpoint: Name of the called endpoint.
            params: Request parameters; only 'model' is logged.
            response_time: Duration in seconds, logged as whole
                milliseconds.
            tokens: Number of tokens used.
            error: The error raised by the call, if any; its text is logged
                under 'error'.
        """
        # Imported here so the snippet does not depend on a module-level
        # import of time.
        import time

        log_data = {
            'timestamp': time.time(),
            'endpoint': endpoint,
            'model': params.get('model'),
            'response_time_ms': int(response_time * 1000),
            'tokens_used': tokens,
            'success': error is None
        }

        if error:
            log_data['error'] = str(error)

        self.logger.info(json.dumps(log_data))


structured_logger = StructuredLogger('yodi_api')

Prochaines étapes