RAG and LLMOps: How to Build a Production-Grade AI Second Brain
You've built a RAG chatbot that works great on your laptop. It answers questions from a handful of PDFs, the responses feel smart, and you're excited. Then you try to make it production-ready — and everything gets complicated.
How do you keep the knowledge base fresh? How do you know when the LLM starts giving bad answers? How do you fine-tune the model on your own data without breaking what already works? How do you monitor 10,000 daily queries for quality degradation?
This is where LLMOps enters the picture.
This post walks through a complete, real-world architecture: a Second Brain AI assistant that combines RAG, fine-tuned LLMs, agentic inference, and a full observability layer — using the same patterns the best ML teams run in production in 2026. We'll trace every numbered step in the system, explain the why behind each component, and show you what the code looks like.
Part 1: What Is LLMOps — and Why Does It Exist?
MLOps (Machine Learning Operations) is the discipline of getting ML models into production and keeping them healthy: versioning, pipelines, monitoring, retraining. It solved the "it works on my laptop" problem for classical ML.
LLMOps is MLOps applied specifically to large language models — and it introduces a new set of challenges that classical MLOps tools weren't built for:
| Challenge | Classical MLOps | LLMOps |
|---|---|---|
| Model size | Megabytes to gigabytes | Billions of parameters; tens of gigabytes |
| Data pipeline | Tabular / structured | Unstructured text, PDFs, web pages |
| Evaluation | Accuracy, F1, AUC | Correctness, hallucination rate, citation quality |
| Latency | Milliseconds per prediction | Seconds per response, streamed token by token |
| Knowledge freshness | Retrain on schedule | RAG for live retrieval + periodic fine-tuning |
| Output monitoring | Drift on numeric predictions | Semantic drift, tone, safety violations |
LLMOps is the set of tools and practices that solve these problems. The architecture we'll walk through is a full LLMOps system for a "Second Brain" assistant: a personal AI that knows everything in your Notion workspace and answers questions from it — accurately, with citations, at scale.
Part 2: The Big Picture
The architecture has 10 numbered steps across 7 major pipelines:
┌──────────────────────────────────────────────────────────────────┐
│ │
│ PIPELINE 1: Data Collection Raw Notion → S3 (Markdown) │
│ ↓ │
│ PIPELINE 2: ETL + Filtering S3 → MongoDB (quality-gated) │
│ ↓ ↓ │
│ PIPELINE 3: Dataset Generation MongoDB → Instruct dataset │
│ ↓ │
│ PIPELINE 4: Training Dataset → Fine-tuned LLM │
│ ↓ │
│ PIPELINE 5: RAG Feature LLM + Docs → Vector Index │
│ ↓ │
│ PIPELINE 6: Inference User Q → Retriever → Agent → A │
│ ↓ │
│ PIPELINE 7: Observability Traces → Monitoring + Eval │
│ │
└──────────────────────────────────────────────────────────────────┘
Tools: ZenML (pipelines), MongoDB (docs), opik (observability), unsloth (fine-tuning)
Let's trace every step.
Part 3: The Data Layer (Steps 1–3)
Step 1 — Data Collection Pipeline: Notion → S3
Everything starts with raw data. In this system, the knowledge source is Notion — a common choice for personal wikis, company documentation, and research notes.
A Data Collection Pipeline extracts all Notion pages and converts them to Markdown. Markdown is the universal format for this stack: LLMs read and write it fluently, it's lightweight, and it's trivially stored as plain text files in S3.
# Pseudo-code: collect and store Notion pages
import os

import boto3
from notion_client import Client

notion = Client(auth=os.environ["NOTION_TOKEN"])
s3 = boto3.client("s3")

def collect_notion_pages(database_id: str) -> None:
    # query() returns one page of results (at most 100); follow
    # "next_cursor" while "has_more" is true to cover a larger database.
    pages = notion.databases.query(database_id=database_id)["results"]
    for page in pages:
        # export_page_as_markdown is a helper you supply (Notion blocks → Markdown)
        content = export_page_as_markdown(page["id"])
        s3.put_object(
            Bucket="my-second-brain",
            Key=f"notion/{page['id']}.md",
            Body=content.encode("utf-8"),
        )
The S3 bucket is your raw data lake: a snapshot of everything, untouched, before any transformation.
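The `export_page_as_markdown` helper in the snippet above is not defined in this post. As a minimal sketch of its core, assuming you have already fetched a page's blocks from the Notion API as dictionaries (each block has a `type` key and, under a key of the same name, a `rich_text` list whose items expose `plain_text`), the conversion might look like the following. The names `blocks_to_markdown`, `rich_text_to_plain`, and `PREFIXES`, and the small set of block types covered, are illustrative assumptions rather than part of the original pipeline:

```python
# Hypothetical helper: convert a list of Notion-style block dicts to Markdown.
# Only a few common text block types are handled; everything else is skipped.

PREFIXES = {
    "heading_1": "# ",
    "heading_2": "## ",
    "heading_3": "### ",
    "bulleted_list_item": "- ",
    "numbered_list_item": "1. ",
    "quote": "> ",
    "paragraph": "",
}


def rich_text_to_plain(rich_text: list[dict]) -> str:
    """Join the plain_text of every rich-text fragment in a block."""
    return "".join(fragment.get("plain_text", "") for fragment in rich_text)


def blocks_to_markdown(blocks: list[dict]) -> str:
    """Render supported blocks as Markdown, separated by blank lines."""
    lines = []
    for block in blocks:
        block_type = block.get("type")
        if block_type not in PREFIXES:
            continue  # unsupported block type (images, tables, embeds, ...)
        body = block.get(block_type, {})
        lines.append(PREFIXES[block_type] + rich_text_to_plain(body.get("rich_text", [])))
    return "\n\n".join(lines)
```

Skipping unsupported blocks keeps the sketch short; a fuller exporter would at least log what it drops so that missing content in the data lake can be traced back to its cause.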
Step 2 — ETL Pipeline: S3 → MongoDB
The ETL (Extract, Transform, Load) pipeline reads Markdown files from S3 and loads them into MongoDB. Why MongoDB? Because it stores documents as flexible JSON-like objects, making it easy to attach metadata (author, last updated, source URL, crawled resources) alongside the raw content.
This pipeline also ingests crawled external resources — articles, documentation pages, web content that supplements the Notion notes.
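# ETL: load a Markdown file into MongoDB with metadata
import os
from datetime import datetime, timezone

from pymongo import MongoClient

db = MongoClient(os.environ["MONGO_URI"])["second_brain"]

def load_to_mongo(page_id: str, content: str, metadata: dict, source: str = "notion") -> None:
    # Upsert keyed on page_id, so re-running the ETL updates instead of duplicating
    db.documents.update_one(
        {"page_id": page_id},
        {"$set": {
            "content": content,
            "metadata": metadata,
            "source": source,  # "notion" by default; pass another value for crawled pages
            "loaded_at": datetime.now(timezone.utc),  # utcnow() is deprecated since Python 3.12
            "quality_score": None,  # filled by the filter step
        }},
        upsert=True,
    )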
Step 3 — Quality Filtering: Two-Stage Gate
Not all Notion pages are worth including. A personal workspace has stubs, half-finished notes, and low-signal content. Feeding noise into a RAG system degrades answer quality directly.
This architecture applies two quality filters:
MongoDB (all documents)
↓
Filter 1: Medium-to-high quality
(removes obvious low-quality: <200 words, no headers, mostly bullet fragments)
↓
Medium/High quality docs → RAG Feature Pipeline (used for embeddings)
↓
Filter 2: High quality only
(stricter: well-structured, >500 words, factually dense)
↓
High quality docs → Dataset Generation Pipeline (used for fine-tuning)
The key insight: fine-tuning requires cleaner data than RAG. For RAG, a mediocre document retrieved less often is acceptable. For fine-tuning, a poorly written training example corrupts the model's behavior directly.
A practical quality scoring function:
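def score_document(content: str) -> float:
    score = 0.0
    words = len(content.split())
    if words > 200: score += 0.2
    if words > 500: score += 0.2
    if "## " in content: score += 0.2                # has section headers
    if content.count("\n") > 10: score += 0.2        # spans more than 10 line breaks
    if not content.startswith("TODO"): score += 0.2  # not a stub
    return round(score, 1)  # 0.0 to 1.0; rounding hides float noise such as 0.6000000000000001

# threshold: >= 0.4 → medium quality (RAG)
#            >= 0.8 → high quality (fine-tuning)
# Note: the score alone does not enforce the word minimums listed above (a
# well-formatted 250-word page can reach 0.8), so add hard checks if you need them.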
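To make the two gates concrete, here is a small sketch of routing already-scored documents into the two downstream sets using the thresholds from the comments above. The function name `route_documents` and the shape of the input (a mapping from document ID to score) are assumptions for illustration:

```python
RAG_THRESHOLD = 0.4        # medium quality and above feed the RAG Feature Pipeline
FINETUNE_THRESHOLD = 0.8   # only high quality feeds Dataset Generation


def route_documents(scores: dict[str, float]) -> dict[str, list[str]]:
    """Split document IDs into the RAG set and the stricter fine-tuning set."""
    rag = sorted(doc_id for doc_id, s in scores.items() if s >= RAG_THRESHOLD)
    finetune = sorted(doc_id for doc_id, s in scores.items() if s >= FINETUNE_THRESHOLD)
    return {"rag": rag, "finetune": finetune}


example_scores = {"stub": 0.2, "notes": 0.6, "guide": 1.0}
routes = route_documents(example_scores)
print(routes)  # {'rag': ['guide', 'notes'], 'finetune': ['guide']}
```

Because the second threshold is stricter than the first, the fine-tuning set is always a subset of the RAG set: every document good enough to train on is also indexed for retrieval.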
Part 4: The Fine-Tuning Pipeline (Steps 4–5)
Step 4 — Dataset Generation: High-Quality Docs → Instruct Dataset
Fine-tuning requires instruction-response pairs, not raw documents. The Dataset Generation pipeline transforms high-quality documents into a Summarization Instruct Dataset, with examples in the following format:
{
"instruction": "Summarize the following Notion page about Transformer architecture.",
"input": "## Transformers\nTransformers use self-attention...",
"output": "Transformers are a neural network architecture that replaces RNNs with self-attention, allowing parallel processing of sequences and capturing long-range dependencies efficiently."
}
This dataset is stored in a Data Registry — a versioned, auditable store of training datasets. Every model you train is traceable back to the exact dataset version it used.
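As a rough sketch of this step, the code below builds one record in the format shown above, flattens it into a single training string, and derives a content hash that could serve as the dataset's version in a registry. Everything here is an assumption made for illustration: the function names, the `### Instruction / ### Input / ### Response` template, and the use of a truncated SHA-256 as a version ID. Where the reference summary comes from (a stronger LLM or a human) is outside the sketch:

```python
import hashlib
import json


def build_instruct_example(topic: str, page_markdown: str, summary: str) -> dict:
    """Build one instruction/input/output record in the format shown above."""
    return {
        "instruction": f"Summarize the following Notion page about {topic}.",
        "input": page_markdown,
        "output": summary,
    }


def to_training_text(example: dict) -> str:
    """Flatten a record into one prompt string (hypothetical template)."""
    return (
        f"### Instruction:\n{example['instruction']}\n\n"
        f"### Input:\n{example['input']}\n\n"
        f"### Response:\n{example['output']}"
    )


def dataset_version(examples: list[dict]) -> str:
    """Content hash usable as a version ID: identical records give an identical ID."""
    canonical = json.dumps(examples, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]
```

Hashing the content rather than relying on a timestamp means two pipeline runs that produce the same records resolve to the same version, which is what makes a trained model traceable to an exact dataset.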
Step 5 — Training Pipeline: Fine-Tuning the Summarization LLM
The Training Pipeline takes the instruct dataset and fine-tunes an LLM specifically for summarization — the task of condensing a long Notion page into a crisp, accurate summary.
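In 2026 this is typically done with LoRA via unsloth: only small low-rank adapter matrices are trained while the base weights stay frozen (and, in the example below, quantized to 4-bit), which cuts memory and compute sharply compared with full-parameter training: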
from unsloth import FastLanguageModel  # import unsloth before trl/transformers
from trl import SFTTrainer
from transformers import TrainingArguments

# Load base model with 4-bit quantization
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name="unsloth/llama-3-8b-bnb-4bit",
    max_seq_length=2048,
    load_in_4bit=True,
)

# Inject LoRA adapters (with this config, well under 1% of parameters are trained)
model = FastLanguageModel.get_peft_model(
    model,
    r=16,  # LoRA rank
    target_modules=["q_proj", "v_proj"],
    lora_alpha=16,
    lora_dropout=0.05,
)

# summarization_dataset is the instruct dataset from Step 4, with each example
# rendered into a single "text" column.
# Note: this call follows older TRL releases; newer ones may expect
# dataset_text_field in SFTConfig and the tokenizer as processing_class.
trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=summarization_dataset,
    dataset_text_field="text",
    args=TrainingArguments(
        output_dir="./summarization-llm",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        learning_rate=2e-4,
    ),
)
trainer.train()
The resulting fine-tuned model is pushed to a Model Registry — a versioned store analogous to the Data Registry, where every model artifact is tracked alongside its training run metadata.
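For a sense of how small the trained portion is, here is a back-of-the-envelope count for the configuration above (rank 16 on `q_proj` and `v_proj`). A LoRA adapter on a weight matrix with input size `d_in` and output size `d_out` adds two matrices with `r * (d_in + d_out)` parameters in total. The model shapes below are assumed values for Llama-3-8B (hidden size 4096, 32 layers, 8 key/value heads of 128 dimensions, roughly 8.03 billion parameters); check them against the actual model config before relying on the result:

```python
def lora_params(d_in: int, d_out: int, r: int) -> int:
    """Parameters added by one LoRA adapter: A is (r x d_in), B is (d_out x r)."""
    return r * (d_in + d_out)


# Assumed Llama-3-8B shapes (verify against the model's config.json)
HIDDEN = 4096
KV_DIM = 1024                 # 8 key/value heads x 128 dims (grouped-query attention)
NUM_LAYERS = 32
TOTAL_PARAMS = 8_030_000_000  # approximate

R = 16
# One q_proj adapter (4096 -> 4096) plus one v_proj adapter (4096 -> 1024) per layer
per_layer = lora_params(HIDDEN, HIDDEN, R) + lora_params(HIDDEN, KV_DIM, R)
trainable = per_layer * NUM_LAYERS

print(trainable)                           # 6815744
print(f"{trainable / TOTAL_PARAMS:.3%}")   # 0.085%
```

Under these assumptions the adapters come to about 6.8 million parameters, or roughly 0.1% of the model. Adding more target modules or raising the rank increases that share proportionally.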
Part 5: The RAG Feature Pipeline (Steps 6–8)
This is the heart of the system. The RAG Feature Pipeline builds the searchable knowledge base that the inference layer queries at runtime.
Step 6 — Load the Fine-Tuned LLM
The pipeline loads the summarization LLM from the Model Registry. This model's job here is not to answer questions — it's to generate summaries of each document before embedding it.
Why summarize before embedding? Because a 3,000-word Notion page, embedded as-is, is compressed into a single vector that averages over every topic it covers (and many embedding models truncate long inputs before they reach the end), so it matches a short, specific user query poorly. A crisp 150-word summary embeds to a vector that's much closer to the user's intent.
Step 7 — Ingest Medium-to-High Quality Documents
The pipeline reads the filtered documents from MongoDB. For each document, it generates:
- Doc chunk: a chunked segment of the raw document
- Summary: the LLM-generated summary of that chunk
Both are then embedded.
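The post does not specify how documents are chunked, so the following is only one plausible approach: fixed-size word windows with a small overlap, so that a sentence cut at a boundary still appears whole in one of the two neighbouring chunks. The function name `chunk_words` and the default sizes are assumptions:

```python
def chunk_words(text: str, chunk_size: int = 300, overlap: int = 50) -> list[str]:
    """Split text into windows of chunk_size words; consecutive windows share `overlap` words."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the end of the text
    return chunks


sample = " ".join(str(i) for i in range(10))
print(chunk_words(sample, chunk_size=4, overlap=1))
# ['0 1 2 3', '3 4 5 6', '6 7 8 9']
```

Splitting on whitespace discards Markdown structure. A chunker that cuts at section headers usually produces more coherent chunks, at the cost of uneven sizes.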
Step 8 — Embed and Index
from sentence_transformers import SentenceTransformer

embed_model = SentenceTransformer("BAAI/bge-large-en-v1.5")

def embed_and_index(doc_id: str, content: str, summary: str) -> None:
    # Embed both raw chunk and its summary
    doc_vector = embed_model.encode(content)
    summary_vector = embed_model.encode(summary)
    # Store in Vector Index (e.g., Qdrant, Weaviate, pgvector).
    # vector_store is a placeholder client: adapt IDs and payloads to your store
    # (Qdrant, for example, only accepts unsigned integers or UUIDs as point IDs).
    # If a document yields several chunks, include a chunk index in each ID.
    vector_store.upsert([
        {
            "id": f"{doc_id}_doc",
            "vector": doc_vector.tolist(),
            "payload": {"type": "doc", "content": content, "doc_id": doc_id}
        },
        {
            "id": f"{doc_id}_summary",
            "vector": summary_vector.tolist(),
            "payload": {"type": "summary", "content": summary, "doc_id": doc_id}
        },
    ])
Storing both the raw chunk and its summary gives the retriever two angles to match against a query: the specific wording of the original text, and the semantic meaning of the compressed summary.
The result is a Vector Index: a database of embeddings where every document is represented as a point in high-dimensional space, retrievable by semantic similarity to any incoming query.
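To show what "retrievable by semantic similarity" means mechanically, here is a toy brute-force search over entries shaped like the payloads above. It scores every entry by cosine similarity and, because each document is indexed twice (raw chunk and summary), keeps only the better of the two matches per `doc_id`. This is a sketch under assumptions: real vector stores use approximate nearest-neighbour indexes rather than a full scan, and the `search` function and the two-dimensional vectors are purely illustrative:

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search(index: list[dict], query_vector: list[float], top_k: int = 5) -> list[dict]:
    """Brute-force search that keeps the best-scoring entry per doc_id."""
    best: dict[str, dict] = {}
    for entry in index:
        score = cosine(entry["vector"], query_vector)
        doc_id = entry["payload"]["doc_id"]
        if doc_id not in best or score > best[doc_id]["score"]:
            best[doc_id] = {"doc_id": doc_id, "matched": entry["payload"]["type"], "score": score}
    return sorted(best.values(), key=lambda hit: hit["score"], reverse=True)[:top_k]


index = [
    {"vector": [1.0, 0.0], "payload": {"type": "doc", "doc_id": "a"}},
    {"vector": [0.6, 0.8], "payload": {"type": "summary", "doc_id": "a"}},
    {"vector": [0.0, 1.0], "payload": {"type": "doc", "doc_id": "b"}},
]
hits = search(index, [0.6, 0.8], top_k=2)
print([(h["doc_id"], h["matched"]) for h in hits])  # [('a', 'summary'), ('b', 'doc')]
```

In this example document "a" is found through its summary vector even though its raw-chunk vector is a weaker match than document "b", which is the benefit the dual-embedding design is after.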
Part 6: The Inference Pipeline (Step 9)
This is what the user actually interacts with.
User Question (from UI)
           ↓
Inference Pipeline
┌──────────────────────────────────────┐
│            Agentic Layer             │
│  ┌─────────────┐  ┌───────────────┐  │
│  │  Retriever  │  │ Summarization │  │
│  │    Tool     │  │     Tool      │  │
│  └──────┬──────┘  └───────┬───────┘  │
│         │                 │          │
│         └───── Agent ─────┘          │
└──────────────────┬───────────────────┘
                   ↓
            Answer (to UI)
The Agentic Layer
The inference pipeline doesn't just run a single retrieval + generate cycle. An Agentic Layer orchestrates two tools:
Retriever Tool — takes the user's question, embeds it, queries the Vector Index, and returns the top-K most relevant chunks.
def retriever_tool(question: str, top_k: int = 5) -> list[dict]:
    query_vector = embed_model.encode(question)
    results = vector_store.search(
        query_vector=query_vector.tolist(),
        limit=top_k,
    )
    return [{"content": r.payload["content"], "score": r.score} for r in results]
Summarization Tool — if retrieved chunks are too long for the context window, calls the fine-tuned summarization LLM to compress them before passing to the answer LLM.
def summarization_tool(text: str) -> str:
    prompt = f"Summarize the following text concisely:\n\n{text}"
    return summarization_llm.generate(prompt, max_new_tokens=200)
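The agent decides when to use each tool using a ReAct-style loop: retrieve first, summarize if needed, then generate the final answer with citations. The function below hard-codes that sequence as a simplified version and returns the answer text only; to attach citations, carry each chunk's doc_id from the retriever through to the prompt.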
def answer_question(question: str) -> str:
    # Step 1: Retrieve relevant chunks
    chunks = retriever_tool(question, top_k=5)
    # Step 2: Summarize any chunk longer than 300 words to stay within the context budget
    context_parts = []
    for chunk in chunks:
        text = chunk["content"]
        if len(text.split()) > 300:
            text = summarization_tool(text)
        context_parts.append(text)
    context = "\n\n---\n\n".join(context_parts)
    # Step 3: Generate final answer grounded in retrieved context
    prompt = f"""Answer the question using only the context below.
If the answer isn't in the context, say so.

Context:
{context}

Question: {question}
Answer:"""
    return answer_llm.generate(prompt)
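For contrast with the fixed sequence above, here is a minimal sketch of what a ReAct-style loop looks like structurally: at each step a policy inspects the question and the observations gathered so far, then either calls a tool or returns a final answer. In a real agent the policy is an LLM call; here a rule-based stand-in takes its place so the example runs on its own. All names (`run_agent`, `rule_based_policy`, the `"final"` action) are assumptions for illustration:

```python
from typing import Callable

Tool = Callable[[str], str]
# A policy maps (question, observations so far) to (action_name, argument).
Policy = Callable[[str, list[tuple[str, str]]], tuple[str, str]]


def run_agent(question: str, tools: dict[str, Tool], policy: Policy, max_steps: int = 4) -> str:
    """Loop: ask the policy for an action, run the tool, record the observation."""
    observations: list[tuple[str, str]] = []
    for _ in range(max_steps):
        action, argument = policy(question, observations)
        if action == "final":
            return argument
        if action not in tools:
            observations.append((action, f"unknown tool: {action}"))
            continue
        observations.append((action, tools[action](argument)))
    return "I could not produce an answer within the step budget."


def rule_based_policy(question, observations, max_words=300):
    """Stand-in for the LLM: retrieve, summarize if the context is long, then answer."""
    if not observations:
        return "retrieve", question
    last_action, last_result = observations[-1]
    if last_action == "retrieve" and len(last_result.split()) > max_words:
        return "summarize", last_result
    return "final", f"Based on the notes: {last_result}"
```

The `max_steps` budget matters more than it looks: an LLM-driven policy can keep requesting tools indefinitely, so a hard cap keeps latency and cost bounded.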
Part 7: The Observability Pipeline (Step 10)
This is the component most tutorials skip — and the one that separates toy systems from production systems.
The Observability Pipeline captures every inference interaction as a trace:
Every question + answer pair → Prompt Traces → Observability store (opik)
↓
┌──────────────────────┐
│ Prompt Monitoring │ Are prompts drifting?
│ LLM Evaluation │ Is answer quality dropping?
└──────────────────────┘
Prompt Monitoring
Tracks prompt-level patterns over time:
- Are certain question types being asked more often? (indicates knowledge gaps)
- Are prompts getting longer? (could indicate users aren't getting a useful answer on the first try)
- Are there recurring queries the RAG system answers poorly?
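The last of those questions can be answered directly from logged traces. The sketch below groups near-identical questions and flags the ones that recur with a low average quality score. The trace shape (a dict with `question` and `score` keys), the function names, and the default thresholds are assumptions; in practice you would export these fields from your observability store:

```python
from collections import defaultdict


def normalize(question: str) -> str:
    """Crude normalization so near-identical questions group together."""
    return " ".join(question.lower().strip(" ?!.").split())


def recurring_weak_queries(
    traces: list[dict], min_count: int = 2, max_score: float = 0.6
) -> list[tuple[str, int, float]]:
    """Return (question, count, mean_score) for repeated questions with a low mean score."""
    grouped = defaultdict(list)
    for trace in traces:
        grouped[normalize(trace["question"])].append(trace["score"])
    flagged = [
        (question, len(scores), sum(scores) / len(scores))
        for question, scores in grouped.items()
        if len(scores) >= min_count and sum(scores) / len(scores) < max_score
    ]
    # Most frequently asked problem questions first
    return sorted(flagged, key=lambda row: row[1], reverse=True)
```

String normalization only catches trivial variations. Clustering questions by embedding similarity would group paraphrases as well, at the cost of more moving parts.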
LLM Evaluation
Automated evaluation runs on sampled outputs, scoring for:
| Metric | What It Measures |
|---|---|
| Faithfulness | Does the answer only use facts from retrieved context? |
| Answer relevance | Does the answer actually address the question asked? |
| Context recall | Did retrieval surface the right documents? |
| Hallucination rate | Across sampled answers, how often does an answer contain at least one claim not found in any retrieved chunk? |
# Using opik for observability
from opik import track, opik_context

@track(name="second_brain_inference")  # records the call's input and output as a trace
def answer_question_traced(question: str) -> str:
    chunks = retriever_tool(question)
    # generate_answer and evaluate_faithfulness are placeholders you supply
    answer = generate_answer(question, chunks)
    # Automated faithfulness score
    score = evaluate_faithfulness(answer, chunks)
    # Attach retrieval metadata and the score to the current trace
    opik_context.update_current_trace(
        metadata={"retrieved_chunks": len(chunks)},
        feedback_scores=[{"name": "faithfulness", "value": score}],
    )
    return answer
When faithfulness scores drop below a threshold, the system alerts — prompting a review of the RAG pipeline, the vector index freshness, or the fine-tuned model's behavior.
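One simple way to implement that alert is a rolling mean over the most recent scores, with a minimum sample count so that a single bad answer does not trigger it. The class name `FaithfulnessMonitor` and the window, threshold, and sample-count defaults are assumptions to tune for your own traffic:

```python
from collections import deque


class FaithfulnessMonitor:
    """Tracks a rolling mean of faithfulness scores and flags drops below a threshold."""

    def __init__(self, window: int = 100, threshold: float = 0.8, min_samples: int = 20):
        self.scores = deque(maxlen=window)  # oldest scores fall off automatically
        self.threshold = threshold
        self.min_samples = min_samples

    def record(self, score: float) -> bool:
        """Add a score; return True when the rolling mean is below the threshold."""
        self.scores.append(score)
        if len(self.scores) < self.min_samples:
            return False  # not enough data to judge yet
        return sum(self.scores) / len(self.scores) < self.threshold


monitor = FaithfulnessMonitor(window=5, threshold=0.8, min_samples=3)
alerts = [monitor.record(s) for s in [0.9, 0.9, 0.9, 0.6, 0.3]]
print(alerts)  # [False, False, False, False, True]
```

In the example the mean only falls below 0.8 after the fifth score (3.6 / 5 = 0.72), so one mediocre answer is absorbed while a sustained drop raises the alert.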
Part 8: How the Pipelines Are Orchestrated with ZenML
ZenML is the glue that holds all these pipelines together. It provides:
- Pipeline versioning — every pipeline run is tracked with its inputs, outputs, and parameters
- Step caching — if the input data hasn't changed, skip recomputing expensive steps
- Artifact tracking — datasets, models, and embeddings are versioned and linked to the runs that produced them
- Reproducibility — any past pipeline run can be exactly reproduced
from zenml import pipeline, step

@step
def collect_notion_data() -> list[str]:
    ...  # returns list of S3 paths

@step
def filter_documents(paths: list[str]) -> list[str]:
    ...  # returns quality-filtered document paths

@step
def generate_embeddings(doc_paths: list[str]) -> None:
    ...  # embeds and upserts to Vector Index

@pipeline
def rag_feature_pipeline():
    paths = collect_notion_data()
    filtered = filter_documents(paths)
    generate_embeddings(filtered)

# Run it — every artifact is versioned automatically
rag_feature_pipeline()
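ZenML's step caching means that when you re-run the pipeline, steps whose code, parameters, and input artifacts are unchanged reuse their previous outputs instead of recomputing them. One caveat: a step that reads external state, such as the Notion export in collect_notion_data, looks unchanged to the cache, so disable caching for it (for example with @step(enable_cache=False)) when you need fresh data.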
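The idea behind step caching can be illustrated in a few lines of plain Python. This is a toy model, not ZenML's implementation: it keys the cache on the step name and a hash of its inputs only, whereas an orchestrator's cache key would also account for the step's code and configuration. The decorator name `cached_step` and the in-memory `_cache` are assumptions for the sketch:

```python
import hashlib
import json
from typing import Any, Callable

_cache: dict[str, Any] = {}


def cached_step(func: Callable[..., Any]) -> Callable[..., Any]:
    """Re-use a step's previous output when it is called with identical inputs."""
    def wrapper(*args: Any) -> Any:
        key_source = json.dumps([func.__name__, args], sort_keys=True, default=str)
        key = hashlib.sha256(key_source.encode("utf-8")).hexdigest()
        if key not in _cache:
            _cache[key] = func(*args)  # cache miss: actually run the step
        return _cache[key]
    return wrapper


runs = []  # records each real execution, to show when the cache is hit


@cached_step
def filter_documents(paths: list[str]) -> list[str]:
    runs.append("filter")
    return [p for p in paths if p.endswith(".md")]


first = filter_documents(["a.md", "b.txt"])
second = filter_documents(["a.md", "b.txt"])   # same input: served from cache
third = filter_documents(["a.md", "c.md"])     # new input: step runs again
print(len(runs))  # 2
```

The same sketch also shows the caveat about external state: a step with no arguments always hashes to the same key, so it would never re-run, no matter what changed upstream.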
Part 9: The Full Stack at a Glance
| Component | Tool | Role |
|---|---|---|
| Knowledge source | Notion | Where raw content lives |
| Raw storage | AWS S3 | Immutable data lake of Markdown files |
| Document store | MongoDB | Structured storage with metadata and quality scores |
| Pipeline orchestration | ZenML | Versioned, reproducible pipeline runs |
| Fine-tuning | unsloth + LoRA | Efficient training of the summarization LLM |
| Model registry | ZenML / MLflow | Versioned model artifacts |
| Embeddings | bge-large / text-embedding-3 | Semantic vector representations |
| Vector index | Qdrant / pgvector | Fast approximate nearest-neighbor search |
| Agent framework | LangChain / custom | Orchestrates retriever + summarization tools |
| Observability | opik | Traces, prompt monitoring, LLM evaluation |
Summary
Building a production-grade AI assistant is far more than prompt engineering. It's a system with seven interlocking pipelines: data collection, ETL and quality filtering, dataset generation, fine-tuning, RAG feature indexing, agentic inference, and continuous observability.
LLMOps is the discipline that makes all seven pipelines work together reliably. It borrows from classical MLOps — versioning, reproducibility, monitoring — and extends it for the unique challenges of large language models: unstructured data, semantic quality metrics, hallucination detection, and embedding freshness.
The key architectural decisions that separate this system from a weekend project:
- Two-stage quality filtering — fine-tuning needs cleaner data than RAG; separate the gates.
- Summary + raw chunk dual embeddings — improves retrieval recall by matching both surface form and semantic meaning.
- A fine-tuned summarization LLM — a summarizer trained on your own documents can outperform a generic model on that material, which is worth confirming with the evaluation metrics from Part 7 rather than assuming.
- An agentic inference layer — exposing the retriever and summarizer as composable tools gives the system flexibility a rigid pipeline doesn't have.
- Observability from day one — faithfulness and relevance scores catch quality degradation before users do.
In 2026, every one of these components has mature tooling: ZenML for pipelines, MongoDB for documents, unsloth for affordable fine-tuning, and opik for observability. The architecture in this post is not aspirational — it's what production Second Brain systems look like today.
Want to see a complete working repo for this system? Drop a comment below — a full GitHub implementation is in progress.