AI & Machine Learning

Deploying Production LLMs with AWS Bedrock: A Complete Guide

January 20, 2025
8 min read
By Engineering Team

Learn how to architect, deploy, and scale large language models in production using AWS Bedrock, covering cost optimization, security, and performance best practices.

Architecture Overview

Our production architecture is built on AWS Bedrock for scalable LLM deployments. This serverless approach provides automatic scaling, pay-per-use pricing, and enterprise-grade security.

Infrastructure as Code

Terraform
# terraform/bedrock.tf
resource "aws_bedrockagent_agent" "production_agent" {
  agent_name              = "production-llm-agent"
  agent_resource_role_arn = aws_iam_role.bedrock_agent.arn
  foundation_model        = "anthropic.claude-3-sonnet-20240229-v1:0"
  
  instruction = "You are an AI assistant for enterprise applications"
  
  idle_session_ttl_in_seconds = 600
  
  tags = {
    Environment = "production"
    ManagedBy   = "terraform"
  }
}

resource "aws_lambda_function" "bedrock_proxy" {
  filename      = "bedrock_proxy.zip"
  function_name = "bedrock-llm-proxy"
  role          = aws_iam_role.lambda_exec.arn
  handler       = "index.handler"
  runtime       = "python3.11"
  
  environment {
    variables = {
      BEDROCK_MODEL_ID = var.bedrock_model_id
      MAX_TOKENS       = "4096"
      TEMPERATURE      = "0.7"
    }
  }
  
  timeout     = 300
  memory_size = 1024
}
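
The Lambda code referenced above ("index.handler") is not part of the Terraform. A minimal sketch of what that handler might look like, assuming an API Gateway proxy event whose JSON body contains a "prompt" field and the environment variables defined above:

Python
# lambda/index.py - hypothetical handler for the bedrock-llm-proxy function
import json
import os

import boto3

bedrock = boto3.client("bedrock-runtime")


def handler(event, context):
    """Proxy an API Gateway request to Bedrock and return the completion."""
    body = json.loads(event.get("body") or "{}")
    prompt = body.get("prompt", "")

    response = bedrock.invoke_model(
        modelId=os.environ["BEDROCK_MODEL_ID"],
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": int(os.environ.get("MAX_TOKENS", "4096")),
            "temperature": float(os.environ.get("TEMPERATURE", "0.7")),
            "messages": [{"role": "user", "content": prompt}],
        }),
    )
    result = json.loads(response["body"].read())

    return {
        "statusCode": 200,
        "body": json.dumps({"completion": result["content"][0]["text"]}),
    }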

Python Integration

Python
import boto3
import json
from typing import Dict, List, Optional

class BedrockLLMClient:
    def __init__(self, region_name: str = "us-east-1"):
        self.bedrock = boto3.client(
            service_name='bedrock-runtime',
            region_name=region_name
        )
        self.model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
    
    def generate_response(
        self,
        prompt: str,
        max_tokens: int = 4096,
        temperature: float = 0.7,
        system_prompt: Optional[str] = None
    ) -> Dict:
        """Generate response from Claude via Bedrock."""
        
        messages = [{"role": "user", "content": prompt}]
        
        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "temperature": temperature,
            "messages": messages,
            "system": system_prompt or "You are a helpful AI assistant."
        })
        
        try:
            response = self.bedrock.invoke_model(
                modelId=self.model_id,
                body=body
            )
            
            response_body = json.loads(response.get('body').read())
            return {
                "success": True,
                "content": response_body['content'][0]['text'],
                "usage": response_body['usage'],
                "model": self.model_id
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    def stream_response(self, prompt: str):
        """Stream response from Bedrock."""
        # Streaming is controlled by the invoke_model_with_response_stream call itself,
        # so the request body should not include a "stream" flag.
        response = self.bedrock.invoke_model_with_response_stream(
            modelId=self.model_id,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 4096,
                "messages": [{"role": "user", "content": prompt}]
            })
        )
        
        for event in response.get('body'):
            chunk = json.loads(event['chunk']['bytes'])
            if chunk['type'] == 'content_block_delta':
                yield chunk['delta']['text']
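
A short usage sketch for the client above (assumes AWS credentials with Bedrock model access in us-east-1):

Python
# Example usage of BedrockLLMClient
client = BedrockLLMClient()

# Single-shot generation with a bounded output length
result = client.generate_response(
    prompt="Summarize the benefits of serverless LLM deployment in two sentences.",
    max_tokens=200,
    system_prompt="You are a concise technical writer."
)
if result["success"]:
    print(result["content"])
    print(f"Token usage: {result['usage']}")
else:
    print(f"Request failed: {result['error']}")

# Streaming generation - print text chunks as they arrive
for text_chunk in client.stream_response("Explain AWS Bedrock in one paragraph."):
    print(text_chunk, end="", flush=True)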

Cost Optimization and Rate Limiting

AWS Bedrock pricing is per-token, making cost optimization critical for production deployments. Claude 3 Sonnet, the model used throughout this guide, is priced at:

  • Input tokens: $3 per 1M tokens
  • Output tokens: $15 per 1M tokens
  • No infrastructure costs (serverless)
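
As a quick sanity check on these rates, a single request with 1,000 input tokens and 500 output tokens works out to roughly a cent:

Python
# Back-of-envelope cost for one request at the Claude 3 Sonnet rates above
input_tokens, output_tokens = 1_000, 500
cost = (input_tokens / 1_000_000) * 3 + (output_tokens / 1_000_000) * 15
print(f"${cost:.4f}")  # $0.0105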

Cost Optimization Strategies:

1. Prompt Optimization:
  • Reduce unnecessary context
  • Use concise system prompts
  • Cache common prefixes

2. Output Length Control:
max_tokens is critical: output tokens cost 5x as much as input tokens.

BAD: unbounded output → client.generate_response(prompt, max_tokens=4096)
GOOD: limit to the use case → client.generate_response(prompt, max_tokens=500) for summaries

3. Response Caching:
  • Cache identical prompts (Redis/DynamoDB)
  • Use semantic similarity for near-duplicates (sketched at the end of this section)
  • 90%+ cache hit rates are possible for repetitive workloads

4. Rate Limiting:
  • Protect against abuse
  • Prevent bill shock
  • Implement quotas per user/tenant

Illustrative monthly spend at these rates:
  • 10K requests/day, 500 tokens avg → $450/month
  • 100K requests/day, 1K tokens avg → $9,000/month
  • 1M requests/day, 2K tokens avg → $180,000/month
Python
# Cost optimization implementation

import hashlib
import json
import redis
from functools import wraps
from datetime import datetime, timedelta

class BedrockCostOptimizer:
    """Optimize Bedrock costs with caching and rate limiting."""
    
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', decode_responses=True)
        self.cost_per_1k_input = 0.003
        self.cost_per_1k_output = 0.015
    
    def cache_response(self, ttl: int = 3600):
        """Cache responses to reduce costs."""
        def decorator(func):
            @wraps(func)
            def wrapper(user_id: str, prompt: str, *args, **kwargs):
                # Generate cache key from the prompt only
                cache_key = f"bedrock:{hashlib.sha256(prompt.encode()).hexdigest()}"
                
                # Check cache
                cached = self.redis_client.get(cache_key)
                if cached:
                    print(f"Cache hit! Saved ~\$0.01")
                    return json.loads(cached)
                
                # Call Bedrock, passing the arguments through unchanged
                response = func(user_id, prompt, *args, **kwargs)
                
                # Cache result
                self.redis_client.setex(
                    cache_key,
                    ttl,
                    json.dumps(response)
                )
                
                return response
            return wrapper
        return decorator
    
    def rate_limit(self, max_requests: int, window_seconds: int):
        """Rate limit to prevent cost overruns."""
        def decorator(func):
            @wraps(func)
            def wrapper(user_id: str, *args, **kwargs):
                key = f"rate_limit:{user_id}:{datetime.now().strftime('%Y%m%d%H%M')}"
                
                # Increment counter
                count = self.redis_client.incr(key)
                self.redis_client.expire(key, window_seconds)
                
                if count > max_requests:
                    raise Exception(
                        f"Rate limit exceeded: {max_requests} requests per {window_seconds}s"
                    )
                
                return func(user_id, *args, **kwargs)
            return wrapper
        return decorator
    
    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for a request."""
        input_cost = (input_tokens / 1000) * self.cost_per_1k_input
        output_cost = (output_tokens / 1000) * self.cost_per_1k_output
        return input_cost + output_cost
    
    def track_usage(self, user_id: str, input_tokens: int, output_tokens: int):
        """Track usage and costs per user."""
        cost = self.calculate_cost(input_tokens, output_tokens)
        
        # Track daily usage
        date_key = datetime.now().strftime('%Y-%m-%d')
        usage_key = f"usage:{user_id}:{date_key}"
        
        self.redis_client.hincrby(usage_key, "requests", 1)
        self.redis_client.hincrby(usage_key, "input_tokens", input_tokens)
        self.redis_client.hincrby(usage_key, "output_tokens", output_tokens)
        self.redis_client.hincrbyfloat(usage_key, "cost", cost)
        self.redis_client.expire(usage_key, 86400 * 90)  # 90 days
        
        return cost

# Usage example
optimizer = BedrockCostOptimizer()

@optimizer.cache_response(ttl=3600)
@optimizer.rate_limit(max_requests=100, window_seconds=60)
def generate_with_optimization(user_id: str, prompt: str):
    """Generate response with caching and rate limiting."""
    client = BedrockLLMClient()
    
    response = client.generate_response(
        prompt=prompt,
        max_tokens=500  # Limit output to reduce cost
    )
    
    if response['success']:
        # Track usage
        cost = optimizer.track_usage(
            user_id=user_id,
            input_tokens=response['usage']['input_tokens'],
            output_tokens=response['usage']['output_tokens']
        )
        print(f"Request cost: ${round(cost, 4)}")
    
    return response

# Cost monitoring dashboard query
def get_user_costs(user_id: str, days: int = 30):
    """Get user costs over time."""
    costs = []
    for i in range(days):
        date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
        usage_key = f"usage:{user_id}:{date}"
        
        data = optimizer.redis_client.hgetall(usage_key)
        if data:
            costs.append({
                'date': date,
                'requests': int(data.get('requests', 0)),
                'cost': float(data.get('cost', 0))
            })
    
    return costs

# Alert on high spend
def check_budget_alert(user_id: str, daily_budget: float = 100.0):
    """Alert if daily budget exceeded."""
    today = datetime.now().strftime('%Y-%m-%d')
    usage_key = f"usage:{user_id}:{today}"
    
    cost_today = float(optimizer.redis_client.hget(usage_key, "cost") or 0)
    
    if cost_today > daily_budget:
        # Send alert (Slack, email, etc.) - send_alert is a placeholder for your notifier
        send_alert(
            f"⚠️ Budget Alert: User {user_id} spent ${round(cost_today, 2)} today "
            f"(budget: ${round(daily_budget, 2)})"
        )
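
The exact-match cache above covers identical prompts. The "semantic similarity" strategy mentioned in the list earlier might look something like the sketch below; the amazon.titan-embed-text-v1 embedding model, the similarity threshold, and the naive linear scan over Redis keys are all illustrative assumptions rather than part of our production code.

Python
# Hypothetical semantic cache: reuse a cached response when a new prompt is
# close enough (cosine similarity) to a previously seen one.
import hashlib
import json
import math

import boto3
import redis

bedrock = boto3.client("bedrock-runtime")
r = redis.Redis(host="localhost", decode_responses=True)


def embed(text: str) -> list:
    """Get an embedding vector via the Bedrock Titan text embeddings model."""
    resp = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v1",
        body=json.dumps({"inputText": text}),
    )
    return json.loads(resp["body"].read())["embedding"]


def cosine(a: list, b: list) -> float:
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b)))


def semantic_lookup(prompt: str, threshold: float = 0.95):
    """Return a cached response for a semantically similar prompt, if any."""
    query = embed(prompt)
    # Naive linear scan over cached entries; fine for small caches,
    # use a proper vector store for anything larger.
    for key in r.scan_iter("semcache:*"):
        entry = json.loads(r.get(key))
        if cosine(query, entry["embedding"]) >= threshold:
            return entry["response"]
    return None


def semantic_store(prompt: str, response: dict, ttl: int = 3600):
    """Cache a response together with its prompt embedding."""
    key = f"semcache:{hashlib.sha256(prompt.encode()).hexdigest()}"
    r.setex(key, ttl, json.dumps({"embedding": embed(prompt), "response": response}))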

Monitoring and Observability

Production LLM deployments require comprehensive monitoring to ensure reliability and catch issues early.

Key Metrics to Track:

Performance:
  • Latency (p50, p95, p99)
  • Tokens per second
  • Time to first token (TTFT)
  • Request success rate

Cost:
  • Cost per request
  • Daily/monthly spend
  • Cost by user/tenant
  • Token usage trends

Quality:
  • Response quality scores
  • User feedback ratings
  • Error rates by type
  • Prompt injection attempts

System Health:
  • Lambda cold starts
  • API Gateway errors
  • Bedrock throttling
  • Downstream service health

Observability Stack:
  • Metrics: CloudWatch + Datadog
  • Logging: CloudWatch Logs + ELK
  • Tracing: AWS X-Ray
  • Alerting: PagerDuty + Slack

Alert Thresholds:
  • P99 latency > 10s → Warning
  • Success rate < 99% → Critical
  • Daily cost > $1,000 → Warning
  • Error rate > 1% → Warning
Python
# Comprehensive monitoring implementation

import logging
import time
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, Dict

from datadog import statsd
from aws_xray_sdk.core import xray_recorder

class BedrockMonitoring:
    """Monitoring and observability for Bedrock deployments."""
    
    def __init__(self, service_name: str = "bedrock-llm"):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
    
    def track_request(self, func):
        """Decorator to track all request metrics."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            
            # Start X-Ray trace
            with xray_recorder.capture(f'{self.service_name}.{func.__name__}'):
                try:
                    result = func(*args, **kwargs)
                    
                    # Track success
                    duration = time.time() - start_time
                    self._log_success(func.__name__, duration, result)
                    
                    return result
                    
                except Exception as e:
                    # Track failure
                    duration = time.time() - start_time
                    self._log_error(func.__name__, duration, str(e))
                    raise
        
        return wrapper
    
    def _log_success(self, operation: str, duration: float, result: Dict):
        """Log successful request."""
        # Send metrics to Datadog
        statsd.increment(f'{self.service_name}.requests', tags=[
            f'operation:{operation}',
            'status:success'
        ])
        
        statsd.histogram(f'{self.service_name}.latency', duration, tags=[
            f'operation:{operation}'
        ])
        
        if 'usage' in result:
            statsd.histogram(
                f'{self.service_name}.tokens.input',
                result['usage']['input_tokens']
            )
            statsd.histogram(
                f'{self.service_name}.tokens.output',
                result['usage']['output_tokens']
            )
        
        # Structured logging
        self.logger.info({
            'event': 'bedrock_request_success',
            'operation': operation,
            'duration_seconds': duration,
            'input_tokens': result.get('usage', {}).get('input_tokens'),
            'output_tokens': result.get('usage', {}).get('output_tokens'),
        })
    
    def _log_error(self, operation: str, duration: float, error: str):
        """Log failed request."""
        statsd.increment(f'{self.service_name}.requests', tags=[
            f'operation:{operation}',
            'status:error'
        ])
        
        self.logger.error({
            'event': 'bedrock_request_error',
            'operation': operation,
            'duration_seconds': duration,
            'error': error
        })
    
    def check_health(self) -> Dict[str, Any]:
        """Health check endpoint."""
        try:
            # Test Bedrock connectivity
            client = BedrockLLMClient()
            test_response = client.generate_response(
                prompt="Say 'OK' if you're working",
                max_tokens=10
            )
            
            if test_response['success']:
                return {
                    'status': 'healthy',
                    'service': self.service_name,
                    'timestamp': datetime.now().isoformat(),
                    'checks': {
                        'bedrock': 'ok',
                        'latency_ms': 250  # Example
                    }
                }
            else:
                return {
                    'status': 'unhealthy',
                    'service': self.service_name,
                    'error': test_response.get('error')
                }
        except Exception as e:
            return {
                'status': 'unhealthy',
                'service': self.service_name,
                'error': str(e)
            }

# Usage with FastAPI endpoint
from fastapi import FastAPI

app = FastAPI()
monitoring = BedrockMonitoring()

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return monitoring.check_health()

@app.post("/generate")
@monitoring.track_request
async def generate_text(prompt: str, user_id: str):
    """Generate text with full monitoring."""
    
    # Add custom trace metadata
    xray_recorder.put_annotation('user_id', user_id)
    xray_recorder.put_metadata('prompt_length', len(prompt))
    
    client = BedrockLLMClient()
    response = client.generate_response(prompt)
    
    return response

# CloudWatch Custom Metrics
import boto3

cloudwatch = boto3.client('cloudwatch')

def publish_custom_metrics(
    cost: float,
    tokens: int,
    latency: float
):
    """Publish custom metrics to CloudWatch."""
    cloudwatch.put_metric_data(
        Namespace='BedrockLLM/Production',
        MetricData=[
            {
                'MetricName': 'RequestCost',
                'Value': cost,
                'Unit': 'None',
                'Timestamp': datetime.now()
            },
            {
                'MetricName': 'TokensProcessed',
                'Value': tokens,
                'Unit': 'Count',
                'Timestamp': datetime.now()
            },
            {
                # A datum accepts either Value or StatisticValues, not both
                'MetricName': 'Latency',
                'Value': latency,
                'Unit': 'Milliseconds',
                'Timestamp': datetime.now()
            }
        ]
    )

# Alert on anomalies
def check_anomalies():
    """Check for anomalous patterns."""
    # Get recent metrics
    response = cloudwatch.get_metric_statistics(
        Namespace='BedrockLLM/Production',
        MetricName='Latency',
        StartTime=datetime.now() - timedelta(hours=1),
        EndTime=datetime.now(),
        Period=300,  # 5 minutes
        Statistics=['Average', 'Maximum']
    )
    
    for datapoint in response['Datapoints']:
        if datapoint['Maximum'] > 10000:  # 10s threshold
            send_alert(
                f"⚠️ High latency detected: {datapoint['Maximum']}ms "
                f"at {datapoint['Timestamp']}"
            )
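
Polling for anomalies works, but the alert thresholds listed earlier can also be wired directly into CloudWatch alarms. A minimal sketch using boto3's put_metric_alarm; the alarm name, SNS topic ARN, and evaluation settings below are illustrative assumptions, not values from our stack:

Python
# Hypothetical alarm for the "P99 latency > 10s" threshold
import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_alarm(
    AlarmName='bedrock-llm-p99-latency-warning',
    Namespace='BedrockLLM/Production',
    MetricName='Latency',
    ExtendedStatistic='p99',          # percentile statistic
    Period=300,                       # evaluate over 5-minute windows
    EvaluationPeriods=3,              # 3 consecutive breaches before alarming
    Threshold=10000,                  # 10s, in milliseconds
    ComparisonOperator='GreaterThanThreshold',
    TreatMissingData='notBreaching',
    AlarmDescription='P99 latency > 10s for the Bedrock LLM service',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:llm-alerts'],  # placeholder SNS topic
)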

Conclusion: Production-Ready LLM Deployment

AWS Bedrock has democratized enterprise LLM deployment, removing infrastructure complexity while maintaining production-grade reliability.

Key Takeaways:

1. Serverless Simplicity: No infrastructure management, automatic scaling, pay-per-use pricing

2. Enterprise Security: Built-in encryption, VPC isolation, IAM integration, audit logging

3. Cost Optimization: Caching can reduce costs by 90%+ on repetitive workloads, rate limiting prevents bill shock, and usage tracking enables chargeback

4. Monitoring is Critical: Track latency, costs, quality, and system health from day one

Production Checklist:

  • Cost monitoring and alerts
  • Rate limiting per user/tenant
  • Response caching strategy
  • Error handling and retries
  • Security scanning (prompt injection)
  • Comprehensive logging

Use Bedrock when you:
  • Need Claude or other foundation models
  • Want serverless deployment
  • Require enterprise compliance
  • Need rapid time-to-market

Consider self-hosted alternatives when you have:
  • Very high volume (>10M requests/day)
  • Custom model requirements
  • A latency requirement below 50ms
  • Cost optimization needs at extreme scale

AWS Bedrock excels for most enterprise use cases: fast deployment, reliable operation, and predictable costs. For the roughly 5% of applications with extreme requirements, consider self-hosted alternatives.

Next Steps: Start with Bedrock's free tier, build your MVP, then optimize based on real usage patterns.

Tags: AWS, Bedrock, LLM, AI, Production

Need Expert Help?

Our team has extensive experience implementing solutions like this. Let's discuss your project.