High-performance PostgreSQL patterns. Use when optimizing queries, designing for scale, or debugging performance issues.
Installation
After installing, this skill will be available to your AI coding assistant.
Verify installation:
npx agent-skills-cli list
Skill Instructions
name: postgres-performance
description: High-performance PostgreSQL patterns. Use when optimizing queries, designing for scale, or debugging performance issues.
PostgreSQL Performance Engineering
Problem Statement
Performance problems compound. A query that takes 50ms at 1K rows takes 5s at 100K rows. This skill covers patterns for building performant database interactions from the start and fixing performance issues.
Pattern: Query Optimization Workflow
Step 1: Identify Slow Queries
-- Enable pg_stat_statements (if not already)
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
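-- Note: pg_stat_statements must also be listed in shared_preload_libraries
-- (postgresql.conf: shared_preload_libraries = 'pg_stat_statements') and the
-- server restarted before it starts collecting statistics.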
-- Find slowest queries
SELECT
query,
calls,
round(mean_exec_time::numeric, 2) as avg_ms,
round(total_exec_time::numeric, 2) as total_ms,
rows
FROM pg_stat_statements
WHERE calls > 10
ORDER BY mean_exec_time DESC
LIMIT 20;
Step 2: Analyze Query Plan
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM assessments
WHERE user_id = 'abc-123'
ORDER BY created_at DESC
LIMIT 10;
What to look for:
| Warning Sign | Problem | Solution |
|---|---|---|
| Seq Scan on large table | Missing index | Add index |
| High loops count | N+1 in join | Rewrite query, add index |
| Sort with high cost | No index for ORDER BY | Covering index |
| Hash/Merge Join with high rows | Large intermediate result | Filter earlier, better indexes |
| Buffers: shared read high | Data not cached | More RAM, or query less data |
Step 3: Fix and Verify
-- Add index
CREATE INDEX CONCURRENTLY ix_assessments_user_created
ON assessments (user_id, created_at DESC);
-- Verify improvement
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM assessments
WHERE user_id = 'abc-123'
ORDER BY created_at DESC
LIMIT 10;
-- Should now show "Index Scan" instead of "Seq Scan"
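The shape of the plan is what changes; the excerpt below is illustrative only (costs and timings omitted, not output from a real run):
-- Before: full scan plus an explicit sort
Limit
  ->  Sort
        Sort Key: created_at DESC
        ->  Seq Scan on assessments
              Filter: (user_id = 'abc-123')
-- After: the composite index returns rows already ordered
Limit
  ->  Index Scan using ix_assessments_user_created on assessments
        Index Cond: (user_id = 'abc-123')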
Pattern: Covering Indexes (Index-Only Scans)
Problem: Query reads index, then fetches rows from table (heap fetch).
-- Query
SELECT id, title, status FROM assessments WHERE user_id = ?;
-- Regular index: requires heap fetch
CREATE INDEX ix_assessments_user ON assessments (user_id);
-- Plan: Index Scan + Heap Fetches
-- ✅ Covering index: all columns in index
CREATE INDEX ix_assessments_user_covering
ON assessments (user_id)
INCLUDE (id, title, status);
-- Plan: Index Only Scan (no heap fetch, much faster)
When to use:
- Frequently run queries
- Queries selecting few columns
- Tables with many columns (heap fetch is expensive)
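A quick check that the covering index is actually being used (illustrative; assumes the index above exists):
EXPLAIN (ANALYZE, BUFFERS)
SELECT id, title, status FROM assessments WHERE user_id = 'abc-123';
-- Expect "Index Only Scan using ix_assessments_user_covering" and "Heap Fetches: 0".
-- If Heap Fetches stays high, VACUUM the table so the visibility map is current.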
Pattern: Pagination at Scale
-- ❌ SLOW: OFFSET-based pagination
SELECT * FROM events ORDER BY created_at DESC LIMIT 20 OFFSET 10000;
-- Must scan and discard 10,000 rows!
-- ✅ FAST: Cursor-based (keyset) pagination
SELECT * FROM events
WHERE created_at < '2024-01-15T10:30:00Z' -- Last seen timestamp
ORDER BY created_at DESC
LIMIT 20;
-- Jumps directly to the right place via index
-- For compound cursor (when duplicates possible):
SELECT * FROM events
WHERE (created_at, id) < ('2024-01-15T10:30:00Z', 'last-id')
ORDER BY created_at DESC, id DESC
LIMIT 20;
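Keyset pagination is only fast when an index matches the ORDER BY; a supporting index might look like this (name assumed):
CREATE INDEX ix_events_created_id ON events (created_at DESC, id DESC);
-- A plain (created_at, id) index also works; a btree can be scanned in either direction.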
In SQLAlchemy:
# Cursor-based pagination
from datetime import datetime
from uuid import UUID

from sqlalchemy import select, tuple_
from sqlalchemy.ext.asyncio import AsyncSession


async def get_events_page(
    session: AsyncSession,
    cursor_time: datetime | None,
    cursor_id: UUID | None,
    limit: int = 20,
) -> list[Event]:
    query = select(Event).order_by(Event.created_at.desc(), Event.id.desc())
    if cursor_time and cursor_id:
        query = query.where(
            tuple_(Event.created_at, Event.id) < (cursor_time, cursor_id)
        )
    result = await session.execute(query.limit(limit))
    return list(result.scalars().all())
Pattern: Batch Processing
-- ❌ SLOW: One huge query/update
UPDATE events SET processed = true WHERE processed = false;
-- Locks millions of rows, times out
-- ✅ FAST: Batch processing
DO $$
DECLARE
batch_size INT := 10000;
rows_affected INT;
BEGIN
LOOP
UPDATE events
SET processed = true
WHERE id IN (
SELECT id FROM events
WHERE processed = false
LIMIT batch_size
FOR UPDATE SKIP LOCKED
);
GET DIAGNOSTICS rows_affected = ROW_COUNT;
IF rows_affected = 0 THEN
EXIT;
END IF;
COMMIT;
PERFORM pg_sleep(0.1); -- Brief pause to let other queries through
END LOOP;
END $$;
In Python:
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def process_in_batches(session: AsyncSession, batch_size: int = 10000):
    while True:
        result = await session.execute(
            text("""
                UPDATE events SET processed = true
                WHERE id IN (
                    SELECT id FROM events
                    WHERE processed = false
                    LIMIT :batch_size
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """),
            {"batch_size": batch_size},
        )
        updated = result.fetchall()
        await session.commit()
        if len(updated) == 0:
            break
        await asyncio.sleep(0.1)
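In either variant, once most rows are processed the WHERE processed = false subquery itself becomes the bottleneck; a partial index keeps it cheap (index name assumed):
CREATE INDEX ix_events_unprocessed ON events (id) WHERE processed = false;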
Pattern: Efficient Aggregations
-- ❌ SLOW: Count with complex WHERE
SELECT COUNT(*) FROM events WHERE user_id = ? AND status = 'active';
-- Scans all matching rows
-- ✅ FAST: Approximate count (for large tables)
SELECT reltuples::bigint AS estimate
FROM pg_class
WHERE relname = 'events';
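-- Note: reltuples estimates the whole table, not the filtered count;
-- a recent ANALYZE keeps the estimate reasonably fresh.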
-- ✅ FAST: Maintain counter cache
-- Add column: assessments.answer_count
-- Update on INSERT/DELETE to answers
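-- A minimal trigger sketch for the counter cache (illustrative; assumes the
-- assessments/questions/answers schema used elsewhere in this skill):
CREATE OR REPLACE FUNCTION bump_answer_count() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        UPDATE assessments SET answer_count = answer_count + 1
        WHERE id = (SELECT assessment_id FROM questions WHERE id = NEW.question_id);
    ELSIF TG_OP = 'DELETE' THEN
        UPDATE assessments SET answer_count = answer_count - 1
        WHERE id = (SELECT assessment_id FROM questions WHERE id = OLD.question_id);
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_answers_counter
AFTER INSERT OR DELETE ON answers
FOR EACH ROW EXECUTE FUNCTION bump_answer_count();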
-- ✅ FAST: Materialized view for complex aggregations
CREATE MATERIALIZED VIEW user_stats AS
SELECT
user_id,
COUNT(*) as total_assessments,
AVG(rating) as avg_rating
FROM assessments
GROUP BY user_id;
-- Refresh periodically
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats;
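REFRESH ... CONCURRENTLY requires a unique index on the materialized view; with the pg_cron extension (if available) the refresh can be scheduled in-database (job name and interval below are placeholders):
CREATE UNIQUE INDEX ix_user_stats_user ON user_stats (user_id);
-- Optional, with pg_cron: refresh every 15 minutes
SELECT cron.schedule('refresh-user-stats', '*/15 * * * *',
    $$REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats$$);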
Pattern: Connection Pool Tuning
# Async SQLAlchemy with proper pool settings
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool
# For serverless/Lambda (no persistent connections)
engine = create_async_engine(
DATABASE_URL,
poolclass=NullPool, # New connection per request
)
# For long-running servers
engine = create_async_engine(
DATABASE_URL,
poolclass=AsyncAdaptedQueuePool,
pool_size=10, # Base connections
max_overflow=20, # Extra connections under load
pool_timeout=30, # Wait for connection
pool_recycle=1800, # Recycle connections every 30 min
pool_pre_ping=True, # Test connection before use
)
PostgreSQL side:
-- Check max connections
SHOW max_connections; -- Default 100
-- See current connections
SELECT count(*) FROM pg_stat_activity;
-- Connections per application
SELECT application_name, count(*)
FROM pg_stat_activity
GROUP BY application_name;
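As a sizing rule of thumb, the sum of pool_size + max_overflow across all application instances must stay below max_connections, with headroom left for superuser and maintenance sessions. A breakdown by state helps spot idle connections holding slots:
SELECT state, count(*)
FROM pg_stat_activity
GROUP BY state
ORDER BY count(*) DESC;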
Pattern: Read Replicas
# Route reads to the replica, writes to the primary
from sqlalchemy import Delete, Insert, Update
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session

primary_engine = create_async_engine(PRIMARY_URL)
replica_engine = create_async_engine(REPLICA_URL)


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # Flushes and DML go to the primary; everything else reads from the replica
        if self._flushing or isinstance(clause, (Insert, Update, Delete)):
            return primary_engine.sync_engine
        return replica_engine.sync_engine
Pattern: Denormalization for Read Performance
-- ❌ SLOW: Joining 4 tables for common query
SELECT
a.id, a.title, u.name as user_name,
COUNT(q.id) as question_count,
AVG(ans.value) as avg_score
FROM assessments a
JOIN users u ON a.user_id = u.id
JOIN questions q ON q.assessment_id = a.id
LEFT JOIN answers ans ON ans.question_id = q.id
GROUP BY a.id, a.title, u.name;
-- ✅ FAST: Denormalized columns
ALTER TABLE assessments ADD COLUMN user_name VARCHAR(100);
ALTER TABLE assessments ADD COLUMN question_count INT DEFAULT 0;
ALTER TABLE assessments ADD COLUMN avg_score NUMERIC(3,2);
-- Update via triggers or application code
-- Query becomes simple:
SELECT id, title, user_name, question_count, avg_score FROM assessments;
Tradeoffs:
- ✅ Much faster reads
- ❌ More complex writes (must update denormalized data)
- ❌ Potential for stale data
Pattern: Partitioning Large Tables
-- Partition events by month
CREATE TABLE events (
    id UUID NOT NULL,
    user_id UUID NOT NULL,
    event_type VARCHAR(50),
    created_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (id, created_at)  -- a PK on a partitioned table must include the partition key
) PARTITION BY RANGE (created_at);
-- Create partitions
CREATE TABLE events_2024_01 PARTITION OF events
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE events_2024_02 PARTITION OF events
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- Query specific partition (fast)
SELECT * FROM events WHERE created_at >= '2024-01-15' AND created_at < '2024-02-01';
-- Drop old data instantly
DROP TABLE events_2023_01; -- Much faster than DELETE
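Two follow-ups worth knowing: an index created on the partitioned parent propagates to every partition (PostgreSQL 11+), and a DEFAULT partition catches rows that fall outside every defined range:
-- Index on the parent cascades to all partitions
CREATE INDEX ix_events_user_created ON events (user_id, created_at DESC);
-- Catch-all for rows that match no range
CREATE TABLE events_default PARTITION OF events DEFAULT;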
Pattern: Caching Strategy
# Cache frequently-read, rarely-changed data
from uuid import UUID

import redis.asyncio as redis

cache = redis.from_url(REDIS_URL)


async def get_user_stats(user_id: UUID) -> UserStats:
    cache_key = f"user_stats:{user_id}"
    # Try cache first
    cached = await cache.get(cache_key)
    if cached:
        return UserStats.model_validate_json(cached)
    # Query database
    async with get_session() as session:
        stats = await calculate_user_stats(session, user_id)
    # Cache for 5 minutes
    await cache.setex(cache_key, 300, stats.model_dump_json())
    return stats


# Invalidate on write
async def update_user_assessment(user_id: UUID, ...):
    # ... update database ...
    await cache.delete(f"user_stats:{user_id}")
Performance Monitoring Queries
-- Table bloat (needs VACUUM)
SELECT
schemaname, relname,
n_dead_tup as dead_tuples,
n_live_tup as live_tuples,
round(n_dead_tup::numeric / NULLIF(n_live_tup, 0) * 100, 2) as dead_pct
FROM pg_stat_user_tables
WHERE n_dead_tup > 10000
ORDER BY n_dead_tup DESC;
-- Index bloat
SELECT
indexrelname as index,
pg_size_pretty(pg_relation_size(indexrelid)) as size,
idx_scan as scans
FROM pg_stat_user_indexes
WHERE idx_scan = 0 -- Unused indexes
ORDER BY pg_relation_size(indexrelid) DESC;
-- Cache hit ratio (should be > 99%)
SELECT
sum(blks_hit) * 100.0 / sum(blks_hit + blks_read) as cache_hit_ratio
FROM pg_stat_database;
-- Long-running queries
SELECT
pid,
now() - query_start as duration,
query
FROM pg_stat_activity
WHERE state = 'active'
AND query NOT LIKE '%pg_stat%'
AND now() - query_start > interval '30 seconds';
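One more query that helps during incidents: find sessions that are blocked and who is blocking them (pg_blocking_pids is built in since PostgreSQL 9.6):
-- Blocked sessions and their blockers
SELECT
    pid,
    pg_blocking_pids(pid) AS blocked_by,
    now() - query_start AS waiting_for,
    query
FROM pg_stat_activity
WHERE cardinality(pg_blocking_pids(pid)) > 0;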
Performance Checklist
Before deploying:
- Slow queries identified and optimized
- Indexes match query patterns
- Covering indexes for frequent queries
- Pagination is cursor-based (not OFFSET)
- Large tables partitioned if > 10M rows
- Connection pool sized appropriately
- Cache layer for hot data
- Monitoring in place for slow queries
- VACUUM and ANALYZE scheduled
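Autovacuum normally covers routine VACUUM and ANALYZE; very hot tables sometimes need per-table settings tuned more aggressively (the thresholds below are illustrative):
ALTER TABLE events SET (
    autovacuum_vacuum_scale_factor = 0.05,
    autovacuum_analyze_scale_factor = 0.02
);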