Consumers

The AsyncJsonWebsocketConsumer class is the cornerstone of Chanx, providing a robust foundation for building WebSocket applications. This guide covers its features, configuration options, and best practices.

Consumer Basics

Chanx consumers extend Django Channels’ WebSocket consumers with:

  1. DRF-style authentication and permissions

  2. Structured message handling with validation

  3. Automatic group management

  4. Comprehensive error handling

  5. Logging and diagnostics

Minimal Consumer Example

Here’s a minimal Chanx consumer:

from chanx.generic.websocket import AsyncJsonWebsocketConsumer
from chanx.messages.incoming import IncomingMessage


class MyConsumer(AsyncJsonWebsocketConsumer):
    """Basic WebSocket consumer."""

    # Required: Specify the message schema
    INCOMING_MESSAGE_SCHEMA = IncomingMessage

    async def receive_message(self, message, **kwargs):
        """Handle incoming validated messages."""
        # Handle message based on its action
        if message.action == "ping":
            from chanx.messages.outgoing import PongMessage
            await self.send_message(PongMessage())

Consumer Lifecycle

A Chanx consumer follows this lifecycle:

  1. Connection: Client initiates WebSocket connection

  2. Authentication: Consumer authenticates the connection using DRF classes

  3. Group Setup: If authenticated, consumer joins channel groups

  4. Message Processing: Consumer handles incoming messages

  5. Disconnection: Client or server terminates the connection

Authentication Configuration

Configure authentication and permissions using DRF-style attributes:

from rest_framework.authentication import SessionAuthentication, TokenAuthentication
from rest_framework.permissions import IsAuthenticated


class SecureConsumer(AsyncJsonWebsocketConsumer):
    # Authentication classes determine how users are identified
    authentication_classes = [SessionAuthentication, TokenAuthentication]

    # Permission classes determine if authenticated users have access
    permission_classes = [IsAuthenticated]

    # For object-level permissions, provide a queryset
    queryset = Room.objects.all()

    # HTTP method to emulate for authentication
    auth_method = "get"  # Default is "get"

    INCOMING_MESSAGE_SCHEMA = IncomingMessage

Message Handling

The core of a consumer is the receive_message method which processes validated messages:

async def receive_message(self, message, **kwargs):
    """
    Handle incoming validated messages.

    Args:
        message: The validated message object
        **kwargs: Additional arguments from receive_json
    """
    # Access the action field to determine message type
    if message.action == "chat":
        # Access payload for message data
        text = message.payload

        # Create response message
        from myapp.messages import ChatResponse
        response = ChatResponse(payload=f"Received: {text}")

        # Send response to the client
        await self.send_message(response)

Group Management

Chanx simplifies WebSocket group management for pub/sub messaging:

class ChatConsumer(AsyncJsonWebsocketConsumer):

    async def build_groups(self):
        """
        Define which groups this consumer should join.

        Returns:
            Iterable of group names
        """
        # Get room ID from URL parameters
        room_id = self.scope["url_route"]["kwargs"].get("room_id", "default")

        # Return list of groups to join
        return [f"chat_room_{room_id}"]

    async def receive_message(self, message, **kwargs):
        if message.action == "chat":
            # Forward message to the entire group
            await self.send_group_message(message)

Sending Messages

Chanx provides several methods for sending messages:

# Send to the connected client
await self.send_message(MyMessage())

# Send to all clients in groups (excluding this one)
await self.send_group_message(
    GroupMessage(),
    exclude_current=True  # Don't echo to sender
)

# Send to specific groups
await self.send_group_message(
    GroupMessage(),
    groups=["custom_group"],  # Override default groups
    exclude_current=False     # Include sender
)

Configuration Options

Chanx consumers have several configuration options:

class ConfiguredConsumer(AsyncJsonWebsocketConsumer):
    # Authentication
    authentication_classes = [SessionAuthentication]
    permission_classes = [IsAuthenticated]
    queryset = None
    auth_method = "get"

    # Message handling
    INCOMING_MESSAGE_SCHEMA = MyIncomingMessage
    OUTGOING_GROUP_MESSAGE_SCHEMA = MyOutgoingGroupMessage

    # Behavior flags
    send_completion = True  # Send completion messages
    send_message_immediately = True  # Yield control after sending
    log_received_message = True  # Log received messages
    log_sent_message = True  # Log sent messages
    log_ignored_actions = ["ping", "pong"]  # Don't log these actions
    send_authentication_message = True  # Send auth status

Accessing User and Context

Within a consumer, you can access user information and context:

async def receive_message(self, message, **kwargs):
    # Access the authenticated user
    user = self.user

    # Access the Django request (from authentication)
    request = self.request

    # For consumers with object-level permissions, access the object
    obj = self.obj

    # Access the raw ASGI connection scope
    scope = self.scope

    # Access URL parameters
    url_params = self.scope["url_route"]["kwargs"]

    # Access query string parameters
    query_params = parse_qs(self.scope["query_string"].decode())

Post-Authentication Hook

You can perform custom actions after successful authentication:

async def post_authentication(self):
    """Execute after successful authentication."""
    # Perform custom initialization
    self.user_status = "online"

    # Record connection in database
    await self.update_user_status()

Error Handling

Chanx automatically handles most errors:

  1. Validation errors: Sends detailed error messages to the client

  2. Processing errors: Captures exceptions and sends generic error

  3. Authentication errors: Closes connection with authentication failure

For custom error handling:

async def receive_message(self, message, **kwargs):
    try:
        result = await self.process_message(message)
        await self.send_message(SuccessMessage(payload=result))
    except ValueError as e:
        # Send custom error for specific exceptions
        await self.send_message(ErrorMessage(payload={"detail": str(e)}))
    # Other exceptions are handled automatically

Testing Consumers

Chanx provides utilities for testing consumers:

from chanx.testing import WebsocketTestCase
from myapp.messages import ChatMessage


class TestChatConsumer(WebsocketTestCase):
    ws_path = "/ws/chat/room1/"

    async def test_chat_message(self):
        # Create and connect a websocket client
        communicator = self.create_communicator()
        connected, _ = await communicator.connect()
        self.assertTrue(connected)

        # Ensure authentication succeeded
        await communicator.assert_authenticated_status_ok()

        # Send a test message
        await communicator.send_message(ChatMessage(payload="Hello"))

        # Receive all messages until completion
        messages = await communicator.receive_all_json()

        # Assert on the received messages
        self.assertEqual(len(messages), 1)
        self.assertEqual(messages[0]["payload"], "Hello")

        # Disconnect
        await communicator.disconnect()

Best Practices

  1. Use type hints: Add proper type annotations for better IDE support

  2. Keep consumers focused: Each consumer should handle a specific domain

  3. Document message formats: Clearly document expected message structures

  4. Implement proper error handling: Provide meaningful error messages

  5. Use object-level permissions: For endpoints tied to specific resources

  6. Implement reconnection logic: Clients should handle reconnection

  7. Test thoroughly: Test both happy paths and error scenarios

Next Steps