batch-processing

@Lobbi-Docs/batch-processing
Updated 4/12/2026

Message Batches API for Claude with 50% cost savings on bulk processing. Activate for batch jobs, JSONL processing, bulk analysis, and cost optimization.

Installation

npx agent-skills-cli install @Lobbi-Docs/batch-processing
Claude Code
Cursor
Copilot
Codex
Antigravity

Details

Path: .claude/skills/batch-processing/SKILL.md
Branch: main
Scoped Name: @Lobbi-Docs/batch-processing

Usage

After installing, this skill will be available to your AI coding assistant.

Verify installation:

npx agent-skills-cli list

Skill Instructions


name: batch-processing
description: Message Batches API for Claude with 50% cost savings on bulk processing. Activate for batch jobs, JSONL processing, bulk analysis, and cost optimization.
allowed-tools:

  • Bash
  • Read
  • Write
  • Edit
  • Glob
  • Grep
  • Task

triggers:

  • batch
  • batches
  • bulk
  • jsonl
  • batch processing
  • batch api
  • cost optimization
  • mass processing

dependencies:

  • llm-integration

related-skills:

  • prompt-caching
  • streaming

Batch Processing Skill

Leverage Anthropic's Message Batches API for 50% cost savings on bulk processing workloads.

When to Use This Skill

  • Processing 10K+ documents
  • Model evaluation and benchmarks
  • ETL and data enrichment
  • Training data generation
  • Bulk content analysis
  • Any non-time-sensitive workload

Cost Savings

| Processing Type | Cost |
|---|---|
| Standard API | 100% |
| Batch API | 50% |

Example: 1M tokens @ $3/1M = $3 (standard) vs $1.50 (batch)
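The arithmetic above generalizes to a small helper. This is a rough sketch that assumes a flat 50% batch discount and a single per-million-token price; `estimate_cost` and its parameters are illustrative names, and real pricing distinguishes input tokens from output tokens.

```python
def estimate_cost(tokens: int, price_per_million: float, batch: bool = False) -> float:
    """Estimate cost in dollars; batch pricing is assumed to be half of standard."""
    standard = tokens / 1_000_000 * price_per_million
    return standard * 0.5 if batch else standard


# Reproduce the example figures: 1M tokens at $3 per 1M tokens
standard_cost = estimate_cost(1_000_000, 3.00)
batch_cost = estimate_cost(1_000_000, 3.00, batch=True)
print(f"standard=${standard_cost:.2f} batch=${batch_cost:.2f}")
```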

Batch Lifecycle

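in_progress → ended (or in_progress → canceling → ended when a batch is canceled)
  • Processing time: Up to 24 hours; requests not processed by then expire
  • Per-request results after the batch ends: succeeded, errored, canceled, or expired
  • Results retention: 29 days after batch creation
  • Full feature support: Tools, vision, system prompts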

Core Implementation

Step 1: Create JSONL File

import json

# Each line is a complete request
requests = [
    {
        "custom_id": "request-1",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": "Summarize: Document 1..."}]
        }
    },
    {
        "custom_id": "request-2",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": "Summarize: Document 2..."}]
        }
    }
]

# Write JSONL file
with open("batch_requests.jsonl", "w") as f:
    for request in requests:
        f.write(json.dumps(request) + "\n")
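A single batch has size limits, so very large request lists may need to be split before submission. The sketch below assumes a cap of 100,000 requests per batch (check the current API documentation; the separate payload-size cap is not checked here). `chunk_requests` and `MAX_REQUESTS_PER_BATCH` are hypothetical names, not part of the SDK.

```python
from typing import Iterator

MAX_REQUESTS_PER_BATCH = 100_000  # Assumed limit; verify against current API docs


def chunk_requests(requests: list, size: int = MAX_REQUESTS_PER_BATCH) -> Iterator[list]:
    """Yield consecutive slices of at most `size` requests.

    Raises ValueError if any custom_id appears more than once, since
    results are matched back to requests by custom_id.
    """
    seen = set()
    for req in requests:
        cid = req["custom_id"]
        if cid in seen:
            raise ValueError(f"duplicate custom_id: {cid}")
        seen.add(cid)
    for start in range(0, len(requests), size):
        yield requests[start:start + size]


# Small demonstration with a chunk size of 2
demo = [{"custom_id": f"request-{i}", "params": {}} for i in range(5)]
chunks = list(chunk_requests(demo, size=2))
print([len(c) for c in chunks])
```

Each chunk would then be submitted as its own batch, and the resulting batch IDs tracked together.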

Step 2: Submit Batch

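import json

import anthropic

client = anthropic.Anthropic()  # Reads ANTHROPIC_API_KEY from the environment

# The SDK expects a list of request dicts, not a file handle,
# so load the JSONL file back into memory first
with open("batch_requests.jsonl") as f:
    requests = [json.loads(line) for line in f if line.strip()]

# Create batch from the loaded requests
batch = client.beta.messages.batches.create(requests=requests)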

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")

Step 3: Poll for Completion

import time

def wait_for_batch(client, batch_id, poll_interval=60):
    """Poll until batch completes"""
    while True:
        batch = client.beta.messages.batches.retrieve(batch_id)

        if batch.processing_status == "ended":
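            # "ended" means processing stopped, not that every request succeeded
            print("Batch ended")
            print(f"  Succeeded: {batch.request_counts.succeeded}")
            print(f"  Errored: {batch.request_counts.errored}")
            print(f"  Expired: {batch.request_counts.expired}")
            print(f"  Canceled: {batch.request_counts.canceled}")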
            return batch

        print(f"Status: {batch.processing_status} - waiting...")
        time.sleep(poll_interval)
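A fixed-interval loop with no deadline can wait forever if something goes wrong. One possible variant adds an overall timeout and capped exponential backoff. In this sketch `retrieve` is an injected callable (for example `client.beta.messages.batches.retrieve`) so the loop can be exercised without network access; `poll_until_ended`, its defaults, and the `batch-demo` ID are illustrative assumptions.

```python
import time
from types import SimpleNamespace
from typing import Any, Callable


def poll_until_ended(retrieve: Callable[[str], Any], batch_id: str,
                     initial_delay: float = 30.0, max_delay: float = 600.0,
                     timeout: float = 25 * 3600,
                     sleep: Callable[[float], Any] = time.sleep,
                     clock: Callable[[], float] = time.monotonic) -> Any:
    """Call retrieve(batch_id) until processing_status is "ended".

    The delay doubles after each attempt up to max_delay. Raises
    TimeoutError if the next wait would pass the overall deadline.
    """
    deadline = clock() + timeout
    delay = initial_delay
    while True:
        batch = retrieve(batch_id)
        if batch.processing_status == "ended":
            return batch
        if clock() + delay > deadline:
            raise TimeoutError(f"batch {batch_id} did not end within {timeout}s")
        sleep(delay)
        delay = min(delay * 2, max_delay)


# Offline demonstration: a fake retrieve that ends on the third call
_statuses = iter(["in_progress", "in_progress", "ended"])


def fake_retrieve(_batch_id: str) -> SimpleNamespace:
    return SimpleNamespace(processing_status=next(_statuses))


delays = []  # Record requested sleeps instead of actually sleeping
final = poll_until_ended(fake_retrieve, "batch-demo", initial_delay=1.0, sleep=delays.append)
print(final.processing_status, delays)
```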

Step 4: Stream Results

def process_results(client, batch_id):
    """Stream and process batch results"""
    results = {}

    for result in client.beta.messages.batches.results(batch_id):
        custom_id = result.custom_id

        if result.result.type == "succeeded":
            message = result.result.message
            results[custom_id] = {
                "status": "success",
                "content": message.content[0].text,
                "usage": message.usage
            }
        elif result.result.type == "errored":
            results[custom_id] = {
                "status": "error",
                "error": result.result.error
            }
        elif result.result.type in ("expired", "canceled"):
            # Neither type carries a message; record the type so it can be retried
            results[custom_id] = {
                "status": result.result.type
            }

    return results
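Once results are streaming, it is often useful to tally outcomes and token usage in the same pass. The sketch below assumes each entry exposes `result.type` and, for succeeded entries, `result.message.usage.input_tokens` and `output_tokens`, as in the loop above. `summarize_results` is a hypothetical helper, and the demonstration uses stand-in objects rather than real API responses.

```python
from collections import Counter
from types import SimpleNamespace as NS


def summarize_results(entries):
    """Return (counts by result type, total input/output tokens for successes)."""
    counts = Counter()
    tokens = {"input": 0, "output": 0}
    for entry in entries:
        counts[entry.result.type] += 1
        if entry.result.type == "succeeded":
            usage = entry.result.message.usage
            tokens["input"] += usage.input_tokens
            tokens["output"] += usage.output_tokens
    return dict(counts), tokens


# Stand-in entries shaped like the objects yielded by the results iterator
fake_results = [
    NS(custom_id="a", result=NS(type="succeeded",
                                message=NS(usage=NS(input_tokens=100, output_tokens=20)))),
    NS(custom_id="b", result=NS(type="errored")),
    NS(custom_id="c", result=NS(type="succeeded",
                                message=NS(usage=NS(input_tokens=50, output_tokens=10)))),
]
counts, tokens = summarize_results(fake_results)
print(counts, tokens)
```

With a real client, the call would be `summarize_results(client.beta.messages.batches.results(batch_id))`.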

Complete Workflow

import anthropic
import json
import time

def run_batch_job(documents):
    """Complete batch processing workflow"""
    client = anthropic.Anthropic()

    # 1. Create requests
    requests = []
    for i, doc in enumerate(documents):
        requests.append({
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": f"Summarize: {doc}"}]
            }
        })

    # 2. Write JSONL (a local record of what was sent, useful for retries)
    with open("batch.jsonl", "w") as f:
        for req in requests:
            f.write(json.dumps(req) + "\n")

    # 3. Submit batch (the SDK takes the list of request dicts, not a file handle)
    batch = client.beta.messages.batches.create(requests=requests)

    print(f"Batch submitted: {batch.id}")

    # 4. Wait for completion
    while True:
        batch = client.beta.messages.batches.retrieve(batch.id)
        if batch.processing_status == "ended":
            break
        time.sleep(60)

    # 5. Collect results
    results = {}
    for result in client.beta.messages.batches.results(batch.id):
        if result.result.type == "succeeded":
            results[result.custom_id] = result.result.message.content[0].text

    return results

Error Handling

Retry Failed Requests

def retry_failed(client, original_batch_id):
    """Retry failed and expired requests"""
    retry_requests = []

    for result in client.beta.messages.batches.results(original_batch_id):
        if result.result.type in ["errored", "expired"]:
            # Re-create the request (you'll need to store original params)
            retry_requests.append({
                "custom_id": result.custom_id,
                "params": get_original_params(result.custom_id)  # Your implementation
            })

    if retry_requests:
        # Submit the retry batch (the SDK takes a list of request dicts,
        # so no intermediate JSONL file is needed)
        return client.beta.messages.batches.create(requests=retry_requests)

    return None
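`get_original_params` is left to the reader above. One simple option, assuming the request file from Step 1 is still on disk, is to index it by `custom_id`. `load_params_index` is a hypothetical helper, and the demonstration writes a throwaway file in a temporary directory rather than touching real data.

```python
import json
import os
import tempfile


def load_params_index(jsonl_path: str) -> dict:
    """Map custom_id -> params for every request in a JSONL request file."""
    index = {}
    with open(jsonl_path, encoding="utf-8") as f:
        for line in f:
            if line.strip():  # Skip blank lines
                req = json.loads(line)
                index[req["custom_id"]] = req["params"]
    return index


# Demonstration with a temporary request file
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "batch_requests.jsonl")
    with open(path, "w", encoding="utf-8") as f:
        f.write(json.dumps({"custom_id": "request-1", "params": {"max_tokens": 1024}}) + "\n")
    params_index = load_params_index(path)

# A lookup with the signature retry_failed() expects
get_original_params = params_index.__getitem__
print(get_original_params("request-1"))
```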

Error Types

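| Result Type | Action |
|---|---|
| succeeded | Process normally |
| errored | Check the error; retry only if the request itself is valid |
| expired | Batch reached the 24-hour limit before this request was processed; retry |
| canceled | Batch was canceled before this request was processed |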
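The table can be expressed as a small dispatch function. This sketch assumes that invalid-request errors need the request body fixed before resubmission, while other errors can be retried as-is; `action_for` and its return labels are illustrative, and the exact error type strings should be checked against the API's error documentation.

```python
from typing import Optional


def action_for(result_type: str, error_type: Optional[str] = None) -> str:
    """Map a per-request result type to a follow-up action label."""
    if result_type == "succeeded":
        return "process"
    if result_type == "errored":
        # Assumption: invalid requests will fail again unless edited
        if error_type and error_type.startswith("invalid_request"):
            return "fix_request"
        return "retry"
    if result_type == "expired":
        return "retry"
    if result_type == "canceled":
        return "skip"
    raise ValueError(f"unknown result type: {result_type}")


print(action_for("errored", "invalid_request_error"))
print(action_for("expired"))
```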

API Reference

Endpoints

| Method | Endpoint | Purpose |
|---|---|---|
| POST | /v1/messages/batches | Create batch |
| GET | /v1/messages/batches/{id} | Get batch status |
| GET | /v1/messages/batches | List batches |
| POST | /v1/messages/batches/{id}/cancel | Cancel batch |
| GET | /v1/messages/batches/{id}/results | Stream results |
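For environments without the SDK, the create endpoint can be called over plain HTTP. The sketch below only builds the request object and does not send it. It assumes the standard `x-api-key` and `anthropic-version` headers and a JSON body of the form `{"requests": [...]}`; `build_create_batch_request` and the `sk-placeholder` key are illustrative.

```python
import json
import urllib.request

API_BASE = "https://api.anthropic.com"


def build_create_batch_request(api_key: str, requests: list) -> urllib.request.Request:
    """Build (but do not send) a POST to /v1/messages/batches."""
    body = json.dumps({"requests": requests}).encode("utf-8")
    return urllib.request.Request(
        f"{API_BASE}/v1/messages/batches",
        data=body,
        method="POST",
        headers={
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        },
    )


req = build_create_batch_request("sk-placeholder", [{
    "custom_id": "request-1",
    "params": {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 1024,
        "messages": [{"role": "user", "content": "Summarize: Document 1..."}],
    },
}])
print(req.get_method(), req.full_url)
# urllib.request.urlopen(req) would send it; omitted so the sketch runs offline.
```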

Request Format

{
    "custom_id": "unique-identifier",
    "params": {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 1024,
        "system": "Optional system prompt",
        "messages": [{"role": "user", "content": "..."}],
        "tools": [],
        "temperature": 0.7
    }
}

Best Practices

DO:

  • Use for workloads >100 requests
  • Include unique custom_id for tracking
  • Store original params for retry logic
  • Monitor batch status via polling
  • Process results as streaming JSONL

DON'T:

  • Use for time-sensitive requests
  • Expect immediate results
  • Forget to handle expired requests
  • Ignore error results

Cost Optimization Tips

  1. Batch similar requests - Same model, similar prompts
  2. Combine with caching - Put shared static context in a cacheable prefix (see prompt-caching)
  3. Use appropriate model - Haiku for simple tasks, Sonnet for complex
  4. Optimize prompts - Shorter prompts = lower cost
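Tip 2 can be sketched as follows: every request shares the same system block, marked with `cache_control` so the common prefix is eligible for caching. Cache hits inside a batch are best-effort, because requests are processed concurrently and in no fixed order, and very short prefixes may not be cacheable at all. `build_cached_requests` and the sample inputs are hypothetical.

```python
def build_cached_requests(shared_context: str, questions: dict,
                          model: str, max_tokens: int = 512) -> list:
    """Build batch requests that share one cacheable system prompt.

    `questions` maps custom_id -> user message text.
    """
    system = [{
        "type": "text",
        "text": shared_context,
        "cache_control": {"type": "ephemeral"},  # Marks the prefix as cacheable
    }]
    return [
        {
            "custom_id": custom_id,
            "params": {
                "model": model,
                "max_tokens": max_tokens,
                "system": system,
                "messages": [{"role": "user", "content": question}],
            },
        }
        for custom_id, question in questions.items()
    ]


cached_requests = build_cached_requests(
    "You are analyzing the following policy document: ...",
    {"q-1": "List the key dates.", "q-2": "List the named parties."},
    model="claude-sonnet-4-20250514",
)
print(len(cached_requests), cached_requests[0]["custom_id"])
```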

Use Case: Document Processing

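import anthropic

client = anthropic.Anthropic()

# Process 10,000 documents at roughly 50% of standard cost
documents = load_documents("data/*.txt")  # Your loader: 10K objects with .id and .content

# Create batch with consistent format
requests = [{
    "custom_id": doc.id,
    "params": {
        "model": "claude-haiku-4-5",  # Haiku tier is cheapest for bulk; confirm the current model ID
        "max_tokens": 512,
        "system": "Extract key information as JSON.",
        "messages": [{"role": "user", "content": doc.content}]
    }
} for doc in documents]

# Submit the prebuilt requests directly. run_batch_job() expects raw
# documents and builds its own "Summarize" requests, so it is not used here.
batch = client.beta.messages.batches.create(requests=requests)
print(f"Batch submitted: {batch.id}")
# Cost: about 50% of the standard API price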

See Also

  • llm-integration - API basics
  • prompt-caching - Cache context
  • streaming - Real-time output