Message Send Router

MessageSendRouter lets AMGI servers route message.send events to different backends based on the event address. It manages setup and teardown of the underlying send callables using async context managers. This is useful when different messages must go to different brokers, or when a single server should keep running while outbound traffic is routed by address.

Basic Usage

The router is an async context manager. Create it, register routes, and pass it to the server, which enters it at startup so the message senders can establish their connections:

import os
from dataclasses import dataclass
from typing import Annotated

from amgi_aiobotocore.sqs import MessageSend as SQSMessageSend
from amgi_aiokafka import MessageSend as KafkaMessageSend
from amgi_aiokafka import run
from asyncfast import AsyncFast
from asyncfast import Header
from asyncfast import Message
from asyncfast import MessageSender
from asyncfast.message_send import MessageSendRouter

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")

app = AsyncFast()


@dataclass
class Item:
    sku_id: str
    amount: int


@dataclass
class Order:
    items: list[Item]
    status: str


@dataclass
class EmailProcessingOrder(Message, address="order-email-processing-order"):
    order_id: Annotated[str, Header()]
    order: Order


@dataclass
class CancelShipping(Message, address="cancel-shipping"):
    order_id: Annotated[str, Header()]


@app.channel("orders")
async def handle_order(
    order: Order,
    order_id: Annotated[str, Header()],
    message_sender: MessageSender[EmailProcessingOrder | CancelShipping],
) -> None:
    if order.status == "processing":
        await message_sender.send(EmailProcessingOrder(order_id=order_id, order=order))

    if order.status == "cancelled":
        await message_sender.send(CancelShipping(order_id=order_id))


message_send_router = MessageSendRouter()

message_send_router.add_route(
    "order-email-processing-order",
    KafkaMessageSend(bootstrap_servers=BOOTSTRAP_SERVERS),
)
message_send_router.add_route("cancel-shipping", SQSMessageSend())

if __name__ == "__main__":
    run(
        app,
        "orders",
        bootstrap_servers=BOOTSTRAP_SERVERS,
        message_send=message_send_router,
    )

The AsyncAPI document describing this app:

{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "HandleOrder": {
      "address": "orders",
      "messages": {
        "HandleOrderMessage": {
          "$ref": "#/components/messages/HandleOrderMessage"
        }
      }
    },
    "EmailProcessingOrder": {
      "address": "order-email-processing-order",
      "messages": {
        "EmailProcessingOrder": {
          "$ref": "#/components/messages/EmailProcessingOrder"
        }
      }
    },
    "CancelShipping": {
      "address": "cancel-shipping",
      "messages": {
        "CancelShipping": {
          "$ref": "#/components/messages/CancelShipping"
        }
      }
    }
  },
  "operations": {
    "receiveHandleOrder": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/HandleOrder"
      }
    },
    "sendEmailProcessingOrder": {
      "action": "send",
      "channel": {
        "$ref": "#/channels/EmailProcessingOrder"
      }
    },
    "sendCancelShipping": {
      "action": "send",
      "channel": {
        "$ref": "#/channels/CancelShipping"
      }
    }
  },
  "components": {
    "messages": {
      "HandleOrderMessage": {
        "headers": {
          "$ref": "#/components/schemas/HandleOrderMessageHeaders"
        },
        "payload": {
          "$ref": "#/components/schemas/Order"
        }
      },
      "EmailProcessingOrder": {
        "headers": {
          "$ref": "#/components/schemas/EmailProcessingOrderHeaders"
        },
        "payload": {
          "$ref": "#/components/schemas/Order"
        }
      },
      "CancelShipping": {
        "headers": {
          "$ref": "#/components/schemas/CancelShippingHeaders"
        }
      }
    },
    "schemas": {
      "CancelShippingHeaders": {
        "properties": {
          "order-id": {
            "title": "Order-Id",
            "type": "string"
          }
        },
        "required": [
          "order-id"
        ],
        "title": "CancelShippingHeaders",
        "type": "object"
      },
      "EmailProcessingOrderHeaders": {
        "properties": {
          "order-id": {
            "title": "Order-Id",
            "type": "string"
          }
        },
        "required": [
          "order-id"
        ],
        "title": "EmailProcessingOrderHeaders",
        "type": "object"
      },
      "HandleOrderMessageHeaders": {
        "properties": {
          "order-id": {
            "title": "Order-Id",
            "type": "string"
          }
        },
        "required": [
          "order-id"
        ],
        "title": "HandleOrderMessageHeaders",
        "type": "object"
      },
      "Item": {
        "properties": {
          "sku_id": {
            "title": "Sku Id",
            "type": "string"
          },
          "amount": {
            "title": "Amount",
            "type": "integer"
          }
        },
        "required": [
          "sku_id",
          "amount"
        ],
        "title": "Item",
        "type": "object"
      },
      "Order": {
        "properties": {
          "items": {
            "items": {
              "$ref": "#/components/schemas/Item"
            },
            "title": "Items",
            "type": "array"
          },
          "status": {
            "title": "Status",
            "type": "string"
          }
        },
        "required": [
          "items",
          "status"
        ],
        "title": "Order",
        "type": "object"
      }
    }
  }
}

In the example above, order-email-processing-order is routed to Kafka and cancel-shipping is routed to SQS. The app itself stays the same; routing is configured at the server boundary.

Integrating With Run

AMGI servers expect a send callable. MessageSendRouter provides that callable when you enter it, so pass the router instance to the server and let it manage resource lifetimes:

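from amgi_aiokafka import run
from asyncfast import AsyncFast
from asyncfast.message_send import MessageSendRouter

app = AsyncFast()
# define channels...

message_send_router = MessageSendRouter()
# add routes...

# The server enters the router on startup and exits it on shutdown.
run(
    app,
    "orders",
    message_send=message_send_router,
)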

When the server starts, it enters the router and uses the callable it yields. When the server shuts down, it exits the router and closes all message senders cleanly.
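
The lifecycle described above can be pictured with a small stand-in. This is a minimal sketch, not the MessageSendRouter implementation: ToyRouter, fake_sender, and the dictionary event shape are all hypothetical, and the sketch only matches exact addresses. It shows one way a router could enter every registered sender on startup, hand back a single send callable, and close the senders in reverse order on exit.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def fake_sender(name, log):
    """Hypothetical backend sender; a real one would open a broker connection."""
    log.append(f"open {name}")

    async def send(event):
        log.append(f"{name} <- {event['address']}")

    try:
        yield send
    finally:
        log.append(f"close {name}")


class ToyRouter:
    """Illustrative stand-in only; exact-address routing, no patterns."""

    def __init__(self):
        self._routes = {}
        self._stack = None

    def add_route(self, address, sender):
        self._routes[address] = sender

    async def __aenter__(self):
        sends = {}
        async with AsyncExitStack() as stack:
            for address, sender in self._routes.items():
                sends[address] = await stack.enter_async_context(sender)
            # Keep the senders open past this block; if entering any sender
            # failed above, the ones already opened were closed for us.
            self._stack = stack.pop_all()

        async def send(event):
            await sends[event["address"]](event)

        return send

    async def __aexit__(self, *exc_info):
        # Closes every sender, last opened first.
        return await self._stack.__aexit__(*exc_info)


async def main():
    log = []
    router = ToyRouter()
    router.add_route("order-email-processing-order", fake_sender("kafka", log))
    router.add_route("cancel-shipping", fake_sender("sqs", log))
    async with router as send:
        await send({"type": "message.send", "address": "cancel-shipping"})
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because all senders are tied to one exit stack, a failure while opening the second sender still closes the first, which is the kind of cleanup guarantee the router is meant to provide.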

Address Patterns

Routes use the same pattern syntax as channel parameters, so priority.{id} will match addresses like priority.123. Register routes before entering the router so they are included in its setup.

If you need catch-all behavior, register a default route instead of a broad pattern; this keeps route matching explicit and easier to reason about.

Default Route

If you pass default=, the router uses that send callable when no route matches. Without a default, make sure every outgoing address has a registered route; otherwise the send fails at runtime.

The default sender should be an async context manager just like the routed senders. This allows you to share connection pools or client lifecycles with explicit cleanup on shutdown.
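
The fallback behavior can be sketched as follows. Everything here is hypothetical: dead_letter_sender, pick_send, and the LookupError are illustrations of the idea, not the router's API or its actual error type. The default sender is written as an async context manager so that it is cleaned up on exit like any routed sender.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def dead_letter_sender(sink):
    """Hypothetical default sender: records addresses no route claimed."""

    async def send(event):
        sink.append(event["address"])

    try:
        yield send
    finally:
        sink.append("closed")  # explicit cleanup on shutdown


def pick_send(routes, default, address):
    """Use the matching route, else the default, else fail loudly."""
    send = routes.get(address, default)
    if send is None:
        # Assumption: the real router may raise a different error type.
        raise LookupError(f"no route for {address!r} and no default")
    return send


async def main():
    delivered, sink = [], []

    async def orders_send(event):
        delivered.append(event["address"])

    routes = {"orders": orders_send}
    async with dead_letter_sender(sink) as default_send:
        for address in ("orders", "unknown-topic"):
            send = pick_send(routes, default_send, address)
            await send({"type": "message.send", "address": address})
    return delivered, sink


if __name__ == "__main__":
    print(asyncio.run(main()))
```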