FastAPI Complete Example

This example demonstrates a comprehensive WebSocket application using FastAPI and Chanx. The sandbox showcases modular app architecture, multiple channel layer types, real ARQ background job processing, and advanced WebSocket patterns across multiple consumer types.

Overview

The FastAPI sandbox implements a sophisticated multi-consumer WebSocket system featuring:

  • Modular App Architecture with 4 distinct consumer apps demonstrating different patterns

  • Multiple Channel Layer Types (Memory, Redis Pub/Sub, Redis Queue) used strategically

  • Real ARQ Background Jobs with job queuing and result streaming

  • Dynamic Room Management with path parameter extraction

  • Direct WebSocket Communication without channel layers for simple use cases

  • Interactive HTML/JS Client with multiple chat interfaces

  • Production Development Setup with coordinated FastAPI + ARQ worker startup

Quick Start

  1. Clone the repository:

    git clone https://github.com/huynguyengl99/chanx
    cd chanx
    
  2. Prerequisites: Ensure Docker and uv are installed

  3. Start services: Run docker compose up -d to start Redis and PostgreSQL

  4. Install dependencies: Run uv sync --all-extras

  5. Activate virtual environment:

    source .venv/bin/activate
    
  6. Start the application:

    python sandbox_fastapi/start_dev.py
    
  7. Access the application:

The development script starts both the FastAPI application and ARQ worker for background job processing.

Project Structure

The FastAPI sandbox demonstrates clean modular architecture:

sandbox_fastapi/
├── main.py                  # FastAPI app with WebSocket routing + HTML client
├── start_dev.py            # Production development script (FastAPI + ARQ)
├── base_consumer.py        # Environment-based configuration for all consumers
├── layers.py               # Multi-type channel layer setup with fast-channels
├── tasks.py                # Real ARQ background tasks with result streaming
├── worker.py               # ARQ worker configuration
├── external_sender.py      # External event broadcasting examples
├── static/                 # CSS/JS for interactive HTML client
└── apps/                   # Modular consumer architecture
    ├── showcase/           # 4 consumers showing different channel layer types
    │   ├── consumer.py     # Chat, Reliable, Notifications, Analytics
    │   └── messages.py     # Pydantic message definitions
    ├── room_chat/          # Dynamic room-based messaging
    │   ├── consumer.py     # Path parameter extraction + room groups
    │   └── messages.py     # Room-specific message types
    ├── background_jobs/    # Real ARQ integration
    │   ├── consumer.py     # Job queuing + result event handling
    │   └── messages.py     # Job processing message types
    └── system_chat/        # Direct WebSocket (no channel layers)
        ├── consumer.py     # Simple echo without groups
        └── messages.py     # Direct response messages

Key Consumer Applications

1. System Chat App (apps/system_chat/consumer.py) - No Channel Layers:

@channel(name="system", description="Direct WebSocket without channel layers")
class SystemMessageConsumer(BaseConsumer):
    # No channel_layer_alias = direct WebSocket only

    @ws_handler
    async def handle_system(self, message: UserMessage) -> SystemEchoMessage:
        # Direct response without groups or broadcasting
        return SystemEchoMessage(payload=MessagePayload(
            message=f"🔧 System Echo: {message.payload.message}"
        ))

2. Room Chat App (apps/room_chat/consumer.py) - Dynamic Path Parameters:

@channel(name="room_chat", description="Dynamic room-based messaging")
class RoomChatConsumer(BaseConsumer):
    channel_layer_alias = "chat"

    async def post_authentication(self) -> None:
        # Extract room from WebSocket path: /ws/room/{room_name}
        self.room_name = self.scope["path_params"]["room_name"]
        self.room_group_name = f"room_{self.room_name}"

        # Join room group dynamically
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        self.groups.append(self.room_group_name)

3. Background Jobs App (apps/background_jobs/consumer.py) - Real ARQ Integration:

@channel(name="background_jobs", description="Real background job processing with ARQ")
class BackgroundJobConsumer(BaseConsumer[JobResult]):
    channel_layer_alias = "chat"

    @ws_handler(output_type=JobStatusMessage)
    async def handle_job(self, message: JobMessage) -> None:
        # Queue real ARQ job
        job_id = await queue_job(
            message.payload.type,
            message.payload.content,
            self.channel_name
        )

        # Send immediate confirmation
        await self.send_message(JobStatusMessage(payload={
            "status": "queued",
            "job_id": job_id
        }))

    @event_handler
    async def handle_job_result(self, event: JobResult) -> JobStatusMessage:
        # Receive results from ARQ worker
        return JobStatusMessage(payload={"status": "result", "message": event.payload})

4. Showcase App (apps/showcase/consumer.py) - 4 Different Channel Layer Types:

# Chat Consumer - Redis Pub/Sub Layer
@channel(name="chat", description="Basic Chat Consumer using centralized chat layer")
class ChatConsumer(BaseConsumer[SystemNotify]):
    channel_layer_alias = "chat"  # Redis Pub/Sub for real-time
    groups = ["chat_room"]

    @ws_handler(output_type=ChatNotificationMessage)
    async def handle_chat(self, message: ChatMessage) -> None:
        await self.broadcast_message(
            ChatNotificationMessage(payload=ChatPayload(
                message=f"💬 {message.payload.message}"
            ))
        )

# Reliable Chat Consumer - Redis Queue Layer
@channel(name="reliable_chat", description="Reliable Chat using queue-based layer")
class ReliableChatConsumer(BaseConsumer[SystemNotify]):
    channel_layer_alias = "queue"  # Redis Queue for reliability
    groups = ["reliable_chat"]

# Analytics Consumer - High-capacity Redis Layer
@channel(name="analytics", description="Analytics events with reliable delivery")
class AnalyticsConsumer(BaseConsumer[SystemNotify]):
    channel_layer_alias = "analytics"  # High-capacity Redis (5000 messages)
    groups = ["analytics"]

Channel Layer Configuration

Strategic Multi-Layer Setup (sandbox_fastapi/layers.py):

"""
Channel layer definitions and registration.
This file centralizes all channel layer configuration for the application.
"""

import os

from fast_channels.layers import (
    InMemoryChannelLayer,
    has_layers,
    register_channel_layer,
)
from fast_channels.layers.redis import (
    RedisChannelLayer,
    RedisPubSubChannelLayer,
)

base_redis_url = os.getenv("REDIS_URL", "redis://localhost:6363")


def setup_layers(force: bool = False, worker_id: int | None = None) -> None:
    """
    Set up and register all channel layers for the application.
    This should be called once during application startup.
    """
    # Get Redis URL from environment or use default
    if has_layers() and not force:
        return

    redis_url = base_redis_url
    post_fix = ""
    if worker_id is not None:
        redis_url = f"{redis_url}/{worker_id + 8}"
        post_fix = str(worker_id)

    # Create different types of layers
    layers_config = {
        # In-memory layer for development/testing
        "memory": InMemoryChannelLayer(),
        # Redis Pub/Sub layer for real-time messaging
        "chat": RedisPubSubChannelLayer(hosts=[redis_url], prefix=f"chat{post_fix}"),
        # Redis Queue layer for reliable messaging
        "queue": RedisChannelLayer(
            hosts=[redis_url],
            prefix=f"queue{post_fix}",
            expiry=900,  # 15 minutes
            capacity=1000,
        ),
        # Notifications layer with different prefix
        "notifications": RedisPubSubChannelLayer(
            hosts=[redis_url], prefix=f"notify{post_fix}"
        ),
        # Analytics layer for metrics/events
        "analytics": RedisChannelLayer(
            hosts=[redis_url],
            prefix=f"analytics{post_fix}",
            expiry=3600,  # 1 hour
            capacity=5000,
        ),
    }

    # Register all layers
    for alias, layer in layers_config.items():
        register_channel_layer(alias, layer)

Layer Strategy Breakdown:

  • Memory Layer: Development/testing without Redis dependency

  • Chat Layer (Redis Pub/Sub): Real-time messaging with instant delivery

  • Queue Layer (Redis Queue): Reliable messaging with persistence (15min expiry, 1000 capacity)

  • Notifications Layer (Redis Pub/Sub): Separate namespace for system notifications

  • Analytics Layer (Redis Queue): High-capacity event storage (1hr expiry, 5000 capacity)

Base Consumer Configuration:

import os

from chanx.fast_channels.websocket import AsyncJsonWebsocketConsumer, ReceiveEvent


class BaseConsumer(AsyncJsonWebsocketConsumer[ReceiveEvent]):
    send_completion = bool(os.environ.get("SEND_COMPLETION", None))

Environment-driven configuration allows testing with/without completion signals.

System Messaging (No Channel Layers)

Direct WebSocket communication without channel layers:

# From sandbox_fastapi/apps/system_chat/consumer.py
@channel(name="system_chat", description="Direct WebSocket messaging")
class SystemMessageConsumer(BaseConsumer):
    # No channel_layer_alias - uses direct WebSocket

    @ws_handler(summary="Echo system message")
    async def handle_system_message(self, message: SystemMessage) -> SystemEchoMessage:
        return SystemEchoMessage(
            payload=f"SYSTEM ECHO: {message.payload.message}"
        )

Room Chat Management

The room chat consumer demonstrates path parameter handling:

# From sandbox_fastapi/apps/room_chat/consumer.py
@channel(name="room_chat", description="Dynamic room-based chat")
class RoomChatConsumer(BaseConsumer):
    async def post_authentication(self):
        # Extract room from WebSocket path
        room_name = self.scope["path_info"].split("/")[-1]
        await self.join_group(f"room_{room_name}")

    @ws_handler(summary="Send message to room")
    async def handle_room_message(self, message: RoomMessage) -> None:
        room_name = self.scope["path_info"].split("/")[-1]
        await self.broadcast_message(
            RoomNotificationMessage(
                payload=f"[{room_name}] {message.payload.message}"
            ),
            groups=[f"room_{room_name}"]
        )

Real ARQ Background Job Processing

Complete Job Lifecycle (sandbox_fastapi/tasks.py):

# Real ARQ tasks with result streaming back to WebSocket
async def translate(ctx: dict, job_id: str, content: str, channel_name: str) -> dict:
    """Real translation task with async processing."""
    await asyncio.sleep(2)  # Simulate API call

    translations = {"hello": "hola", "world": "mundo"}
    translated = translations.get(content.lower(), f"[TRANSLATED: {content}]")
    result = f"🌍 Translated: '{content}' → '{translated}'"

    # Send result back to WebSocket client via channel layer
    await _send_result_to_client(channel_name, result)
    return {"status": "completed", "result": result, "job_id": job_id}

async def analyze(ctx: dict, job_id: str, content: str, channel_name: str) -> dict:
    """Text analysis task."""
    await asyncio.sleep(3)

    word_count = len(content.split())
    char_count = len(content)
    result = f"📊 Analysis: {char_count} chars, {word_count} words"

    await _send_result_to_client(channel_name, result)
    return {"status": "completed", "result": result}

async def _send_result_to_client(channel_name: str, message: str) -> None:
    """Stream result back to WebSocket consumer."""
    from sandbox_fastapi.apps.background_jobs.consumer import BackgroundJobConsumer
    await BackgroundJobConsumer.send_event(JobResult(payload=message), channel_name)

ARQ Job Queuing:

async def queue_job(job_type: str, content: str, channel_name: str) -> str:
    """Queue job with ARQ and return job ID."""
    redis = await create_pool(REDIS_SETTINGS)

    try:
        job_id = f"{job_type}_{int(time.time())}"
        job = await redis.enqueue_job(job_type, job_id, content, channel_name)
        return job.job_id if job else job_id
    finally:
        await redis.aclose()

Consumer Integration:

The BackgroundJobConsumer queues jobs immediately and receives results via events:

@ws_handler(output_type=JobStatusMessage)
async def handle_job(self, message: JobMessage) -> None:
    # 1. Queue ARQ job immediately
    job_id = await queue_job(message.payload.type, message.payload.content, self.channel_name)

    # 2. Send confirmation to client
    await self.send_message(JobStatusMessage(payload={
        "status": "queued", "job_id": job_id
    }))

@event_handler
async def handle_job_result(self, event: JobResult) -> JobStatusMessage:
    # 3. Receive results from ARQ worker
    return JobStatusMessage(payload={"status": "result", "message": event.payload})

Multi-Layer Consumer Showcase

The showcase app demonstrates different channel layer types working together:

"""
Layers Combo Consumers - Different channel layer types working together.
Migrated to use chanx framework for unified API.
"""

from chanx.core.decorators import channel, event_handler, ws_handler
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage

from fast_channels.type_defs import WebSocketDisconnectEvent
from sandbox_fastapi.base_consumer import BaseConsumer

from ..mixins import ExtraRequestMessage, ExtraWsHandlerMixin
from .messages import (
    AnalyticsMessage,
    AnalyticsNotificationMessage,
    AnalyticsPayload,
    ChatMessage,
    ChatNotificationMessage,
    ChatPayload,
    NotificationBroadcastMessage,
    NotificationMessage,
    NotificationPayload,
    ReliableChatMessage,
    ReliableChatNotificationMessage,
    ReliableChatPayload,
    SystemNotify,
    SystemPeriodicNotify,
    UserJoinedNotification,
    UserLeftNotification,
)

AllEvent = (
    SystemNotify | ExtraRequestMessage | UserJoinedNotification | UserLeftNotification
)


@channel(
    name="chat",
    description="Basic Chat Consumer using centralized chat layer",
    tags=["chat", "showcase"],
)
class ChatConsumer(ExtraWsHandlerMixin, BaseConsumer[AllEvent]):
    """
    Chat consumer using the centralized chat layer.
    Migrated to use chanx framework.
    """

    groups = ["chat_room"]
    channel_layer_alias = "chat"

    # Events that are forwarded directly to WebSocket clients without processing
    passthrough_events = [UserJoinedNotification, UserLeftNotification]

    @ws_handler(
        summary="Handle ping requests",
        description="Simple ping-pong for connectivity testing",
    )
    async def handle_ping(self, _message: PingMessage) -> PongMessage:
        return PongMessage()

    @ws_handler(
        summary="Handle chat messages",
        description="Process chat messages and broadcast to room",
        output_type=ChatNotificationMessage,
    )
    async def handle_chat(self, message: ChatMessage) -> None:
        """Handle incoming chat messages and broadcast to room."""
        await self.broadcast_message(
            ChatNotificationMessage(
                payload=ChatPayload(message=f"💬 {message.payload.message}"),
            ),
        )

    async def post_authentication(self) -> None:
        """Send join message when user connects."""
        await self.broadcast_message(
            ChatNotificationMessage(
                payload=ChatPayload(message="📢 Someone joined the chat")
            ),
        )

    async def websocket_disconnect(self, message: WebSocketDisconnectEvent) -> None:
        """Send leave message when user disconnects."""
        await self.broadcast_message(
            ChatNotificationMessage(
                payload=ChatPayload(message="❌ Someone left the chat.")
            ),
        )
        await super().websocket_disconnect(message)

    @event_handler
    async def system_notify_chat(self, event: SystemNotify) -> ChatNotificationMessage:
        return ChatNotificationMessage(
            payload=ChatPayload(message=event.payload),
        )


@channel(
    name="reliable_chat",
    description="Reliable Chat Consumer using queue-based layer",
    tags=["chat", "reliable", "showcase"],
)
class ReliableChatConsumer(BaseConsumer[SystemNotify]):
    """
    Chat consumer using the queue-based layer for guaranteed message delivery.
    Migrated to use chanx framework.
    """

    channel_layer_alias = "queue"
    groups = ["reliable_chat"]

    @ws_handler(
        summary="Handle ping requests",
        description="Simple ping-pong for connectivity testing",
    )
    async def handle_ping(self, _message: PingMessage) -> PongMessage:
        return PongMessage()

    @ws_handler(
        summary="Handle reliable chat messages",
        description="Process reliable chat messages with guaranteed delivery",
        output_type=ReliableChatNotificationMessage,
    )
    async def handle_reliable_chat(self, message: ReliableChatMessage) -> None:
        """Handle incoming reliable chat messages."""
        await self.broadcast_message(
            ReliableChatNotificationMessage(
                payload=ReliableChatPayload(message=f"📨 {message.payload.message}")
            ),
            groups=["reliable_chat"],
        )

    async def post_authentication(self) -> None:
        """Send connection established message."""
        await self.broadcast_message(
            ReliableChatNotificationMessage(
                payload=ReliableChatPayload(
                    message="🔒 Reliable chat connection established!"
                )
            ),
            groups=["reliable_chat"],
        )

    async def websocket_disconnect(self, message: WebSocketDisconnectEvent) -> None:
        """Send disconnect message."""
        await self.broadcast_message(
            ReliableChatNotificationMessage(
                payload=ReliableChatPayload(message="🚪 Left reliable chat!")
            ),
            groups=["reliable_chat"],
        )
        await super().websocket_disconnect(message)

    @event_handler
    async def system_notify_chat(self, event: SystemNotify) -> ChatNotificationMessage:
        return ChatNotificationMessage(
            payload=ChatPayload(message=event.payload),
        )


@channel(
    name="notifications",
    description="Notification Consumer for real-time notifications",
    tags=["notifications", "showcase"],
)
class NotificationConsumer(BaseConsumer):
    """
    Consumer for real-time notifications using JSON messages.
    Migrated to use chanx framework.
    """

    channel_layer_alias = "notifications"
    groups = ["notifications"]

    @ws_handler(
        summary="Handle ping requests",
        description="Simple ping-pong for connectivity testing",
    )
    async def handle_ping(self, _message: PingMessage) -> PongMessage:
        return PongMessage()

    @ws_handler(
        summary="Handle notification messages",
        description="Process notification messages and broadcast to all clients",
        output_type=NotificationBroadcastMessage,
    )
    async def handle_notification(self, message: NotificationMessage) -> None:
        """Handle incoming notifications and broadcast to all clients."""
        await self.broadcast_message(
            NotificationBroadcastMessage(
                payload=NotificationPayload(
                    type="user",
                    message=f"🔔 Notification: {message.payload.message}",
                )
            ),
        )

    async def post_authentication(self) -> None:
        """Send connection established notification."""
        await self.broadcast_message(
            NotificationBroadcastMessage(
                payload=NotificationPayload(
                    type="system", message="🔔 Connected to notifications!"
                )
            ),
        )

    @event_handler
    async def system_notify_notification(
        self, event: SystemNotify
    ) -> NotificationBroadcastMessage:
        return NotificationBroadcastMessage(
            payload=NotificationPayload(type="system", message=event.payload)
        )

    @event_handler
    async def system_notify_periodic_notification(
        self, event: SystemPeriodicNotify
    ) -> NotificationBroadcastMessage:
        return NotificationBroadcastMessage(
            payload=NotificationPayload(type="periodic", message=event.payload)
        )


@channel(
    name="analytics",
    description="Analytics Consumer for reliable event delivery",
    tags=["analytics", "showcase"],
)
class AnalyticsConsumer(BaseConsumer[SystemNotify]):
    """
    Consumer for analytics events with reliable delivery.
    Migrated to use chanx framework.
    """

    channel_layer_alias = "analytics"
    groups = ["analytics"]

    @ws_handler(
        summary="Handle ping requests",
        description="Simple ping-pong for connectivity testing",
    )
    async def handle_ping(self, _message: PingMessage) -> PongMessage:
        return PongMessage()

    @ws_handler(
        summary="Handle analytics events",
        description="Process analytics events with reliable delivery",
        output_type=AnalyticsNotificationMessage,
    )
    async def handle_analytics(self, message: AnalyticsMessage) -> None:
        """Handle incoming analytics events."""
        await self.broadcast_message(
            AnalyticsNotificationMessage(
                payload=AnalyticsPayload(
                    event=f"📊 Analytics: {message.payload.event}",
                    data=message.payload.data,
                )
            ),
        )

    @event_handler
    async def system_notify_analytic(
        self, event: SystemNotify
    ) -> AnalyticsNotificationMessage:
        return AnalyticsNotificationMessage(
            payload=AnalyticsPayload(
                event=f"{event.payload}",
            )
        )

Key Features:

  1. Channel-specific configuration via channel_layer_alias

  2. Group-based broadcasting for room-like functionality

  3. Event handlers for server-side message processing

  4. Connection lifecycle management with join/leave notifications

WebSocket Mounting:

WebSocket consumers are mounted as ASGI applications:

from fastapi import FastAPI
from sandbox_fastapi.apps.showcase.consumer import (
    ChatConsumer, AnalyticsConsumer, NotificationConsumer
)

app = FastAPI()
ws_router = FastAPI()

# Mount WebSocket consumers
ws_router.add_websocket_route("/chat", ChatConsumer.as_asgi())
ws_router.add_websocket_route("/analytics", AnalyticsConsumer.as_asgi())
ws_router.add_websocket_route("/notifications", NotificationConsumer.as_asgi())
ws_router.add_websocket_route("/room/{room_name}", RoomChatConsumer.as_asgi())

# Mount WebSocket sub-app
app.mount("/ws", ws_router)

Testing External Broadcasting:

The external sender script demonstrates broadcasting events from outside consumers to connected WebSocket clients:

# Start the application and visit http://localhost:8080/
python sandbox_fastapi/start_dev.py

# In another terminal, run the external sender script
python sandbox_fastapi/external_sender.py

AsyncAPI Documentation

Automatic API documentation generation from decorated consumers:

from chanx.fast_channels import (
    asyncapi_docs, asyncapi_spec_json, asyncapi_spec_yaml
)
from chanx.fast_channels.type_defs import AsyncAPIConfig

# Configure AsyncAPI
asyncapi_conf = AsyncAPIConfig(
    description="Websocket API documentation generated by Chanx",
    version="1.0.0",
)

@app.get("/asyncapi")
async def asyncapi_documentation(request: Request) -> HTMLResponse:
    return await asyncapi_docs(request=request, app=app, config=asyncapi_conf)

@app.get("/asyncapi.json")
async def asyncapi_json_spec(request: Request) -> JSONResponse:
    return await asyncapi_spec_json(request=request, app=app, config=asyncapi_conf)

The FastAPI integration provides:

  • Interactive documentation with WebSocket testing capabilities

  • JSON/YAML exports for API contract sharing

  • Automatic discovery of all decorated consumers

AsyncAPI Documentation UI showing FastAPI WebSocket endpoints

HTML Client Interface

The sandbox includes a complete HTML/JavaScript client for testing:

<!-- From main.py HTML template -->
<div class="chat-container">
    <!-- System Messages (No Channel Layer) -->
    <div class="chat-box system-chat">
        <h3>System Messages (No Layer)</h3>
        <form onsubmit="sendSystemMessage(event)">
            <input type="text" placeholder="Type system message..."/>
            <button>Send</button>
        </form>
        <ul id='systemMessages'></ul>
    </div>

    <!-- Room Chat with Dynamic Rooms -->
    <div class="chat-box room-chat">
        <h3>Room Chat</h3>
        <input type="text" id="roomName" placeholder="Enter room name..."/>
        <button onclick="connectToRoom()">Connect</button>
        <ul id='roomMessages'></ul>
    </div>

    <!-- Background Job Processing -->
    <div class="chat-box job-chat">
        <h3>Background Job Processing</h3>
        <select id="jobType">
            <option value="translate">Translation</option>
            <option value="analyze">Text Analysis</option>
        </select>
        <ul id='jobMessages'></ul>
    </div>
</div>

Production Development Workflow

Coordinated FastAPI + ARQ Startup (sandbox_fastapi/start_dev.py):

#!/usr/bin/env python3
"""
Development startup script that runs both FastAPI app and ARQ worker.

Usage:
    python sandbox_fastapi/start_dev.py

This will start:
1. ARQ worker in the background
2. FastAPI application with live reload

Both processes will be managed together and stopped with Ctrl+C.
"""

import signal
import subprocess
import sys
import time
from types import FrameType

import uvicorn


def main() -> None:  # noqa
    """Start both worker and FastAPI app."""
    print("🚀 Starting development environment...")

    # Store process references
    worker_process = None

    def cleanup(signum: int | None = None, frame: FrameType | None = None) -> None:
        """Clean up processes on exit."""
        print("\n🛑 Shutting down...")

        if worker_process:
            print("🔄 Stopping ARQ worker...")
            worker_process.terminate()
            try:
                worker_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                worker_process.kill()

        print("✅ Shutdown complete")
        sys.exit(0)

    # Set up signal handlers
    signal.signal(signal.SIGINT, cleanup)
    signal.signal(signal.SIGTERM, cleanup)

    try:
        # Start ARQ worker using CLI
        print("🔄 Starting ARQ worker...")
        worker_process = subprocess.Popen(
            [sys.executable, "-m", "arq", "sandbox_fastapi.tasks.WorkerSettings"]
        )

        # Give worker a moment to start
        time.sleep(2)

        # Start FastAPI app
        print("🌐 Starting FastAPI application...")
        uvicorn.run("sandbox_fastapi.main:app", host="0.0.0.0", port=8080, reload=True)

        print("\n✅ Development environment ready!")
        print("📱 FastAPI app: http://localhost:8080")
        print("🔄 ARQ worker: running in background")
        print("🛑 Press Ctrl+C to stop both services")

    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        cleanup()


if __name__ == "__main__":
    main()

Key Features:

  • Process Management: Automatic ARQ worker startup before FastAPI

  • Signal Handling: Graceful shutdown of both processes with Ctrl+C

  • Development Optimized: FastAPI live reload + ARQ worker coordination

  • Error Recovery: Proper cleanup on exceptions or forced termination

Production Benefits:

  • Realistic Development: Same ARQ integration as production

  • Job Testing: Real background job processing during development

  • Resource Cleanup: Prevents orphaned worker processes

  • Development UX: Single command starts entire system

Testing

The FastAPI sandbox uses pytest with comprehensive WebSocket testing:

# Run all tests
pytest sandbox_fastapi/tests/

# Run specific test files
pytest sandbox_fastapi/tests/test_background_jobs.py
pytest sandbox_fastapi/tests/test_room_chat.py

# Run with coverage
pytest sandbox_fastapi/tests/ --cov=sandbox_fastapi

WebSocket Testing with Context Managers:

# From sandbox_fastapi/tests/test_background_jobs.py
import pytest
from chanx.fast_channels.testing import WebsocketCommunicator
from sandbox_fastapi.apps.background_jobs.consumer import BackgroundJobConsumer

@pytest.mark.asyncio
async def test_job_success(bg_worker):
    """Test real ARQ job processing."""
    async with WebsocketCommunicator(
        app, "/ws/background_jobs", consumer=BackgroundJobConsumer
    ) as comm:
        # Skip connection message
        await comm.receive_all_messages(stop_action="job_status")

        # Send job message
        job_message = JobMessage(payload=JobPayload(type="translate", content="hello"))
        await comm.send_message(job_message)

        # Receive queuing and queued messages
        replies = await comm.receive_all_messages()
        assert len(replies) == 2
        assert replies[0].payload["status"] == "queuing"
        assert replies[1].payload["status"] == "queued"

        # Process with real ARQ worker
        await bg_worker.async_run()

        # Receive job result
        results = await comm.receive_all_messages(stop_action=EVENT_ACTION_COMPLETE)
        translated_result = cast(JobStatusMessage, results[0])
        assert "Translated: 'hello' → 'hola'" in translated_result.payload["message"]

Room Testing with Path Parameters:

@pytest.mark.asyncio
async def test_room_connection():
    """Test dynamic room joining."""
    async with WebsocketCommunicator(
        app, "/ws/room/test-room", consumer=RoomChatConsumer
    ) as comm:
        # Send room message
        await comm.send_message(RoomChatMessage(payload=RoomMessagePayload(message="Hello room")))

        messages = await comm.receive_all_messages()
        assert "Hello room" in messages[0].payload.message
        assert messages[0].payload.room_name == "test-room"

Testing Features:

  • pytest-asyncio Integration for async WebSocket testing

  • WebsocketCommunicator Context Managers for automatic cleanup

  • Real ARQ Worker Testing with bg_worker fixture

  • Message Type Validation using Pydantic message objects

  • Completion Signal Testing with different stop actions

  • Path Parameter Testing for dynamic routing

  • Multi-Consumer Testing across different channel layer types

Configuration Patterns

1. Environment-based Configuration

import os

class BaseConsumer(AsyncJsonWebsocketConsumer):
    send_completion = bool(os.environ.get("SEND_COMPLETION", False))
    log_websocket_message = bool(os.environ.get("LOG_WEBSOCKET", True))

2. Per-Consumer Overrides

@channel(name="analytics")
class AnalyticsConsumer(BaseConsumer):
    channel_layer_alias = "analytics"  # Use analytics-specific layer
    log_ignored_actions = ["ping", "heartbeat"]  # Don't log frequent events

3. Dynamic Configuration

class RoomChatConsumer(BaseConsumer):
    async def get_groups(self) -> list[str]:
        room_name = self.scope["path_info"].split("/")[-1]
        return [f"room_{room_name}"]

Production Deployment

Key considerations for production deployment:

1. Channel Layer Scaling

  • Use Redis Cluster or RabbitMQ for high-availability channel layers

  • Configure appropriate connection pools and timeouts

2. Background Job Processing

  • Deploy ARQ workers as separate processes/containers

  • Use Redis Sentinel for worker queue high availability

3. WebSocket Load Balancing

  • Configure sticky sessions or use Redis for session storage

  • Consider using a WebSocket-aware load balancer

4. Monitoring and Observability

  • Enable structured logging with correlation IDs

  • Monitor WebSocket connection counts and message rates

  • Set up health checks for both HTTP and WebSocket endpoints

Learning Path

To understand the FastAPI integration:

  1. Start with base configuration (base_consumer.py)

  2. Examine the showcase consumers (apps/showcase/consumer.py)

  3. Study channel layer setup (layers.py)

  4. Review background job integration (apps/background_jobs/)

  5. Check the main FastAPI app (main.py)

  6. Run the development script (start_dev.py)

  7. Test with the HTML client (visit http://localhost:8080)

This example demonstrates how Chanx provides a consistent API across different ASGI frameworks while leveraging each framework's specific strengths.