Volver a Documentación

Ejemplos avanzados — Python

Casos de uso reales y patrones avanzados para integrar Rayuela en aplicaciones Python de producción.

Cliente Python Completo
Clase completa con manejo de errores, retry logic y logging
import requests
import logging
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass

@dataclass
class RayuelaConfig:
    """Connection and retry settings consumed by ``RayuelaClient``.

    Only ``api_key`` is required; every other field has a default.
    """
    # Sent by the client as the ``X-API-Key`` header on every request.
    api_key: str
    # Root URL; endpoint paths (starting with "/") are appended to it verbatim.
    base_url: str = "https://rayuela-backend-e7apihrdoa-uc.a.run.app/api/v1"
    # Passed as ``timeout`` to each ``requests`` call (seconds).
    timeout: int = 30
    # Total number of attempts made per request (the first try included).
    max_retries: int = 3
    # Base wait between attempts; the client doubles it after each failure.
    retry_delay: float = 1.0

class RayuelaClient:
    """Synchronous Rayuela API client with retries, backoff and logging.

    One ``requests.Session`` is shared by all calls so connections are pooled
    and the authentication headers are set only once.
    """

    def __init__(self, config: "RayuelaConfig"):
        """Create the HTTP session and attach the default headers.

        Args:
            config: API key, base URL, timeout and retry policy.
        """
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": config.api_key,
            "Content-Type": "application/json",
            "User-Agent": "RayuelaClient/1.0",
        })
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _is_retryable(error: Exception) -> bool:
        """Return False for HTTP 4xx errors that a retry cannot fix."""
        # Compare against None explicitly: a requests.Response is falsy for
        # 4xx/5xx statuses, so a plain truthiness test would hide the status.
        response = getattr(error, "response", None)
        status = getattr(response, "status_code", None)
        if status is None:
            # Connection errors, timeouts, etc. carry no response: retry them.
            return True
        if 400 <= status < 500:
            # Only "request timeout" and "too many requests" are transient.
            return status in (408, 429)
        return True

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Send one API request, retrying transient failures with backoff.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Path appended to ``config.base_url`` (leading slash
                included).
            **kwargs: Forwarded unchanged to ``Session.request`` (``json``,
                ``params``, ...).

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.exceptions.RequestException: When the last attempt fails,
                or immediately for a non-retryable 4xx response.
        """
        url = f"{self.config.base_url}{endpoint}"
        # Always try at least once: with max_retries <= 0 the loop would
        # otherwise never run and the method would silently return None.
        attempts = max(1, self.config.max_retries)

        for attempt in range(attempts):
            try:
                response = self.session.request(
                    method, url, timeout=self.config.timeout, **kwargs
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                self.logger.warning(
                    "Request failed (attempt %d): %s", attempt + 1, e
                )
                out_of_attempts = attempt == attempts - 1
                if out_of_attempts or not self._is_retryable(e):
                    raise
                # Exponential backoff: delay, 2*delay, 4*delay, ...
                time.sleep(self.config.retry_delay * (2 ** attempt))

    def get_recommendations(
        self,
        user_id: str,
        limit: int = 10,
        filters: Optional[Dict] = None,
        context: Optional[Dict] = None,
        strategy: str = "hybrid"
    ) -> Dict[str, Any]:
        """Query personalized recommendations for one user.

        Args:
            user_id: Sent as ``external_user_id``.
            limit: Maximum number of items requested.
            filters: Filter tree; when omitted or empty, an empty "and"
                filter group is sent.
            context: Extra request context; when omitted, an empty dict is
                sent.
            strategy: Recommendation strategy name.

        Returns:
            The decoded JSON response from the personalized query endpoint.

        Raises:
            requests.exceptions.RequestException: If the request ultimately
                fails.
        """
        payload = {
            "external_user_id": user_id,
            "recommendation_goal": "user_engagement",
            "model_variant": "standard",
            "limit": limit,
            "strategy": strategy,
            "include_explanation": False,
            "filters": filters or {"logic": "and", "filters": []},
            "context": context or {},
        }

        return self._make_request(
            "POST",
            "/recommendations/personalized/query",
            json=payload
        )
Integración E-commerce
Ejemplo completo para una tienda online con Django (adaptable a Flask)
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import json

class RecommendationService:
    """Storefront wrapper around ``RayuelaClient``.

    Serves homepage and product-page recommendations and records user
    interactions. Homepage and tracking failures are logged and absorbed;
    product-page lookups propagate errors to the caller.
    """

    def __init__(self):
        """Build the Rayuela client from ``settings.RAYUELA_API_KEY``."""
        # NOTE(review): `settings` is presumably django.conf.settings; it is
        # not imported in this snippet -- confirm.
        self.rayuela = RayuelaClient(RayuelaConfig(
            api_key=settings.RAYUELA_API_KEY
        ))
        # The error handlers below log through self.logger; without this
        # attribute they raised AttributeError instead of falling back.
        self.logger = logging.getLogger(__name__)

    def get_homepage_recommendations(self, user_id: str) -> List[Dict]:
        """Return up to 8 in-stock recommendations for the homepage.

        Only items with ``inStock == True`` and ``price > 10`` are requested.
        On any error the failure is logged and local fallback products are
        returned instead.
        """
        try:
            response = self.rayuela.get_recommendations(
                user_id=user_id,
                limit=8,
                strategy="hybrid",
                filters={
                    "logic": "and",
                    "filters": [
                        {"field": "inStock", "op": "eq", "value": True},
                        {"field": "price", "op": "gt", "value": 10},
                    ],
                },
            )
            return response.get("items", [])
        except Exception as e:
            self.logger.error("Error getting homepage recs: %s", e)
            return self._get_fallback_recommendations()

    def get_product_page_recommendations(
        self,
        user_id: str,
        current_product_id: str
    ) -> List[Dict]:
        """Return up to 4 in-stock, content-based recommendations.

        Args:
            user_id: External id of the visiting user.
            current_product_id: Product being viewed; sent in the request
                context as ``source_external_product_id``.

        Raises:
            Exception: Whatever the underlying client raises; this path has
                no fallback.
        """
        response = self.rayuela.get_recommendations(
            user_id=user_id,
            limit=4,
            strategy="content_based",
            filters={
                "logic": "and",
                "filters": [{"field": "inStock", "op": "eq", "value": True}],
            },
            context={
                "page_type": "product_detail",
                "source_external_product_id": current_product_id,
            },
        )
        return response.get("items", [])

    def track_interaction(
        self,
        user_id: str,
        product_id: str,
        interaction_type: str,
        value: float = 1.0
    ):
        """Record a user interaction; failures are logged, never raised.

        Args:
            user_id: Sent as ``external_user_id``.
            product_id: Sent as ``external_product_id``.
            interaction_type: Interaction label forwarded as-is.
            value: Weight of the interaction.
        """
        try:
            self.rayuela._make_request("POST", "/interactions", json={
                "external_user_id": user_id,
                "external_product_id": product_id,
                "interaction_type": interaction_type,
                "value": value,
            })
        except Exception as e:
            self.logger.error("Error tracking interaction: %s", e)

    def _get_fallback_recommendations(self) -> List[Dict]:
        """Return the 8 best-selling active, in-stock products from the local DB."""
        top_sellers = Product.objects.filter(
            is_active=True,
            stock__gt=0
        ).order_by('-sales_count')[:8].values()
        # Materialize the lazy QuerySet so callers get the List[Dict] the
        # signature promises.
        return list(top_sellers)
Sincronización de Datos
Script para sincronizar datos desde una base de datos existente
import pandas as pd
from datetime import datetime, timedelta
import asyncio
import aiohttp

class DataSyncService:
    """Bulk-load products and interactions from a SQL database into Rayuela.

    Rows are read with pandas and posted to ``/ingestion/batch`` in chunks of
    ``batch_size``. A failed chunk is reported and skipped; later chunks are
    still sent.
    """

    def __init__(self, rayuela_client: RayuelaClient):
        """Store the client used for uploads and set the chunk size."""
        self.rayuela = rayuela_client
        self.batch_size = 1000

    def _post_in_batches(self, frame, payload_key: str, error_prefix: str) -> None:
        """Upload ``frame`` in ``batch_size`` chunks under ``payload_key``."""
        import json  # local import keeps this block self-contained

        for offset in range(0, len(frame), self.batch_size):
            chunk = frame.iloc[offset:offset + self.batch_size]
            # Round-trip through to_json so Timestamps become ISO-8601 strings
            # and NaN becomes null; raw to_dict('records') values cannot be
            # serialized by the HTTP client's JSON encoder.
            records = json.loads(
                chunk.to_json(
                    orient="records", date_format="iso", double_precision=15
                )
            )

            try:
                self.rayuela._make_request(
                    "POST",
                    "/ingestion/batch",
                    json={payload_key: records}
                )
                print(f"Synced {len(records)} {payload_key}")
            except Exception as e:
                # The number reported is the row offset where the chunk starts.
                print(f"{error_prefix} {offset}: {e}")

    def sync_products_from_db(self):
        """Upload every active product from the ``products`` table."""
        # NOTE(review): `db_connection` is not defined in this snippet;
        # presumably a module-level DB-API/SQLAlchemy connection -- confirm.
        # pandas names this parameter `con`; `connection=` raises TypeError.
        products_df = pd.read_sql('''
            SELECT 
                id as externalId,
                name,
                description,
                price,
                category,
                brand,
                inStock,
                created_at
            FROM products 
            WHERE is_active = true
        ''', con=db_connection)

        self._post_in_batches(products_df, "products", "Error syncing batch")

    def sync_recent_interactions(self, days_back: int = 7):
        """Upload interactions created within the last ``days_back`` days.

        The query weights each action: view = 1.0, cart_add = 2.0,
        purchase = 5.0, anything else = 1.0.
        """
        cutoff_date = datetime.now() - timedelta(days=days_back)

        interactions_df = pd.read_sql('''
            SELECT 
                user_id as external_user_id,
                product_id as external_product_id,
                action_type as interaction_type,
                CASE 
                    WHEN action_type = 'view' THEN 1.0
                    WHEN action_type = 'cart_add' THEN 2.0
                    WHEN action_type = 'purchase' THEN 5.0
                    ELSE 1.0
                END as value,
                created_at as timestamp
            FROM user_interactions 
            WHERE created_at >= %s
        ''', con=db_connection, params=[cutoff_date])

        self._post_in_batches(
            interactions_df, "interactions", "Error syncing interactions batch"
        )

# Service usage: sync all active products, then the last 30 days of interactions.
# NOTE(review): `rayuela_client` is not defined in this snippet; presumably a
# RayuelaClient instance created beforehand -- confirm.
sync_service = DataSyncService(rayuela_client)
sync_service.sync_products_from_db()
sync_service.sync_recent_interactions(days_back=30)
Tracking en Tiempo Real
Sistema de tracking asíncrono para alta concurrencia
import asyncio
import aiohttp
from queue import Queue
import threading
import time

class AsyncTrackingService:
    """Queue-backed interaction tracker drained by asyncio workers.

    ``track()`` only enqueues, so it returns immediately. ``start()`` runs the
    workers and blocks until ``stop()`` is called, so it is meant to run on
    its own thread.
    """

    def __init__(self, api_key: str, max_workers: int = 5):
        """Configure the tracker.

        Args:
            api_key: Sent as the ``X-API-Key`` header.
            max_workers: Number of concurrent worker coroutines.
        """
        self.api_key = api_key
        self.base_url = "https://rayuela-backend-e7apihrdoa-uc.a.run.app/api/v1"
        self.queue = Queue()
        self.max_workers = max_workers
        self.running = False

    async def _send_interaction(self, session: aiohttp.ClientSession, data: Dict):
        """POST one interaction.

        Returns:
            The decoded JSON body on success; ``None`` when the request fails
            (the failure is printed, not raised).
        """
        headers = {
            "X-API-Key": self.api_key,
            "Content-Type": "application/json"
        }

        try:
            async with session.post(
                f"{self.base_url}/interactions",
                json=data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                # Any 2xx is a success; a POST may legitimately answer
                # 201/202, which a strict `== 200` check reported as errors.
                if 200 <= response.status < 300:
                    return await response.json()
                print(f"Error {response.status}: {await response.text()}")
        except Exception as e:
            print(f"Failed to send interaction: {e}")
        return None

    async def _worker(self):
        """Drain the queue in batches of up to 10 until ``running`` is False."""
        from queue import Empty  # local import: only Queue is imported at file level

        async with aiohttp.ClientSession() as session:
            while self.running:
                try:
                    batch = []
                    while len(batch) < 10:
                        try:
                            batch.append(self.queue.get_nowait())
                        except Empty:
                            break  # nothing left; send what we have

                    if batch:
                        # Send the whole batch concurrently.
                        await asyncio.gather(
                            *(self._send_interaction(session, item) for item in batch),
                            return_exceptions=True,
                        )
                    else:
                        # Idle briefly so an empty queue does not spin the CPU.
                        await asyncio.sleep(0.1)

                except Exception as e:
                    print(f"Worker error: {e}")

    async def _run_workers(self):
        """Run ``max_workers`` workers concurrently until they all exit."""
        await asyncio.gather(*(self._worker() for _ in range(self.max_workers)))

    def start(self):
        """Run the workers on a fresh event loop; blocks until ``stop()``.

        Call this from a dedicated thread, e.g.
        ``threading.Thread(target=service.start, daemon=True).start()``.
        """
        self.running = True

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # gather() is awaited inside a coroutine so it executes with a
            # running loop instead of relying on the deprecated implicit one.
            loop.run_until_complete(self._run_workers())
        finally:
            # Release the loop's resources and do not leave a closed loop
            # installed as this thread's current loop.
            asyncio.set_event_loop(None)
            loop.close()

    def track(self, user_id: str, product_id: str, interaction_type: str, value: float = 1.0):
        """Enqueue one interaction for background delivery.

        Args:
            user_id: Sent as ``external_user_id``.
            product_id: Sent as ``external_product_id``.
            interaction_type: Interaction label forwarded as-is.
            value: Weight of the interaction.
        """
        interaction = {
            "external_user_id": user_id,
            "external_product_id": product_id,
            "interaction_type": interaction_type,
            "value": value,
            # Naive UTC timestamp in ISO format (no offset suffix).
            "timestamp": datetime.utcnow().isoformat()
        }

        self.queue.put(interaction)

    def stop(self):
        """Ask the workers to exit; items still queued are not sent."""
        self.running = False

# Usage in a web application: create one shared tracker and run its blocking
# start() loop on a daemon thread. Without this nothing consumes the queue and
# tracked interactions are never sent.
tracking_service = AsyncTrackingService(api_key="sk_prod_...")
threading.Thread(target=tracking_service.start, daemon=True).start()

# In a Django/Flask view
def product_view(request, product_id):
    """Render the product page and enqueue a "view" interaction.

    Tracking only puts the event on the tracker's queue, so the response is
    not delayed by the API call.
    """
    tracking_service.track(
        # The tracker expects string ids; request.user.id is presumably an
        # integer primary key.
        user_id=str(request.user.id),
        product_id=product_id,
        interaction_type="view"
    )

    # Continue with the normal view logic. `context` was previously undefined
    # here (NameError); build whatever the template needs.
    context = {"product_id": product_id}
    # NOTE(review): `render` is presumably django.shortcuts.render; it is not
    # imported in this snippet -- confirm.
    return render(request, 'product.html', context)
Optimización de Rendimiento

Mejores prácticas

  • Usa connection pooling con requests.Session()
  • Implementa retry logic con backoff exponencial
  • Cachea recomendaciones por 5-15 minutos
  • Usa tracking asíncrono para interacciones
  • Implementa fallbacks para alta disponibilidad

⚠️ Consideraciones

  • No bloquees el hilo principal con llamadas síncronas
  • Monitorea los rate limits y ajusta la frecuencia
  • Usa timeouts apropiados (10-30 segundos)
  • Registra los errores, pero no expongas detalles al usuario