FastAPI Quick Start

This guide shows you how to get started with Chanx in FastAPI.

Prerequisites

Install Chanx with FastAPI support:

pip install "chanx[fast_channels]"

This installs FastAPI and fast-channels.

Project Setup

  1. Create Base Consumer Configuration

Create base_consumer.py:

from chanx.core.websocket import AsyncJsonWebsocketConsumer

class BaseConsumer(AsyncJsonWebsocketConsumer):
    # Message configuration
    send_completion = False
    log_websocket_message = True
    camelize = False

    # Required: Channel layer alias for non-Django frameworks
    channel_layer_alias = "default"
  1. Configure Channel Layers

Create layers.py:

import os
from fast_channels.layers import (
    InMemoryChannelLayer,
    has_layers,
    register_channel_layer,
)
from fast_channels.layers.redis import RedisChannelLayer

def setup_channel_layers():
    """Set up and register channel layers for the application."""
    if has_layers():
        return

    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")

    # Register the default channel layer
    register_channel_layer(
        "default",
        RedisChannelLayer(hosts=[redis_url])
    )

    # Optional: Add more layers for different purposes
    # register_channel_layer("memory", InMemoryChannelLayer())
  1. Create FastAPI Application

Create main.py:

from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import HTMLResponse, JSONResponse, Response
from chanx.ext.fast_channels import (
    asyncapi_docs,
    asyncapi_spec_json,
    asyncapi_spec_yaml,
)
from chanx.ext.fast_channels.type_defs import AsyncAPIConfig
from layers import setup_channel_layers
from consumers import ChatConsumer

# Setup channel layers
setup_channel_layers()

app = FastAPI()

# AsyncAPI configuration
asyncapi_config = AsyncAPIConfig(
    title="My FastAPI WebSocket API",
    description="WebSocket API with Chanx",
    version="1.0.0",
)

# Add AsyncAPI documentation routes
@app.get("/asyncapi")
async def asyncapi_documentation(request: Request) -> HTMLResponse:
    return await asyncapi_docs(request=request, app=app, config=asyncapi_config)

@app.get("/asyncapi.json")
async def asyncapi_json_spec(request: Request) -> JSONResponse:
    return await asyncapi_spec_json(request=request, app=app, config=asyncapi_config)

@app.get("/asyncapi.yaml")
async def asyncapi_yaml_spec(request: Request) -> Response:
    return await asyncapi_spec_yaml(request=request, app=app, config=asyncapi_config)

@app.get("/")
async def root():
    return {"message": "WebSocket server running"}

# WebSocket routes
app.add_websocket_route("/ws/chat", ChatConsumer.as_asgi())

Create a Simple Consumer

  1. Define Message Types

Create messages.py:

from typing import Literal
from pydantic import BaseModel
from chanx.messages.base import BaseMessage

# Payloads
class EchoPayload(BaseModel):
    message: str

class NotificationPayload(BaseModel):
    alert: str
    level: str = "info"

# Client Messages
class EchoMessage(BaseMessage):
    action: Literal["echo"] = "echo"
    payload: EchoPayload

# Server Messages
class EchoResponseMessage(BaseMessage):
    action: Literal["echo_response"] = "echo_response"
    payload: EchoPayload

class NotificationMessage(BaseMessage):
    action: Literal["notification"] = "notification"
    payload: NotificationPayload

# Events (for server-side broadcasting)
class SystemNotifyEvent(BaseMessage):
    action: Literal["system_notify"] = "system_notify"
    payload: NotificationPayload
  1. Create the Consumer

Create consumers.py:

from chanx.core.decorators import ws_handler, event_handler, channel
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
from base_consumer import BaseConsumer
from messages import (
    EchoMessage, EchoResponseMessage, NotificationMessage,
    SystemNotifyEvent, EchoPayload, NotificationPayload
)

@channel(name="chat", description="Simple chat and echo system", tags=["demo"])
class ChatConsumer(BaseConsumer):
    groups = ["general_chat"]

    @ws_handler(summary="Handle ping requests")
    async def handle_ping(self, message: PingMessage) -> PongMessage:
        return PongMessage()

    @ws_handler(summary="Echo messages back")
    async def handle_echo(self, message: EchoMessage) -> EchoResponseMessage:
        return EchoResponseMessage(
            payload=EchoPayload(message=f"Echo: {message.payload.message}")
        )

    @ws_handler(
        summary="Broadcast message to all clients",
        output_type=NotificationMessage,
    )
    async def handle_broadcast(self, message: EchoMessage) -> None:
        await self.broadcast_message(
            NotificationMessage(
                payload=NotificationPayload(
                    alert=f"Broadcast: {message.payload.message}",
                    level="info"
                )
            )
        )

    @event_handler
    async def handle_system_notify(self, event: SystemNotifyEvent) -> NotificationMessage:
        return NotificationMessage(payload=event.payload)
  1. Run the Application

uvicorn main:app --reload

Visit http://localhost:8000/asyncapi to see the auto-generated documentation.

AsyncAPI Documentation - FastAPI API Information
  1. Test the Consumer

Test with JavaScript in the browser console:

// Connect to WebSocket
const ws = new WebSocket('ws://localhost:8000/ws/chat');

ws.onmessage = (event) => {
    console.log('Received:', JSON.parse(event.data));
};

// Test ping
ws.send(JSON.stringify({"action": "ping"}));

// Test echo
ws.send(JSON.stringify({
    "action": "echo",
    "payload": {"message": "Hello World"}
}));

// Test broadcast (all connected clients will receive this)
ws.send(JSON.stringify({
    "action": "broadcast",
    "payload": {"message": "Hello everyone!"}
}));
  1. Send Events from Background Tasks

You can send events to WebSocket clients from background tasks:

# From a FastAPI endpoint or background task
from consumers import ChatConsumer
from messages import SystemNotifyEvent, NotificationPayload

@app.post("/notify")
async def send_notification():
    # Send notification to all connected clients
    await ChatConsumer.broadcast_event(
        SystemNotifyEvent(
            payload=NotificationPayload(
                alert="Server maintenance in 5 minutes",
                level="warning"
            )
        ),
        groups=["general_chat"]
    )
    return {"status": "notification sent"}

Next Steps

Now that you have a working FastAPI WebSocket consumer with Chanx: