In Part 1 we built a Self-Extending Agent — one that writes new tools at runtime when it encounters a capability gap. That agent can generate fetch_exchange_rate, validate it in a sandbox, persist it to disk, and use it immediately. Problem solved — once.
But what happens the second, tenth, or hundredth time that tool runs?
The generated tool might be slow (using a deprecated API endpoint). It might fail intermittently (no retry logic). It might return stale data (no cache invalidation). The agent has no idea, because it never looks back.
This article builds the next layer: a Self-Improving Agent — one that monitors every tool invocation, scores performance over time, detects degradation, rewrites underperforming tools using an LLM, validates the rewrite against a regression test suite, and promotes the new version only if it passes. All automatically.
Prerequisite: This article extends the codebase from Part 1. Read it first — this article assumes you have the registry, generator, validator, and orchestrator already working.
Why Self-Improvement Matters
Consider a tool the agent generated last week:
def fetch_exchange_rate(from_currency: str, to_currency: str) -> float:
"""Fetch live exchange rate between two currencies."""
import requests
resp = requests.get(
f"https://api.exchangerate-api.com/v4/latest/{from_currency}"
)
data = resp.json()
return data["rates"][to_currency]
This worked perfectly on day one. But now:
- The API returns HTTP 429 under load — no retry logic.
- Average latency climbed to 3.2 seconds — no timeout configured.
- The API occasionally returns stale rates from a CDN cache — no freshness check.
A static agent would keep using this tool forever, degrading silently. A Self-Improving Agent detects the decline, rewrites the tool with retry logic, timeouts, and a fallback API, runs the regression tests, and promotes the fix — all without human intervention.
Architecture Overview
The Self-Improving Agent adds four new components on top of the Part 1 architecture:
┌─────────────────────────────────────────────────────────┐
│ ORCHESTRATOR AGENT │
│ Executes tool → wraps call with metrics capture │
└──────────────────────┬──────────────────────────────────┘
│ {latency, success, output_hash}
▼
┌─────────────────────────────────────────────────────────┐
│ PERFORMANCE MEMORY │
│ Stores per-tool metrics across invocations │
│ SQLite-backed, queryable by tool name + time range │
└──────────────────────┬──────────────────────────────────┘
│ aggregated scores
▼
┌─────────────────────────────────────────────────────────┐
│ EVALUATOR │
│ Computes weighted quality score per tool │
│ Triggers rewrite if score < threshold │
└──────────────────────┬──────────────────────────────────┘
│ rewrite request + context
▼
┌─────────────────────────────────────────────────────────┐
│ REWRITER AGENT │
│ LLM receives: old code + metrics + failure logs │
│ Produces: improved version of the tool │
└──────────────────────┬──────────────────────────────────┘
│ candidate code
▼
┌─────────────────────────────────────────────────────────┐
│ REGRESSION RUNNER │
│ Runs stored test cases against candidate │
│ Compares output, latency, error rate vs. baseline │
│ → PROMOTE new version or ROLLBACK to old │
└─────────────────────────────────────────────────────────┘
Project Setup
Extend the Part 1 project:
self_improving_agent/
├── main.py
├── registry.py ← from Part 1 (unchanged)
├── generator.py ← from Part 1 (unchanged)
├── validator.py ← from Part 1 (unchanged)
├── orchestrator.py ← extended with metrics capture
├── performance_memory.py ← NEW
├── evaluator.py ← NEW
├── rewriter.py ← NEW
├── regression_runner.py ← NEW
└── tools/
├── manifest.json
└── tests/ ← NEW: stored test cases
└── manifest.json
pip install langchain langchain-openai openai pydantic
No new dependencies beyond Part 1. The performance memory uses SQLite from the standard library.
Step 1: Performance Memory
Every tool invocation is recorded. The memory stores latency, success/failure, error messages, input fingerprints, and output hashes — all in a local SQLite database.
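Here is what using it looks like once the class below is in place: a minimal usage sketch with made-up values.
import time
from performance_memory import PerformanceMemory, InvocationRecord
# Usage sketch (made-up values): record one invocation, then read back the summary.
memory = PerformanceMemory()
memory.record(InvocationRecord(
    tool_name="fetch_exchange_rate",
    timestamp=time.time(),
    latency_ms=412.0,
    success=True,
    error_message=None,
    input_hash="a1b2c3d4e5f6",
    output_hash="0f9e8d7c6b5a",
    version=1,
))
print(memory.summary("fetch_exchange_rate"))
# {'tool_name': 'fetch_exchange_rate', 'invocations': 1, 'success_rate': 1.0, ...}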
import json
import sqlite3
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional
DB_PATH = Path(__file__).parent / "tools" / "performance.db"
@dataclass
class InvocationRecord:
tool_name: str
timestamp: float
latency_ms: float
success: bool
error_message: Optional[str]
input_hash: str
output_hash: Optional[str]
version: int
class PerformanceMemory:
def __init__(self, db_path: Path = DB_PATH):
self.db_path = db_path
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(str(self.db_path))
self._create_tables()
def _create_tables(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS invocations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tool_name TEXT NOT NULL,
timestamp REAL NOT NULL,
latency_ms REAL NOT NULL,
success INTEGER NOT NULL,
error_message TEXT,
input_hash TEXT NOT NULL,
output_hash TEXT,
version INTEGER NOT NULL DEFAULT 1
)
""")
self.conn.execute("""
CREATE TABLE IF NOT EXISTS tool_versions (
tool_name TEXT NOT NULL,
version INTEGER NOT NULL,
source_code TEXT NOT NULL,
created_at REAL NOT NULL,
PRIMARY KEY (tool_name, version)
)
""")
self.conn.commit()
def record(self, rec: InvocationRecord):
self.conn.execute(
"""INSERT INTO invocations
(tool_name, timestamp, latency_ms, success,
error_message, input_hash, output_hash, version)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
rec.tool_name, rec.timestamp, rec.latency_ms,
int(rec.success), rec.error_message,
rec.input_hash, rec.output_hash, rec.version,
),
)
self.conn.commit()
def get_recent(self, tool_name: str, limit: int = 20) -> list[dict]:
cursor = self.conn.execute(
"""SELECT tool_name, timestamp, latency_ms, success,
error_message, input_hash, output_hash, version
FROM invocations
WHERE tool_name = ?
ORDER BY timestamp DESC
LIMIT ?""",
(tool_name, limit),
)
columns = [
"tool_name", "timestamp", "latency_ms", "success",
"error_message", "input_hash", "output_hash", "version",
]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def get_current_version(self, tool_name: str) -> int:
cursor = self.conn.execute(
"SELECT MAX(version) FROM tool_versions WHERE tool_name = ?",
(tool_name,),
)
row = cursor.fetchone()
return row[0] if row[0] is not None else 1
def store_version(self, tool_name: str, version: int, source_code: str):
self.conn.execute(
"""INSERT OR REPLACE INTO tool_versions
(tool_name, version, source_code, created_at)
VALUES (?, ?, ?, ?)""",
(tool_name, version, source_code, time.time()),
)
self.conn.commit()
def get_version_source(self, tool_name: str, version: int) -> Optional[str]:
cursor = self.conn.execute(
"SELECT source_code FROM tool_versions WHERE tool_name = ? AND version = ?",
(tool_name, version),
)
row = cursor.fetchone()
return row[0] if row else None
def summary(self, tool_name: str, last_n: int = 20) -> dict:
records = self.get_recent(tool_name, last_n)
if not records:
return {"tool_name": tool_name, "invocations": 0}
total = len(records)
successes = sum(1 for r in records if r["success"])
latencies = [r["latency_ms"] for r in records]
errors = [r["error_message"] for r in records if r["error_message"]]
return {
"tool_name": tool_name,
"invocations": total,
"success_rate": successes / total,
"avg_latency_ms": sum(latencies) / total,
"max_latency_ms": max(latencies),
"min_latency_ms": min(latencies),
"recent_errors": errors[:5],
"current_version": self.get_current_version(tool_name),
}
memory = PerformanceMemory()
Step 2: The Evaluator
The evaluator consumes performance summaries and produces a single quality score between 0.0 and 1.0 for each tool. If the score drops below a configurable threshold, it flags the tool for rewriting.
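To make the weighting concrete, here is a quick worked example with hypothetical numbers, using the default weights and latency thresholds defined in the code below.
# Worked example (hypothetical metrics) with the default weights below:
# score = 0.50 * success + 0.30 * latency + 0.20 * consistency
success_rate = 0.70       # 14 of the last 20 calls succeeded
latency_score = 0.45      # avg ~3200ms, between the 1000ms target and the 5000ms ceiling
consistency_score = 0.60  # wide latency spread
score = 0.50 * success_rate + 0.30 * latency_score + 0.20 * consistency_score
print(round(score, 3))    # 0.605, just above the 0.6 rewrite threshold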
from dataclasses import dataclass
from performance_memory import memory
REWRITE_THRESHOLD = 0.6
# Weights for the quality score components
WEIGHT_SUCCESS_RATE = 0.50
WEIGHT_LATENCY = 0.30
WEIGHT_CONSISTENCY = 0.20
# Latency targets (ms)
TARGET_LATENCY_MS = 1000.0
MAX_ACCEPTABLE_LATENCY_MS = 5000.0
@dataclass
class EvaluationResult:
tool_name: str
score: float
success_rate: float
latency_score: float
consistency_score: float
needs_rewrite: bool
reason: str
class ToolEvaluator:
def evaluate(self, tool_name: str) -> EvaluationResult:
summary = memory.summary(tool_name)
if summary["invocations"] < 3:
return EvaluationResult(
tool_name=tool_name,
score=1.0,
success_rate=1.0,
latency_score=1.0,
consistency_score=1.0,
needs_rewrite=False,
reason="Insufficient data (< 3 invocations)",
)
# Success rate score: direct mapping
success_rate = summary["success_rate"]
# Latency score: 1.0 if under target, linear decay to 0.0 at max
avg_latency = summary["avg_latency_ms"]
if avg_latency <= TARGET_LATENCY_MS:
latency_score = 1.0
elif avg_latency >= MAX_ACCEPTABLE_LATENCY_MS:
latency_score = 0.0
else:
latency_score = 1.0 - (
(avg_latency - TARGET_LATENCY_MS)
/ (MAX_ACCEPTABLE_LATENCY_MS - TARGET_LATENCY_MS)
)
# Consistency score: penalize high variance in latency
max_lat = summary["max_latency_ms"]
min_lat = summary["min_latency_ms"]
spread = max_lat - min_lat
if avg_latency > 0:
consistency_score = max(0.0, 1.0 - (spread / (avg_latency * 3)))
else:
consistency_score = 1.0
# Weighted composite score
score = (
WEIGHT_SUCCESS_RATE * success_rate
+ WEIGHT_LATENCY * latency_score
+ WEIGHT_CONSISTENCY * consistency_score
)
needs_rewrite = score < REWRITE_THRESHOLD
reason = self._build_reason(
score, success_rate, latency_score, consistency_score, summary
)
result = EvaluationResult(
tool_name=tool_name,
score=round(score, 3),
success_rate=round(success_rate, 3),
latency_score=round(latency_score, 3),
consistency_score=round(consistency_score, 3),
needs_rewrite=needs_rewrite,
reason=reason,
)
print(f"[Evaluator] {tool_name}: score={result.score} "
f"rewrite={'YES' if needs_rewrite else 'no'}")
return result
def _build_reason(
self, score, success_rate, latency_score, consistency_score, summary
) -> str:
issues = []
if success_rate < 0.8:
issues.append(
f"High failure rate ({1 - success_rate:.0%} failures). "
f"Recent errors: {summary['recent_errors']}"
)
if latency_score < 0.5:
issues.append(
f"Slow execution (avg {summary['avg_latency_ms']:.0f}ms, "
f"target <{TARGET_LATENCY_MS:.0f}ms)"
)
if consistency_score < 0.5:
issues.append(
f"Inconsistent latency (range: {summary['min_latency_ms']:.0f}ms "
f"- {summary['max_latency_ms']:.0f}ms)"
)
if not issues:
return "Tool performing within acceptable parameters."
return " | ".join(issues)
Step 3: The Rewriter Agent
The rewriter is distinct from the generator in Part 1. The generator writes a tool from scratch. The rewriter receives the existing source code, the performance metrics, and the specific failures — and produces an improved version. This is targeted surgery, not a blank-slate rewrite.
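Before looking at the implementation, here is how a call looks from the orchestrator's side: a usage sketch against the ToolRewriter defined below, with hypothetical metrics and a tool path taken from the Part 1 layout.
from pathlib import Path
from rewriter import ToolRewriter
# Usage sketch: the metrics here are hypothetical; in the real loop they come
# from the evaluator and the performance memory.
old_source = Path("tools/fetch_exchange_rate.py").read_text()
rewriter = ToolRewriter()
new_source = rewriter.rewrite(
    source_code=old_source,
    success_rate=0.70,
    avg_latency_ms=3200.0,
    max_latency_ms=5100.0,
    score=0.42,
    reason="Slow execution (avg 3200ms, target <1000ms)",
    recent_errors=["HTTPError: 429 Too Many Requests"],
)
print(new_source[:200])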
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
REWRITE_PROMPT = ChatPromptTemplate.from_messages([
("system", """You are an expert Python engineer specializing in reliability and performance optimization.
You will receive:
1. The CURRENT source code of a tool that is underperforming.
2. Performance metrics showing what is wrong.
3. Recent error messages (if any).
Your job is to produce an IMPROVED version of the same function.
Rules:
- The function name and signature MUST remain identical.
- The function must remain self-contained (all imports inside or at the top of the snippet).
- Focus specifically on the issues described in the metrics.
- Add retry logic with exponential backoff if there are HTTP errors.
- Add timeouts to all network calls.
- Add fallback mechanisms where possible.
- Improve error handling — catch specific exceptions, not bare except.
- Use only: standard library, requests, httpx, pandas, pydantic.
- Do NOT import from langchain, openai, or any LLM library.
- Do NOT use file system access outside of /tmp.
- Return ONLY the raw Python code. No markdown, no explanation.
"""),
("human", """Rewrite this underperforming tool:
## Current Source Code:
```python
{source_code}
```
## Performance Metrics:
- Success rate: {success_rate}
- Average latency: {avg_latency_ms}ms
- Max latency: {max_latency_ms}ms
- Current score: {score} (threshold: 0.6)
## Issues Identified:
{reason}
## Recent Error Messages:
{recent_errors}
Produce an improved version that addresses these specific issues.
""")
])
class ToolRewriter:
def __init__(self, model: str = "gpt-4o"):
self.llm = ChatOpenAI(model=model, temperature=0.1)
self.chain = REWRITE_PROMPT | self.llm
def rewrite(
self,
source_code: str,
success_rate: float,
avg_latency_ms: float,
max_latency_ms: float,
score: float,
reason: str,
recent_errors: list[str],
) -> str:
print(f"[Rewriter] Generating improved version...")
response = self.chain.invoke({
"source_code": source_code,
"success_rate": f"{success_rate:.1%}",
"avg_latency_ms": f"{avg_latency_ms:.0f}",
"max_latency_ms": f"{max_latency_ms:.0f}",
"score": f"{score:.3f}",
"reason": reason,
"recent_errors": "\n".join(recent_errors) if recent_errors else "None",
})
return response.content.strip()
Step 4: The Regression Runner
This is the safety net. Before any rewritten tool is promoted, it must pass the stored test suite. The runner compares the candidate's outputs, latency, and error rate against baseline expectations.
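Run in isolation, a check against a candidate looks like this (a sketch: in the improvement loop the candidate source comes from the rewriter; here we simply re-test the version already on disk).
from pathlib import Path
from regression_runner import RegressionRunner
# Sketch: run the stored test cases against whatever is currently on disk.
runner = RegressionRunner()
candidate_source = Path("tools/fetch_exchange_rate.py").read_text()
result = runner.run_regression("fetch_exchange_rate", candidate_source)
print(f"passed={result.passed} ({result.tests_passed}/{result.tests_run}), "
      f"avg latency {result.avg_latency_ms:.0f}ms")
if result.errors:
    print("errors:", result.errors)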
import json
import hashlib
import importlib.util
import subprocess
import sys
import tempfile
import textwrap
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from validator import ToolValidator, ValidationError
TESTS_DIR = Path(__file__).parent / "tools" / "tests"
TESTS_DIR.mkdir(parents=True, exist_ok=True)
TEST_MANIFEST = TESTS_DIR / "manifest.json"
@dataclass
class TestCase:
input_args: dict
expected_type: str  # one of "float", "int", "str", "dict", "list"
max_latency_ms: float
description: str
@dataclass
class RegressionResult:
passed: bool
tests_run: int
tests_passed: int
avg_latency_ms: float
errors: list[str]
class RegressionRunner:
def __init__(self):
self.validator = ToolValidator()
def store_test_case(self, tool_name: str, test_case: TestCase):
manifest = self._load_manifest()
if tool_name not in manifest:
manifest[tool_name] = []
manifest[tool_name].append({
"input_args": test_case.input_args,
"expected_type": test_case.expected_type,
"max_latency_ms": test_case.max_latency_ms,
"description": test_case.description,
})
TEST_MANIFEST.write_text(json.dumps(manifest, indent=2))
def get_test_cases(self, tool_name: str) -> list[TestCase]:
manifest = self._load_manifest()
raw_cases = manifest.get(tool_name, [])
return [
TestCase(
input_args=tc["input_args"],
expected_type=tc["expected_type"],
max_latency_ms=tc["max_latency_ms"],
description=tc["description"],
)
for tc in raw_cases
]
def run_regression(
self, tool_name: str, source_code: str
) -> RegressionResult:
print(f"[Regression] Running tests for '{tool_name}'...")
# Step 1: Validate code safety (reuses Part 1 validator)
try:
self.validator.validate(tool_name, source_code)
except ValidationError as e:
return RegressionResult(
passed=False, tests_run=0, tests_passed=0,
avg_latency_ms=0, errors=[f"Validation failed: {e}"],
)
# Step 2: Load the function
try:
exec_globals: dict = {}
exec(source_code, exec_globals) # noqa: S102
func = exec_globals[tool_name]
except Exception as e:
return RegressionResult(
passed=False, tests_run=0, tests_passed=0,
avg_latency_ms=0, errors=[f"Failed to load function: {e}"],
)
# Step 3: Run test cases
test_cases = self.get_test_cases(tool_name)
if not test_cases:
print(f"[Regression] No test cases for '{tool_name}'. Generating defaults...")
test_cases = self._generate_default_tests(tool_name, source_code)
tests_passed = 0
latencies = []
errors = []
for i, tc in enumerate(test_cases):
try:
start = time.time()
result = func(**tc.input_args)
elapsed_ms = (time.time() - start) * 1000
latencies.append(elapsed_ms)
# Check return type
expected = {"float": float, "str": str, "dict": dict, "list": list, "int": int}
expected_type = expected.get(tc.expected_type)
if expected_type and not isinstance(result, expected_type):
errors.append(
f"Test {i+1} ({tc.description}): "
f"Expected {tc.expected_type}, got {type(result).__name__}"
)
continue
# Check latency
if elapsed_ms > tc.max_latency_ms:
errors.append(
f"Test {i+1} ({tc.description}): "
f"Too slow ({elapsed_ms:.0f}ms > {tc.max_latency_ms:.0f}ms)"
)
continue
tests_passed += 1
print(f" [Test {i+1}] PASS: {tc.description} ({elapsed_ms:.0f}ms)")
except Exception as e:
errors.append(f"Test {i+1} ({tc.description}): Exception: {e}")
print(f" [Test {i+1}] FAIL: {tc.description} — {e}")
avg_lat = sum(latencies) / len(latencies) if latencies else 0
passed = tests_passed == len(test_cases) and len(test_cases) > 0
print(
f"[Regression] Result: {tests_passed}/{len(test_cases)} passed, "
f"avg latency {avg_lat:.0f}ms"
)
return RegressionResult(
passed=passed,
tests_run=len(test_cases),
tests_passed=tests_passed,
avg_latency_ms=avg_lat,
errors=errors,
)
def _generate_default_tests(
self, tool_name: str, source_code: str
) -> list[TestCase]:
"""Generate basic smoke test: just verify the function is callable
and returns without crashing. Real test cases should be added via
store_test_case() after initial tool creation."""
default = TestCase(
input_args={},
expected_type="str",
max_latency_ms=5000,
description="Smoke test — function is callable",
)
return [default]
def _load_manifest(self) -> dict:
if TEST_MANIFEST.exists():
return json.loads(TEST_MANIFEST.read_text())
return {}
Step 5: The Extended Orchestrator
The orchestrator from Part 1 is extended with two critical additions:
- Metrics capture: Every tool call is wrapped in a timing context that records performance to the memory.
- Improvement loop: After every N invocations (configurable), the evaluator runs. If a tool scores below the threshold, the rewrite pipeline triggers automatically.
import hashlib
import time
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from registry import registry
from generator import ToolGeneratorAgent
from validator import ToolValidator, ValidationError
from performance_memory import memory, InvocationRecord
from evaluator import ToolEvaluator
from rewriter import ToolRewriter
from regression_runner import RegressionRunner
EVAL_EVERY_N = 5 # Evaluate tool quality every N invocations
MAX_REWRITES_PER_TOOL = 3
ORCHESTRATOR_PROMPT = ChatPromptTemplate.from_messages([
("system", """You are an autonomous AI agent with the ability to extend AND improve your own capabilities.
When you need to perform an action for which you have no tool:
1. Use the `request_new_tool` tool to describe exactly what you need.
2. The system will generate and register the tool automatically.
3. The new tool will then be available. Use it to complete the task.
Your tools are continuously monitored and improved. If a tool fails, retry once — the system may upgrade it between calls.
Always complete the user's task fully. Never say you cannot do something — if you lack a tool, request it.
"""),
MessagesPlaceholder("chat_history", optional=True),
("human", "{input}"),
MessagesPlaceholder("agent_scratchpad"),
])
class NewToolRequest(BaseModel):
tool_name: str = Field(
description="snake_case name for the tool, e.g. fetch_exchange_rate"
)
description: str = Field(
description="What the tool does in one clear sentence"
)
input_params: str = Field(
description="Parameters with types, e.g. 'amount: float, from_currency: str'"
)
return_type: str = Field(
description="Return type, e.g. 'float' or 'dict'"
)
example: str = Field(
description="Example call, e.g. fetch_exchange_rate(100.0, 'USD', 'ILS')"
)
class SelfImprovingOrchestrator:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
self.generator = ToolGeneratorAgent()
self.validator = ToolValidator()
self.evaluator = ToolEvaluator()
self.rewriter = ToolRewriter()
self.regression = RegressionRunner()
self._invocation_counts: dict[str, int] = {}
self._rewrite_counts: dict[str, int] = {}
self._build_executor()
def _build_executor(self):
request_tool = StructuredTool.from_function(
func=self._handle_tool_request,
name="request_new_tool",
description=(
"Call this when you need a capability you don't have. "
"The system will generate and register the tool for you."
),
args_schema=NewToolRequest,
)
# Wrap all registered tools with metrics capture
wrapped_tools = []
for tool in registry.all_tools():
wrapped = self._wrap_with_metrics(tool)
wrapped_tools.append(wrapped)
tools = [request_tool] + wrapped_tools
agent = create_openai_tools_agent(self.llm, tools, ORCHESTRATOR_PROMPT)
self.executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=15,
handle_parsing_errors=True,
)
def _wrap_with_metrics(self, tool: StructuredTool) -> StructuredTool:
original_func = tool.func
tool_name = tool.name
orchestrator = self
def metered_func(**kwargs) -> str:
input_hash = hashlib.md5(
str(sorted(kwargs.items())).encode()
).hexdigest()[:12]
start = time.time()
success = True
error_msg = None
output_hash = None
result = None
try:
result = original_func(**kwargs)
output_hash = hashlib.md5(
str(result).encode()
).hexdigest()[:12]
return str(result)
except Exception as e:
success = False
error_msg = str(e)
raise
finally:
elapsed_ms = (time.time() - start) * 1000
version = memory.get_current_version(tool_name)
record = InvocationRecord(
tool_name=tool_name,
timestamp=time.time(),
latency_ms=elapsed_ms,
success=success,
error_message=error_msg,
input_hash=input_hash,
output_hash=output_hash,
version=version,
)
memory.record(record)
print(
f"[Metrics] {tool_name}: "
f"{'OK' if success else 'FAIL'} "
f"in {elapsed_ms:.0f}ms (v{version})"
)
# Check if we should evaluate this tool
orchestrator._invocation_counts[tool_name] = (
orchestrator._invocation_counts.get(tool_name, 0) + 1
)
if orchestrator._invocation_counts[tool_name] % EVAL_EVERY_N == 0:
orchestrator._maybe_improve(tool_name)
return StructuredTool.from_function(
func=metered_func,
name=tool_name,
description=tool.description,
)
def _maybe_improve(self, tool_name: str):
if self._rewrite_counts.get(tool_name, 0) >= MAX_REWRITES_PER_TOOL:
print(f"[Improve] '{tool_name}' hit max rewrite limit. Skipping.")
return
evaluation = self.evaluator.evaluate(tool_name)
if not evaluation.needs_rewrite:
return
print(f"\n{'='*50}")
print(f"[Improve] Tool '{tool_name}' scored {evaluation.score} — triggering rewrite")
print(f"{'='*50}\n")
current_version = memory.get_current_version(tool_name)
current_source = memory.get_version_source(tool_name, current_version)
if not current_source:
# Try to read from disk
from pathlib import Path
tool_file = Path(__file__).parent / "tools" / f"{tool_name}.py"
if tool_file.exists():
current_source = tool_file.read_text()
else:
print(f"[Improve] No source code found for '{tool_name}'. Skipping.")
return
summary = memory.summary(tool_name)
# Generate improved version
for attempt in range(1, 4):
print(f"[Improve] Rewrite attempt {attempt}/3 for '{tool_name}'")
try:
new_source = self.rewriter.rewrite(
source_code=current_source,
success_rate=summary["success_rate"],
avg_latency_ms=summary["avg_latency_ms"],
max_latency_ms=summary["max_latency_ms"],
score=evaluation.score,
reason=evaluation.reason,
recent_errors=summary["recent_errors"],
)
# Run regression tests
regression_result = self.regression.run_regression(
tool_name, new_source
)
if regression_result.passed:
new_version = current_version + 1
# Load and register the new version
exec_globals: dict = {}
exec(new_source, exec_globals) # noqa: S102
func = exec_globals[tool_name]
registry.register(tool_name, func, registry.get(tool_name).description)
registry.persist_tool(tool_name, new_source, registry.get(tool_name).description)
memory.store_version(tool_name, new_version, new_source)
self._rewrite_counts[tool_name] = (
self._rewrite_counts.get(tool_name, 0) + 1
)
# Rebuild executor with the new tool version
self._build_executor()
print(f"[Improve] '{tool_name}' upgraded to v{new_version}")
return
else:
print(
f"[Improve] Regression failed: "
f"{regression_result.errors}"
)
except Exception as e:
print(f"[Improve] Attempt {attempt} failed: {e}")
print(f"[Improve] All rewrite attempts failed for '{tool_name}'. Keeping current version.")
def _handle_tool_request(
self,
tool_name: str,
description: str,
input_params: str,
return_type: str,
example: str,
) -> str:
if registry.has(tool_name):
return f"Tool '{tool_name}' already exists. Use it directly."
for attempt in range(1, 4):
print(f"\n[Orchestrator] Generating '{tool_name}' (attempt {attempt}/3)")
try:
source_code = self.generator.generate(
tool_name=tool_name,
description=description,
input_params=input_params,
return_type=return_type,
example=example,
)
self.validator.validate(tool_name, source_code)
exec_globals: dict = {}
exec(source_code, exec_globals) # noqa: S102
func = exec_globals[tool_name]
registry.register(tool_name, func, description)
registry.persist_tool(tool_name, source_code, description)
# Store version 1 in performance memory
memory.store_version(tool_name, 1, source_code)
self._build_executor()
print(f"[Orchestrator] '{tool_name}' registered (v1).")
return (
f"Tool '{tool_name}' created and registered successfully. "
f"You can now use it to: {description}"
)
except (ValidationError, Exception) as e:
print(f"[Orchestrator] Attempt {attempt} failed: {e}")
if attempt == 3:
return f"Failed to generate '{tool_name}' after 3 attempts: {e}"
return f"Tool generation failed for '{tool_name}'."
def run(self, task: str) -> str:
print(f"\n{'='*60}")
print(f"Task: {task}")
print(f"Available tools: {registry.tool_names()}")
print(f"{'='*60}\n")
result = self.executor.invoke({"input": task})
return result["output"]
Step 6: Main Entry Point
import os
from orchestrator import SelfImprovingOrchestrator
from regression_runner import RegressionRunner, TestCase
os.environ["OPENAI_API_KEY"] = "your-key-here"
def seed_test_cases():
"""Pre-load test cases for tools we expect the agent to create."""
runner = RegressionRunner()
runner.store_test_case("fetch_exchange_rate", TestCase(
input_args={"from_currency": "USD", "to_currency": "EUR"},
expected_type="float",
max_latency_ms=3000,
description="USD to EUR returns a float",
))
runner.store_test_case("fetch_exchange_rate", TestCase(
input_args={"from_currency": "USD", "to_currency": "ILS"},
expected_type="float",
max_latency_ms=3000,
description="USD to ILS returns a float",
))
runner.store_test_case("convert_currency", TestCase(
input_args={"amount": 100.0, "rate": 3.5},
expected_type="float",
max_latency_ms=100,
description="Simple multiplication returns float",
))
def main():
seed_test_cases()
agent = SelfImprovingOrchestrator()
tasks = [
"What is the current USD to ILS exchange rate? Convert 5000 USD.",
"What is the current EUR to GBP rate? Convert 2000 EUR.",
"Convert 10000 JPY to USD using the latest rate.",
]
for task in tasks:
result = agent.run(task)
print(f"\n Result: {result}\n")
print("-" * 60)
# Print final performance report
print("\n" + "=" * 60)
print("PERFORMANCE REPORT")
print("=" * 60)
from performance_memory import memory
for tool_name in ["fetch_exchange_rate", "convert_currency"]:
summary = memory.summary(tool_name)
if summary["invocations"] > 0:
print(f"\n{tool_name}:")
print(f" Invocations: {summary['invocations']}")
print(f" Success rate: {summary['success_rate']:.1%}")
print(f" Avg latency: {summary['avg_latency_ms']:.0f}ms")
print(f" Version: v{summary['current_version']}")
if __name__ == "__main__":
main()
Live Demo: The Improvement Loop in Action
Here is what happens when we run the system and a tool starts underperforming. For demonstration, imagine fetch_exchange_rate v1 has been returning errors intermittently:
============================================================
Task: What is the current USD to ILS exchange rate? Convert 5000 USD.
Available tools: ['fetch_exchange_rate', 'convert_currency']
============================================================
> Invoking: `fetch_exchange_rate` with {"from_currency": "USD", "to_currency": "ILS"}
[Metrics] fetch_exchange_rate: OK in 2847ms (v1)
> Invoking: `convert_currency` with {"amount": 5000, "rate": 3.71}
[Metrics] convert_currency: OK in 1ms (v1)
Result: 5,000 USD = 18,550 ILS
------------------------------------------------------------
> Invoking: `fetch_exchange_rate` with {"from_currency": "EUR", "to_currency": "GBP"}
[Metrics] fetch_exchange_rate: FAIL in 5012ms (v1)
> Invoking: `fetch_exchange_rate` with {"from_currency": "EUR", "to_currency": "GBP"}
[Metrics] fetch_exchange_rate: OK in 3201ms (v1)
[Evaluator] fetch_exchange_rate: score=0.42 rewrite=YES
==================================================
[Improve] Tool 'fetch_exchange_rate' scored 0.42 — triggering rewrite
==================================================
[Improve] Rewrite attempt 1/3 for 'fetch_exchange_rate'
[Rewriter] Generating improved version...
[Regression] Running tests for 'fetch_exchange_rate'...
[Test 1] PASS: USD to EUR returns a float (780ms)
[Test 2] PASS: USD to ILS returns a float (650ms)
[Regression] Result: 2/2 passed, avg latency 715ms
[Improve] 'fetch_exchange_rate' upgraded to v2
> Invoking: `fetch_exchange_rate` with {"from_currency": "EUR", "to_currency": "GBP"}
[Metrics] fetch_exchange_rate: OK in 720ms (v2)
Result: 2,000 EUR = 1,714 GBP
The key moment: the agent detected degradation (score 0.42), triggered a rewrite that added retry logic and timeouts, validated the rewrite against two test cases, and promoted v2 — all within a single session. Subsequent calls use the improved version automatically.
What the Rewriter Actually Produces
Here is an example of what v1 vs v2 looks like after the rewriter fixes the issues:
v1 (generated by Part 1):
def fetch_exchange_rate(from_currency: str, to_currency: str) -> float:
"""Fetch live exchange rate between two currencies."""
import requests
resp = requests.get(
f"https://api.exchangerate-api.com/v4/latest/{from_currency}"
)
data = resp.json()
return data["rates"][to_currency]
v2 (rewritten by the Self-Improving Agent):
def fetch_exchange_rate(from_currency: str, to_currency: str) -> float:
"""Fetch live exchange rate with retry logic, timeout, and fallback API."""
import requests
import time
PRIMARY_URL = "https://api.exchangerate-api.com/v4/latest/{}"
FALLBACK_URL = "https://open.er-api.com/v6/latest/{}"
def _fetch(url_template: str, retries: int = 3) -> float:
for attempt in range(retries):
try:
resp = requests.get(
url_template.format(from_currency),
timeout=3,
)
resp.raise_for_status()
data = resp.json()
rate = data["rates"][to_currency]
if not isinstance(rate, (int, float)):
raise ValueError(f"Unexpected rate type: {type(rate)}")
return float(rate)
except (requests.RequestException, KeyError, ValueError) as e:
if attempt < retries - 1:
time.sleep(0.5 * (2 ** attempt)) # exponential backoff
continue
raise
try:
return _fetch(PRIMARY_URL)
except Exception:
return _fetch(FALLBACK_URL)
The rewriter identified three specific issues from the metrics and fixed all of them:
- No timeout → Added timeout=3
- No retry logic → Added exponential backoff (3 retries)
- No fallback → Added a secondary API endpoint
The Version Archive
Every version is preserved in the performance memory database. You can query the full history:
# Get all versions of a tool
for v in range(1, memory.get_current_version("fetch_exchange_rate") + 1):
source = memory.get_version_source("fetch_exchange_rate", v)
print(f"=== Version {v} ===")
print(source[:200] + "...")
This creates a full audit trail — critical for debugging and compliance in production environments.
Guardrails for Self-Improvement
1. Rewrite Limits
MAX_REWRITES_PER_TOOL = 3
A tool can only be rewritten 3 times per session. This prevents infinite rewrite loops where the evaluator and rewriter disagree on quality.
2. Regression Gate
No rewrite is promoted without passing the full test suite. If all 3 attempts fail regression, the system keeps the current version and logs the failure. The old tool is always better than a broken new one.
3. Minimum Data Threshold
if summary["invocations"] < 3:
return EvaluationResult(score=1.0, needs_rewrite=False, ...)
The evaluator refuses to score a tool with fewer than 3 invocations. This prevents premature rewrites based on a single bad call.
4. Version Pinning for Critical Tools
For tools that must never be auto-rewritten (financial calculations, auth flows):
PINNED_TOOLS = {"calculate_tax", "verify_signature"}
def _maybe_improve(self, tool_name: str):
if tool_name in PINNED_TOOLS:
print(f"[Improve] '{tool_name}' is pinned. Skipping auto-improvement.")
return
# ... rest of improvement logic
Production Architecture
┌──────────────────────────┐
│ Orchestrator Agent │
│ + Metrics Middleware │
└────────────┬─────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Performance │ │ Evaluator │ │ Rewriter │
│ Memory (SQL) │ │ (scoring) │ │ (LLM) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└──────────────────┼──────────────────┘
▼
┌──────────────────┐
│ Regression Runner │
│ (test suite) │
└────────┬─────────┘
│ promote / rollback
┌────────▼─────────┐
│ Tool Registry │
│ (versioned) │
└──────────────────┘
For production, extend with:
- PostgreSQL instead of SQLite for the performance memory (multi-instance support).
- Docker sandboxing for regression tests (isolate from host).
- Alerting when a tool is rewritten (Slack/PagerDuty webhook); a minimal sketch follows this list.
- A/B testing: run both versions in parallel and compare outputs before promoting.
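As a taste of the alerting extension, here is a minimal sketch of a promotion hook that posts to a Slack incoming webhook. The SLACK_WEBHOOK_URL variable and the notify_promotion helper are assumptions, not part of the codebase above; you would call it from _maybe_improve() right after the regression gate passes.
import os
import requests

def notify_promotion(tool_name: str, old_version: int, new_version: int, score: float) -> None:
    """Hypothetical alerting hook: post to Slack when a tool version is promoted."""
    webhook = os.environ.get("SLACK_WEBHOOK_URL")  # assumed env var holding an incoming-webhook URL
    if not webhook:
        return  # alerting stays optional
    requests.post(
        webhook,
        json={
            "text": (
                f"`{tool_name}` auto-rewritten: v{old_version} -> v{new_version} "
                f"(trigger score: {score:.2f})"
            )
        },
        timeout=5,
    )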
Key Takeaways
- Self-extension is not enough. Generating tools once is a start. Continuously improving them is what makes the system production-grade.
- Performance memory is the foundation. Without metrics, there is no signal. Without signal, there is no improvement.
- The evaluator is the brain. A clear, weighted scoring function turns noisy metrics into actionable decisions.
- The regression runner is the safety net. Never promote a rewrite without testing it. The old version is always the fallback.
- Versioning creates accountability. Every tool version is archived. You can trace exactly what changed, when, and why.
- Guardrails prevent runaway loops. Rewrite limits, minimum data thresholds, and version pinning keep the system stable.
The Full Loop: From Static to Self-Improving
Across Parts 1 and 2, we have built a system that:
- Starts with zero tools — only the ability to request new ones.
- Generates tools on demand — writing Python functions from natural language specs.
- Validates before use — AST analysis, banned pattern detection, sandboxed execution.
- Persists tools to disk — so they survive restarts and compound over time.
- Monitors every invocation — capturing latency, errors, and output quality.
- Evaluates continuously — computing quality scores and detecting degradation.
- Rewrites underperforming tools — with full context about what went wrong.
- Tests before promoting — running regression suites against every candidate.
- Archives every version — creating a full audit trail of tool evolution.
This is no longer a static agent. This is a living system that gets better every time it runs.
📂 Source Code
All code examples from this article are available on GitHub: OneManCrew/self-improving-agent