Consumers & Decorators
Chanx transforms WebSocket development with decorator-based message handlers that automatically route messages, validate types, and generate documentation. This guide covers the core patterns you'll use to build WebSocket applications.
The AsyncJsonWebsocketConsumer Base Class
All Chanx consumers inherit from AsyncJsonWebsocketConsumer, which provides the foundation for decorator-based message handling:
from chanx.core.websocket import AsyncJsonWebsocketConsumer
from chanx.core.decorators import ws_handler, event_handler, channel
@channel(name="chat", description="Real-time chat system")
class ChatConsumer(AsyncJsonWebsocketConsumer):
# Your handlers go here
pass
Optional Generic Type Parameter:
You can optionally specify an event type for better static type checking with mypy/pyright:
# Without generic type (perfectly fine)
class ChatConsumer(AsyncJsonWebsocketConsumer):
pass
# With generic type for enhanced type checking
class ChatConsumer(AsyncJsonWebsocketConsumer[SystemNotifyEvent]):
pass
The generic type parameter helps with:
Type safety for
send_event(),broadcast_event()methodsBetter IDE support with autocomplete and type hints
Static analysis with mypy/pyright
Note: The generic type is purely optional and for developer convenience. It doesn't affect runtime behavior or functionality.
Key features provided by the base class:
Automatic message routing based on decorators
Type validation using Pydantic discriminated unions
Channel layer integration for group messaging
Authentication handling with configurable authenticators
Completion signals for reliable testing
Configurable behavior via class attributes
The @channel Decorator
The @channel decorator marks consumer classes and provides metadata for AsyncAPI documentation generation:
@channel(
name="notifications",
description="User notification system",
tags=["notifications", "real-time"]
)
class NotificationConsumer(AsyncJsonWebsocketConsumer):
pass
Parameters:
name (str, optional): Custom channel name (defaults to class name)
description (str, optional): Channel description (overrides class docstring)
tags (list[str], optional): Tags for AsyncAPI grouping
The @ws_handler Decorator
The @ws_handler decorator handles messages sent directly from WebSocket clients. Each handler method corresponds to a specific message type:
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
@channel(name="example")
class ExampleConsumer(AsyncJsonWebsocketConsumer):
@ws_handler(summary="Handle ping requests")
async def handle_ping(self, message: PingMessage) -> PongMessage:
return PongMessage()
@ws_handler(
summary="Handle chat messages",
description="Process chat messages and broadcast to room members",
output_type=ChatNotificationMessage,
tags=["chat", "messaging"]
)
async def handle_chat(self, message: ChatMessage) -> None:
# Broadcast to group instead of direct response
await self.broadcast_message(
ChatNotificationMessage(
payload=ChatPayload(
message=f"User: {message.payload.message}"
)
)
)
Handler patterns:
1. Direct Response (return message):
@ws_handler
async def handle_ping(self, message: PingMessage) -> PongMessage:
return PongMessage() # Sent directly back to client
2. Broadcast Response (return None):
@ws_handler(output_type=NotificationMessage)
async def handle_broadcast(self, message: BroadcastMessage) -> None:
await self.broadcast_message(
NotificationMessage(payload="Broadcasted to all")
)
3. No Response:
@ws_handler
async def handle_log_event(self, message: LogMessage) -> None:
logger.info(f"Received: {message.payload}")
# No response sent
@ws_handler parameters:
func: The handler function (when used without parentheses)
action (str, optional): Action name (defaults to function name)
input_type (type, optional): Expected input message type
output_type (type, optional): Expected output message type for docs
summary (str, optional): Brief description for AsyncAPI
description (str, optional): Detailed description for AsyncAPI
tags (list[str], optional): Tags for AsyncAPI grouping
The @event_handler Decorator
The @event_handler decorator handles events sent through the channel layer from other parts of your application (HTTP views, background tasks, management scripts, etc.):
@event_handler(output_type=StreamingMessage)
async def handle_streaming(self, event: StreamingEvent) -> StreamingMessage:
"""Handle streaming events from background tasks."""
return StreamingMessage(payload=event.payload)
@event_handler
async def user_joined_room(self, event: UserJoinedEvent) -> None:
"""Handle user join events without direct response."""
await self.broadcast_message(
SystemMessage(payload=f"{event.payload.username} joined")
)
@event_handler return patterns:
Return message: - If triggered by
send_event()→ sends to the specific channel (WebSocket connection) - If triggered bybroadcast_event()→ broadcasts the message to all channels in the target groupsReturn None: Use for custom behavior - you can manually broadcast, send to specific channels, or perform side effects
@event_handler parameters:
func: The handler function (when used without parentheses)
input_type (type, optional): Expected event type for validation
output_type (type, optional): Expected output type for docs
summary (str, optional): Brief description for AsyncAPI
description (str, optional): Detailed description for AsyncAPI
tags (list[str], optional): Tags for AsyncAPI grouping
Message Types and Automatic Routing
Chanx uses discriminated unions to automatically route both WebSocket messages and events to the correct handlers. Messages and events are identified by their action field:
WebSocket Message Example:
from chanx.messages.base import BaseMessage
from typing import Literal
class ChatMessage(BaseMessage):
action: Literal["chat"] = "chat" # Discriminator field
payload: ChatPayload
class PingMessage(BaseMessage):
action: Literal["ping"] = "ping"
payload: None = None
Event Message Example:
class UserJoinedEvent(BaseMessage):
action: Literal["user_joined"] = "user_joined" # Discriminator field
payload: UserPayload
class SystemNotifyEvent(BaseMessage):
action: Literal["system_notify"] = "system_notify"
payload: NotificationPayload
How routing works:
For WebSocket messages:
1. Client sends: {"action": "chat", "payload": {"message": "Hello"}}
2. Framework validates against discriminated union of all input messages
3. Routes to @ws_handler method based on message type
4. Handler receives properly typed message object
For events:
1. Application sends: Consumer.send_event(UserJoinedEvent(...))
2. Framework validates against discriminated union of all event types
3. Routes to @event_handler method based on event type
4. Handler receives properly typed event object
The framework automatically:
Builds separate discriminated unions for WebSocket messages and events
Validates incoming JSON/events against the appropriate union
Routes to the correct handler method (
@ws_handleror@event_handler)Provides full type safety with IDE support for both message types
Output Messages and Broadcasting
Direct Response Messages:
@ws_handler
async def handle_get_status(self, message: GetStatusMessage) -> StatusMessage:
return StatusMessage(
payload=StatusPayload(
status="online",
timestamp=datetime.now()
)
)
Broadcasting Messages:
@ws_handler(output_type=ChatNotificationMessage)
async def handle_chat(self, message: ChatMessage) -> None:
# Broadcast to all connections in the same groups
await self.broadcast_message(
ChatNotificationMessage(payload=message.payload)
)
# Broadcast to specific groups
await self.broadcast_message(
ChatNotificationMessage(payload=message.payload),
groups=["room_123", "moderators"]
)
Group Management:
class RoomChatConsumer(AsyncJsonWebsocketConsumer):
async def post_authentication(self) -> None:
"""Called after successful authentication."""
# Extract room from URL parameters
room_id = self.scope["path_params"]["room_id"]
group_name = f"room_{room_id}"
# Join the room group
await self.channel_layer.group_add(group_name, self.channel_name)
self.groups.append(group_name)
# Notify others about the join
await self.broadcast_message(
UserJoinedMessage(payload={"user_id": self.user.id}),
groups=[group_name],
exclude_current=True # Don't send to self
)
Event Broadcasting from Anywhere
One of Chanx's most powerful features is sending events to WebSocket consumers from anywhere in your application:
From Django Views:
# views.py
from django.http import JsonResponse
from myapp.consumers import NotificationConsumer
from myapp.events import NewPostEvent
def create_post(request):
post = Post.objects.create(...)
# Send event to WebSocket consumers
NotificationConsumer.broadcast_event_sync(
NewPostEvent(payload={"post_id": post.id, "title": post.title}),
groups=["news_feed"]
)
return JsonResponse({"status": "created"})
From Celery Tasks:
# tasks.py
from celery import shared_task
from myapp.consumers import PaymentConsumer
from myapp.events import PaymentCompleteEvent
@shared_task
def process_payment(payment_id):
result = process_payment_logic(payment_id)
# Notify specific user
PaymentConsumer.send_event_sync(
PaymentCompleteEvent(payload={"status": result}),
channel_name=f"user_{payment.user_id}"
)
From Management Commands:
# management/commands/send_announcement.py
from django.core.management.base import BaseCommand
from myapp.consumers import AnnouncementConsumer
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('message', type=str)
def handle(self, *args, **options):
AnnouncementConsumer.broadcast_event_sync(
SystemAnnouncementEvent(payload=options['message']),
groups=["all_users"]
)
Available event methods:
send_event_sync(): Send to specific channel name
broadcast_event_sync(): Send to groups
send_event(): Async version of send_event_sync
broadcast_event(): Async version of broadcast_event_sync
Note: When using the optional generic type parameter (AsyncJsonWebsocketConsumer[EventType]), these methods will provide better type checking and IDE support for the event parameter.
Consumer Configuration
Consumers can be configured via class attributes:
class MyConsumer(AsyncJsonWebsocketConsumer):
# Message behavior
send_completion = False # Whether to send completion signals
send_message_immediately = True # Yield control after sending
log_websocket_message = True # Log messages
log_ignored_actions = ["ping", "pong"] # Skip logging these
# Message formatting
camelize = False # Convert snake_case <-> camelCase
discriminator_field = "action" # Field for message routing
# Channel layer (required for non-Django frameworks)
channel_layer_alias = "default"
# Authentication
authenticator_class = MyAuthenticator
Authentication Integration
Django Example:
from chanx.ext.channels.authenticator import DjangoAuthenticator
from rest_framework.permissions import IsAuthenticated
class ChatAuthenticator(DjangoAuthenticator):
permission_classes = [IsAuthenticated]
# Optional: object-level permissions
queryset = ChatRoom.objects.all()
obj: ChatRoom
@channel(name="chat")
class ChatConsumer(AsyncJsonWebsocketConsumer):
authenticator_class = ChatAuthenticator
authenticator: ChatAuthenticator # Type hint
async def post_authentication(self) -> None:
"""Called after successful authentication."""
# Access authenticated user and object
user = self.authenticator.user
room = self.authenticator.obj # If using object-level auth
# Join room-specific group
await self.channel_layer.group_add(
f"room_{room.id}",
self.channel_name
)
FastAPI/Custom Example:
from chanx.core.authenticator import BaseAuthenticator
class TokenAuthenticator(BaseAuthenticator):
async def authenticate(self) -> bool:
token = self.get_query_param("token")
if not token:
return False
# Validate token logic here
self.user = await get_user_by_token(token)
return self.user is not None
class MyConsumer(AsyncJsonWebsocketConsumer):
authenticator_class = TokenAuthenticator
Complete Example: Chat Consumer
Here's a complete example that demonstrates all the concepts:
from chanx.core.decorators import ws_handler, event_handler, channel
from chanx.core.websocket import AsyncJsonWebsocketConsumer
from chanx.ext.channels.authenticator import DjangoAuthenticator
from rest_framework.permissions import IsAuthenticated
# Messages
class ChatMessage(BaseMessage):
action: Literal["chat"] = "chat"
payload: ChatPayload
class JoinRoomMessage(BaseMessage):
action: Literal["join_room"] = "join_room"
payload: JoinRoomPayload
# Events
class UserJoinedEvent(BaseChannelEvent):
handler: Literal["user_joined"] = "user_joined"
payload: UserPayload
# Authenticator
class RoomAuthenticator(DjangoAuthenticator):
permission_classes = [IsAuthenticated]
queryset = ChatRoom.objects.all()
obj: ChatRoom
# Consumer (generic type is optional - either approach works)
@channel(
name="room_chat",
description="Real-time chat for specific rooms",
tags=["chat", "rooms"]
)
class RoomChatConsumer(AsyncJsonWebsocketConsumer[UserJoinedEvent]): # Optional generic for type hints
authenticator_class = RoomAuthenticator
authenticator: RoomAuthenticator
# Configuration
log_ignored_actions = ["ping", "pong"]
async def post_authentication(self) -> None:
"""Join room group after authentication."""
room_group = f"room_{self.authenticator.obj.id}"
await self.channel_layer.group_add(room_group, self.channel_name)
self.groups.append(room_group)
@ws_handler(
summary="Handle chat messages",
output_type=ChatNotificationMessage
)
async def handle_chat(self, message: ChatMessage) -> None:
"""Process and broadcast chat messages."""
await self.broadcast_message(
ChatNotificationMessage(
payload=ChatNotificationPayload(
message=message.payload.message,
username=self.authenticator.user.username,
room_id=self.authenticator.obj.id
)
)
)
@event_handler(output_type=UserJoinedMessage)
async def user_joined(self, event: UserJoinedEvent) -> UserJoinedMessage:
"""Handle user join events from other parts of the app."""
return UserJoinedMessage(
payload=UserJoinedPayload(
username=event.payload.username,
message=f"{event.payload.username} joined the room"
)
)
Next Steps
Now that you understand Chanx's decorator-based approach, you can:
Learn about AsyncAPI Documentation documentation generation
Explore Testing WebSocket Consumers patterns for WebSocket consumers
Check out Framework Integration for Django views and FastAPI API endpoints
The decorator pattern eliminates the complexity of manual message routing while providing full type safety and automatic documentation generation. This makes WebSocket development as straightforward as building REST APIs.