Every LLM call is a negotiation with a hard limit. GPT-4o gives you 128K tokens. Claude 3.7 gives you 200K. Gemini 1.5 Pro stretches to 1M. These numbers feel generous until you are three hours into a multi-turn coding session, feeding a 90-page contract into a legal analysis agent, or running a customer support bot that needs the last 200 messages for coherent context.
The naive solution is truncation: drop the oldest messages and hope nothing important was in them. The RAG solution is retrieval: embed everything and pull back chunks on demand. Both are compromises. Truncation destroys information. RAG is stateless — it cannot reason across retrieved fragments the way a human reads a document linearly.
There is a third path: compression. Reduce the token count of what you already have while preserving its semantic content. Done well, compression lets you fit a 32K-token conversation into 8K tokens without the LLM noticing the difference. Done wrong, it introduces hallucinations, loses critical named entities, and breaks reasoning chains.
This article walks through three progressive compression strategies, each building on the last:
- Extractive Pruning — remove low-signal tokens and messages without rewriting anything
- Abstractive Compression — use an LLM to rewrite history into dense summaries, governed by explicit fact-preservation rules
- Agent-Driven Dynamic Context — an agent that manages a live compression budget, decides what to compress vs. keep verbatim, and re-expands on demand
Each stage includes a complete, runnable Python implementation. By the end, you will have a production-ready context management layer you can drop into any LLM application.
Why Compression Beats Truncation
Consider a 40-turn coding session. The first 20 turns established the architecture: the database schema, the API design, the choice of async over sync. The last 20 turns are actively debugging a specific bug. Naive truncation keeps only the last 20 turns — and the LLM has lost every architectural decision made in the first half.
RAG retrieval could theoretically recover this, but RAG is designed for document corpora, not conversational context. A conversation has order, causality, and reference. "The function we wrote in turn 3", "the bug I mentioned earlier", "the schema change from two steps ago" — these are not retrievable facts; they are the fabric of the reasoning thread.
Compression preserves the thread. Instead of dropping turn 3, you compress it from 800 tokens to 80, retaining the function signature, the key decision, and the outcome. The LLM now has a compressed but continuous history.
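To make that concrete, a hypothetical turn 3 (invented for illustration) might go from

[ASSISTANT, ~800 tokens]: the full config-loader explanation, including the complete implementation of `parse_config(path: str) -> dict`, a comparison of `json.load` against a hand-rolled parser, and three paragraphs of error-handling trade-offs

to

[COMPRESSED, ~80 tokens]: Defined `parse_config(path: str) -> dict` via `json.load` with a defaults fallback. Decision: config lives at `config/app.json`. Outcome: missing-file case still untested.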
| Strategy | Token Reduction | Information Loss Risk | Latency Cost |
|---|---|---|---|
| Truncation (baseline) | 100% of dropped content | High — oldest content gone | Zero |
| Extractive Pruning | 20–50% | Low — only redundancy removed | Near-zero |
| Abstractive Compression | 70–90% | Medium — depends on prompt quality | One LLM call per N turns |
| Agent-Driven Dynamic | Adaptive | Lowest — agent decides per message | Periodic LLM calls |
Part 1: Extractive Pruning
Extractive pruning removes tokens without rewriting any content. It identifies low-signal material — filler phrases, repeated context, verbose tool outputs, sparse JSON — and strips it out at the character level. No LLM needed; this is pure text processing.
The key principle: signal density varies enormously across a conversation. A tool call that returned 3,000 tokens of JSON with 2,800 tokens of null fields carries about 200 tokens of actual information. An assistant message that starts with "Certainly! I'd be happy to help you with that. Let me think through this step by step." contains zero information in its first 25 tokens.
Extractive pruning targets four categories:
- Filler phrases — opening pleasantries, transition sentences, closing offers to help
- Redundant repetition — messages that restate the previous message before responding
- Sparse tool outputs — JSON/XML with high null density, paginated results with empty pages
- Oversized code blocks — when only the function signature and docstring are needed, not the full 200-line implementation
import re
import json
import tiktoken
from dataclasses import dataclass, field
from typing import Optional
from difflib import SequenceMatcher
FILLER_PATTERNS = [
r"^(Certainly|Sure|Of course|Absolutely|Great|Happy to help)[!,.]?\s*",
r"^I'd be happy to (help|assist) (you )?with that\.?\s*",
r"^Let me (think|walk you) (through|about) (this|that)\.?\s*",
r"\s*Is there anything else (I can help|you'd like to know)\??$",
r"\s*Let me know if you (need|have) (any|more) (questions|clarification)\.?$",
r"\s*Feel free to ask if (you need|there's) anything else\.?$",
]
COMPILED_FILLERS = [re.compile(p, re.IGNORECASE | re.MULTILINE) for p in FILLER_PATTERNS]
@dataclass
class Message:
role: str
content: str
token_count: int = 0
@dataclass
class PruningResult:
original_tokens: int
pruned_tokens: int
messages_pruned: int
reduction_pct: float
messages: list[Message]
class ExtractivePruner:
"""
Reduces token count by removing low-signal content without rewriting.
Operates purely on text — no LLM calls required.
"""
def __init__(
self,
model: str = "gpt-4o",
min_message_tokens: int = 10,
max_json_null_ratio: float = 0.4,
prune_code_bodies: bool = False,
):
self.enc = tiktoken.encoding_for_model(model)
self.min_message_tokens = min_message_tokens
self.max_json_null_ratio = max_json_null_ratio
self.prune_code_bodies = prune_code_bodies
def count_tokens(self, text: str) -> int:
return len(self.enc.encode(text))
def strip_fillers(self, text: str) -> str:
"""Remove filler phrases from the beginning and end of a message."""
for pattern in COMPILED_FILLERS:
text = pattern.sub("", text)
return text.strip()
def compress_json(self, text: str) -> str:
"""
Compact JSON tool outputs by removing null/empty fields.
Falls back to original if not valid JSON.
"""
try:
data = json.loads(text)
except (json.JSONDecodeError, ValueError):
return text
        def remove_nulls(obj):
            if isinstance(obj, dict):
                # Clean children first so nested objects that become empty
                # after cleaning are dropped as well.
                cleaned = {k: remove_nulls(v) for k, v in obj.items()}
                return {
                    k: v
                    for k, v in cleaned.items()
                    if v is not None and v != "" and v != [] and v != {}
                }
            if isinstance(obj, list):
                return [remove_nulls(i) for i in obj if i is not None]
            return obj

        cleaned = remove_nulls(data)
        compact = json.dumps(cleaned, separators=(",", ":"))
        # Fraction of characters saved by stripping nulls and compacting.
        # Only swap in the compact form when the savings are substantial,
        # so information-dense JSON stays verbatim.
        reduction = 1 - (len(compact) / len(text))
        if reduction > self.max_json_null_ratio:
            return compact
        return text
def prune_code_block(self, text: str) -> str:
"""
If prune_code_bodies is enabled, replace function bodies with '...'
while preserving signatures and docstrings.
"""
if not self.prune_code_bodies:
return text
        # Match a def line, an optional single-line docstring, then the body;
        # multi-line signatures and docstrings are intentionally out of scope.
        pattern = re.compile(
            r"(def \w+\([^)]*\).*?:[ \t]*\n"
            r"(?:[ \t]+(?:\"\"\"|''').*?(?:\"\"\"|''')\n)?)"
            r"((?:[ \t]+.*\n)+)",
            re.MULTILINE,
        )
def replace_body(match):
signature = match.group(1)
body = match.group(2)
body_tokens = self.count_tokens(body)
if body_tokens > 150:
indent = re.match(r"[ \t]+", body)
ind = indent.group() if indent else " "
return f"{signature}{ind}...\n"
return match.group(0)
return pattern.sub(replace_body, text)
def deduplicate(self, messages: list[Message]) -> list[Message]:
"""
Remove messages whose content is a near-exact repeat of the previous message.
Threshold: >85% character overlap using SequenceMatcher ratio.
"""
if len(messages) < 2:
return messages
result = [messages[0]]
for msg in messages[1:]:
prev = result[-1]
if msg.role == prev.role:
ratio = SequenceMatcher(None, prev.content, msg.content).ratio()
if ratio > 0.85:
continue
result.append(msg)
return result
def prune_message(self, msg: Message) -> Message:
"""Apply all pruning strategies to a single message."""
content = msg.content
if msg.role == "assistant":
content = self.strip_fillers(content)
if msg.role == "tool" or (msg.role == "assistant" and content.strip().startswith("{")):
content = self.compress_json(content)
if "```" in content or "def " in content:
content = self.prune_code_block(content)
return Message(
role=msg.role,
content=content,
token_count=self.count_tokens(content),
)
def prune(self, messages: list[Message]) -> PruningResult:
"""
Apply all extractive pruning strategies to a message list.
Args:
messages: List of conversation messages.
Returns:
PruningResult with token counts and pruned messages.
"""
original_tokens = sum(self.count_tokens(m.content) for m in messages)
pruned = [self.prune_message(m) for m in messages]
pruned = self.deduplicate(pruned)
pruned = [m for m in pruned if m.token_count >= self.min_message_tokens]
pruned_tokens = sum(m.token_count for m in pruned)
reduction = (original_tokens - pruned_tokens) / original_tokens * 100 if original_tokens else 0
return PruningResult(
original_tokens=original_tokens,
pruned_tokens=pruned_tokens,
messages_pruned=len(messages) - len(pruned),
reduction_pct=round(reduction, 1),
messages=pruned,
        )

Extractive pruning applies four techniques — filler removal, JSON compaction, code body replacement, and deduplication — without a single LLM call, achieving 20–50% token reduction on typical conversation logs.
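To see the pruner end to end, here is a minimal usage sketch; the sample messages are invented for illustration, and exact token counts will vary with the tokenizer:

sample = [
    Message(role="user", content="Write a function that parses the config file and validates the required keys."),
    Message(role="assistant", content=(
        "Certainly! I'd be happy to help you with that. "
        "Use json.load inside a try block and fall back to defaults on failure. "
        "Let me know if you have any questions."
    )),
    Message(role="tool", content='{"status": "ok", "data": {"port": 8080}, "error": null, "warnings": []}'),
]

pruner = ExtractivePruner(model="gpt-4o")
result = pruner.prune(sample)
# Filler stripped from the assistant turn, null fields dropped from the tool JSON.
print(f"{result.original_tokens} -> {result.pruned_tokens} tokens ({result.reduction_pct}% reduction)")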
The key insight: extractive pruning is lossless for information-dense content. The function signature and docstring are preserved; only the body is stubbed. The JSON result is preserved; only the null fields are removed. No facts are invented or distorted — content is either kept verbatim or removed entirely.
But extractive pruning has a ceiling. You can remove filler and compact JSON, but the core semantic content of each message stays at its original token size. To achieve 70–90% reduction, you need to rewrite history — not remove it.
Part 2: Abstractive Compression
Abstractive compression uses an LLM to convert a window of recent messages into a dense summary. The challenge is doing this without losing information. A naive "summarize this conversation" prompt will produce fluent text that sounds correct but drops specifics: exact variable names, precise numerical values, error messages, function signatures — exactly the details the next LLM call needs.
The solution is a structured compression prompt with explicit preservation rules. The compressor must be instructed to treat code identifiers, numbers, and error strings as sacred — compress the prose, never the data.
The architecture has two components:
- Sliding window compressor — when the context exceeds a budget, compress the oldest N turns into a summary block and keep the newest M turns verbatim
- Progressive summarization — each new summary incorporates the previous summary, so history compounds without growing
import asyncio
import tiktoken
from dataclasses import dataclass, field
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
COMPRESSION_SYSTEM = """You are a context compression engine for LLM conversations.
Your task: compress a block of conversation turns into a dense, lossless summary.
CRITICAL RULES — never violate these:
1. Preserve ALL named entities: variable names, function names, class names, file paths, URLs.
2. Preserve ALL numerical values: counts, prices, IDs, versions, scores, timestamps.
3. Preserve ALL error messages and exception types verbatim (in backticks).
4. Preserve ALL code snippets that were agreed upon or defined — use inline code.
5. Preserve ALL decisions: "we decided to use X", "user rejected Y", "the bug was Z".
6. Compress prose aggressively. Remove pleasantries, repetition, and verbose explanations.
7. Use structured format: bullet points, key:value pairs, inline code for identifiers.
8. Never invent information. Never generalize specific facts into vague statements.
9. Target compression ratio: 8:1 (800 tokens -> ~100 tokens).
Output format:
[COMPRESSED HISTORY - turns {start} to {end}]
• Context: <1-2 sentences of what was being worked on>
• Decisions: <bullet list of key decisions made>
• Code defined: <inline code for any functions/classes/variables established>
• Errors seen: <exact error messages if any>
• Current state: <where things stood at the end of this block>
"""
COMPRESSION_HUMAN = """Compress these conversation turns:
{conversation_block}
Previous compressed history (incorporate this context):
{previous_summary}
"""
@dataclass
class ConversationMessage:
role: str
content: str
@dataclass
class ContextWindow:
"""Manages a live conversation context with a token budget."""
budget_tokens: int
verbatim_turns: int = 8
compressor_model: str = "gpt-4o-mini"
full_model: str = "gpt-4o"
messages: list[ConversationMessage] = field(default_factory=list)
compressed_summary: str = ""
_total_turns: int = 0
def __post_init__(self):
self.enc = tiktoken.encoding_for_model(self.full_model)
self.llm = ChatOpenAI(model=self.compressor_model, temperature=0.0)
def count_tokens(self, text: str) -> int:
return len(self.enc.encode(text))
def total_tokens(self) -> int:
msg_tokens = sum(self.count_tokens(m.content) for m in self.messages)
summary_tokens = self.count_tokens(self.compressed_summary)
return msg_tokens + summary_tokens
def add_message(self, role: str, content: str):
"""Add a new message to the context."""
self.messages.append(ConversationMessage(role=role, content=content))
self._total_turns += 1
async def compress_oldest_block(self, block_size: int = 6) -> str:
"""
Compress the oldest `block_size` turns into a summary.
Incorporates any previously compressed summary for continuity.
Args:
block_size: Number of turns to compress in one pass.
Returns:
The new compressed summary string.
"""
if len(self.messages) <= block_size:
return self.compressed_summary
to_compress = self.messages[:block_size]
self.messages = self.messages[block_size:]
turn_start = self._total_turns - len(self.messages) - block_size + 1
turn_end = turn_start + block_size - 1
conversation_block = "\n\n".join(
f"[{m.role.upper()}]: {m.content}" for m in to_compress
)
prompt = ChatPromptTemplate.from_messages([
("system", COMPRESSION_SYSTEM.format(start=turn_start, end=turn_end)),
("human", COMPRESSION_HUMAN),
])
chain = prompt | self.llm
response = await chain.ainvoke({
"conversation_block": conversation_block,
"previous_summary": self.compressed_summary or "None — this is the first compression.",
})
new_summary = response.content.strip()
original_tokens = sum(self.count_tokens(m.content) for m in to_compress)
compressed_tokens = self.count_tokens(new_summary)
ratio = original_tokens / compressed_tokens if compressed_tokens else 0
print(
f"[Compressor] Turns {turn_start}-{turn_end}: "
f"{original_tokens} -> {compressed_tokens} tokens ({ratio:.1f}x compression)"
)
self.compressed_summary = new_summary
return new_summary
async def ensure_fits(self) -> bool:
"""
Compress until the context fits within the token budget.
Keeps the most recent `verbatim_turns` messages uncompressed.
Returns:
True if compression was performed, False if already within budget.
"""
if self.total_tokens() <= self.budget_tokens:
return False
print(f"[ContextWindow] Budget exceeded: {self.total_tokens()} / {self.budget_tokens} tokens. Compressing...")
while self.total_tokens() > self.budget_tokens and len(self.messages) > self.verbatim_turns:
compressible = len(self.messages) - self.verbatim_turns
block = min(6, compressible)
if block < 2:
break
await self.compress_oldest_block(block_size=block)
print(f"[ContextWindow] After compression: {self.total_tokens()} tokens")
return True
def build_prompt_messages(self) -> list:
"""
Build the final list of LangChain messages to send to the LLM.
Injects compressed history as a system context block.
"""
result = []
if self.compressed_summary:
result.append(SystemMessage(content=(
"The following is a compressed summary of earlier conversation history. "
"All facts, names, and code in this summary are accurate and should be treated "
"as if you had the full conversation.\n\n"
+ self.compressed_summary
)))
for msg in self.messages:
if msg.role == "user":
result.append(HumanMessage(content=msg.content))
elif msg.role == "assistant":
result.append(AIMessage(content=msg.content))
else:
result.append(SystemMessage(content=f"[{msg.role.upper()}]: {msg.content}"))
        return result

The ContextWindow class maintains a rolling compression budget. When total tokens exceed the limit, the oldest block is compressed using the structured prompt — preserving all code identifiers, error messages, and decisions — then replaced with a dense summary that subsequent compressions build upon progressively.
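A minimal driver sketch, assuming OPENAI_API_KEY is set in the environment; the turn contents are synthetic filler that exists only to exercise the budget:

import asyncio

async def demo():
    window = ContextWindow(budget_tokens=600, verbatim_turns=4)
    for turn in range(30):
        window.add_message("user", f"Question {turn}: how should we index the orders table for range scans?")
        window.add_message("assistant", f"Answer {turn}: add a composite index on (customer_id, created_at) and paginate by keyset.")
        await window.ensure_fits()  # no-op until the budget is actually exceeded
    print(f"{len(window.messages)} verbatim messages + summary = {window.total_tokens()} tokens")

asyncio.run(demo())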
The progressive summarization approach is critical. Each compression pass does not throw away previous summaries — it incorporates them. The compressor receives both the block to compress and the existing summary, producing a new unified summary that captures the full history. This means no matter how long a conversation runs, the summary stays bounded while continuously accumulating facts.
Part 3: Agent-Driven Dynamic Context Management
Extractive pruning and abstractive compression are both mechanical — they apply fixed rules regardless of content. The third and most powerful approach is agent-driven: an agent that understands the semantics of the current task and makes intelligent decisions about what to compress, what to keep verbatim, and what to re-expand from a compressed summary on demand.
The core insight: not all messages are equal in the context of the current task. In a long debugging session, the message where the bug was first identified is sacred — it must never be compressed away. The three messages before it where the agent searched in the wrong direction are safe to compress aggressively. A mechanical compressor cannot distinguish these; an agent can.
The agent-driven approach adds three capabilities over the previous two:
- Importance scoring — the agent evaluates each message against the current task goal and assigns a retention priority
- Selective compression — `critical` and `high` priority messages are kept verbatim; `low` and `droppable` messages are aggressively compressed or removed
- On-demand expansion — if the agent detects it needs detail from a compressed block, it can request re-expansion of the original content
import asyncio
import json
import tiktoken
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
class RetentionPriority(str, Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
DROPPABLE = "droppable"
@dataclass
class ManagedMessage:
role: str
content: str
turn_id: int
priority: RetentionPriority = RetentionPriority.MEDIUM
compressed_version: Optional[str] = None
is_compressed: bool = False
original_tokens: int = 0
current_tokens: int = 0
SCORER_SYSTEM = """You are a context importance scorer for an AI agent conversation.
Given the current task goal and a set of conversation turns, score each turn's
importance to the ongoing task.
For each turn, output a JSON object with:
- turn_id: the integer turn ID
- priority: one of "critical", "high", "medium", "low", "droppable"
- reason: one sentence explaining the score
Priority definitions:
- critical: Contains the core problem definition, key constraints, agreed solution, or active bug
- high: Contains important decisions, working code, or data that is actively referenced
- medium: Contains useful context that might be referenced later
- low: Contains exploratory discussion, failed attempts, or verbose explanations
- droppable: Contains filler, off-topic content, or complete repetition of other turns
Output a JSON array of these objects, one per turn. Nothing else.
"""
class DynamicContextManager:
"""
Agent-driven context manager that scores message importance against
the current task goal and makes intelligent compression decisions.
"""
def __init__(
self,
budget_tokens: int = 8000,
scorer_model: str = "gpt-4o-mini",
compressor_model: str = "gpt-4o-mini",
working_model: str = "gpt-4o",
):
self.budget_tokens = budget_tokens
self.enc = tiktoken.encoding_for_model(working_model)
self.scorer = ChatOpenAI(model=scorer_model, temperature=0.0)
self.compressor = ChatOpenAI(model=compressor_model, temperature=0.0)
self.messages: list[ManagedMessage] = []
self.task_goal: str = ""
self._turn_counter = 0
def count_tokens(self, text: str) -> int:
return len(self.enc.encode(text))
def set_task_goal(self, goal: str):
"""Set the current task goal used for importance scoring."""
self.task_goal = goal
def add_message(self, role: str, content: str) -> ManagedMessage:
"""Add a message and track its token count."""
tokens = self.count_tokens(content)
msg = ManagedMessage(
role=role,
content=content,
turn_id=self._turn_counter,
original_tokens=tokens,
current_tokens=tokens,
)
self.messages.append(msg)
self._turn_counter += 1
return msg
def total_tokens(self) -> int:
return sum(m.current_tokens for m in self.messages)
async def score_importance(self, messages: list[ManagedMessage]) -> list:
"""
Ask the scorer LLM to evaluate each message's importance
relative to the current task goal.
"""
        # Truncate each turn to 500 characters: enough signal for scoring
        # without paying full-context prices on the scorer model.
        turns_text = "\n\n".join(
            f"Turn {m.turn_id} [{m.role.upper()}]: {m.content[:500]}"
            for m in messages
        )
prompt = (
f"Current task goal: {self.task_goal}\n\n"
f"Conversation turns to score:\n\n{turns_text}"
)
response = await self.scorer.ainvoke([
SystemMessage(content=SCORER_SYSTEM),
HumanMessage(content=prompt),
])
        try:
            return json.loads(response.content)
        except (json.JSONDecodeError, ValueError):
            # Fail soft: return neutral scores rather than aborting the pass.
            return [{"turn_id": m.turn_id, "priority": "medium", "reason": "parse error"} for m in messages]
async def compress_message(self, msg: ManagedMessage) -> str:
"""Compress a single low-priority message, retaining the original for re-expansion."""
prompt = (
f"Compress this conversation turn to its minimum essential content.\n"
f"Preserve: all code identifiers, numbers, error messages, file names, and decisions.\n"
f"Remove: prose explanations, pleasantries, verbose context already captured elsewhere.\n"
f"Output only the compressed content, no preamble.\n\n"
f"Turn [{msg.role.upper()}]: {msg.content}"
)
response = await self.compressor.ainvoke([HumanMessage(content=prompt)])
return response.content.strip()
async def optimize(self) -> dict:
"""
Main optimization loop. If context exceeds budget:
1. Score all uncompressed messages by importance against the task goal.
2. Compress low/droppable priority messages first.
3. Repeat until within budget or only critical/high messages remain.
Returns:
Stats dict with before/after token counts and actions taken.
"""
before = self.total_tokens()
if before <= self.budget_tokens:
return {"action": "none", "before": before, "after": before}
print(f"[DynamicCtx] Optimizing: {before} / {self.budget_tokens} tokens")
uncompressed = [m for m in self.messages if not m.is_compressed]
raw_decisions = await self.score_importance(uncompressed)
priority_map = {d["turn_id"]: RetentionPriority(d["priority"]) for d in raw_decisions}
for msg in self.messages:
if msg.turn_id in priority_map:
msg.priority = priority_map[msg.turn_id]
compress_order = [
RetentionPriority.DROPPABLE,
RetentionPriority.LOW,
RetentionPriority.MEDIUM,
]
actions_taken = []
for priority_level in compress_order:
if self.total_tokens() <= self.budget_tokens:
break
candidates = [
m for m in self.messages
if m.priority == priority_level and not m.is_compressed
]
for msg in candidates:
if self.total_tokens() <= self.budget_tokens:
break
                if priority_level == RetentionPriority.DROPPABLE:
                    original_tokens = msg.current_tokens
                    # Keep the original so expand_message() can restore it later.
                    msg.compressed_version = msg.content
                    msg.content = f"[DROPPED - {msg.role} turn {msg.turn_id}]"
                    msg.current_tokens = self.count_tokens(msg.content)
                    msg.is_compressed = True
                    actions_taken.append({"turn": msg.turn_id, "action": "dropped", "saved": original_tokens - msg.current_tokens})
else:
compressed = await self.compress_message(msg)
original_tokens = msg.current_tokens
msg.compressed_version = msg.content
msg.content = compressed
msg.current_tokens = self.count_tokens(compressed)
msg.is_compressed = True
actions_taken.append({"turn": msg.turn_id, "action": "compressed", "priority": priority_level.value, "saved": original_tokens - msg.current_tokens})
after = self.total_tokens()
print(f"[DynamicCtx] Optimized: {before} -> {after} tokens ({len(actions_taken)} actions)")
return {"action": "optimized", "before": before, "after": after, "actions": actions_taken}
def expand_message(self, turn_id: int) -> bool:
"""
Re-expand a previously compressed message back to its original content.
Returns True if expansion was possible, False if original is unavailable.
"""
for msg in self.messages:
if msg.turn_id == turn_id and msg.compressed_version:
msg.content = msg.compressed_version
msg.current_tokens = self.count_tokens(msg.content)
msg.is_compressed = False
print(f"[DynamicCtx] Expanded turn {turn_id}: {msg.current_tokens} tokens restored")
return True
return False
def build_prompt_messages(self) -> list:
"""Build the final message list for the working LLM."""
result = []
for msg in self.messages:
if msg.role == "user":
result.append(HumanMessage(content=msg.content))
elif msg.role == "assistant":
result.append(AIMessage(content=msg.content))
else:
result.append(SystemMessage(content=f"[{msg.role.upper()}]: {msg.content}"))
        return result

The DynamicContextManager scores each message against the current task goal, then compresses low-priority messages first — preserving critical context like active bug reports, agreed code patterns, and key decisions verbatim — while compressing exploratory discussion and failed attempts aggressively. The expand_message method lets the agent restore any compressed turn if it turns out to need the detail.
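A minimal usage sketch, assuming OPENAI_API_KEY is set; the task and turns are invented, and the tiny budget exists only to force compression in a demo:

import asyncio

async def demo():
    manager = DynamicContextManager(budget_tokens=40)
    manager.set_task_goal("Debug the 422 Unprocessable Entity error on POST /orders")
    manager.add_message("user", "POST /orders returns 422: body.items[0].sku field required.")
    manager.add_message("assistant", "The OrderItem schema marks sku as required but the client payload omits it.")
    manager.add_message("assistant", "Unrelated thought: we could rename the endpoint to /v2/orders someday.")
    stats = await manager.optimize()   # scores turns against the goal, compresses low-priority ones first
    print(stats)
    manager.expand_message(turn_id=2)  # restore a compressed turn if its detail is needed again

asyncio.run(demo())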
The key differentiator from the previous two approaches: the agent knows what the current task is. When the task goal is "debug a 422 error", the message that first introduced the 422 gets scored as critical. The early architectural discussion about table schema gets scored as medium. The exploratory detours get scored as low. The compression that follows is semantically informed, not mechanically applied.
Combining All Three
In a production system, you stack all three layers:
- On every new message — run extractive pruning first. Zero latency, zero cost.
- When approaching the budget — trigger abstractive compression on the oldest block. One cheap `gpt-4o-mini` call.
- When the task is well-defined — run agent-driven scoring once and let it guide which messages get compressed aggressively vs. kept verbatim.
This gives you a graceful degradation curve: the system is never forced into a binary truncate-or-not decision. It finds the highest-fidelity representation of the conversation that fits within your token budget — automatically, at every turn.
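A sketch of how the layers might be wired together; the orchestration below is illustrative glue over the classes defined earlier, not a prescribed API:

async def on_new_message(pruner: ExtractivePruner, window: ContextWindow, role: str, content: str):
    # Layer 1: extractive pruning on the incoming message. Free and instant.
    pruned = pruner.prune_message(Message(role=role, content=content))
    window.add_message(role, pruned.content)
    # Layer 2: abstractive compression. ensure_fits() does nothing while the
    # context is still within budget, so most turns cost no extra LLM calls.
    await window.ensure_fits()

# Layer 3 runs separately once a task goal is known:
#     manager.set_task_goal("debug the 422 error on POST /orders")
#     await manager.optimize()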
The pattern is applicable beyond chat: long agentic loops with hundreds of tool calls, document processing pipelines, multi-session agent memory, and anywhere else a token budget separates you from complete context.
📂 Source Code
All code examples from this article are available on GitHub: OneManCrew/context-window-compression