Rate limits
The YODI API enforces rate limits to ensure fair usage and maintain quality of service for all users.
Limits overview​
Limits by plan​
| Plan | Requêtes/minute | Tokens/minute | Requêtes/jour |
|---|---|---|---|
| Gratuit | 20 | 40,000 | 1,000 |
| Starter | 60 | 90,000 | 10,000 |
| Pro | 300 | 500,000 | 50,000 |
| Enterprise | 1,000 | 2,000,000 | 200,000 |
Limites par modèle​
| Modèle | Multiplication | Notes |
|---|---|---|
| yodi-instruct | 1x | Limites de base |
| yodi-1 | 1x | Limites de base |
| yodi-embed | 3x | Plus de requêtes autorisées |
| yodi-code | 1x | Limites de base |
| yodi-1-32k | 0.5x | Limites réduites |
Comment fonctionnent les limites​
Algorithme Token Bucket​
import time
from threading import Lock
class TokenBucket:
    """Thread-safe token bucket for client-side throttling.

    The bucket starts full, regains ``refill_rate`` tokens per second and
    never holds more than ``capacity`` tokens.
    """

    def __init__(self, capacity, refill_rate):
        """Create a full bucket.

        Args:
            capacity: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens credited per second.
        """
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate  # tokens per second
        # Monotonic clock: elapsed time stays correct even if the wall
        # clock is adjusted while the process is running.
        self.last_refill = time.monotonic()
        self.lock = Lock()

    def _refill(self):
        """Credit the tokens earned since the last refill.

        The caller must already hold ``self.lock``.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now

    def consume(self, tokens_needed=1):
        """Try to take ``tokens_needed`` tokens.

        Returns:
            True if the tokens were available (and have been deducted),
            False otherwise; the balance is left untouched on failure.
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True
            return False

    def wait_time(self, tokens_needed=1):
        """Return how many seconds until ``tokens_needed`` tokens are available.

        Returns:
            0 when the tokens are available now, ``float('inf')`` when the
            request can never be satisfied (it exceeds ``capacity`` or the
            bucket does not refill), otherwise the wait in seconds.
        """
        with self.lock:
            # Refill first so the answer reflects the current balance rather
            # than the balance at the time of the last consume() call.
            self._refill()
            shortage = tokens_needed - self.tokens
            if shortage <= 0:
                return 0
            if tokens_needed > self.capacity or self.refill_rate <= 0:
                return float('inf')
            return shortage / self.refill_rate
# Simulated YODI limits (the Starter plan figures from the table above)
request_bucket = TokenBucket(
    capacity=60,
    refill_rate=1,  # 1 request per second, i.e. 60 requests/minute
)
token_bucket = TokenBucket(
    capacity=90000,
    refill_rate=1500,  # 1,500 tokens per second, i.e. 90k tokens/minute
)
Headers de réponse​
L'API YODI retourne des headers informatifs sur votre usage :
X-RateLimit-Limit-Requests: 60
X-RateLimit-Remaining-Requests: 45
X-RateLimit-Reset-Requests: 1693574460
X-RateLimit-Limit-Tokens: 90000
X-RateLimit-Remaining-Tokens: 85000
X-RateLimit-Reset-Tokens: 1693574460
Gestion des limites dans le code​
Client avec rate limiting intégré​
import time
import math
from datetime import datetime, timedelta
class RateLimitedYodiClient:
    """YODI client wrapper that throttles itself before each call.

    Keeps a sliding one-minute window of request timestamps and token usage
    and sleeps before a call that would exceed the configured limits.

    NOTE(review): ``Client`` is not defined in this snippet; it is assumed to
    be the YODI SDK client imported elsewhere.
    """

    def __init__(self, api_key, requests_per_minute=60, tokens_per_minute=90000):
        """Build the wrapped client and start with an empty usage window.

        Args:
            api_key: Key passed to the underlying ``Client``.
            requests_per_minute: Maximum requests allowed in any 60 s window.
            tokens_per_minute: Maximum tokens allowed in any 60 s window.
        """
        self.client = Client(api_key=api_key)
        self.requests_per_minute = requests_per_minute
        self.tokens_per_minute = tokens_per_minute
        # Request tracking
        self.request_timestamps = []
        self.token_usage = []  # list of (timestamp, tokens)

    def _clean_old_records(self):
        """Drop records that are more than one minute old."""
        cutoff = datetime.now() - timedelta(minutes=1)
        self.request_timestamps = [
            ts for ts in self.request_timestamps if ts > cutoff
        ]
        self.token_usage = [
            (ts, tokens) for ts, tokens in self.token_usage if ts > cutoff
        ]

    def _can_make_request(self, estimated_tokens=1000):
        """Check whether a request fits in the current window.

        Returns:
            ``(True, None)`` if the request may proceed, otherwise
            ``(False, reason)`` with a human-readable reason.
        """
        self._clean_old_records()
        # Request-count limit
        if len(self.request_timestamps) >= self.requests_per_minute:
            return False, "Limite de requêtes atteinte"
        # Token limit
        current_tokens = sum(tokens for _, tokens in self.token_usage)
        if current_tokens + estimated_tokens > self.tokens_per_minute:
            return False, "Limite de tokens atteinte"
        return True, None

    def _wait_if_needed(self, estimated_tokens=1000):
        """Sleep until the request fits within both limits.

        Re-checks after every sleep, because one expired record may not free
        enough budget. If the window is empty and the request still does not
        fit (a single request larger than the whole budget), there is nothing
        to wait for, so the method returns and lets the API decide.
        """
        while True:
            can_proceed, _ = self._can_make_request(estimated_tokens)
            if can_proceed:
                return
            # Decide from the tracked state, not from the reason text.
            request_limit_hit = (
                self.request_timestamps
                and len(self.request_timestamps) >= self.requests_per_minute
            )
            if request_limit_hit:
                oldest = min(self.request_timestamps)
            elif self.token_usage:
                oldest = min(ts for ts, _ in self.token_usage)
            else:
                return
            # Budget frees up when the oldest record leaves the window.
            wait_until = oldest + timedelta(minutes=1)
            wait_seconds = (wait_until - datetime.now()).total_seconds()
            if wait_seconds > 0:
                print(f"Attente de {wait_seconds:.1f}s pour respecter les limites...")
                time.sleep(wait_seconds)

    def _estimate_tokens(self, messages, max_tokens=None):
        """Estimate the token cost of a request.

        Rough heuristic: 1.3 tokens per whitespace-separated word of input,
        plus ``max_tokens`` (or 500 when not given) for the output.
        """
        input_tokens = sum(len(msg['content'].split()) * 1.3 for msg in messages)
        output_tokens = max_tokens or 500
        return int(input_tokens + output_tokens)

    def chat_completions_create(self, **kwargs):
        """Create a chat completion while respecting the limits.

        Accepts the same keyword arguments as the wrapped client's
        ``chat.completions.create`` and returns its response. Exceptions
        from the underlying call propagate unchanged.
        """
        messages = kwargs.get('messages', [])
        max_tokens = kwargs.get('max_tokens')
        estimated_tokens = self._estimate_tokens(messages, max_tokens)
        # Wait if needed
        self._wait_if_needed(estimated_tokens)
        now = datetime.now()
        # Count the attempt up front so that it is recorded whether the call
        # succeeds or raises.
        self.request_timestamps.append(now)
        response = self.client.chat.completions.create(**kwargs)
        # Record the real usage when the API reports it
        actual_tokens = response.usage.total_tokens if response.usage else estimated_tokens
        self.token_usage.append((now, actual_tokens))
        return response
# Usage
import os  # required by os.getenv below; the snippet did not import it

rate_limited_client = RateLimitedYodiClient(
    api_key=os.getenv("YODI_API_KEY"),
    requests_per_minute=60,
    tokens_per_minute=90000
)
Backoff exponentiel avancé​
import random
import time
from functools import wraps
class ExponentialBackoff:
    """Decorator that retries a call on rate-limit errors.

    The delay grows exponentially between attempts. Errors that are not
    rate-limit errors are re-raised immediately without retrying.
    """

    def __init__(self,
                 base_delay=1,
                 max_delay=60,
                 exponential_base=2,
                 jitter=True,
                 max_retries=5):
        """Configure the retry policy.

        Args:
            base_delay: Delay in seconds before the first retry.
            max_delay: Upper bound for the delay, applied before jitter.
            exponential_base: Growth factor applied per attempt.
            jitter: If True, randomise each delay by +/-10%.
            max_retries: Total number of attempts; values below 1 are
                treated as 1 so the wrapped function always runs once.
        """
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.max_retries = max_retries

    def calculate_delay(self, attempt):
        """Return the delay in seconds to wait after 0-based ``attempt``."""
        delay = self.base_delay * (self.exponential_base ** attempt)
        delay = min(delay, self.max_delay)
        if self.jitter:
            # Jitter keeps many clients from retrying at the same instant
            jitter_range = delay * 0.1
            delay += random.uniform(-jitter_range, jitter_range)
        return max(0, delay)

    def __call__(self, func):
        """Wrap ``func`` so that rate-limit errors trigger retries."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Always try at least once, even if max_retries is 0 or negative.
            attempts = max(1, self.max_retries)
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    is_last_attempt = attempt == attempts - 1
                    if is_last_attempt or not self._is_rate_limit_error(e):
                        # Bare raise keeps the original traceback intact.
                        raise
                    delay = self.calculate_delay(attempt)
                    print(f"Rate limit hit, waiting {delay:.1f}s (attempt {attempt + 1})")
                    time.sleep(delay)
        return wrapper

    def _is_rate_limit_error(self, error):
        """Tell whether ``error`` looks like a rate-limit (HTTP 429) error.

        Checks a ``status_code`` attribute when the exception has one, then
        falls back to keyword matching on the error message.
        """
        if getattr(error, 'status_code', None) == 429:
            return True
        error_str = str(error).lower()
        return any(keyword in error_str for keyword in [
            'rate limit', 'rate_limit', '429', 'too many requests'
        ])
# Usage as a decorator
@ExponentialBackoff(base_delay=2, max_delay=120, max_retries=5)
def robust_api_call(messages, **kwargs):
    """Send a chat completion through ``client``, wrapped by the backoff decorator."""
    response = client.chat.completions.create(messages=messages, **kwargs)
    return response
Monitoring en temps réel​
import threading
import time
from collections import deque, defaultdict
from datetime import datetime, timedelta
class RateLimitMonitor:
    """Sliding-window usage statistics, overall and per endpoint.

    Records requests, token counts and errors for the last
    ``window_minutes`` minutes. All public methods take ``self.lock``.
    """

    def __init__(self, window_minutes=5):
        """Create an empty monitor.

        Args:
            window_minutes: Length of the sliding window in minutes.

        Raises:
            ValueError: If ``window_minutes`` is not positive (the per-minute
                rates divide by it).
        """
        if window_minutes <= 0:
            raise ValueError("window_minutes must be positive")
        self.window_minutes = window_minutes
        self.requests = deque()
        self.tokens = deque()
        self.errors = deque()
        self.lock = threading.Lock()
        # Per-endpoint statistics
        self.stats_by_endpoint = defaultdict(lambda: {
            'requests': deque(),
            'tokens': deque(),
            'errors': deque()
        })

    def record_request(self, endpoint, tokens_used, error=None):
        """Record one request.

        Args:
            endpoint: Key under which per-endpoint statistics are grouped.
            tokens_used: Tokens consumed by the request.
            error: The error raised by the request, or None on success.
        """
        now = datetime.now()
        with self.lock:
            self.requests.append(now)
            self.tokens.append((now, tokens_used))
            # Per-endpoint stats
            endpoint_stats = self.stats_by_endpoint[endpoint]
            endpoint_stats['requests'].append(now)
            endpoint_stats['tokens'].append((now, tokens_used))
            if error is not None:
                self.errors.append((now, error))
                endpoint_stats['errors'].append((now, error))
            # Purge expired records
            self._cleanup()

    def _cleanup(self):
        """Drop records older than the window. Caller must hold ``self.lock``."""
        cutoff = datetime.now() - timedelta(minutes=self.window_minutes)
        buckets = [{
            'requests': self.requests,
            'tokens': self.tokens,
            'errors': self.errors
        }]
        buckets.extend(self.stats_by_endpoint.values())
        for bucket in buckets:
            # 'requests' holds bare timestamps; the other two hold
            # (timestamp, payload) tuples.
            timestamps = bucket['requests']
            while timestamps and timestamps[0] < cutoff:
                timestamps.popleft()
            for name in ('tokens', 'errors'):
                records = bucket[name]
                while records and records[0][0] < cutoff:
                    records.popleft()

    def _summarize(self, requests, tokens, errors):
        """Compute per-minute rates and the error rate for one set of deques."""
        total_tokens = sum(count for _, count in tokens)
        return {
            'requests_per_minute': len(requests) / self.window_minutes,
            'tokens_per_minute': total_tokens / self.window_minutes,
            'error_rate': len(errors) / max(1, len(requests)),
        }, total_tokens

    def get_current_usage(self):
        """Return overall usage for the current window.

        Returns:
            Dict with ``requests_per_minute``, ``tokens_per_minute``,
            ``error_rate``, ``total_requests`` and ``total_tokens``.
        """
        with self.lock:
            # Purge first so that an idle period is not reported as usage.
            self._cleanup()
            usage, total_tokens = self._summarize(
                self.requests, self.tokens, self.errors
            )
            usage['total_requests'] = len(self.requests)
            usage['total_tokens'] = total_tokens
            return usage

    def get_endpoint_stats(self, endpoint):
        """Return usage for one endpoint, or None if it was never recorded."""
        with self.lock:
            if endpoint not in self.stats_by_endpoint:
                return None
            self._cleanup()
            stats = self.stats_by_endpoint[endpoint]
            usage, _ = self._summarize(
                stats['requests'], stats['tokens'], stats['errors']
            )
            return usage

    def predict_limit_hit(self, limit_rpm=60, limit_tpm=90000):
        """Estimate the remaining headroom before the limits are reached.

        For each metric with non-zero usage, returns
        ``(limit - rate) / rate`` clamped at 0, where ``rate`` is the
        per-minute rate from ``get_current_usage``. Metrics with zero usage
        are omitted.

        Returns:
            Dict with optional keys ``'requests'`` and ``'tokens'``.
        """
        usage = self.get_current_usage()
        predictions = {}
        if usage['requests_per_minute'] > 0:
            rpm_eta = (limit_rpm - usage['requests_per_minute']) / usage['requests_per_minute']
            predictions['requests'] = max(0, rpm_eta)
        if usage['tokens_per_minute'] > 0:
            tpm_eta = (limit_tpm - usage['tokens_per_minute']) / usage['tokens_per_minute']
            predictions['tokens'] = max(0, tpm_eta)
        return predictions
# Usage: one shared monitor with the default five-minute window
monitor = RateLimitMonitor(window_minutes=5)
def monitored_api_call(endpoint, messages, **kwargs):
    """Call the chat completion API and record the outcome in ``monitor``.

    Args:
        endpoint: Label under which the call is recorded.
        messages: Messages forwarded to ``client.chat.completions.create``.
        **kwargs: Extra arguments forwarded to the same call.

    Returns:
        The API response.

    Raises:
        Exception: Whatever the API call raises. It is recorded, then
            re-raised unchanged.
    """
    error = None
    tokens_used = 0
    try:
        response = client.chat.completions.create(messages=messages, **kwargs)
        tokens_used = response.usage.total_tokens if response.usage else 0
        return response
    except Exception as e:
        error = e
        raise
    finally:
        # Runs on success and failure alike, so every attempt is counted.
        monitor.record_request(endpoint, tokens_used, error)
# Periodic report
def print_usage_report():
    """Print current usage and limit predictions from ``monitor``.

    The window length comes from ``monitor.window_minutes``. Predictions
    that are missing (no usage yet) are shown as ``N/A``.
    """
    usage = monitor.get_current_usage()
    predictions = monitor.predict_limit_hit()

    def describe(metric):
        # predict_limit_hit omits a metric when its usage is zero
        value = predictions.get(metric)
        return 'N/A' if value is None else f"{value:.1f}"

    print(f"""
📊 Usage des {monitor.window_minutes} dernières minutes:
- Requêtes/min: {usage['requests_per_minute']:.1f}
- Tokens/min: {usage['tokens_per_minute']:.0f}
- Taux d'erreur: {usage['error_rate']:.2%}
Prédictions:
- Limite requêtes dans: {describe('requests')} min
- Limite tokens dans: {describe('tokens')} min
""")
Optimisation pour les limites​
Stratégies de batching​
import asyncio
from typing import List, Dict, Any
class RequestBatcher:
    """Group requests into batches and run each batch with bounded concurrency.

    A batch is flushed when ``batch_size`` requests are pending or every
    ``flush_interval`` seconds, whichever comes first. The instance can be
    built outside a running event loop; the lock and the periodic flush task
    are created on the first ``add_request`` call.
    """

    def __init__(self, batch_size=10, flush_interval=5):
        """Configure the batcher.

        Args:
            batch_size: Number of pending requests that triggers a flush.
            flush_interval: Seconds between automatic flushes.
        """
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.pending_requests = []
        # Both are created lazily inside the running loop (_ensure_started):
        # asyncio.create_task() raises when no loop is running.
        self.lock = None
        self._flush_task = None

    def _ensure_started(self):
        """Create the lock and start the automatic flush task if needed."""
        if self.lock is None:
            self.lock = asyncio.Lock()
        if self._flush_task is None or self._flush_task.done():
            # Keep a reference so the task is not garbage-collected.
            self._flush_task = asyncio.create_task(self._auto_flush())

    def close(self):
        """Stop the automatic flush task. Pending requests are left as-is."""
        if self._flush_task is not None:
            self._flush_task.cancel()
            self._flush_task = None

    async def add_request(self, request_data):
        """Queue a request and wait for its result.

        Args:
            request_data: Keyword arguments for the API call.

        Returns:
            The result of the API call for this request.

        Raises:
            Exception: Whatever the API call for this request raised.
        """
        self._ensure_started()
        future = asyncio.get_running_loop().create_future()
        batch = []
        async with self.lock:
            self.pending_requests.append((request_data, future))
            if len(self.pending_requests) >= self.batch_size:
                batch = self._take_batch()
        # Process outside the lock so other callers can keep queueing while
        # the API calls are in flight.
        await self._process_batch(batch)
        return await future

    def _take_batch(self):
        """Remove and return all pending requests. Contains no await."""
        batch = self.pending_requests[:]
        self.pending_requests.clear()
        return batch

    async def _flush_batch(self):
        """Process whatever is currently pending."""
        await self._process_batch(self._take_batch())

    async def _process_batch(self, batch):
        """Run the API calls of ``batch``, at most 5 at a time."""
        if not batch:
            return
        semaphore = asyncio.Semaphore(5)  # at most 5 concurrent requests

        async def process_request(request_data, future):
            async with semaphore:
                try:
                    result = await self._make_api_call(request_data)
                except Exception as e:
                    # The waiter may have been cancelled in the meantime.
                    if not future.done():
                        future.set_exception(e)
                else:
                    if not future.done():
                        future.set_result(result)

        tasks = [
            process_request(request_data, future)
            for request_data, future in batch
        ]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _auto_flush(self):
        """Flush pending requests every ``flush_interval`` seconds."""
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self.lock:
                batch = self._take_batch()
            await self._process_batch(batch)

    async def _make_api_call(self, request_data):
        """Perform the API call for one request.

        The call is made through ``client.chat.completions.create`` in the
        default executor so a synchronous client does not block the loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, lambda: client.chat.completions.create(**request_data)
        )
# Usage: flush every 10 requests or every 3 seconds
batcher = RequestBatcher(
    batch_size=10,
    flush_interval=3,
)
async def batched_completion(messages, **kwargs):
    """Submit a completion request through ``batcher`` and await its result."""
    payload = {'messages': messages}
    payload.update(kwargs)
    result = await batcher.add_request(payload)
    return result
Cache intelligent par limite​
import hashlib
import json
import time
from typing import Optional, Tuple
class RateLimitAwareCache:
    """In-memory response cache with least-recently-used eviction.

    Requests with ``temperature`` above 0.5 are never cached. Sampling
    parameters (``temperature``, ``top_p``, ``seed``) are left out of the
    key, so requests that differ only in those share one entry.
    """

    def __init__(self, max_size=1000):
        """Create an empty cache holding at most ``max_size`` entries."""
        self.cache = {}
        self.access_times = {}
        self.max_size = max_size

    def _generate_key(self, messages, **kwargs) -> str:
        """Build the cache key for a request.

        The key is the MD5 hex digest of the JSON form of the messages and
        of every keyword argument except the sampling parameters. MD5 is
        used only as a fingerprint here, not for security.
        """
        cache_kwargs = {k: v for k, v in kwargs.items()
                        if k not in ['temperature', 'top_p', 'seed']}
        content = {
            'messages': messages,
            'params': cache_kwargs
        }
        return hashlib.md5(
            json.dumps(content, sort_keys=True).encode()
        ).hexdigest()

    def get(self, messages, **kwargs) -> Optional[dict]:
        """Return the cached response, or None on a miss.

        Always returns None for requests with ``temperature`` above 0.5.
        A hit refreshes the entry's access time.
        """
        if kwargs.get('temperature', 0) > 0.5:
            return None
        key = self._generate_key(messages, **kwargs)
        if key in self.cache:
            self.access_times[key] = time.time()
            return self.cache[key]
        return None

    def set(self, response, messages, **kwargs):
        """Store a response; requests with ``temperature`` above 0.5 are skipped."""
        if kwargs.get('temperature', 0) > 0.5:
            return
        key = self._generate_key(messages, **kwargs)
        # Evict only when a NEW entry would exceed the capacity. Overwriting
        # an existing key does not grow the cache.
        if key not in self.cache and len(self.cache) >= self.max_size:
            self._evict_oldest()
        self.cache[key] = response
        self.access_times[key] = time.time()

    def _evict_oldest(self):
        """Remove the least recently used 20% of entries (at least one)."""
        sorted_keys = sorted(
            self.access_times.keys(),
            key=lambda k: self.access_times[k]
        )
        evict_count = max(1, len(sorted_keys) // 5)
        for key in sorted_keys[:evict_count]:
            del self.cache[key]
            del self.access_times[key]
cache = RateLimitAwareCache(max_size=1000)
def cached_completion(messages, **kwargs):
    """Return a completion, served from ``cache`` when possible.

    On a miss the request goes through ``monitored_api_call`` under the
    ``'chat'`` endpoint label and the response is stored in the cache.
    """
    # Check the cache first. Compare against None so that a falsy cached
    # value still counts as a hit.
    cached_response = cache.get(messages, **kwargs)
    if cached_response is not None:
        print("📄 Réponse depuis le cache")
        return cached_response
    # Make the API call
    response = monitored_api_call('chat', messages, **kwargs)
    # Store the response
    cache.set(response, messages, **kwargs)
    return response
Alertes et notifications​
Système d'alertes avancé​
import smtplib
import time
from email.mime.text import MIMEText
from enum import Enum
class AlertLevel(Enum):
    """Severity of an alert, from least to most urgent."""

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


class AlertManager:
    """Turn usage statistics into de-duplicated alerts.

    Alerts are printed; CRITICAL alerts are also emailed when
    ``email_config`` is provided. ``webhook_url`` is stored but not used by
    this class.
    """

    # Rank used to decide whether a new alert escalates a recent one.
    _SEVERITY = {
        AlertLevel.INFO: 0,
        AlertLevel.WARNING: 1,
        AlertLevel.CRITICAL: 2,
    }
    # An alert is suppressed when a similar one was sent this recently.
    _DEDUP_WINDOW_SECONDS = 300  # 5 minutes

    def __init__(self, email_config=None, webhook_url=None):
        """Create a manager.

        Args:
            email_config: Optional dict with keys ``smtp_server``,
                ``username``, ``password``, ``from``, ``to`` and, optionally,
                ``smtp_port`` (defaults to smtplib's standard port).
            webhook_url: Optional webhook URL (stored only).
        """
        self.email_config = email_config
        self.webhook_url = webhook_url
        self.alert_history = []
        # Alert thresholds, relative to limits of 60 req/min and 90k tokens/min
        self.thresholds = {
            AlertLevel.WARNING: {
                'requests_per_minute': 45,  # 75% of the limit
                'tokens_per_minute': 67500,
                'error_rate': 0.05
            },
            AlertLevel.CRITICAL: {
                'requests_per_minute': 55,  # about 90% of the limit
                'tokens_per_minute': 81000,
                'error_rate': 0.10
            }
        }

    def check_usage_and_alert(self, usage_stats):
        """Compare usage with the thresholds and send the resulting alerts.

        Each metric is reported at most once, at the highest level it
        exceeds.

        Args:
            usage_stats: Dict with ``requests_per_minute``,
                ``tokens_per_minute`` and ``error_rate``.

        Returns:
            The list of alerts that were actually sent (not suppressed).
        """
        checks = (
            ('requests_per_minute', 'rate_limit',
             lambda v: f"Limite de requêtes proche: {v:.1f}/min"),
            ('tokens_per_minute', 'token_limit',
             lambda v: f"Limite de tokens proche: {v:.0f}/min"),
            ('error_rate', 'error_rate',
             lambda v: f"Taux d'erreur élevé: {v:.2%}"),
        )
        alerts = []
        flagged = set()
        # Most severe level first, so a metric over both thresholds is only
        # reported as CRITICAL.
        for level in (AlertLevel.CRITICAL, AlertLevel.WARNING):
            limits = self.thresholds[level]
            for metric, alert_type, describe in checks:
                value = usage_stats[metric]
                if metric not in flagged and value > limits[metric]:
                    flagged.add(metric)
                    alerts.append({
                        'level': level,
                        'type': alert_type,
                        'message': describe(value)
                    })
        # Send the alerts
        return [alert for alert in alerts if self._send_alert(alert)]

    def _send_alert(self, alert):
        """Send one alert unless a similar one was sent recently.

        An alert is suppressed when one of the last 10 alerts has the same
        type, is at least as severe, and is less than 5 minutes old. An
        escalation (WARNING then CRITICAL) therefore still goes through.

        Returns:
            True if the alert was sent, False if it was suppressed.
        """
        now = time.time()
        severity = self._SEVERITY[alert['level']]
        for previous in self.alert_history[-10:]:
            if (previous['type'] == alert['type']
                    and self._SEVERITY[previous['level']] >= severity
                    and now - previous['timestamp'] < self._DEDUP_WINDOW_SECONDS):
                return False
        alert['timestamp'] = now
        self.alert_history.append(alert)
        message = f"🚨 {alert['level'].value.upper()}: {alert['message']}"
        print(message)
        if self.email_config and alert['level'] == AlertLevel.CRITICAL:
            self._send_email_alert(message)
        return True

    def _send_email_alert(self, message):
        """Email ``message`` using ``self.email_config``.

        Best effort: any failure is printed and swallowed so that alerting
        never breaks the caller.
        """
        try:
            msg = MIMEText(message)
            msg['Subject'] = 'YODI API Alert'
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']
            # Port 0 tells smtplib to use its standard SMTP port.
            port = self.email_config.get('smtp_port', 0)
            with smtplib.SMTP(self.email_config['smtp_server'], port) as server:
                server.starttls()
                server.login(self.email_config['username'], self.email_config['password'])
                server.send_message(msg)
        except Exception as e:
            print(f"Erreur envoi email: {e}")
# Configuration and usage (placeholder credentials, replace with your own)
alert_manager = AlertManager(
    email_config={
        "smtp_server": "smtp.gmail.com",
        "username": "your-email@gmail.com",
        "password": "your-app-password",
        "from": "your-email@gmail.com",
        "to": "admin@yourcompany.com",
    },
)
# Periodic check
def periodic_alert_check():
    """Feed the monitor's current usage to the alert manager."""
    alert_manager.check_usage_and_alert(monitor.get_current_usage())