
The one wrapper that makes LLM tool calls production-ready

June 7, 2026 · claude, ai-agents, cass, python, build-your-own-cass

The first version of Cass’s job pipeline crashed at 3 AM and I didn’t know until morning. An HTTP timeout in one tool bubbled up as an exception, the Lambda failed, and the coordinator moved on. Forty listings processed, zero submitted.

The fix was one function. It changed how I design every tool.

The problem with raw tool calls

When Claude calls a tool, it expects one of two things back:

  1. A tool result (any string — JSON, plain text, whatever the tool returns)
  2. An error (a tool_result with is_error: true)

If your tool raises a Python exception instead, nothing gets sent back at all. The API call that produced the tool_use block has already succeeded, but the exception propagates up your Lambda handler, the Lambda crashes, and the conversation is over. Claude never gets a chance to recover.

The naive implementation:

def score_listing(listing_id: str) -> str:
    listing = table.get_item(Key={"id": listing_id})["Item"]
    score = compute_score(listing)  # may raise
    return json.dumps({"score": score})

If compute_score raises, you’re done. Not just this tool call — the whole agent.

The wrapper

import functools
import traceback
import time

def safe_tool(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            result = fn(*args, **kwargs)
            elapsed = time.time() - start
            print(f"[tool] {fn.__name__} OK ({elapsed:.2f}s)")
            return result
        except Exception as e:
            elapsed = time.time() - start
            tb = traceback.format_exc()
            print(f"[tool] {fn.__name__} ERROR ({elapsed:.2f}s): {e}\n{tb}")
            return f"Error: {fn.__name__} failed — {type(e).__name__}: {e}"
    return wrapper

Apply it as a decorator:

@safe_tool
def score_listing(listing_id: str) -> str:
    listing = table.get_item(Key={"id": listing_id})["Item"]
    score = compute_score(listing)
    return json.dumps({"score": score})

Now if compute_score raises, the tool returns "Error: score_listing failed — KeyError: 'skills'" instead of crashing. Claude gets this as a tool result, reads it, and decides what to do — skip the listing, try a different approach, or report that it can’t proceed.

Why the error string format matters

The exact format "Error: <message>" is important. Your system prompt should tell the agent:

If a tool returns a string starting with “Error:”, treat it as a failure. Don’t retry the same call more than once. Move on or report the problem.

This gives Claude a consistent signal to recognize failures. Without it, Claude might re-call the same failing tool in a loop until you run out of context or hit your iteration cap.
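The string prefix and the API's is_error flag can also be set together when the result block is built, so the failure is visible both in the text and in the structure. A minimal sketch, assuming failed tools return a string starting with "Error:"; `to_tool_result` is a hypothetical helper name, not part of any SDK:

```python
def to_tool_result(tool_use_id: str, output: str) -> dict:
    """Build a tool_result block, flagging wrapper-style failures.

    Hypothetical helper: it assumes failed tools return a string that
    starts with "Error:", following the convention described in this post.
    """
    block = {
        "type": "tool_result",
        "tool_use_id": tool_use_id,
        "content": output,
    }
    if output.startswith("Error:"):
        # is_error is the tool_result field that marks a failed tool call.
        block["is_error"] = True
    return block


ok = to_tool_result("toolu_01", '{"score": 0.82}')
failed = to_tool_result("toolu_02", "Error: score_listing timed out after 15s")
```

The tool_use_id values here are placeholders; in a real loop they come from the tool_use block Claude sent.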

The system prompt addition:

SYSTEM_PROMPT = """
You are a job application agent...

Tool use rules:
- If a tool returns "Error: ...", that tool has failed. Do not call it
  again with the same arguments.
- After two consecutive tool failures, stop and return a summary of
  what you accomplished before the failures.
- Never invent tool results. If you don't have a result, say so.
"""

Retry detection

The wrapper prevents crashes. But Claude might still call a failing tool twice before heeding the system prompt. Add explicit retry detection to your agent loop:

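import json

last_failed_call = None  # (tool name, arguments) of the most recent failing call
failures = 0             # consecutive failures of that exact call

for attempt in range(8):
    response = client.messages.create(...)
    tool_results = []  # reset on every turn

    for block in response.content:
        if block.type != "tool_use":
            continue

        # The same tool with different arguments is a new call, not a retry.
        call = (block.name, json.dumps(block.input, sort_keys=True))

        if call == last_failed_call and failures >= 2:
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": f"Error: {block.name} has failed twice with these arguments. Stop retrying.",
                "is_error": True,
            })
            continue

        result = dispatch_tool(block.name, block.input)
        if result.startswith("Error:"):
            failures = failures + 1 if call == last_failed_call else 1
            last_failed_call = call
        else:
            last_failed_call, failures = None, 0

        tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": result})

    # Send tool_results back in the next user message before looping.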

This caps retries at the agent loop level regardless of what the system prompt says. Defense in depth.
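The loop calls dispatch_tool without showing it. One possible shape, offered as a sketch rather than Cass's actual code: a name-to-function registry that reports unknown tools and malformed arguments in the same "Error:" format. `TOOLS`, `register`, and the `echo` tool are hypothetical names used only for illustration.

```python
from typing import Callable, Dict

# Hypothetical registry: tool name -> Python function returning a string.
TOOLS: Dict[str, Callable[..., str]] = {}


def register(fn: Callable[..., str]) -> Callable[..., str]:
    """Add a tool to the registry under its function name."""
    TOOLS[fn.__name__] = fn
    return fn


def dispatch_tool(name: str, tool_input: dict) -> str:
    """Run a registered tool; never raise for a bad name or bad arguments."""
    fn = TOOLS.get(name)
    if fn is None:
        return f"Error: unknown tool {name!r}"
    try:
        # tool_input is the dict of arguments Claude supplied for this call.
        return fn(**tool_input)
    except TypeError as e:
        # Typically missing or unexpected keyword arguments.
        return f"Error: bad arguments for {name}: {e}"


@register
def echo(text: str) -> str:
    return text


print(dispatch_tool("echo", {"text": "hi"}))
print(dispatch_tool("nope", {}))
print(dispatch_tool("echo", {"wrong": 1}))
```

Handling the unknown-name and bad-argument cases here means a hallucinated tool name becomes one more recoverable error string instead of a KeyError in the loop.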

Timeout protection

Some tools are slow. An S3 fetch or a third-party API call can hang for 30+ seconds, burning Lambda time and making the caller wait. Add a timeout:

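import concurrent.futures
import functools

def safe_tool_with_timeout(timeout_seconds: int = 10):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # No "with" block here: leaving it calls shutdown(wait=True),
            # which blocks until fn finishes and defeats the timeout.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = executor.submit(fn, *args, **kwargs)
            try:
                return future.result(timeout=timeout_seconds)
            except concurrent.futures.TimeoutError:
                return f"Error: {fn.__name__} timed out after {timeout_seconds}s"
            except Exception as e:
                return f"Error: {fn.__name__} failed — {type(e).__name__}: {e}"
            finally:
                # Return right away. Python cannot kill the worker thread,
                # so a timed-out call keeps running in the background.
                executor.shutdown(wait=False)
        return wrapper
    return decorator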

@safe_tool_with_timeout(timeout_seconds=15)
def fetch_company_context(company_name: str) -> str:
    return search_web(company_name)  # can be slow
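With any thread-based timeout it is worth confirming that the caller really gets control back at the deadline, since Python cannot kill a running thread and a `with ThreadPoolExecutor(...)` block waits for its worker on exit. A quick check is to time the call against a deliberately slow function. The sketch below uses a hypothetical `call_with_timeout` helper and a stand-in `slow_tool`; it is an illustration under those assumptions, not the course's implementation.

```python
import concurrent.futures
import time


def call_with_timeout(fn, timeout_seconds: float, *args, **kwargs) -> str:
    """Run fn in a worker thread and stop waiting after timeout_seconds."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    except concurrent.futures.TimeoutError:
        return f"Error: {fn.__name__} timed out after {timeout_seconds}s"
    finally:
        # wait=False returns immediately; the worker thread itself keeps
        # running in the background until fn returns.
        executor.shutdown(wait=False)


def slow_tool() -> str:
    time.sleep(1.0)  # stand-in for a hanging network call
    return "done"


start = time.perf_counter()
result = call_with_timeout(slow_tool, 0.2)
elapsed = time.perf_counter() - start
print(result, f"(returned after {elapsed:.2f}s)")
```

Because the abandoned call keeps running, a tool with side effects may still complete after Claude has been told it timed out. Tools that write data are safer with a timeout enforced by the underlying client (for example, a request timeout) than with a thread deadline alone.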

What this changes about tool design

Once you have safe_tool, the rule for tool design changes:

Before: Tools can raise exceptions; callers handle them.

After: Tools never raise. Callers (Claude) handle errors as strings.

This flips your error handling philosophy. Instead of propagating exceptions up, you absorb them at the boundary and communicate failures as data. The agent becomes the error handler.

It also means you can log comprehensively inside safe_tool without polluting every tool function with logging code. The wrapper is the single place where you add monitoring, tracing, or alerting.
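As an illustration of the wrapper as a single monitoring point, the sketch below counts calls, failures, and elapsed time per tool. `METRICS` and `with_metrics` are hypothetical names. It assumes the function it wraps already follows the never-raise convention (for example, by stacking it outside safe_tool), and a real deployment would likely push these numbers to a metrics service rather than keep them in a dict.

```python
import functools
import time
from collections import defaultdict

# Hypothetical in-memory metrics store: tool name -> counters.
METRICS = defaultdict(lambda: {"calls": 0, "errors": 0, "seconds": 0.0})


def with_metrics(fn):
    """Count calls, failures, and time spent for a string-returning tool."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        stats = METRICS[fn.__name__]
        stats["calls"] += 1
        stats["seconds"] += time.perf_counter() - start
        # Assumes failures arrive as "Error: ..." strings, not exceptions.
        if isinstance(result, str) and result.startswith("Error:"):
            stats["errors"] += 1
        return result
    return wrapper


@with_metrics
def flaky_tool(fail: bool) -> str:
    return "Error: flaky_tool failed" if fail else "ok"


flaky_tool(fail=False)
flaky_tool(fail=True)
print(dict(METRICS["flaky_tool"]))
```

An error rate per tool name is usually the first number worth alerting on, because a tool that fails on every call is invisible once exceptions no longer crash the Lambda.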

One more thing: rate limiting

Some APIs will rate-limit you if the agent calls them in a tight loop. Add a rate limiter inside the tool, underneath the wrapper:

import threading
import time as time_module

class RateLimiter:
    def __init__(self, calls_per_second: float = 1.0):
        self.min_interval = 1.0 / calls_per_second
        self.last_called = 0.0
        self.lock = threading.Lock()

    def wait(self):
        with self.lock:
            elapsed = time_module.time() - self.last_called
            wait_time = self.min_interval - elapsed
            if wait_time > 0:
                time_module.sleep(wait_time)
            self.last_called = time_module.time()

_hn_limiter = RateLimiter(calls_per_second=0.5)

@safe_tool
def search_hn_jobs(query: str) -> str:
    _hn_limiter.wait()
    return _do_search(query)

Rate limiting inside the tool means Claude never has to know about it. The tool just runs at whatever pace is safe.
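The same idea can be packaged as a decorator so that each tool declares its own pace. This is a sketch of one way to do it, not the version from the course: `rate_limited` and `ping` are hypothetical names, and it uses time.monotonic so that wall-clock adjustments do not affect the spacing.

```python
import functools
import threading
import time


def rate_limited(calls_per_second: float):
    """Decorator: space out calls to fn by at least 1/calls_per_second."""
    min_interval = 1.0 / calls_per_second
    lock = threading.Lock()
    last_called = None  # monotonic timestamp of the previous call

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            nonlocal last_called
            with lock:
                # Holding the lock while sleeping serializes callers.
                if last_called is not None:
                    wait_time = min_interval - (time.monotonic() - last_called)
                    if wait_time > 0:
                        time.sleep(wait_time)
                last_called = time.monotonic()
            return fn(*args, **kwargs)
        return wrapper
    return decorator


@rate_limited(calls_per_second=20)
def ping() -> str:
    return "pong"


start = time.monotonic()
ping()
ping()  # waits until 1/20 s has passed since the first call
elapsed = time.monotonic() - start
```

If this is stacked with a timeout wrapper, the time spent waiting for the limiter counts against the timeout, so the timeout should be at least as long as the minimum interval.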


This wrapper is in every agent I ship now. Module 3 of Build Your Own Cass covers it in full, including the complete safe_tool implementation with logging, timeouts, and the system-prompt patterns that make it work end-to-end.