Volver a Documentación
Ejemplos avanzados — Python
Casos de uso reales y patrones avanzados para integrar Rayuela en aplicaciones Python de producción.
Cliente Python Completo
Clase completa con manejo de errores, retry logic y logging
import requests
import logging
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
@dataclass
class RayuelaConfig:
    """Connection settings for a Rayuela API client.

    Attributes:
        api_key: API key used to authenticate requests.
        base_url: Root URL of the API; endpoint paths are appended to it.
        timeout: Request timeout.
        max_retries: Maximum number of attempts per request.
        retry_delay: Base delay between attempts.
    """

    api_key: str
    base_url: str = "https://rayuela-backend-e7apihrdoa-uc.a.run.app/api/v1"
    timeout: int = 30
    max_retries: int = 3
    retry_delay: float = 1.0
class RayuelaClient:
    """Synchronous Rayuela API client with retries, backoff and logging."""

    def __init__(self, config: "RayuelaConfig"):
        """Create a client whose HTTP session carries the auth headers.

        Args:
            config: Connection settings (API key, base URL, timeout, retries).
        """
        self.config = config
        # A single Session is reused for every call made by this client.
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": config.api_key,
            "Content-Type": "application/json",
            "User-Agent": "RayuelaClient/1.0",
        })
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _is_retryable(error: Exception) -> bool:
        """Return True when repeating the request could plausibly succeed."""
        response = getattr(error, "response", None)
        # Compare against None explicitly: a requests.Response is falsy for
        # 4xx/5xx statuses, so a truthiness test would misclassify HTTP errors.
        if response is None:
            # No HTTP response at all (connection error, timeout, ...).
            return True
        status = response.status_code
        return status >= 500 or status in (408, 429)

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Send one API request, retrying transient failures with backoff.

        Args:
            method: HTTP verb, e.g. ``"POST"``.
            endpoint: Path appended verbatim to ``config.base_url``.
            **kwargs: Forwarded to ``Session.request`` (``json=``, ``params=``...).

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.exceptions.RequestException: When the last attempt fails,
                or immediately for a non-retryable HTTP error.
        """
        url = f"{self.config.base_url}{endpoint}"
        # Always try at least once; with max_retries <= 0 the loop would
        # otherwise be skipped and the method would silently return None.
        attempts = max(1, self.config.max_retries)
        for attempt in range(attempts):
            try:
                response = self.session.request(
                    method, url, timeout=self.config.timeout, **kwargs
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                self.logger.warning(f"Request failed (attempt {attempt + 1}): {e}")
                out_of_attempts = attempt == attempts - 1
                # Client errors such as 400/401/404 will not change on retry.
                if out_of_attempts or not self._is_retryable(e):
                    raise
                # Exponential backoff: retry_delay, 2x, 4x, ...
                time.sleep(self.config.retry_delay * (2 ** attempt))

    def get_recommendations(
        self,
        user_id: str,
        limit: int = 10,
        filters: Optional[Dict] = None,
        context: Optional[Dict] = None,
        strategy: str = "hybrid"
    ) -> Dict[str, Any]:
        """Query personalized recommendations for a user.

        Args:
            user_id: Sent as ``external_user_id``.
            limit: Maximum number of items requested.
            filters: Filter expression; defaults to an empty "and" group.
            context: Extra request context; defaults to an empty dict.
            strategy: Recommendation strategy name.

        Returns:
            The decoded JSON response of the query endpoint.
        """
        payload = {
            "external_user_id": user_id,
            "recommendation_goal": "user_engagement",
            "model_variant": "standard",
            "limit": limit,
            "strategy": strategy,
            "include_explanation": False,
            "filters": filters or {"logic": "and", "filters": []},
            "context": context or {},
        }
        return self._make_request(
            "POST",
            "/recommendations/personalized/query",
            json=payload,
        )


# Next section of the source page: "Integración E-commerce" (e-commerce integration)
Ejemplo completo para tienda online con Django/Flask
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import json
class RecommendationService:
    """Storefront-facing wrapper around the Rayuela client.

    Recommendation lookups degrade to locally computed popular products when
    the API call fails; interaction tracking is best-effort and never raises.
    """

    # The error handlers below log through ``self.logger``; defining it on the
    # class guarantees the attribute exists on every instance.
    logger = logging.getLogger(__name__)

    def __init__(self):
        """Build the API client from the Django setting ``RAYUELA_API_KEY``."""
        # Local import so this block does not depend on a module-level
        # ``settings`` name being present.
        from django.conf import settings

        self.rayuela = RayuelaClient(RayuelaConfig(
            api_key=settings.RAYUELA_API_KEY
        ))

    def get_homepage_recommendations(self, user_id: str) -> List[Dict]:
        """Return up to 8 in-stock recommendations for the home page.

        Falls back to locally popular products if the API call fails.
        """
        try:
            response = self.rayuela.get_recommendations(
                user_id=user_id,
                limit=8,
                strategy="hybrid",
                filters={
                    "logic": "and",
                    "filters": [
                        {"field": "inStock", "op": "eq", "value": True},
                        {"field": "price", "op": "gt", "value": 10},
                    ],
                },
            )
            return response.get("items", [])
        except Exception as e:
            self.logger.error(f"Error getting homepage recs: {e}")
            return self._get_fallback_recommendations()

    def get_product_page_recommendations(
        self,
        user_id: str,
        current_product_id: str
    ) -> List[Dict]:
        """Return up to 4 content-based recommendations for a product page.

        Falls back to locally popular products if the API call fails, so the
        product page behaves like the home page when the API is unavailable.
        """
        try:
            response = self.rayuela.get_recommendations(
                user_id=user_id,
                limit=4,
                strategy="content_based",
                filters={
                    "logic": "and",
                    "filters": [{"field": "inStock", "op": "eq", "value": True}],
                },
                context={
                    "page_type": "product_detail",
                    "source_external_product_id": current_product_id,
                },
            )
            return response.get("items", [])
        except Exception as e:
            self.logger.error(f"Error getting product page recs: {e}")
            return self._get_fallback_recommendations(limit=4)

    def track_interaction(
        self,
        user_id: str,
        product_id: str,
        interaction_type: str,
        value: float = 1.0
    ):
        """Report a user interaction; failures are logged, never raised."""
        try:
            self.rayuela._make_request("POST", "/interactions", json={
                "external_user_id": user_id,
                "external_product_id": product_id,
                "interaction_type": interaction_type,
                "value": value,
            })
        except Exception as e:
            self.logger.error(f"Error tracking interaction: {e}")

    def _get_fallback_recommendations(self, limit: int = 8) -> List[Dict]:
        """Return the best-selling active, in-stock products from the local DB.

        Args:
            limit: Maximum number of products to return.
        """
        # NOTE(review): ``Product`` is the host application's model and must
        # be imported by the module that contains this class.
        popular = (
            Product.objects.filter(is_active=True, stock__gt=0)
            .order_by('-sales_count')
            .values()[:limit]
        )
        # Materialize the lazy QuerySet so callers get the annotated list of dicts.
        return list(popular)


# Next section of the source page: "Sincronización de Datos" (data synchronization)
Script para sincronizar datos desde base de datos existente
import pandas as pd
from datetime import datetime, timedelta
import asyncio
import aiohttp
class DataSyncService:
    """Bulk-load products and interactions from a SQL database into Rayuela.

    NOTE(review): the queries read a module-level ``db_connection`` that is
    not defined in this block; the host module must provide it.
    """

    def __init__(self, rayuela_client: "RayuelaClient"):
        """Store the API client and the number of rows sent per request."""
        self.rayuela = rayuela_client
        self.batch_size = 1000

    def _post_in_batches(self, frame: "pd.DataFrame", payload_key: str, error_label: str) -> None:
        """POST ``frame`` to ``/ingestion/batch`` in ``batch_size`` slices.

        Args:
            frame: Rows to upload.
            payload_key: JSON key wrapping each batch; also used in the
                progress message.
            error_label: Text used in the per-batch error message.
        """
        for start in range(0, len(frame), self.batch_size):
            chunk = frame.iloc[start:start + self.batch_size]
            # Round-trip through pandas' JSON encoder so timestamps become ISO
            # strings and NaN/NaT become null; plain ``to_dict`` leaves values
            # the standard JSON encoder rejects or emits as invalid JSON.
            records = json.loads(
                chunk.to_json(orient="records", date_format="iso", double_precision=15)
            )
            try:
                self.rayuela._make_request(
                    "POST",
                    "/ingestion/batch",
                    json={payload_key: records},
                )
                print(f"Synced {len(records)} {payload_key}")
            except Exception as e:
                # Best-effort: report the failed batch and continue with the next.
                print(f"Error syncing {error_label} {start}: {e}")

    def sync_products_from_db(self):
        """Upload every active product from the ``products`` table."""
        # ``read_sql`` takes the connection as ``con``.
        products_df = pd.read_sql('''
            SELECT
                id as externalId,
                name,
                description,
                price,
                category,
                brand,
                inStock,
                created_at
            FROM products
            WHERE is_active = true
        ''', con=db_connection)
        self._post_in_batches(products_df, "products", "batch")

    def sync_recent_interactions(self, days_back: int = 7):
        """Upload interactions created within the last ``days_back`` days.

        Each action type is mapped to a weight: view 1.0, cart_add 2.0,
        purchase 5.0, anything else 1.0.
        """
        cutoff_date = datetime.now() - timedelta(days=days_back)
        # NOTE(review): the ``%s`` placeholder style depends on the DB driver.
        interactions_df = pd.read_sql('''
            SELECT
                user_id as external_user_id,
                product_id as external_product_id,
                action_type as interaction_type,
                CASE
                    WHEN action_type = 'view' THEN 1.0
                    WHEN action_type = 'cart_add' THEN 2.0
                    WHEN action_type = 'purchase' THEN 5.0
                    ELSE 1.0
                END as value,
                created_at as timestamp
            FROM user_interactions
            WHERE created_at >= %s
        ''', con=db_connection, params=[cutoff_date])
        self._post_in_batches(interactions_df, "interactions", "interactions batch")
# Using the service
sync_service = DataSyncService(rayuela_client)
sync_service.sync_products_from_db()
sync_service.sync_recent_interactions(days_back=30)


# Next section of the source page: "Tracking en Tiempo Real" (real-time tracking)
Sistema de tracking asíncrono para alta concurrencia
import asyncio
import aiohttp
from queue import Queue
import threading
import time
class AsyncTrackingService:
    """Queue-backed asynchronous sender of interaction events.

    ``track`` only enqueues an event. The workers launched by ``start`` pull
    events off the queue and POST each one to ``/interactions``.
    """

    def __init__(self, api_key: str, max_workers: int = 5):
        """Configure the service; no worker runs until ``start`` is called.

        Args:
            api_key: Sent in the ``X-API-Key`` header of every request.
            max_workers: Number of concurrent worker coroutines.
        """
        self.api_key = api_key
        self.base_url = "https://rayuela-backend-e7apihrdoa-uc.a.run.app/api/v1"
        self.queue = Queue()
        self.max_workers = max_workers
        self.running = False
        # Set only when start(background=True) is used.
        self._thread = None

    async def _send_interaction(self, session: "aiohttp.ClientSession", data: Dict):
        """POST a single interaction; errors are printed, never raised.

        Returns:
            The decoded JSON body on success, otherwise ``None``.
        """
        headers = {
            "X-API-Key": self.api_key,
            "Content-Type": "application/json",
        }
        try:
            async with session.post(
                f"{self.base_url}/interactions",
                json=data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                # Treat every 2xx status as success, not only 200.
                if 200 <= response.status < 300:
                    if response.status == 204:
                        # No body to decode.
                        return None
                    return await response.json()
                print(f"Error {response.status}: {await response.text()}")
        except Exception as e:
            print(f"Failed to send interaction: {e}")

    async def _worker(self):
        """Drain the queue in groups of up to 10, sending each group in parallel."""
        from queue import Empty

        async with aiohttp.ClientSession() as session:
            # Keep looping after stop() until the queue is empty so events
            # that were already queued are not dropped.
            while self.running or not self.queue.empty():
                try:
                    batch = []
                    for _ in range(10):
                        try:
                            batch.append(self.queue.get_nowait())
                        except Empty:
                            break
                    if batch:
                        tasks = [
                            self._send_interaction(session, interaction)
                            for interaction in batch
                        ]
                        await asyncio.gather(*tasks, return_exceptions=True)
                    else:
                        # Nothing queued: yield to the loop instead of spinning.
                        await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"Worker error: {e}")

    def _run_workers(self):
        """Run all workers on a private event loop until they finish."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            workers = [self._worker() for _ in range(self.max_workers)]
            loop.run_until_complete(asyncio.gather(*workers))
        finally:
            # Release the loop's resources and do not leave a closed loop
            # registered as the thread's current loop.
            asyncio.set_event_loop(None)
            loop.close()

    def start(self, background: bool = False):
        """Start the workers.

        Args:
            background: When False (default) this call blocks until ``stop``
                is called from another thread and the queue is drained. When
                True the workers run in a daemon thread and the call returns
                immediately.
        """
        self.running = True
        if background:
            self._thread = threading.Thread(
                target=self._run_workers,
                name="rayuela-tracking",
                daemon=True,
            )
            self._thread.start()
            return
        self._run_workers()

    def track(self, user_id: str, product_id: str, interaction_type: str, value: float = 1.0):
        """Queue an interaction for delivery; returns without network I/O."""
        interaction = {
            "external_user_id": user_id,
            "external_product_id": product_id,
            "interaction_type": interaction_type,
            "value": value,
            "timestamp": datetime.utcnow().isoformat(),
        }
        self.queue.put(interaction)

    def stop(self):
        """Ask the workers to finish once the queue is empty."""
        self.running = False
# Usage in a web application
tracking_service = AsyncTrackingService(api_key="sk_prod_...")
# NOTE: queued interactions are only sent while tracking_service.start() is
# running. start() blocks, so launch it from a dedicated thread at application
# startup, e.g. threading.Thread(target=tracking_service.start, daemon=True).start()


# In a Django/Flask view
def product_view(request, product_id):
    """Render the product page and queue a "view" interaction for the user."""
    # Asynchronous tracking: this only enqueues the event.
    tracking_service.track(
        user_id=request.user.id,
        product_id=product_id,
        interaction_type="view"
    )
    # Continue with the normal view logic.
    # NOTE(review): placeholder template context; replace with the real data.
    context = {"product_id": product_id}
    return render(request, 'product.html', context)


# Next section of the source page: "Optimización de Rendimiento" (performance optimization)
Mejores prácticas
- Usa connection pooling con requests.Session()
- Implementa retry logic con backoff exponencial
- Cachea recomendaciones por 5-15 minutos
- Usa tracking asíncrono para interacciones
- Implementa fallbacks para alta disponibilidad
⚠️ Consideraciones
- No bloquees el hilo principal con llamadas síncronas
- Monitorea rate limits y ajusta frecuencia
- Usa timeouts apropiados (10-30 segundos)
- Registra los errores, pero no expongas detalles al usuario
Siguientes Pasos