Implement Customer.io reference architecture. Use when designing integrations, planning architecture, or implementing enterprise patterns. Trigger with phrases like "customer.io architecture", "customer.io design", "customer.io enterprise", "customer.io integration pattern".
Installation
Details
Usage
After installing, this skill will be available to your AI coding assistant.
Verify installation:
skills listSkill Instructions
name: customerio-reference-architecture description: | Implement Customer.io reference architecture. Use when designing integrations, planning architecture, or implementing enterprise patterns. Trigger with phrases like "customer.io architecture", "customer.io design", "customer.io enterprise", "customer.io integration pattern". allowed-tools: Read, Write, Edit, Bash(gh:), Bash(curl:) version: 1.0.0 license: MIT author: Jeremy Longshore jeremy@intentsolutions.io
Customer.io Reference Architecture
Overview
Enterprise-grade reference architecture for Customer.io integration with proper separation of concerns, reliability, and scalability.
Architecture Diagram
Customer.io
|
+-------------------+-------------------+
| | |
Track API App API Webhooks
| | |
v v v
+-------+-------+ +-------+-------+ +-------+-------+
| Event Bus | | Transactional | | Webhook Handler|
| (Kafka) | | Service | | (Express) |
+-------+-------+ +-------+-------+ +-------+-------+
| | |
v v v
+-------+-------+ +-------+-------+ +-------+-------+
| CustomerIO | | Email | | Event |
| Worker | | Templates | | Processor |
+-------+-------+ +-------+-------+ +-------+-------+
| | |
+-------------------+-------------------+
|
v
+-------+-------+
| Data Lake |
| (BigQuery) |
+---------------+
Instructions
Step 1: Core Service Layer
// src/services/customerio/index.ts
import { TrackClient, APIClient, RegionUS } from '@customerio/track';
import { EventEmitter } from 'events';
export interface CustomerIOConfig {
trackSiteId: string;
trackApiKey: string;
appApiKey: string;
region: 'us' | 'eu';
environment: 'development' | 'staging' | 'production';
}
export class CustomerIOService extends EventEmitter {
private trackClient: TrackClient;
private apiClient: APIClient;
private config: CustomerIOConfig;
constructor(config: CustomerIOConfig) {
super();
this.config = config;
this.trackClient = new TrackClient(
config.trackSiteId,
config.trackApiKey,
{ region: config.region === 'eu' ? RegionEU : RegionUS }
);
this.apiClient = new APIClient(config.appApiKey, {
region: config.region === 'eu' ? RegionEU : RegionUS
});
}
// User management
async identifyUser(userId: string, attributes: UserAttributes): Promise<void> {
this.emit('identify:start', { userId, attributes });
try {
await this.trackClient.identify(userId, {
...attributes,
_env: this.config.environment,
_updated_at: Math.floor(Date.now() / 1000)
});
this.emit('identify:success', { userId });
} catch (error) {
this.emit('identify:error', { userId, error });
throw error;
}
}
// Event tracking
async trackEvent(userId: string, event: EventPayload): Promise<void> {
this.emit('track:start', { userId, event });
try {
await this.trackClient.track(userId, {
name: event.name,
data: {
...event.data,
_env: this.config.environment
}
});
this.emit('track:success', { userId, event: event.name });
} catch (error) {
this.emit('track:error', { userId, event: event.name, error });
throw error;
}
}
// Transactional messaging
async sendTransactional(request: TransactionalRequest): Promise<void> {
return this.apiClient.sendEmail(request);
}
}
Step 2: Event Bus Integration
// src/services/customerio/event-bus.ts
import { Kafka, Producer, Consumer } from 'kafkajs';
import { CustomerIOService } from './index';
interface CustomerIOEvent {
type: 'identify' | 'track' | 'transactional';
userId: string;
payload: any;
timestamp: number;
correlationId: string;
}
export class CustomerIOEventBus {
private producer: Producer;
private consumer: Consumer;
private service: CustomerIOService;
constructor(kafka: Kafka, service: CustomerIOService) {
this.producer = kafka.producer();
this.consumer = kafka.consumer({ groupId: 'customerio-worker' });
this.service = service;
}
async start(): Promise<void> {
await this.producer.connect();
await this.consumer.connect();
await this.consumer.subscribe({
topics: ['customerio.identify', 'customerio.track', 'customerio.transactional']
});
await this.consumer.run({
eachMessage: async ({ topic, message }) => {
const event: CustomerIOEvent = JSON.parse(message.value!.toString());
await this.processEvent(topic, event);
}
});
}
private async processEvent(topic: string, event: CustomerIOEvent): Promise<void> {
const startTime = Date.now();
try {
switch (event.type) {
case 'identify':
await this.service.identifyUser(event.userId, event.payload);
break;
case 'track':
await this.service.trackEvent(event.userId, event.payload);
break;
case 'transactional':
await this.service.sendTransactional(event.payload);
break;
}
// Emit success metrics
await this.producer.send({
topic: 'customerio.processed',
messages: [{
key: event.correlationId,
value: JSON.stringify({
...event,
status: 'success',
processingTime: Date.now() - startTime
})
}]
});
} catch (error) {
// Dead letter queue for failed events
await this.producer.send({
topic: 'customerio.dlq',
messages: [{
key: event.correlationId,
value: JSON.stringify({
...event,
status: 'failed',
error: error.message,
processingTime: Date.now() - startTime
})
}]
});
}
}
// Publish events to be processed
async publish(event: CustomerIOEvent): Promise<void> {
await this.producer.send({
topic: `customerio.${event.type}`,
messages: [{
key: event.userId,
value: JSON.stringify(event)
}]
});
}
}
Step 3: Repository Pattern
// src/repositories/user-messaging.ts
import { CustomerIOService } from '../services/customerio';
import { UserRepository } from './user';
export interface MessagingPreferences {
email: boolean;
push: boolean;
sms: boolean;
inApp: boolean;
}
export class UserMessagingRepository {
constructor(
private cio: CustomerIOService,
private users: UserRepository
) {}
async syncUser(userId: string): Promise<void> {
const user = await this.users.findById(userId);
if (!user) throw new Error(`User ${userId} not found`);
const preferences = await this.getPreferences(userId);
await this.cio.identifyUser(userId, {
email: user.email,
first_name: user.firstName,
last_name: user.lastName,
created_at: Math.floor(user.createdAt.getTime() / 1000),
plan: user.subscription?.plan || 'free',
// Preferences
email_opt_in: preferences.email,
push_opt_in: preferences.push,
sms_opt_in: preferences.sms
});
}
async getPreferences(userId: string): Promise<MessagingPreferences> {
// Load from your preferences store
return {
email: true,
push: false,
sms: false,
inApp: true
};
}
async updatePreferences(
userId: string,
preferences: Partial<MessagingPreferences>
): Promise<void> {
// Update local store
await this.savePreferences(userId, preferences);
// Sync to Customer.io
await this.cio.identifyUser(userId, {
email_opt_in: preferences.email,
push_opt_in: preferences.push,
sms_opt_in: preferences.sms
});
}
}
Step 4: Webhook Handler
// src/webhooks/customerio.ts
import { Router } from 'express';
import { EventEmitter } from 'events';
export class CustomerIOWebhooks extends EventEmitter {
private router: Router;
private signingSecret: string;
constructor(signingSecret: string) {
super();
this.signingSecret = signingSecret;
this.router = Router();
this.setupRoutes();
}
private setupRoutes(): void {
this.router.post('/', async (req, res) => {
// Verify signature
if (!this.verifySignature(req)) {
return res.status(401).send('Invalid signature');
}
// Process events
const events = req.body.events || [];
for (const event of events) {
this.emit(event.metric, event);
this.emit('*', event);
}
res.status(200).json({ received: events.length });
});
}
getRouter(): Router {
return this.router;
}
}
// Usage
const webhooks = new CustomerIOWebhooks(process.env.WEBHOOK_SECRET!);
webhooks.on('email_delivered', (event) => {
// Update delivery status
});
webhooks.on('email_bounced', async (event) => {
// Handle bounce - suppress user
await cio.suppress(event.data.customer_id);
});
webhooks.on('*', (event) => {
// Stream all events to data warehouse
await streamToDataWarehouse(event);
});
app.use('/webhooks/customerio', webhooks.getRouter());
Step 5: Infrastructure as Code
# terraform/customerio.tf
resource "google_secret_manager_secret" "customerio_site_id" {
secret_id = "customerio-site-id"
replication {
auto {}
}
}
resource "google_secret_manager_secret" "customerio_api_key" {
secret_id = "customerio-api-key"
replication {
auto {}
}
}
resource "google_cloud_run_service" "customerio_worker" {
name = "customerio-worker"
location = var.region
template {
spec {
containers {
image = "gcr.io/${var.project}/customerio-worker:latest"
env {
name = "CUSTOMERIO_SITE_ID"
value_from {
secret_key_ref {
name = google_secret_manager_secret.customerio_site_id.secret_id
key = "latest"
}
}
}
env {
name = "CUSTOMERIO_API_KEY"
value_from {
secret_key_ref {
name = google_secret_manager_secret.customerio_api_key.secret_id
key = "latest"
}
}
}
}
}
}
}
resource "google_pubsub_topic" "customerio_events" {
name = "customerio-events"
}
resource "google_bigquery_dataset" "customerio" {
dataset_id = "customerio_events"
location = var.region
}
resource "google_bigquery_table" "delivery_events" {
dataset_id = google_bigquery_dataset.customerio.dataset_id
table_id = "delivery_events"
schema = file("${path.module}/schemas/delivery_events.json")
time_partitioning {
type = "DAY"
field = "timestamp"
}
}
Architecture Principles
- Separation of Concerns: Track API, App API, and Webhooks are handled by separate services
- Event-Driven: Use message queues for reliable async processing
- Idempotency: All operations can be safely retried
- Observability: Events are emitted for monitoring and debugging
- Infrastructure as Code: All resources defined in Terraform
Output
- Core Customer.io service layer
- Event bus integration (Kafka)
- Repository pattern for user messaging
- Webhook handler with signature verification
- Terraform infrastructure code
Resources
Next Steps
After implementing architecture, proceed to customerio-multi-env-setup for multi-environment configuration.
More by jeremylongshore
View allRabbitmq Queue Setup - Auto-activating skill for Backend Development. Triggers on: rabbitmq queue setup, rabbitmq queue setup Part of the Backend Development skill category.
evaluating-machine-learning-models: This skill allows Claude to evaluate machine learning models using a comprehensive suite of metrics. It should be used when the user requests model performance analysis, validation, or testing. Claude can use this skill to assess model accuracy, precision, recall, F1-score, and other relevant metrics. Trigger this skill when the user mentions "evaluate model", "model performance", "testing metrics", "validation results", or requests a comprehensive "model evaluation".
building-neural-networks: This skill allows Claude to construct and configure neural network architectures using the neural-network-builder plugin. It should be used when the user requests the creation of a new neural network, modification of an existing one, or assistance with defining the layers, parameters, and training process. The skill is triggered by requests involving terms like "build a neural network," "define network architecture," "configure layers," or specific mentions of neural network types (e.g., "CNN," "RNN," "transformer").
Oauth Callback Handler - Auto-activating skill for API Integration. Triggers on: oauth callback handler, oauth callback handler Part of the API Integration skill category.
