Skip to main content

Rate limits

The YODI API enforces rate limits to ensure fair usage and maintain quality of service for all users.

Limits overview​

Limits by plan​

| Plan | Requêtes/minute | Tokens/minute | Requêtes/jour |
|---|---|---|---|
| Gratuit | 20 | 40,000 | 1,000 |
| Starter | 60 | 90,000 | 10,000 |
| Pro | 300 | 500,000 | 50,000 |
| Enterprise | 1,000 | 2,000,000 | 200,000 |

Limites par modèle​

| Modèle | Multiplicateur | Notes |
|---|---|---|
| yodi-instruct | 1x | Limites de base |
| yodi-1 | 1x | Limites de base |
| yodi-embed | 3x | Plus de requêtes autorisées |
| yodi-code | 1x | Limites de base |
| yodi-1-32k | 0.5x | Limites réduites |

Comment fonctionnent les limites​

Algorithme Token Bucket​

import time
from threading import Lock

class TokenBucket:
    """Thread-safe token bucket used to model a rate limit.

    The bucket starts full, holds at most ``capacity`` tokens and regains
    ``refill_rate`` tokens per second.
    """

    def __init__(self, capacity, refill_rate):
        """Create a full bucket.

        Args:
            capacity: Maximum number of tokens the bucket can hold.
            refill_rate: Number of tokens added back per second.
        """
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate  # tokens per second
        # A monotonic clock never jumps backwards, so a wall-clock adjustment
        # cannot produce a negative refill.
        self.last_refill = time.monotonic()
        self.lock = Lock()

    def _refill(self):
        """Credit the tokens earned since the last refill.

        The caller must already hold ``self.lock``.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate,
        )
        self.last_refill = now

    def consume(self, tokens_needed=1):
        """Try to take ``tokens_needed`` tokens from the bucket.

        Returns:
            True if the tokens were available and have been deducted,
            False otherwise (the bucket is left untouched apart from the
            refill).
        """
        with self.lock:
            self._refill()

            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True

            return False

    def wait_time(self, tokens_needed=1):
        """Return how many seconds to wait before ``tokens_needed`` are available.

        Returns:
            0 if the tokens are available now, ``float("inf")`` if they can
            never become available (request larger than the capacity, or a
            non-positive refill rate), otherwise the wait in seconds.
        """
        with self.lock:
            # Refill first so the estimate reflects tokens earned since the
            # last call instead of a stale balance.
            self._refill()

            if self.tokens >= tokens_needed:
                return 0

            if tokens_needed > self.capacity or self.refill_rate <= 0:
                return float("inf")

            tokens_shortage = tokens_needed - self.tokens
            return tokens_shortage / self.refill_rate

# Simulating the YODI limits: a full bucket refilled continuously at the
# per-second equivalent of the per-minute limit.
request_bucket = TokenBucket(capacity=60, refill_rate=1) # 60 req/min
token_bucket = TokenBucket(capacity=90000, refill_rate=1500) # 90k tokens/min

Headers de réponse​

L'API YODI retourne des headers informatifs sur votre usage :

X-RateLimit-Limit-Requests: 60
X-RateLimit-Remaining-Requests: 45
X-RateLimit-Reset-Requests: 1693574460

X-RateLimit-Limit-Tokens: 90000
X-RateLimit-Remaining-Tokens: 85000
X-RateLimit-Reset-Tokens: 1693574460

Gestion des limites dans le code​

Client avec rate limiting intégré​

import time
import math
from datetime import datetime, timedelta

class RateLimitedYodiClient:
    """Client wrapper that throttles itself before calling the API.

    Requests and token usage are tracked over a sliding one-minute window;
    a call that would exceed either limit sleeps until enough old records
    have expired.

    NOTE(review): ``Client`` is not defined or imported in this snippet; it
    is presumably the YODI SDK client class. Confirm the import.
    """

    def __init__(self, api_key, requests_per_minute=60, tokens_per_minute=90000):
        """Create the wrapped client and empty usage records.

        Args:
            api_key: Key passed to the underlying ``Client``.
            requests_per_minute: Maximum requests per sliding minute.
            tokens_per_minute: Maximum tokens per sliding minute.
        """
        self.client = Client(api_key=api_key)
        self.requests_per_minute = requests_per_minute
        self.tokens_per_minute = tokens_per_minute

        # Request tracking
        self.request_timestamps = []  # datetime of each attempted request
        self.token_usage = []  # (datetime, tokens) per successful request

    def _clean_old_records(self):
        """Drop records that are more than one minute old."""
        cutoff = datetime.now() - timedelta(minutes=1)

        self.request_timestamps = [
            ts for ts in self.request_timestamps if ts > cutoff
        ]

        self.token_usage = [
            (ts, tokens) for ts, tokens in self.token_usage if ts > cutoff
        ]

    def _can_make_request(self, estimated_tokens=1000):
        """Check whether a request fits in the current window.

        Returns:
            ``(True, None)`` when the request may proceed, otherwise
            ``(False, reason)`` with a human-readable reason.
        """
        self._clean_old_records()

        # Request-count limit
        if len(self.request_timestamps) >= self.requests_per_minute:
            return False, "Limite de requêtes atteinte"

        # Token limit
        current_tokens = sum(tokens for _, tokens in self.token_usage)
        if current_tokens + estimated_tokens > self.tokens_per_minute:
            return False, "Limite de tokens atteinte"

        return True, None

    def _wait_if_needed(self, estimated_tokens=1000):
        """Sleep until the request fits in the window, if it ever can.

        The limits are re-checked after every sleep because expiring a single
        record may not free enough budget. If no record is left to expire
        (for example a single request larger than the per-minute token
        budget), waiting cannot help and the method returns so the server
        can decide.
        """
        while True:
            can_proceed, _reason = self._can_make_request(estimated_tokens)
            if can_proceed:
                return

            # Decide which limit blocks us from the state itself rather than
            # by parsing the human-readable reason.
            if len(self.request_timestamps) >= self.requests_per_minute:
                pending = list(self.request_timestamps)
            else:
                pending = [ts for ts, _ in self.token_usage]

            if not pending:
                return

            # Wait for the oldest record to leave the one-minute window.
            wait_until = min(pending) + timedelta(minutes=1)
            wait_seconds = (wait_until - datetime.now()).total_seconds()

            if wait_seconds > 0:
                print(f"Attente de {wait_seconds:.1f}s pour respecter les limites...")
                time.sleep(wait_seconds)

    def _estimate_tokens(self, messages, max_tokens=None):
        """Estimate the token cost of a request.

        Rough heuristic: 1.3 tokens per whitespace-separated word of input,
        plus ``max_tokens`` (or 500 when unset) for the output. Messages
        whose content is missing or ``None`` count as empty.
        """
        input_tokens = 0
        for msg in messages:
            content = msg.get('content') or ''
            if not isinstance(content, str):
                content = str(content)
            input_tokens += len(content.split()) * 1.3
        output_tokens = max_tokens or 500
        return int(input_tokens + output_tokens)

    def chat_completions_create(self, **kwargs):
        """Create a completion while respecting the configured limits.

        Keyword arguments are forwarded unchanged to
        ``client.chat.completions.create``. Any exception raised by the
        underlying call is re-raised after the attempt has been recorded.
        """
        messages = kwargs.get('messages', [])
        max_tokens = kwargs.get('max_tokens')

        estimated_tokens = self._estimate_tokens(messages, max_tokens)

        # Wait if necessary
        self._wait_if_needed(estimated_tokens)

        now = datetime.now()
        try:
            response = self.client.chat.completions.create(**kwargs)
        except Exception:
            # A failed attempt still counts against the request limit.
            self.request_timestamps.append(now)
            raise

        # Record real usage, falling back to the estimate when the response
        # carries no usage information.
        usage = getattr(response, 'usage', None)
        actual_tokens = usage.total_tokens if usage else estimated_tokens
        self.request_timestamps.append(now)
        self.token_usage.append((now, actual_tokens))

        return response

# Usage
import os  # needed for os.getenv below

rate_limited_client = RateLimitedYodiClient(
    api_key=os.getenv("YODI_API_KEY"),
    requests_per_minute=60,
    tokens_per_minute=90000,
)

Backoff exponentiel avancé​

import random
import time
from functools import wraps

class ExponentialBackoff:
    """Decorator that retries a callable on rate-limit errors.

    The delay before retry ``n`` (0-based) is
    ``base_delay * exponential_base ** n``, capped at ``max_delay`` and
    optionally perturbed by +/-10% jitter. Errors that are not rate-limit
    errors are re-raised immediately.
    """

    def __init__(self,
                 base_delay=1,
                 max_delay=60,
                 exponential_base=2,
                 jitter=True,
                 max_retries=5):
        """Store the backoff parameters.

        Args:
            base_delay: Delay in seconds before the first retry.
            max_delay: Upper bound in seconds for any single delay.
            exponential_base: Growth factor between consecutive delays.
            jitter: Whether to randomize each delay by +/-10%.
            max_retries: Total number of attempts; values below 1 are
                treated as a single attempt.
        """
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.max_retries = max_retries

    def calculate_delay(self, attempt):
        """Return the delay in seconds to wait after failed attempt ``attempt``."""
        try:
            delay = self.base_delay * (self.exponential_base ** attempt)
        except OverflowError:
            # A float base raised to a large power overflows; the result
            # would be capped anyway.
            delay = self.max_delay
        delay = min(delay, self.max_delay)

        if self.jitter:
            # Jitter spreads out clients that were throttled at the same
            # moment so they do not all retry together.
            jitter_range = delay * 0.1
            delay += random.uniform(-jitter_range, jitter_range)

        # Clamp again: positive jitter must not push the delay past max_delay.
        return max(0, min(delay, self.max_delay))

    def __call__(self, func):
        """Wrap ``func`` so rate-limit errors trigger a delayed retry."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Always try at least once, even with max_retries <= 0.
            attempts = max(1, self.max_retries)

            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    out_of_attempts = attempt >= attempts - 1
                    if out_of_attempts or not self._is_rate_limit_error(e):
                        # Bare raise keeps the original traceback intact.
                        raise

                    delay = self.calculate_delay(attempt)
                    print(f"Rate limit hit, waiting {delay:.1f}s (attempt {attempt + 1})")
                    time.sleep(delay)

        return wrapper

    def _is_rate_limit_error(self, error):
        """Return True if ``error`` looks like a rate-limit (HTTP 429) error.

        A ``status_code`` attribute on the error, or on its ``response``
        attribute, is checked first; the error message is then searched for
        well-known markers.
        """
        status = getattr(error, 'status_code', None)
        if status is None:
            status = getattr(getattr(error, 'response', None), 'status_code', None)
        if status == 429:
            return True

        error_str = str(error).lower()
        return any(keyword in error_str for keyword in [
            'rate limit', 'rate_limit', '429', 'too many requests'
        ])

# Usage as a decorator
@ExponentialBackoff(base_delay=2, max_delay=120, max_retries=5)
def robust_api_call(messages, **kwargs):
    """Call the chat completions endpoint through the backoff decorator.

    ``messages`` and any extra keyword arguments are forwarded unchanged.

    NOTE(review): ``client`` is not defined in this snippet; presumably a
    configured YODI client instance. Confirm where it is created.
    """
    return client.chat.completions.create(messages=messages, **kwargs)

Monitoring en temps réel​

import threading
import time
from collections import deque, defaultdict
from datetime import datetime, timedelta

class RateLimitMonitor:
    """Thread-safe sliding-window recorder of requests, tokens and errors.

    Records older than ``window_minutes`` are pruned both when a request is
    recorded and when statistics are read, so reports never include stale
    data.
    """

    def __init__(self, window_minutes=5):
        """Create an empty monitor.

        Args:
            window_minutes: Length of the sliding window in minutes.

        Raises:
            ValueError: If ``window_minutes`` is not positive (every rate is
                computed by dividing by it).
        """
        if window_minutes <= 0:
            raise ValueError("window_minutes must be positive")

        self.window_minutes = window_minutes
        self.requests = deque()  # datetime per request
        self.tokens = deque()  # (datetime, tokens_used)
        self.errors = deque()  # (datetime, error)
        self.lock = threading.Lock()

        # Per-endpoint statistics, same layout as the global deques
        self.stats_by_endpoint = defaultdict(lambda: {
            'requests': deque(),
            'tokens': deque(),
            'errors': deque()
        })

    def record_request(self, endpoint, tokens_used, error=None):
        """Record one request, globally and for ``endpoint``.

        Args:
            endpoint: Key under which per-endpoint statistics are kept.
            tokens_used: Tokens consumed by the request.
            error: The error raised by the request, if any (recorded only
                when truthy).
        """
        with self.lock:
            # Take the timestamp under the lock so the deques stay in
            # chronological order, which the pruning loop relies on.
            now = datetime.now()

            self.requests.append(now)
            self.tokens.append((now, tokens_used))

            endpoint_stats = self.stats_by_endpoint[endpoint]
            endpoint_stats['requests'].append(now)
            endpoint_stats['tokens'].append((now, tokens_used))

            if error:
                self.errors.append((now, error))
                endpoint_stats['errors'].append((now, error))

            self._cleanup()

    @staticmethod
    def _trim(entries, cutoff):
        """Pop entries from the left of ``entries`` while older than ``cutoff``.

        Entries are either bare datetimes or tuples whose first item is the
        datetime.
        """
        while entries:
            head = entries[0]
            stamp = head[0] if isinstance(head, tuple) else head
            if stamp >= cutoff:
                break
            entries.popleft()

    def _cleanup(self):
        """Drop every record older than the window. Caller must hold the lock."""
        cutoff = datetime.now() - timedelta(minutes=self.window_minutes)

        # Global statistics
        self._trim(self.requests, cutoff)
        self._trim(self.tokens, cutoff)
        self._trim(self.errors, cutoff)

        # Per-endpoint statistics
        for endpoint_stats in self.stats_by_endpoint.values():
            self._trim(endpoint_stats['requests'], cutoff)
            self._trim(endpoint_stats['tokens'], cutoff)
            self._trim(endpoint_stats['errors'], cutoff)

    def get_current_usage(self):
        """Return global usage over the window.

        Returns:
            A dict with ``requests_per_minute``, ``tokens_per_minute``,
            ``error_rate`` (errors / requests, 0 when there are no
            requests), ``total_requests`` and ``total_tokens``.
        """
        with self.lock:
            # Prune first: without it, an idle monitor would keep reporting
            # traffic that has already left the window.
            self._cleanup()

            requests_per_minute = len(self.requests) / self.window_minutes

            total_tokens = sum(tokens for _, tokens in self.tokens)
            tokens_per_minute = total_tokens / self.window_minutes

            error_rate = len(self.errors) / max(1, len(self.requests))

            return {
                'requests_per_minute': requests_per_minute,
                'tokens_per_minute': tokens_per_minute,
                'error_rate': error_rate,
                'total_requests': len(self.requests),
                'total_tokens': total_tokens
            }

    def get_endpoint_stats(self, endpoint):
        """Return usage over the window for one endpoint.

        Returns:
            ``None`` if the endpoint has never been recorded, otherwise a
            dict with ``requests_per_minute``, ``tokens_per_minute`` and
            ``error_rate``.
        """
        with self.lock:
            # Membership test first: indexing the defaultdict would create
            # an empty entry for an unknown endpoint.
            if endpoint not in self.stats_by_endpoint:
                return None

            self._cleanup()
            stats = self.stats_by_endpoint[endpoint]

            requests_per_minute = len(stats['requests']) / self.window_minutes
            total_tokens = sum(tokens for _, tokens in stats['tokens'])
            tokens_per_minute = total_tokens / self.window_minutes
            error_rate = len(stats['errors']) / max(1, len(stats['requests']))

            return {
                'requests_per_minute': requests_per_minute,
                'tokens_per_minute': tokens_per_minute,
                'error_rate': error_rate
            }

    def predict_limit_hit(self, limit_rpm=60, limit_tpm=90000):
        """Estimate the remaining headroom before each limit is reached.

        Args:
            limit_rpm: Requests-per-minute limit.
            limit_tpm: Tokens-per-minute limit.

        Returns:
            A dict that may contain ``'requests'`` and ``'tokens'``, each
            equal to ``(limit - current_rate) / current_rate`` floored at 0.
            A key is omitted when the corresponding current rate is 0.

        NOTE(review): the value is a headroom ratio relative to the current
        rate; whether it should be read as minutes is not established by
        this code. Confirm the intended unit.
        """
        usage = self.get_current_usage()

        predictions = {}

        if usage['requests_per_minute'] > 0:
            rpm_eta = (limit_rpm - usage['requests_per_minute']) / usage['requests_per_minute']
            predictions['requests'] = max(0, rpm_eta)

        if usage['tokens_per_minute'] > 0:
            tpm_eta = (limit_tpm - usage['tokens_per_minute']) / usage['tokens_per_minute']
            predictions['tokens'] = max(0, tpm_eta)

        return predictions

# Usage
monitor = RateLimitMonitor()

def monitored_api_call(endpoint, messages, **kwargs):
    """Call the chat completions API and record the outcome in ``monitor``.

    Args:
        endpoint: Label under which the request is recorded.
        messages: Messages forwarded to the API.
        **kwargs: Extra arguments forwarded to the API.

    Returns:
        The API response.

    Raises:
        Whatever the underlying call raises; the failure is recorded first
        (with 0 tokens) and then re-raised.

    NOTE(review): ``client`` is not defined in this snippet; presumably a
    configured YODI client instance.
    """
    error = None
    tokens_used = 0

    try:
        response = client.chat.completions.create(messages=messages, **kwargs)
        tokens_used = response.usage.total_tokens if response.usage else 0
        return response

    except Exception as e:
        error = e
        raise

    finally:
        # Runs on success and on failure, so every attempt is counted.
        monitor.record_request(endpoint, tokens_used, error)

# Periodic report
def print_usage_report():
    """Print current usage and limit predictions from the global ``monitor``.

    Predictions that are unavailable (no traffic in the window) are shown
    as ``N/A``; available ones are rounded to one decimal.
    """
    usage = monitor.get_current_usage()
    predictions = monitor.predict_limit_hit()

    def _fmt(key):
        value = predictions.get(key)
        return 'N/A' if value is None else f"{value:.1f}"

    # The window length comes from the monitor so the label stays accurate
    # when a non-default window is configured.
    report = "\n".join([
        "",
        f"📊 Usage des {monitor.window_minutes} dernières minutes:",
        f"- Requêtes/min: {usage['requests_per_minute']:.1f}",
        f"- Tokens/min: {usage['tokens_per_minute']:.0f}",
        f"- Taux d'erreur: {usage['error_rate']:.2%}",
        "",
        "Prédictions:",
        f"- Limite requêtes dans: {_fmt('requests')} min",
        f"- Limite tokens dans: {_fmt('tokens')} min",
        "",
    ])
    print(report)

Optimisation pour les limites​

Stratégies de batching​

import asyncio
from typing import List, Dict, Any

class RequestBatcher:
    """Collect requests and dispatch them in batches.

    A batch is flushed when ``batch_size`` requests are pending or every
    ``flush_interval`` seconds, whichever comes first. At most five requests
    of a batch run concurrently.

    The instance can be constructed outside a running event loop: the lock
    and the periodic flush task are created lazily on first use.
    """

    def __init__(self, batch_size=10, flush_interval=5):
        """Store the configuration; no event loop is required yet.

        Args:
            batch_size: Number of pending requests that triggers a flush.
            flush_interval: Seconds between automatic flushes.
        """
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.pending_requests = []
        self.lock = None  # asyncio.Lock, created by _ensure_started
        self._loop = None
        self._flush_task = None  # kept so the task is not garbage-collected

    def _ensure_started(self):
        """Bind to the running loop and make sure the auto-flush task is alive."""
        loop = asyncio.get_running_loop()
        if self._loop is not loop:
            # First use, or a new event loop: loop-bound objects from a
            # previous loop cannot be reused.
            self._loop = loop
            self.lock = asyncio.Lock()
            self._flush_task = None
        if self._flush_task is None or self._flush_task.done():
            self._flush_task = loop.create_task(self._auto_flush())

    async def add_request(self, request_data):
        """Queue a request and wait for its result.

        Args:
            request_data: Keyword arguments for the API call.

        Returns:
            The result of the API call for this request.

        Raises:
            Whatever the API call raised for this request.
        """
        self._ensure_started()
        future = self._loop.create_future()

        async with self.lock:
            self.pending_requests.append((request_data, future))
            batch_full = len(self.pending_requests) >= self.batch_size

        # Flush outside the lock so other callers can keep queueing while
        # the batch is in flight.
        if batch_full:
            await self._flush_batch()

        return await future

    async def _flush_batch(self):
        """Drain the pending requests and process them concurrently."""
        self._ensure_started()

        async with self.lock:
            batch = self.pending_requests[:]
            self.pending_requests.clear()

        if not batch:
            return

        # Process requests in parallel, with a cap on concurrency
        semaphore = asyncio.Semaphore(5)  # at most 5 simultaneous requests

        async def process_request(request_data, future):
            async with semaphore:
                try:
                    result = await self._make_api_call(request_data)
                except Exception as e:
                    if not future.done():
                        future.set_exception(e)
                else:
                    # The waiter may have been cancelled in the meantime.
                    if not future.done():
                        future.set_result(result)

        tasks = [
            process_request(request_data, future)
            for request_data, future in batch
        ]

        await asyncio.gather(*tasks, return_exceptions=True)

    async def _auto_flush(self):
        """Flush pending requests every ``flush_interval`` seconds, forever."""
        while True:
            await asyncio.sleep(self.flush_interval)
            if self.pending_requests:
                await self._flush_batch()

    async def _make_api_call(self, request_data):
        """Perform the real API call without blocking the event loop.

        NOTE(review): assumes ``client.chat.completions.create`` is a
        blocking (synchronous) call, as in the other snippets; it is run in
        the default executor. If the client is asynchronous, await it
        directly instead. ``client`` itself is not defined in this snippet.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, lambda: client.chat.completions.create(**request_data)
        )

# Usage
batcher = RequestBatcher(batch_size=10, flush_interval=3)

async def batched_completion(messages, **kwargs):
    """Request a completion through the shared ``batcher``.

    ``messages`` and the extra keyword arguments are merged into a single
    request dict, queued on the batcher, and the awaited result is returned.
    """
    request_data = {
        'messages': messages,
        **kwargs
    }

    return await batcher.add_request(request_data)

Cache intelligent par limite​

import hashlib
import json
import time
from typing import Optional, Tuple

class RateLimitAwareCache:
    """In-memory response cache with least-recently-used eviction.

    Requests with a temperature above 0.5 are never cached. The sampling
    parameters ``temperature``, ``top_p`` and ``seed`` are not part of the
    cache key, so requests that differ only in those share an entry.
    """

    # Requests above this temperature are treated as creative and not cached.
    _MAX_CACHEABLE_TEMPERATURE = 0.5

    def __init__(self, max_size=1000):
        """Create an empty cache holding at most ``max_size`` entries."""
        self.cache = {}
        self.access_times = {}  # key -> time.time() of last read or write
        self.max_size = max_size

    def _is_cacheable(self, kwargs):
        """Return True unless the request asks for a high temperature.

        A missing or ``None`` temperature is treated as cacheable.
        """
        temperature = kwargs.get('temperature')
        return temperature is None or temperature <= self._MAX_CACHEABLE_TEMPERATURE

    def _generate_key(self, messages, **kwargs) -> str:
        """Build a deterministic cache key from the messages and parameters.

        Raises:
            TypeError: If the messages or parameters are not JSON-serializable.
        """
        # Sampling parameters are excluded from the key
        cache_kwargs = {k: v for k, v in kwargs.items()
                        if k not in ['temperature', 'top_p', 'seed']}

        content = {
            'messages': messages,
            'params': cache_kwargs
        }

        # MD5 is used only as a fast fingerprint, not for security.
        return hashlib.md5(
            json.dumps(content, sort_keys=True).encode()
        ).hexdigest()

    def get(self, messages, **kwargs) -> Optional[dict]:
        """Return the cached response for the request, or ``None`` on a miss."""
        if not self._is_cacheable(kwargs):
            return None

        key = self._generate_key(messages, **kwargs)

        if key in self.cache:
            self.access_times[key] = time.time()
            return self.cache[key]

        return None

    def set(self, response, messages, **kwargs):
        """Store ``response`` for the request unless it is not cacheable."""
        if not self._is_cacheable(kwargs):
            return

        key = self._generate_key(messages, **kwargs)

        # Evict only when a new key is added to a full cache; overwriting an
        # existing entry does not grow the cache.
        if key not in self.cache and len(self.cache) >= self.max_size:
            self._evict_oldest()

        self.cache[key] = response
        self.access_times[key] = time.time()

    def _evict_oldest(self):
        """Remove the 20% least recently used entries (at least one)."""
        sorted_keys = sorted(
            self.access_times,
            key=self.access_times.__getitem__
        )

        evict_count = max(1, len(sorted_keys) // 5)

        for key in sorted_keys[:evict_count]:
            self.cache.pop(key, None)
            del self.access_times[key]

cache = RateLimitAwareCache()

def cached_completion(messages, **kwargs):
    """Return a completion, served from ``cache`` when possible.

    On a miss the request goes through ``monitored_api_call`` under the
    ``'chat'`` endpoint label and the response is stored in the cache.
    """
    # Check the cache first. Compare against None explicitly: a cached
    # response that happens to be falsy is still a hit.
    cached_response = cache.get(messages, **kwargs)
    if cached_response is not None:
        print("📄 Réponse depuis le cache")
        return cached_response

    # Make the API call
    response = monitored_api_call('chat', messages, **kwargs)

    # Store in the cache
    cache.set(response, messages, **kwargs)

    return response

Alertes et notifications​

Système d'alertes avancé​

import smtplib
import time
from email.mime.text import MIMEText
from email.mime.text import MIMEText as MimeText  # alias for the spelling used in this snippet; the stdlib class is MIMEText
from enum import Enum

class AlertLevel(Enum):
    """Severity of an alert; each value is the lowercase label of the level."""
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

class AlertManager:
    """Turn usage statistics into de-duplicated alerts.

    Alerts are printed; CRITICAL alerts are also e-mailed when an e-mail
    configuration is provided.

    NOTE(review): ``webhook_url`` is stored but nothing in this class sends
    to it.
    """

    def __init__(self, email_config=None, webhook_url=None):
        """Store the notification settings and default thresholds.

        Args:
            email_config: Optional dict with ``smtp_server``, ``username``,
                ``password``, ``from`` and ``to``; ``smtp_port`` and
                ``timeout`` are optional.
            webhook_url: Optional webhook URL (currently unused).
        """
        self.email_config = email_config
        self.webhook_url = webhook_url
        self.alert_history = []

        # Alert thresholds
        self.thresholds = {
            AlertLevel.WARNING: {
                'requests_per_minute': 45,  # 75% of the limit
                'tokens_per_minute': 67500,
                'error_rate': 0.05
            },
            AlertLevel.CRITICAL: {
                'requests_per_minute': 55,  # 90% of the limit
                'tokens_per_minute': 81000,
                'error_rate': 0.10
            }
        }

    def check_usage_and_alert(self, usage_stats):
        """Compare usage with the thresholds and send the resulting alerts.

        For each metric only the most severe exceeded level is reported.

        Args:
            usage_stats: Dict with ``requests_per_minute``,
                ``tokens_per_minute`` and ``error_rate``.
        """
        checks = (
            ('requests_per_minute', 'rate_limit', "Limite de requêtes proche: {:.1f}/min"),
            ('tokens_per_minute', 'token_limit', "Limite de tokens proche: {:.0f}/min"),
            ('error_rate', 'error_rate', "Taux d'erreur élevé: {:.2%}"),
        )

        for metric, alert_type, template in checks:
            value = usage_stats[metric]
            # Most severe level first; stop at the first threshold exceeded.
            for level in (AlertLevel.CRITICAL, AlertLevel.WARNING):
                if value > self.thresholds[level][metric]:
                    self._send_alert({
                        'level': level,
                        'type': alert_type,
                        'message': template.format(value)
                    })
                    break

    def _send_alert(self, alert):
        """Send an alert unless an equivalent one was sent in the last 5 minutes.

        Two alerts are equivalent when both their type and their level
        match, so an escalation from WARNING to CRITICAL is never
        suppressed. The alert dict gains a ``timestamp`` key when sent.
        """
        now = time.time()

        # Avoid alert spam
        for past in self.alert_history[-10:]:
            if (past['type'] == alert['type']
                    and past['level'] == alert['level']
                    and now - past['timestamp'] < 300):  # 5 minutes
                return

        alert['timestamp'] = now
        self.alert_history.append(alert)

        message = f"🚨 {alert['level'].value.upper()}: {alert['message']}"

        print(message)

        if self.email_config and alert['level'] == AlertLevel.CRITICAL:
            self._send_email_alert(message)

    def _send_email_alert(self, message):
        """Send ``message`` by e-mail; failures are printed, never raised."""
        try:
            msg = MIMEText(message)
            msg['Subject'] = 'YODI API Alert'
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']

            # Port 0 lets smtplib use its default port. The timeout stops a
            # stalled mail server from blocking the caller indefinitely.
            port = self.email_config.get('smtp_port', 0)
            timeout = self.email_config.get('timeout', 30)

            with smtplib.SMTP(self.email_config['smtp_server'], port,
                              timeout=timeout) as server:
                server.starttls()
                server.login(self.email_config['username'], self.email_config['password'])
                server.send_message(msg)

        except Exception as e:
            # Best effort: an e-mail failure must not break the monitoring
            # loop.
            print(f"Erreur envoi email: {e}")

# Configuration and usage
# NOTE(review): the values below look like placeholders; real credentials
# should presumably be loaded from a secure source rather than hard-coded.
alert_manager = AlertManager(
    email_config={
        'smtp_server': 'smtp.gmail.com',
        'username': 'your-email@gmail.com',
        'password': 'your-app-password',
        'from': 'your-email@gmail.com',
        'to': 'admin@yourcompany.com'
    }
)

# Periodic check
def periodic_alert_check():
    """Read current usage from ``monitor`` and pass it to ``alert_manager``."""
    usage = monitor.get_current_usage()
    alert_manager.check_usage_and_alert(usage)

Prochaines étapes​