Middleware¶
Middleware lets you wrap the AMGI application to run logic before, or after a message is handled. A middleware is a callable that receives the downstream app, and is itself an AMGI application.
Middleware can be used for cross-cutting concerns like logging, timing, tracing, metrics, or translating errors. It sees
the full AMGI scope, and the receive/send callables for each event.
What Middleware Runs¶
AsyncFast builds a middleware stack that wraps the core app. Each middleware is called for every AMGI event handled by the app:
messagescopes for regular channel handling.lifespanscopes for startup, and shutdown.
If you need one-time startup, or shutdown logic, prefer the lifespan API. Middleware is for per-event behavior.
Basic Middleware¶
Create a class with __init__ to receive the downstream app, and __call__ to handle each event:
from time import monotonic
from amgi_types import AMGIApplication
from amgi_types import AMGIReceiveCallable
from amgi_types import AMGISendCallable
from amgi_types import Scope
from asyncfast import AsyncFast
class TimingMiddleware:
def __init__(self, app: AMGIApplication) -> None:
self._app = app
async def __call__(
self, scope: Scope, receive: AMGIReceiveCallable, send: AMGISendCallable
) -> None:
start = monotonic()
await self._app(scope, receive, send)
duration_ms = (monotonic() - start) * 1000
print(f"{scope['type']} handled in {duration_ms:.2f}ms")
app = AsyncFast()
app.add_middleware(TimingMiddleware)
@app.channel("orders")
async def handle_order(order_id: int) -> None:
print(f"processing order {order_id}")
{
"asyncapi": "3.0.0",
"info": {
"title": "AsyncFast",
"version": "0.1.0"
},
"channels": {
"HandleOrder": {
"address": "orders",
"messages": {
"HandleOrderMessage": {
"$ref": "#/components/messages/HandleOrderMessage"
}
}
}
},
"operations": {
"receiveHandleOrder": {
"action": "receive",
"channel": {
"$ref": "#/channels/HandleOrder"
}
}
},
"components": {
"messages": {
"HandleOrderMessage": {
"payload": {
"type": "integer"
}
}
}
}
}
The code before await self._app(...) runs before the handler (and its dependencies). The code after runs after the
handler finishes. This is the standard pattern for timing, logging, or error handling.
If you add dependencies that use yield for cleanup, their teardown runs inside the downstream app, so it completes
before the code after await self._app(...).
Working With Scope And Messages¶
The middleware callable receives:
scope: a dict describing the AMGI event (includingtype, channel address, headers, and protocol info).receive: an async callable that yields inbound events.send: an async callable used to emit outbound events.
Most middleware simply passes these through to the downstream app. If you need to inspect, or transform traffic, you can
wrap receive or send before passing them along. When you do, make sure you preserve the expected event flow, and
always await the downstream app exactly once.
Registering Middleware¶
You can register middleware when creating the app:
from asyncfast import AsyncFast
from asyncfast import Middleware
app = AsyncFast(
middleware=[Middleware(MyMiddleware, "arg1", option=True)],
)
Or add it later:
app = AsyncFast()
app.add_middleware(MyMiddleware, "arg1", option=True)
Note
The middleware stack is built on first use. Add middleware before the app starts handling messages.
Middleware Order¶
Middleware wraps the app in the order it is registered. The last middleware added runs first.
For example, if you add FirstMiddleware, and then SecondMiddleware, the call order is:
SecondMiddlewarebeforeFirstMiddlewarebeforerouter
FirstMiddlewareafterSecondMiddlewareafter