Installation
Usage
After installing, this skill will be available to your AI coding assistant.
Verify installation:
npx agent-skills-cli list

Skill Instructions
name: data-engineering
description: Use when "data pipelines", "ETL", "data warehousing", "data lakes", or asking about "Airflow", "Spark", "dbt", "Snowflake", "BigQuery", "data modeling"
version: 1.0.0

<!-- Adapted from: claude-skills/engineering-team/senior-data-engineer -->

Data Engineering Guide
Data pipelines, warehousing, and modern data stack.
When to Use
- Building data pipelines
- Designing data warehouses
- Implementing ETL/ELT processes
- Setting up data lakes
- Optimizing data infrastructure
Modern Data Stack
Components
Sources → Ingestion → Storage → Transform → Serve → Consume
| Layer | Tools |
|---|---|
| Ingestion | Fivetran, Airbyte, Stitch |
| Storage | S3, GCS, Snowflake, BigQuery |
| Transform | dbt, Spark, Airflow |
| Orchestration | Airflow, Dagster, Prefect |
| Serving | Looker, Tableau, Metabase |
Data Pipeline Patterns
Batch Processing
# Airflow DAG example
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    'daily_etl',
    schedule_interval='0 6 * * *',  # daily at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,  # don't backfill runs missed before deployment
)

def extract():
    # Extract from source
    pass

def transform():
    # Transform data
    pass

def load():
    # Load to warehouse
    pass

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

extract_task >> transform_task >> load_task
Streaming Processing
# Kafka consumer example
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

for message in consumer:
    process_event(message.value)  # process_event defined elsewhere
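Kafka delivers at-least-once by default, so after a rebalance or retry the consumer can see the same event twice; handlers must tolerate replays. A minimal duplicate-safe processing sketch (the in-memory `seen_ids` set stands in for a durable store such as Redis or a warehouse table, and `event_id` is an assumed payload field):

```python
import json

def process_events(raw_messages, handler, seen_ids):
    """Process a batch of JSON events, skipping duplicate deliveries.

    `seen_ids` must be durable in production; an in-memory set is used
    here only to keep the sketch self-contained.
    """
    processed = []
    for raw in raw_messages:
        event = json.loads(raw)
        event_id = event["event_id"]
        if event_id in seen_ids:
            continue  # duplicate delivery: already handled
        handler(event)
        seen_ids.add(event_id)  # record only after a successful handle
        processed.append(event_id)
    return processed
```

Replaying the same batch then handles each event exactly once, which is what makes retries safe.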
dbt Patterns
Model Structure
models/
├── staging/          # 1:1 with source
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/     # Business logic
│   └── int_order_items.sql
└── marts/            # Final models
    ├── dim_customers.sql
    └── fct_orders.sql
Example Model
-- models/marts/fct_orders.sql
{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

select
    o.order_id,
    o.customer_id,
    o.order_date,
    sum(oi.quantity * oi.unit_price) as order_total
from {{ ref('stg_orders') }} o
join {{ ref('stg_order_items') }} oi
    on o.order_id = oi.order_id
{% if is_incremental() %}
where o.order_date > (select max(order_date) from {{ this }})
{% endif %}
group by 1, 2, 3
Data Modeling
Dimensional Modeling
Fact Tables (events/transactions)
├── fct_orders
├── fct_page_views
└── fct_transactions

Dimension Tables (context)
├── dim_customers
├── dim_products
├── dim_dates
└── dim_locations
Star Schema

             dim_customers
                  │
dim_dates ── fct_orders ── dim_products
                  │
             dim_locations
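The diagram translates directly into SQL: queries join the central fact table to its dimensions and aggregate the fact's measures by dimension attributes. A self-contained sketch using SQLite with made-up sample rows (table and column names follow the diagram; the data is illustrative):

```python
import sqlite3

# Build a tiny star schema in memory.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_customers (customer_id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE dim_products  (product_id  INTEGER PRIMARY KEY, category TEXT);
    CREATE TABLE fct_orders (
        order_id    INTEGER PRIMARY KEY,
        customer_id INTEGER REFERENCES dim_customers(customer_id),
        product_id  INTEGER REFERENCES dim_products(product_id),
        order_total REAL
    );
    INSERT INTO dim_customers VALUES (1, 'Ada'), (2, 'Grace');
    INSERT INTO dim_products  VALUES (10, 'books'), (20, 'tools');
    INSERT INTO fct_orders VALUES
        (100, 1, 10, 25.0), (101, 1, 20, 40.0), (102, 2, 10, 15.0);
""")

# Typical star-schema query: fact joined to dimensions, measures aggregated.
rows = conn.execute("""
    SELECT c.name, p.category, SUM(f.order_total) AS revenue
    FROM fct_orders f
    JOIN dim_customers c ON f.customer_id = c.customer_id
    JOIN dim_products  p ON f.product_id  = p.product_id
    GROUP BY c.name, p.category
    ORDER BY revenue DESC
""").fetchall()
```

The same join shape works against any dimension: adding `dim_dates` or `dim_locations` is just another join on the fact's foreign key.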
Data Quality
Validation Rules
# schema.yml (dbt tests)
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_total
        tests:
          - not_null
          - positive_value  # custom generic test, not built into dbt
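`unique` and `not_null` ship with dbt, but `positive_value` does not; it would need a custom generic test. A minimal sketch (the file path and test name are assumptions following dbt's generic-test convention):

```sql
-- tests/generic/positive_value.sql
{% test positive_value(model, column_name) %}
select *
from {{ model }}
where {{ column_name }} <= 0
{% endtest %}
```

dbt treats any rows returned by a test query as failures, so this passes only when every value in the column is strictly positive.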
Quality Metrics
| Metric | Description |
|---|---|
| Completeness | % non-null values |
| Uniqueness | % distinct values |
| Timeliness | Data freshness |
| Accuracy | Matches source |
| Consistency | Across systems |
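The first two metrics reduce to simple ratios. A sketch of computing them on a column pulled into Python, with `None` standing in for SQL NULL (in practice these usually run as SQL or dbt tests inside the warehouse):

```python
def completeness(values):
    """Share of non-null values in a column."""
    if not values:
        return 0.0
    return sum(v is not None for v in values) / len(values)

def uniqueness(values):
    """Share of distinct values among the non-null ones."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return 0.0
    return len(set(non_null)) / len(non_null)
```

Timeliness, accuracy, and consistency need external context (SLAs, the source system, sibling systems) and so can't be computed from a single column alone.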
Performance Optimization
Partitioning
-- BigQuery partitioned table
CREATE TABLE orders
PARTITION BY DATE(order_date)
CLUSTER BY customer_id
AS SELECT * FROM staging.orders;
Query Optimization
| Technique | Impact |
|---|---|
| Partitioning | Reduce scanned data |
| Clustering | Improve filter speed |
| Materialization | Pre-compute joins |
| Caching | Reduce repeat queries |
Monitoring
Pipeline Metrics
| Metric | Alert Threshold |
|---|---|
| Runtime | >2x normal |
| Row count | ±20% variance |
| Freshness | >SLA |
| Failures | Any failure |
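These thresholds are mechanical enough to encode directly. A sketch of the check (the 2x and ±20% numbers mirror the table and are illustrative defaults to tune per pipeline; `baseline` is the normal runtime or expected row count):

```python
def should_alert(metric, current, baseline, sla_hours=None):
    """Evaluate the alert thresholds from the metrics table."""
    if metric == "runtime":
        return current > 2 * baseline          # >2x normal duration
    if metric == "row_count":
        return abs(current - baseline) > 0.20 * baseline  # outside ±20%
    if metric == "freshness":
        return current > sla_hours             # hours since last load > SLA
    raise ValueError(f"unknown metric: {metric}")
```

Failures need no threshold logic: any task failure alerts unconditionally.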
Data Observability
# Monte Carlo / Elementary-style example
monitors:
  - table: fct_orders
    tests:
      - freshness:
          threshold: 6 hours
      - volume:
          threshold: 10%
      - schema_change: true
Best Practices
Pipeline Design
- Idempotent operations
- Incremental processing
- Clear data lineage
- Automated testing
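Idempotency is listed first for a reason: a retried or replayed load must not duplicate rows. A minimal sketch using SQLite's upsert as a stand-in for a warehouse MERGE (table and column names are illustrative):

```python
import sqlite3

def load_orders(conn, rows):
    """Idempotent load: re-running the same batch leaves one copy per key."""
    conn.executemany(
        """
        INSERT INTO orders (order_id, order_total) VALUES (?, ?)
        ON CONFLICT(order_id) DO UPDATE SET order_total = excluded.order_total
        """,
        rows,
    )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, order_total REAL)")
load_orders(conn, [(1, 10.0), (2, 20.0)])
load_orders(conn, [(1, 10.0), (2, 20.0)])  # replay the same batch
count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

Plain `INSERT` would leave four rows after the replay; the upsert keyed on `order_id` leaves two, so a failed-then-retried run is safe.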
Data Governance
- Document all models
- Track data lineage
- Implement access controls
- Version control SQL
Cost Management
- Monitor query costs
- Use partitioning
- Schedule off-peak
- Archive old data
