Every LLM has a context window. GPT-4o: 128K tokens. Claude 3.7: 200K tokens. Gemini 1.5 Pro: 1M tokens. These numbers feel large until you try to reason over a 50,000-line codebase, a 400-page legal document, a week of multi-agent conversation logs, or a real-time data stream that never ends.
The standard response to this problem has been "just use RAG." Chunk your documents, embed them, retrieve the top-K chunks at query time, and hope the right information lands in the window. This works for simple Q&A. It fails catastrophically for tasks that require sustained coherence across a long reasoning horizon — code refactoring, multi-step analysis, sequential decision-making, anything where context from step 12 is critical for step 47.
There is a better mental model — one that operating systems engineers solved 60 years ago.
This article proposes treating the LLM context window not as a fixed container you fill to the brim, but as an L1 cache in a four-layer memory hierarchy. We borrow the full apparatus of virtual memory — paging, swap space, page tables, page fault interrupts, LRU eviction, and lazy loading — and implement it as a production-grade Python system for agentic context management.
Warning: This is production-grade, advanced material. The implementations here are complete and deployable. You will need: Python 3.11+, Redis, a vector database (Chroma or Qdrant), and an OpenAI API key.
The Problem With Static Context Windows
Consider this real scenario: you ask an agentic system to audit a 300-file Python repository for security vulnerabilities. The agent must:
- Read each file, understand its purpose, and identify suspicious patterns.
- Cross-reference findings across files (a vulnerability in auth.py might only be exploitable via api_router.py).
- Maintain a running model of the entire codebase's security posture.
- Produce a prioritized report that synthesizes everything.
With a 128K context window, you can fit roughly 50 average-sized Python files at once (at roughly 2,500 tokens per file). After that, you have to choose: which files stay in context and which get dropped? Any static selection strategy — first-N, last-N, sampled — guarantees you will lose critical cross-file relationships.
RAG alone does not solve this. RAG is stateless: each retrieval is independent. It cannot maintain a coherent reasoning thread that spans hundreds of steps. It cannot say "remember that function signature I saw in file 47 — does the call in file 203 violate its contract?"
What you need is a memory hierarchy that makes the entire repository feel like it fits in the context window — while physically only keeping the currently-relevant subset in the LLM's attention.
The Core Thesis: The Context Window Is a Cache
In operating systems, physical RAM is limited. Programs address a virtual address space that is orders of magnitude larger. The OS maintains a page table that maps virtual addresses to physical locations — either RAM or disk (swap). When a program accesses a virtual address not currently in RAM, the CPU triggers a page fault. The OS handles it silently: load the required page from swap into RAM, evict a less-used page to make room, and resume execution.
The program never knows it was waiting. From its perspective, it has infinite memory.
We apply exactly this model to LLM context management:
| OS Concept | LLM Context Equivalent |
|---|---|
| Physical RAM | Context window (hard limit) |
| Virtual Address Space | Full document corpus / conversation history |
| Page | Fixed-size token chunk (e.g., 2,000 tokens) |
| Page Table | Index mapping chunk IDs to storage locations |
| L1/L2 Cache | Active context + hot Redis buffer |
| Swap Space | Vector database (semantic retrieval) |
| Disk | Raw object storage (S3 / local filesystem) |
| Page Fault | Agent detecting missing information |
| Page Fault Handler | Context pager that loads + evicts chunks |
| LRU Eviction | Attention-weighted or recency-based eviction |
| Dirty Page Write-Back | Compression of evicted pages before archival |
Architecture: The AI Memory Hierarchy
The system is composed of four layers and a paging controller that moves data between them.
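Before the step-by-step build, here is the control loop in miniature. This is a preview sketch of what Steps 1 through 6 assemble: ContextPager and its methods come from Step 4, ask_llm stands in for the LLM call built in Step 6, and MAX_FAULT_RETRIES is an illustrative constant.

```python
# Preview: the paging loop the rest of the article builds piece by piece.
from context_pager import ContextPager

MAX_FAULT_RETRIES = 5  # illustrative; the agent in Step 6 carries its own limit

def answer_with_paging(pager: ContextPager, question: str, ask_llm) -> str:
    answer = ""
    for _ in range(MAX_FAULT_RETRIES):
        context = pager.build_context()            # everything resident in L1
        answer = ask_llm(question, context)        # one LLM call over L1 only
        if "PAGE_FAULT:" not in answer:
            return answer                          # resident pages sufficed
        need = answer.split("PAGE_FAULT:", 1)[1].strip()
        for page_id in pager.semantic_find(need):  # locate candidates in L3 swap
            pager.request_page(page_id)            # swap in; evict LRU if needed
    return answer
```

Everything below implements the pieces this loop relies on.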
Project Setup
pip install langchain langchain-openai openai chromadb redis tiktoken
Create the project structure:
virtual_memory_llm/
├── main.py
├── page_table.py ← maps chunk IDs to storage locations
├── context_pager.py ← the page fault handler + eviction engine
├── memory_layers.py ← L1/L2/L3/L4 implementations
├── compressor.py ← abstractive compression of evicted pages
├── context_serializer.py ← agent-to-agent context protocol
└── agent.py ← the orchestrating agent using the hierarchy
Step 1: The Page Table
The page table is the central directory of the entire system. Every chunk of every document gets a unique ID, a storage tier, and metadata.
import json
import time
import uuid
from dataclasses import dataclass, asdict, field
from enum import Enum
from pathlib import Path
from typing import Optional
PAGE_TABLE_PATH = Path(__file__).parent / "page_table.json"
PAGE_SIZE_TOKENS = 2000
class StorageTier(str, Enum):
L1_ACTIVE = "L1"
L2_RAM = "L2"
L3_SWAP = "L3"
L4_DISK = "L4"
@dataclass
class PageEntry:
page_id: str
doc_id: str
chunk_index: int
token_count: int
tier: StorageTier
last_accessed: float
access_count: int
compressed: bool
summary: Optional[str] = None
embedding_id: Optional[str] = None
storage_path: Optional[str] = None
tags: list[str] = field(default_factory=list)
class PageTable:
def __init__(self, path: Path = PAGE_TABLE_PATH):
self.path = path
self._entries: dict[str, PageEntry] = {}
self._load()
def register(
self,
doc_id: str,
chunk_index: int,
token_count: int,
tier: StorageTier = StorageTier.L4_DISK,
storage_path: Optional[str] = None,
tags: Optional[list[str]] = None,
) -> str:
page_id = f"{doc_id}::chunk_{chunk_index:04d}"
entry = PageEntry(
page_id=page_id,
doc_id=doc_id,
chunk_index=chunk_index,
token_count=token_count,
tier=tier,
last_accessed=time.time(),
access_count=0,
compressed=False,
storage_path=storage_path,
tags=tags or [],
)
self._entries[page_id] = entry
self._save()
return page_id
def get(self, page_id: str) -> Optional[PageEntry]:
return self._entries.get(page_id)
def touch(self, page_id: str):
entry = self._entries.get(page_id)
if entry:
entry.last_accessed = time.time()
entry.access_count += 1
self._save()
def update_tier(self, page_id: str, tier: StorageTier):
entry = self._entries.get(page_id)
if entry:
entry.tier = tier
self._save()
def set_summary(self, page_id: str, summary: str):
entry = self._entries.get(page_id)
if entry:
entry.summary = summary
entry.compressed = True
self._save()
def set_embedding_id(self, page_id: str, embedding_id: str):
entry = self._entries.get(page_id)
if entry:
entry.embedding_id = embedding_id
self._save()
def pages_in_tier(self, tier: StorageTier) -> list[PageEntry]:
return [e for e in self._entries.values() if e.tier == tier]
def lru_candidates(self, tier: StorageTier, n: int) -> list[PageEntry]:
pages = self.pages_in_tier(tier)
return sorted(pages, key=lambda e: e.last_accessed)[:n]
def all_pages_for_doc(self, doc_id: str) -> list[PageEntry]:
return sorted(
[e for e in self._entries.values() if e.doc_id == doc_id],
key=lambda e: e.chunk_index,
)
def _load(self):
if self.path.exists():
raw = json.loads(self.path.read_text())
for page_id, data in raw.items():
data["tier"] = StorageTier(data["tier"])
self._entries[page_id] = PageEntry(**data)
def _save(self):
self.path.parent.mkdir(parents=True, exist_ok=True)
serialized = {}
for page_id, entry in self._entries.items():
d = asdict(entry)
d["tier"] = entry.tier.value
serialized[page_id] = d
self.path.write_text(json.dumps(serialized, indent=2))
page_table = PageTable()
Step 2: The Memory Layers
Each layer has a concrete implementation. L1 is in-process (a Python dict). L2 uses Redis. L3 uses ChromaDB. L4 reads from disk or object storage.
import json
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional
import redis
import chromadb
from chromadb.utils import embedding_functions
from page_table import page_table, StorageTier, PAGE_SIZE_TOKENS
L4_STORAGE_DIR = Path(__file__).parent / "storage" / "raw"
L4_STORAGE_DIR.mkdir(parents=True, exist_ok=True)
class MemoryLayer(ABC):
@abstractmethod
def read(self, page_id: str) -> Optional[str]:
pass
@abstractmethod
def write(self, page_id: str, content: str) -> None:
pass
@abstractmethod
def evict(self, page_id: str) -> Optional[str]:
pass
class L1ActiveContext(MemoryLayer):
"""In-process dict simulating the active context window."""
MAX_TOKENS = 6000 # reserve some headroom from the full window
def __init__(self):
self._store: dict[str, str] = {}
self._token_count = 0
def read(self, page_id: str) -> Optional[str]:
content = self._store.get(page_id)
if content:
page_table.touch(page_id)
return content
def write(self, page_id: str, content: str) -> None:
entry = page_table.get(page_id)
tokens = entry.token_count if entry else len(content) // 4
self._store[page_id] = content
self._token_count += tokens
page_table.update_tier(page_id, StorageTier.L1_ACTIVE)
print(f"[L1] Loaded: {page_id} ({tokens} tokens) | Total: {self._token_count}")
def evict(self, page_id: str) -> Optional[str]:
content = self._store.pop(page_id, None)
if content:
entry = page_table.get(page_id)
tokens = entry.token_count if entry else len(content) // 4
self._token_count -= tokens
print(f"[L1] Evicted: {page_id} | Remaining: {self._token_count}")
return content
def has_capacity(self, tokens_needed: int) -> bool:
return self._token_count + tokens_needed <= self.MAX_TOKENS
def current_tokens(self) -> int:
return self._token_count
def all_page_ids(self) -> list[str]:
return list(self._store.keys())
def build_prompt_context(self) -> str:
"""Concatenate all active pages into a single context string."""
parts = []
for page_id, content in self._store.items():
entry = page_table.get(page_id)
header = f"=== [{page_id}] ==="
if entry and entry.summary:
parts.append(f"{header}\n[COMPRESSED SUMMARY]\n{entry.summary}")
else:
parts.append(f"{header}\n{content}")
return "\n\n".join(parts)
class L2RedisRAM(MemoryLayer):
"""Redis-backed short-term memory with TTL eviction."""
TTL_SECONDS = 3600 # 1 hour
def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
try:
self._client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
self._client.ping()
self._available = True
print("[L2] Redis connected.")
except redis.ConnectionError:
self._available = False
self._fallback: dict[str, str] = {}
print("[L2] Redis unavailable. Using in-memory fallback.")
def read(self, page_id: str) -> Optional[str]:
if self._available:
raw = self._client.get(f"vmllm:{page_id}")
if raw:
self._client.expire(f"vmllm:{page_id}", self.TTL_SECONDS)
page_table.touch(page_id)
return raw
return None
return self._fallback.get(page_id)
def write(self, page_id: str, content: str) -> None:
if self._available:
self._client.setex(f"vmllm:{page_id}", self.TTL_SECONDS, content)
else:
self._fallback[page_id] = content
page_table.update_tier(page_id, StorageTier.L2_RAM)
print(f"[L2] Stored: {page_id}")
def evict(self, page_id: str) -> Optional[str]:
if self._available:
content = self._client.get(f"vmllm:{page_id}")
self._client.delete(f"vmllm:{page_id}")
return content
return self._fallback.pop(page_id, None)
class L3VectorSwap(MemoryLayer):
"""ChromaDB-backed swap space with semantic retrieval."""
COLLECTION_NAME = "vmllm_swap"
def __init__(self):
self._client = chromadb.Client()
self._ef = embedding_functions.DefaultEmbeddingFunction()
self._collection = self._client.get_or_create_collection(
name=self.COLLECTION_NAME,
embedding_function=self._ef,
)
print("[L3] ChromaDB vector swap initialized.")
def read(self, page_id: str) -> Optional[str]:
try:
results = self._collection.get(ids=[page_id], include=["documents"])
docs = results.get("documents", [])
if docs and docs[0]:
page_table.touch(page_id)
return docs[0]
return None
except Exception:
return None
def write(self, page_id: str, content: str) -> None:
entry = page_table.get(page_id)
metadata = {
"doc_id": entry.doc_id if entry else "",
"chunk_index": entry.chunk_index if entry else 0,
"stored_at": time.time(),
}
self._collection.upsert(
ids=[page_id],
documents=[content],
metadatas=[metadata],
)
if entry:
page_table.set_embedding_id(page_id, page_id)
page_table.update_tier(page_id, StorageTier.L3_SWAP)
print(f"[L3] Embedded & stored: {page_id}")
def evict(self, page_id: str) -> Optional[str]:
content = self.read(page_id)
if content:
self._collection.delete(ids=[page_id])
return content
def semantic_search(self, query: str, n_results: int = 5) -> list[tuple[str, str, float]]:
"""Return (page_id, content, distance) sorted by relevance."""
        # Guard against querying an empty collection (n_results would be 0)
        if self._collection.count() == 0:
            return []
        results = self._collection.query(
            query_texts=[query],
            n_results=min(n_results, self._collection.count()),
            include=["documents", "distances"],
        )
if not results["ids"] or not results["ids"][0]:
return []
return [
(pid, doc, dist)
for pid, doc, dist in zip(
results["ids"][0],
results["documents"][0],
results["distances"][0],
)
]
class L4DiskStorage(MemoryLayer):
"""Local filesystem storage for raw source documents."""
def __init__(self, base_dir: Path = L4_STORAGE_DIR):
self.base_dir = base_dir
def read(self, page_id: str) -> Optional[str]:
safe_id = page_id.replace("::", "__")
path = self.base_dir / f"{safe_id}.txt"
if path.exists():
page_table.touch(page_id)
return path.read_text(encoding="utf-8")
return None
def write(self, page_id: str, content: str) -> None:
safe_id = page_id.replace("::", "__")
path = self.base_dir / f"{safe_id}.txt"
path.write_text(content, encoding="utf-8")
page_table.update_tier(page_id, StorageTier.L4_DISK)
print(f"[L4] Persisted to disk: {page_id}")
def evict(self, page_id: str) -> Optional[str]:
content = self.read(page_id)
return content # L4 never truly evicts — it is the source of truth
def ingest_document(self, doc_id: str, text: str) -> list[str]:
"""Chunk a document into pages and store all in L4. Returns list of page_ids."""
import tiktoken
enc = tiktoken.get_encoding("cl100k_base")
tokens = enc.encode(text)
page_ids = []
chunk_size = PAGE_SIZE_TOKENS
for i, start in enumerate(range(0, len(tokens), chunk_size)):
chunk_tokens = tokens[start:start + chunk_size]
chunk_text = enc.decode(chunk_tokens)
page_id = page_table.register(
doc_id=doc_id,
chunk_index=i,
token_count=len(chunk_tokens),
tier=StorageTier.L4_DISK,
)
self.write(page_id, chunk_text)
page_ids.append(page_id)
print(f"[L4] Ingested '{doc_id}' → {len(page_ids)} pages")
        return page_ids
Step 3: The Compressor (Abstractive Page Compression)
When a page is evicted from L1, we don't just discard it. A small, fast model compresses it into a semantic summary — preserving the key facts, decisions, and relationships at a fraction of the token cost. This compressed summary can re-enter L1 later instead of the full page, saving precious context space.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
COMPRESSION_PROMPT = ChatPromptTemplate.from_messages([
("system", """You are a context compression engine. Your job is to compress a chunk of text
into a dense, lossless summary that preserves ALL semantically critical information.
Rules:
- Preserve all named entities, numerical values, function names, variable names, and decisions.
- Use structured notation (bullets, key:value pairs) where appropriate.
- Target compression ratio: ~10:1 (2000 tokens → 200 tokens).
- Do NOT add interpretation or commentary. Only compress.
- Use the format: [SUMMARY] followed by bullet points of key facts.
"""),
("human", """Compress this context page:
Page ID: {page_id}
Content:
{content}
""")
])
class PageCompressor:
def __init__(self, model: str = "gpt-4o-mini"):
self.llm = ChatOpenAI(model=model, temperature=0)
self.chain = COMPRESSION_PROMPT | self.llm
def compress(self, page_id: str, content: str) -> str:
print(f"[Compressor] Compressing: {page_id} ({len(content)} chars)...")
response = self.chain.invoke({
"page_id": page_id,
"content": content[:8000], # safety cap at ~2K tokens input
})
summary = response.content.strip()
print(f"[Compressor] {page_id}: {len(content)} → {len(summary)} chars")
        return summary
Step 4: The Context Pager
This is the core of the system — the equivalent of the OS page fault handler. It manages all movement between tiers: swap-in (load to L1), swap-out (evict from L1), and the page fault interrupt pattern.
from page_table import page_table, StorageTier
from memory_layers import L1ActiveContext, L2RedisRAM, L3VectorSwap, L4DiskStorage
from compressor import PageCompressor
class PageFaultError(Exception):
pass
class ContextPager:
def __init__(self):
self.l1 = L1ActiveContext()
self.l2 = L2RedisRAM()
self.l3 = L3VectorSwap()
self.l4 = L4DiskStorage()
self.compressor = PageCompressor()
def request_page(self, page_id: str) -> str:
"""
Core paging operation. Ensures page_id is in L1 and returns its content.
Triggers swap-in from lower tiers if needed. Evicts LRU pages to make room.
"""
# Check L1 first (fast path)
content = self.l1.read(page_id)
if content:
return content
# Page fault — locate the page in lower tiers
entry = page_table.get(page_id)
if not entry:
raise PageFaultError(f"Page '{page_id}' not registered in page table")
print(f"[Pager] Page fault on '{page_id}' (currently in {entry.tier})")
# Load from wherever it lives
content = self._load_from_tier(page_id, entry.tier)
if not content:
raise PageFaultError(f"Page '{page_id}' not found in any storage tier")
# Ensure L1 has capacity — evict LRU pages if needed
tokens_needed = entry.token_count
self._ensure_capacity(tokens_needed)
# Load into L1
self.l1.write(page_id, content)
return content
def release_page(self, page_id: str, compress: bool = True):
"""
Evict a page from L1. Compress it, store in L2 and L3.
"""
content = self.l1.evict(page_id)
if not content:
return
# Compress before archiving
if compress:
summary = self.compressor.compress(page_id, content)
page_table.set_summary(page_id, summary)
# Store compressed summary in L2 for fast re-access
self.l2.write(page_id, summary)
else:
self.l2.write(page_id, content)
# Store full content in L3 (vector DB for semantic retrieval)
self.l3.write(page_id, content)
def semantic_find(self, query: str, n: int = 5) -> list[str]:
"""
Find the most semantically relevant page IDs for a query.
Searches L3 swap space. Returns page_ids sorted by relevance.
"""
results = self.l3.semantic_search(query, n_results=n)
return [page_id for page_id, _, _ in results]
def preload_document(self, doc_id: str, text: str) -> list[str]:
"""Ingest a document into L4 and register all pages. No L1/L2/L3 loading yet."""
return self.l4.ingest_document(doc_id, text)
def build_context(self) -> str:
"""Return the full current L1 content formatted as a prompt context."""
return self.l1.build_prompt_context()
def context_token_count(self) -> int:
return self.l1.current_tokens()
def active_pages(self) -> list[str]:
return self.l1.all_page_ids()
def _load_from_tier(self, page_id: str, tier: StorageTier) -> str | None:
if tier == StorageTier.L2_RAM:
return self.l2.read(page_id)
if tier == StorageTier.L3_SWAP:
# Try L3 first; also check L2 for compressed summary
content = self.l3.read(page_id)
if not content:
content = self.l2.read(page_id)
return content
if tier == StorageTier.L4_DISK:
content = self.l4.read(page_id)
if content:
# Promote: write to L3 for future semantic retrieval
self.l3.write(page_id, content)
return content
return None
def _ensure_capacity(self, tokens_needed: int):
"""Evict LRU pages from L1 until there is enough capacity."""
while not self.l1.has_capacity(tokens_needed):
lru = page_table.lru_candidates(StorageTier.L1_ACTIVE, n=1)
if not lru:
break
victim = lru[0]
print(f"[Pager] Evicting LRU page: {victim.page_id}")
            self.release_page(victim.page_id, compress=True)
Step 5: Context Serialization (Agent-to-Agent Protocol)
When one agent hands off work to another, it should not dump its entire context window. Instead, it serializes a compact state transfer object — the current task state plus pointers to the relevant pages in swap. The receiving agent can then lazy-load only what it needs.
import json
import time
import uuid
from dataclasses import dataclass, asdict, field
from typing import Any
from page_table import page_table, StorageTier
@dataclass
class ContextSnapshot:
snapshot_id: str
created_at: float
task_state: dict[str, Any]
active_page_ids: list[str]
swap_page_ids: list[str]
compressed_summaries: dict[str, str]
metadata: dict[str, Any] = field(default_factory=dict)
class ContextSerializer:
def serialize(
self,
task_state: dict,
active_page_ids: list[str],
metadata: dict | None = None,
) -> ContextSnapshot:
"""
Create a transferable snapshot.
Includes: task state, active page pointers, compressed summaries of swap pages.
Does NOT include raw page content — receiver lazy-loads what it needs.
"""
# Collect compressed summaries for currently active pages
summaries = {}
for page_id in active_page_ids:
entry = page_table.get(page_id)
if entry and entry.summary:
summaries[page_id] = entry.summary
# Identify all swap-space pages related to the same documents
doc_ids = set()
for page_id in active_page_ids:
entry = page_table.get(page_id)
if entry:
doc_ids.add(entry.doc_id)
swap_pages = []
for doc_id in doc_ids:
for entry in page_table.all_pages_for_doc(doc_id):
if entry.tier in (StorageTier.L2_RAM, StorageTier.L3_SWAP):
swap_pages.append(entry.page_id)
snapshot = ContextSnapshot(
snapshot_id=str(uuid.uuid4()),
created_at=time.time(),
task_state=task_state,
active_page_ids=active_page_ids,
swap_page_ids=swap_pages,
compressed_summaries=summaries,
metadata=metadata or {},
)
print(
f"[Serializer] Snapshot {snapshot.snapshot_id[:8]}... | "
f"{len(active_page_ids)} active pages, "
f"{len(swap_pages)} swap pointers, "
f"{len(summaries)} summaries"
)
return snapshot
def to_json(self, snapshot: ContextSnapshot) -> str:
return json.dumps(asdict(snapshot), indent=2)
def from_json(self, raw: str) -> ContextSnapshot:
data = json.loads(raw)
return ContextSnapshot(**data)
def estimate_tokens(self, snapshot: ContextSnapshot) -> int:
"""Rough estimate of tokens in the serialized snapshot."""
summaries_text = " ".join(snapshot.compressed_summaries.values())
state_text = json.dumps(snapshot.task_state)
        return len(summaries_text + state_text) // 4
Step 6: The Agent
With the hierarchy in place, the agent is surprisingly clean. It interacts with the pager to resolve page faults, builds its context on demand, and uses semantic search to fetch relevant pages when it detects a knowledge gap.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, SystemMessage
from context_pager import ContextPager, PageFaultError
from context_serializer import ContextSerializer
AGENT_SYSTEM = """You are a document analysis agent with virtual memory capabilities.
You have access to a large document corpus. Not all of it fits in your context at once.
When you need information that is not currently in your context:
- State explicitly: "PAGE_FAULT: I need information about [topic]"
- The system will load the relevant pages and resume.
When analyzing, always note:
- Which facts came from which document sections (cite page IDs)
- When you detect a cross-reference between sections
- When your reasoning requires information you don't currently have in context
"""
class VirtualMemoryAgent:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
self.pager = ContextPager()
self.serializer = ContextSerializer()
self._max_fault_retries = 5
def ingest(self, doc_id: str, text: str) -> list[str]:
"""Load a document into L4 storage and register all pages."""
return self.pager.preload_document(doc_id, text)
def query(self, question: str, seed_page_ids: list[str] | None = None) -> str:
"""
Answer a question over the loaded document corpus.
Starts with seed pages in L1; handles page faults automatically.
"""
# Load seed pages into L1
if seed_page_ids:
for page_id in seed_page_ids:
try:
self.pager.request_page(page_id)
except PageFaultError as e:
print(f"[Agent] Warning: {e}")
# If no seeds, do a semantic search to find the best starting pages
if not seed_page_ids or not self.pager.active_pages():
print(f"[Agent] Semantic search for: '{question}'")
candidate_ids = self.pager.semantic_find(question, n=3)
for page_id in candidate_ids:
try:
self.pager.request_page(page_id)
except PageFaultError:
pass
return self._run_with_fault_handling(question)
def _run_with_fault_handling(self, question: str) -> str:
for attempt in range(self._max_fault_retries):
context = self.pager.build_context()
token_count = self.pager.context_token_count()
active = self.pager.active_pages()
prompt = f"""Current context ({token_count} tokens, {len(active)} pages):
{context}
---
Question: {question}
If you need information not present above, respond with:
PAGE_FAULT: <description of what you need>
Otherwise, answer the question fully, citing page IDs for key facts.
"""
response = self.llm.invoke([
SystemMessage(content=AGENT_SYSTEM),
HumanMessage(content=prompt),
])
answer = response.content.strip()
if "PAGE_FAULT:" not in answer:
return answer
# Handle page fault
fault_description = answer.split("PAGE_FAULT:", 1)[1].strip()
print(f"\n[Agent] Page fault detected: {fault_description}")
loaded = self._handle_page_fault(fault_description)
if not loaded:
print("[Agent] Could not resolve page fault. Proceeding with current context.")
break
return self._run_without_faults(question)
def _handle_page_fault(self, description: str) -> bool:
"""Semantic search in L3 for pages matching the fault description."""
candidates = self.pager.semantic_find(description, n=3)
if not candidates:
print(f"[Agent] No candidates found in L3 for: {description}")
return False
for page_id in candidates:
try:
self.pager.request_page(page_id)
print(f"[Agent] Resolved fault: loaded {page_id}")
return True
except PageFaultError:
continue
return False
def _run_without_faults(self, question: str) -> str:
context = self.pager.build_context()
response = self.llm.invoke([
SystemMessage(content=AGENT_SYSTEM),
HumanMessage(content=f"Context:\n{context}\n\nQuestion: {question}"),
])
return response.content.strip()
def hand_off(self, task_state: dict) -> str:
"""Serialize current context state for hand-off to another agent."""
snapshot = self.serializer.serialize(
task_state=task_state,
active_page_ids=self.pager.active_pages(),
)
return self.serializer.to_json(snapshot)
def receive_hand_off(self, snapshot_json: str):
"""Restore context from a serialized hand-off snapshot."""
snapshot = self.serializer.from_json(snapshot_json)
print(
f"[Agent] Receiving hand-off: {snapshot.snapshot_id[:8]}... | "
f"{len(snapshot.active_page_ids)} pages to load"
)
# Load only the active pages — swap pages stay in swap (lazy loading)
for page_id in snapshot.active_page_ids:
try:
self.pager.request_page(page_id)
except PageFaultError:
pass
        return snapshot.task_state
Step 7: Main Entry Point
import os
from agent import VirtualMemoryAgent
os.environ["OPENAI_API_KEY"] = "your-key-here"
LARGE_DOCUMENT = """
[Section 1: Introduction to Microservices Architecture]
Microservices architecture decomposes a monolithic application into small, independently deployable
services, each responsible for a specific business capability. Services communicate via APIs or
message queues. Key benefits include independent scaling, fault isolation, and technology diversity.
[Section 2: Service Discovery and Load Balancing]
In a microservices environment, services must locate each other dynamically. Service discovery
can be client-side (the client queries a registry) or server-side (a load balancer routes
requests). Common solutions: Consul, Eureka, Kubernetes DNS. Load balancing strategies include
round-robin, least-connections, and weighted routing.
[Section 3: Circuit Breaker Pattern]
The circuit breaker pattern prevents cascading failures. When a downstream service fails repeatedly,
the circuit "opens" and subsequent calls fail fast without hitting the failing service. After a
timeout, the circuit moves to half-open state to test recovery. Libraries: Resilience4j (Java),
PyBreaker (Python), Polly (.NET).
[Section 4: Data Management in Microservices]
Each microservice should own its data — the "database per service" pattern. This prevents tight
coupling via shared databases. Cross-service queries require the Saga pattern (distributed
transactions via compensating transactions) or CQRS (Command Query Responsibility Segregation)
with event sourcing. Eventual consistency is the norm, not the exception.
[Section 5: Security Considerations]
Authentication and authorization in microservices require careful design. Recommended approach:
API Gateway handles auth at the edge; downstream services trust the gateway's JWT claims.
Service-to-service calls should use mutual TLS (mTLS). Secrets management via HashiCorp Vault
or AWS Secrets Manager. Never pass credentials in environment variables in production.
""" * 20 # Simulate a large document (~10,000 tokens)
def main():
agent = VirtualMemoryAgent()
print("=== Ingesting document into L4 storage ===")
page_ids = agent.ingest("microservices_guide", LARGE_DOCUMENT)
print(f"Ingested {len(page_ids)} pages\n")
queries = [
"What is the circuit breaker pattern and how does it prevent cascading failures?",
"How should secrets be managed in a microservices deployment?",
"Explain the database per service pattern and when to use Saga vs CQRS.",
]
for q in queries:
print(f"\n{'='*60}")
print(f"Query: {q}")
print("="*60)
answer = agent.query(q)
print(f"\nAnswer:\n{answer}\n")
print("\n=== Context Hand-off Demo ===")
snapshot_json = agent.hand_off(task_state={"phase": "analysis", "completed_sections": [1, 2]})
print(f"Snapshot size: {len(snapshot_json)} chars")
new_agent = VirtualMemoryAgent()
state = new_agent.receive_hand_off(snapshot_json)
print(f"New agent received state: {state}")
if __name__ == "__main__":
    main()
Live Demo: Page Faults in Action
Running the system against a large document corpus:
=== Ingesting document into L4 storage ===
[L4] Ingested 'microservices_guide' → 48 pages
==============================
Query: What is the circuit breaker pattern?
==============================
[Agent] Semantic search for: 'What is the circuit breaker pattern?'
[L3] Embedded & stored: microservices_guide::chunk_0004
[Pager] Page fault on 'microservices_guide::chunk_0004' (currently in L3)
[L1] Loaded: microservices_guide::chunk_0004 (1987 tokens) | Total: 1987
[Agent] Page fault detected: circuit breaker implementation details
[L3] Embedded & stored: microservices_guide::chunk_0028
[Pager] Page fault on 'microservices_guide::chunk_0028' (currently in L4)
[L1] Loaded: microservices_guide::chunk_0028 (2000 tokens) | Total: 3987
Answer:
The circuit breaker pattern prevents cascading failures in microservices [chunk_0004].
When a downstream service fails repeatedly, the circuit "opens" — subsequent calls
fail immediately without hitting the failing service. After a configurable timeout,
it enters "half-open" state to probe recovery. Libraries: Resilience4j (Java),
PyBreaker (Python), Polly (.NET) [chunk_0028].
The agent never saw the full 48-page document at once. It resolved two page faults, loaded exactly 3,987 tokens of the 96,000 total, and answered precisely.
Key Design Decisions
Why Not Just Use a Larger Context Window?
A 1M-token window sounds like it solves everything. It doesn't:
- Cost: 1M-token inference costs ~100x more than 10K-token inference.
- Latency: Attention is quadratic in sequence length. A 1M-token context is orders of magnitude slower than 10K.
- Lost-in-the-middle: Research consistently shows LLM recall degrades for information buried in the middle of very long contexts. A 1M-token window doesn't mean 1M tokens of equal attention.
- Infinite streams: Some tasks — monitoring a production system, processing a live data feed — have no defined endpoint. No context window is ever large enough.
The memory hierarchy solves all four problems: it keeps the active window small (fast, cheap), ensures relevance of what's in context (semantic retrieval), and handles infinite-length inputs naturally.
Page Size Tuning
The 2,000-token page size is a starting point, not a law. Tune it based on:
- Document granularity: Code files → 200–500 tokens per function. Legal documents → 1,000–2,000 tokens per clause. Research papers → 500–1,000 tokens per section.
- Semantic coherence: Pages should be semantically self-contained. A function split mid-body is worse than a slightly oversized page.
- Compression ratio: The compressor targets 10:1. If summaries are too lossy, reduce page size.
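For code corpora, the first two guidelines can be combined: split at top-level definition boundaries, then pack whole definitions into pages near the target size. Here is a sketch for Python sources, reusing tiktoken as in ingest_document; the function itself is illustrative, not part of the system above.

```python
import tiktoken

def chunk_python_source(text: str, target_tokens: int = 400) -> list[str]:
    """Split Python source at top-level def/class boundaries, never mid-function."""
    enc = tiktoken.get_encoding("cl100k_base")

    # Group lines into blocks, each starting at a top-level definition
    blocks, current = [], []
    for line in text.splitlines(keepends=True):
        if line.startswith(("def ", "class ")) and current:
            blocks.append("".join(current))
            current = []
        current.append(line)
    if current:
        blocks.append("".join(current))

    # Pack whole blocks into pages close to the target size
    pages, page, page_tokens = [], [], 0
    for block in blocks:
        tokens = len(enc.encode(block))
        if page and page_tokens + tokens > target_tokens:
            pages.append("".join(page))
            page, page_tokens = [], 0
        page.append(block)
        page_tokens += tokens
    if page:
        pages.append("".join(page))
    return pages
```

A single oversized function becomes its own oversized page, which is exactly the trade-off the second guideline argues for.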
LRU vs. Attention-Weighted Eviction
The current implementation uses LRU (least recently used) as the L1 eviction policy — simple and predictable. A more sophisticated strategy: attention-weighted eviction. Track which page IDs the LLM cited in its last response. Pages that were cited get a recency boost; pages that weren't get penalized. This approximates the LLM's own sense of relevance.
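Here is a minimal sketch of that idea, assuming the PageTable from Step 1. Pages cited in the last response get an artificial recency boost; uncited pages age faster, so the existing lru_candidates sort evicts them first. The CITATION_BOOST constant, the page-ID regex, and the reweight_after_response helper are assumptions, not part of the original code.

```python
import re
import time

from page_table import PageTable

CITATION_BOOST = 300.0  # seconds of artificial recency granted per citation
PAGE_ID_PATTERN = re.compile(r"[\w\-]+::chunk_\d{4}")

def reweight_after_response(answer: str, active_page_ids: list[str], table: PageTable) -> None:
    """Boost pages the LLM cited; age the ones it ignored."""
    cited = set(PAGE_ID_PATTERN.findall(answer))
    now = time.time()
    for page_id in active_page_ids:
        entry = table.get(page_id)
        if not entry:
            continue
        if page_id in cited:
            # Cited pages look fresher than they really are
            entry.last_accessed = now + CITATION_BOOST
            entry.access_count += 1
        else:
            # Uncited pages age faster, making them LRU victims sooner
            entry.last_accessed = min(entry.last_accessed, now - CITATION_BOOST)
```

Call it after each LLM response and before the next eviction decision; the sketch mutates entries in memory and leaves persistence to the page table's next save.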
The Compression Trade-off
Compression reduces token cost at the price of potential information loss. Mitigations:
- Use a large, capable model for compression (not the cheapest one).
- Store the full original in L3/L4 — the summary is a pointer acceleration, not a replacement.
- Detect when the agent asks for a compressed page and reload the full version on demand.
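A sketch of the third mitigation above, assuming the ContextPager from Step 4. The reload_full helper is hypothetical; it reaches into the pager's internals (l1, l3, l4, _ensure_capacity), which a production version would wrap behind a public method.

```python
from context_pager import ContextPager
from page_table import page_table

def reload_full(pager: ContextPager, page_id: str) -> str | None:
    """If a page is only resident as a summary, page the full original back in."""
    entry = page_table.get(page_id)
    if not entry or not entry.compressed:
        return None
    # Prefer the full text kept in L3 swap; fall back to the L4 source of truth
    content = pager.l3.read(page_id) or pager.l4.read(page_id)
    if content:
        pager.l1.evict(page_id)                    # drop the summary if resident
        pager._ensure_capacity(entry.token_count)  # make room, evicting LRU pages
        # Clear the summary so build_prompt_context shows the full text again
        entry.summary = None
        entry.compressed = False
        pager.l1.write(page_id, content)
    return content
```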
Production Deployment Considerations
- Context Pager as a microservice: The pager should be a standalone service, not embedded in the agent process. This enables multiple agents to share the same L2/L3/L4 state — critical for multi-agent pipelines.
- Redis Cluster for L2: Production deployments need Redis Cluster (not a single node) for HA and throughput. Per-key TTLs (as used in the L2 implementation) plus a sorted set keyed by last-access time cover expiry and LRU tracking with core Redis commands; no extra modules are required.
- Qdrant over ChromaDB for L3: ChromaDB is excellent for development. Qdrant or Weaviate handle production-scale vector workloads with HNSW indexing and horizontal sharding.
- S3 lifecycle policies for L4: Set S3 lifecycle rules to automatically tier cold objects to Glacier after 30 days. Standard retrieval from Glacier takes 3–5 hours — acceptable for truly cold data.
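For the L4 tier itself, swapping the local filesystem for S3 is mostly mechanical. Here is a sketch assuming boto3 and a bucket name of your choosing; it mirrors L4DiskStorage's interface and is illustrative rather than the article's implementation.

```python
from typing import Optional

import boto3

from page_table import page_table, StorageTier

class L4S3Storage:
    """S3-backed variant of L4DiskStorage; reuses the L4 tier tag."""

    def __init__(self, bucket: str, prefix: str = "vmllm/raw/"):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        self.prefix = prefix

    def _key(self, page_id: str) -> str:
        return f"{self.prefix}{page_id.replace('::', '__')}.txt"

    def read(self, page_id: str) -> Optional[str]:
        try:
            obj = self.s3.get_object(Bucket=self.bucket, Key=self._key(page_id))
        except self.s3.exceptions.NoSuchKey:
            return None
        page_table.touch(page_id)
        return obj["Body"].read().decode("utf-8")

    def write(self, page_id: str, content: str) -> None:
        self.s3.put_object(
            Bucket=self.bucket,
            Key=self._key(page_id),
            Body=content.encode("utf-8"),
        )
        page_table.update_tier(page_id, StorageTier.L4_DISK)

    def evict(self, page_id: str) -> Optional[str]:
        # Like the local L4, S3 is the source of truth and never truly evicts
        return self.read(page_id)
```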
Live Demo: Watching a PAGE_FAULT Triggered by the LLM
The concepts above are not theoretical. The interactive demo at huggingface.co/spaces/onemancrew/infinite-window-demo lets you watch every layer of the hierarchy in real time.
The most revealing mode is Blind Mode — it deliberately loads an unrelated page into L1 first, then asks the LLM a question it cannot answer with the available context. This forces the LLM itself to trigger a page fault, exactly mirroring the CPU interrupt model from operating systems.
Here is the exact run:
Setup:
- Document: Microservices Guide (2 pages, stored in L4)
- Backend: Qwen2.5-7B (free)
- Mode: Blind — LLM triggers PAGE_FAULT itself
- Question: "What libraries implement the circuit breaker pattern?"
Event Log — what the system recorded:
20:02:48 Ingested microservices_guide → 2 pages stored in L4
20:02:56 Blind mode: loaded unrelated page microservices_guide::p001
into L1 — only 17 tokens
20:02:56 Calling Qwen2.5-7B (attempt 1, 17 tokens in L1)...
20:02:56 LLM triggered PAGE_FAULT (fault #1):
"Information on libraries implementing the circuit breaker pattern"
20:02:57 Semantic search → 1 candidate found in L4
20:02:57 Page fault microservices_guide::p000
(was in L4 — Disk Storage) — loading... (+200ms)
Loaded into L1 (300 tokens) | L1 total: 317
20:02:57 Calling Qwen2.5-7B (attempt 2, 317 tokens in L1)...
20:02:57 Answer received from Qwen2.5-7B
The LLM's answer on attempt 2:
"Popular libraries for implementing the circuit breaker pattern include: Resilience4j for Java, PyBreaker for Python, and Polly for .NET — [microservices_guide::p001]"
Breaking Down What Happened
Attempt 1 — the page fault: The LLM received only 17 tokens of unrelated content. Rather than hallucinating an answer, it responded with PAGE_FAULT: Information on libraries implementing the circuit breaker pattern. This is the system prompt instruction at work: "If the answer is NOT present in the provided context pages, you MUST respond with PAGE_FAULT:"
The pager intercepts the fault: The pager parses the PAGE_FAULT: prefix, extracts the description, and runs a semantic search across L4. It finds microservices_guide::p000 — the page containing the circuit breaker section — and loads it into L1. No eviction is triggered because L1 has sufficient room.
Attempt 2 — resolution: Now with 317 tokens in L1 including the correct page, the LLM answers precisely and cites the page ID. Total round-trips: 2. Total tokens loaded into context: 300. The other 1,700+ tokens in the document were never touched.
The OS Analogy — Exact Mapping
| What happened in the demo | OS equivalent |
|---|---|
| LLM on attempt 1 with 17 tokens | CPU executing with a page not in RAM |
| LLM writes PAGE_FAULT: | CPU raises a page fault interrupt |
| Pager parses the fault description | Kernel interrupt handler runs |
| Semantic search finds p000 in L4 | OS locates the page on disk |
| p000 loaded from L4 to L1 (+200ms) | OS copies page from disk to RAM |
| LLM runs attempt 2 with 317 tokens | CPU resumes execution from same instruction |
| LLM answers correctly | Program continues as if memory was always there |
The LLM never "knew" it was waiting for a page load. From its perspective, it asked for information and the information appeared. This is precisely the abstraction virtual memory provides to programs — and it is exactly what we are providing to language models.
Try it yourself: The Infinite Window — Live Demo
Key Takeaways
- The context window is a cache, not a container. Stop trying to fit everything in it. Start managing what goes in and out of it.
- Virtual memory is a proven abstraction. Operating systems have solved this problem for 60 years. The LLM domain is reinventing it — badly, with RAG hacks. Borrow the right abstraction.
- Four layers beat two. The standard "context + vector DB" setup collapses L2 and misses the critical hot-buffer tier. Redis as L2 eliminates re-embedding round-trips for recently evicted pages.
- Compression is not optional. Evicted pages that re-enter as 200-token summaries instead of 2,000-token originals give you a 10x capacity multiplier.
- Lazy loading via context serialization is how multi-agent systems should pass state. Pointers, not payloads.
🚀 Live Demo (Hugging Face Spaces)
Try the interactive demo on Hugging Face Spaces — ingest a document, fire queries, and watch page faults, LRU evictions, and tier promotions happen in real time:
🧠 The Infinite Window — Live Demo
📂 Source Code (GitHub)
All code examples from this article are available on GitHub: OneManCrew/virtual-memory-llm
