Manejo de tareas asíncronas con 'arq' (Redis) para escalar la ingesta y procesos pesados.
Installation
Details
Usage
After installing, this skill will be available to your AI coding assistant.
Verify installation:
npx agent-skills-cli listSkill Instructions
name: background-tasks description: Manejo de tareas asíncronas con 'arq' (Redis) para escalar la ingesta y procesos pesados. trigger: background OR async OR queue OR arq OR redis OR task scope: backend
Background Tasks (arq + Redis)
Contexto
Para evitar bloquear el request HTTP durante procesos largos (ej. Procesamiento de IA, uploads pesados), usamos arq con Redis.
Esta skill fue generada siguiendo el [Protocolo de Adquisición de Conocimiento] usando ask_context7.py arq.
Instalación
pip install arq redis
Patrón de Implementación (FastAPI)
1. Configuración del Worker (worker.py)
Crea un archivo dedicado para definir los settings y las funciones del worker.
import asyncio
from arq.connections import RedisSettings
from backend.core.config import settings
async def startup(ctx):
print("🚀 Worker starting...")
# Inicializar DB o AI clients aquí y guardarlos en ctx
# ctx['db'] = ...
async def shutdown(ctx):
print("🛑 Worker shutting down...")
async def process_ingestion(ctx, file_path: str, user_id: str):
"""
Tarea pesada de ejemplo.
"""
print(f"Processing ingestion for {user_id} at {file_path}")
# Simular trabajo
await asyncio.sleep(5)
return "done"
class WorkerSettings:
functions = [process_ingestion]
on_startup = startup
on_shutdown = shutdown
redis_settings = RedisSettings(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT
)
2. Encolar Trabajos (Desde un Router)
En tus routers de FastAPI, usa create_pool para conectar y encolar.
from arq import create_pool
from arq.connections import RedisSettings
# ... En tu endpoint ...
@router.post("/ingest")
async def ingest_file(file: UploadFile):
# 1. Guardar archivo temporalmente (o subir a storage rápido)
# ...
# 2. Encolar tarea
redis = await create_pool(RedisSettings())
await redis.enqueue_job('process_ingestion', file_path="tmp/file.jpg", user_id="123")
return {"status": "queued", "msg": "Processing in background"}
Best Practices (Context7 Findings)
- Job Deferral: Puedes programar tareas para el futuro usando
_defer_by(timedelta) o_defer_until(datetime).await redis.enqueue_job('task_name', _defer_by=timedelta(minutes=5)) - Context Injection: Usa
ctxen las funciones del worker para compartir conexiones a DB/AI (evita crearlas en cada ejecución). - Error Handling:
arqreintenta fallos automáticamente si se configuramax_tries.
Referencia
- Generado con:
python scripts/ask_context7.py arq "how to use arq with fastapi"
More by LeandroLarrosa
View allStandardizes project for Cloud Sovereignty (Coolify + VPS) with Eco Mode resource limits and Professional Folder Hierarchy.
Integración con Context7 MCP Server para obtener documentación actualizada de librerías.
Conventional Commits specification for standardized, semantic versioning-compatible commit messages
Plantilla base para crear nuevas skills en este proyecto. Copia esta estructura.
