Limites de taux
L'API YODI implémente des limites de taux pour assurer une utilisation équitable et maintenir la qualité de service pour tous les utilisateurs.
Vue d'ensemble des limites
Limites par plan
| Plan | Requêtes/minute | Tokens/minute | Requêtes/jour |
|---|---|---|---|
| Gratuit | 20 | 40,000 | 1,000 |
| Starter | 60 | 90,000 | 10,000 |
| Pro | 300 | 500,000 | 50,000 |
| Enterprise | 1,000 | 2,000,000 | 200,000 |
Limites par modèle
| Modèle | Multiplicateur | Notes |
|---|---|---|
| yodi-instruct | 1x | Limites de base |
| yodi-1 | 1x | Limites de base |
| yodi-embed | 3x | Plus de requêtes autorisées |
| yodi-code | 1x | Limites de base |
| yodi-1-32k | 0.5x | Limites réduites |
Comment fonctionnent les limites
Algorithme Token Bucket
import time
from threading import Lock
class TokenBucket:
    """Thread-safe token bucket illustrating how the rate limits behave.

    The bucket starts full, refills continuously at ``refill_rate`` tokens
    per second and never holds more than ``capacity`` tokens.

    Args:
        capacity: Maximum number of tokens the bucket can hold.
        refill_rate: Number of tokens added per second.
    """

    def __init__(self, capacity, refill_rate):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate  # tokens per second
        # Monotonic clock: elapsed-time maths must not be affected by
        # wall-clock adjustments (NTP, DST, manual changes).
        self.last_refill = time.monotonic()
        self.lock = Lock()

    def _refill(self):
        """Credit the tokens earned since the last refill (lock must be held)."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate,
        )
        self.last_refill = now

    def consume(self, tokens_needed=1):
        """Try to take ``tokens_needed`` tokens.

        Returns:
            True if the tokens were available (and have been deducted),
            False otherwise; the bucket is left untouched on failure.
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True
            return False

    def wait_time(self, tokens_needed=1):
        """Return how many seconds to wait before ``tokens_needed`` are available.

        The bucket is refilled first so the estimate reflects the tokens
        earned since the last call instead of a stale count.

        Returns:
            0 when the tokens are already available, ``float('inf')`` when
            the bucket never refills, otherwise the delay in seconds.
            Note: a request larger than ``capacity`` can never be served by
            ``consume`` even after the returned delay.
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens_needed:
                return 0
            if self.refill_rate <= 0:
                # Avoid ZeroDivisionError: the shortage can never be filled.
                return float("inf")
            tokens_shortage = tokens_needed - self.tokens
            return tokens_shortage / self.refill_rate
# Simulating the YODI limits (values of the Starter plan in the table above)
request_bucket = TokenBucket(capacity=60, refill_rate=1) # 60 req/min
token_bucket = TokenBucket(capacity=90000, refill_rate=1500) # 90k tokens/min
Headers de réponse
L'API YODI retourne des headers informatifs sur votre usage :
X-RateLimit-Limit-Requests: 60
X-RateLimit-Remaining-Requests: 45
X-RateLimit-Reset-Requests: 1693574460
X-RateLimit-Limit-Tokens: 90000
X-RateLimit-Remaining-Tokens: 85000
X-RateLimit-Reset-Tokens: 1693574460
Gestion des limites dans le code
Client avec rate limiting intégré
import time
import math
from datetime import datetime, timedelta
class RateLimitedYodiClient:
    """YODI client wrapper that throttles itself on the client side.

    Requests and token usage are tracked over a sliding one-minute window;
    calls block (sleep) until both the request and token budgets allow the
    next request. The class is not thread-safe.

    Args:
        api_key: Key forwarded to the underlying ``Client``.
        requests_per_minute: Maximum number of requests per sliding minute.
        tokens_per_minute: Maximum number of tokens per sliding minute.
    """

    def __init__(self, api_key, requests_per_minute=60, tokens_per_minute=90000):
        # ``Client`` is expected to be imported by the surrounding application.
        self.client = Client(api_key=api_key)
        self.requests_per_minute = requests_per_minute
        self.tokens_per_minute = tokens_per_minute
        # Sliding-window bookkeeping
        self.request_timestamps = []  # datetime of each attempted request
        self.token_usage = []  # (datetime, tokens) for each successful request

    def _clean_old_records(self):
        """Drop the records that are more than one minute old."""
        cutoff = datetime.now() - timedelta(minutes=1)
        self.request_timestamps = [
            ts for ts in self.request_timestamps if ts > cutoff
        ]
        self.token_usage = [
            (ts, tokens) for ts, tokens in self.token_usage if ts > cutoff
        ]

    def _can_make_request(self, estimated_tokens=1000):
        """Check whether a request of ``estimated_tokens`` fits the budgets.

        Returns:
            ``(True, None)`` when the request may proceed, otherwise
            ``(False, reason)`` where ``reason`` names the limit reached.
        """
        self._clean_old_records()
        # Request budget
        if len(self.request_timestamps) >= self.requests_per_minute:
            return False, "Limite de requêtes atteinte"
        # Token budget
        current_tokens = sum(tokens for _, tokens in self.token_usage)
        if current_tokens + estimated_tokens > self.tokens_per_minute:
            return False, "Limite de tokens atteinte"
        return True, None

    def _wait_if_needed(self, estimated_tokens=1000):
        """Sleep until a request of ``estimated_tokens`` fits the budgets.

        The limits are re-checked after every sleep: expiring the oldest
        record is not always enough to free the required tokens. When no
        record is left to expire (e.g. a single request larger than the
        whole per-minute budget) waiting cannot help, so the method returns
        and lets the API be the final judge.
        """
        while True:
            can_proceed, reason = self._can_make_request(estimated_tokens)
            if can_proceed:
                return
            if "requêtes" in reason:
                timestamps = self.request_timestamps
            else:
                timestamps = [ts for ts, _ in self.token_usage]
            if not timestamps:
                # Nothing will ever expire: sleeping would not change the outcome.
                return
            wait_until = min(timestamps) + timedelta(minutes=1)
            wait_seconds = (wait_until - datetime.now()).total_seconds()
            if wait_seconds > 0:
                print(f"Attente de {wait_seconds:.1f}s pour respecter les limites...")
                time.sleep(wait_seconds)

    def _estimate_tokens(self, messages, max_tokens=None):
        """Roughly estimate the tokens consumed by a request.

        Uses 1.3 tokens per whitespace-separated word for the input and
        ``max_tokens`` (500 when unset) for the output. Messages whose
        ``content`` is missing or not a string count as zero input tokens.
        """
        input_tokens = sum(
            len(msg['content'].split()) * 1.3
            for msg in messages
            if isinstance(msg.get('content'), str)
        )
        output_tokens = max_tokens or 500
        return int(input_tokens + output_tokens)

    def chat_completions_create(self, **kwargs):
        """Create a chat completion while respecting the configured limits.

        Accepts the same keyword arguments as the underlying
        ``client.chat.completions.create`` and returns its response.
        Exceptions raised by the client are propagated unchanged; the failed
        attempt still counts against the request budget.
        """
        messages = kwargs.get('messages', [])
        max_tokens = kwargs.get('max_tokens')
        estimated_tokens = self._estimate_tokens(messages, max_tokens)
        # Block until the budgets allow the request
        self._wait_if_needed(estimated_tokens)
        now = datetime.now()
        try:
            response = self.client.chat.completions.create(**kwargs)
        except Exception:
            # Count the attempt even though it failed, then re-raise with
            # the original traceback intact.
            self.request_timestamps.append(now)
            raise
        # Record the real usage when the API reports it
        actual_tokens = response.usage.total_tokens if response.usage else estimated_tokens
        self.request_timestamps.append(now)
        self.token_usage.append((now, actual_tokens))
        return response
# Usage
import os  # required by os.getenv below; not imported elsewhere in this example

rate_limited_client = RateLimitedYodiClient(
    api_key=os.getenv("YODI_API_KEY"),
    requests_per_minute=60,
    tokens_per_minute=90000,
)
Backoff exponentiel avancé
import random
import time
from functools import wraps
class ExponentialBackoff:
    """Decorator that retries a call with exponential backoff on rate-limit errors.

    Args:
        base_delay: Delay (seconds) before the first retry.
        max_delay: Upper bound (seconds) for any single delay, jitter included.
        exponential_base: Growth factor applied at each attempt.
        jitter: When true, randomise each delay by +/-10% to avoid a
            thundering herd of synchronised retries.
        max_retries: Total number of attempts. Values below 1 are treated
            as 1 so the wrapped function is always called at least once.
    """

    def __init__(self,
                 base_delay=1,
                 max_delay=60,
                 exponential_base=2,
                 jitter=True,
                 max_retries=5):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.max_retries = max_retries

    def calculate_delay(self, attempt):
        """Return the delay in seconds to wait after the 0-based ``attempt``.

        The result is always within ``[0, max_delay]``, jitter included.
        """
        try:
            delay = self.base_delay * (self.exponential_base ** attempt)
        except OverflowError:
            # Float bases overflow for very large attempts; the cap applies anyway.
            delay = self.max_delay
        delay = min(delay, self.max_delay)
        if self.jitter:
            jitter_range = delay * 0.1
            delay += random.uniform(-jitter_range, jitter_range)
        # Clamp again: jitter must not push the delay above the configured cap.
        return max(0, min(delay, self.max_delay))

    def __call__(self, func):
        """Wrap ``func`` so that rate-limit errors trigger a delayed retry.

        Errors that are not rate-limit related, and the error of the last
        attempt, are re-raised unchanged.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = max(1, self.max_retries)
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as error:
                    is_last_attempt = attempt >= attempts - 1
                    if is_last_attempt or not self._is_rate_limit_error(error):
                        raise
                    delay = self.calculate_delay(attempt)
                    print(f"Rate limit hit, waiting {delay:.1f}s (attempt {attempt + 1})")
                    time.sleep(delay)
        return wrapper

    def _is_rate_limit_error(self, error):
        """Tell whether ``error`` looks like a rate-limit (HTTP 429) failure."""
        # Some HTTP clients expose the status on the exception or on its
        # ``response`` attribute; use it when present (assumption about the
        # client library, harmless when the attributes are absent).
        status = getattr(error, 'status_code', None)
        if status is None:
            status = getattr(getattr(error, 'response', None), 'status_code', None)
        if status == 429:
            return True
        error_str = str(error).lower()
        return any(keyword in error_str for keyword in [
            'rate limit', 'rate_limit', '429', 'too many requests'
        ])
# Usage as a decorator
@ExponentialBackoff(base_delay=2, max_delay=120, max_retries=5)
def robust_api_call(messages, **kwargs):
    """Send a chat completion request, retried with backoff on rate limits.

    ``client`` is expected to be provided by the surrounding application.
    """
    create_completion = client.chat.completions.create
    return create_completion(messages=messages, **kwargs)
Monitoring en temps réel
import threading
import time
from collections import deque, defaultdict
from datetime import datetime, timedelta
class RateLimitMonitor:
    """Thread-safe sliding-window monitor of API usage.

    Keeps the requests, token counts and errors observed during the last
    ``window_minutes`` minutes, globally and per endpoint.

    Args:
        window_minutes: Length of the sliding window, in minutes (> 0).

    Raises:
        ValueError: If ``window_minutes`` is not strictly positive.
    """

    def __init__(self, window_minutes=5):
        if window_minutes <= 0:
            # Every rate below divides by the window length.
            raise ValueError("window_minutes must be greater than 0")
        self.window_minutes = window_minutes
        self.requests = deque()  # datetime of each request
        self.tokens = deque()  # (datetime, tokens_used)
        self.errors = deque()  # (datetime, error)
        self.lock = threading.Lock()
        # Same three series, kept per endpoint
        self.stats_by_endpoint = defaultdict(lambda: {
            'requests': deque(),
            'tokens': deque(),
            'errors': deque()
        })

    def record_request(self, endpoint, tokens_used, error=None):
        """Record one request made to ``endpoint``.

        Args:
            endpoint: Label under which per-endpoint statistics are kept.
            tokens_used: Number of tokens consumed by the request.
            error: Optional error raised by the request; recorded when truthy.
        """
        now = datetime.now()
        with self.lock:
            self.requests.append(now)
            self.tokens.append((now, tokens_used))
            endpoint_stats = self.stats_by_endpoint[endpoint]
            endpoint_stats['requests'].append(now)
            endpoint_stats['tokens'].append((now, tokens_used))
            if error:
                self.errors.append((now, error))
                endpoint_stats['errors'].append((now, error))
            self._cleanup()

    @staticmethod
    def _prune(records, cutoff, timestamp_of):
        """Pop from the left of ``records`` every entry older than ``cutoff``."""
        while records and timestamp_of(records[0]) < cutoff:
            records.popleft()

    def _cleanup(self):
        """Drop the records that fell out of the window (lock must be held)."""
        cutoff = datetime.now() - timedelta(minutes=self.window_minutes)

        def plain(ts):
            return ts

        def first(item):
            return item[0]

        self._prune(self.requests, cutoff, plain)
        self._prune(self.tokens, cutoff, first)
        self._prune(self.errors, cutoff, first)
        for endpoint_stats in self.stats_by_endpoint.values():
            self._prune(endpoint_stats['requests'], cutoff, plain)
            self._prune(endpoint_stats['tokens'], cutoff, first)
            self._prune(endpoint_stats['errors'], cutoff, first)

    def get_current_usage(self):
        """Return the global usage averaged over the window.

        Returns:
            A dict with ``requests_per_minute``, ``tokens_per_minute``,
            ``error_rate`` (errors / requests), ``total_requests`` and
            ``total_tokens``.
        """
        with self.lock:
            # Prune first: without new requests, expired records would
            # otherwise keep inflating the figures.
            self._cleanup()
            total_requests = len(self.requests)
            total_tokens = sum(tokens for _, tokens in self.tokens)
            return {
                'requests_per_minute': total_requests / self.window_minutes,
                'tokens_per_minute': total_tokens / self.window_minutes,
                'error_rate': len(self.errors) / max(1, total_requests),
                'total_requests': total_requests,
                'total_tokens': total_tokens
            }

    def get_endpoint_stats(self, endpoint):
        """Return the usage of one endpoint, or ``None`` if it was never recorded."""
        with self.lock:
            # Membership test avoids creating an entry in the defaultdict.
            if endpoint not in self.stats_by_endpoint:
                return None
            self._cleanup()
            stats = self.stats_by_endpoint[endpoint]
            request_count = len(stats['requests'])
            total_tokens = sum(tokens for _, tokens in stats['tokens'])
            return {
                'requests_per_minute': request_count / self.window_minutes,
                'tokens_per_minute': total_tokens / self.window_minutes,
                'error_rate': len(stats['errors']) / max(1, request_count)
            }

    def predict_limit_hit(self, limit_rpm=60, limit_tpm=90000):
        """Estimate the remaining headroom before each limit is reached.

        For each metric with a non-zero rate the value is
        ``(limit - rate) / rate``, floored at 0. Metrics with no traffic are
        omitted from the result.

        NOTE(review): this is a ratio of remaining headroom to current rate,
        which callers display as minutes; treat it as a rough heuristic
        rather than a true time-to-limit.
        """
        usage = self.get_current_usage()
        predictions = {}
        rpm = usage['requests_per_minute']
        if rpm > 0:
            predictions['requests'] = max(0, (limit_rpm - rpm) / rpm)
        tpm = usage['tokens_per_minute']
        if tpm > 0:
            predictions['tokens'] = max(0, (limit_tpm - tpm) / tpm)
        return predictions
# Usage
monitor = RateLimitMonitor()


def monitored_api_call(endpoint, messages, **kwargs):
    """Call the chat completions API and record the outcome in ``monitor``.

    Args:
        endpoint: Label under which the request is recorded.
        messages: Messages forwarded to ``client.chat.completions.create``.
        **kwargs: Extra parameters forwarded unchanged.

    Returns:
        The response returned by the client.

    Raises:
        Any exception raised by the client; it is recorded, then re-raised.
    """
    error = None
    tokens_used = 0
    try:
        response = client.chat.completions.create(messages=messages, **kwargs)
        tokens_used = response.usage.total_tokens if response.usage else 0
        return response
    except Exception as e:
        error = e
        raise
    finally:
        # Runs on success and on failure, so every attempt is counted.
        monitor.record_request(endpoint, tokens_used, error)
# Periodic report
def print_usage_report():
    """Print the current usage and limit predictions of the shared ``monitor``.

    The window length shown in the report is read from the monitor instead
    of being hard-coded, and numeric predictions are rounded to one decimal.
    """
    usage = monitor.get_current_usage()
    predictions = monitor.predict_limit_hit()

    def describe(value):
        # Predictions are absent ('N/A') for metrics without traffic.
        return f"{value:.1f}" if isinstance(value, (int, float)) else str(value)

    report_lines = [
        "",
        f"Usage des {monitor.window_minutes} dernières minutes:",
        f"- Requêtes/min: {usage['requests_per_minute']:.1f}",
        f"- Tokens/min: {usage['tokens_per_minute']:.0f}",
        f"- Taux d'erreur: {usage['error_rate']:.2%}",
        "Prédictions:",
        f"- Limite requêtes dans: {describe(predictions.get('requests', 'N/A'))} min",
        f"- Limite tokens dans: {describe(predictions.get('tokens', 'N/A'))} min",
        "",
    ]
    print("\n".join(report_lines))
Optimisation pour les limites
Stratégies de batching
import asyncio
from typing import List, Dict, Any
class RequestBatcher:
    """Group API requests and dispatch them in bounded-concurrency batches.

    A batch is flushed as soon as ``batch_size`` requests are pending, or
    every ``flush_interval`` seconds by a background task.

    The instance can be created outside of a running event loop (e.g. at
    module level): the lock and the background flush task are created
    lazily, on first use inside a loop.

    Args:
        batch_size: Number of pending requests that triggers a flush.
        flush_interval: Seconds between two automatic flushes.
        max_concurrency: Maximum number of API calls in flight per batch.
    """

    def __init__(self, batch_size=10, flush_interval=5, max_concurrency=5):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.max_concurrency = max_concurrency
        self.pending_requests = []
        # Created by _ensure_started() once an event loop is running.
        self.lock = None
        self._loop = None
        self._flush_task = None

    def _ensure_started(self):
        """Bind the lock and the auto-flush task to the running event loop."""
        loop = asyncio.get_running_loop()
        if self._loop is not loop:
            # First use, or the batcher is reused under a new event loop.
            self._loop = loop
            self.lock = asyncio.Lock()
            self._flush_task = None
        if self._flush_task is None or self._flush_task.done():
            # Keep a reference: a task nobody references may be
            # garbage-collected before it completes.
            self._flush_task = loop.create_task(self._auto_flush())

    def _take_pending(self):
        """Return the pending requests and empty the queue (lock must be held)."""
        batch = self.pending_requests[:]
        self.pending_requests.clear()
        return batch

    async def add_request(self, request_data):
        """Queue a request and wait for its result.

        Args:
            request_data: Keyword arguments for the API call.

        Returns:
            The API response for this request.

        Raises:
            Whatever exception the API call raised for this request.
        """
        self._ensure_started()
        future = self._loop.create_future()
        async with self.lock:
            self.pending_requests.append((request_data, future))
            if len(self.pending_requests) >= self.batch_size:
                batch = self._take_pending()
            else:
                batch = []
        # Processed outside the lock so other callers can keep queueing.
        await self._process_batch(batch)
        return await future

    async def _flush_batch(self):
        """Dispatch everything currently pending."""
        self._ensure_started()
        async with self.lock:
            batch = self._take_pending()
        await self._process_batch(batch)

    async def _process_batch(self, batch):
        """Run the API calls of ``batch`` with bounded concurrency."""
        if not batch:
            return
        semaphore = asyncio.Semaphore(self.max_concurrency)

        async def process_request(request_data, future):
            async with semaphore:
                try:
                    result = await self._make_api_call(request_data)
                except Exception as e:
                    # The waiter may have been cancelled in the meantime.
                    if not future.done():
                        future.set_exception(e)
                else:
                    if not future.done():
                        future.set_result(result)

        await asyncio.gather(
            *(process_request(request_data, future) for request_data, future in batch),
            return_exceptions=True,
        )

    async def _auto_flush(self):
        """Flush the pending requests periodically, forever."""
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush_batch()

    async def _make_api_call(self, request_data):
        """Perform the actual API call without blocking the event loop.

        The client call is synchronous in this example, so it is run in the
        default executor. ``client`` is expected to be provided by the
        surrounding application.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, lambda: client.chat.completions.create(**request_data)
        )
# Usage
batcher = RequestBatcher(batch_size=10, flush_interval=3)


async def batched_completion(messages, **kwargs):
    """Submit a chat completion through the shared batcher and await its result."""
    payload = {'messages': messages}
    payload.update(kwargs)
    return await batcher.add_request(payload)
Cache intelligent par limite
import hashlib
import json
import time
from typing import Optional, Tuple
class RateLimitAwareCache:
    """In-memory response cache used to save requests against the rate limits.

    Responses of "creative" requests (temperature above 0.5) are never
    cached. When the cache is full, the least recently accessed 20% of the
    entries are evicted.

    Args:
        max_size: Number of entries above which eviction is triggered.
    """

    def __init__(self, max_size=1000):
        self.cache = {}
        self.access_times = {}
        self.max_size = max_size

    @staticmethod
    def _is_creative(kwargs):
        """Tell whether the request parameters ask for a creative answer."""
        # ``temperature=None`` is treated as "not set" instead of crashing
        # on the comparison.
        return (kwargs.get('temperature') or 0) > 0.5

    def _generate_key(self, messages, **kwargs) -> str:
        """Build the cache key of a request.

        Sampling parameters (``temperature``, ``top_p``, ``seed``) are left
        out of the key on purpose, so requests differing only by them share
        an entry. MD5 is used as a fast fingerprint, not for security.
        """
        cache_kwargs = {k: v for k, v in kwargs.items()
                        if k not in ['temperature', 'top_p', 'seed']}
        content = {
            'messages': messages,
            'params': cache_kwargs
        }
        return hashlib.md5(
            json.dumps(content, sort_keys=True).encode()
        ).hexdigest()

    def get(self, messages, **kwargs) -> Optional[dict]:
        """Return the cached response of a request, or ``None`` on a miss."""
        if self._is_creative(kwargs):
            return None
        key = self._generate_key(messages, **kwargs)
        if key in self.cache:
            self.access_times[key] = time.time()
            return self.cache[key]
        return None

    def set(self, response, messages, **kwargs):
        """Store ``response`` for the given request (no-op for creative requests)."""
        if self._is_creative(kwargs):
            return
        key = self._generate_key(messages, **kwargs)
        # Overwriting an existing entry does not grow the cache, so only a
        # genuinely new key may trigger an eviction.
        if key not in self.cache and len(self.cache) >= self.max_size:
            self._evict_oldest()
        self.cache[key] = response
        self.access_times[key] = time.time()

    def _evict_oldest(self):
        """Evict the least recently accessed 20% of the entries (at least one)."""
        sorted_keys = sorted(self.access_times, key=self.access_times.get)
        evict_count = max(1, len(sorted_keys) // 5)
        for key in sorted_keys[:evict_count]:
            self.cache.pop(key, None)
            self.access_times.pop(key, None)
cache = RateLimitAwareCache()


def cached_completion(messages, **kwargs):
    """Return a chat completion, served from the cache when possible.

    On a cache miss the request goes through ``monitored_api_call`` and the
    response is stored for later calls.
    """
    # Look in the cache first; compare with None so that a falsy cached
    # response is still treated as a hit.
    cached_response = cache.get(messages, **kwargs)
    if cached_response is not None:
        # The original message started with a mis-encoded character
        # (U+FFFD); it has been removed.
        print("Réponse depuis le cache")
        return cached_response
    # Cache miss: perform the API call
    response = monitored_api_call('chat', messages, **kwargs)
    # Store the response for later calls
    cache.set(response, messages, **kwargs)
    return response
Alertes et notifications
Système d'alertes avancé
import smtplib
import time
from email.mime.text import MIMEText
from enum import Enum
class AlertLevel(Enum):
    """Severity of an alert, listed from least to most urgent."""

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


class AlertManager:
    """Raise throttled alerts when API usage approaches the rate limits.

    Alerts are printed; CRITICAL alerts are also e-mailed when an e-mail
    configuration is provided.

    Args:
        email_config: Optional dict with the keys ``smtp_server``,
            ``username``, ``password``, ``from`` and ``to``; ``smtp_port``
            and ``timeout`` (seconds) are optional.
        webhook_url: Stored on the instance; not used by this class.
    """

    # Rank of each level, so that an escalation can bypass the throttle.
    _SEVERITY = {
        AlertLevel.INFO: 0,
        AlertLevel.WARNING: 1,
        AlertLevel.CRITICAL: 2,
    }
    # An alert type is not repeated at the same or a lower level within this delay.
    _THROTTLE_SECONDS = 300
    # (usage key, alert type, message template)
    _METRICS = (
        ('requests_per_minute', 'rate_limit', "Limite de requêtes proche: {:.1f}/min"),
        ('tokens_per_minute', 'token_limit', "Limite de tokens proche: {:.0f}/min"),
        ('error_rate', 'error_rate', "Taux d'erreur élevé: {:.2%}"),
    )

    def __init__(self, email_config=None, webhook_url=None):
        self.email_config = email_config
        self.webhook_url = webhook_url
        self.alert_history = []
        # Alert thresholds, sized for limits of 60 req/min and 90,000 tokens/min
        self.thresholds = {
            AlertLevel.WARNING: {
                'requests_per_minute': 45,  # 75% of the limit
                'tokens_per_minute': 67500,  # 75% of the limit
                'error_rate': 0.05
            },
            AlertLevel.CRITICAL: {
                'requests_per_minute': 55,  # ~92% of the limit
                'tokens_per_minute': 81000,  # 90% of the limit
                'error_rate': 0.10
            }
        }

    def check_usage_and_alert(self, usage_stats):
        """Compare ``usage_stats`` with the thresholds and send the alerts.

        Only the highest level reached is reported for each metric.

        Args:
            usage_stats: Dict with ``requests_per_minute``,
                ``tokens_per_minute`` and ``error_rate``.

        Returns:
            The list of alerts triggered by this check, including those
            that were not emitted because of throttling.
        """
        alerts = []
        for metric, alert_type, template in self._METRICS:
            value = usage_stats[metric]
            for level in (AlertLevel.CRITICAL, AlertLevel.WARNING):
                if value > self.thresholds[level][metric]:
                    alerts.append({
                        'level': level,
                        'type': alert_type,
                        'message': template.format(value)
                    })
                    break  # highest level wins for this metric
        for alert in alerts:
            self._send_alert(alert)
        return alerts

    def _send_alert(self, alert):
        """Emit ``alert`` unless a similar one was sent recently.

        An alert is suppressed when one of the 10 most recent alerts has the
        same type, is less than 5 minutes old and has the same or a higher
        level. A more severe alert is therefore never hidden by an earlier,
        milder one.

        Returns:
            True if the alert was emitted, False if it was suppressed.
        """
        now = time.time()
        rank = self._SEVERITY[alert['level']]
        for past in self.alert_history[-10:]:
            if (past['type'] == alert['type']
                    and now - past['timestamp'] < self._THROTTLE_SECONDS
                    and self._SEVERITY[past['level']] >= rank):
                return False
        alert['timestamp'] = now
        self.alert_history.append(alert)
        # The original message started with a mis-encoded character
        # (U+FFFD); it has been removed.
        message = f"{alert['level'].value.upper()}: {alert['message']}"
        print(message)
        if self.email_config and alert['level'] == AlertLevel.CRITICAL:
            self._send_email_alert(message)
        return True

    def _send_email_alert(self, message):
        """Send ``message`` by e-mail; failures are reported, never raised."""
        try:
            msg = MIMEText(message)
            msg['Subject'] = 'YODI API Alert'
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']
            # Port 0 lets smtplib use its default port; a timeout prevents
            # an unreachable server from blocking the caller forever.
            with smtplib.SMTP(
                self.email_config['smtp_server'],
                self.email_config.get('smtp_port', 0),
                timeout=self.email_config.get('timeout', 10),
            ) as server:
                server.starttls()
                server.login(self.email_config['username'], self.email_config['password'])
                server.send_message(msg)
        except Exception as e:
            # Best effort on purpose: alerting must not break the caller.
            print(f"Erreur envoi email: {e}")
# Configuration and usage
# The credentials below are placeholders; load real ones from a secure source.
alert_manager = AlertManager(
    email_config={
        'smtp_server': 'smtp.gmail.com',
        'username': 'your-email@gmail.com',
        'password': 'your-app-password',
        'from': 'your-email@gmail.com',
        'to': 'admin@yourcompany.com'
    }
)


# Periodic check
def periodic_alert_check():
    """Feed the monitor's current usage snapshot to the alert manager."""
    alert_manager.check_usage_and_alert(monitor.get_current_usage())