jeremylongshore

langchain-enterprise-rbac

@jeremylongshore/langchain-enterprise-rbac
jeremylongshore
1,004
123 forks
Updated 1/18/2026
View on GitHub

Implement enterprise role-based access control for LangChain applications. Use when implementing user permissions, multi-tenant access, or enterprise security controls for LLM applications. Trigger with phrases like "langchain RBAC", "langchain permissions", "langchain access control", "langchain multi-tenant", "langchain enterprise auth".

Installation

$skills install @jeremylongshore/langchain-enterprise-rbac
Claude Code
Cursor
Copilot
Codex
Antigravity

Details

Pathplugins/saas-packs/langchain-pack/skills/langchain-enterprise-rbac/SKILL.md
Branchmain
Scoped Name@jeremylongshore/langchain-enterprise-rbac

Usage

After installing, this skill will be available to your AI coding assistant.

Verify installation:

skills list

Skill Instructions


name: langchain-enterprise-rbac description: | Implement enterprise role-based access control for LangChain applications. Use when implementing user permissions, multi-tenant access, or enterprise security controls for LLM applications. Trigger with phrases like "langchain RBAC", "langchain permissions", "langchain access control", "langchain multi-tenant", "langchain enterprise auth". allowed-tools: Read, Write, Edit version: 1.0.0 license: MIT author: Jeremy Longshore jeremy@intentsolutions.io

LangChain Enterprise RBAC

Overview

Implement role-based access control (RBAC) for LangChain applications with multi-tenant support and fine-grained permissions.

Prerequisites

  • LangChain application with user authentication
  • Identity provider (Auth0, Okta, Azure AD)
  • Understanding of RBAC concepts

Instructions

Step 1: Define Permission Model

from enum import Enum
from typing import Set, Optional
from pydantic import BaseModel
from datetime import datetime

class Permission(str, Enum):
    # Chain permissions
    CHAIN_READ = "chain:read"
    CHAIN_EXECUTE = "chain:execute"
    CHAIN_CREATE = "chain:create"
    CHAIN_DELETE = "chain:delete"

    # Model permissions
    MODEL_GPT4 = "model:gpt-4"
    MODEL_GPT4_MINI = "model:gpt-4o-mini"
    MODEL_CLAUDE = "model:claude"

    # Feature permissions
    FEATURE_STREAMING = "feature:streaming"
    FEATURE_TOOLS = "feature:tools"
    FEATURE_RAG = "feature:rag"

    # Admin permissions
    ADMIN_USERS = "admin:users"
    ADMIN_BILLING = "admin:billing"
    ADMIN_AUDIT = "admin:audit"

class Role(BaseModel):
    name: str
    permissions: Set[Permission]
    description: str = ""

# Predefined roles
ROLES = {
    "viewer": Role(
        name="viewer",
        permissions={Permission.CHAIN_READ},
        description="Read-only access to chains"
    ),
    "user": Role(
        name="user",
        permissions={
            Permission.CHAIN_READ,
            Permission.CHAIN_EXECUTE,
            Permission.MODEL_GPT4_MINI,
        },
        description="Standard user with execution rights"
    ),
    "power_user": Role(
        name="power_user",
        permissions={
            Permission.CHAIN_READ,
            Permission.CHAIN_EXECUTE,
            Permission.CHAIN_CREATE,
            Permission.MODEL_GPT4_MINI,
            Permission.MODEL_GPT4,
            Permission.FEATURE_STREAMING,
            Permission.FEATURE_TOOLS,
        },
        description="Power user with advanced features"
    ),
    "admin": Role(
        name="admin",
        permissions=set(Permission),  # All permissions
        description="Full administrative access"
    ),
}

Step 2: User and Tenant Management

from typing import Dict, List
import uuid

class Tenant(BaseModel):
    id: str
    name: str
    allowed_models: List[str] = []
    monthly_token_limit: int = 1_000_000
    features: Set[str] = set()
    created_at: datetime = None

class User(BaseModel):
    id: str
    email: str
    tenant_id: str
    roles: List[str]
    created_at: datetime = None

    def get_permissions(self) -> Set[Permission]:
        """Get all permissions for user based on roles."""
        permissions = set()
        for role_name in self.roles:
            if role_name in ROLES:
                permissions.update(ROLES[role_name].permissions)
        return permissions

    def has_permission(self, permission: Permission) -> bool:
        """Check if user has specific permission."""
        return permission in self.get_permissions()

class UserStore:
    """User and tenant management."""

    def __init__(self):
        self.tenants: Dict[str, Tenant] = {}
        self.users: Dict[str, User] = {}

    def create_tenant(self, name: str, **kwargs) -> Tenant:
        tenant = Tenant(
            id=str(uuid.uuid4()),
            name=name,
            created_at=datetime.now(),
            **kwargs
        )
        self.tenants[tenant.id] = tenant
        return tenant

    def create_user(
        self,
        email: str,
        tenant_id: str,
        roles: List[str] = None
    ) -> User:
        if tenant_id not in self.tenants:
            raise ValueError(f"Tenant {tenant_id} not found")

        user = User(
            id=str(uuid.uuid4()),
            email=email,
            tenant_id=tenant_id,
            roles=roles or ["user"],
            created_at=datetime.now()
        )
        self.users[user.id] = user
        return user

    def get_user_tenant(self, user_id: str) -> Optional[Tenant]:
        user = self.users.get(user_id)
        if user:
            return self.tenants.get(user.tenant_id)
        return None

Step 3: Permission Enforcement

from functools import wraps
from fastapi import HTTPException, Depends, Request
from typing import Callable

class PermissionChecker:
    """Check and enforce permissions."""

    def __init__(self, user_store: UserStore):
        self.user_store = user_store

    def require_permission(self, permission: Permission):
        """Decorator to require specific permission."""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(request: Request, *args, **kwargs):
                user_id = request.state.user_id  # Set by auth middleware
                user = self.user_store.users.get(user_id)

                if not user:
                    raise HTTPException(status_code=401, detail="User not found")

                if not user.has_permission(permission):
                    raise HTTPException(
                        status_code=403,
                        detail=f"Permission denied: {permission.value}"
                    )

                return await func(request, *args, **kwargs)
            return wrapper
        return decorator

    def require_any_permission(self, permissions: List[Permission]):
        """Require at least one of the specified permissions."""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(request: Request, *args, **kwargs):
                user_id = request.state.user_id
                user = self.user_store.users.get(user_id)

                if not user:
                    raise HTTPException(status_code=401)

                if not any(user.has_permission(p) for p in permissions):
                    raise HTTPException(status_code=403)

                return await func(request, *args, **kwargs)
            return wrapper
        return decorator

# Usage
user_store = UserStore()
checker = PermissionChecker(user_store)

@app.post("/chains/{chain_id}/execute")
@checker.require_permission(Permission.CHAIN_EXECUTE)
async def execute_chain(request: Request, chain_id: str):
    # User has CHAIN_EXECUTE permission
    pass

Step 4: Model Access Control

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

class ModelAccessController:
    """Control access to LLM models based on permissions."""

    MODEL_PERMISSIONS = {
        "gpt-4o": Permission.MODEL_GPT4,
        "gpt-4o-mini": Permission.MODEL_GPT4_MINI,
        "claude-3-5-sonnet-20241022": Permission.MODEL_CLAUDE,
    }

    def __init__(self, user_store: UserStore):
        self.user_store = user_store

    def get_allowed_models(self, user_id: str) -> List[str]:
        """Get list of models user can access."""
        user = self.user_store.users.get(user_id)
        if not user:
            return []

        permissions = user.get_permissions()
        tenant = self.user_store.get_user_tenant(user_id)

        allowed = []
        for model, permission in self.MODEL_PERMISSIONS.items():
            if permission in permissions:
                # Also check tenant restrictions
                if tenant and tenant.allowed_models:
                    if model in tenant.allowed_models:
                        allowed.append(model)
                else:
                    allowed.append(model)

        return allowed

    def create_llm(self, user_id: str, model: str = None):
        """Create LLM instance with access control."""
        allowed = self.get_allowed_models(user_id)

        if not allowed:
            raise PermissionError("No model access")

        # Use requested model or default to first allowed
        model = model or allowed[0]

        if model not in allowed:
            raise PermissionError(f"Access denied to model: {model}")

        if model.startswith("gpt"):
            return ChatOpenAI(model=model)
        elif model.startswith("claude"):
            return ChatAnthropic(model=model)
        else:
            raise ValueError(f"Unknown model: {model}")

Step 5: Tenant Isolation

from langchain_core.callbacks import BaseCallbackHandler
from contextvars import ContextVar

# Context variable for current tenant
current_tenant: ContextVar[str] = ContextVar("current_tenant")

class TenantIsolationMiddleware:
    """Middleware to enforce tenant isolation."""

    def __init__(self, user_store: UserStore):
        self.user_store = user_store

    async def __call__(self, request: Request, call_next):
        user_id = request.state.user_id
        user = self.user_store.users.get(user_id)

        if user:
            # Set tenant context
            token = current_tenant.set(user.tenant_id)
            try:
                response = await call_next(request)
            finally:
                current_tenant.reset(token)
            return response

        return await call_next(request)

class TenantAwareCallback(BaseCallbackHandler):
    """Tag all LLM calls with tenant ID."""

    def on_llm_start(self, serialized, prompts, **kwargs):
        tenant_id = current_tenant.get(None)
        if tenant_id:
            # Add tenant to metadata for billing/logging
            kwargs.setdefault("metadata", {})["tenant_id"] = tenant_id

# Tenant-scoped data access
class TenantScopedVectorStore:
    """Vector store with tenant isolation."""

    def __init__(self, base_store):
        self.base_store = base_store

    def similarity_search(self, query: str, **kwargs):
        tenant_id = current_tenant.get()
        if not tenant_id:
            raise ValueError("Tenant context required")

        # Add tenant filter
        kwargs["filter"] = kwargs.get("filter", {})
        kwargs["filter"]["tenant_id"] = tenant_id

        return self.base_store.similarity_search(query, **kwargs)

Step 6: Usage Quotas

from datetime import datetime, timedelta
from collections import defaultdict

class UsageQuotaManager:
    """Manage usage quotas per user and tenant."""

    def __init__(self, user_store: UserStore):
        self.user_store = user_store
        self.usage = defaultdict(lambda: {"tokens": 0, "requests": 0})
        self.reset_time = {}

    def check_quota(self, user_id: str, tokens: int = 0) -> bool:
        """Check if user has available quota."""
        user = self.user_store.users.get(user_id)
        tenant = self.user_store.get_user_tenant(user_id)

        if not tenant:
            return False

        # Reset monthly quota if needed
        self._maybe_reset(tenant.id)

        current = self.usage[tenant.id]["tokens"]
        return (current + tokens) <= tenant.monthly_token_limit

    def record_usage(self, user_id: str, tokens: int) -> None:
        """Record token usage."""
        tenant = self.user_store.get_user_tenant(user_id)
        if tenant:
            self.usage[tenant.id]["tokens"] += tokens
            self.usage[tenant.id]["requests"] += 1

    def _maybe_reset(self, tenant_id: str) -> None:
        """Reset quota at start of month."""
        now = datetime.now()
        last_reset = self.reset_time.get(tenant_id)

        if not last_reset or last_reset.month != now.month:
            self.usage[tenant_id] = {"tokens": 0, "requests": 0}
            self.reset_time[tenant_id] = now

    def get_usage_report(self, tenant_id: str) -> dict:
        """Get usage report for tenant."""
        tenant = self.user_store.tenants.get(tenant_id)
        usage = self.usage[tenant_id]

        return {
            "tenant_id": tenant_id,
            "tokens_used": usage["tokens"],
            "tokens_limit": tenant.monthly_token_limit if tenant else 0,
            "requests": usage["requests"],
            "percentage_used": (usage["tokens"] / tenant.monthly_token_limit * 100) if tenant else 0
        }

Output

  • Permission model with roles
  • User and tenant management
  • Model access control
  • Tenant isolation
  • Usage quotas

Resources

Next Steps

Use langchain-data-handling for data privacy controls.

More by jeremylongshore

View all
rabbitmq-queue-setup
1,004

Rabbitmq Queue Setup - Auto-activating skill for Backend Development. Triggers on: rabbitmq queue setup, rabbitmq queue setup Part of the Backend Development skill category.

model-evaluation-suite
1,004

evaluating-machine-learning-models: This skill allows Claude to evaluate machine learning models using a comprehensive suite of metrics. It should be used when the user requests model performance analysis, validation, or testing. Claude can use this skill to assess model accuracy, precision, recall, F1-score, and other relevant metrics. Trigger this skill when the user mentions "evaluate model", "model performance", "testing metrics", "validation results", or requests a comprehensive "model evaluation".

neural-network-builder
1,004

building-neural-networks: This skill allows Claude to construct and configure neural network architectures using the neural-network-builder plugin. It should be used when the user requests the creation of a new neural network, modification of an existing one, or assistance with defining the layers, parameters, and training process. The skill is triggered by requests involving terms like "build a neural network," "define network architecture," "configure layers," or specific mentions of neural network types (e.g., "CNN," "RNN," "transformer").

oauth-callback-handler
1,004

Oauth Callback Handler - Auto-activating skill for API Integration. Triggers on: oauth callback handler, oauth callback handler Part of the API Integration skill category.