
9 Essential Components of a Production Microservice App

So you've built a microservice. It works on your laptop. Now your manager says "take it to production" — and suddenly you realize a working service and a production-ready service are two very different things.

This guide walks through all 9 essential components that every production microservice system must have, using the architecture diagram below as our map. By the end, you'll understand not just what each component is, but why it exists, how it works, and how to implement it in real code.

                    Clients (web, mobile, 3rd-party)
                    ┌───────▼────────┐
                    │  1. API        │
                    │  Gateway       │◄── 2. Service Registry
                    └───────┬────────┘
           ┌────────────────┼────────────────┐
           ▼                ▼                ▼
    ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
    │  Service A  │  │  Service B  │  │  Service C  │  ← 3. Service Layer
    └──────┬──────┘  └──────┬──────┘  └──────┬──────┘
           │  ╔═══════════╗ │                 │
           └──║  7. Msg   ║─┘         4. Authorization
              ║  Queue    ║                Server
              ╚═════╤═════╝
          ┌─────────▼──────────┐
          │   Load Balancer    │◄── 6. Distributed Cache (Redis)
          └─────────┬──────────┘
          ┌─────────▼──────────┐
          │  5. Database Layer │
          │  (Primary + Read   │
          │   Replicas)        │
          └────────────────────┘

  8. Metrics: Prometheus → Grafana
  9. Logs:    Logstash → Elasticsearch → Kibana

Component 1: API Gateway

What Is It?

The API Gateway is the single front door to your entire microservice ecosystem. Every external request — from a web browser, mobile app, or third-party API — goes through the gateway first.

Think of it as the reception desk at a large company. You don't walk directly to the desk of the person you need. You go through reception, show your ID, get directed to the right floor, and they handle the scheduling.

WITHOUT API Gateway:              WITH API Gateway:
Client → Service A (port 3001)    Client → Gateway (port 443)
Client → Service B (port 3002)         ├── /api/users → Service A
Client → Service C (port 3003)         ├── /api/orders → Service B
                                        └── /api/products → Service C
Clients must know ALL service            Client knows ONE address only
addresses and ports

What It Does

Responsibility          How
----------------------  --------------------------------------------------------
Routing                 /api/users → User Service, /api/orders → Order Service
Authentication          Validate JWT/API keys before requests reach services
Rate Limiting           Block abusive clients (e.g., max 100 req/min per IP)
SSL Termination         Handle HTTPS at the gateway; internal traffic can be HTTP
Load Distribution       Spread requests across multiple service instances
Request Transformation  Rewrite headers, aggregate multiple service calls
Logging                 Central place to log all incoming requests
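Rate limiting at the gateway comes down to keeping a small amount of state per client. The sketch below uses a token bucket keyed by client IP. This is one common algorithm, not necessarily what any particular gateway does internally (nginx's `limit_req`, for instance, documents a leaky-bucket method). `TokenBucketLimiter` and its parameters are hypothetical. The state lives in one process's memory, and the clock is injectable so the behavior can be tested.

```python
import time
from typing import Callable, Dict, Tuple


class TokenBucketLimiter:
    """Per-client token bucket (hypothetical helper; in-memory, single process)."""

    def __init__(self, rate_per_minute: int, burst: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.refill_per_sec = rate_per_minute / 60.0  # tokens added per second
        self.capacity = float(burst)                   # most tokens a client can bank
        self.clock = clock
        # client_id -> (tokens left, time of last update)
        self.buckets: Dict[str, Tuple[float, float]] = {}

    def allow(self, client_id: str) -> bool:
        now = self.clock()
        tokens, last = self.buckets.get(client_id, (self.capacity, now))
        # Refill in proportion to elapsed time, capped at the bucket capacity
        tokens = min(self.capacity, tokens + (now - last) * self.refill_per_sec)
        allowed = tokens >= 1.0
        if allowed:
            tokens -= 1.0  # spend one token on this request
        self.buckets[client_id] = (tokens, now)
        return allowed


# Usage with a fake clock: 100 requests/minute, bursts of up to 20
fake_now = [0.0]
limiter = TokenBucketLimiter(rate_per_minute=100, burst=20, clock=lambda: fake_now[0])
results = [limiter.allow("203.0.113.7") for _ in range(25)]
print(results.count(True))           # 20
fake_now[0] += 1.0                   # one second later, about 1.67 tokens have refilled
print(limiter.allow("203.0.113.7"))  # True
```

Each client starts with a full bucket of `burst` tokens. Short spikes therefore pass, while the sustained rate is held to `rate_per_minute`. A gateway running on several nodes also has to share or partition this state. Kong's `policy: local` in the config below keeps a separate count on each node.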

Implementation

# kong-ingress.yaml — Route traffic to your services
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: api-gateway
  annotations:
    konghq.com/strip-path: "true"
    konghq.com/plugins: rate-limiting,jwt-auth
spec:
  ingressClassName: kong
  rules:
  - host: api.myapp.com
    http:
      paths:
      - path: /api/users
        pathType: Prefix
        backend:
          service:
            name: user-service
            port:
              number: 80
      - path: /api/orders
        pathType: Prefix
        backend:
          service:
            name: order-service
            port:
              number: 80
---
# Rate limiting plugin
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: rate-limiting
config:
  minute: 100        # 100 requests per minute per client
  policy: local
plugin: rate-limiting
# gateway/main.py — Lightweight custom gateway
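import os

import httpx
import jwt  # PyJWT
from fastapi import FastAPI, Request, Response, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

app = FastAPI(title="API Gateway")
security = HTTPBearer()

SERVICE_MAP = {
    "/api/users":    "http://user-service:8001",
    "/api/orders":   "http://order-service:8002",
    "/api/products": "http://product-service:8003",
}

# Read the signing secret from the environment (the variable name is an example);
# fail at startup rather than run with a secret hard-coded in source
SECRET_KEY = os.environ["JWT_SECRET"]

# Headers that describe the client-to-gateway hop and must not be forwarded as-is
HOP_HEADERS = {"host", "content-length", "connection"}

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        return jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(401, "Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")

@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def gateway(request: Request, path: str, user=Depends(verify_token)):
    # Find the upstream service, matching whole path segments only
    full_path = f"/{path}"
    prefix = next(
        (p for p in SERVICE_MAP if full_path == p or full_path.startswith(p + "/")), None
    )
    if not prefix:
        raise HTTPException(404, "Route not found")

    # Strip the prefix: /api/users/42?x=1 → http://user-service:8001/42?x=1
    target_url = f"{SERVICE_MAP[prefix]}{full_path[len(prefix):] or '/'}"
    if request.url.query:
        target_url += f"?{request.url.query}"

    headers = {k: v for k, v in request.headers.items() if k.lower() not in HOP_HEADERS}
    headers["X-User-ID"] = str(user["sub"])

    # Forward request
    async with httpx.AsyncClient() as client:
        upstream = await client.request(
            method=request.method,
            url=target_url,
            headers=headers,
            content=await request.body(),
            timeout=30.0,
        )

    # Relay status code and body unchanged (not every upstream response is JSON)
    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        media_type=upstream.headers.get("content-type"),
    )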
# nginx.conf (these blocks live inside the http { } block)

# Rate limiting zone: limit_req_zone is only valid in the http context, not in server
limit_req_zone $binary_remote_addr zone=api:10m rate=100r/m;

upstream user_service {
    least_conn;  # send to least busy server
    server user-service-1:8001;
    server user-service-2:8001;
    server user-service-3:8001;
}

upstream order_service {
    server order-service-1:8002;
    server order-service-2:8002;
}

server {
    listen 443 ssl;
    server_name api.myapp.com;

    # SSL termination needs a certificate and key (placeholder paths)
    ssl_certificate     /etc/nginx/tls/api.myapp.com.crt;
    ssl_certificate_key /etc/nginx/tls/api.myapp.com.key;

    location /api/users/ {
        limit_req zone=api burst=20 nodelay;
        proxy_pass http://user_service/;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Authorization $http_authorization;
    }

    location /api/orders/ {
        limit_req zone=api burst=20 nodelay;
        proxy_pass http://order_service/;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
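Prefix routing is easy to get subtly wrong. A plain `startswith` check lets `/api/usersettings` match `/api/users`, and naive slicing can leave a doubled slash in the upstream URL. Below is a stdlib sketch of segment-aware, longest-prefix matching with prefix stripping. This is the behavior Kong's `strip-path` and nginx's trailing-slash `proxy_pass` give you. `resolve_upstream` is a hypothetical helper, and `SERVICE_MAP` mirrors the example map from the custom gateway.

```python
from typing import Dict, Optional

SERVICE_MAP: Dict[str, str] = {
    "/api/users": "http://user-service:8001",
    "/api/orders": "http://order-service:8002",
    "/api/products": "http://product-service:8003",
}


def resolve_upstream(path: str, routes: Dict[str, str] = SERVICE_MAP) -> Optional[str]:
    """Map an incoming path to an upstream URL using the longest segment-aligned prefix."""
    if not path.startswith("/"):
        path = "/" + path
    # Try longer prefixes first so a more specific route would win
    for prefix in sorted(routes, key=len, reverse=True):
        # Match whole segments only: "/api/users" or "/api/users/..."
        if path == prefix or path.startswith(prefix + "/"):
            rest = path[len(prefix):]  # "" or "/42/..."
            return routes[prefix] + (rest or "/")
    return None


print(resolve_upstream("/api/users/42"))      # http://user-service:8001/42
print(resolve_upstream("api/orders"))         # http://order-service:8002/
print(resolve_upstream("/api/usersettings"))  # None
```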

2026 Recommendation

Use Kong or Traefik for Kubernetes-native deployments. Use AWS API Gateway or GCP Cloud Endpoints for cloud-native serverless setups. Build your own only for learning or very specific requirements.


Component 2: Service Registry

What Is It?

In a microservice system, services start and stop constantly — especially in Kubernetes where pods scale up and down. How does the API Gateway know where to send requests? How does Service A find Service B?

The Service Registry is a dynamic directory of all running service instances. Services register themselves when they start and deregister when they stop. Other services query the registry to find the current address of a target service.

Without Service Registry:
  Config file: order-service=192.168.1.10:8002
  Instance crashes → no one knows → requests fail

With Service Registry:
  Service B starts → registers: "I'm order-service at 192.168.1.10:8002"
  Service B scales to 3 → 3 entries in registry
  Service B instance crashes → registry removes it automatically
  Clients always get a live, healthy address
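The register, heartbeat and expire cycle described above can be sketched as a toy in-memory registry: a single process with no replication or persistence. `ServiceRegistry`, its methods and the 30-second TTL are illustrative assumptions. Real registries such as Consul run health checks and replicate their state across servers.

```python
import time
from typing import Callable, Dict, List, Tuple

Address = Tuple[str, int]


class ServiceRegistry:
    """Toy registry: an instance stays listed only while it keeps sending heartbeats."""

    def __init__(self, ttl_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl_seconds
        self.clock = clock
        # service name -> {instance_id: (address, time of last heartbeat)}
        self._instances: Dict[str, Dict[str, Tuple[Address, float]]] = {}

    def register(self, service: str, instance_id: str, address: Address) -> None:
        self._instances.setdefault(service, {})[instance_id] = (address, self.clock())

    def heartbeat(self, service: str, instance_id: str) -> None:
        address, _ = self._instances[service][instance_id]
        self._instances[service][instance_id] = (address, self.clock())

    def deregister(self, service: str, instance_id: str) -> None:
        self._instances.get(service, {}).pop(instance_id, None)

    def lookup(self, service: str) -> List[Address]:
        """Return live addresses, evicting instances whose heartbeat has expired."""
        now = self.clock()
        live = self._instances.get(service, {})
        expired = [i for i, (_, seen) in live.items() if now - seen > self.ttl]
        for instance_id in expired:
            del live[instance_id]  # no heartbeat within the TTL: treat as crashed
        return sorted(addr for addr, _ in live.values())


now = [0.0]
registry = ServiceRegistry(ttl_seconds=30, clock=lambda: now[0])
for n in (1, 2, 3):
    registry.register("order-service", f"order-{n}", (f"10.0.0.{n}", 8002))
now[0] = 20.0
registry.heartbeat("order-service", "order-1")
registry.heartbeat("order-service", "order-2")  # order-3 has crashed and stays silent
now[0] = 40.0
print(registry.lookup("order-service"))  # [('10.0.0.1', 8002), ('10.0.0.2', 8002)]
```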

How Service Discovery Works

CLIENT-SIDE DISCOVERY:
  Service A asks registry: "Where is order-service?"
  Registry returns: ["10.0.0.1:8002", "10.0.0.2:8002", "10.0.0.3:8002"]
  Service A picks one (round-robin, random, etc.)
  Service A calls it directly

SERVER-SIDE DISCOVERY (more common with Kubernetes):
  Service A sends request to Load Balancer / API Gateway
  Load Balancer queries registry
  Load Balancer forwards to a healthy instance
  Service A doesn't need to know about the registry at all
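In client-side discovery, "picks one" usually means rotating through whatever the registry returned most recently. Below is a minimal round-robin picker. It assumes a hypothetical `lookup` callable that returns the current healthy addresses; in practice that callable could wrap a Consul health query.

```python
import itertools
from typing import Callable, List, Tuple

Address = Tuple[str, int]


class RoundRobinPicker:
    """Client-side load balancing over whatever instances discovery returns."""

    def __init__(self, lookup: Callable[[], List[Address]]) -> None:
        self.lookup = lookup  # hypothetical: asks the registry for healthy instances
        self._counter = itertools.count()

    def pick(self) -> Address:
        instances = self.lookup()  # re-read each call so removed instances drop out
        if not instances:
            raise RuntimeError("no healthy instances available")
        return instances[next(self._counter) % len(instances)]


healthy = [("10.0.0.1", 8002), ("10.0.0.2", 8002), ("10.0.0.3", 8002)]
picker = RoundRobinPicker(lambda: healthy)
print([picker.pick()[0] for _ in range(4)])  # ['10.0.0.1', '10.0.0.2', '10.0.0.3', '10.0.0.1']
```

Re-reading the instance list on every call keeps the sketch simple. A real client would typically cache the list for a few seconds to avoid hammering the registry.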

Kubernetes: Built-in Service Registry

In Kubernetes, Service objects (which track matching pods through EndpointSlices) and CoreDNS together act as a built-in service registry:

# user-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: user-service          # DNS name: user-service.default.svc.cluster.local
  namespace: default
spec:
  selector:
    app: user-service         # Automatically tracks matching pods
  ports:
  - port: 80
    targetPort: 8001
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3                 # 3 instances — all registered automatically
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: myregistry/user-service:v1.2.0
        ports:
        - containerPort: 8001
        readinessProbe:        # Only register pod when it's ready!
          httpGet:
            path: /health
            port: 8001
          initialDelaySeconds: 5
          periodSeconds: 10

Now any service can reach user-service at http://user-service — Kubernetes handles the DNS and routing.

Consul (When You Need More)

For multi-cluster, multi-cloud, or hybrid environments where Kubernetes DNS isn't enough:

# Registering with Consul
import consul

c = consul.Consul(host="consul-server", port=8500)

# Register this service instance
c.agent.service.register(
    name="order-service",
    service_id="order-service-instance-1",
    address="10.0.0.5",
    port=8002,
    tags=["v2", "production"],
    check=consul.Check.http(
        "http://10.0.0.5:8002/health",
        interval="10s",
        timeout="5s",
        deregister="30s"  # auto-deregister after 30s of failure
    )
)

# Discover a service
_, services = c.health.service("order-service", passing=True)
instances = [(s["Service"]["Address"], s["Service"]["Port"]) for s in services]
print(instances)  # [("10.0.0.5", 8002), ("10.0.0.6", 8002)]

Component 3: Service Layer

What Is It?

The Service Layer is your actual business logic — the microservices themselves. Each service is an independent, deployable unit responsible for one bounded domain.

Service A (User Service):     Everything about users
  ├── Register / Login
  ├── User profiles
  └── Preferences

Service B (Order Service):    Everything about orders
  ├── Create / cancel orders
  ├── Order history
  └── Order status

Service C (Product Service):  Everything about products
  ├── Product catalog
  ├── Inventory
  └── Pricing

Design Principles for Each Service

# Each microservice is a small, focused FastAPI application
# order_service/main.py

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from decimal import Decimal
from datetime import datetime, timezone
import httpx

# save_order_to_db() and publish_event() are this service's own helpers
# (publish_event appears in the messaging section); they are not defined here.

app = FastAPI(title="Order Service", version="2.1.0")

class OrderCreate(BaseModel):
    user_id: str
    product_id: str
    quantity: int = Field(gt=0)  # reject zero or negative quantities
    shipping_address: str

class OrderResponse(BaseModel):
    order_id: str
    status: str
    total_amount: Decimal
    created_at: datetime

@app.get("/health")
def health_check():
    """Health check for readiness probe."""
    return {"status": "healthy", "service": "order-service", "version": "2.1.0"}

@app.post("/orders", response_model=OrderResponse)
async def create_order(order: OrderCreate):
    # 1. Validate product exists (call Product Service, with a timeout)
    async with httpx.AsyncClient(timeout=3.0) as client:
        product_resp = await client.get(f"http://product-service/products/{order.product_id}")
    if product_resp.status_code == 404:
        raise HTTPException(404, "Product not found")
    if product_resp.is_error:
        raise HTTPException(502, "Product service unavailable")
    product = product_resp.json()

    # 2. Calculate total
    total = Decimal(str(product["price"])) * order.quantity

    # 3. Save order to this service's database
    order_id = await save_order_to_db(order, total)

    # 4. Publish event for other services to react
    await publish_event("order.created", {
        "order_id": order_id,
        "user_id": order.user_id,
        "total": str(total)
    })

    return OrderResponse(
        order_id=order_id,
        status="confirmed",
        total_amount=total,
        created_at=datetime.now(timezone.utc)
    )

The Golden Rules of Microservice Design

Rule                            What It Means                              Why
------------------------------  -----------------------------------------  -----------------------------------
Single Responsibility           One service = one business capability      Easy to change, easy to test
Own Your Data                   Each service has its own database          No shared DB = no coupling
Communicate via APIs or Events  No direct DB access between services       Decoupled, independently deployable
Fail Gracefully                 Circuit breakers, fallbacks, timeouts      One service failure ≠ full outage
Stateless                       No session state in memory                 Any instance can handle any request
Design for Failure              Assume other services will be unavailable  Reality of distributed systems

Circuit Breaker Pattern

# Prevent cascading failures when a dependency is down
import httpx
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=30)
async def call_product_service(product_id: str) -> dict:
    """After 5 consecutive failures the circuit opens and calls fail fast for 30 seconds."""
    async with httpx.AsyncClient(timeout=3.0) as client:
        response = await client.get(f"http://product-service/products/{product_id}")
        response.raise_for_status()
        return response.json()

async def get_product_safe(product_id: str) -> dict:
    try:
        return await call_product_service(product_id)
    except Exception:
        # Fallback: return cached or default data. Callers must check "available"
        # before pricing anything, or the placeholder price of 0 reaches checkout.
        return {"product_id": product_id, "price": 0, "name": "Unknown", "available": False}
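The `@circuit` decorator hides a small state machine with three states:

- Closed: calls pass through.
- Open: calls fail fast.
- Half-open: after the recovery timeout, a trial call decides whether the circuit closes again.

Below is a stdlib sketch of that machine. The names and defaults are chosen to echo the parameters above; it illustrates the pattern and does not reproduce the `circuitbreaker` library's internals.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Raised instead of calling the dependency while the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half-open"  # timeout elapsed: let a trial call through
        return "open"

    def call(self, func: Callable[[], T]) -> T:
        if self.state == "open":
            raise CircuitOpenError("dependency unavailable, failing fast")
        try:
            result = func()
        except Exception:
            self.failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result


now = [0.0]
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30, clock=lambda: now[0])

def flaky():
    raise ConnectionError("product-service down")

for _ in range(5):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
print(breaker.state)                        # open
now[0] = 31.0
print(breaker.state)                        # half-open
print(breaker.call(lambda: {"price": 10}))  # {'price': 10}
print(breaker.state)                        # closed
```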

Component 4: Authorization Server

What Is It?

The Authorization Server is a dedicated service that manages who can do what. It handles:

  • Authentication: verifying identity ("Are you who you say you are?")
  • Authorization: verifying permissions ("Are you allowed to do this?")
  • Token issuance: creating JWTs that carry identity and permissions

Request flow with Authorization:
  Client ──→ API Gateway
          [Validate JWT token]
          ┌───────┴──────────┐
          │ Token invalid?   │ → 401 Unauthorized
          │ Token valid?     │ → Forward to service
          └───────┬──────────┘
          Service receives request with verified identity
          (no need to re-authenticate in each service)

OAuth 2.0 + OpenID Connect — The Standard

# Authorization Server using python-jose for JWT
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from pydantic import BaseModel

app = FastAPI(title="Authorization Server")

SECRET_KEY = "your-256-bit-secret-key"  # placeholder: load from an env var or secret store
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str
    expires_in: int

def create_token(data: dict, expires_delta: timedelta) -> str:
    payload = {**data, "exp": datetime.utcnow() + expires_delta, "iat": datetime.utcnow()}
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

@app.post("/auth/token", response_model=Token)
async def login(form: OAuth2PasswordRequestForm = Depends()):
    user = await get_user_from_db(form.username)  # this service's user lookup (not shown)
    if not user or not pwd_context.verify(form.password, user.hashed_password):
        raise HTTPException(401, "Invalid credentials", headers={"WWW-Authenticate": "Bearer"})

    # "sub" must be a string; python-jose rejects non-string subjects when decoding
    access_token = create_token(
        data={"sub": str(user.id), "email": user.email, "roles": user.roles},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    refresh_token = create_token(
        data={"sub": str(user.id), "type": "refresh"},
        expires_delta=timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    )

    return Token(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60
    )

# Middleware for services to verify tokens (shared library)
def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") == "refresh":
            raise HTTPException(401, "Cannot use refresh token for API access")
        return payload
    except JWTError:
        raise HTTPException(401, "Could not validate credentials")

def require_role(role: str):
    def checker(user: dict = Depends(get_current_user)):
        if role not in user.get("roles", []):
            raise HTTPException(403, f"Role '{role}' required")
        return user
    return checker

# In any downstream service:
@app.delete("/users/{user_id}")
async def delete_user(user_id: str, admin=Depends(require_role("admin"))):
    # Only admins can reach here
    await remove_user(user_id)
    return {"deleted": user_id}

Keycloak — Production Authorization Server

For enterprise production, use Keycloak (open source) instead of rolling your own:

# keycloak/docker-compose.yml
services:
  keycloak:
    image: quay.io/keycloak/keycloak:24.0
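    environment:
      KC_DB: postgres
      KC_DB_URL: jdbc:postgresql://postgres/keycloak
      KC_DB_USERNAME: keycloak
      KC_DB_PASSWORD: ${KC_DB_PASSWORD}
      KEYCLOAK_ADMIN: admin
      KEYCLOAK_ADMIN_PASSWORD: ${ADMIN_PASSWORD}
      KC_HOSTNAME: auth.myapp.com  # placeholder public hostname; production mode needs one
      KC_HTTP_ENABLED: "true"      # plain HTTP, assuming TLS terminates at a proxy in front
    # "start --optimized" expects a custom image pre-built with "kc.sh build";
    # with the stock image, use plain "start"
    command: start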
    ports:
    - "8080:8080"

Keycloak gives you: SSO, social login (Google, GitHub), MFA, fine-grained permissions, admin UI, and audit logs — out of the box.


Component 5: Database Layer with Replication

What Is It?

Each microservice owns its own database. But a single database server is a single point of failure. Replication solves this by keeping copies of the data on multiple servers.

                    ┌─────────────────┐
                    │  Primary (Read  │  ← All WRITES go here
                    │   + Write)      │
                    └────────┬────────┘
                   Replication│ (WAL streaming, binlog)
              ┌──────────────┼──────────────┐
              ▼              ▼              ▼
    ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
    │  Replica 1   │ │  Replica 2   │ │  Replica 3   │
    │  (Read only) │ │  (Read only) │ │  (Standby)   │
    └──────────────┘ └──────────────┘ └──────────────┘
         ↑                  ↑               ↑
    Heavy read         Heavy read      Automatic failover
    traffic            traffic         if primary fails

Read/Write Splitting

Route writes to the primary and reads to the replicas, so read capacity grows with the number of replicas:

# database/connection.py
import random

from fastapi import Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Separate engines for write and read
WRITE_ENGINE = create_engine(
    "postgresql://user:pass@primary-db:5432/orders",
    pool_size=10,
    max_overflow=20,
)

READ_ENGINES = [
    create_engine(f"postgresql://user:pass@replica-{i}:5432/orders", pool_size=10)
    for i in range(1, 4)  # 3 read replicas
]

# Build session factories once at import time, not on every request
WriteSession = sessionmaker(bind=WRITE_ENGINE)
READ_SESSIONS = [sessionmaker(bind=engine) for engine in READ_ENGINES]

def get_write_session():
    """Dependency for INSERT, UPDATE, DELETE: commits after the endpoint returns."""
    session = WriteSession()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

def get_read_session():
    """Dependency for SELECT: load balanced across replicas."""
    session = random.choice(READ_SESSIONS)()  # or use round-robin
    try:
        yield session
    finally:
        session.close()

# Usage in FastAPI endpoints (app, the Order model and OrderCreate are defined elsewhere).
# Plain "def" endpoints run in FastAPI's threadpool, so these blocking sessions
# don't stall the event loop the way they would inside "async def".
@app.get("/orders/{order_id}")
def get_order(order_id: str, db=Depends(get_read_session)):
    return db.query(Order).filter(Order.id == order_id).first()

@app.post("/orders")
def create_order(order: OrderCreate, db=Depends(get_write_session)):
    new_order = Order(**order.model_dump())  # Pydantic v2; use .dict() on v1
    db.add(new_order)
    db.flush()  # send the INSERT now so database-generated columns like the id are set
    # the commit runs in get_write_session once this function returns
    return new_order
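Replicas apply the primary's changes asynchronously, so a read sent to a replica right after a write may not see that write yet. One common mitigation is to route a client's reads to the primary for a short window after that client writes. Below is a stdlib sketch of just that routing decision. `ReplicaRouter`, the host names and the 2-second window are assumptions; the window should exceed your observed replication lag. The router returns a host name, so it could sit in front of engine lists like the ones above, and it rotates through replicas rather than choosing at random.

```python
import itertools
import time
from typing import Callable, Dict, List


class ReplicaRouter:
    """Choose primary vs. replica per query, with a read-your-writes window."""

    def __init__(self, primary: str, replicas: List[str], sticky_seconds: float = 2.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.primary = primary
        self.replicas = replicas
        self.sticky_seconds = sticky_seconds  # assumed upper bound on replication lag
        self.clock = clock
        self._rr = itertools.cycle(replicas)  # round-robin over replicas
        self._last_write: Dict[str, float] = {}

    def for_write(self, client_id: str) -> str:
        self._last_write[client_id] = self.clock()
        return self.primary

    def for_read(self, client_id: str) -> str:
        wrote_at = self._last_write.get(client_id)
        if wrote_at is not None and self.clock() - wrote_at < self.sticky_seconds:
            return self.primary  # a replica may not have this client's write yet
        return next(self._rr) if self.replicas else self.primary


now = [0.0]
router = ReplicaRouter("primary-db", ["replica-1", "replica-2", "replica-3"], clock=lambda: now[0])
print(router.for_read("alice"))   # replica-1
print(router.for_write("alice"))  # primary-db
print(router.for_read("alice"))   # primary-db (inside the 2 s window)
print(router.for_read("bob"))     # replica-2
now[0] = 5.0
print(router.for_read("alice"))   # replica-3
```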

Database Per Service Pattern

# Each service gets its own database — no sharing!
# order-service-db.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: order-db
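spec:
  # A plain postgres image does NOT replicate: 3 replicas here would be 3 independent
  # databases. Run 1 pod, or use a Postgres operator (e.g. CloudNativePG) for primary + replicas.
  replicas: 1
  serviceName: order-db        # requires a matching headless Service (not shown)
  selector:
    matchLabels:
      app: order-db
  template:
    metadata:
      labels:
        app: order-db          # must match spec.selector
    spec:
      containers:
      - name: postgres
        image: postgres:16
        env:
        - name: POSTGRES_DB
          value: orders          # ONLY order data here
        - name: POSTGRES_PASSWORD  # the postgres image will not initialize without it
          valueFrom:
            secretKeyRef:
              name: order-db-secret  # example Secret name
              key: password
        - name: PGDATA
          value: /data/pgdata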
        volumeMounts:
        - name: data
          mountPath: /data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 50Gi

Component 6: Distributed Cache

What Is It?

A cache stores the results of expensive operations (database queries, API calls, complex computations) in fast memory so the next request gets the answer instantly.

WITHOUT cache:
  Request → Service → Database query (50ms) → Response ✓
  Request → Service → Database query (50ms) → Response ✓  (repeated work!)
  10,000 requests = 10,000 database queries

WITH cache (Redis):
  Request 1 → Service → DB query (50ms) → Cache result → Response ✓
  Request 2 → Service → Cache hit (1ms!) → Response ✓
  Request 3 → Service → Cache hit (1ms!) → Response ✓
  10,000 requests within the TTL = 1 DB query + 9,999 cache hits

Redis: The Industry Standard

# cache/redis_client.py
import hashlib
import json
from functools import wraps
from typing import Any, Callable, Optional

import redis.asyncio as redis

class CacheManager:
    def __init__(self, redis_url: str = "redis://redis-cluster:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)

    async def get(self, key: str) -> Optional[Any]:
        value = await self.redis.get(key)
        return json.loads(value) if value is not None else None

    async def set(self, key: str, value: Any, ttl: int = 300) -> None:
        await self.redis.setex(key, ttl, json.dumps(value, default=str))

    async def delete(self, key: str) -> None:
        await self.redis.delete(key)

    async def delete_pattern(self, pattern: str) -> None:
        """Delete all keys matching a pattern. SCAN walks the keyspace incrementally;
        KEYS would block Redis until it has scanned every key."""
        keys = [key async for key in self.redis.scan_iter(match=pattern)]
        if keys:
            await self.redis.delete(*keys)

cache = CacheManager()

# Decorator for easy caching
def cached(ttl: int = 300, key_prefix: str = ""):
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Readable key: prefix:function:positional args, plus a hash of any kwargs.
            # Keeping the arguments readable is what lets delete_pattern() find it later.
            cache_key = f"{key_prefix}:{func.__name__}:" + ":".join(str(a) for a in args)
            if kwargs:
                cache_key += ":" + hashlib.md5(repr(sorted(kwargs.items())).encode()).hexdigest()

            # Try cache first
            cached_result = await cache.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Cache miss: call the real function
            result = await func(*args, **kwargs)
            await cache.set(cache_key, result, ttl=ttl)
            return result
        return wrapper
    return decorator

# Usage (db is an asyncpg-style connection, not shown)
@cached(ttl=60, key_prefix="product")  # cache for 60 seconds
async def get_product(product_id: str) -> dict:
    # Parameterized query: never interpolate user input into SQL
    row = await db.fetchrow("SELECT * FROM products WHERE id = $1", product_id)
    return dict(row) if row else None

# Cache invalidation when data changes
async def update_product(product_id: str, data: dict):
    await db.update("products", product_id, data)  # this service's update helper (not shown)
    await cache.delete_pattern(f"product:*:{product_id}")  # e.g. product:get_product:42

Cache Strategies

Strategy       How                                                      Use When
-------------  -------------------------------------------------------  -----------------------------------------
Cache-Aside    App checks cache; on miss, loads from DB and caches      Most common, works for any data
Write-Through  Write to cache AND DB simultaneously                     Data must always be consistent
Write-Behind   Write to cache, async write to DB later                  High-write scenarios, tolerate slight lag
Read-Through   Cache sits in front of DB, handles misses automatically  Simplifies app code
TTL Eviction   Items expire after N seconds                             Time-sensitive data (prices, availability)

Session Storage with Redis

# Store user sessions in Redis (shared across all service instances),
# reusing the `cache` CacheManager from cache/redis_client.py
import uuid
from datetime import datetime, timezone
from typing import Optional

SESSION_TTL = 3600  # 1 hour

async def create_session(user_id: str) -> str:
    session_id = str(uuid.uuid4())
    await cache.set(
        f"session:{session_id}",
        {"user_id": user_id, "created_at": datetime.now(timezone.utc).isoformat()},
        ttl=SESSION_TTL
    )
    return session_id

async def get_session(session_id: str) -> Optional[dict]:
    session = await cache.get(f"session:{session_id}")
    if session:
        # Extend TTL on activity (sliding window)
        await cache.redis.expire(f"session:{session_id}", SESSION_TTL)
    return session

async def delete_session(session_id: str) -> None:
    await cache.delete(f"session:{session_id}")

Component 7: Distributed Messaging

What Is It?

When Service A needs to communicate with Service B, it has two choices:

SYNCHRONOUS (REST/gRPC):
  Service A ──HTTP──→ Service B (waits for response)

  Problem: If B is slow or down, A is stuck waiting.
  If A makes 10 synchronous calls, it waits for all 10.

ASYNCHRONOUS (Message Queue):
  Service A ──publish event──→ [Queue] ──deliver──→ Service B

  A publishes and moves on immediately. B processes at its own pace.
  If B is down, messages wait in the queue. B catches up when it recovers.
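The decoupling a broker provides can be imitated in-process with `asyncio.Queue`. The producer returns as soon as a message is enqueued, and a consumer that starts late still works through the backlog. This is only an analogy: an in-memory queue offers none of a broker's durability, acknowledgements or cross-process delivery. Event names are illustrative.

```python
import asyncio
from typing import List


async def main() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    processed: List[str] = []

    # Service A publishes three events and moves on; nothing is consuming yet
    for n in range(1, 4):
        queue.put_nowait({"event_type": "order.created", "order_id": n})

    async def consumer() -> None:
        # Service B comes up late and works through the backlog at its own pace
        while True:
            event = await queue.get()
            processed.append(f"{event['event_type']}#{event['order_id']}")
            queue.task_done()

    worker = asyncio.create_task(consumer())
    await queue.join()  # returns once every queued event is marked done
    worker.cancel()     # shut the consumer down cleanly
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return processed


print(asyncio.run(main()))  # ['order.created#1', 'order.created#2', 'order.created#3']
```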

When to Use Messages Instead of REST

Situation                            Use REST  Use Messaging
-----------------------------------  --------  -------------
Need immediate response              ✓
Fire-and-forget                                ✓
Multiple services need to react                ✓
High volume, can process async                 ✓
Long-running operations                        ✓
Service B might be temporarily down            ✓

RabbitMQ — The Classic Choice

# messaging/publisher.py
import aio_pika
import json
from datetime import datetime

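async def publish_event(event_type: str, payload: dict) -> None:
    """Publish an event to the message broker."""
    # A connection per event keeps the example short; a real service should
    # open one robust connection at startup and reuse it for every publish.
    connection = await aio_pika.connect_robust("amqp://user:pass@rabbitmq/")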

    async with connection:
        channel = await connection.channel()

        # Declare exchange (creates if doesn't exist)
        exchange = await channel.declare_exchange(
            "domain_events",
            aio_pika.ExchangeType.TOPIC,
            durable=True  # survive broker restart
        )

        message_body = {
            "event_type": event_type,
            "payload": payload,
            "timestamp": datetime.utcnow().isoformat(),
            "source": "order-service"
        }

        await exchange.publish(
            aio_pika.Message(
                body=json.dumps(message_body).encode(),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,  # survive broker restart
                content_type="application/json"
            ),
            routing_key=event_type  # e.g., "order.created"
        )
        print(f"Published: {event_type}")

# messaging/consumer.py
import asyncio
import json

import aio_pika

async def start_consumer(queue_name: str, routing_pattern: str):
    """Subscribe to events and process them."""
    connection = await aio_pika.connect_robust("amqp://user:pass@rabbitmq/")
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=10)  # at most 10 unacknowledged messages in flight

    # Must match the publisher's declaration (durable=True); RabbitMQ rejects a
    # redeclaration of an existing exchange with different properties
    exchange = await channel.declare_exchange(
        "domain_events", aio_pika.ExchangeType.TOPIC, durable=True
    )
    queue = await channel.declare_queue(queue_name, durable=True)
    await queue.bind(exchange, routing_key=routing_pattern)

    async def process_message(message: aio_pika.abc.AbstractIncomingMessage):
        # Acks on success; on an exception the message is rejected without requeue
        # (dead-lettered if the queue has a dead-letter exchange, otherwise dropped)
        async with message.process():
            event = json.loads(message.body)
            print(f"Processing: {event['event_type']}")
            await handle_event(event)

    await queue.consume(process_message)
    print(f"Consuming from {queue_name}, pattern: {routing_pattern}")
    return connection

# Event handlers in different services

# Notification Service listens for order events
# (send_confirmation_email and send_shipping_notification are this service's helpers)
async def handle_event(event: dict):
    if event["event_type"] == "order.created":
        await send_confirmation_email(event["payload"]["user_id"])
    elif event["event_type"] == "order.shipped":
        await send_shipping_notification(event["payload"])

async def main():
    connection = await start_consumer("notification-queue", "order.*")
    try:
        await asyncio.Future()  # consume() only registers a callback; keep the loop alive
    finally:
        await connection.close()

# Start the consumer (in a FastAPI lifespan or separate worker)
asyncio.run(main())
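The consumer binds its queue with the pattern `order.*`. In a RabbitMQ topic exchange, routing keys are dot-separated words: `*` matches exactly one word, and `#` matches zero or more words. Below is a stdlib sketch of that matching rule, handy for checking which bindings an event will reach. `topic_matches` is a hypothetical helper that follows the documented semantics; it is not RabbitMQ's own code.

```python
from functools import lru_cache


def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    @lru_cache(maxsize=None)
    def match(i: int, j: int) -> bool:
        if i == len(p_words):
            return j == len(k_words)  # pattern used up: the key must be too
        word = p_words[i]
        if word == "#":
            # '#' consumes zero words, or one word while staying active
            return match(i + 1, j) or (j < len(k_words) and match(i, j + 1))
        if j == len(k_words):
            return False
        if word == "*" or word == k_words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


for key in ("order.created", "order.shipped", "order.created.eu", "payment.failed"):
    print(key, topic_matches("order.*", key))
# order.created True, order.shipped True, order.created.eu False, payment.failed False
```

So `order.*` receives `order.created` and `order.shipped` but not a three-word key like `order.created.eu`. Binding with `order.#` would catch all three.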

Kafka — For High-Throughput Systems

# kafka/producer.py — millions of events per second
from confluent_kafka import Producer
import json

producer = Producer({"bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092"})

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} partition [{msg.partition()}]")

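def publish_to_kafka(topic: str, key: str, value: dict):
    producer.produce(
        topic=topic,
        key=key.encode(),
        value=json.dumps(value).encode(),
        callback=delivery_callback
    )
    producer.poll(0)  # serve delivery callbacks without blocking

# produce() only queues messages locally; call producer.flush() before shutdown
# so nothing still buffered is lost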

# kafka/consumer.py
import json

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka-1:9092,kafka-2:9092",
    "group.id": "payment-service-group",  # consumers in same group share partitions
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False   # commit manually after processing: at-least-once delivery
})

consumer.subscribe(["order.created"])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        event = json.loads(msg.value())
        process_order_event(event)  # must be idempotent: a crash before commit means redelivery
        consumer.commit(message=msg, asynchronous=False)  # commit only after successful processing
finally:
    consumer.close()  # leave the group cleanly so partitions are reassigned promptly
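Two ideas carry most of the Kafka design above:

- Messages with the same key go to the same partition, which preserves per-key ordering.
- Consumers that share a `group.id` divide the partitions among themselves.

Below is a stdlib sketch of both, assuming a topic with 6 partitions. It hashes keys with CRC32 purely for illustration. Kafka's Java client defaults to murmur2, and librdkafka (which `confluent_kafka` wraps) has its own configurable partitioner, so real partition numbers will differ. The round-robin assignment is likewise just one of several strategies.

```python
import zlib
from typing import Dict, List


def partition_for(key: str, num_partitions: int) -> int:
    """Stable hash of the key -> partition index (same key, same partition)."""
    return zlib.crc32(key.encode()) % num_partitions


def assign_round_robin(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over the consumers of one group; each partition gets one owner."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


NUM_PARTITIONS = 6
keys = ["order-1001", "order-1002", "order-1001", "order-1003"]
print([partition_for(k, NUM_PARTITIONS) for k in keys])  # both order-1001 entries match
print(assign_round_robin(list(range(NUM_PARTITIONS)), ["payment-1", "payment-2"]))
# {'payment-1': [0, 2, 4], 'payment-2': [1, 3, 5]}
```

Each partition has exactly one owner within a group, so consumers beyond the partition count sit idle. The partition count therefore caps a group's parallelism.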

Component 8: Metrics (Prometheus + Grafana)

What Is It?

You cannot manage what you cannot measure. The metrics stack collects numerical data about your system's behavior over time and visualizes it so you can detect problems, understand trends, and set alerts.

Services → expose /metrics endpoint (Prometheus format)
    ↓
Prometheus scrapes /metrics every 15 seconds
    ↓
Stores time-series data
(e.g., "at 14:32:01, error_rate was 0.02")
    ↓
Grafana queries Prometheus
    ↓
Beautiful dashboards + alerts

Instrumenting Your Service

# metrics/instrumentation.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from fastapi import FastAPI, Request, Response
import time

# Define metrics
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status_code", "service"]
)

REQUEST_DURATION = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "endpoint", "service"],
    buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)

ACTIVE_CONNECTIONS = Gauge(
    "http_active_connections",
    "Currently active HTTP connections"
)

DB_QUERY_DURATION = Histogram(
    "database_query_duration_seconds",
    "Database query duration",
    ["operation", "table"]
)

CACHE_HITS = Counter("cache_hits_total", "Cache hits", ["cache_type"])
CACHE_MISSES = Counter("cache_misses_total", "Cache misses", ["cache_type"])

# FastAPI middleware to auto-instrument all requests
app = FastAPI()
SERVICE_NAME = "order-service"

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    ACTIVE_CONNECTIONS.inc()
    start = time.perf_counter()
    status_code = 500  # recorded if the handler raises
    try:
        response = await call_next(request)
        status_code = response.status_code
        return response
    finally:
        duration = time.perf_counter() - start
        # Label by route template (/orders/{order_id}), not the raw path (/orders/123),
        # so each order ID doesn't create a new time series
        route = request.scope.get("route")
        endpoint = getattr(route, "path", "unmatched")

        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=endpoint,
            status_code=status_code,
            service=SERVICE_NAME
        ).inc()

        REQUEST_DURATION.labels(
            method=request.method,
            endpoint=endpoint,
            service=SERVICE_NAME
        ).observe(duration)

        ACTIVE_CONNECTIONS.dec()

@app.get("/metrics")
def metrics():
    """Prometheus scrapes this endpoint."""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

# Usage in service code:
async def get_orders_from_db(user_id: str):
    with DB_QUERY_DURATION.labels(operation="select", table="orders").time():
        return await db.execute("SELECT * FROM orders WHERE user_id = $1", user_id)
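REQUEST_DURATION does not store individual latencies. Each observation increments one per-bucket counter, and those counters are exposed cumulatively as `http_request_duration_seconds_bucket{le="..."}` series. The class below is a minimal stdlib-only sketch of that bookkeeping (an illustration, not prometheus_client's actual code), using the same bucket bounds as REQUEST_DURATION:

```python
import bisect
import math


class CumulativeHistogram:
    """Illustrative sketch of Prometheus-style histogram bookkeeping."""

    def __init__(self, bounds):
        # Upper bounds ("le" values), always ending with +Inf
        self.bounds = sorted(bounds) + [math.inf]
        self._per_bucket = [0] * len(self.bounds)
        self.sum = 0.0
        self.count = 0

    def observe(self, value):
        # "le" is inclusive: a value equal to a bound lands in that bucket
        self._per_bucket[bisect.bisect_left(self.bounds, value)] += 1
        self.sum += value
        self.count += 1

    def cumulative(self):
        """Return [(le, observations <= le), ...] as Prometheus would see them."""
        running, out = 0, []
        for le, n in zip(self.bounds, self._per_bucket):
            running += n
            out.append((le, running))
        return out


h = CumulativeHistogram([0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0])
for latency in [0.003, 0.02, 0.02, 0.3, 7.0]:
    h.observe(latency)
print(h.cumulative())
```

The +Inf bucket always equals the total count, and the separate `_sum` series is what lets you compute mean latency as `rate(..._sum[5m]) / rate(..._count[5m])`.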

Prometheus Configuration

# prometheus/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alerts.yml"

scrape_configs:
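  - job_name: "microservices"
    kubernetes_sd_configs:
    - role: pod
    relabel_configs:
    # Only scrape pods annotated prometheus.io/scrape: "true"
    - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
      action: keep
      regex: "true"
    # Honor a custom path from prometheus.io/path (otherwise /metrics)
    - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
      action: replace
      target_label: __metrics_path__
      regex: (.+)
    # Scrape only the port named in prometheus.io/port, not every container port
    - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
      action: replace
      regex: ([^:]+)(?::\d+)?;(\d+)
      replacement: $1:$2
      target_label: __address__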

  - job_name: "redis"
    static_configs:
    - targets: ["redis-exporter:9121"]

  - job_name: "postgres"
    static_configs:
    - targets: ["postgres-exporter:9187"]
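Relabeling is easier to reason about once you see it as a small pipeline over each discovered target's labels. The function below is a simplified sketch (our own illustration covering only the `keep` and `replace` actions, default separator `;` and default replacement `$1`) applied to the `prometheus.io/scrape` and `prometheus.io/path` rules of the `microservices` job; the pod label values are made up:

```python
import re


def relabel(labels, configs):
    """Apply keep/replace relabel rules; return None if the target is dropped."""
    labels = dict(labels)
    for cfg in configs:
        # Missing source labels count as empty strings, joined with ";"
        value = ";".join(labels.get(name, "") for name in cfg["source_labels"])
        # Prometheus anchors regexes at both ends, like re.fullmatch
        match = re.fullmatch(cfg.get("regex", "(.*)"), value)
        action = cfg.get("action", "replace")
        if action == "keep":
            if match is None:
                return None  # target is not scraped at all
        elif action == "replace":
            if match is None:
                continue  # no match: target label left unchanged
            # Translate Prometheus "$1" group references into Python's "\g<1>"
            template = re.sub(r"\$(\d+)", r"\\g<\1>", cfg.get("replacement", "$1"))
            new_value = match.expand(template)
            if new_value:
                labels[cfg["target_label"]] = new_value
            else:
                labels.pop(cfg["target_label"], None)
        else:
            raise ValueError(f"unsupported action: {action}")
    return labels


RULES = [
    {"source_labels": ["__meta_kubernetes_pod_annotation_prometheus_io_scrape"],
     "action": "keep", "regex": "true"},
    {"source_labels": ["__meta_kubernetes_pod_annotation_prometheus_io_path"],
     "action": "replace", "target_label": "__metrics_path__", "regex": "(.+)"},
]

pod = {
    "__address__": "10.0.0.5:8080",
    "__metrics_path__": "/metrics",
    "__meta_kubernetes_pod_annotation_prometheus_io_scrape": "true",
    "__meta_kubernetes_pod_annotation_prometheus_io_path": "/internal/metrics",
}
print(relabel(pod, RULES)["__metrics_path__"])  # /internal/metrics
```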

Alerting Rules

# prometheus/alerts.yml
groups:
- name: microservice_alerts
  rules:
  - alert: HighErrorRate
    # Aggregate before dividing: matching the raw series one-to-one would divide
    # each 5xx series by itself and always yield 1.
    expr: |
      sum by (service) (rate(http_requests_total{status_code=~"5.."}[5m]))
        /
      sum by (service) (rate(http_requests_total[5m])) > 0.01
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "High error rate on {{ $labels.service }}"
      description: "Error rate is {{ $value | humanizePercentage }} (threshold: 1%)"

  - alert: SlowResponseTime
    # Buckets must be summed by "le" before estimating the quantile
    expr: |
      histogram_quantile(0.99,
        sum by (le, endpoint) (rate(http_request_duration_seconds_bucket[5m]))
      ) > 2.0
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Slow p99 latency on {{ $labels.endpoint }}"
      description: "p99 latency is {{ $value }}s (threshold: 2s)"

  - alert: ServiceDown
    expr: up == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Target {{ $labels.instance }} (job {{ $labels.job }}) is DOWN"
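The `for:` field is what keeps a single bad evaluation from paging anyone: the expression must keep returning a result on every evaluation for the whole window before the alert moves from pending to firing. Here is a simplified stdlib-only sketch of that state machine for one series (an illustration that ignores details such as resolved-alert notifications), using the HighErrorRate settings of a 15s evaluation interval and `for: 2m`:

```python
def alert_states(evaluations, for_seconds):
    """Return "inactive", "pending" or "firing" for each rule evaluation.

    evaluations: list of (timestamp_seconds, expr_returned_result) pairs.
    """
    active_since = None
    states = []
    for ts, breached in evaluations:
        if not breached:
            # Condition cleared: any pending or firing alert resets
            active_since = None
            states.append("inactive")
            continue
        if active_since is None:
            active_since = ts  # first breach starts the "for" clock
        states.append("firing" if ts - active_since >= for_seconds else "pending")
    return states


# HighErrorRate: evaluated every 15s, for: 2m
evals = [(t, True) for t in range(0, 150, 15)]
print(alert_states(evals, for_seconds=120))
# 8 x "pending" (t=0..105s), then "firing" at t=120s and t=135s
```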

The RED Method: What to Put on Your Dashboard

Metric          Query                                                           Alert threshold
Rate (req/s)    sum(rate(http_requests_total[5m]))                              Drops below 50% of baseline
Errors (%)      100 * sum(rate(http_requests_total{status_code=~"5.."}[5m]))    > 1%
                  / sum(rate(http_requests_total[5m]))
Duration (p99)  histogram_quantile(0.99,                                        > 2 seconds
                  sum by (le) (rate(http_request_duration_seconds_bucket[5m])))
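The Duration row depends on `histogram_quantile`, which never sees raw latencies: it finds the bucket containing the requested rank and interpolates linearly inside it. The function below is a simplified sketch of that interpolation (our own illustration of PromQL's approach, not Prometheus source code), taking cumulative `(upper_bound, count)` pairs like the ones a histogram exposes; the sample counts are invented:

```python
import math


def histogram_quantile(q, buckets):
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    `buckets` must be sorted by upper bound and end with an +Inf bucket.
    """
    if q < 0:
        return -math.inf
    if q > 1:
        return math.inf
    if len(buckets) < 2 or buckets[-1][0] != math.inf or buckets[-1][1] == 0:
        return math.nan
    rank = q * buckets[-1][1]  # +Inf bucket count = total observations
    # Find the first finite bucket whose cumulative count reaches the rank
    for i, (upper, count) in enumerate(buckets[:-1]):
        if count >= rank:
            break
    else:
        # Rank lies in the +Inf bucket: best estimate is the highest finite bound
        return buckets[-2][0]
    if i == 0 and upper <= 0:
        return upper
    lower, prev_count = (0.0, 0) if i == 0 else buckets[i - 1]
    if count == prev_count:  # degenerate case: q == 0 with an empty first bucket
        return lower
    # Assume observations are spread evenly inside the chosen bucket
    return lower + (upper - lower) * (rank - prev_count) / (count - prev_count)


buckets = [(0.1, 50), (0.25, 90), (0.5, 99), (1.0, 100), (math.inf, 100)]
print(histogram_quantile(0.5, buckets))
print(histogram_quantile(0.95, buckets))
```

One consequence for the alert above: with the REQUEST_DURATION buckets, the 2-second threshold falls inside the 1.0 to 2.5 s bucket, so "p99 > 2 s" is decided by an interpolated guess. Adding a 2.0 bucket would let the alert test that boundary directly.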

Component 9: Centralized Logging (ELK Stack)

What Is It?

When something breaks in production, you need to find out what happened. With 20 microservices each writing logs to their own files, finding the cause of a failure means grepping through 20 separate sets of files, some of which disappear when a container is rescheduled.

The ELK stack (Elasticsearch + Logstash + Kibana) centralizes all logs in one searchable place.

All Services → emit JSON logs to stdout
                      ▼
               Logstash / Fluent Bit     ← collect, parse, enrich logs
                      ▼
               Elasticsearch             ← store and index logs (searchable)
                      ▼
               Kibana                    ← visualize, search, alert

Structured Logging: The Foundation

Logs should be structured JSON, so that fields like trace_id and order_id are indexed and filterable instead of buried in free text:

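# app_logging/setup.py  (don't name the package "logging": it would shadow the stdlib module)
import structlog
import logging
import sys

def configure_logging(service_name: str, version: str):
    """Configure structured JSON logging and return a logger bound to this service."""

    # structlog hands each rendered JSON line to the stdlib logging module,
    # which needs a plain stdout handler (otherwise INFO events are dropped)
    logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)

    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_log_level,
            structlog.stdlib.add_logger_name,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer()     # output as JSON!
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    # Every event from this logger automatically carries service + version
    return structlog.get_logger().bind(service=service_name, version=version)

# Call once at startup (each service passes its own name and version)
log = configure_logging(service_name="payment-service", version="2.3.1")

# Usage in your services
log.info("order.created",
    order_id="ORD-12345",
    user_id="USR-789",
    amount=299.99,
    currency="USD",
    trace_id="abc123",        # for cross-service correlation
)

log.error("payment.failed",
    order_id="ORD-12345",
    error_code="card_declined",
    error_message="Insufficient funds",
    trace_id="abc123",
    # inside an `except` block, also pass exc_info=True to attach the traceback
)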

Output:

{
  "timestamp": "2026-05-18T10:23:41.123Z",
  "level": "error",
  "event": "payment.failed",
  "order_id": "ORD-12345",
  "error_code": "card_declined",
  "error_message": "Insufficient funds",
  "trace_id": "abc123",
  "service": "payment-service",
  "version": "2.3.1"
}
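The example above passes trace_id by hand on every call. In practice it is usually read from (or generated for) the incoming request once and attached to every log line automatically; structlog's `structlog.contextvars` module supports this pattern. Below is a stdlib-only sketch of the same idea using `logging` plus `contextvars` in place of structlog. The `X-Trace-Id` header and `handle_request` function are hypothetical placeholders (production systems often use the W3C `traceparent` header via OpenTelemetry):

```python
import contextvars
import io
import json
import logging
import time
import uuid

# Current request's trace ID; each thread / asyncio task sees its own value
trace_id_var = contextvars.ContextVar("trace_id", default=None)


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object that includes the current trace ID."""

    converter = time.gmtime  # UTC timestamps

    def __init__(self, service):
        super().__init__()
        self.service = service

    def format(self, record):
        entry = {
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname.lower(),
            "event": record.getMessage(),
            "service": self.service,
            "trace_id": trace_id_var.get(),
        }
        # Structured fields passed as logger.info(..., extra={"fields": {...}})
        entry.update(getattr(record, "fields", {}))
        return json.dumps(entry)


def handle_request(headers, logger):
    """Hypothetical handler: reuse the caller's trace ID or start a new one."""
    token = trace_id_var.set(headers.get("X-Trace-Id") or uuid.uuid4().hex)
    try:
        logger.info("order.created", extra={"fields": {"order_id": "ORD-12345"}})
        # Headers to forward on outgoing calls so downstream logs share the ID
        return {"X-Trace-Id": trace_id_var.get()}
    finally:
        trace_id_var.reset(token)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter(service="order-service"))
logger = logging.getLogger("order-service")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

outgoing = handle_request({"X-Trace-Id": "abc123"}, logger)
print(stream.getvalue().strip())
```

In a real service, `handle_request` would become a middleware, and the same ID would travel with outgoing HTTP calls and message headers so downstream logs can be joined on it.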

Fluent Bit — Kubernetes Log Collector

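# fluent-bit-config.yaml (ConfigMap mounted by the Fluent Bit DaemonSet, which runs on every node)
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush         5
        Log_Level     info

    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*.log
        # docker = Docker JSON-file logs, cri = containerd / CRI-O logs
        multiline.parser  docker, cri
        Mem_Buf_Limit     50MB

    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        # Merge JSON log lines into structured fields
        # (comments must sit on their own line in this config format)
        Merge_Log           On
        K8S-Logging.Parser  On

    [OUTPUT]
        Name  es
        Match *
        Host  elasticsearch-master
        Port  9200
        # Write daily indices named microservices-logs-YYYY.MM.DD so they
        # match the microservices-logs-* index template and searches
        Logstash_Format     On
        Logstash_Prefix     microservices-logs
        Time_Key            @timestamp
        # Required for Elasticsearch 8.x, which no longer accepts mapping types
        Suppress_Type_Name  On
        Retry_Limit 5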
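On containerd-based nodes, each line your service prints is wrapped in the CRI log format before it reaches /var/log/containers/*.log, which is why the tail input needs the `cri` parser before Merge_Log can see your JSON. The function below is a simplified illustration of those two steps (our own sketch, not Fluent Bit code); `parse_container_line` is a hypothetical name, and partial (`P`) lines are only flagged, not reassembled:

```python
import json


def parse_container_line(line):
    """Parse one CRI log line ("<time> <stream> <P|F> <message>") and merge JSON.

    P marks a partial line continued in the next entry; F marks a full line.
    """
    timestamp, stream, tag, message = line.rstrip("\n").split(" ", 3)
    record = {"time": timestamp, "stream": stream, "partial": tag == "P", "log": message}
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        return record  # plain-text line: keep it as a single "log" string
    if isinstance(payload, dict):
        # Roughly what Merge_Log On does: lift JSON keys to top-level fields.
        # Fluent Bit also keeps the raw "log" string unless Keep_Log is Off.
        record.update(payload)
    return record


line = ('2026-05-18T10:23:41.123456789Z stdout F '
        '{"event": "payment.failed", "trace_id": "abc123", "level": "error"}')
print(parse_container_line(line))
```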

Elasticsearch Index Template

# Configure Elasticsearch for log storage (elasticsearch-py 8.x client)
from elasticsearch import Elasticsearch

es = Elasticsearch("http://elasticsearch:9200")

# ILM policy referenced by the template: delete each daily index 30 days after creation
es.ilm.put_lifecycle(
    name="logs-policy",
    policy={
        "phases": {
            "delete": {"min_age": "30d", "actions": {"delete": {}}},
        }
    },
)

# Index template for all microservice logs
es.indices.put_index_template(
    name="microservices-logs",
    index_patterns=["microservices-logs-*"],
    template={
        "settings": {
            "number_of_shards": 3,
            "number_of_replicas": 1,
            "index.lifecycle.name": "logs-policy",  # apply the 30-day policy above
        },
        "mappings": {
            "properties": {
                "@timestamp":    {"type": "date"},
                "level":         {"type": "keyword"},
                "service":       {"type": "keyword"},
                "trace_id":      {"type": "keyword"},
                "user_id":       {"type": "keyword"},
                "order_id":      {"type": "keyword"},
                "event":         {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
                "error_message": {"type": "text"},
            }
        }
    },
)

# Search logs by trace_id to follow a request across services
results = es.search(
    index="microservices-logs-*",
    query={"term": {"trace_id": "abc123"}},  # exact match on the keyword field
    sort=[{"@timestamp": "asc"}],
    size=100,
)

for hit in results["hits"]["hits"]:
    source = hit["_source"]
    print(f"[{source['@timestamp']}] [{source['service']}] {source['event']}")
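Turning those hits into a per-hop view like the Kibana timeline in the walkthrough below is mostly timestamp arithmetic. A small sketch, assuming `_source` documents with an ISO-8601 `@timestamp` ending in "Z"; `trace_timeline` is a hypothetical helper and the sample hits are invented to mirror the walkthrough:

```python
from datetime import datetime


def trace_timeline(sources):
    """Order one trace's log entries and express each as an offset from the first."""

    def parse(ts):
        # datetime.fromisoformat only accepts "Z" from Python 3.11, so normalise it
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))

    entries = sorted(sources, key=lambda s: parse(s["@timestamp"]))
    start = parse(entries[0]["@timestamp"])
    return [
        (round((parse(s["@timestamp"]) - start).total_seconds() * 1000),
         s["service"], s["event"])
        for s in entries
    ]


hits = [
    {"@timestamp": "2026-05-18T10:23:41.110Z", "service": "api-gateway", "event": "response sent"},
    {"@timestamp": "2026-05-18T10:23:41.000Z", "service": "api-gateway", "event": "request received"},
    {"@timestamp": "2026-05-18T10:23:41.034Z", "service": "product-service", "event": "product lookup"},
]
for offset_ms, service, event in trace_timeline(hits):
    print(f"+{offset_ms:>4} ms  [{service}] {event}")
```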

The Complete Architecture: How All 9 Components Work Together

Here's a real request — "User places an order" — flowing through all 9 components:

1. Client sends POST /api/orders with JWT token

2. API GATEWAY (①) receives request
   → Validates the JWT signature using the Authorization Server's (④) public keys
   → Rate limit check: OK (not exceeded)
   → Routes to Order Service (Service B) via Service Registry (②)

3. ORDER SERVICE (③) receives request
   → Checks Distributed Cache (⑥) for product price: MISS
   → Calls Product Service via Service Registry (②)
   → Product Service queries its Database (⑤), writes to Cache (⑥)
   → Order Service saves order to its own Database (⑤)
   → Publishes "order.created" to Message Queue (⑦)
   → Logs structured event (⑨): {"event": "order.created", "trace_id": "xyz"}
   → Prometheus metrics updated (⑧): request_count++, latency recorded
   → Returns 201 Created to API Gateway → Client

4. Async: Message Queue (⑦) delivers "order.created" to:
   → Notification Service: sends confirmation email
   → Inventory Service: decrements stock
   → Analytics Service: records the sale

5. Grafana dashboard (⑧) shows:
   → +1 order in last 5 minutes
   → Latency p99: 142ms ✓

6. Kibana (⑨) shows the full trace_id "xyz" across all services:
   → 10:23:41.000 [api-gateway]      request received
   → 10:23:41.012 [order-service]    processing started
   → 10:23:41.034 [product-service]  product lookup
   → 10:23:41.089 [order-service]    order saved
   → 10:23:41.102 [order-service]    event published
   → 10:23:41.110 [api-gateway]      response sent (latency: 110ms)
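Step 4 is the defining property of component 7: the Order Service publishes one "order.created" event and never calls the three consumers directly. The toy broker below sketches that fan-out with asyncio queues; it is an in-process stand-in for illustration only (no persistence, retries, or ordering guarantees), not how Kafka or RabbitMQ work internally. Each subscriber gets its own queue, roughly like one Kafka consumer group per service:

```python
import asyncio


class InProcessBroker:
    """Toy fan-out broker: every subscriber receives its own copy of each event."""

    def __init__(self):
        self._subscribers = {}

    def subscribe(self, topic):
        queue = asyncio.Queue()  # unbounded, so publishing never blocks
        self._subscribers.setdefault(topic, []).append(queue)
        return queue

    async def publish(self, topic, event):
        # The publisher neither knows nor waits for who consumes the event
        for queue in self._subscribers.get(topic, []):
            await queue.put(event)


async def consumer(name, queue, handled):
    event = await queue.get()
    handled.append((name, event["order_id"]))
    queue.task_done()


async def main():
    broker = InProcessBroker()
    handled = []
    names = ["notification-service", "inventory-service", "analytics-service"]
    tasks = [
        asyncio.create_task(consumer(n, broker.subscribe("order.created"), handled))
        for n in names
    ]
    await broker.publish("order.created", {"order_id": "ORD-12345"})
    await asyncio.gather(*tasks)
    return handled


handled = asyncio.run(main())
print(sorted(handled))
```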

Summary

#  Component              Purpose                                           2026 Tool Recommendation
1  API Gateway            Single entry point, routing, rate limiting, auth  Kong, Traefik, AWS API GW
2  Service Registry       Dynamic service discovery                         Kubernetes DNS (built-in), Consul
3  Service Layer          Your business logic, independently deployable     FastAPI, Spring Boot, Go Fiber
4  Authorization Server   Identity, JWT tokens, RBAC                        Keycloak, Auth0, AWS Cognito
5  Database Layer         Persistent storage with replication               PostgreSQL, MongoDB + read replicas
6  Distributed Cache      Fast reads, session storage, rate limiting        Redis Cluster, Valkey
7  Distributed Messaging  Async, decoupled communication                    Kafka (high volume), RabbitMQ (flexible)
8  Metrics                Measure system health, alert on anomalies         Prometheus + Grafana
9  Centralized Logging    Search and trace across all services              ELK Stack, Loki + Grafana

The most important thing to understand: each component exists to solve a specific problem that appears when you go from one service on one server to many services on many servers. You don't need all 9 from day one — but you will need all of them before you can call your system truly production-ready.

Start with your services, their databases, and an API Gateway in front of them. Add the Authorization Server before you handle real users. Add the cache when your database becomes the bottleneck. Add messaging when services need to react to each other's events. And emit structured logs and metrics from day one, even if the full Prometheus and ELK pipelines only arrive in week 3: you'll deeply regret it if you don't.

Implementation Order for a New System

  1. Week 1: Service Layer + Database (build the thing)
  2. Week 2: API Gateway + Authorization Server (make it secure)
  3. Week 3: Logging + Metrics (make it observable)
  4. Week 4: Cache + Service Registry (make it fast and scalable)
  5. Month 2+: Distributed Messaging (make it resilient and event-driven)

Questions or discussion? Connect on LinkedIn, X or reach out via email.
