Genkit Middleware in Python: Retries, Fallbacks, Tool Approval
Here’s what breaks in production without middleware:
# This raises an unhandled error when the model is overloaded
response = await ai.generate(prompt=user_input)
# This calls an expensive external API without any human review
response = await ai.generate(prompt='Transfer $5000 to account X', tools=[transfer_money])
# This crashes your app when your primary model hits quota
response = await ai.generate(model='googleai/gemini-2.5-pro', prompt=user_input)
Three separate failure modes. Three separate “I should add retry logic somewhere” thoughts. Middleware is how Genkit solves all of them with a consistent, composable pattern.
What Middleware Is in Genkit Python
Middleware in Genkit is a class that wraps the generate pipeline. There are three hooks:
- wrap_generate: called once per full tool loop iteration (a model call plus any tool resolution)
- wrap_model: called for each raw model API call
- wrap_tool: called for each tool execution
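To make the three call sites concrete, here is a toy, stdlib-only tool loop. It is not Genkit's implementation. `CountingMiddleware`, `toy_generate`, and the scripted model turns are hypothetical, and the hooks take a single argument plus `next_fn` instead of Genkit's `(params, ctx, next_fn)`. It only illustrates the call frequencies described above.

```python
import asyncio


class CountingMiddleware:
    """Hypothetical middleware that counts how often each hook fires."""

    def __init__(self):
        self.counts = {'generate': 0, 'model': 0, 'tool': 0}

    async def wrap_generate(self, history, next_fn):
        self.counts['generate'] += 1
        return await next_fn(history)

    async def wrap_model(self, history, next_fn):
        self.counts['model'] += 1
        return await next_fn(history)

    async def wrap_tool(self, tool_name, next_fn):
        self.counts['tool'] += 1
        return await next_fn(tool_name)


async def toy_generate(mw, scripted_turns, tools):
    """Toy tool loop: each scripted model turn lists the tools it requests."""
    history = []
    turns = iter(scripted_turns)

    async def call_model(_history):
        return next(turns)  # an empty list means the model is done

    async def run_tool(tool_name):
        return tools[tool_name]()

    async def one_iteration(_history):
        # One iteration = one model call plus resolution of its tool requests.
        requested = await mw.wrap_model(_history, call_model)
        results = [await mw.wrap_tool(name, run_tool) for name in requested]
        return requested, results

    while True:
        requested, results = await mw.wrap_generate(history, one_iteration)
        history.extend(results)
        if not requested:
            return history
```

With scripted turns where the model requests two tools, then one, then none, each of the three hooks fires three times: three iterations, three model calls, three tool executions.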
You subclass BaseMiddleware, override the hook(s) you care about, and pass an instance to use=[...]:
from genkit.middleware import BaseMiddleware, GenerateMiddlewareContext


class LoggingMiddleware(BaseMiddleware):
    async def wrap_model(self, params, ctx: GenerateMiddlewareContext, next_fn):
        print(f'→ model call: {len(params.request.messages)} messages')
        response = await next_fn(params, ctx)
        print(f'← finish reason: {response.finish_reason}')
        return response


response = await ai.generate(
    prompt='Hello',
    use=[LoggingMiddleware()],
)
The next_fn call runs the rest of the pipeline. Call it to proceed; skip it to intercept the call. Return a different response to substitute one, or raise an exception to abort.
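A minimal sketch of those options, with a plain async function standing in for Genkit's `next_fn(params, ctx)`. `CachingMiddleware` and `fake_model` are hypothetical. A cache hit returns a stored response without awaiting `next_fn`, so the model is never called; an empty prompt aborts by raising.

```python
import asyncio


class CachingMiddleware:
    """Hypothetical wrap_model-style hook that short-circuits on a cache hit."""

    def __init__(self):
        self.cache: dict[str, str] = {}

    async def wrap_model(self, prompt: str, next_fn):
        if prompt in self.cache:
            return self.cache[prompt]         # intercept + substitute: next_fn never runs
        if not prompt.strip():
            raise ValueError('empty prompt')  # abort: nothing downstream runs
        response = await next_fn(prompt)      # proceed: run the rest of the pipeline
        self.cache[prompt] = response
        return response


calls: list[str] = []


async def fake_model(prompt: str) -> str:
    """Stand-in for the real model call; records every invocation."""
    calls.append(prompt)
    return prompt.upper()
```

Calling `wrap_model('hi', fake_model)` twice returns the same response both times, but `calls` records only one real model call.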
Execution order with use=[A(), B()]:
A.wrap_model (before)
B.wrap_model (before)
model call
B.wrap_model (after)
A.wrap_model (after)
First middleware in the list is the outermost wrapper.
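One way to get that ordering, sketched with plain async functions rather than Genkit classes: fold the list from the right so each hook's `next_fn` is the layer just inside it. `compose`, `tracing_hook`, and `fake_model` are hypothetical helpers; Genkit's own composition code may be structured differently.

```python
import asyncio
import functools


def compose(middleware, handler):
    """Wrap handler so that middleware[0] becomes the outermost layer."""
    for hook in reversed(middleware):
        # partial() freezes the current handler, so each layer calls the next one in.
        handler = functools.partial(hook, next_fn=handler)
    return handler


def tracing_hook(name, trace):
    async def hook(request, next_fn):
        trace.append(f'{name}.wrap_model (before)')
        response = await next_fn(request)
        trace.append(f'{name}.wrap_model (after)')
        return response
    return hook


def fake_model(trace):
    async def call(request):
        trace.append('model call')
        return f'echo: {request}'
    return call
```

Composing `[tracing_hook('A', trace), tracing_hook('B', trace)]` around `fake_model(trace)` records exactly the before/after sequence listed above.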
The Middleware Plugin
The built-in middleware classes live in genkit.plugins.middleware. To use them, you need to register the plugin:
from genkit import Genkit
from genkit.plugins.google_genai import GoogleAI
from genkit.plugins.middleware import Middleware, Retry, Fallback, ToolApproval
ai = Genkit(
    plugins=[GoogleAI(), Middleware()],  # <-- register the plugin
    model='googleai/gemini-2.0-flash',
)
Middleware() registers the built-in middleware descriptors so they appear in the Dev UI. You still pass instances into use=[...] at the call site.
Install:
uv add "genkit[google-genai] @ git+https://github.com/firebase/genkit.git#subdirectory=py/packages/genkit"
# The middleware plugin is included in the monorepo install
Retry Middleware
Rate limits (RESOURCE_EXHAUSTED) and transient network errors are facts of life with any LLM API. Without retry logic, these manifest as hard failures.
from genkit.plugins.middleware import Retry
response = await ai.generate(
    prompt='Summarize this document...',
    use=[Retry(max_retries=3)],
)
That’s the minimal usage. With defaults, Retry retries up to 3 times on these statuses: UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED, INTERNAL. It uses exponential backoff with jitter starting at 1 second, capping at 60 seconds.
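The arithmetic behind that schedule, as a hypothetical helper rather than the plugin's code. It assumes the delay before retry n (counting from 0) is `initial_delay_ms * backoff_factor ** n`, capped at `max_delay_ms`; modeling jitter as a random scale factor in [0.5, 1.0] is also an assumption, and the plugin's exact jitter formula may differ.

```python
import random
from typing import Optional


def retry_delays_ms(
    max_retries: int = 3,
    initial_delay_ms: float = 1000,
    max_delay_ms: float = 60_000,
    backoff_factor: float = 2.0,
    no_jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Delay before each retry attempt, in milliseconds (illustrative sketch)."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Exponential growth, capped so late retries never wait longer than max_delay_ms.
        delay = min(initial_delay_ms * backoff_factor ** attempt, max_delay_ms)
        if not no_jitter:
            # Assumed jitter model: scale by a random factor in [0.5, 1.0].
            delay *= rng.uniform(0.5, 1.0)
        delays.append(delay)
    return delays
```

With the defaults and `no_jitter=True` this gives 1000, 2000, and 4000 ms. The batch settings shown below (`max_retries=5`, `initial_delay_ms=500`) give 500 through 8000 ms, so their 30-second cap is never reached.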
Configuring retry behavior
from genkit.plugins.middleware import Retry
# Aggressive retry for a long-running batch job
batch_retry = Retry(
    max_retries=5,
    initial_delay_ms=500,  # start at 500ms
    max_delay_ms=30_000,  # cap at 30s
    backoff_factor=2.0,  # double each attempt
    no_jitter=False,  # jitter prevents thundering herd (default)
)

# Tight retry for a latency-sensitive user request
fast_retry = Retry(
    max_retries=2,
    initial_delay_ms=200,
    max_delay_ms=2_000,
)

# Custom retry statuses: only retry quota errors, not server errors
quota_retry = Retry(
    max_retries=3,
    statuses=['RESOURCE_EXHAUSTED'],
)
The RetryConfig parameters:
| Parameter | Default | Description |
|---|---|---|
| max_retries | 3 | Maximum retry attempts |
| initial_delay_ms | 1000 | Delay before the first retry, in ms |
| max_delay_ms | 60000 | Maximum delay cap, in ms |
| backoff_factor | 2.0 | Delay multiplier per attempt |
| no_jitter | False | Disable jitter (useful in tests) |
| statuses | UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED, INTERNAL | Error statuses that trigger a retry |
Retry hooks into wrap_model: it retries at the raw model API call level, not at the flow level. If a retry succeeds, the caller simply receives the response and never sees the failed attempts.
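A simplified model of what retrying a single model call means, assuming errors expose a status name. `StatusError` and `call_with_retry` are hypothetical, not Genkit's API; jitter is left out, and `sleep` is injectable so the behavior can be checked without waiting.

```python
import asyncio


class StatusError(Exception):
    """Hypothetical error carrying a status name such as 'UNAVAILABLE'."""

    def __init__(self, status: str):
        super().__init__(status)
        self.status = status


RETRY_STATUSES = frozenset(
    {'UNAVAILABLE', 'DEADLINE_EXCEEDED', 'RESOURCE_EXHAUSTED', 'ABORTED', 'INTERNAL'}
)


async def call_with_retry(call_model, request, *, max_retries=3, initial_delay_ms=1000,
                          max_delay_ms=60_000, backoff_factor=2.0,
                          statuses=RETRY_STATUSES, sleep=asyncio.sleep):
    """Retry one model call on retryable statuses (no jitter, for brevity)."""
    for attempt in range(max_retries + 1):
        try:
            return await call_model(request)
        except StatusError as err:
            if err.status not in statuses or attempt == max_retries:
                raise  # non-retryable, or out of attempts: surface the error
            delay_ms = min(initial_delay_ms * backoff_factor ** attempt, max_delay_ms)
            await sleep(delay_ms / 1000)
```

A caller awaiting `call_with_retry` gets either a response or the final error; the intermediate failures stay inside the wrapper.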
Fallback Middleware
Retry handles transient failures. Fallback handles sustained failures: your primary model is down, quota is exhausted for the day, or you need geographic failover.
from genkit.plugins.middleware import Fallback
response = await ai.generate(
    model='googleai/gemini-2.5-pro',
    prompt='Write a product description...',
    use=[
        Fallback(models=['googleai/gemini-flash-latest', 'googleai/gemini-2.0-flash']),
    ],
)
If the primary model (gemini-2.5-pro) fails with a qualifying status, Genkit automatically tries gemini-flash-latest, then gemini-2.0-flash as a second fallback. If all models fail, the last error is raised.
Fallback statuses (what triggers a fallback): UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED, INTERNAL, NOT_FOUND, UNIMPLEMENTED.
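The fallback behavior described above can be sketched as a loop over candidate models. `StatusError` and `call_with_fallback` are hypothetical. Re-raising a non-qualifying error immediately, without trying the next model, is an assumption based on the "qualifying status" wording.

```python
import asyncio
from typing import Optional


class StatusError(Exception):
    """Hypothetical error carrying a status name such as 'RESOURCE_EXHAUSTED'."""

    def __init__(self, status: str):
        super().__init__(status)
        self.status = status


FALLBACK_STATUSES = frozenset({
    'UNAVAILABLE', 'DEADLINE_EXCEEDED', 'RESOURCE_EXHAUSTED', 'ABORTED',
    'INTERNAL', 'NOT_FOUND', 'UNIMPLEMENTED',
})


async def call_with_fallback(call_model, request, primary, fallbacks,
                             statuses=FALLBACK_STATUSES):
    """Try the primary model, then each fallback in order; raise the last error."""
    last_error: Optional[StatusError] = None
    for model in [primary, *fallbacks]:
        try:
            return await call_model(model, request)
        except StatusError as err:
            if err.status not in statuses:
                raise  # assumed: non-qualifying errors are not masked by a fallback
            last_error = err
    raise last_error
```

If the primary raises `RESOURCE_EXHAUSTED`, the first fallback answers and the second is never called.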
Custom fallback statuses
# Only fall back on quota exhaustion or unavailability, not on other errors
quota_fallback = Fallback(
    models=['googleai/gemini-flash-latest'],
    statuses=['RESOURCE_EXHAUSTED', 'UNAVAILABLE'],
)
Fallback hooks into wrap_model at the raw model call level. If a fallback succeeds, the response comes back as normal—callers don’t need to know which model served the request.
Tool Approval Middleware
This is the pattern I use for any agent that can take real-world actions. Without it, the model can call any tool it wants, any time.
from genkit.plugins.middleware import ToolApproval
# Allow read operations; block write operations
response = await ai.generate(
    prompt='Look at the workspace and fix the bug in main.py',
    tools=[read_file, list_files, write_file, edit_file],
    use=[ToolApproval(allowed_tools=['read_file', 'list_files'])],
)
When the model tries to call write_file or edit_file, ToolApproval raises an Interrupt instead of executing the tool. The caller sees response.interrupts populated—you can then show the user what the agent wants to do and ask for approval.
The full approval loop
from genkit import Genkit, restart_tool
from genkit.plugins.middleware import ToolApproval


async def agent_with_approval(user_request: str) -> str:
    messages = []
    restart: list | None = None
    while True:
        response = await ai.generate(
            # Send the prompt only on the first turn; later turns resume from history
            prompt=user_request if restart is None else None,
            messages=messages,
            resume_restart=restart,
            tools=[read_file, write_file],
            use=[ToolApproval(allowed_tools=['read_file'])],
        )
        if not response.interrupts:
            # No approval needed: we're done
            return response.text

        # Show the user what the agent wants to do
        interrupt = response.interrupts[0]
        tool_name = interrupt.tool_request.name
        tool_input = interrupt.tool_request.input
        print(f'\nAgent wants to call: {tool_name}')
        print(f'With input: {tool_input}')
        decision = input('Approve? [y/N]: ').strip().lower()
        if decision != 'y':
            return f'Action denied: {tool_name}'

        # Approved: resume with the tool marked as approved
        restart = [restart_tool(interrupt, metadata={'toolApproved': True})]
        messages = response.messages
How it works internally: ToolApproval.wrap_tool intercepts each tool call. If the tool isn’t in allowed_tools, it checks the metadata on the tool request for toolApproved: true. If that flag is absent, it raises Interrupt. On resume (via resume_restart), the metadata is present, so the tool executes normally.
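The same decision logic as a standalone sketch. `ToolRequest`, `Interrupt`, and `ApprovalGate` are hypothetical stand-ins for Genkit's tool request, interrupt, and `ToolApproval` types; only the `allowed_tools` check and the `toolApproved` metadata flag come from the description above.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToolRequest:
    """Hypothetical tool request: name, input, and per-request metadata."""
    name: str
    input: dict
    metadata: dict = field(default_factory=dict)


class Interrupt(Exception):
    """Hypothetical interrupt that carries the pending request back to the caller."""

    def __init__(self, request: ToolRequest):
        super().__init__(f'approval required: {request.name}')
        self.request = request


class ApprovalGate:
    def __init__(self, allowed_tools):
        self.allowed_tools = set(allowed_tools)

    async def wrap_tool(self, request: ToolRequest, next_fn):
        # Run allow-listed tools, or tools resumed with explicit approval.
        if request.name in self.allowed_tools or request.metadata.get('toolApproved') is True:
            return await next_fn(request)
        # Otherwise stop before execution and hand the request back for review.
        raise Interrupt(request)
```

A first attempt at `write_file` raises `Interrupt`; resubmitting the same request with `metadata={'toolApproved': True}` lets it run.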
Logging Middleware
Simple telemetry for every model call:
import time

from genkit.middleware import BaseMiddleware, GenerateMiddlewareContext


class LoggingMiddleware(BaseMiddleware):
    """Log request/response details and timing for every model call."""

    async def wrap_model(self, params, ctx: GenerateMiddlewareContext, next_fn):
        start = time.monotonic()
        message_count = len(params.request.messages)
        try:
            response = await next_fn(params, ctx)
            elapsed_ms = (time.monotonic() - start) * 1000
            print(
                f'[genkit] config={params.request.config} '
                f'messages={message_count} '
                f'finish={response.finish_reason} '
                f'tokens={response.usage.total_tokens if response.usage else "?"} '
                f'elapsed={elapsed_ms:.0f}ms'
            )
            return response
        except Exception as e:
            elapsed_ms = (time.monotonic() - start) * 1000
            print(f'[genkit] ERROR after {elapsed_ms:.0f}ms: {e}')
            raise
For production, wire this into your observability platform:
import time

import structlog
from genkit.middleware import BaseMiddleware, GenerateMiddlewareContext

logger = structlog.get_logger(__name__)


class StructuredLoggingMiddleware(BaseMiddleware):
    async def wrap_model(self, params, ctx: GenerateMiddlewareContext, next_fn):
        start = time.monotonic()
        try:
            response = await next_fn(params, ctx)
            await logger.ainfo(
                'genkit.model.call',
                finish_reason=str(response.finish_reason),
                tokens=response.usage.total_tokens if response.usage else None,
                elapsed_ms=round((time.monotonic() - start) * 1000),
            )
            return response
        except Exception as e:
            await logger.aerror('genkit.model.error', error=str(e))
            raise
LoggingMiddleware requires no constructor config—just instantiate and pass in.
Composing Multiple Middleware Pieces
Here’s where the pattern pays off. Compose with use=[...]:
response = await ai.generate(
    prompt='...',
    use=[
        LoggingMiddleware(),  # outermost: sees everything
        Retry(max_retries=3),  # retries the whole fallback chain
        Fallback(models=['googleai/gemini-flash-latest']),  # innermost: switches models first
    ],
)
Execution order for wrap_model:
LoggingMiddleware (start timer, log)
Retry (attempt 1)
Fallback (try primary)
model API call ← might fail here
Fallback (try gemini-flash-latest if failed)
Retry (attempt 2 if fallback also failed)
LoggingMiddleware (log result + elapsed)
Retry + Fallback compose naturally: Retry retries the same model; Fallback tries a different model. Put Retry inside Fallback (closer to the model call) if you want to retry each fallback model individually. Put Fallback inside Retry if you want to switch models immediately on first failure.
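To see the difference between the two orders, here is a stdlib-only simulation in which every model call fails. `compose`, `retry`, and `fallback` are hypothetical simplifications: there are no backoff delays, the request is just the model name, and the first hook in the list is outermost, as stated earlier. The recorded log shows which models get called, and in which order.

```python
import asyncio
import functools


class StatusError(Exception):
    def __init__(self, status):
        super().__init__(status)
        self.status = status


def compose(middleware, handler):
    for hook in reversed(middleware):  # middleware[0] ends up outermost
        handler = functools.partial(hook, next_fn=handler)
    return handler


def retry(max_retries):
    async def hook(model, next_fn):
        for attempt in range(max_retries + 1):
            try:
                return await next_fn(model)  # same model every attempt
            except StatusError:
                if attempt == max_retries:
                    raise
    return hook


def fallback(models):
    async def hook(model, next_fn):
        for candidate in [model, *models]:  # switch models on each failure
            try:
                return await next_fn(candidate)
            except StatusError as err:
                last = err
        raise last
    return hook


def always_failing(log):
    async def call_model(model):
        log.append(model)
        raise StatusError('UNAVAILABLE')
    return call_model
```

With `[retry(1), fallback(['flash'])]` the log reads pro, flash, pro, flash: Fallback switches models immediately and Retry repeats the whole chain. With `[fallback(['flash']), retry(1)]` it reads pro, pro, flash, flash: each model is retried before the next one is tried.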
Complete Production-Ready Example
A flow that combines retry, fallback, and logging—exactly what you’d deploy for a user-facing feature:
# src/production_flow.py
import time
from pydantic import BaseModel
from genkit import Genkit
from genkit.middleware import BaseMiddleware, GenerateMiddlewareContext
from genkit.plugins.google_genai import GoogleAI
from genkit.plugins.middleware import Middleware, Retry, Fallback
ai = Genkit(
    plugins=[GoogleAI(), Middleware()],
    model='googleai/gemini-2.5-pro',  # primary: best quality
)
# ── Cost + latency telemetry ─────────────────────────────────────────────────
class TelemetryMiddleware(BaseMiddleware):
    """Track tokens and latency for cost attribution."""

    async def wrap_model(self, params, ctx: GenerateMiddlewareContext, next_fn):
        start = time.monotonic()
        try:
            response = await next_fn(params, ctx)
            elapsed_ms = (time.monotonic() - start) * 1000
            usage = response.usage
            if usage:
                # Replace with your metrics backend
                print(
                    f'[telemetry] '
                    f'input_tokens={usage.input_tokens} '
                    f'output_tokens={usage.output_tokens} '
                    f'total_tokens={usage.total_tokens} '
                    f'elapsed_ms={elapsed_ms:.0f}'
                )
            return response
        except Exception as e:
            print(f'[telemetry] error: {e}')
            raise
# ── Middleware stack ─────────────────────────────────────────────────────────
# Reusable stack for production generate calls. The first entry is outermost,
# so Retry sits inside Fallback and each model is retried before switching.
PRODUCTION_MIDDLEWARE = [
    TelemetryMiddleware(),  # outermost: always logs
    Fallback(  # switch models once retries on the current model are exhausted
        models=[
            'googleai/gemini-flash-latest',  # tier 2: balanced
            'googleai/gemini-2.0-flash',  # tier 3: fast, cheap
        ],
        statuses=['RESOURCE_EXHAUSTED', 'UNAVAILABLE', 'DEADLINE_EXCEEDED'],
    ),
    Retry(  # innermost: retry transient errors on each model
        max_retries=3,
        initial_delay_ms=1000,
        max_delay_ms=30_000,
    ),
]
# ── Flow ─────────────────────────────────────────────────────────────────────
class SummarizeInput(BaseModel):
    text: str
    max_bullets: int = 5


class SummarizeOutput(BaseModel):
    bullets: list[str]
    sentiment: str
    confidence: float
@ai.flow()
async def summarize_resilient(input: SummarizeInput) -> SummarizeOutput:
    """Summarize text with retry, fallback, and telemetry."""
    response = await ai.generate(
        prompt=f"""
        Summarize the following text into at most {input.max_bullets} bullet points.
        Also assess the overall sentiment (positive/negative/neutral) and your confidence (0.0-1.0).
        Text:
        {input.text}
        """,
        output_format='json',
        output_schema=SummarizeOutput,
        use=PRODUCTION_MIDDLEWARE,
    )
    return response.output
async def main():
    result = await summarize_resilient(SummarizeInput(
        text="""
        The new product launch exceeded all expectations. Customer feedback has been
        overwhelmingly positive, with satisfaction scores hitting record highs.
        The engineering team delivered on time despite significant technical challenges.
        Revenue projections for Q3 are being revised upward.
        """,
    ))
    for bullet in result.bullets:
        print(f' • {bullet}')
    print(f'Sentiment: {result.sentiment} (confidence: {result.confidence:.0%})')


if __name__ == '__main__':
    ai.run_main(main())
Output:
[telemetry] input_tokens=187 output_tokens=89 total_tokens=276 elapsed_ms=1243
• Product launch exceeded expectations with record-high customer satisfaction
• Engineering team delivered on schedule despite technical obstacles
• Q3 revenue projections revised upward following strong launch performance
Sentiment: positive (confidence: 95%)
If gemini-2.5-pro hits quota, the call transparently retries 3 times with backoff, then falls back to gemini-flash-latest, then to gemini-2.0-flash. Your application code doesn’t know which model answered.
Registering Middleware with the Dev UI
To make middleware visible in the Genkit Dev UI (so you can toggle it per-request during development), register it with @ai.middleware:
@ai.middleware(name='telemetry_mw')
class TelemetryMiddleware(BaseMiddleware):
    async def wrap_model(self, params, ctx, next_fn):
        ...
The name must be a single token (no slashes, colons, or whitespace). Registered middleware appears in the Dev UI’s middleware panel, where you can combine middleware visually and run flows with different stacks.
You don’t need registration to use middleware in code. use=[LoggingMiddleware()] works without @ai.middleware. Registration is purely for Dev UI visibility.
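If you build middleware names dynamically, a small check like this can catch invalid ones before registration. `is_valid_middleware_name` is hypothetical and encodes only the rule stated above (non-empty, no slashes, colons, or whitespace); Genkit may enforce additional constraints.

```python
import re

# Assumed rule, paraphrasing the text above: no '/', ':' or whitespace anywhere.
_FORBIDDEN = re.compile(r'[/:\s]')


def is_valid_middleware_name(name: str) -> bool:
    """Return True if name is a single non-empty token under the stated rule."""
    return bool(name) and _FORBIDDEN.search(name) is None
```

Names such as `telemetry_mw` and `token_budget_mw` pass; `my/mw`, `ns:mw`, and `two words` do not.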
Middleware with Config
When your middleware has tunable parameters, declare a BaseModel config:
from pydantic import BaseModel

from genkit.middleware import BaseMiddleware


class BudgetConfig(BaseModel):
    max_tokens: int = 4000
    warn_at: int = 3000


@ai.middleware(name='token_budget_mw')
class TokenBudgetMiddleware(BaseMiddleware[BudgetConfig]):
    """Warn when approaching token budget; block over-budget calls."""

    async def wrap_model(self, params, ctx, next_fn):
        # Count input tokens (rough estimate)
        text = ' '.join(
            part.text
            for msg in params.request.messages
            for part in (msg.content or [])
            if hasattr(part, 'text') and part.text
        )
        estimated_tokens = len(text.split()) * 1.3  # rough approximation
        if estimated_tokens > self.config.max_tokens:
            raise ValueError(
                f'Request exceeds token budget: {estimated_tokens:.0f} > {self.config.max_tokens}'
            )
        if estimated_tokens > self.config.warn_at:
            print(f'[budget] Warning: {estimated_tokens:.0f} tokens approaching limit')
        return await next_fn(params, ctx)


# Per-call configuration
response = await ai.generate(
    prompt=very_long_document,
    use=[TokenBudgetMiddleware(max_tokens=8000, warn_at=6000)],
)
The BaseMiddleware[BudgetConfig] syntax wires up config automatically. You can pass keyword args directly: TokenBudgetMiddleware(max_tokens=8000) or pass a config object: TokenBudgetMiddleware(config=BudgetConfig(max_tokens=8000)).
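A stdlib-only sketch of how a generic base class can build `self.config` from either keyword arguments or a `config=` object. `ConfiguredMiddleware` and `TokenBudget` are hypothetical, not Genkit's `BaseMiddleware`, and the sketch uses a dataclass instead of a Pydantic `BaseModel` so it runs without extra dependencies.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar, get_args, get_origin

ConfigT = TypeVar('ConfigT')


class ConfiguredMiddleware(Generic[ConfigT]):
    """Hypothetical base that builds self.config from kwargs or a config object."""

    def __init__(self, config: Optional[ConfigT] = None, **kwargs):
        if config is not None and kwargs:
            raise TypeError('pass either config=... or keyword fields, not both')
        self.config = config if config is not None else self._config_type()(**kwargs)

    @classmethod
    def _config_type(cls) -> type:
        # Find the type the subclass declared, e.g. ConfiguredMiddleware[BudgetConfig].
        for base in getattr(cls, '__orig_bases__', ()):
            if get_origin(base) is ConfiguredMiddleware:
                return get_args(base)[0]
        raise TypeError(f'{cls.__name__} does not declare a config type')


@dataclass
class BudgetConfig:
    max_tokens: int = 4000
    warn_at: int = 3000


class TokenBudget(ConfiguredMiddleware[BudgetConfig]):
    pass
```

`TokenBudget(max_tokens=8000)` and `TokenBudget(config=BudgetConfig(max_tokens=8000))` end up with equal configs; fields you omit keep their defaults.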
Quick Reference
Built-in middleware
| Class | Hook | Purpose |
|---|---|---|
| Retry | wrap_model | Exponential backoff on transient errors |
| Fallback | wrap_model | Switch to backup models on failure |
| ToolApproval | wrap_tool | Human-in-the-loop before tool execution |
| Skills | n/a | Expose .skill.md files as system prompts + tool |
| Filesystem | n/a | Sandboxed file ops (read/write/edit) |
| Artifacts | n/a | Session artifact management |
Hooks and when to use them
| Hook | Called when | Use for |
|---|---|---|
| wrap_generate | Each tool loop iteration | Token counting, conversation-level logging |
| wrap_model | Each raw model API call | Retry, fallback, request/response inspection |
| wrap_tool | Each tool execution | Approval, tool output transformation, audit logging |
Common patterns
# Retry only
use=[Retry(max_retries=3)]
# Retry + fallback
use=[Retry(max_retries=2), Fallback(models=['googleai/gemini-flash-latest'])]
# Tool approval (every tool call requires approval)
use=[ToolApproval(allowed_tools=[])]
# Tool approval (allow safe tools; everything else requires approval)
use=[ToolApproval(allowed_tools=['read_file', 'search'])]
# Full production stack
use=[LoggingMiddleware(), Retry(max_retries=3), Fallback(models=[...])]
The full middleware source is in py/plugins/middleware/ of the firebase/genkit repo. The py/samples/middleware/ and py/samples/middleware-coding-agent/ directories have runnable examples.