AI in Production: Cost and Performance Optimization (Part 3/4)
This is the third part of our series on AI in production. If you haven’t read the previous parts, you can find them here: Part 1 | Part 2.
In this installment, we focus on two critical aspects for AI success in production: cost optimization and performance. These optimizations can mean the difference between a viable project and one that consumes budget without control.
Cost Optimization Strategies
Dynamic Model Selection
The key to controlling costs is using the right model for each task. Not all queries require the most powerful model.
import logging
import re
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any, Optional
import tiktoken
logger = logging.getLogger(__name__)
@dataclass
class ModelConfig:
"""Model configuration with cost metrics"""
name: str
cost_per_token_input: float
cost_per_token_output: float
max_tokens: int
performance_tier: int # 1=economy, 2=standard, 3=premium
def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""Calculate estimated cost"""
return (
input_tokens * self.cost_per_token_input +
output_tokens * self.cost_per_token_output
)
class CostOptimizer:
"""Intelligent model selection based on complexity and cost"""
def __init__(self):
self.models = {
"economy": ModelConfig(
name="gpt-3.5-turbo-2025-05-13",
cost_per_token_input=0.0005 / 1000,
cost_per_token_output=0.0015 / 1000,
max_tokens=4096,
performance_tier=1
),
"standard": ModelConfig(
name="gpt-4o-mini-2025-05-13",
cost_per_token_input=0.00015 / 1000,
cost_per_token_output=0.0006 / 1000,
max_tokens=8192,
performance_tier=2
),
"premium": ModelConfig(
name="gpt-4o-2025-05-13",
cost_per_token_input=0.0025 / 1000,
cost_per_token_output=0.01 / 1000,
max_tokens=8192,
performance_tier=3
)
}
# Patterns for complexity classification
self.complexity_patterns = {
"high": [
r"analyze in depth",
r"detailed explanation",
r"complex reasoning",
r"multiple perspectives",
r"critical analysis"
],
"medium": [
r"explain briefly",
r"summarize in",
r"compare.*with",
r"pros and cons",
r"advantages.*disadvantages"
],
"low": [
r"translate.*to",
r"correct.*grammar",
r"format.*as",
r"extract.*information",
r"convert.*to"
]
}
def select_optimal_model(
self,
prompt: str,
user_context: Optional[Dict] = None,
budget_constraint: Optional[float] = None
) -> str:
"""Select optimal model based on complexity and constraints"""
# 1. Analyze prompt complexity
complexity = self._analyze_prompt_complexity(prompt)
# 2. Estimate tokens
estimated_input_tokens = self._estimate_tokens(prompt)
estimated_output_tokens = self._estimate_output_tokens(prompt, complexity)
# 3. Consider user context
user_tier = user_context.get("tier", "standard") if user_context else "standard"
# 4. Select model based on criteria
selected_model = self._select_model_by_criteria(
complexity,
estimated_input_tokens,
estimated_output_tokens,
user_tier,
budget_constraint
)
return selected_model
def _analyze_prompt_complexity(self, prompt: str) -> str:
"""Analyze prompt complexity using patterns"""
prompt_lower = prompt.lower()
# Check high complexity patterns
for pattern in self.complexity_patterns["high"]:
if re.search(pattern, prompt_lower):
return "high"
# Check medium complexity patterns
for pattern in self.complexity_patterns["medium"]:
if re.search(pattern, prompt_lower):
return "medium"
# Check low complexity patterns
for pattern in self.complexity_patterns["low"]:
if re.search(pattern, prompt_lower):
return "low"
# Additional heuristics
if len(prompt) > 1000:
return "high"
elif len(prompt) > 300:
return "medium"
else:
return "low"
def _estimate_tokens(self, text: str) -> int:
"""Estimate number of tokens using tiktoken"""
try:
encoding = tiktoken.encoding_for_model("gpt-4")
return len(encoding.encode(text))
except Exception:
# Fallback estimation: ~4 characters per token
return len(text) // 4
def _estimate_output_tokens(self, prompt: str, complexity: str) -> int:
"""Estimate output tokens based on prompt and complexity"""
base_estimates = {
"low": 50,
"medium": 200,
"premium": 500
}
base = base_estimates.get(complexity, 200)
# Adjust based on indicators in the prompt
if "detailed" in prompt.lower() or "complete explanation" in prompt.lower():
base *= 2
elif "brief" in prompt.lower() or "summarize" in prompt.lower():
base = int(base * 0.5)
return base
def _select_model_by_criteria(
self,
complexity: str,
input_tokens: int,
output_tokens: int,
user_tier: str,
budget_constraint: Optional[float]
) -> str:
"""Select model based on multiple criteria"""
# Complexity to preferred model mapping
complexity_to_model = {
"low": "economy",
"medium": "standard",
"high": "premium"
}
preferred_model = complexity_to_model[complexity]
# Check user restrictions
if user_tier == "basic" and preferred_model == "premium":
preferred_model = "standard"
elif user_tier == "economy":
preferred_model = "economy"
# Check budget constraint
if budget_constraint:
model_config = self.models[preferred_model]
estimated_cost = model_config.calculate_cost(input_tokens, output_tokens)
if estimated_cost > budget_constraint:
# Downgrade to more economical model
if preferred_model == "premium":
preferred_model = "standard"
elif preferred_model == "standard":
preferred_model = "economy"
return self.models[preferred_model].name
def estimate_cost(
self,
model_tier: str,
input_tokens: int,
output_tokens: int
) -> float:
"""Estimate cost for a specific configuration"""
if model_tier not in self.models:
raise ValueError(f"Unknown model tier: {model_tier}")
return self.models[model_tier].calculate_cost(input_tokens, output_tokens)
class CostTracker:
"""AI API cost tracker and alerts"""
def __init__(self, monthly_budget: float = 1000.0):
self.monthly_budget = monthly_budget
self.daily_budget = monthly_budget / 30
# Metrics storage (use database in production)
self.daily_costs = {}
self.monthly_costs = {}
self.user_costs = {}
self.endpoint_costs = {}
# Alert configuration
self.alert_thresholds = {
"daily": 0.8, # 80% of daily budget
"monthly": 0.9, # 90% of monthly budget
"user": 50.0 # $50 per user per day
}
def track_api_call(
self,
user_id: str,
endpoint: str,
model: str,
input_tokens: int,
output_tokens: int,
cost: float,
timestamp: Optional[datetime] = None
):
"""Register API call for cost tracking"""
if not timestamp:
timestamp = datetime.now()
date_key = timestamp.strftime("%Y-%m-%d")
month_key = timestamp.strftime("%Y-%m")
# Update daily costs
if date_key not in self.daily_costs:
self.daily_costs[date_key] = 0
self.daily_costs[date_key] += cost
# Update monthly costs
if month_key not in self.monthly_costs:
self.monthly_costs[month_key] = 0
self.monthly_costs[month_key] += cost
# Update costs per user
user_key = f"{user_id}:{date_key}"
if user_key not in self.user_costs:
self.user_costs[user_key] = 0
self.user_costs[user_key] += cost
# Update costs per endpoint
endpoint_key = f"{endpoint}:{date_key}"
if endpoint_key not in self.endpoint_costs:
self.endpoint_costs[endpoint_key] = 0
self.endpoint_costs[endpoint_key] += cost
# Check alerts
self._check_budget_alerts(date_key, month_key, user_id, cost)
def _check_budget_alerts(
self,
date_key: str,
month_key: str,
user_id: str,
cost: float
):
"""Check and send budget alerts"""
# Daily budget alert
daily_spent = self.daily_costs[date_key]
if daily_spent >= self.daily_budget * self.alert_thresholds["daily"]:
self._send_alert(
"daily_budget",
f"Daily budget: {daily_spent:.2f}/${self.daily_budget:.2f}"
)
# Monthly budget alert
monthly_spent = self.monthly_costs[month_key]
if monthly_spent >= self.monthly_budget * self.alert_thresholds["monthly"]:
self._send_alert(
"monthly_budget",
f"Monthly budget: {monthly_spent:.2f}/${self.monthly_budget:.2f}"
)
# User alert
user_key = f"{user_id}:{date_key}"
user_daily_spent = self.user_costs[user_key]
if user_daily_spent >= self.alert_thresholds["user"]:
self._send_alert(
"user_budget",
f"User {user_id}: ${user_daily_spent:.2f} spent today"
)
def _send_alert(self, alert_type: str, message: str):
"""Send alert (implement according to notification system)"""
logger.warning(f"COST ALERT [{alert_type}]: {message}")
# In production, integrate with:
# - Slack/Teams notifications
# - Email alerts
# - PagerDuty for critical alerts
# - Dashboard updates
def get_cost_summary(self, period: str = "daily") -> Dict[str, Any]:
"""Get cost summary"""
if period == "daily":
today = datetime.now().strftime("%Y-%m-%d")
return {
"period": "daily",
"date": today,
"total_cost": self.daily_costs.get(today, 0),
"budget": self.daily_budget,
"budget_used_pct": (self.daily_costs.get(today, 0) / self.daily_budget) * 100
}
elif period == "monthly":
this_month = datetime.now().strftime("%Y-%m")
return {
"period": "monthly",
"month": this_month,
"total_cost": self.monthly_costs.get(this_month, 0),
"budget": self.monthly_budget,
"budget_used_pct": (self.monthly_costs.get(this_month, 0) / self.monthly_budget) * 100
            }
        raise ValueError(f"Unknown period: {period}")
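To see how these pieces fit together, here is a minimal usage sketch; the prompt, user id, budget, and tier values are illustrative, and the cost recorded is the up-front estimate rather than a real API bill.
optimizer = CostOptimizer()
tracker = CostTracker(monthly_budget=2000.0)
prompt = "Summarize in three bullet points the pros and cons of serverless architectures."
# Pick the cheapest model that should still handle the request well
model_name = optimizer.select_optimal_model(
    prompt,
    user_context={"tier": "standard"},
    budget_constraint=0.01  # hard cap (in dollars) for this single call
)
# Estimate spend up front (assuming the "standard" tier was selected)
input_tokens = optimizer._estimate_tokens(prompt)
output_tokens = optimizer._estimate_output_tokens(prompt, "medium")
estimated_cost = optimizer.estimate_cost("standard", input_tokens, output_tokens)
print(f"Selected {model_name}, estimated cost ${estimated_cost:.5f}")
# After the real API call, record what was actually spent
tracker.track_api_call(
    user_id="user-123",
    endpoint="/api/v1/generate",
    model=model_name,
    input_tokens=input_tokens,
    output_tokens=output_tokens,
    cost=estimated_cost
)
print(tracker.get_cost_summary("daily"))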
Intelligent Caching Strategy
A well-designed cache can dramatically reduce AI costs while also improving latency; for workloads with many repeated or near-duplicate queries, savings in the 60-80% range are realistic.
import hashlib
import json
import logging
import time
from typing import Optional, Any, List, Dict
import redis
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
logger = logging.getLogger(__name__)
class IntelligentCache:
"""Multi-level cache with semantic similarity matching"""
def __init__(
self,
redis_client: redis.Redis,
similarity_threshold: float = 0.85,
max_cache_size: int = 10000
):
self.redis = redis_client
self.similarity_threshold = similarity_threshold
self.max_cache_size = max_cache_size
# TTL configuration by content type
self.ttl_config = {
"static": 86400 * 7, # 7 days for static content
"dynamic": 3600, # 1 hour for dynamic content
"user_specific": 1800, # 30 minutes for user-specific content
"realtime": 300 # 5 minutes for real-time content
}
# Vectorizer for semantic similarity
self.vectorizer = TfidfVectorizer(
max_features=1000,
stop_words='english',
lowercase=True
)
# Local cache for similarity
self.similarity_cache = {}
self.prompt_vectors = {}
def _generate_cache_key(self, prompt: str, **kwargs) -> str:
"""Generate deterministic cache key"""
# Include relevant parameters in the key
key_data = {
"prompt": prompt.strip().lower(),
"model": kwargs.get("model", ""),
"temperature": kwargs.get("temperature", 0.3),
"max_tokens": kwargs.get("max_tokens", 100)
}
key_string = json.dumps(key_data, sort_keys=True)
return hashlib.sha256(key_string.encode()).hexdigest()
async def get(self, prompt: str, **kwargs) -> Optional[Dict[str, Any]]:
"""Search cache with exact and semantic matching"""
# 1. Exact search
exact_key = self._generate_cache_key(prompt, **kwargs)
cached_result = self.redis.get(f"exact:{exact_key}")
if cached_result:
try:
data = json.loads(cached_result)
data["cache_type"] = "exact"
data["cache_hit"] = True
return data
except json.JSONDecodeError:
pass
# 2. Semantic similarity search
similar_result = await self._find_similar_cached_result(prompt, **kwargs)
if similar_result:
similar_result["cache_type"] = "semantic"
similar_result["cache_hit"] = True
return similar_result
return None
async def set(
self,
prompt: str,
result: str,
content_type: str = "dynamic",
metadata: Optional[Dict] = None,
**kwargs
):
"""Store result in cache with metadata"""
cache_key = self._generate_cache_key(prompt, **kwargs)
cache_data = {
"result": result,
"prompt": prompt,
"timestamp": time.time(),
"content_type": content_type,
"metadata": metadata or {},
"model": kwargs.get("model", ""),
"parameters": {
"temperature": kwargs.get("temperature", 0.3),
"max_tokens": kwargs.get("max_tokens", 100)
}
}
# Store with appropriate TTL
ttl = self.ttl_config.get(content_type, self.ttl_config["dynamic"])
try:
self.redis.setex(
f"exact:{cache_key}",
ttl,
json.dumps(cache_data)
)
# Update similarity index
await self._update_similarity_index(prompt, cache_key, result)
except Exception as e:
logger.error(f"Error storing in cache: {e}")
async def _find_similar_cached_result(
self,
prompt: str,
**kwargs
) -> Optional[Dict[str, Any]]:
"""Find similar result using semantic analysis"""
try:
# Get similar prompt keys
similar_keys = await self._find_similar_prompts(prompt)
for similar_key, similarity_score in similar_keys:
if similarity_score >= self.similarity_threshold:
cached_result = self.redis.get(f"exact:{similar_key}")
if cached_result:
try:
data = json.loads(cached_result)
data["similarity_score"] = similarity_score
return data
except json.JSONDecodeError:
continue
return None
except Exception as e:
logger.error(f"Error in semantic search: {e}")
return None
async def _find_similar_prompts(self, prompt: str) -> List[tuple]:
"""Find similar prompts using TF-IDF and cosine similarity"""
# Get all prompts in cache (in production, use a more efficient index)
pattern = "exact:*"
cached_keys = self.redis.keys(pattern)
if not cached_keys:
return []
# Extract prompts from cached keys
cached_prompts = []
valid_keys = []
for key in cached_keys[:100]: # Limit for performance
try:
cached_data = self.redis.get(key)
if cached_data:
data = json.loads(cached_data)
cached_prompts.append(data["prompt"])
valid_keys.append(key.decode().replace("exact:", ""))
except Exception:
continue
if not cached_prompts:
return []
# Calculate similarity using TF-IDF
try:
all_prompts = cached_prompts + [prompt]
tfidf_matrix = self.vectorizer.fit_transform(all_prompts)
# Calculate cosine similarity
query_vector = tfidf_matrix[-1] # The last one is our prompt
similarities = cosine_similarity(query_vector, tfidf_matrix[:-1]).flatten()
# Sort by similarity and return top results
similar_indices = similarities.argsort()[-10:][::-1] # Top 10
results = []
for idx in similar_indices:
if similarities[idx] > 0.1: # Minimum threshold
results.append((valid_keys[idx], similarities[idx]))
return results
except Exception as e:
logger.error(f"Error calculating similarity: {e}")
return []
async def _update_similarity_index(self, prompt: str, cache_key: str, result: str):
"""Update similarity index (simplified implementation)"""
# In production, use a more sophisticated vector index like:
# - Elasticsearch with dense vector fields
# - Pinecone
# - Weaviate
# - FAISS
pass
def invalidate_pattern(self, pattern: str):
"""Invalidate cache entries matching pattern"""
keys = self.redis.keys(f"exact:*{pattern}*")
if keys:
self.redis.delete(*keys)
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
info = self.redis.info()
# Count keys by type
exact_keys = len(self.redis.keys("exact:*"))
return {
"total_keys": exact_keys,
"memory_usage": info.get("used_memory_human", "unknown"),
"hit_rate": info.get("keyspace_hits", 0) / max(1, info.get("keyspace_hits", 0) + info.get("keyspace_misses", 0)),
"connected_clients": info.get("connected_clients", 0)
}
# Distributed cache with multiple levels
class MultiTierCache:
"""Multi-level cache system: local memory, Redis, and persistent"""
def __init__(self, redis_client: redis.Redis):
self.local_cache = {} # Local memory cache
self.intelligent_cache = IntelligentCache(redis_client)
self.max_local_size = 100
async def get(self, prompt: str, **kwargs) -> Optional[Dict[str, Any]]:
"""Multi-level cache search"""
cache_key = self.intelligent_cache._generate_cache_key(prompt, **kwargs)
# Level 1: Local memory cache
if cache_key in self.local_cache:
result = self.local_cache[cache_key]
result["cache_level"] = "local"
return result
# Level 2: Intelligent Redis
redis_result = await self.intelligent_cache.get(prompt, **kwargs)
if redis_result:
# Promote to local cache
self._promote_to_local(cache_key, redis_result)
redis_result["cache_level"] = "redis"
return redis_result
# Level 3: Persistent cache (database)
# In production, implement database search
return None
async def set(self, prompt: str, result: str, **kwargs):
"""Store at multiple levels"""
cache_key = self.intelligent_cache._generate_cache_key(prompt, **kwargs)
cache_data = {
"result": result,
"timestamp": time.time(),
"access_count": 1
}
# Store at all levels
self._promote_to_local(cache_key, cache_data)
await self.intelligent_cache.set(prompt, result, **kwargs)
def _promote_to_local(self, key: str, data: Dict[str, Any]):
"""Promote entry to local cache with LRU eviction"""
if len(self.local_cache) >= self.max_local_size:
# Remove oldest entry (simple LRU)
oldest_key = min(
self.local_cache.keys(),
key=lambda k: self.local_cache[k].get("timestamp", 0)
)
del self.local_cache[oldest_key]
self.local_cache[key] = data.copy()
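Putting the cache in front of the model call is then a small wrapper. The sketch below assumes a local Redis instance on the default port; `generate_fn` is a stand-in for whatever coroutine actually calls the model.
import redis
# One shared instance so the in-memory tier survives across requests
tier_cache = MultiTierCache(redis.Redis(host="localhost", port=6379))
async def cached_generate(prompt: str, generate_fn, **kwargs) -> str:
    """Check the multi-tier cache before paying for a model call (illustrative)."""
    hit = await tier_cache.get(prompt, **kwargs)
    if hit:
        # Served from local memory or Redis; no API cost for this request
        return hit["result"]
    # Cache miss: call the model, then store the result for future requests
    result = await generate_fn(prompt, **kwargs)
    await tier_cache.set(prompt, result, **kwargs)
    return result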
Performance Optimization
Response Time Optimization
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Any, Optional
import numpy as np
logger = logging.getLogger(__name__)
class PerformanceOptimizer:
"""Optimize AI response times for production workloads"""
def __init__(self):
self.thread_pool = ThreadPoolExecutor(max_workers=10)
self.request_metrics = []
self.warmup_cache = {}
async def optimize_single_request(
self,
prompt: str,
client,
**kwargs
) -> Dict[str, Any]:
"""Optimize a single request with multiple techniques"""
start_time = time.time()
# 1. Optimized prompt preparation
optimized_prompt = self._optimize_prompt_structure(prompt)
# 2. Parameters optimized for speed
optimized_params = self._optimize_parameters_for_speed(kwargs)
# 3. Execute with optimized timeout
        try:
            # Split off "model" and "timeout" so "model" is not passed twice to the API call
            request_params = {
                k: v for k, v in optimized_params.items()
                if k not in ("model", "timeout")
            }
            response = await asyncio.wait_for(
                client.chat.completions.create(
                    model=optimized_params["model"],
                    messages=[{"role": "user", "content": optimized_prompt}],
                    **request_params
                ),
                timeout=optimized_params.get("timeout", 15.0)
            )
duration = time.time() - start_time
# Record metrics
self.request_metrics.append({
"duration": duration,
"tokens": response.usage.total_tokens,
"model": optimized_params["model"],
"timestamp": time.time()
})
return {
"content": response.choices[0].message.content,
"duration": duration,
"tokens_used": response.usage.total_tokens,
"optimization_applied": True
}
except asyncio.TimeoutError:
# Fallback to faster model
return await self._fast_fallback(optimized_prompt, client)
def _optimize_prompt_structure(self, prompt: str) -> str:
"""Optimize prompt structure for faster response"""
# Optimization techniques:
# 1. Remove redundant text
# 2. Use more direct instructions
# 3. Limit response scope
if len(prompt) > 1000:
# For long prompts, add brevity instruction
return f"{prompt}\n\nRespond concisely and directly."
return prompt
def _optimize_parameters_for_speed(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""Optimize parameters for maximum speed"""
optimized = kwargs.copy()
# Parameters for speed
optimized.update({
"max_tokens": min(kwargs.get("max_tokens", 150), 150), # Limit response
"temperature": 0.1, # Lower temperature = faster response
"top_p": 0.8, # Focus on most probable tokens
"timeout": 15.0 # Aggressive timeout
})
# Select faster model if not specified
if "model" not in optimized:
optimized["model"] = "gpt-3.5-turbo" # Faster model
return optimized
async def _fast_fallback(self, prompt: str, client) -> Dict[str, Any]:
"""Ultra-fast fallback when timeout occurs"""
try:
# Ultra-simplified prompt
simple_prompt = prompt[:200] + "... (brief response)"
response = await asyncio.wait_for(
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": simple_prompt}],
max_tokens=50,
temperature=0
),
timeout=5.0
)
return {
"content": response.choices[0].message.content + " [response shortened due to timeout]",
"duration": 5.0,
"tokens_used": response.usage.total_tokens,
"fallback_used": True
}
except Exception:
return {
"content": "Response unavailable due to service timeout",
"duration": 5.0,
"tokens_used": 0,
"error_fallback": True
}
def get_performance_stats(self) -> Dict[str, Any]:
"""Get performance statistics"""
if not self.request_metrics:
return {"error": "No metrics available"}
durations = [m["duration"] for m in self.request_metrics[-100:]] # Last 100
return {
"avg_duration": np.mean(durations),
"p95_duration": np.percentile(durations, 95),
"p99_duration": np.percentile(durations, 99),
"total_requests": len(self.request_metrics),
"requests_per_minute": len([m for m in self.request_metrics
if time.time() - m["timestamp"] < 60])
}
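# Example (illustrative): exercising the optimizer directly. This assumes the openai
# SDK is installed and configured via the OPENAI_API_KEY environment variable.
import openai
async def demo_optimized_request():
    optimizer = PerformanceOptimizer()
    client = openai.AsyncOpenAI()
    result = await optimizer.optimize_single_request(
        "Explain briefly what connection pooling is.",
        client,
        model="gpt-4o-mini"
    )
    print(result["content"])
    print(f"{result['duration']:.2f}s, {result['tokens_used']} tokens used")
    # Aggregate latency percentiles across recent requests
    print(optimizer.get_performance_stats())
# asyncio.run(demo_optimized_request())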
class StreamingOptimizer:
"""Optimize streaming responses for better UX"""
def __init__(self):
self.chunk_buffer = []
self.buffer_size = 50 # Characters before sending chunk
async def optimized_streaming_response(
self,
prompt: str,
client,
**kwargs
):
"""Generate optimized streaming response"""
try:
            # Drop "model" from kwargs so it is not passed twice to the API call
            params = {k: v for k, v in kwargs.items() if k != "model"}
            response = await client.chat.completions.create(
                model=kwargs.get("model", "gpt-4o-mini"),
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                **params
            )
buffer = ""
async for chunk in response:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
buffer += content
# Send chunks when buffer reaches target size
if len(buffer) >= self.buffer_size or content.endswith(('.', '!', '?', '\n')):
yield {
"type": "content",
"data": buffer,
"timestamp": time.time()
}
buffer = ""
# Send last chunk if content remains
if buffer:
yield {
"type": "content",
"data": buffer,
"timestamp": time.time()
}
# Completion signal
yield {
"type": "done",
"data": "",
"timestamp": time.time()
}
except Exception as e:
yield {
"type": "error",
"data": str(e),
"timestamp": time.time()
}
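# Example (illustrative): consuming the optimized stream and printing chunks as they
# arrive. Assumes the openai SDK is configured via the OPENAI_API_KEY environment variable.
import openai
async def demo_streaming(prompt: str):
    client = openai.AsyncOpenAI()
    streaming_optimizer = StreamingOptimizer()
    async for chunk in streaming_optimizer.optimized_streaming_response(
        prompt, client, model="gpt-4o-mini", max_tokens=300
    ):
        if chunk["type"] == "content":
            print(chunk["data"], end="", flush=True)
        elif chunk["type"] == "error":
            print(f"\n[stream error: {chunk['data']}]")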
class PrecomputeCache:
"""Precompute common queries during low-traffic hours"""
def __init__(self, cache: IntelligentCache):
self.cache = cache
self.common_patterns = [
"What is {topic}?",
"Explain {concept} in simple terms",
"Advantages and disadvantages of {item}",
"How does {technology} work",
"Step-by-step guide for {task}"
]
async def precompute_common_queries(self, topics: List[str], client):
"""Precompute responses for common queries"""
for topic in topics:
for pattern in self.common_patterns:
prompt = pattern.format(topic=topic, concept=topic, item=topic,
technology=topic, task=topic)
# Check if already cached
cached = await self.cache.get(prompt, model="gpt-4o-mini")
if not cached:
try:
# Generate and cache response
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
max_tokens=200
)
await self.cache.set(
prompt,
response.choices[0].message.content,
content_type="static", # Long TTL for precomputed content
model="gpt-4o-mini"
)
# Small pause to not overload the API
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Error precomputing {prompt}: {e}")
continue
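To actually run the precompute job during low-traffic hours, a simple background loop is enough. The topic list and the 3 a.m. window below are placeholders; the sketch assumes a local Redis instance and an openai SDK configured via environment variables.
import asyncio
import openai
import redis
from datetime import datetime
async def nightly_precompute_loop():
    """Run the precompute job once per night during an assumed off-peak window."""
    precompute = PrecomputeCache(IntelligentCache(redis.Redis()))
    client = openai.AsyncOpenAI()
    topics = ["vector databases", "rate limiting", "prompt caching"]  # placeholder topics
    while True:
        if datetime.now().hour == 3:  # assumed off-peak hour
            await precompute.precompute_common_queries(topics, client)
            await asyncio.sleep(3600)  # don't re-run within the same hour
        else:
            await asyncio.sleep(600)   # check again in 10 minutes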
Real-time Cost Monitoring
import logging
import time
import openai
import redis
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
logger = logging.getLogger(__name__)
app = FastAPI()
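# The endpoint below reads request.prompt, request.model, request.max_tokens and
# request.temperature, so a minimal request schema looks like this (field names are
# inferred from that usage; adapt to your own request model):
class AIRequest(BaseModel):
    prompt: str
    model: str = "gpt-4o-mini"
    max_tokens: int = 150
    temperature: float = 0.3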
# Global instances (use dependency injection in production)
cost_optimizer = CostOptimizer()
cost_tracker = CostTracker(monthly_budget=2000.0)
intelligent_cache = IntelligentCache(redis.Redis())
performance_optimizer = PerformanceOptimizer()
@app.post("/api/v1/generate")
async def generate_with_optimization(
request: AIRequest,
user_id: str = "default"
):
"""Optimized endpoint with complete cost and performance tracking"""
start_time = time.time()
# 1. Check cache first
cached_result = await intelligent_cache.get(
request.prompt,
model=request.model,
max_tokens=request.max_tokens,
temperature=request.temperature
)
if cached_result:
return JSONResponse({
"content": cached_result["result"],
"cached": True,
"cache_type": cached_result.get("cache_type", "exact"),
"cost": 0.0,
"duration": time.time() - start_time
})
# 2. Select optimal model
optimal_model = cost_optimizer.select_optimal_model(
request.prompt,
user_context={"tier": "standard"},
budget_constraint=None
)
# 3. Estimate cost before call
estimated_input_tokens = cost_optimizer._estimate_tokens(request.prompt)
estimated_output_tokens = cost_optimizer._estimate_output_tokens(
request.prompt,
cost_optimizer._analyze_prompt_complexity(request.prompt)
)
# 4. Perform optimized call
try:
result = await performance_optimizer.optimize_single_request(
request.prompt,
openai.AsyncOpenAI(),
model=optimal_model,
max_tokens=request.max_tokens,
temperature=request.temperature
)
        # 5. Calculate actual cost and register it
        # Map the selected model name back to its pricing tier
        model_tier = next(
            (tier for tier, cfg in cost_optimizer.models.items() if cfg.name == optimal_model),
            "standard"
        )
        actual_cost = cost_optimizer.estimate_cost(
            model_tier,
            estimated_input_tokens,
            result["tokens_used"]
        )
cost_tracker.track_api_call(
user_id=user_id,
endpoint="/api/v1/generate",
model=optimal_model,
input_tokens=estimated_input_tokens,
output_tokens=result["tokens_used"],
cost=actual_cost
)
# 6. Cache result for future queries
await intelligent_cache.set(
request.prompt,
result["content"],
content_type="dynamic",
metadata={"cost": actual_cost, "model": optimal_model},
model=optimal_model,
max_tokens=request.max_tokens,
temperature=request.temperature
)
return JSONResponse({
"content": result["content"],
"cached": False,
"model_used": optimal_model,
"cost": actual_cost,
"duration": result["duration"],
"tokens_used": result["tokens_used"],
"optimization_applied": result.get("optimization_applied", False)
})
except Exception as e:
logger.error(f"Error in optimized generation: {e}")
raise HTTPException(status_code=503, detail=str(e))
@app.get("/api/v1/metrics/costs")
async def get_cost_metrics():
"""Get real-time cost metrics"""
return {
"daily_summary": cost_tracker.get_cost_summary("daily"),
"monthly_summary": cost_tracker.get_cost_summary("monthly"),
"cache_stats": intelligent_cache.get_cache_stats(),
"performance_stats": performance_optimizer.get_performance_stats()
}
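From the client side, calling the optimized endpoint is a plain HTTP request; the base URL and field values below simply mirror the AIRequest schema above and assume the service is running locally on port 8000.
import httpx
# Illustrative client call against a locally running instance of the service
response = httpx.post(
    "http://localhost:8000/api/v1/generate",
    params={"user_id": "user-123"},
    json={
        "prompt": "Explain briefly what a dead-letter queue is.",
        "model": "gpt-4o-mini",
        "max_tokens": 150,
        "temperature": 0.3
    },
    timeout=30.0
)
print(response.json())  # content, cost, duration, tokens_used, cache info
# Dashboards can poll the metrics endpoint for cost and performance summaries
metrics = httpx.get("http://localhost:8000/api/v1/metrics/costs").json()
print(metrics["daily_summary"])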
Next in the Series
In Part 4 (final), we’ll cover:
- Advanced Monitoring and Observability: Metrics, alerts, and drift detection
- Security and Compliance: PII protection, auditing, and regulatory requirements
- Advanced Deployment: Blue-green deployment, canary releases, and A/B testing
Key Takeaways from this Part
✅ Cost Optimization: Intelligent model selection can reduce costs by 40-60%
✅ Intelligent Caching: A multi-level cache system improves performance and reduces costs
✅ Real-time Monitoring: Cost tracking prevents billing surprises
✅ Performance Optimization: Specific techniques can achieve sub-second responses
Did you find this optimization useful? Part 4 will close the series with advanced deployment and operations.
Remember: In production, every millisecond and every cent counts. Optimization is not optional—it’s the difference between a viable product and one that doesn’t scale.