9 Essential Components of a Production Microservice App
So you've built a microservice. It works on your laptop. Now your manager says "take it to production" — and suddenly you realize a working service and a production-ready service are two very different things.
This guide walks through all 9 essential components that every production microservice system must have, using the architecture diagram below as our map. By the end, you'll understand not just what each component is, but why it exists, how it works, and how to implement it in real code.
Clients (web, mobile, 3rd-party)
│
┌───────▼────────┐
│ 1. API │
│ Gateway │◄── 2. Service Registry
└───────┬────────┘
│
┌────────────────┼────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Service A │ │ Service B │ │ Service C │ ← 3. Service Layer
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ ╔═══════════╗ │ │
└──║ 7. Msg ║─┘ 4. Authorization
║ Queue ║ Server
╚═════╤═════╝
│
┌─────────▼──────────┐
│ Load Balancer │◄── 6. Distributed Cache (Redis)
└─────────┬──────────┘
│
┌─────────▼──────────┐
│ 5. Database Layer │
│ (Primary + Read │
│ Replicas) │
└────────────────────┘
8. Metrics: Prometheus → Grafana
9. Logs: Logstash → Elasticsearch → Kibana
Component 1: API Gateway
What Is It?
The API Gateway is the single front door to your entire microservice ecosystem. Every external request — from a web browser, mobile app, or third-party API — goes through the gateway first.
Think of it as the reception desk at a large company. You don't walk straight to the desk of the person you need. Reception checks your ID, works out who you should see, and directs you to the right floor.
WITHOUT API Gateway: WITH API Gateway:
Client → Service A (port 3001) Client → Gateway (port 443)
Client → Service B (port 3002) ├── /api/users → Service A
Client → Service C (port 3003) ├── /api/orders → Service B
└── /api/products → Service C
Clients must know ALL service Client knows ONE address only
addresses and ports
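The routing step in the diagram above can be sketched as a longest-prefix lookup. This is a minimal illustration, not the behavior of any particular gateway product: the `ROUTES` table and the `resolve` helper are hypothetical names, and the hosts and ports are the example values used elsewhere in this guide.

```python
from typing import Optional

# Hypothetical routing table; hosts and ports are illustrative only.
ROUTES = {
    "/api/users": "http://user-service:8001",
    "/api/orders": "http://order-service:8002",
    "/api/products": "http://product-service:8003",
}


def resolve(path: str, routes: dict = ROUTES) -> Optional[str]:
    """Return the upstream URL for a public path, or None if no route matches."""
    # Try longer prefixes first so a more specific route wins.
    for prefix in sorted(routes, key=len, reverse=True):
        # Match on a segment boundary: /api/users and /api/users/42,
        # but not /api/users-x.
        if path == prefix or path.startswith(prefix + "/"):
            # Strip the public prefix before forwarding upstream.
            return routes[prefix] + path[len(prefix):]
    return None
```

Matching on a segment boundary is a deliberate choice here: a plain `startswith` check would also send `/api/users-x` to the user service.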
What It Does
| Responsibility | How |
|---|---|
| Routing | /api/users → User Service, /api/orders → Order Service |
| Authentication | Validate JWT/API keys before requests reach services |
| Rate Limiting | Block abusive clients (e.g., max 100 req/min per IP) |
| SSL Termination | Handle HTTPS at the gateway; internal traffic can be HTTP |
| Load Distribution | Spread requests across multiple service instances |
| Request Transformation | Rewrite headers, aggregate multiple service calls |
| Logging | Central place to log all incoming requests |
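To make the rate-limiting row concrete, here is a small fixed-window limiter. It is a sketch under simple assumptions (a single process, counters held in memory, old windows never purged); real gateways usually keep these counters in shared storage. The class name and the injectable `clock` parameter are illustrative.

```python
import time
from collections import defaultdict


class FixedWindowLimiter:
    """Allow at most `limit` requests per client in each `window`-second window."""

    def __init__(self, limit: int = 100, window: float = 60.0, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self._counts = defaultdict(int)  # (client, window index) -> request count

    def allow(self, client: str) -> bool:
        # All requests in the same window share one counter.
        bucket = (client, int(self.clock() // self.window))
        if self._counts[bucket] >= self.limit:
            return False  # the gateway would answer 429 Too Many Requests
        self._counts[bucket] += 1
        return True
```

A fixed window is the simplest variant; it can let through up to twice the limit across a window boundary, which is why sliding-window or token-bucket schemes are often preferred.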
Implementation
# kong-ingress.yaml: route traffic to your services
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: api-gateway
  annotations:
    konghq.com/strip-path: "true"
    # Each name must match a KongPlugin resource in this namespace.
    # Only rate-limiting is defined in this file; jwt-auth must be created separately.
    konghq.com/plugins: rate-limiting,jwt-auth
spec:
  ingressClassName: kong
  rules:
    - host: api.myapp.com
      http:
        paths:
          - path: /api/users
            pathType: Prefix
            backend:
              service:
                name: user-service
                port:
                  number: 80
          - path: /api/orders
            pathType: Prefix
            backend:
              service:
                name: order-service
                port:
                  number: 80
---
# Rate limiting plugin
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: rate-limiting
config:
  minute: 100 # 100 requests per minute per client
  policy: local # counters are kept per Kong node, not shared across replicas
plugin: rate-limiting
# gateway/main.py — Lightweight custom gateway
import httpx
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
app = FastAPI(title="API Gateway")
security = HTTPBearer()
SERVICE_MAP = {
"/api/users": "http://user-service:8001",
"/api/orders": "http://order-service:8002",
"/api/products": "http://product-service:8003",
}
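import os
from fastapi import Response

# Read the signing secret from the environment (JWT_SECRET is an example name)
SECRET_KEY = os.environ["JWT_SECRET"]

# Headers that belong to the client connection and must not be copied upstream
SKIP_HEADERS = {"host", "content-length", "connection", "transfer-encoding"}


def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        return jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(401, "Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def gateway(request: Request, path: str, user=Depends(verify_token)):
    # Find the upstream service, matching on a path-segment boundary
    full_path = f"/{path}"
    prefix = next(
        (p for p in SERVICE_MAP if full_path == p or full_path.startswith(p + "/")),
        None,
    )
    if not prefix:
        raise HTTPException(404, "Route not found")
    # Strip the public prefix; the remainder already starts with "/" (or is empty)
    target_url = SERVICE_MAP[prefix] + full_path[len(prefix):]
    if request.url.query:
        target_url += f"?{request.url.query}"
    headers = {k: v for k, v in request.headers.items() if k.lower() not in SKIP_HEADERS}
    headers["X-User-ID"] = str(user["sub"])
    # Forward request
    async with httpx.AsyncClient(timeout=30.0) as client:
        upstream = await client.request(
            method=request.method,
            url=target_url,
            headers=headers,
            content=await request.body(),
        )
    # Relay the upstream status and body instead of assuming a JSON 200
    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        media_type=upstream.headers.get("content-type"),
    )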
# nginx.conf
upstream user_service {
server user-service-1:8001;
server user-service-2:8001;
server user-service-3:8001;
least_conn; # send to least busy server
}
upstream order_service {
server order-service-1:8002;
server order-service-2:8002;
}
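# limit_req_zone is only valid in the http context, so declare it outside server {}
limit_req_zone $binary_remote_addr zone=api:10m rate=100r/m;
server {
listen 443 ssl;
server_name api.myapp.com;
# "listen ... ssl" needs a certificate and key; these paths are placeholders
ssl_certificate /etc/nginx/tls/tls.crt;
ssl_certificate_key /etc/nginx/tls/tls.key;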
location /api/users/ {
limit_req zone=api burst=20 nodelay;
proxy_pass http://user_service/;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Authorization $http_authorization;
}
location /api/orders/ {
limit_req zone=api burst=20 nodelay;
proxy_pass http://order_service/;
}
}
2026 Recommendation
Use Kong or Traefik for Kubernetes-native deployments. Use AWS API Gateway or GCP Cloud Endpoints for cloud-native serverless setups. Build your own only for learning or very specific requirements.
Component 2: Service Registry
What Is It?
In a microservice system, services start and stop constantly — especially in Kubernetes where pods scale up and down. How does the API Gateway know where to send requests? How does Service A find Service B?
The Service Registry is a dynamic directory of all running service instances. Services register themselves when they start and deregister when they stop. Other services query the registry to find the current address of a target service.
Without Service Registry:
Config file: order-service=192.168.1.10:8002
Instance crashes → no one knows → requests fail
With Service Registry:
Service B starts → registers: "I'm order-service at 192.168.1.10:8002"
Service B scales to 3 → 3 entries in registry
Service B instance crashes → registry removes it automatically
Clients always get a live, healthy address
How Service Discovery Works
CLIENT-SIDE DISCOVERY:
Service A asks registry: "Where is order-service?"
Registry returns: ["10.0.0.1:8002", "10.0.0.2:8002", "10.0.0.3:8002"]
Service A picks one (round-robin, random, etc.)
Service A calls it directly
SERVER-SIDE DISCOVERY (more common with Kubernetes):
Service A sends request to Load Balancer / API Gateway
Load Balancer queries registry
Load Balancer forwards to a healthy instance
Service A doesn't need to know about the registry at all
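The client-side flow can be sketched with an in-memory registry that forgets instances which stop sending heartbeats, plus a client that rotates through whatever the registry returns. This is only an illustration of the idea: `Registry`, `RoundRobinClient`, and the 30-second TTL are assumptions made for the example, not the API of Consul, Eureka, or Kubernetes.

```python
import itertools
import time


class Registry:
    """In-memory registry; instances expire unless they heartbeat within `ttl` seconds."""

    def __init__(self, ttl: float = 30.0, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._last_seen = {}  # (service name, address) -> time of last heartbeat

    def heartbeat(self, service: str, address: str) -> None:
        # Registering and renewing are the same operation in this sketch.
        self._last_seen[(service, address)] = self.clock()

    def lookup(self, service: str) -> list:
        cutoff = self.clock() - self.ttl
        return sorted(
            addr
            for (name, addr), seen in self._last_seen.items()
            if name == service and seen >= cutoff
        )


class RoundRobinClient:
    """Client-side discovery: query the registry, then rotate through the results."""

    def __init__(self, registry: Registry):
        self.registry = registry
        self._counter = itertools.count()

    def pick(self, service: str) -> str:
        instances = self.registry.lookup(service)
        if not instances:
            raise LookupError(f"no healthy instance of {service}")
        return instances[next(self._counter) % len(instances)]
```

With server-side discovery the same `lookup` logic lives in the load balancer, so callers never see the instance list.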
Kubernetes: Built-in Service Registry
In Kubernetes, a Service object together with CoreDNS acts as your service registry automatically:
# user-service.yaml
apiVersion: v1
kind: Service
metadata:
name: user-service # DNS name: user-service.default.svc.cluster.local
namespace: default
spec:
selector:
app: user-service # Automatically tracks matching pods
ports:
- port: 80
targetPort: 8001
type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
spec:
replicas: 3 # 3 instances — all registered automatically
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
- name: user-service
image: myregistry/user-service:v1.2.0
ports:
- containerPort: 8001
readinessProbe: # Only register pod when it's ready!
httpGet:
path: /health
port: 8001
initialDelaySeconds: 5
periodSeconds: 10
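Now any service in the same namespace can reach user-service at http://user-service; from other namespaces, use the full name user-service.default.svc.cluster.local. Kubernetes handles the DNS and routing.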
Consul (When You Need More)
For multi-cluster, multi-cloud, or hybrid environments where Kubernetes DNS isn't enough:
# Registering with Consul
import consul
c = consul.Consul(host="consul-server", port=8500)
# Register this service instance
c.agent.service.register(
name="order-service",
service_id="order-service-instance-1",
address="10.0.0.5",
port=8002,
tags=["v2", "production"],
check=consul.Check.http(
"http://10.0.0.5:8002/health",
interval="10s",
timeout="5s",
deregister="1m" # auto-deregister after 1 minute in critical state (Consul enforces a 1-minute minimum)
)
)
# Discover a service
_, services = c.health.service("order-service", passing=True)
instances = [(s["Service"]["Address"], s["Service"]["Port"]) for s in services]
print(instances) # [("10.0.0.5", 8002), ("10.0.0.6", 8002)]
Component 3: Service Layer
What Is It?
The Service Layer is your actual business logic — the microservices themselves. Each service is an independent, deployable unit responsible for one bounded domain.
Service A (User Service): Everything about users
├── Register / Login
├── User profiles
└── Preferences
Service B (Order Service): Everything about orders
├── Create / cancel orders
├── Order history
└── Order status
Service C (Product Service): Everything about products
├── Product catalog
├── Inventory
└── Pricing
Design Principles for Each Service
# Each microservice is a small, focused FastAPI application
# order_service/main.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from decimal import Decimal
from datetime import datetime
import httpx
app = FastAPI(title="Order Service", version="2.1.0")
class OrderCreate(BaseModel):
user_id: str
product_id: str
quantity: int
shipping_address: str
class OrderResponse(BaseModel):
order_id: str
status: str
total_amount: Decimal
created_at: datetime
@app.get("/health")
def health_check():
"""Health check for readiness probe."""
return {"status": "healthy", "service": "order-service", "version": "2.1.0"}
@app.post("/orders", response_model=OrderResponse)
async def create_order(order: OrderCreate):
    # 1. Validate product exists (call Product Service, with a timeout)
    async with httpx.AsyncClient(timeout=3.0) as client:
        product_resp = await client.get(f"http://product-service/products/{order.product_id}")
    if product_resp.status_code == 404:
        raise HTTPException(404, "Product not found")
    if product_resp.status_code >= 400:
        # Any other upstream failure: report it rather than parsing an error body
        raise HTTPException(502, "Product service error")
    product = product_resp.json()
    # 2. Calculate total
    total = Decimal(str(product["price"])) * order.quantity
    # 3. Save order to this service's database (save_order_to_db is defined elsewhere)
    order_id = await save_order_to_db(order, total)
    # 4. Publish event for other services to react (publish_event: see Component 7)
    await publish_event("order.created", {
        "order_id": order_id,
        "user_id": order.user_id,
        "total": str(total),
    })
    return OrderResponse(
        order_id=order_id,
        status="confirmed",
        total_amount=total,
        created_at=datetime.utcnow(),
    )
The Golden Rules of Microservice Design
| Rule | What It Means | Why |
|---|---|---|
| Single Responsibility | One service = one business capability | Easy to change, easy to test |
| Own Your Data | Each service has its own database | No shared DB = no coupling |
| Communicate via APIs or Events | No direct DB access between services | Decoupled, independently deployable |
| Fail Gracefully | Circuit breakers, fallbacks, timeouts | One service failure ≠ full outage |
| Stateless | No session state in memory | Any instance can handle any request |
| Design for Failure | Assume other services will be unavailable | Reality of distributed systems |
Circuit Breaker Pattern
# Prevent cascading failures when a dependency is down
import httpx
from circuitbreaker import circuit


@circuit(failure_threshold=5, recovery_timeout=30)
async def call_product_service(product_id: str) -> dict:
    """After 5 consecutive failures the circuit opens for 30 seconds."""
    async with httpx.AsyncClient(timeout=3.0) as client:
        response = await client.get(f"http://product-service/products/{product_id}")
        response.raise_for_status()
        return response.json()


async def get_product_safe(product_id: str) -> dict:
    try:
        return await call_product_service(product_id)
    except Exception:
        # Fallback: return cached or default data. Callers must check "available"
        # before trusting the placeholder price of 0.
        return {"product_id": product_id, "price": 0, "name": "Unknown", "available": False}
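To show what a circuit breaker does internally, here is a minimal synchronous state machine with closed, open, and half-open behavior. It is a teaching sketch, not the implementation of the `circuitbreaker` package; `SimpleBreaker` and `CircuitOpenError` are hypothetical names, and the thresholds mirror the example values above.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class SimpleBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0      # consecutive failures seen so far
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.recovery_timeout:
                # Open: fail fast without touching the dependency.
                raise CircuitOpenError("circuit open; call skipped")
            # Timeout elapsed: half-open, so one trial call is let through.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                # Open the circuit, or re-open it after a failed trial call.
                self.opened_at = self.clock()
            raise
        # Success closes the circuit and resets the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

Failing fast while the circuit is open is the point of the pattern: callers stop queueing up behind a dependency that is already struggling.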
Component 4: Authorization Server
What Is It?
The Authorization Server is a dedicated service that manages who can do what. It handles:
- Authentication: verifying identity ("Are you who you say you are?")
- Authorization: verifying permissions ("Are you allowed to do this?")
- Token issuance: creating JWTs that carry identity and permissions
Request flow with Authorization:
Client ──→ API Gateway
│
▼
[Validate JWT token]
│
┌───────┴──────────┐
│ Token invalid? │ → 401 Unauthorized
│ Token valid? │ → Forward to service
└───────┬──────────┘
│
Service receives request with verified identity
(no need to re-authenticate in each service)
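The "Validate JWT token" step boils down to recomputing a signature and checking the expiry claim. The sketch below does this for HS256 with only the standard library so the mechanics are visible; in practice you would use a maintained JWT library. The `sign` and `verify` helpers are illustrative names, and this version deliberately supports one algorithm and ignores the token header.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that JWTs strip off.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(claims: dict, secret: bytes) -> str:
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps(claims).encode())
    signature = hmac.new(secret, f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url(signature)}"


def verify(token: str, secret: bytes, now: Optional[float] = None) -> dict:
    """Return the claims, or raise ValueError if the token is invalid or expired."""
    header, body, signature = token.split(".")  # wrong shape raises ValueError
    expected = hmac.new(secret, f"{header}.{body}".encode(), hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(body))
    if claims.get("exp", float("inf")) < (time.time() if now is None else now):
        raise ValueError("token expired")
    return claims
```

Because verification needs no database lookup, the gateway can check every request cheaply and pass the verified identity downstream.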
OAuth 2.0 + OpenID Connect: The Standard
# Authorization Server using python-jose for JWT
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from pydantic import BaseModel
app = FastAPI(title="Authorization Server")
SECRET_KEY = "your-256-bit-secret-key"  # placeholder only; load the real key from a secret store
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str
expires_in: int
def create_token(data: dict, expires_delta: timedelta) -> str:
payload = {**data, "exp": datetime.utcnow() + expires_delta, "iat": datetime.utcnow()}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
@app.post("/auth/token", response_model=Token)
async def login(form: OAuth2PasswordRequestForm = Depends()):
    user = await get_user_from_db(form.username)  # defined elsewhere
    if not user or not pwd_context.verify(form.password, user.hashed_password):
        raise HTTPException(401, "Invalid credentials", headers={"WWW-Authenticate": "Bearer"})
    subject = str(user.id)  # the JWT "sub" claim must be a string
    access_token = create_token(
        data={"sub": subject, "email": user.email, "roles": user.roles},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    refresh_token = create_token(
        data={"sub": subject, "type": "refresh"},
        expires_delta=timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
    )
    return Token(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )
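# Dependency for services to verify tokens (shared library).
# With HS256 every verifying service holds the signing secret; an asymmetric
# algorithm such as RS256 lets services verify with a public key only.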
def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") == "refresh":
raise HTTPException(401, "Cannot use refresh token for API access")
return payload
except JWTError:
raise HTTPException(401, "Could not validate credentials")
def require_role(role: str):
def checker(user: dict = Depends(get_current_user)):
if role not in user.get("roles", []):
raise HTTPException(403, f"Role '{role}' required")
return user
return checker
# In any downstream service:
@app.delete("/users/{user_id}")
async def delete_user(user_id: str, admin=Depends(require_role("admin"))):
# Only admins can reach here
await remove_user(user_id)
return {"deleted": user_id}
Keycloak: Production Authorization Server
For enterprise production, use Keycloak (open source) instead of rolling your own:
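# keycloak/docker-compose.yml
services:
  keycloak:
    image: quay.io/keycloak/keycloak:24.0
    environment:
      KC_DB: postgres
      KC_DB_URL: jdbc:postgresql://postgres/keycloak
      KC_DB_USERNAME: keycloak
      KC_DB_PASSWORD: ${KC_DB_PASSWORD}
      KEYCLOAK_ADMIN: admin
      KEYCLOAK_ADMIN_PASSWORD: ${ADMIN_PASSWORD}
    # Plain "start" applies build options such as KC_DB at boot.
    # "start --optimized" only works with an image pre-built via "kc.sh build".
    # Production mode also needs hostname and TLS (or reverse-proxy) settings.
    command: start
    ports:
      - "8080:8080"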
Keycloak gives you: SSO, social login (Google, GitHub), MFA, fine-grained permissions, admin UI, and audit logs — out of the box.
Component 5: Database Layer with Replication
What Is It?
Each microservice owns its own database. But a single database server is a single point of failure. Replication solves this by keeping copies of the data on multiple servers.
┌─────────────────┐
│ Primary (Read │ ← All WRITES go here
│ + Write) │
└────────┬────────┘
Replication│ (WAL streaming, binlog)
┌──────────────┼──────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Replica 1 │ │ Replica 2 │ │ Replica 3 │
│ (Read only) │ │ (Read only) │ │ (Standby) │
└──────────────┘ └──────────────┘ └──────────────┘
↑ ↑ ↑
Heavy read Heavy read Automatic failover
traffic traffic if primary fails
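Read/Write Splitting
Route writes to the primary and reads to the replicas to scale read traffic. Replication is usually asynchronous, so a replica can briefly lag behind; send a read to the primary when it must see a write that was just made: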
# database/connection.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import random
# Separate engines for write and read
WRITE_ENGINE = create_engine(
"postgresql://user:pass@primary-db:5432/orders",
pool_size=10,
max_overflow=20,
)
READ_ENGINES = [
create_engine(f"postgresql://user:pass@replica-{i}:5432/orders", pool_size=10)
for i in range(1, 4) # 3 read replicas
]
def get_write_session():
"""Use for INSERT, UPDATE, DELETE."""
Session = sessionmaker(bind=WRITE_ENGINE)
session = Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def get_read_session():
"""Use for SELECT — load balanced across replicas."""
engine = random.choice(READ_ENGINES) # or use round-robin
Session = sessionmaker(bind=engine)
session = Session()
try:
yield session
finally:
session.close()
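# Usage in FastAPI endpoints
from fastapi import Depends

# Plain "def" rather than "async def": these sessions are synchronous, and FastAPI
# runs sync endpoints in a thread pool so they do not block the event loop.
@app.get("/orders/{order_id}")
def get_order(order_id: str, db=Depends(get_read_session)):
    return db.query(Order).filter(Order.id == order_id).first()


@app.post("/orders")
def create_order(order: OrderCreate, db=Depends(get_write_session)):
    new_order = Order(**order.model_dump())  # Pydantic v2; use .dict() on v1
    db.add(new_order)
    db.flush()  # send the INSERT so generated columns are populated
    # The commit happens in get_write_session once the endpoint has returned
    return new_order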
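Because replicas can lag behind the primary, a user who has just written may not see their own change on a replica. One common mitigation is to pin that user's reads to the primary for a short time after each write. The sketch below shows the routing decision only; `ReplicaRouter`, the two-second pin window, and the host names are assumptions for illustration, and it expects at least one replica.

```python
import itertools
import time


class ReplicaRouter:
    """Pick a database target; pin a user to the primary briefly after they write."""

    def __init__(self, primary: str, replicas: list, pin_seconds: float = 2.0,
                 clock=time.monotonic):
        self.primary = primary
        self._replicas = itertools.cycle(replicas)  # round-robin over the replicas
        self.pin_seconds = pin_seconds
        self.clock = clock
        self._last_write = {}  # user id -> time of that user's last write

    def for_write(self, user_id: str) -> str:
        self._last_write[user_id] = self.clock()
        return self.primary

    def for_read(self, user_id: str) -> str:
        wrote_at = self._last_write.get(user_id)
        if wrote_at is not None and self.clock() - wrote_at < self.pin_seconds:
            # Replicas may not have this user's write yet, so read from the primary.
            return self.primary
        return next(self._replicas)
```

The pin window should comfortably exceed the replication lag you observe; other users keep reading from replicas the whole time.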
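Database Per Service Pattern
# Each service gets its own database, no sharing!
# order-service-db.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: order-db
spec:
  # Three pods from the stock postgres image are three independent databases.
  # Replication needs extra setup, for example an operator such as CloudNativePG, or Patroni.
  replicas: 3
  serviceName: order-db
  selector:
    matchLabels:
      app: order-db
  template:
    metadata:
      labels:
        app: order-db # must match spec.selector
    spec:
      containers:
        - name: postgres
          image: postgres:16
          env:
            - name: POSTGRES_DB
              value: orders # ONLY order data here
            - name: POSTGRES_PASSWORD # required by the image
              valueFrom:
                secretKeyRef:
                  name: order-db-credentials # example Secret name
                  key: password
            - name: PGDATA
              value: /data/pgdata
          volumeMounts:
            - name: data
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 50Gi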
Component 6: Distributed Cache
What Is It?
A cache stores the results of expensive operations (database queries, API calls, complex computations) in fast memory so the next request gets the answer instantly.
WITHOUT cache:
Request → Service → Database query (50ms) → Response ✓
Request → Service → Database query (50ms) → Response ✓ (repeated work!)
10,000 requests = 10,000 database queries
WITH cache (Redis):
Request 1 → Service → DB query (50ms) → Cache result → Response ✓
Request 2 → Service → Cache hit (1ms!) → Response ✓
Request 3 → Service → Cache hit (1ms!) → Response ✓
10,000 requests = 1 DB query + 9,999 cache hits
Redis: The Industry Standard
# cache/redis_client.py
import redis.asyncio as redis
import json
from functools import wraps
from typing import Any, Optional, Callable
import hashlib
class CacheManager:
    def __init__(self, redis_url: str = "redis://redis-cluster:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)

    async def get(self, key: str) -> Optional[Any]:
        value = await self.redis.get(key)
        return json.loads(value) if value is not None else None

    async def set(self, key: str, value: Any, ttl: int = 300) -> None:
        await self.redis.setex(key, ttl, json.dumps(value, default=str))

    async def delete(self, key: str) -> None:
        await self.redis.delete(key)

    async def delete_pattern(self, pattern: str) -> None:
        """Delete all keys matching a pattern: use carefully."""
        # SCAN iterates incrementally; KEYS blocks Redis while it walks the whole keyspace
        async for key in self.redis.scan_iter(match=pattern):
            await self.redis.delete(key)


cache = CacheManager()
# Decorator for easy caching
def cached(ttl: int = 300, key_prefix: str = ""):
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Build a readable key (prefix:function:args) so the entry can be found
            # again for invalidation; a hashed key could never match a pattern.
            parts = [str(a) for a in args] + [f"{k}={v}" for k, v in sorted(kwargs.items())]
            cache_key = ":".join([key_prefix, func.__name__, *parts])
            # Try cache first
            cached_result = await cache.get(cache_key)
            if cached_result is not None:
                return cached_result
            # Cache miss: call the real function
            result = await func(*args, **kwargs)
            await cache.set(cache_key, result, ttl=ttl)
            return result
        return wrapper
    return decorator


# Usage (db is this service's database helper, defined elsewhere)
@cached(ttl=60, key_prefix="product")  # cache for 60 seconds
async def get_product(product_id: str) -> dict:
    # Parameterized query: never interpolate input into the SQL string
    return await db.query("SELECT * FROM products WHERE id = $1", product_id)


# Cache invalidation when data changes
async def update_product(product_id: str, data: dict):
    await db.update("products", product_id, data)
    # Same key the decorator builds when product_id is passed positionally
    await cache.delete(f"product:get_product:{product_id}")
Cache Strategies
| Strategy | How | Use When |
|---|---|---|
| Cache-Aside | App checks cache; on miss, loads from DB and caches | Most common, works for any data |
| Write-Through | Write to cache AND DB simultaneously | Data must always be consistent |
| Write-Behind | Write to cache, async write to DB later | High-write scenarios, tolerate slight lag |
| Read-Through | Cache sits in front of DB, handles misses automatically | Simplifies app code |
| TTL Eviction | Items expire after N seconds | Time-sensitive data (prices, availability) |
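Cache-aside combined with TTL expiry, the first and last rows of the table, can be demonstrated without Redis. The sketch below uses an in-memory dictionary in place of the cache server; `TTLCache`, the `load_from_db` callback, and the 60-second default are illustrative assumptions.

```python
import time


class TTLCache:
    """A tiny in-memory stand-in for Redis GET / SETEX / DEL."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self._items = {}  # key -> (expiry time, value)

    def get(self, key):
        entry = self._items.get(key)
        if entry is None or entry[0] <= self.clock():
            self._items.pop(key, None)  # drop expired entries lazily
            return None
        return entry[1]

    def set(self, key, value, ttl: float) -> None:
        self._items[key] = (self.clock() + ttl, value)

    def delete(self, key) -> None:
        self._items.pop(key, None)


def get_product(product_id, cache, load_from_db, ttl=60.0):
    """Cache-aside read: try the cache, fall back to the loader, then populate."""
    key = f"product:{product_id}"
    value = cache.get(key)
    if value is None:
        value = load_from_db(product_id)
        cache.set(key, value, ttl)
    return value
```

Writers invalidate with `cache.delete(key)` after updating the database, so the next read repopulates the entry with fresh data.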
Session Storage with Redis
# Store user sessions in Redis (shared across all service instances)
from datetime import datetime
from typing import Optional
from fastapi import Request, Response
import uuid
SESSION_TTL = 3600 # 1 hour
async def create_session(user_id: str) -> str:
session_id = str(uuid.uuid4())
await cache.set(
f"session:{session_id}",
{"user_id": user_id, "created_at": str(datetime.utcnow())},
ttl=SESSION_TTL
)
return session_id
async def get_session(session_id: str) -> Optional[dict]:
session = await cache.get(f"session:{session_id}")
if session:
# Extend TTL on activity (sliding window)
await cache.redis.expire(f"session:{session_id}", SESSION_TTL)
return session
async def delete_session(session_id: str) -> None:
await cache.delete(f"session:{session_id}")
Component 7: Distributed Messaging
What Is It?
When Service A needs to communicate with Service B, it has two choices:
SYNCHRONOUS (REST/gRPC):
Service A ──HTTP──→ Service B (waits for response)
Problem: If B is slow or down, A is stuck waiting.
If A makes 10 synchronous calls, it waits for all 10.
ASYNCHRONOUS (Message Queue):
Service A ──publish event──→ [Queue] ──deliver──→ Service B
A publishes and moves on immediately. B processes at its own pace.
If B is down, messages wait in the queue. B catches up when it recovers.
When to Use Messages Instead of REST
| Situation | Use REST | Use Messaging |
|---|---|---|
| Need immediate response | ✓ | |
| Fire-and-forget | | ✓ |
| Multiple services need to react | | ✓ |
| High volume, can process async | | ✓ |
| Long-running operations | | ✓ |
| Service B might be temporarily down | | ✓ |
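The decoupling that a queue provides can be seen in miniature with `asyncio.Queue` standing in for the broker. This is an in-process sketch only, since a real broker adds persistence and delivery across machines; `publisher`, `consumer`, and `demo` are hypothetical names.

```python
import asyncio


async def publisher(queue: asyncio.Queue, events: list) -> None:
    for event in events:
        await queue.put(event)  # returns as soon as the event is queued


async def consumer(queue: asyncio.Queue, handled: list) -> None:
    while True:
        event = await queue.get()
        await asyncio.sleep(0.01)  # pretend the handler is slow
        handled.append(event)
        queue.task_done()


async def demo() -> list:
    queue = asyncio.Queue()
    handled = []
    # The consumer is "down" while the publisher runs; events wait in the queue.
    events = [{"event_type": "order.created", "order_id": n} for n in range(3)]
    await publisher(queue, events)
    assert queue.qsize() == 3
    # The consumer comes up later and catches up at its own pace.
    worker = asyncio.create_task(consumer(queue, handled))
    await queue.join()  # wait until every queued event has been processed
    worker.cancel()
    return handled
```

Running `asyncio.run(demo())` returns the three events in the order they were published, even though nothing was listening when they were sent.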
RabbitMQ: The Classic Choice
# messaging/publisher.py
import aio_pika
import json
from datetime import datetime
async def publish_event(event_type: str, payload: dict) -> None:
"""Publish an event to the message broker."""
connection = await aio_pika.connect_robust("amqp://user:pass@rabbitmq/")
async with connection:
channel = await connection.channel()
# Declare exchange (creates if doesn't exist)
exchange = await channel.declare_exchange(
"domain_events",
aio_pika.ExchangeType.TOPIC,
durable=True # survive broker restart
)
message_body = {
"event_type": event_type,
"payload": payload,
"timestamp": datetime.utcnow().isoformat(),
"source": "order-service"
}
await exchange.publish(
aio_pika.Message(
body=json.dumps(message_body).encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # survive broker restart
content_type="application/json"
),
routing_key=event_type # e.g., "order.created"
)
print(f"Published: {event_type}")
# messaging/consumer.py
import asyncio
import json
import aio_pika


async def start_consumer(queue_name: str, routing_pattern: str):
    """Subscribe to events and process them."""
    connection = await aio_pika.connect_robust("amqp://user:pass@rabbitmq/")
    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)  # at most 10 unacknowledged messages in flight
        # Exchange arguments must match the publisher's declaration (durable=True)
        exchange = await channel.declare_exchange(
            "domain_events", aio_pika.ExchangeType.TOPIC, durable=True
        )
        queue = await channel.declare_queue(queue_name, durable=True)
        await queue.bind(exchange, routing_key=routing_pattern)

        async def process_message(message: aio_pika.abc.AbstractIncomingMessage):
            # Acks on success; if the handler raises, the message is rejected
            async with message.process():
                event = json.loads(message.body)
                print(f"Processing: {event['event_type']}")
                await handle_event(event)

        await queue.consume(process_message)
        print(f"Consuming from {queue_name}, pattern: {routing_pattern}")
        await asyncio.Future()  # keep the consumer alive until cancelled
# Event handlers in different services
# Notification Service listens for order events
async def handle_event(event: dict):
if event["event_type"] == "order.created":
await send_confirmation_email(event["payload"]["user_id"])
elif event["event_type"] == "order.shipped":
await send_shipping_notification(event["payload"])
# Start the consumer (in a FastAPI lifespan or separate worker)
import asyncio
asyncio.run(start_consumer("notification-queue", "order.*"))
Kafka: For High-Throughput Systems
# kafka/producer.py: high-throughput event streaming (call producer.flush() before shutdown)
from confluent_kafka import Producer
import json
producer = Producer({"bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092"})
def delivery_callback(err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Delivered to {msg.topic()} partition [{msg.partition()}]")
def publish_to_kafka(topic: str, key: str, value: dict):
producer.produce(
topic=topic,
key=key.encode(),
value=json.dumps(value).encode(),
callback=delivery_callback
)
producer.poll(0) # trigger callbacks
# kafka/consumer.py
import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka-1:9092,kafka-2:9092",
    "group.id": "payment-service-group",  # consumers in same group share partitions
    "auto.offset.reset": "earliest",
    # Manual commits give at-least-once delivery, so handlers must tolerate duplicates
    "enable.auto.commit": False,
})
consumer.subscribe(["order.created"])
try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        event = json.loads(msg.value())
        process_order_event(event)  # defined elsewhere
        # Commit only after successful processing
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()
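Committing an offset only after processing means a crash between the two steps causes the same event to be delivered again. A consumer therefore usually needs to be idempotent. The sketch below deduplicates on an event ID; the `event_id` field and the in-memory set are assumptions, and a real service would keep processed IDs in durable storage.

```python
class IdempotentHandler:
    """Skip events whose ID has already been processed."""

    def __init__(self, handler):
        self.handler = handler
        # In-memory for the sketch; use a database or Redis in a real service.
        self._seen = set()

    def handle(self, event: dict) -> bool:
        event_id = event["event_id"]  # assumed to be unique per event
        if event_id in self._seen:
            return False  # duplicate delivery, nothing to do
        self.handler(event)
        # Record the ID only after the handler succeeded, so a failed
        # attempt is retried on redelivery.
        self._seen.add(event_id)
        return True
```

Wrapping the payment handler this way turns at-least-once delivery into effectively-once processing for side effects such as charging a card.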
Component 8: Metrics (Prometheus + Grafana)
What Is It?
You cannot manage what you cannot measure. The metrics stack collects numerical data about your system's behavior over time and visualizes it so you can detect problems, understand trends, and set alerts.
Services → expose /metrics endpoint (Prometheus format)
│
▼
Prometheus scrapes /metrics every 15 seconds
│
▼
Stores time-series data
(e.g., "at 14:32:01, error_rate was 0.02")
│
▼
Grafana queries Prometheus
│
▼
Beautiful dashboards + alerts
Instrumenting Your Service
# metrics/instrumentation.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from fastapi import FastAPI, Request, Response
import time
# Define metrics
REQUEST_COUNT = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status_code", "service"]
)
REQUEST_DURATION = Histogram(
"http_request_duration_seconds",
"HTTP request duration in seconds",
["method", "endpoint"],
buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
ACTIVE_CONNECTIONS = Gauge(
"http_active_connections",
"Currently active HTTP connections"
)
DB_QUERY_DURATION = Histogram(
"database_query_duration_seconds",
"Database query duration",
["operation", "table"]
)
CACHE_HITS = Counter("cache_hits_total", "Cache hits", ["cache_type"])
CACHE_MISSES = Counter("cache_misses_total", "Cache misses", ["cache_type"])
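# FastAPI middleware to auto-instrument all requests
app = FastAPI()


@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    ACTIVE_CONNECTIONS.inc()
    start = time.perf_counter()
    status_code = 500  # reported if the handler raises
    try:
        response = await call_next(request)
        status_code = response.status_code
        return response
    finally:
        duration = time.perf_counter() - start
        # Label by route template (e.g. /orders/{order_id}), not the raw path,
        # so each order ID does not create a new time series
        route = request.scope.get("route")
        endpoint = getattr(route, "path", "unmatched")
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=endpoint,
            status_code=status_code,
            service="order-service",
        ).inc()
        REQUEST_DURATION.labels(method=request.method, endpoint=endpoint).observe(duration)
        # Runs even when the handler raised, so the gauge cannot drift upward
        ACTIVE_CONNECTIONS.dec()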
@app.get("/metrics")
def metrics():
"""Prometheus scrapes this endpoint."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
# Usage in service code:
async def get_orders_from_db(user_id: str):
with DB_QUERY_DURATION.labels(operation="select", table="orders").time():
return await db.execute("SELECT * FROM orders WHERE user_id = $1", user_id)
Prometheus Configuration
# prometheus/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alerts.yml"
scrape_configs:
- job_name: "microservices"
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: "true"
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- job_name: "redis"
static_configs:
- targets: ["redis-exporter:9121"]
- job_name: "postgres"
static_configs:
- targets: ["postgres-exporter:9187"]
Alerting Rules
# prometheus/alerts.yml
groups:
  - name: microservice_alerts
    rules:
      - alert: HighErrorRate
        # Aggregate both sides by service; dividing the raw series would only
        # match each 5xx series against itself and always yield 1
        expr: |
          sum by (service) (rate(http_requests_total{status_code=~"5.."}[5m]))
            /
          sum by (service) (rate(http_requests_total[5m])) > 0.01
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High error rate on {{ $labels.service }}"
          description: "Error rate is {{ $value | humanizePercentage }} (threshold: 1%)"
      - alert: SlowResponseTime
        # The duration histogram has no service label, so group by job (and le)
        expr: |
          histogram_quantile(0.99, sum by (le, job) (rate(http_request_duration_seconds_bucket[5m]))) > 2.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Slow p99 latency on {{ $labels.job }}"
          description: "p99 latency is {{ $value }}s (threshold: 2s)"
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service {{ $labels.job }} is DOWN"
The RED Method: What to Put on Your Dashboard
| Metric | Query | Alert Threshold |
|---|---|---|
| Rate (req/sec) | sum(rate(http_requests_total[5m])) | Drop to <50% of baseline |
| Errors (%) | sum(rate(http_requests_total{status_code=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) * 100 | > 1% |
| Duration p99 | histogram_quantile(0.99, sum by (le) (rate(http_request_duration_seconds_bucket[5m]))) | > 2 seconds |
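The p99 figure in the last row is an estimate, not an exact measurement: Prometheus only stores cumulative bucket counts and interpolates linearly inside the bucket that contains the requested rank. The sketch below reproduces that idea in plain Python, assuming classic histograms with a final `+Inf` bucket; it is a simplified illustration rather than Prometheus's exact code.

```python
import math


def histogram_quantile(q: float, buckets: list) -> float:
    """Estimate a quantile from cumulative (upper_bound, count) buckets.

    The last bucket must be (+Inf, total count), as in Prometheus histograms.
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    buckets = sorted(buckets)
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                # Cannot interpolate into +Inf; report the highest finite bound.
                return prev_bound
            # Assume observations are spread evenly inside this bucket.
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return prev_bound
```

This is why bucket boundaries matter: with a top finite bucket of 5.0 seconds, no quantile estimate can ever exceed 5.0, however slow the service actually is.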
Component 9: Centralized Logging (ELK Stack)
What Is It?
When something breaks in production, you need to find out what happened. With 20 microservices each writing logs to its own files, tracing a failure across them by hand is slow and error-prone.
The ELK stack (Elasticsearch + Logstash + Kibana) centralizes all logs in one searchable place.
All Services → emit JSON logs to stdout
│
▼
Logstash / Fluent Bit ← collect, parse, enrich logs
│
▼
Elasticsearch ← store and index logs (searchable)
│
▼
Kibana ← visualize, search, alert
Structured Logging: The Foundation
Logs must be structured JSON to be searchable:
# logging/setup.py
import logging
import sys
import structlog


def configure_logging(service_name: str, version: str):
    """Configure structured JSON logging and return a logger bound to this service."""
    # filter_by_level relies on the stdlib logger's level, so configure stdlib logging too
    logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_log_level,
            structlog.stdlib.add_logger_name,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),  # output as JSON!
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )
    # Bind service metadata once so every log line carries it
    return structlog.get_logger(service_name).bind(service=service_name, version=version)


log = configure_logging("payment-service", "2.3.1")

# Usage in your services
log.info(
    "order.created",
    order_id="ORD-12345",
    user_id="USR-789",
    amount=299.99,
    currency="USD",
    trace_id="abc123",  # for cross-service correlation
)
# Inside an except block, add exc_info=True to attach the traceback
log.error(
    "payment.failed",
    order_id="ORD-12345",
    error_code="card_declined",
    error_message="Insufficient funds",
    trace_id="abc123",
)
Output of the log.error call (abridged):
{
"timestamp": "2026-05-18T10:23:41.123Z",
"level": "error",
"event": "payment.failed",
"order_id": "ORD-12345",
"error_code": "card_declined",
"error_message": "Insufficient funds",
"trace_id": "abc123",
"service": "payment-service",
"version": "2.3.1"
}
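The `trace_id` field is what lets you follow one request across services, so it helps to set it once per request instead of passing it to every log call. The sketch below shows one way to do that with only the standard library (`logging` plus `contextvars`). It is not the structlog setup above, and `trace_id_var`, `JsonFormatter`, `build_logger`, and `handle_request` are hypothetical names chosen for illustration.

```python
import contextvars
import io
import json
import logging

# Holds the trace ID of the request currently being handled (hypothetical name).
trace_id_var = contextvars.ContextVar("trace_id", default=None)


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, adding the current trace_id."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname.lower(),
            "event": record.getMessage(),
            "trace_id": trace_id_var.get(),
        }
        # Extra structured fields arrive via logging's `extra=` argument.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


def build_logger(stream) -> logging.Logger:
    """Create a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger("order-service")
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


def handle_request(logger: logging.Logger, incoming_trace_id: str) -> None:
    """Set the trace ID once; every log call below picks it up automatically."""
    token = trace_id_var.set(incoming_trace_id)
    try:
        logger.info("order.created", extra={"fields": {"order_id": "ORD-12345"}})
    finally:
        trace_id_var.reset(token)  # do not leak the ID into the next request


buffer = io.StringIO()
handle_request(build_logger(buffer), "abc123")
line = json.loads(buffer.getvalue())
```

In a real service the incoming trace ID would typically come from a request header set by the gateway; here it is passed in directly to keep the example self-contained.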
Fluent Bit: Kubernetes Log Collector
# fluent-bit-config.yaml (ConfigMap read by the Fluent Bit DaemonSet, which runs on every node)
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush         5
        Log_Level     info
    [INPUT]
        Name          tail
        Tag           kube.*
        Path          /var/log/containers/*.log
        Parser        docker
        Mem_Buf_Limit 50MB
    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        # Merge JSON logs into structured fields. Comments must sit on their
        # own line; classic-mode config does not support trailing comments.
        Merge_Log           On
        K8S-Logging.Parser  On
    [OUTPUT]
        Name            es
        Match           *
        Host            elasticsearch-master
        Port            9200
        # Write daily indices (microservices-logs-YYYY.MM.DD) so they match
        # the microservices-logs-* index template.
        Logstash_Format On
        Logstash_Prefix microservices-logs
        Time_Key        @timestamp
        Retry_Limit     5
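To see why `Merge_Log` matters, consider what a container log line looks like after the `docker` parser: the application's JSON is still a string inside the `log` field. The sketch below is a simplified Python illustration of the merge step, not Fluent Bit's actual implementation. The `merge_log` helper is hypothetical, and the way it resolves key collisions (application fields win) is an assumption.

```python
import json


def merge_log(record: dict) -> dict:
    """Lift JSON fields out of the raw `log` string into top-level keys."""
    raw = record.get("log", "")
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        return dict(record)  # not JSON: leave the record unchanged
    if not isinstance(parsed, dict):
        return dict(record)  # JSON, but not an object: nothing to merge
    merged = dict(record)
    merged.update(parsed)  # assumption: application fields win on collision
    return merged


# A record as it might look after the docker parser has run
record = {
    "log": '{"event": "payment.failed", "trace_id": "abc123"}\n',
    "stream": "stdout",
}
merged = merge_log(record)
plain = merge_log({"log": "plain text line\n", "stream": "stderr"})
```

After the merge, `event` and `trace_id` are separate fields that Elasticsearch can index, which is what makes the `trace_id` lookups later in this section possible.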
Elasticsearch Index Template
# Configure Elasticsearch for optimal log storage
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://elasticsearch:9200"])

# Index template for all microservice logs
es.indices.put_index_template(
    name="microservices-logs",
    body={
        "index_patterns": ["microservices-logs-*"],
        "template": {
            "settings": {
                "number_of_shards": 3,
                "number_of_replicas": 1,
                # ILM policy (created separately) that deletes indices after 30 days
                "index.lifecycle.name": "logs-policy",
            },
            "mappings": {
                "properties": {
                    "@timestamp": {"type": "date"},
                    "level": {"type": "keyword"},
                    "service": {"type": "keyword"},
                    "trace_id": {"type": "keyword"},
                    "user_id": {"type": "keyword"},
                    "order_id": {"type": "keyword"},
                    "event": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
                    "error_message": {"type": "text"},
                }
            },
        },
    },
)

# Search logs by trace_id to follow a request across services
results = es.search(index="microservices-logs-*", body={
    "query": {
        "term": {"trace_id": "abc123"}
    },
    "sort": [{"@timestamp": "asc"}],
    "size": 100,
})

for hit in results["hits"]["hits"]:
    source = hit["_source"]
    print(f"[{source['@timestamp']}] [{source['service']}] {source['event']}")
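When tracing a failure you often want to narrow the same search, for example to error-level lines only. A minimal sketch of that, assuming the field names from the index template above: the query body is built as a plain dictionary using the Query DSL's `bool` and `filter` clauses, so it can be checked without a running cluster. `build_trace_query` and `format_timeline` are hypothetical helper names.

```python
from typing import Optional


def build_trace_query(trace_id: str, level: Optional[str] = None, size: int = 100) -> dict:
    """Build a search body that filters on trace_id and, optionally, log level."""
    filters = [{"term": {"trace_id": trace_id}}]
    if level is not None:
        filters.append({"term": {"level": level}})
    return {
        "query": {"bool": {"filter": filters}},
        "sort": [{"@timestamp": "asc"}],
        "size": size,
    }


def format_timeline(response: dict) -> list:
    """Turn a search response into one readable line per log event."""
    lines = []
    for hit in response["hits"]["hits"]:
        source = hit["_source"]
        service = source.get("service", "unknown")  # tolerate lines without a service
        lines.append(f"[{source['@timestamp']}] [{service}] {source['event']}")
    return lines


query = build_trace_query("abc123", level="error")

# A response-shaped dictionary standing in for what es.search() returns
fake_response = {"hits": {"hits": [
    {"_source": {"@timestamp": "2026-05-18T10:23:41.123Z",
                 "service": "payment-service", "event": "payment.failed"}},
    {"_source": {"@timestamp": "2026-05-18T10:23:41.200Z", "event": "retry.scheduled"}},
]}}
timeline = format_timeline(fake_response)
```

The resulting `query` dictionary can be passed as the `body` of the same `es.search` call shown above. Because `trace_id` and `level` are mapped as `keyword`, the `term` filters match exact values.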
The Complete Architecture: How All 9 Components Work Together
Here's a real request — "User places an order" — flowing through all 9 components:
1. Client sends POST /api/orders with JWT token
2. API GATEWAY (①) receives request
   → Validates JWT with Authorization Server (④)
   → Rate limit check: OK (not exceeded)
   → Routes to Order Service (Service B) via Service Registry (②)
3. ORDER SERVICE (③) receives request
   → Checks Distributed Cache (⑥) for product price: MISS
   → Calls Product Service via Service Registry (②)
   → Product Service queries its Database (⑤), writes to Cache (⑥)
   → Order Service saves order to its own Database (⑤)
   → Publishes "order.created" to Message Queue (⑦)
   → Logs structured event (⑨): {"event": "order.created", "trace_id": "xyz"}
   → Prometheus metrics updated (⑧): request_count++, latency recorded
   → Returns 201 Created to API Gateway → Client
4. Async: Message Queue (⑦) delivers "order.created" to:
   → Notification Service: sends confirmation email
   → Inventory Service: decrements stock
   → Analytics Service: records the sale
5. Grafana dashboard (⑧) shows:
   → +1 order in last 5 minutes
   → Latency p99: 142ms ✓
6. Kibana (⑨) shows the full trace_id "xyz" across all services:
   → 10:23:41.000 [api-gateway] request received
   → 10:23:41.012 [order-service] processing started
   → 10:23:41.034 [product-service] product lookup
   → 10:23:41.089 [order-service] order saved
   → 10:23:41.102 [order-service] event published
   → 10:23:41.110 [api-gateway] response sent (latency: 110ms)
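The cache miss in step 3 follows the cache-aside pattern: try the cache, fall back to the source of truth, then populate the cache for the next caller. Here is a minimal sketch of that logic, with an in-memory `TTLCache` standing in for Redis. The class, the `get_product_price` helper, the key format, and the 60-second TTL are all illustrative assumptions. In the flow above the Product Service itself writes to the cache, whereas this sketch has the caller do it to keep the example short.

```python
import time


class TTLCache:
    """In-memory stand-in for Redis with per-key expiry."""

    def __init__(self, clock=time.monotonic):
        self._data = {}
        self._clock = clock

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired: treat as a miss
            return None
        return value

    def set(self, key, value, ttl_s):
        self._data[key] = (value, self._clock() + ttl_s)


def get_product_price(cache, product_id, fetch_from_service, ttl_s=60.0):
    """Cache-aside lookup: returns (price, "HIT" or "MISS")."""
    key = f"product:{product_id}:price"
    price = cache.get(key)
    if price is not None:
        return price, "HIT"
    price = fetch_from_service(product_id)  # the slow path, e.g. a service call
    cache.set(key, price, ttl_s)
    return price, "MISS"


# Demo with a fake clock and a counter standing in for the Product Service
now = [0.0]
calls = []

def fake_product_service(product_id):
    calls.append(product_id)
    return 299.99

cache = TTLCache(clock=lambda: now[0])
first = get_product_price(cache, "P-1", fake_product_service)
second = get_product_price(cache, "P-1", fake_product_service)
now[0] = 61.0  # advance past the TTL
third = get_product_price(cache, "P-1", fake_product_service)
```

The TTL bounds how stale a cached price can get; choosing it is a trade-off between database load and freshness.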
Summary
| # | Component | Purpose | 2026 Tool Recommendation |
|---|---|---|---|
| 1 | API Gateway | Single entry point, routing, rate limiting, auth | Kong, Traefik, AWS API GW |
| 2 | Service Registry | Dynamic service discovery | Kubernetes DNS (built-in), Consul |
| 3 | Service Layer | Your business logic, independently deployable | FastAPI, Spring Boot, Go Fiber |
| 4 | Authorization Server | Identity, JWT tokens, RBAC | Keycloak, Auth0, AWS Cognito |
| 5 | Database Layer | Persistent storage with replication | PostgreSQL, MongoDB + read replicas |
| 6 | Distributed Cache | Fast read, session storage, rate limiting | Redis Cluster, Valkey |
| 7 | Distributed Messaging | Async decoupled communication | Kafka (high volume), RabbitMQ (flexible) |
| 8 | Metrics | Measure system health, alert on anomalies | Prometheus + Grafana |
| 9 | Centralized Logging | Search and trace across all services | ELK Stack, Loki + Grafana |
The most important thing to understand: each component exists to solve a specific problem that appears when you go from one service on one server to many services on many servers. You don't need all 9 from day one — but you will need all of them before you can call your system truly production-ready.
Start with the service layer and at least one database. Put the API Gateway and the Authorization Server in front before you handle real users. Add metrics and logging as soon as the service takes traffic; debugging a production incident without them is painful. Add the cache when your database becomes the bottleneck, and add messaging when services need to react to each other's events.
Implementation Order for a New System
- Week 1: Service Layer + Database (build the thing)
- Week 2: API Gateway + Authorization Server (make it secure)
- Week 3: Logging + Metrics (make it observable)
- Week 4: Cache + Service Registry (make it fast and scalable)
- Month 2+: Distributed Messaging (make it resilient and event-driven)