Message Send Router
MessageSendRouter helps AMGI servers route message.send events to different backends based on the event address.
It manages setup and teardown of the underlying send callables using async context managers. This is useful when you
need to send different messages to different brokers, or you want to keep a single server running while routing
outbound traffic by address.
Basic Usage
The router is an async context manager. Create it, register routes, and pass it to the server, which enters it on startup so the message senders can establish their connections:
import os
from dataclasses import dataclass
from typing import Annotated

from amgi_aiobotocore.sqs import MessageSend as SQSMessageSend
from amgi_aiokafka import MessageSend as KafkaMessageSend
from amgi_aiokafka import run
from asyncfast import AsyncFast
from asyncfast import Header
from asyncfast import Message
from asyncfast import MessageSender
from asyncfast.message_send import MessageSendRouter

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")

app = AsyncFast()


@dataclass
class Item:
    sku_id: str
    amount: int


@dataclass
class Order:
    items: list[Item]
    status: str


@dataclass
class EmailProcessingOrder(Message, address="order-email-processing-order"):
    order_id: Annotated[str, Header()]
    order: Order


@dataclass
class CancelShipping(Message, address="cancel-shipping"):
    order_id: Annotated[str, Header()]


@app.channel("orders")
async def handle_order(
    order: Order,
    order_id: Annotated[str, Header()],
    message_sender: MessageSender[EmailProcessingOrder | CancelShipping],
) -> None:
    # Each outbound message carries its own address; the router picks the backend.
    if order.status == "processing":
        await message_sender.send(EmailProcessingOrder(order_id=order_id, order=order))
    if order.status == "cancelled":
        await message_sender.send(CancelShipping(order_id=order_id))


# Kafka handles the email-processing address; SQS handles cancel-shipping.
message_send_router = MessageSendRouter()
message_send_router.add_route(
    "order-email-processing-order",
    KafkaMessageSend(bootstrap_servers=BOOTSTRAP_SERVERS),
)
message_send_router.add_route("cancel-shipping", SQSMessageSend())

if __name__ == "__main__":
    run(
        app,
        "orders",
        bootstrap_servers=BOOTSTRAP_SERVERS,
        message_send=message_send_router,
    )
The corresponding AsyncAPI document:
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "HandleOrder": {
      "address": "orders",
      "messages": {
        "HandleOrderMessage": {
          "$ref": "#/components/messages/HandleOrderMessage"
        }
      }
    },
    "EmailProcessingOrder": {
      "address": "order-email-processing-order",
      "messages": {
        "EmailProcessingOrder": {
          "$ref": "#/components/messages/EmailProcessingOrder"
        }
      }
    },
    "CancelShipping": {
      "address": "cancel-shipping",
      "messages": {
        "CancelShipping": {
          "$ref": "#/components/messages/CancelShipping"
        }
      }
    }
  },
  "operations": {
    "receiveHandleOrder": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/HandleOrder"
      }
    },
    "sendEmailProcessingOrder": {
      "action": "send",
      "channel": {
        "$ref": "#/channels/EmailProcessingOrder"
      }
    },
    "sendCancelShipping": {
      "action": "send",
      "channel": {
        "$ref": "#/channels/CancelShipping"
      }
    }
  },
  "components": {
    "messages": {
      "HandleOrderMessage": {
        "headers": {
          "$ref": "#/components/schemas/HandleOrderMessageHeaders"
        },
        "payload": {
          "$ref": "#/components/schemas/Order"
        }
      },
      "EmailProcessingOrder": {
        "headers": {
          "$ref": "#/components/schemas/EmailProcessingOrderHeaders"
        },
        "payload": {
          "$ref": "#/components/schemas/Order"
        }
      },
      "CancelShipping": {
        "headers": {
          "$ref": "#/components/schemas/CancelShippingHeaders"
        }
      }
    },
    "schemas": {
      "CancelShippingHeaders": {
        "properties": {
          "order-id": {
            "title": "Order-Id",
            "type": "string"
          }
        },
        "required": [
          "order-id"
        ],
        "title": "CancelShippingHeaders",
        "type": "object"
      },
      "EmailProcessingOrderHeaders": {
        "properties": {
          "order-id": {
            "title": "Order-Id",
            "type": "string"
          }
        },
        "required": [
          "order-id"
        ],
        "title": "EmailProcessingOrderHeaders",
        "type": "object"
      },
      "HandleOrderMessageHeaders": {
        "properties": {
          "order-id": {
            "title": "Order-Id",
            "type": "string"
          }
        },
        "required": [
          "order-id"
        ],
        "title": "HandleOrderMessageHeaders",
        "type": "object"
      },
      "Item": {
        "properties": {
          "sku_id": {
            "title": "Sku Id",
            "type": "string"
          },
          "amount": {
            "title": "Amount",
            "type": "integer"
          }
        },
        "required": [
          "sku_id",
          "amount"
        ],
        "title": "Item",
        "type": "object"
      },
      "Order": {
        "properties": {
          "items": {
            "items": {
              "$ref": "#/components/schemas/Item"
            },
            "title": "Items",
            "type": "array"
          },
          "status": {
            "title": "Status",
            "type": "string"
          }
        },
        "required": [
          "items",
          "status"
        ],
        "title": "Order",
        "type": "object"
      }
    }
  }
}
In the example above, one address is routed to Kafka, and another to SQS. The app itself stays the same; routing is configured at the server boundary.
Integrating With Run
AMGI servers expect a send callable. MessageSendRouter provides that callable when you enter it, so pass the
router instance to the server and let it manage resource lifetimes:
from amgi_aiokafka import run
from asyncfast.message_send import MessageSendRouter

message_send_router = MessageSendRouter()
# add routes...

run(
    app,
    "orders",
    message_send=message_send_router,
)
When the server starts, it enters the router and uses the callable it yields. When the server shuts down, it exits the router and closes all message senders cleanly.
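To make the enter/exit lifecycle concrete, here is a minimal sketch of how a router of this kind could be built on `contextlib.AsyncExitStack`. It is not the MessageSendRouter implementation: `SketchRouter`, `fake_sender`, and the `{"address": ...}` event shape are all hypothetical names chosen for illustration.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []  # records setup, send, and teardown order for illustration


@asynccontextmanager
async def fake_sender(name):
    # Hypothetical stand-in for a backend sender such as Kafka or SQS.
    events.append(f"open {name}")

    async def send(event):
        events.append(f"{name} <- {event['address']}")

    try:
        yield send
    finally:
        events.append(f"close {name}")


class SketchRouter:
    """Illustrative only; not the MessageSendRouter implementation."""

    def __init__(self):
        self._routes = {}
        self._stack = AsyncExitStack()

    def add_route(self, address, sender_cm):
        self._routes[address] = sender_cm

    async def __aenter__(self):
        senders = {}
        try:
            # Enter every registered sender; the stack remembers them all.
            for address, cm in self._routes.items():
                senders[address] = await self._stack.enter_async_context(cm)
        except BaseException:
            # Close whatever was already opened before re-raising.
            await self._stack.aclose()
            raise

        async def send(event):
            await senders[event["address"]](event)

        return send

    async def __aexit__(self, *exc_info):
        # Exits the senders in reverse order of entry.
        return await self._stack.__aexit__(*exc_info)


async def main():
    router = SketchRouter()
    router.add_route("a", fake_sender("kafka"))
    router.add_route("b", fake_sender("sqs"))
    async with router as send:
        await send({"address": "b"})


asyncio.run(main())
```

After the run, `events` shows both senders opened before any message is sent and closed in reverse order on exit, which is the behavior an exit stack gives for free.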
Address Patterns
Routes use the same pattern syntax as channel parameters, so priority.{id} will match addresses like
priority.123. Register routes before entering the router so they are included in its setup.
If you need a catch-all pattern, register a default route instead of a broad pattern; this keeps route matching explicit and easier to reason about.
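As a rough illustration of how a pattern such as `priority.{id}` can be matched, the sketch below translates placeholders into a regular expression. The helpers `compile_pattern` and `match_address` are hypothetical, and the rule that a parameter matches one dot-free segment is an assumption; the library's actual matching rules may differ.

```python
import re


def compile_pattern(pattern):
    # re.split with a capture group alternates literal text and placeholder
    # names, so odd-indexed parts are the names inside {...}.
    parts = re.split(r"\{(\w+)\}", pattern)
    regex = ""
    for index, part in enumerate(parts):
        if index % 2:
            # Assumption: a parameter matches one segment without dots.
            regex += f"(?P<{part}>[^.]+)"
        else:
            regex += re.escape(part)
    return re.compile(regex)


def match_address(pattern, address):
    # Returns the captured parameters, or None when the address does not match.
    match = compile_pattern(pattern).fullmatch(address)
    return match.groupdict() if match else None


print(match_address("priority.{id}", "priority.123"))
print(match_address("priority.{id}", "priority"))
```

Under these assumptions, a literal pattern such as `cancel-shipping` matches only itself and captures no parameters.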
Default Route
If you pass default=, the router will use that send callable when no route matches. Without a default, make sure
every outgoing address has a registered route; otherwise the send fails at runtime.
The default sender should be an async context manager just like the routed senders. This allows you to share connection pools or client lifecycles with explicit cleanup on shutdown.
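The fallback behavior can be pictured with a small dispatch function. This is a sketch under stated assumptions rather than the router's code: `make_dispatch`, `NoRouteError`, and the `{"address": ...}` event shape are hypothetical, and it uses exact address lookup instead of pattern matching to stay short.

```python
import asyncio


class NoRouteError(LookupError):
    """Hypothetical error type used only in this sketch."""


def make_dispatch(routes, default=None):
    async def send(event):
        # Use the matching route, falling back to the default when present.
        sender = routes.get(event["address"], default)
        if sender is None:
            raise NoRouteError(event["address"])
        await sender(event)

    return send


async def demo():
    sent = []

    async def primary(event):
        sent.append(("primary", event["address"]))

    async def fallback(event):
        sent.append(("fallback", event["address"]))

    send = make_dispatch({"orders": primary}, default=fallback)
    await send({"address": "orders"})
    await send({"address": "unknown"})

    # Without a default, an unrouted address fails at send time.
    strict = make_dispatch({"orders": primary})
    try:
        await strict({"address": "unknown"})
    except NoRouteError:
        sent.append(("error", "unknown"))
    return sent


result = asyncio.run(demo())
```

Here `result` records one routed send, one send handled by the fallback, and one failure from the dispatcher that has no default.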