Genkit Middleware in Python: Retries, Fallbacks, Tool Approval

Here's what breaks in production without middleware:

# This raises as soon as the model is overloaded; nothing retries it
response = await ai.generate(prompt=user_input)

# This lets the model trigger a real-world action without any human review
response = await ai.generate(prompt='Transfer $5000 to account X', tools=[transfer_money])

# This crashes your app when your primary model hits quota
response = await ai.generate(model='googleai/gemini-2.5-pro', prompt=user_input)

Three separate failure modes. Three separate “I should add retry logic somewhere” thoughts. Middleware is how Genkit solves all of them with a consistent, composable pattern.


What Middleware Is in Genkit Python

Middleware in Genkit is a class that wraps the generate pipeline. There are three hooks:

  • wrap_generate — called once per full tool loop iteration (model call + optional tool resolution)
  • wrap_model — called for each raw model API call
  • wrap_tool — called for each tool execution

You subclass BaseMiddleware, override the hook(s) you care about, and pass an instance to use=[...]:

from genkit.middleware import BaseMiddleware, GenerateMiddlewareContext

class LoggingMiddleware(BaseMiddleware):
    async def wrap_model(self, params, ctx: GenerateMiddlewareContext, next_fn):
        print(f'→ model call: {len(params.request.messages)} messages')
        response = await next_fn(params, ctx)
        print(f'← finish reason: {response.finish_reason}')
        return response

response = await ai.generate(
    prompt='Hello',
    use=[LoggingMiddleware()],
)

The next_fn call runs the rest of the pipeline. Call it to proceed, don’t call it to intercept. Return a different response to substitute. Raise an exception to abort.

Execution order with use=[A(), B()]:

A.wrap_model (before)
    B.wrap_model (before)
        model call
    B.wrap_model (after)
A.wrap_model (after)

First middleware in the list is the outermost wrapper.
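
The nesting above can be reproduced with plain async functions. The following is a stdlib-only sketch of the wrapping pattern, not Genkit's actual dispatch code; make_layer, compose, and fake_model are hypothetical names used only for illustration.

```python
import asyncio

calls: list[str] = []  # records the order in which layers run


def make_layer(name):
    """Build a hypothetical layer that logs before and after calling next_fn."""
    async def layer(request, next_fn):
        calls.append(f'{name} before')
        response = await next_fn(request)
        calls.append(f'{name} after')
        return response
    return layer


async def fake_model(request):
    """Stand-in for the real model call."""
    calls.append('model call')
    return f'echo: {request}'


def compose(layers, terminal):
    """Wrap terminal so that the first layer in the list ends up outermost."""
    handler = terminal
    for layer in reversed(layers):
        # Default args bind the current layer and handler (avoids late binding).
        async def wrapped(request, layer=layer, inner=handler):
            return await layer(request, inner)
        handler = wrapped
    return handler


async def run_chain():
    calls.clear()
    pipeline = compose([make_layer('A'), make_layer('B')], fake_model)
    return await pipeline('Hello')
```

Running asyncio.run(run_chain()) fills calls in the order A before, B before, model call, B after, A after. A layer that returns without awaiting next_fn would skip everything after it in the list.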


The Middleware Plugin

The built-in middleware classes live in genkit.plugins.middleware. To use them, you need to register the plugin:

from genkit import Genkit
from genkit.plugins.google_genai import GoogleAI
from genkit.plugins.middleware import Middleware, Retry, Fallback, ToolApproval

ai = Genkit(
    plugins=[GoogleAI(), Middleware()],   # <-- register the plugin
    model='googleai/gemini-2.0-flash',
)

Middleware() registers the built-in middleware descriptors so they appear in the Dev UI. You still pass instances into use=[...] at the call site.

Install:

uv add "genkit[google-genai] @ git+https://github.com/firebase/genkit.git#subdirectory=py/packages/genkit"
# The middleware plugin is included in the monorepo install

Retry Middleware

Rate limits (RESOURCE_EXHAUSTED) and transient network errors are facts of life with any LLM API. Without retry logic, these manifest as hard failures.

from genkit.plugins.middleware import Retry

response = await ai.generate(
    prompt='Summarize this document...',
    use=[Retry(max_retries=3)],
)

That’s the minimal usage. With defaults, Retry retries up to 3 times on these statuses: UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED, INTERNAL. It uses exponential backoff with jitter starting at 1 second, capping at 60 seconds.

Configuring retry behavior

from genkit.plugins.middleware import Retry

# Aggressive retry for a long-running batch job
batch_retry = Retry(
    max_retries=5,
    initial_delay_ms=500,    # start at 500ms
    max_delay_ms=30_000,     # cap at 30s
    backoff_factor=2.0,      # double each attempt
    no_jitter=False,         # jitter prevents thundering herd (default)
)

# Tight retry for a latency-sensitive user request
fast_retry = Retry(
    max_retries=2,
    initial_delay_ms=200,
    max_delay_ms=2_000,
)

# Custom retry statuses — only retry quota errors, not server errors
quota_retry = Retry(
    max_retries=3,
    statuses=['RESOURCE_EXHAUSTED'],
)

The RetryConfig parameters:

Parameter          Default      Description
max_retries        3            Maximum retry attempts
initial_delay_ms   1000         First retry delay in ms
max_delay_ms       60000        Maximum delay cap in ms
backoff_factor     2.0          Multiplier per attempt
no_jitter          False        Disable jitter (use for tests)
statuses           See above    Which error statuses trigger retry

Retry hooks into wrap_model — it retries at the raw model API call level, not at the flow level. If a retry succeeds, the response is transparent to the caller.
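
To get a feel for what the backoff parameters mean in wall-clock time, here is a small stdlib-only sketch of the delay schedule. It assumes the usual formula (initial delay times the factor raised to the attempt number, capped at the maximum); backoff_delays_ms is a hypothetical helper, and the jitter branch uses one common scheme that is not confirmed to match what Retry does internally.

```python
import random


def backoff_delays_ms(max_retries=3, initial_delay_ms=1000, max_delay_ms=60_000,
                      backoff_factor=2.0, no_jitter=True, rng=None):
    """Return the wait before each retry attempt, in milliseconds.

    Assumption: delay_n = initial * factor**n, capped at max_delay_ms.
    The jitter branch (uniform in [0, delay]) is illustrative only.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(initial_delay_ms * backoff_factor ** attempt, max_delay_ms)
        if not no_jitter:
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays
```

Under these assumptions the defaults wait 1000, 2000, then 4000 ms, and the batch_retry settings above wait 500, 1000, 2000, 4000, then 8000 ms, about 15.5 seconds in the worst case before the final error surfaces.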


Fallback Middleware

Retry handles transient failures. Fallback handles sustained failures: your primary model is down, quota is exhausted for the day, or you need geographic failover.

from genkit.plugins.middleware import Fallback

response = await ai.generate(
    prompt='Write a product description...',
    use=[
        Fallback(models=['googleai/gemini-flash-latest', 'googleai/gemini-2.0-flash']),
    ],
)

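If the primary model fails with a qualifying status, Genkit tries gemini-flash-latest next, then gemini-2.0-flash. If all models fail, the last error is raised. Note that with the default model configured earlier (gemini-2.0-flash), the second entry repeats the primary, so it only adds coverage when the call runs against a different primary model.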

Fallback statuses (what triggers a fallback): UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED, INTERNAL, NOT_FOUND, UNIMPLEMENTED.

Custom fallback statuses

# Only fall back on quota exhaustion or unavailability, not on other errors
quota_fallback = Fallback(
    models=['googleai/gemini-flash-latest'],
    statuses=['RESOURCE_EXHAUSTED', 'UNAVAILABLE'],
)

Fallback hooks into wrap_model at the raw model call level. If a fallback succeeds, the response comes back as normal—callers don’t need to know which model served the request.
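
The behavior described in this section (try each model in order, only for qualifying statuses, raise the last error if all fail) can be sketched without Genkit. This is an illustration of the logic, not the plugin's source; ModelError, generate_with_fallback, and flaky_call are hypothetical names.

```python
import asyncio


class ModelError(Exception):
    """Hypothetical error type carrying a status string."""
    def __init__(self, status):
        super().__init__(status)
        self.status = status


FALLBACK_STATUSES = {
    'UNAVAILABLE', 'DEADLINE_EXCEEDED', 'RESOURCE_EXHAUSTED',
    'ABORTED', 'INTERNAL', 'NOT_FOUND', 'UNIMPLEMENTED',
}


async def generate_with_fallback(call_model, primary, fallbacks,
                                 statuses=FALLBACK_STATUSES):
    """Try primary, then each fallback in order; re-raise the last error if all fail."""
    last_error = None
    for model in [primary, *fallbacks]:
        try:
            return await call_model(model)
        except ModelError as err:
            if err.status not in statuses:
                raise  # non-qualifying errors are not masked by a fallback
            last_error = err
    raise last_error


async def flaky_call(model):
    """Stand-in model call: the hypothetical 'primary' is out of quota."""
    if model == 'primary':
        raise ModelError('RESOURCE_EXHAUSTED')
    return f'answered by {model}'
```

Here asyncio.run(generate_with_fallback(flaky_call, 'primary', ['backup'])) returns the backup's answer, while an error with a status outside the set (for example a bad request) propagates immediately instead of burning through the fallback list.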


Tool Approval Middleware

This is the pattern I use for any agent that can take real-world actions. Without it, the model can call any tool it wants, any time.

from genkit.plugins.middleware import ToolApproval

# Allow read operations; block write operations
response = await ai.generate(
    prompt='Look at the workspace and fix the bug in main.py',
    tools=[read_file, list_files, write_file, edit_file],
    use=[ToolApproval(allowed_tools=['read_file', 'list_files'])],
)

When the model tries to call write_file or edit_file, ToolApproval raises an Interrupt instead of executing the tool. The caller sees response.interrupts populated—you can then show the user what the agent wants to do and ask for approval.

The full approval loop

from genkit import Genkit, restart_tool
from genkit.plugins.middleware import ToolApproval

async def agent_with_approval(user_request: str) -> str:
    messages = []
    restart: list | None = None

    while True:
        response = await ai.generate(
            prompt=user_request if restart is None else None,
            messages=messages,
            resume_restart=restart,
            tools=[read_file, write_file],
            use=[ToolApproval(allowed_tools=['read_file'])],
        )

        if not response.interrupts:
            # No approval needed — we're done
            return response.text

        # Show the user what the agent wants to do
        interrupt = response.interrupts[0]
        tool_name = interrupt.tool_request.name
        tool_input = interrupt.tool_request.input

        print(f'\nAgent wants to call: {tool_name}')
        print(f'With input: {tool_input}')
        decision = input('Approve? [y/N]: ').strip().lower()

        if decision != 'y':
            # Denied: stop here without resuming the interrupted tool call
            return f'Action denied: {tool_name}'

        # Approved — resume with the tool marked as approved
        restart = [restart_tool(interrupt, metadata={'toolApproved': True})]
        messages = response.messages

How it works internally: ToolApproval.wrap_tool intercepts each tool call. If the tool isn’t in allowed_tools, it checks the metadata on the tool request for toolApproved: true. If that flag is absent, it raises Interrupt. On resume (via resume_restart), the metadata is present, so the tool executes normally.
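
The decision described above boils down to two checks. The sketch below mirrors that logic in plain Python so it can be read in isolation; Interrupt here is a hypothetical stand-in class and check_tool_call is an illustrative helper, neither is Genkit's own code.

```python
class Interrupt(Exception):
    """Hypothetical stand-in for the interrupt raised to pause a tool call."""


def check_tool_call(tool_name, metadata, allowed_tools):
    """Return normally if the tool may run; raise Interrupt if approval is needed."""
    if tool_name in allowed_tools:
        return 'allowed'   # allow-listed tools never pause
    if (metadata or {}).get('toolApproved') is True:
        return 'approved'  # resumed call carrying the approval flag
    raise Interrupt(f'{tool_name} needs approval')
```

The first pass for write_file raises because the flag is absent; the resumed call carries toolApproved in its metadata and goes through.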


Logging Middleware

Simple telemetry for every model call:

import time
from genkit.middleware import BaseMiddleware, GenerateMiddlewareContext

class LoggingMiddleware(BaseMiddleware):
    """Log request/response details and timing for every model call."""

    async def wrap_model(self, params, ctx: GenerateMiddlewareContext, next_fn):
        start = time.monotonic()
        message_count = len(params.request.messages)

        try:
            response = await next_fn(params, ctx)
            elapsed_ms = (time.monotonic() - start) * 1000

            print(
                f'[genkit] config={params.request.config} '
                f'messages={message_count} '
                f'finish={response.finish_reason} '
                f'tokens={response.usage.total_tokens if response.usage else "?"} '
                f'elapsed={elapsed_ms:.0f}ms'
            )
            return response

        except Exception as e:
            elapsed_ms = (time.monotonic() - start) * 1000
            print(f'[genkit] ERROR after {elapsed_ms:.0f}ms: {e}')
            raise

For production, wire this into your observability platform:

import time

import structlog
from genkit.middleware import BaseMiddleware, GenerateMiddlewareContext

logger = structlog.get_logger(__name__)

class StructuredLoggingMiddleware(BaseMiddleware):
    async def wrap_model(self, params, ctx: GenerateMiddlewareContext, next_fn):
        start = time.monotonic()
        try:
            response = await next_fn(params, ctx)
            await logger.ainfo(
                'genkit.model.call',
                finish_reason=str(response.finish_reason),
                tokens=response.usage.total_tokens if response.usage else None,
                elapsed_ms=round((time.monotonic() - start) * 1000),
            )
            return response
        except Exception as e:
            await logger.aerror('genkit.model.error', error=str(e))
            raise

LoggingMiddleware requires no constructor config—just instantiate and pass in.


Composing Multiple Middleware Pieces

Here’s where the pattern pays off. Compose with use=[...]:

response = await ai.generate(
    prompt='...',
    use=[
        LoggingMiddleware(),           # outermost — sees everything
        Retry(max_retries=3),          # wraps Fallback: re-runs the whole chain if every model fails
        Fallback(models=['googleai/gemini-flash-latest']),  # innermost: switches models on first failure
    ],
)

Execution order for wrap_model:

LoggingMiddleware (start timer, log)
    Retry (attempt 1)
        Fallback (try primary)
            model API call  ← might fail here
        Fallback (try gemini-flash-latest if failed)
    Retry (attempt 2 if fallback also failed)
LoggingMiddleware (log result + elapsed)

Retry + Fallback compose naturally: Retry retries the same model; Fallback tries a different model. Put Retry inside Fallback (closer to the model call) if you want to retry each fallback model individually. Put Fallback inside Retry if you want to switch models immediately on first failure.
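
The difference between the two orderings is easiest to see by tracing which model gets called. The sketch below simulates both layers with plain functions against a model that always fails; retry, fallback, and trace_calls are hypothetical helpers, delays are omitted, and none of this is Genkit's implementation.

```python
import asyncio


class ModelError(Exception):
    """Hypothetical transient error."""


def retry(max_retries):
    """Layer that re-runs everything inside it up to max_retries extra times."""
    async def layer(model, next_fn):
        for attempt in range(max_retries + 1):
            try:
                return await next_fn(model)
            except ModelError:
                if attempt == max_retries:
                    raise
    return layer


def fallback(models):
    """Layer that re-runs everything inside it once per backup model."""
    async def layer(model, next_fn):
        last_error = None
        for candidate in [model, *models]:
            try:
                return await next_fn(candidate)
            except ModelError as err:
                last_error = err
        raise last_error
    return layer


async def trace_calls(layers):
    """Run 'primary' through layers (first = outermost); return the models tried."""
    log = []

    async def model_call(model):
        log.append(model)
        raise ModelError(model)  # every call fails so the full pattern shows

    handler = model_call
    for layer in reversed(layers):
        async def wrapped(model, layer=layer, inner=handler):
            return await layer(model, inner)
        handler = wrapped
    try:
        await handler('primary')
    except ModelError:
        pass
    return log
```

With retry(1) listed first, the trace is primary, backup, primary, backup: the model switches on the first failure and the whole chain is retried. With fallback(['backup']) listed first, the trace is primary, primary, backup, backup: each model is retried before moving on.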


Complete Production-Ready Example

A flow that combines retry, fallback, and logging—exactly what you’d deploy for a user-facing feature:

# src/production_flow.py
import time
from pydantic import BaseModel
from genkit import Genkit
from genkit.middleware import BaseMiddleware, GenerateMiddlewareContext
from genkit.plugins.google_genai import GoogleAI
from genkit.plugins.middleware import Middleware, Retry, Fallback

ai = Genkit(
    plugins=[GoogleAI(), Middleware()],
    model='googleai/gemini-2.5-pro',      # primary: best quality
)


# ── Cost + latency telemetry ─────────────────────────────────────────────────

class TelemetryMiddleware(BaseMiddleware):
    """Track tokens and latency for cost attribution."""

    async def wrap_model(self, params, ctx: GenerateMiddlewareContext, next_fn):
        start = time.monotonic()
        try:
            response = await next_fn(params, ctx)
            elapsed_ms = (time.monotonic() - start) * 1000
            usage = response.usage

            if usage:
                # Replace with your metrics backend
                print(
                    f'[telemetry] '
                    f'input_tokens={usage.input_tokens} '
                    f'output_tokens={usage.output_tokens} '
                    f'total_tokens={usage.total_tokens} '
                    f'elapsed_ms={elapsed_ms:.0f}'
                )
            return response
        except Exception as e:
            print(f'[telemetry] error: {e}')
            raise


# ── Middleware stack ─────────────────────────────────────────────────────────

# Reusable stack for production generate calls
PRODUCTION_MIDDLEWARE = [
    TelemetryMiddleware(),                            # outermost: always logs
    Retry(                                            # retry transient errors
        max_retries=3,
        initial_delay_ms=1000,
        max_delay_ms=30_000,
    ),
    Fallback(                                         # fallback on sustained failure
        models=[
            'googleai/gemini-flash-latest',           # tier 2: balanced
            'googleai/gemini-2.0-flash',              # tier 3: fast, cheap
        ],
        statuses=['RESOURCE_EXHAUSTED', 'UNAVAILABLE', 'DEADLINE_EXCEEDED'],
    ),
]


# ── Flow ─────────────────────────────────────────────────────────────────────

class SummarizeInput(BaseModel):
    text: str
    max_bullets: int = 5

class SummarizeOutput(BaseModel):
    bullets: list[str]
    sentiment: str
    confidence: float

@ai.flow()
async def summarize_resilient(input: SummarizeInput) -> SummarizeOutput:
    """Summarize text with retry, fallback, and telemetry."""
    response = await ai.generate(
        prompt=f"""
Summarize the following text into at most {input.max_bullets} bullet points.
Also assess the overall sentiment (positive/negative/neutral) and your confidence (0.0-1.0).

Text:
{input.text}
""",
        output_format='json',
        output_schema=SummarizeOutput,
        use=PRODUCTION_MIDDLEWARE,
    )
    return response.output


async def main():
    result = await summarize_resilient(SummarizeInput(
        text="""
        The new product launch exceeded all expectations. Customer feedback has been 
        overwhelmingly positive, with satisfaction scores hitting record highs. 
        The engineering team delivered on time despite significant technical challenges.
        Revenue projections for Q3 are being revised upward.
        """,
    ))
    for bullet in result.bullets:
        print(f'  • {bullet}')
    print(f'Sentiment: {result.sentiment} (confidence: {result.confidence:.0%})')


if __name__ == '__main__':
    ai.run_main(main())

Output:

[telemetry] input_tokens=187 output_tokens=89 total_tokens=276 elapsed_ms=1243
  • Product launch exceeded expectations with record-high customer satisfaction
  • Engineering team delivered on schedule despite technical obstacles
  • Q3 revenue projections revised upward following strong launch performance
Sentiment: positive (confidence: 95%)

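If gemini-2.5-pro hits quota, Fallback (the innermost wrapper in this stack) switches to gemini-flash-latest and then to gemini-2.0-flash right away. Only if all three models fail does Retry wait with backoff and run the whole chain again, up to 3 times. Your application code doesn’t know which model answered. To retry each model several times before moving on to the next, list Fallback before Retry instead.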


Registering Middleware with the Dev UI

To make middleware visible in the Genkit Dev UI (so you can toggle it per-request during development), register it with @ai.middleware:

@ai.middleware(name='telemetry_mw')
class TelemetryMiddleware(BaseMiddleware):
    async def wrap_model(self, params, ctx, next_fn):
        ...

The name must be a single token (no slashes, colons, or whitespace). Registered middleware appears in the Dev UI’s middleware panel, where you can combine middleware visually and run flows with different stacks.

You don’t need registration to use middleware in code. use=[LoggingMiddleware()] works without @ai.middleware. Registration is purely for Dev UI visibility.


Middleware with Config

When your middleware has tunable parameters, declare a BaseModel config:

from pydantic import BaseModel
from genkit.middleware import BaseMiddleware

class BudgetConfig(BaseModel):
    max_tokens: int = 4000
    warn_at: int = 3000

@ai.middleware(name='token_budget_mw')
class TokenBudgetMiddleware(BaseMiddleware[BudgetConfig]):
    """Warn when approaching token budget; block over-budget calls."""

    async def wrap_model(self, params, ctx, next_fn):
        # Count input tokens (rough estimate)
        text = ' '.join(
            part.text
            for msg in params.request.messages
            for part in (msg.content or [])
            if hasattr(part, 'text') and part.text
        )
        estimated_tokens = len(text.split()) * 1.3  # rough approximation

        if estimated_tokens > self.config.max_tokens:
            raise ValueError(
                f'Request exceeds token budget: {estimated_tokens:.0f} > {self.config.max_tokens}'
            )
        if estimated_tokens > self.config.warn_at:
            print(f'[budget] Warning: {estimated_tokens:.0f} tokens approaching limit')

        return await next_fn(params, ctx)

# Per-call configuration
response = await ai.generate(
    prompt=very_long_document,
    use=[TokenBudgetMiddleware(max_tokens=8000, warn_at=6000)],
)

The BaseMiddleware[BudgetConfig] syntax wires up config automatically. You can pass keyword args directly: TokenBudgetMiddleware(max_tokens=8000) or pass a config object: TokenBudgetMiddleware(config=BudgetConfig(max_tokens=8000)).


Quick Reference

Built-in middleware

Class          Hook         Purpose
Retry          wrap_model   Exponential backoff on transient errors
Fallback       wrap_model   Switch to backup models on failure
ToolApproval   wrap_tool    Human-in-the-loop before tool execution
Skills         -            Expose .skill.md files as system prompts + tool
Filesystem     -            Sandboxed file ops (read/write/edit)
Artifacts      -            Session artifact management

Hooks and when to use them

Hook            Called when                Use for
wrap_generate   Each tool loop iteration   Token counting, conversation-level logging
wrap_model      Each raw model API call    Retry, fallback, request/response inspection
wrap_tool       Each tool execution        Approval, tool output transformation, audit logging

Common patterns

# Retry only
use=[Retry(max_retries=3)]

# Retry + fallback
use=[Retry(max_retries=2), Fallback(models=['googleai/gemini-flash-latest'])]

# Tool approval (block all tools by default)
use=[ToolApproval(allowed_tools=[])]

# Tool approval (allow safe tools, block dangerous ones)
use=[ToolApproval(allowed_tools=['read_file', 'search'])]

# Full production stack
use=[LoggingMiddleware(), Retry(max_retries=3), Fallback(models=[...])]

The full middleware source is in py/plugins/middleware/ of the firebase/genkit repo. The py/samples/middleware/ and py/samples/middleware-coding-agent/ directories have runnable examples.