Middleware

Middleware lets you wrap the AMGI application to run logic before, or after a message is handled. A middleware is a callable that receives the downstream app, and is itself an AMGI application.

Middleware can be used for cross-cutting concerns like logging, timing, tracing, metrics, or translating errors. It sees the full AMGI scope, and the receive/send callables for each event.

What Middleware Runs

AsyncFast builds a middleware stack that wraps the core app. Each middleware is called for every AMGI event handled by the app:

  • message scopes for regular channel handling.

  • lifespan scopes for startup, and shutdown.

If you need one-time startup, or shutdown logic, prefer the lifespan API. Middleware is for per-event behavior.

Basic Middleware

Create a class with __init__ to receive the downstream app, and __call__ to handle each event:

from time import monotonic

from amgi_types import AMGIApplication
from amgi_types import AMGIReceiveCallable
from amgi_types import AMGISendCallable
from amgi_types import Scope
from asyncfast import AsyncFast


class TimingMiddleware:
    def __init__(self, app: AMGIApplication) -> None:
        self._app = app

    async def __call__(
        self, scope: Scope, receive: AMGIReceiveCallable, send: AMGISendCallable
    ) -> None:
        start = monotonic()
        await self._app(scope, receive, send)
        duration_ms = (monotonic() - start) * 1000
        print(f"{scope['type']} handled in {duration_ms:.2f}ms")


app = AsyncFast()
app.add_middleware(TimingMiddleware)


@app.channel("orders")
async def handle_order(order_id: int) -> None:
    print(f"processing order {order_id}")
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "HandleOrder": {
      "address": "orders",
      "messages": {
        "HandleOrderMessage": {
          "$ref": "#/components/messages/HandleOrderMessage"
        }
      }
    }
  },
  "operations": {
    "receiveHandleOrder": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/HandleOrder"
      }
    }
  },
  "components": {
    "messages": {
      "HandleOrderMessage": {
        "payload": {
          "type": "integer"
        }
      }
    }
  }
}

The code before await self._app(...) runs before the handler (and its dependencies). The code after runs after the handler finishes. This is the standard pattern for timing, logging, or error handling.

If you add dependencies that use yield for cleanup, their teardown runs inside the downstream app, so it completes before the code after await self._app(...).

Working With Scope And Messages

The middleware callable receives:

  • scope: a dict describing the AMGI event (including type, channel address, headers, and protocol info).

  • receive: an async callable that yields inbound events.

  • send: an async callable used to emit outbound events.

Most middleware simply passes these through to the downstream app. If you need to inspect, or transform traffic, you can wrap receive or send before passing them along. When you do, make sure you preserve the expected event flow, and always await the downstream app exactly once.

Registering Middleware

You can register middleware when creating the app:

from asyncfast import AsyncFast
from asyncfast import Middleware

app = AsyncFast(
    middleware=[Middleware(MyMiddleware, "arg1", option=True)],
)

Or add it later:

app = AsyncFast()
app.add_middleware(MyMiddleware, "arg1", option=True)

Note

The middleware stack is built on first use. Add middleware before the app starts handling messages.

Middleware Order

Middleware wraps the app in the order it is registered. The last middleware added runs first.

For example, if you add FirstMiddleware, and then SecondMiddleware, the call order is:

  1. SecondMiddleware before

  2. FirstMiddleware before

  3. router

  4. FirstMiddleware after

  5. SecondMiddleware after