Tags: Agentic AI · LangChain · Python · Tool Generation · LLM · AI Agents · Advanced

The Self-Extending AI Agent Part 2: Build a Self-Improving Agent That Rewrites Its Own Tools

In Part 1, the agent learned to write new tools. In Part 2, it learns to rewrite them. Build a Self-Improving Agent with performance memory, an evaluation loop, a rewriter LLM, and a regression test suite — so your agent gets better every time it runs.

2026-04-30 · 22 min read

The Self-Improving Agent

In Part 1 we built a Self-Extending Agent — one that writes new tools at runtime when it encounters a capability gap. That agent can generate fetch_exchange_rate, validate it in a sandbox, persist it to disk, and use it immediately. Problem solved — once.

But what happens the second, tenth, or hundredth time that tool runs?

The generated tool might be slow (using a deprecated API endpoint). It might fail intermittently (no retry logic). It might return stale data (no cache invalidation). The agent has no idea, because it never looks back.

This article builds the next layer: a Self-Improving Agent — one that monitors every tool invocation, scores performance over time, detects degradation, rewrites underperforming tools using an LLM, validates the rewrite against a regression test suite, and promotes the new version only if it passes. All automatically.

Prerequisite: This article extends the codebase from Part 1. Read it first — it assumes you already have the registry, generator, validator, and orchestrator working.


Why Self-Improvement Matters

Consider a tool the agent generated last week:

def fetch_exchange_rate(from_currency: str, to_currency: str) -> float:
    """Fetch live exchange rate between two currencies."""
    import requests
    resp = requests.get(
        f"https://api.exchangerate-api.com/v4/latest/{from_currency}"
    )
    data = resp.json()
    return data["rates"][to_currency]

This worked perfectly on day one. But now:

  • The API returns HTTP 429 under load — no retry logic.
  • Average latency climbed to 3.2 seconds — no timeout configured.
  • The API occasionally returns stale rates from a CDN cache — no freshness check.

A static agent would keep using this tool forever, degrading silently. A Self-Improving Agent detects the decline, rewrites the tool with retry logic, timeouts, and a fallback API, runs the regression tests, and promotes the fix — all without human intervention.


Architecture Overview

The Self-Improving Agent adds four new components on top of the Part 1 architecture:

┌─────────────────────────────────────────────────────────┐
│ ORCHESTRATOR AGENT
│ Executes tools: wraps every call with metrics capture
└──────────────────────┬──────────────────────────────────┘
                       │ {latency, success, output_hash}
                       ▼
┌─────────────────────────────────────────────────────────┐
│ PERFORMANCE MEMORY
│ Stores per-tool metrics across invocations
│ SQLite-backed, queryable by tool name + time range
└──────────────────────┬──────────────────────────────────┘
                       │ aggregated scores
                       ▼
┌─────────────────────────────────────────────────────────┐
│ EVALUATOR
│ Computes weighted quality score per tool
│ Triggers rewrite if score < threshold
└──────────────────────┬──────────────────────────────────┘
                       │ rewrite request + context
                       ▼
┌─────────────────────────────────────────────────────────┐
│ REWRITER AGENT
│ LLM receives: old code + metrics + failure logs
│ Produces: improved version of the tool
└──────────────────────┬──────────────────────────────────┘
                       │ candidate code
                       ▼
┌─────────────────────────────────────────────────────────┐
│ REGRESSION RUNNER
│ Runs stored test cases against candidate
│ Compares output, latency, error rate vs. baseline
│ → PROMOTE new version or ROLLBACK to old
└─────────────────────────────────────────────────────────┘

Project Setup

Extend the Part 1 project:

self_improving_agent/
├── main.py
├── registry.py              # from Part 1 (unchanged)
├── generator.py             # from Part 1 (unchanged)
├── validator.py             # from Part 1 (unchanged)
├── orchestrator.py          # extended with metrics capture
├── performance_memory.py    # NEW
├── evaluator.py             # NEW
├── rewriter.py              # NEW
├── regression_runner.py     # NEW
└── tools/
    ├── manifest.json
    └── tests/               # NEW: stored test cases
        └── manifest.json

pip install langchain langchain-openai openai pydantic

No new dependencies beyond Part 1. The performance memory uses SQLite from the standard library.


Step 1: Performance Memory

Every tool invocation is recorded. The memory stores latency, success/failure, error messages, input fingerprints, and output hashes — all in a local SQLite database.

performance_memory.py
import sqlite3
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
 
DB_PATH = Path(__file__).parent / "tools" / "performance.db"
 
 
@dataclass
class InvocationRecord:
    tool_name: str
    timestamp: float
    latency_ms: float
    success: bool
    error_message: Optional[str]
    input_hash: str
    output_hash: Optional[str]
    version: int
 
 
class PerformanceMemory:
    def __init__(self, db_path: Path = DB_PATH):
        self.db_path = db_path
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(str(self.db_path))
        self._create_tables()
 
    def _create_tables(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS invocations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tool_name TEXT NOT NULL,
                timestamp REAL NOT NULL,
                latency_ms REAL NOT NULL,
                success INTEGER NOT NULL,
                error_message TEXT,
                input_hash TEXT NOT NULL,
                output_hash TEXT,
                version INTEGER NOT NULL DEFAULT 1
            )
        """)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS tool_versions (
                tool_name TEXT NOT NULL,
                version INTEGER NOT NULL,
                source_code TEXT NOT NULL,
                created_at REAL NOT NULL,
                PRIMARY KEY (tool_name, version)
            )
        """)
        self.conn.commit()
 
    def record(self, rec: InvocationRecord):
        self.conn.execute(
            """INSERT INTO invocations
               (tool_name, timestamp, latency_ms, success,
                error_message, input_hash, output_hash, version)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                rec.tool_name, rec.timestamp, rec.latency_ms,
                int(rec.success), rec.error_message,
                rec.input_hash, rec.output_hash, rec.version,
            ),
        )
        self.conn.commit()
 
    def get_recent(self, tool_name: str, limit: int = 20) -> list[dict]:
        cursor = self.conn.execute(
            """SELECT tool_name, timestamp, latency_ms, success,
                      error_message, input_hash, output_hash, version
               FROM invocations
               WHERE tool_name = ?
               ORDER BY timestamp DESC
               LIMIT ?""",
            (tool_name, limit),
        )
        columns = [
            "tool_name", "timestamp", "latency_ms", "success",
            "error_message", "input_hash", "output_hash", "version",
        ]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
 
    def get_current_version(self, tool_name: str) -> int:
        cursor = self.conn.execute(
            "SELECT MAX(version) FROM tool_versions WHERE tool_name = ?",
            (tool_name,),
        )
        row = cursor.fetchone()
        return row[0] if row[0] is not None else 1
 
    def store_version(self, tool_name: str, version: int, source_code: str):
        self.conn.execute(
            """INSERT OR REPLACE INTO tool_versions
               (tool_name, version, source_code, created_at)
               VALUES (?, ?, ?, ?)""",
            (tool_name, version, source_code, time.time()),
        )
        self.conn.commit()
 
    def get_version_source(self, tool_name: str, version: int) -> Optional[str]:
        cursor = self.conn.execute(
            "SELECT source_code FROM tool_versions WHERE tool_name = ? AND version = ?",
            (tool_name, version),
        )
        row = cursor.fetchone()
        return row[0] if row else None
 
    def summary(self, tool_name: str, last_n: int = 20) -> dict:
        records = self.get_recent(tool_name, last_n)
        if not records:
            return {"tool_name": tool_name, "invocations": 0}
        total = len(records)
        successes = sum(1 for r in records if r["success"])
        latencies = [r["latency_ms"] for r in records]
        errors = [r["error_message"] for r in records if r["error_message"]]
        return {
            "tool_name": tool_name,
            "invocations": total,
            "success_rate": successes / total,
            "avg_latency_ms": sum(latencies) / total,
            "max_latency_ms": max(latencies),
            "min_latency_ms": min(latencies),
            "recent_errors": errors[:5],
            "current_version": self.get_current_version(tool_name),
        }
 
 
memory = PerformanceMemory()
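
To see the memory in action on its own, here is a short usage sketch (the values are illustrative, not from a real run): record one invocation by hand, then read back the aggregate summary the evaluator will later consume.

import time
from performance_memory import memory, InvocationRecord

# Hypothetical invocation of fetch_exchange_rate (illustrative numbers)
memory.record(InvocationRecord(
    tool_name="fetch_exchange_rate",
    timestamp=time.time(),
    latency_ms=2840.0,
    success=True,
    error_message=None,
    input_hash="a1b2c3d4e5f6",
    output_hash="0f9e8d7c6b5a",
    version=1,
))

print(memory.summary("fetch_exchange_rate"))
# {'tool_name': 'fetch_exchange_rate', 'invocations': 1, 'success_rate': 1.0, ...}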

Step 2: The Evaluator

The evaluator consumes performance summaries and produces a single quality score between 0.0 and 1.0 for each tool. If the score drops below a configurable threshold, it flags the tool for rewriting.

evaluator.py
from dataclasses import dataclass
from performance_memory import memory
 
REWRITE_THRESHOLD = 0.6
 
# Weights for the quality score components
WEIGHT_SUCCESS_RATE = 0.50
WEIGHT_LATENCY = 0.30
WEIGHT_CONSISTENCY = 0.20
 
# Latency targets (ms)
TARGET_LATENCY_MS = 1000.0
MAX_ACCEPTABLE_LATENCY_MS = 5000.0
 
 
@dataclass
class EvaluationResult:
    tool_name: str
    score: float
    success_rate: float
    latency_score: float
    consistency_score: float
    needs_rewrite: bool
    reason: str
 
 
class ToolEvaluator:
 
    def evaluate(self, tool_name: str) -> EvaluationResult:
        summary = memory.summary(tool_name)
 
        if summary["invocations"] < 3:
            return EvaluationResult(
                tool_name=tool_name,
                score=1.0,
                success_rate=1.0,
                latency_score=1.0,
                consistency_score=1.0,
                needs_rewrite=False,
                reason="Insufficient data (< 3 invocations)",
            )
 
        # Success rate score: direct mapping
        success_rate = summary["success_rate"]
 
        # Latency score: 1.0 if under target, linear decay to 0.0 at max
        avg_latency = summary["avg_latency_ms"]
        if avg_latency <= TARGET_LATENCY_MS:
            latency_score = 1.0
        elif avg_latency >= MAX_ACCEPTABLE_LATENCY_MS:
            latency_score = 0.0
        else:
            latency_score = 1.0 - (
                (avg_latency - TARGET_LATENCY_MS)
                / (MAX_ACCEPTABLE_LATENCY_MS - TARGET_LATENCY_MS)
            )
 
        # Consistency score: penalize high variance in latency
        max_lat = summary["max_latency_ms"]
        min_lat = summary["min_latency_ms"]
        spread = max_lat - min_lat
        if avg_latency > 0:
            consistency_score = max(0.0, 1.0 - (spread / (avg_latency * 3)))
        else:
            consistency_score = 1.0
 
        # Weighted composite score
        score = (
            WEIGHT_SUCCESS_RATE * success_rate
            + WEIGHT_LATENCY * latency_score
            + WEIGHT_CONSISTENCY * consistency_score
        )
 
        needs_rewrite = score < REWRITE_THRESHOLD
        reason = self._build_reason(
            score, success_rate, latency_score, consistency_score, summary
        )
 
        result = EvaluationResult(
            tool_name=tool_name,
            score=round(score, 3),
            success_rate=round(success_rate, 3),
            latency_score=round(latency_score, 3),
            consistency_score=round(consistency_score, 3),
            needs_rewrite=needs_rewrite,
            reason=reason,
        )
        print(f"[Evaluator] {tool_name}: score={result.score} "
              f"rewrite={'YES' if needs_rewrite else 'no'}")
        return result
 
    def _build_reason(
        self, score, success_rate, latency_score, consistency_score, summary
    ) -> str:
        issues = []
        if success_rate < 0.8:
            issues.append(
                f"High failure rate ({1 - success_rate:.0%} failures). "
                f"Recent errors: {summary['recent_errors']}"
            )
        if latency_score < 0.5:
            issues.append(
                f"Slow execution (avg {summary['avg_latency_ms']:.0f}ms, "
                f"target <{TARGET_LATENCY_MS:.0f}ms)"
            )
        if consistency_score < 0.5:
            issues.append(
                f"Inconsistent latency (range: {summary['min_latency_ms']:.0f}ms "
                f"- {summary['max_latency_ms']:.0f}ms)"
            )
        if not issues:
            return "Tool performing within acceptable parameters."
        return " | ".join(issues)
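
To make the weighting concrete, here is a hedged example with illustrative numbers (not from a real run): a tool with a 60% success rate, a 2,600 ms average latency, and a wide latency spread lands just below the 0.6 threshold and gets flagged.

# Illustrative arithmetic for one evaluation (assumed numbers):
#   success_rate      = 0.60                          -> 0.50 * 0.60 = 0.300
#   latency_score     = 1 - (2600 - 1000) / 4000      -> 0.30 * 0.60 = 0.180
#   consistency_score = max(0, 1 - 4000 / (2600 * 3)) -> 0.20 * 0.49 = 0.098
#   score ≈ 0.58 < REWRITE_THRESHOLD (0.6)            -> needs_rewrite = True
from evaluator import ToolEvaluator

result = ToolEvaluator().evaluate("fetch_exchange_rate")
print(result.score, result.needs_rewrite)
print(result.reason)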

Step 3: The Rewriter Agent

The rewriter is distinct from the generator in Part 1. The generator writes a tool from scratch. The rewriter receives the existing source code, the performance metrics, and the specific failures — and produces an improved version. This is targeted surgery, not a blank-slate rewrite.

rewriter.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
 
REWRITE_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """You are an expert Python engineer specializing in reliability and performance optimization.
 
You will receive:
1. The CURRENT source code of a tool that is underperforming.
2. Performance metrics showing what is wrong.
3. Recent error messages (if any).
 
Your job is to produce an IMPROVED version of the same function.
 
Rules:
- The function name and signature MUST remain identical.
- The function must remain self-contained (all imports inside or at the top of the snippet).
- Focus specifically on the issues described in the metrics.
- Add retry logic with exponential backoff if there are HTTP errors.
- Add timeouts to all network calls.
- Add fallback mechanisms where possible.
- Improve error handling — catch specific exceptions, not bare except.
- Use only: standard library, requests, httpx, pandas, pydantic.
- Do NOT import from langchain, openai, or any LLM library.
- Do NOT use file system access outside of /tmp.
- Return ONLY the raw Python code. No markdown, no explanation.
"""),
    ("human", """Rewrite this underperforming tool:
 
## Current Source Code:
```python
{source_code}
```
 
## Performance Metrics:
- Success rate: {success_rate}
- Average latency: {avg_latency_ms}ms
- Max latency: {max_latency_ms}ms
- Current score: {score} (threshold: 0.6)
 
## Issues Identified:
{reason}
 
## Recent Error Messages:
{recent_errors}
 
Produce an improved version that addresses these specific issues.
""")
])
 
 
class ToolRewriter:
    def __init__(self, model: str = "gpt-4o"):
        self.llm = ChatOpenAI(model=model, temperature=0.1)
        self.chain = REWRITE_PROMPT | self.llm
 
    def rewrite(
        self,
        source_code: str,
        success_rate: float,
        avg_latency_ms: float,
        max_latency_ms: float,
        score: float,
        reason: str,
        recent_errors: list[str],
    ) -> str:
        print(f"[Rewriter] Generating improved version...")
        response = self.chain.invoke({
            "source_code": source_code,
            "success_rate": f"{success_rate:.1%}",
            "avg_latency_ms": f"{avg_latency_ms:.0f}",
            "max_latency_ms": f"{max_latency_ms:.0f}",
            "score": f"{score:.3f}",
            "reason": reason,
            "recent_errors": "\n".join(recent_errors) if recent_errors else "None",
        })
        return response.content.strip()
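
Wiring the pieces together by hand looks like this. It is a sketch only; the orchestrator in Step 5 runs the same sequence automatically, and it assumes the tool has already been invoked a few times so the summary is populated.

from performance_memory import memory
from evaluator import ToolEvaluator
from rewriter import ToolRewriter

tool_name = "fetch_exchange_rate"
summary = memory.summary(tool_name)
evaluation = ToolEvaluator().evaluate(tool_name)

if evaluation.needs_rewrite:
    # Pull the source of the currently active version from the archive
    current_source = memory.get_version_source(tool_name, summary["current_version"])
    candidate = ToolRewriter().rewrite(
        source_code=current_source,
        success_rate=summary["success_rate"],
        avg_latency_ms=summary["avg_latency_ms"],
        max_latency_ms=summary["max_latency_ms"],
        score=evaluation.score,
        reason=evaluation.reason,
        recent_errors=summary["recent_errors"],
    )
    print(candidate)  # improved code, still unvalidated at this point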

Step 4: The Regression Runner

This is the safety net. Before any rewritten tool is promoted, it must pass the stored test suite. The runner compares the candidate's outputs, latency, and error rate against baseline expectations.

regression_runner.py
import json
import time
from dataclasses import dataclass
from pathlib import Path
 
from validator import ToolValidator, ValidationError
 
TESTS_DIR = Path(__file__).parent / "tools" / "tests"
TESTS_DIR.mkdir(parents=True, exist_ok=True)
TEST_MANIFEST = TESTS_DIR / "manifest.json"
 
 
@dataclass
class TestCase:
    input_args: dict
    expected_type: str  # "float", "str", "dict", "list"
    max_latency_ms: float
    description: str
 
 
@dataclass
class RegressionResult:
    passed: bool
    tests_run: int
    tests_passed: int
    avg_latency_ms: float
    errors: list[str]
 
 
class RegressionRunner:
    def __init__(self):
        self.validator = ToolValidator()
 
    def store_test_case(self, tool_name: str, test_case: TestCase):
        manifest = self._load_manifest()
        if tool_name not in manifest:
            manifest[tool_name] = []
        manifest[tool_name].append({
            "input_args": test_case.input_args,
            "expected_type": test_case.expected_type,
            "max_latency_ms": test_case.max_latency_ms,
            "description": test_case.description,
        })
        TEST_MANIFEST.write_text(json.dumps(manifest, indent=2))
 
    def get_test_cases(self, tool_name: str) -> list[TestCase]:
        manifest = self._load_manifest()
        raw_cases = manifest.get(tool_name, [])
        return [
            TestCase(
                input_args=tc["input_args"],
                expected_type=tc["expected_type"],
                max_latency_ms=tc["max_latency_ms"],
                description=tc["description"],
            )
            for tc in raw_cases
        ]
 
    def run_regression(
        self, tool_name: str, source_code: str
    ) -> RegressionResult:
        print(f"[Regression] Running tests for '{tool_name}'...")
 
        # Step 1: Validate code safety (reuses Part 1 validator)
        try:
            self.validator.validate(tool_name, source_code)
        except ValidationError as e:
            return RegressionResult(
                passed=False, tests_run=0, tests_passed=0,
                avg_latency_ms=0, errors=[f"Validation failed: {e}"],
            )
 
        # Step 2: Load the function
        try:
            exec_globals: dict = {}
            exec(source_code, exec_globals)  # noqa: S102
            func = exec_globals[tool_name]
        except Exception as e:
            return RegressionResult(
                passed=False, tests_run=0, tests_passed=0,
                avg_latency_ms=0, errors=[f"Failed to load function: {e}"],
            )
 
        # Step 3: Run test cases
        test_cases = self.get_test_cases(tool_name)
        if not test_cases:
            print(f"[Regression] No test cases for '{tool_name}'. Generating defaults...")
            test_cases = self._generate_default_tests(tool_name, source_code)
 
        tests_passed = 0
        latencies = []
        errors = []
 
        for i, tc in enumerate(test_cases):
            try:
                start = time.time()
                result = func(**tc.input_args)
                elapsed_ms = (time.time() - start) * 1000
                latencies.append(elapsed_ms)
 
                # Check return type
                expected = {"float": float, "str": str, "dict": dict, "list": list, "int": int}
                expected_type = expected.get(tc.expected_type)
                if expected_type and not isinstance(result, expected_type):
                    errors.append(
                        f"Test {i+1} ({tc.description}): "
                        f"Expected {tc.expected_type}, got {type(result).__name__}"
                    )
                    continue
 
                # Check latency
                if elapsed_ms > tc.max_latency_ms:
                    errors.append(
                        f"Test {i+1} ({tc.description}): "
                        f"Too slow ({elapsed_ms:.0f}ms > {tc.max_latency_ms:.0f}ms)"
                    )
                    continue
 
                tests_passed += 1
                print(f"  [Test {i+1}] PASS: {tc.description} ({elapsed_ms:.0f}ms)")
 
            except Exception as e:
                errors.append(f"Test {i+1} ({tc.description}): Exception: {e}")
                print(f"  [Test {i+1}] FAIL: {tc.description}: {e}")
 
        avg_lat = sum(latencies) / len(latencies) if latencies else 0
        passed = tests_passed == len(test_cases) and len(test_cases) > 0
 
        print(
            f"[Regression] Result: {tests_passed}/{len(test_cases)} passed, "
            f"avg latency {avg_lat:.0f}ms"
        )
 
        return RegressionResult(
            passed=passed,
            tests_run=len(test_cases),
            tests_passed=tests_passed,
            avg_latency_ms=avg_lat,
            errors=errors,
        )
 
    def _generate_default_tests(
        self, tool_name: str, source_code: str
    ) -> list[TestCase]:
        """Generate basic smoke test: just verify the function is callable
        and returns without crashing. Real test cases should be added via
        store_test_case() after initial tool creation."""
        default = TestCase(
            input_args={},
            expected_type="str",
            max_latency_ms=5000,
            description="Smoke test — function is callable",
        )
        return [default]
 
    def _load_manifest(self) -> dict:
        if TEST_MANIFEST.exists():
            return json.loads(TEST_MANIFEST.read_text())
        return {}
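
As a standalone illustration of the gate, the sketch below runs the stored test suite against a hypothetical candidate (the stub source here is only for demonstration; a real candidate comes from the rewriter in Step 3).

from regression_runner import RegressionRunner

# Hypothetical candidate source; in the real flow this string comes from ToolRewriter
candidate_source = '''
def fetch_exchange_rate(from_currency: str, to_currency: str) -> float:
    """Demo stub that skips the network entirely."""
    return 3.7
'''

result = RegressionRunner().run_regression("fetch_exchange_rate", candidate_source)
print(result.passed, f"{result.tests_passed}/{result.tests_run}", result.errors)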

Step 5: The Extended Orchestrator

The orchestrator from Part 1 is extended with two critical additions:

  1. Metrics capture: Every tool call is wrapped in a timing context that records performance to the memory.
  2. Improvement loop: After every N invocations (configurable), the evaluator runs. If a tool scores below the threshold, the rewrite pipeline triggers automatically.

orchestrator.py
import hashlib
import time
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
 
from registry import registry
from generator import ToolGeneratorAgent
from validator import ToolValidator, ValidationError
from performance_memory import memory, InvocationRecord
from evaluator import ToolEvaluator
from rewriter import ToolRewriter
from regression_runner import RegressionRunner
 
EVAL_EVERY_N = 5  # Evaluate tool quality every N invocations
MAX_REWRITES_PER_TOOL = 3
 
ORCHESTRATOR_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """You are an autonomous AI agent with the ability to extend AND improve your own capabilities.
 
When you need to perform an action for which you have no tool:
1. Use the `request_new_tool` tool to describe exactly what you need.
2. The system will generate and register the tool automatically.
3. The new tool will then be available. Use it to complete the task.
 
Your tools are continuously monitored and improved. If a tool fails, retry once — the system may upgrade it between calls.
 
Always complete the user's task fully. Never say you cannot do something — if you lack a tool, request it.
"""),
    MessagesPlaceholder("chat_history", optional=True),
    ("human", "{input}"),
    MessagesPlaceholder("agent_scratchpad"),
])
 
 
class NewToolRequest(BaseModel):
    tool_name: str = Field(
        description="snake_case name for the tool, e.g. fetch_exchange_rate"
    )
    description: str = Field(
        description="What the tool does in one clear sentence"
    )
    input_params: str = Field(
        description="Parameters with types, e.g. 'amount: float, from_currency: str'"
    )
    return_type: str = Field(
        description="Return type, e.g. 'float' or 'dict'"
    )
    example: str = Field(
        description="Example call, e.g. fetch_exchange_rate(100.0, 'USD', 'ILS')"
    )
 
 
class SelfImprovingOrchestrator:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.generator = ToolGeneratorAgent()
        self.validator = ToolValidator()
        self.evaluator = ToolEvaluator()
        self.rewriter = ToolRewriter()
        self.regression = RegressionRunner()
        self._invocation_counts: dict[str, int] = {}
        self._rewrite_counts: dict[str, int] = {}
        self._build_executor()
 
    def _build_executor(self):
        request_tool = StructuredTool.from_function(
            func=self._handle_tool_request,
            name="request_new_tool",
            description=(
                "Call this when you need a capability you don't have. "
                "The system will generate and register the tool for you."
            ),
            args_schema=NewToolRequest,
        )
 
        # Wrap all registered tools with metrics capture
        wrapped_tools = []
        for tool in registry.all_tools():
            wrapped = self._wrap_with_metrics(tool)
            wrapped_tools.append(wrapped)
 
        tools = [request_tool] + wrapped_tools
        agent = create_openai_tools_agent(self.llm, tools, ORCHESTRATOR_PROMPT)
        self.executor = AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            max_iterations=15,
            handle_parsing_errors=True,
        )
 
    def _wrap_with_metrics(self, tool: StructuredTool) -> StructuredTool:
        original_func = tool.func
        tool_name = tool.name
        orchestrator = self
 
        def metered_func(**kwargs) -> str:
            input_hash = hashlib.md5(
                str(sorted(kwargs.items())).encode()
            ).hexdigest()[:12]
 
            start = time.time()
            success = True
            error_msg = None
            output_hash = None
            result = None
 
            try:
                result = original_func(**kwargs)
                output_hash = hashlib.md5(
                    str(result).encode()
                ).hexdigest()[:12]
                return str(result)
            except Exception as e:
                success = False
                error_msg = str(e)
                raise
            finally:
                elapsed_ms = (time.time() - start) * 1000
                version = memory.get_current_version(tool_name)
                record = InvocationRecord(
                    tool_name=tool_name,
                    timestamp=time.time(),
                    latency_ms=elapsed_ms,
                    success=success,
                    error_message=error_msg,
                    input_hash=input_hash,
                    output_hash=output_hash,
                    version=version,
                )
                memory.record(record)
                print(
                    f"[Metrics] {tool_name}: "
                    f"{'OK' if success else 'FAIL'} "
                    f"in {elapsed_ms:.0f}ms (v{version})"
                )
 
                # Check if we should evaluate this tool
                orchestrator._invocation_counts[tool_name] = (
                    orchestrator._invocation_counts.get(tool_name, 0) + 1
                )
                if orchestrator._invocation_counts[tool_name] % EVAL_EVERY_N == 0:
                    orchestrator._maybe_improve(tool_name)
 
        return StructuredTool.from_function(
            func=metered_func,
            name=tool_name,
            description=tool.description,
            args_schema=tool.args_schema,  # keep the original schema so the LLM still sees typed params
        )
 
    def _maybe_improve(self, tool_name: str):
        if self._rewrite_counts.get(tool_name, 0) >= MAX_REWRITES_PER_TOOL:
            print(f"[Improve] '{tool_name}' hit max rewrite limit. Skipping.")
            return
 
        evaluation = self.evaluator.evaluate(tool_name)
        if not evaluation.needs_rewrite:
            return
 
        print(f"\n{'='*50}")
        print(f"[Improve] Tool '{tool_name}' scored {evaluation.score} — triggering rewrite")
        print(f"{'='*50}\n")
 
        current_version = memory.get_current_version(tool_name)
        current_source = memory.get_version_source(tool_name, current_version)
 
        if not current_source:
            # Try to read from disk
            from pathlib import Path
            tool_file = Path(__file__).parent / "tools" / f"{tool_name}.py"
            if tool_file.exists():
                current_source = tool_file.read_text()
            else:
                print(f"[Improve] No source code found for '{tool_name}'. Skipping.")
                return
 
        summary = memory.summary(tool_name)
 
        # Generate improved version
        for attempt in range(1, 4):
            print(f"[Improve] Rewrite attempt {attempt}/3 for '{tool_name}'")
            try:
                new_source = self.rewriter.rewrite(
                    source_code=current_source,
                    success_rate=summary["success_rate"],
                    avg_latency_ms=summary["avg_latency_ms"],
                    max_latency_ms=summary["max_latency_ms"],
                    score=evaluation.score,
                    reason=evaluation.reason,
                    recent_errors=summary["recent_errors"],
                )
 
                # Run regression tests
                regression_result = self.regression.run_regression(
                    tool_name, new_source
                )
 
                if regression_result.passed:
                    new_version = current_version + 1
 
                    # Load and register the new version
                    exec_globals: dict = {}
                    exec(new_source, exec_globals)  # noqa: S102
                    func = exec_globals[tool_name]
 
                    registry.register(tool_name, func, registry.get(tool_name).description)
                    registry.persist_tool(tool_name, new_source, registry.get(tool_name).description)
                    memory.store_version(tool_name, new_version, new_source)
 
                    self._rewrite_counts[tool_name] = (
                        self._rewrite_counts.get(tool_name, 0) + 1
                    )
 
                    # Rebuild executor with the new tool version
                    self._build_executor()
 
                    print(f"[Improve] '{tool_name}' upgraded to v{new_version}")
                    return
                else:
                    print(
                        f"[Improve] Regression failed: "
                        f"{regression_result.errors}"
                    )
 
            except Exception as e:
                print(f"[Improve] Attempt {attempt} failed: {e}")
 
        print(f"[Improve] All rewrite attempts failed for '{tool_name}'. Keeping current version.")
 
    def _handle_tool_request(
        self,
        tool_name: str,
        description: str,
        input_params: str,
        return_type: str,
        example: str,
    ) -> str:
        if registry.has(tool_name):
            return f"Tool '{tool_name}' already exists. Use it directly."
 
        for attempt in range(1, 4):
            print(f"\n[Orchestrator] Generating '{tool_name}' (attempt {attempt}/3)")
            try:
                source_code = self.generator.generate(
                    tool_name=tool_name,
                    description=description,
                    input_params=input_params,
                    return_type=return_type,
                    example=example,
                )
                self.validator.validate(tool_name, source_code)
 
                exec_globals: dict = {}
                exec(source_code, exec_globals)  # noqa: S102
                func = exec_globals[tool_name]
 
                registry.register(tool_name, func, description)
                registry.persist_tool(tool_name, source_code, description)
 
                # Store version 1 in performance memory
                memory.store_version(tool_name, 1, source_code)
 
                self._build_executor()
 
                print(f"[Orchestrator] '{tool_name}' registered (v1).")
                return (
                    f"Tool '{tool_name}' created and registered successfully. "
                    f"You can now use it to: {description}"
                )
 
            except Exception as e:  # includes ValidationError
                print(f"[Orchestrator] Attempt {attempt} failed: {e}")
                if attempt == 3:
                    return f"Failed to generate '{tool_name}' after 3 attempts: {e}"
 
        return f"Tool generation failed for '{tool_name}'."
 
    def run(self, task: str) -> str:
        print(f"\n{'='*60}")
        print(f"Task: {task}")
        print(f"Available tools: {registry.tool_names()}")
        print(f"{'='*60}\n")
        result = self.executor.invoke({"input": task})
        return result["output"]

Step 6: Main Entry Point

main.py
import os
from orchestrator import SelfImprovingOrchestrator
from regression_runner import RegressionRunner, TestCase
 
os.environ["OPENAI_API_KEY"] = "your-key-here"
 
 
def seed_test_cases():
    """Pre-load test cases for tools we expect the agent to create."""
    runner = RegressionRunner()
 
    runner.store_test_case("fetch_exchange_rate", TestCase(
        input_args={"from_currency": "USD", "to_currency": "EUR"},
        expected_type="float",
        max_latency_ms=3000,
        description="USD to EUR returns a float",
    ))
    runner.store_test_case("fetch_exchange_rate", TestCase(
        input_args={"from_currency": "USD", "to_currency": "ILS"},
        expected_type="float",
        max_latency_ms=3000,
        description="USD to ILS returns a float",
    ))
    runner.store_test_case("convert_currency", TestCase(
        input_args={"amount": 100.0, "rate": 3.5},
        expected_type="float",
        max_latency_ms=100,
        description="Simple multiplication returns float",
    ))
 
 
def main():
    seed_test_cases()
    agent = SelfImprovingOrchestrator()
 
    tasks = [
        "What is the current USD to ILS exchange rate? Convert 5000 USD.",
        "What is the current EUR to GBP rate? Convert 2000 EUR.",
        "Convert 10000 JPY to USD using the latest rate.",
    ]
 
    for task in tasks:
        result = agent.run(task)
        print(f"\n Result: {result}\n")
        print("-" * 60)
 
    # Print final performance report
    print("\n" + "=" * 60)
    print("PERFORMANCE REPORT")
    print("=" * 60)
    from performance_memory import memory
    for tool_name in ["fetch_exchange_rate", "convert_currency"]:
        summary = memory.summary(tool_name)
        if summary["invocations"] > 0:
            print(f"\n{tool_name}:")
            print(f"  Invocations: {summary['invocations']}")
            print(f"  Success rate: {summary['success_rate']:.1%}")
            print(f"  Avg latency: {summary['avg_latency_ms']:.0f}ms")
            print(f"  Version: v{summary['current_version']}")
 
 
if __name__ == "__main__":
    main()

Live Demo: The Improvement Loop in Action

Here is what happens when we run the system and a tool starts underperforming. For demonstration, imagine fetch_exchange_rate v1 has been returning errors intermittently:

============================================================
Task: What is the current USD to ILS exchange rate? Convert 5000 USD.
Available tools: ['fetch_exchange_rate', 'convert_currency']
============================================================
 
> Invoking: `fetch_exchange_rate` with {"from_currency": "USD", "to_currency": "ILS"}
[Metrics] fetch_exchange_rate: OK in 2847ms (v1)
 
> Invoking: `convert_currency` with {"amount": 5000, "rate": 3.71}
[Metrics] convert_currency: OK in 1ms (v1)
 
 Result: 5,000 USD = 18,550 ILS
 
------------------------------------------------------------
 
> Invoking: `fetch_exchange_rate` with {"from_currency": "EUR", "to_currency": "GBP"}
[Metrics] fetch_exchange_rate: FAIL in 5012ms (v1)
 
> Invoking: `fetch_exchange_rate` with {"from_currency": "EUR", "to_currency": "GBP"}
[Metrics] fetch_exchange_rate: OK in 3201ms (v1)
 
[Evaluator] fetch_exchange_rate: score=0.42 rewrite=YES
 
==================================================
[Improve] Tool 'fetch_exchange_rate' scored 0.42 — triggering rewrite
==================================================
 
[Improve] Rewrite attempt 1/3 for 'fetch_exchange_rate'
[Rewriter] Generating improved version...
[Regression] Running tests for 'fetch_exchange_rate'...
  [Test 1] PASS: USD to EUR returns a float (780ms)
  [Test 2] PASS: USD to ILS returns a float (650ms)
[Regression] Result: 2/2 passed, avg latency 715ms
[Improve] 'fetch_exchange_rate' upgraded to v2
 
> Invoking: `fetch_exchange_rate` with {"from_currency": "EUR", "to_currency": "GBP"}
[Metrics] fetch_exchange_rate: OK in 720ms (v2)
 
 Result: 2,000 EUR = 1,714 GBP

The key moment: the agent detected degradation (score 0.42), triggered a rewrite that added retry logic and timeouts, validated the rewrite against two test cases, and promoted v2 — all within a single session. Subsequent calls use the improved version automatically.


What the Rewriter Actually Produces

Here is an example of what v1 vs v2 looks like after the rewriter fixes the issues:

v1 (generated by Part 1):

def fetch_exchange_rate(from_currency: str, to_currency: str) -> float:
    """Fetch live exchange rate between two currencies."""
    import requests
    resp = requests.get(
        f"https://api.exchangerate-api.com/v4/latest/{from_currency}"
    )
    data = resp.json()
    return data["rates"][to_currency]

v2 (rewritten by the Self-Improving Agent):

def fetch_exchange_rate(from_currency: str, to_currency: str) -> float:
    """Fetch live exchange rate with retry logic, timeout, and fallback API."""
    import requests
    import time
 
    PRIMARY_URL = "https://api.exchangerate-api.com/v4/latest/{}"
    FALLBACK_URL = "https://open.er-api.com/v6/latest/{}"
 
    def _fetch(url_template: str, retries: int = 3) -> float:
        for attempt in range(retries):
            try:
                resp = requests.get(
                    url_template.format(from_currency),
                    timeout=3,
                )
                resp.raise_for_status()
                data = resp.json()
                rate = data["rates"][to_currency]
                if not isinstance(rate, (int, float)):
                    raise ValueError(f"Unexpected rate type: {type(rate)}")
                return float(rate)
            except (requests.RequestException, KeyError, ValueError) as e:
                if attempt < retries - 1:
                    time.sleep(0.5 * (2 ** attempt))  # exponential backoff
                    continue
                raise
 
    try:
        return _fetch(PRIMARY_URL)
    except Exception:
        return _fetch(FALLBACK_URL)

The rewriter identified three specific issues from the metrics and fixed all of them:

  1. No timeout → Added timeout=3
  2. No retry logic → Added exponential backoff (3 retries)
  3. No fallback → Added a secondary API endpoint

The Version Archive

Every version is preserved in the performance memory database. You can query the full history:

# Get all versions of a tool
for v in range(1, memory.get_current_version("fetch_exchange_rate") + 1):
    source = memory.get_version_source("fetch_exchange_rate", v)
    print(f"=== Version {v} ===")
    print(source[:200] + "...")

This creates a full audit trail — critical for debugging and compliance in production environments.
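
The archive also makes manual rollback straightforward. A hedged sketch (the registry API is the one from Part 1; the rollback helper itself is not part of the codebase above):

from performance_memory import memory
from registry import registry


def rollback(tool_name: str, target_version: int) -> None:
    """Restore an archived version and make it the active tool again."""
    source = memory.get_version_source(tool_name, target_version)
    if source is None:
        raise ValueError(f"No archived source for {tool_name} v{target_version}")
    exec_globals: dict = {}
    exec(source, exec_globals)  # same loading pattern the orchestrator uses
    func = exec_globals[tool_name]
    description = registry.get(tool_name).description
    registry.register(tool_name, func, description)
    registry.persist_tool(tool_name, source, description)


rollback("fetch_exchange_rate", 1)  # pin back to the original version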


Guardrails for Self-Improvement

1. Rewrite Limits

MAX_REWRITES_PER_TOOL = 3

A tool can only be rewritten 3 times per session. This prevents infinite rewrite loops where the evaluator and rewriter disagree on quality.

2. Regression Gate

No rewrite is promoted without passing the full test suite. If all 3 attempts fail regression, the system keeps the current version and logs the failure. The old tool is always better than a broken new one.
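
Condensed from the orchestrator in Step 5, the gate is a single branch: promote only on a clean pass, otherwise log the failure and keep what already works.

regression_result = self.regression.run_regression(tool_name, new_source)
if regression_result.passed:
    memory.store_version(tool_name, current_version + 1, new_source)  # promote
else:
    print(f"[Improve] Regression failed: {regression_result.errors}")  # keep current version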

3. Minimum Data Threshold

if summary["invocations"] < 3:
    return EvaluationResult(score=1.0, needs_rewrite=False, ...)

The evaluator refuses to score a tool with fewer than 3 invocations. This prevents premature rewrites based on a single bad call.

4. Version Pinning for Critical Tools

For tools that must never be auto-rewritten (financial calculations, auth flows):

PINNED_TOOLS = {"calculate_tax", "verify_signature"}
 
def _maybe_improve(self, tool_name: str):
    if tool_name in PINNED_TOOLS:
        print(f"[Improve] '{tool_name}' is pinned. Skipping auto-improvement.")
        return
    # ... rest of improvement logic

Production Architecture

                 ┌──────────────────────────┐
                 │    Orchestrator Agent    │
                 │   + Metrics Middleware   │
                 └────────────┬─────────────┘
                              │
              ┌───────────────┼───────────────┐
              ▼               ▼               ▼
   ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
   │ Performance  │  │  Evaluator   │  │   Rewriter   │
   │ Memory (SQL) │  │  (scoring)   │  │   (LLM)      │
   └──────┬───────┘  └──────┬───────┘  └──────┬───────┘
          │                 │                 │
          └─────────────────┼─────────────────┘
                            │
                  ┌─────────▼────────┐
                  │ Regression Runner│
                  │   (test suite)   │
                  └────────┬─────────┘
                           │ promote / rollback
                  ┌────────▼─────────┐
                  │  Tool Registry   │
                  │   (versioned)    │
                  └──────────────────┘

For production, extend with:

  • PostgreSQL instead of SQLite for the performance memory (multi-instance support).
  • Docker sandboxing for regression tests (isolate from host).
  • Alerting when a tool is rewritten (Slack/PagerDuty webhook); a minimal sketch follows this list.
  • A/B testing: run both versions in parallel and compare outputs before promoting.
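
As an example of the alerting hook, here is a minimal sketch that posts to a Slack incoming webhook whenever a new version is promoted. The SLACK_WEBHOOK_URL environment variable and the call site in _maybe_improve are assumptions, not part of the codebase above.

import os
import requests


def notify_rewrite(tool_name: str, old_version: int, new_version: int, score: float) -> None:
    """Post a short message to a Slack incoming webhook (URL read from the environment)."""
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")  # assumed env var
    if not webhook_url:
        return  # alerting stays optional
    requests.post(
        webhook_url,
        json={"text": (
            f"Tool `{tool_name}` rewritten: v{old_version} -> v{new_version} "
            f"(quality score had dropped to {score:.2f})"
        )},
        timeout=5,
    )


# Assumed call site, right after the promotion in _maybe_improve:
# notify_rewrite(tool_name, current_version, new_version, evaluation.score)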

Key Takeaways

  • Self-extension is not enough. Generating tools once is a start. Continuously improving them is what makes the system production-grade.
  • Performance memory is the foundation. Without metrics, there is no signal. Without signal, there is no improvement.
  • The evaluator is the brain. A clear, weighted scoring function turns noisy metrics into actionable decisions.
  • The regression runner is the safety net. Never promote a rewrite without testing it. The old version is always the fallback.
  • Versioning creates accountability. Every tool version is archived. You can trace exactly what changed, when, and why.
  • Guardrails prevent runaway loops. Rewrite limits, minimum data thresholds, and version pinning keep the system stable.

The Full Loop: From Static to Self-Improving

Across Parts 1 and 2, we have built a system that:

  1. Starts with zero tools — only the ability to request new ones.
  2. Generates tools on demand — writing Python functions from natural language specs.
  3. Validates before use — AST analysis, banned pattern detection, sandboxed execution.
  4. Persists tools to disk — so they survive restarts and compound over time.
  5. Monitors every invocation — capturing latency, errors, and output quality.
  6. Evaluates continuously — computing quality scores and detecting degradation.
  7. Rewrites underperforming tools — with full context about what went wrong.
  8. Tests before promoting — running regression suites against every candidate.
  9. Archives every version — creating a full audit trail of tool evolution.

This is no longer a static agent. This is a living system that gets better every time it runs.


📂 Source Code

All code examples from this article are available on GitHub: OneManCrew/self-improving-agent
