Consumers
The AsyncJsonWebsocketConsumer class is the cornerstone of Chanx, providing a robust foundation for building WebSocket applications. This guide covers its features, configuration options, and best practices.
Consumer Basics
Chanx consumers extend Django Channels' WebSocket consumers with:
DRF-style authentication and permissions
Structured message handling with validation
Automatic group management
Typed channel events
Comprehensive error handling
Logging and diagnostics
Minimal Consumer Example
Here's a minimal Chanx consumer:
from typing import Any
from chanx.generic.websocket import AsyncJsonWebsocketConsumer
from chanx.messages.base import BaseMessage
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
class MyConsumer(AsyncJsonWebsocketConsumer[PingMessage]):
"""Basic WebSocket consumer."""
async def receive_message(self, message: PingMessage, **kwargs: Any) -> None:
"""Handle incoming validated messages."""
# Handle message using pattern matching
match message:
case PingMessage():
await self.send_message(PongMessage())
Consumer Lifecycle
A Chanx consumer follows this lifecycle:
Connection: Client initiates WebSocket connection
Authentication: Consumer authenticates the connection using DRF classes
Group Setup: If authenticated, consumer joins channel groups
Message Processing: Consumer handles incoming messages
Disconnection: Client or server terminates the connection
Authentication Configuration
Configure authentication and permissions using DRF-style attributes:
from typing import Any
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAuthenticated
from chanx.generic.websocket import AsyncJsonWebsocketConsumer
from chanx.messages.base import BaseMessage
from chat.models import Room
class SecureConsumer(AsyncJsonWebsocketConsumer[PingMessage, None, None, Room]):
# Authentication classes determine how users are identified
authentication_classes = [SessionAuthentication]
# Permission classes determine if authenticated users have access
permission_classes = [IsAuthenticated]
# For object-level permissions, provide a queryset
queryset = Room.objects.all()
# HTTP method to emulate for authentication
auth_method = "get" # Default is "get"
async def receive_message(self, message: PingMessage, **kwargs: Any) -> None:
# Only authenticated users reach this point
pass
Generic Type Parameters
Chanx consumers support four generic type parameters for improved type safety:
class AsyncJsonWebsocketConsumer(Generic[IC, Event, OG, M]):
"""
Base WebSocket consumer with generic type parameters.
Generic Parameters:
IC: Incoming message type (required) - Union of BaseMessage subclasses
Event: Channel event type (optional) - Union of BaseChannelEvent subclasses or None
OG: Outgoing group message type (optional) - BaseGroupMessage subclass or None
M: Model type for object-level permissions (optional) - Model subclass or None
"""
At minimum, you must specify the incoming message type:
# Simple consumer with just incoming message type
class SimpleConsumer(AsyncJsonWebsocketConsumer[PingMessage]):
...
# Full consumer with all generic parameters
class FullConsumer(AsyncJsonWebsocketConsumer[
ChatIncomingMessage, # Incoming message types
ChatEvent, # Channel events
ChatGroupMessage, # Outgoing group message
Room # Model for object permissions
]):
...
Message Handling
The core of a consumer is the receive_message method which processes validated messages:
async def receive_message(self, message: ChatIncomingMessage, **kwargs: Any) -> None:
"""
Process a validated received message.
Args:
message: The validated message object (typed as ChatIncomingMessage)
**kwargs: Additional arguments from receive_json
"""
# Use pattern matching for cleaner message handling
match message:
case ChatMessage(payload=payload):
# Handle chat message with extracted payload
await self.handle_chat(payload)
case NotificationMessage(payload=notification_payload):
# Handle notification with direct access to payload
await self.handle_notification(notification_payload)
case PingMessage():
# Handle standard ping message
await self.send_message(PongMessage())
Sending Messages
To send a message to the connected client:
# Create a message instance with structured payload
notification = NotificationMessage(payload={"type": "info", "text": "Update received"})
# Send it to the client
await self.send_message(notification)
Group Messaging
Chanx simplifies WebSocket group management for pub/sub messaging.
First, define your group message types:
from typing import Literal
from chanx.messages.base import BaseGroupMessage
# Define a group message type
class ChatGroupMessage(BaseGroupMessage):
"""Message type for group chat."""
action: Literal["chat_group"] = "chat_group"
payload: dict[str, str]
Then, configure your consumer to use these group message types:
class ChatConsumer(AsyncJsonWebsocketConsumer[ChatIncomingMessage, None, ChatGroupMessage]):
async def build_groups(self) -> list[str]:
"""
Define which groups this consumer should join.
Returns:
Iterable of group names
"""
# Get room ID from URL parameters
room_id = self.scope["url_route"]["kwargs"].get("room_id", "default")
# Return list of groups to join
return [f"chat_room_{room_id}"]
async def receive_message(self, message: ChatIncomingMessage, **kwargs: Any) -> None:
"""Handle incoming messages and broadcast to groups."""
match message:
case ChatMessage(payload=payload):
# Using send_group_message
username = getattr(self.user, 'username', 'Anonymous')
await self.send_group_message(
ChatGroupMessage(payload={"username": username, "content": payload.content}),
exclude_current=False # Include sender in recipients
)
case _:
pass
Group messages are automatically enhanced with metadata:
{
"action": "chat_group",
"payload": {
"username": "Alice",
"content": "Hello everyone!"
},
"is_mine": false,
"is_current": false
}
is_mine: True if the message originated from the current useris_current: True if the message came from this specific connection
Channel Events
Chanx provides a type-safe channel event system for sending events between consumers through the channel layer:
from typing import Literal
from chanx.messages.base import BaseChannelEvent
from pydantic import BaseModel
# Define channel event types
class NotifyEvent(BaseChannelEvent):
class Payload(BaseModel):
content: str
level: str = "info"
handler: Literal["notify"] = "notify"
payload: Payload
# Define event union type
ChatEvent = NotifyEvent # Can be a union of multiple event types
Configure your consumer to handle these events:
class ChatConsumer(AsyncJsonWebsocketConsumer[ChatIncomingMessage, ChatEvent]):
# Configure groups to receive events
groups = ["announcements"]
# Override receive_event method to handle all event types
async def receive_event(self, event: ChatEvent) -> None:
"""Handle channel events using pattern matching."""
match event:
case NotifyEvent():
notification = f"{event.payload.level.upper()}: {event.payload.content}"
await self.send_message(MessageResponse(payload={"text": notification}))
To send events from outside the consumer (e.g., from a Django view or task):
# Using synchronous code (e.g., in a Django view)
def send_notification(request):
ChatConsumer.send_channel_event(
"announcements", # Group name to send to
NotifyEvent(payload=NotifyEvent.Payload(
content="Important system notice",
level="warning"
))
)
return JsonResponse({"status": "sent"})
# Using asynchronous code
async def async_send_notification():
await ChatConsumer.asend_channel_event(
"announcements",
NotifyEvent(payload=NotifyEvent.Payload(
content="Important system notice",
level="warning"
))
)
The channel event system provides:
Type-safe event handling with Pydantic validation
Single method override (
receive_event) for handling all event typesPattern matching for different event types within the method
Automatic error handling and logging
Support for both sync and async code
Completion messages (if configured)
Accessing User and Context
Within a consumer, you can access user information and context:
async def receive_message(self, message: ChatIncomingMessage, **kwargs: Any) -> None:
# Access the authenticated user
user = self.user
# Access the Django request (from authentication)
request = self.request
# For consumers with object-level permissions, access the object
obj = self.obj # Typed based on M generic parameter
# Access the raw ASGI connection scope
scope = self.scope
# Access URL parameters
url_params = self.scope["url_route"]["kwargs"]
# Access query string parameters
from urllib.parse import parse_qs
query_params = parse_qs(self.scope["query_string"].decode())
Post-Authentication Hook
You can perform custom actions after successful authentication:
async def post_authentication(self) -> None:
"""Execute after successful authentication."""
# Perform custom initialization
self.user_status = "online"
# Record connection in database
await self.update_user_status()
# For object-based consumers, access the object
if self.obj:
# Initialize object-specific state
self.room = self.obj
self.member = await self.room.members.aget(user=self.user)
Error Handling
Chanx automatically handles most errors:
Validation errors: Sends detailed error messages to the client
Processing errors: Captures exceptions and sends generic error
Authentication errors: Closes connection with authentication failure
For custom error handling:
async def receive_message(self, message: ChatIncomingMessage, **kwargs: Any) -> None:
try:
match message:
case ChatMessage(payload=payload):
result = await self.process_chat(payload)
await self.send_message(SuccessMessage(payload=result))
except ValueError as e:
# Send custom error for specific exceptions
from chanx.messages.outgoing import ErrorMessage
await self.send_message(ErrorMessage(payload={"detail": str(e)}))
# Other exceptions are handled automatically
Real-World Example
Here's a complete example of a chat consumer:
from typing import Any, cast
from chanx.generic.websocket import AsyncJsonWebsocketConsumer
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
from chat.messages.chat import (
ChatIncomingMessage,
JoinGroupMessage,
NewChatMessage,
)
from chat.messages.group import MemberMessage
from chat.models import ChatMember, ChatMessage, GroupChat
from chat.permissions import IsGroupChatMember
from chat.serializers import ChatMessageSerializer
from chat.utils import name_group_chat
class ChatDetailConsumer(
AsyncJsonWebsocketConsumer[
ChatIncomingMessage, None, MemberMessage, GroupChat
]
):
permission_classes = [IsGroupChatMember]
queryset = GroupChat.objects.get_queryset()
member: ChatMember
groups: list[str]
async def build_groups(self) -> list[str]:
assert self.obj
self.group_name = name_group_chat(self.obj.pk)
return [self.group_name]
async def post_authentication(self) -> None:
assert self.user is not None
assert self.obj
self.member = await self.obj.members.select_related("user").aget(user=self.user)
async def receive_message(self, message: ChatIncomingMessage, **kwargs: Any) -> None:
match message:
case PingMessage():
await self.send_message(PongMessage())
case NewChatMessage(payload=message_payload):
assert self.obj
new_message = await ChatMessage.objects.acreate(
content=message_payload.content,
group_chat_id=self.obj.pk,
sender=self.member,
)
groups = message_payload.groups
message_serializer = ChatMessageSerializer(instance=new_message)
await self.send_group_message(
MemberMessage(payload=cast(Any, message_serializer.data)),
groups=groups,
exclude_current=False,
)
case JoinGroupMessage(payload=join_group_payload):
await self.channel_layer.group_add(
join_group_payload.group_name, self.channel_name
)
self.groups.extend(join_group_payload.group_name)
Configuration Options
Chanx consumers have several configuration options:
class ConfiguredConsumer(AsyncJsonWebsocketConsumer[ChatIncomingMessage, None, ChatGroupMessage]):
# Authentication
authentication_classes = [SessionAuthentication]
permission_classes = [IsAuthenticated]
queryset = Room.objects.all()
auth_method = "get"
# Behavior flags
send_completion = True # Send completion messages
send_message_immediately = True # Yield control after sending
log_received_message = True # Log received messages
log_sent_message = True # Log sent messages
log_ignored_actions = ["ping", "pong"] # Don't log these actions
send_authentication_message = True # Send auth status
Best Practices
Use generic type parameters: Specify the message, event, and model types for better IDE support
Use pattern matching: Handle messages with clear match/case patterns
Keep consumers focused: Each consumer should handle a specific domain
Document message formats: Clearly document expected message structures
Implement proper error handling: Provide meaningful error messages
Use object-level permissions: For endpoints tied to specific resources
Include appropriate assertions: Use assert for type-checking in async methods
Test thoroughly: Test both happy paths and error scenarios
Next Steps
Authentication - Learn more about authentication options
Messages System - Explore the message validation system
Routing - Understand WebSocket URL routing
Testing - Learn how to test your consumers
Chat Application Example - See a complete chat application example