Testing WebSocket Consumers

Chanx ships testing utilities for WebSocket consumers. They cover completion signal handling, automatic message collection, and framework-specific authentication support.

Testing Configuration

Django Test Settings:

# settings/test.py
CHANX = {
    'SEND_COMPLETION': True,  # Required: receive_all_messages() stops on the completion signal
    'LOG_WEBSOCKET_MESSAGE': False,  # Reduce test noise
    'LOG_IGNORED_ACTIONS': [],
}

FastAPI/Other Frameworks:

# Configure via base consumer or environment
class TestBaseConsumer(AsyncJsonWebsocketConsumer):
    send_completion = True  # Enable completion signals for testing
    log_websocket_message = False  # Reduce test noise

Framework-Specific Testing

FastAPI and ASGI Frameworks:

  • Use WebsocketCommunicator from chanx.testing

  • Async context manager support for automatic cleanup

  • Direct pytest integration

Django Channels:

  • Use WebsocketTestCase from chanx.ext.channels.testing

  • Enhanced Django test case with authentication support

  • Automatic ASGI application discovery

FastAPI Testing Examples

Key Benefits of WebsocketCommunicator:

  • Automatic connection management: Use async with for auto-connect and auto-disconnect

  • No manual cleanup required: Context manager handles all connection lifecycle

  • Built-in message collection: Easily gather and assert on received messages
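The cleanup guarantee comes from Python's async context manager protocol rather than from anything WebSocket-specific. The sketch below is not Chanx's implementation: `FakeCommunicator` is a hypothetical stand-in that only records its connection state, used here to show that the exit hook still runs when an assertion inside the block fails.

```python
import asyncio


class FakeCommunicator:
    """Hypothetical stand-in that tracks connection state only (not Chanx code)."""

    def __init__(self) -> None:
        self.connected = False

    async def __aenter__(self) -> "FakeCommunicator":
        # Entering the block plays the role of connect().
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Leaving the block plays the role of disconnect(); it runs on errors too.
        self.connected = False
        return False  # do not swallow the exception


async def run_failing_test() -> FakeCommunicator:
    comm = FakeCommunicator()
    try:
        async with comm:
            assert comm.connected
            raise AssertionError("simulated test failure")
    except AssertionError:
        pass  # the failure propagated out of the block as usual
    return comm


comm = asyncio.run(run_failing_test())
print(comm.connected)  # False
```

Because the exit hook runs on both success and failure, a failing test written this way should not leave a connection open for the next test.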

Basic Test Setup:

import pytest
from chanx.testing import WebsocketCommunicator
from chanx.constants import GROUP_ACTION_COMPLETE
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage

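from myapp.main import app
from myapp.consumers import ChatConsumer, RoomChatConsumer
from myapp.messages import ChatMessage  # hypothetical path; import your own message types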

@pytest.mark.asyncio
async def test_ping_pong():
    # Context manager automatically connects and disconnects WebSocket
    async with WebsocketCommunicator(
        app, "/ws/chat", consumer=ChatConsumer
    ) as comm:
        # WebSocket is now connected and ready
        await comm.send_message(PingMessage())
        messages = await comm.receive_all_messages()
        assert len(messages) == 1
        assert isinstance(messages[0], PongMessage)
    # WebSocket is disconnected automatically once the block exits

Testing Message Broadcasting:

@pytest.mark.asyncio
async def test_chat_broadcasting():
    # Note: consumer parameter is required for completion signals
    async with WebsocketCommunicator(
        app, "/ws/room/test", consumer=RoomChatConsumer
    ) as first_comm, WebsocketCommunicator(
        app, "/ws/room/test", consumer=RoomChatConsumer
    ) as second_comm:
        # Send a message that triggers broadcasting
        await first_comm.send_message(ChatMessage(payload={"message": "Hello"}))

        # Use GROUP_ACTION_COMPLETE for broadcast scenarios
        first_replies = await first_comm.receive_all_messages(
            stop_action=GROUP_ACTION_COMPLETE
        )
        second_replies = await second_comm.receive_all_messages(
            stop_action=GROUP_ACTION_COMPLETE
        )

        assert len(first_replies) == 1
        assert len(second_replies) == 1
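Each connection in a group has its own inbound stream, which is why the test above drains both communicators separately. The following sketch models that with plain in-memory inboxes. It is an illustration under assumptions, not Chanx's implementation: `FakeGroup` and `drain` are hypothetical, and the per-subscriber completion marker is assumed from the way the test calls `receive_all_messages` on each communicator.

```python
from collections import deque


class FakeGroup:
    """Hypothetical in-memory group with one inbox per subscriber."""

    def __init__(self) -> None:
        self.inboxes: list[deque] = []

    def join(self) -> deque:
        inbox: deque = deque()
        self.inboxes.append(inbox)
        return inbox

    def broadcast(self, message: dict) -> None:
        # Every subscriber gets its own copy, followed by its own completion marker.
        for inbox in self.inboxes:
            inbox.append(dict(message))
            inbox.append({"action": "group_complete"})


def drain(inbox: deque) -> list[dict]:
    """Return queued messages up to, but not including, the completion marker."""
    received = []
    while inbox:
        message = inbox.popleft()
        if message["action"] == "group_complete":
            break
        received.append(message)
    return received


group = FakeGroup()
first, second = group.join(), group.join()
group.broadcast({"action": "chat", "text": "Hello"})
first_replies, second_replies = drain(first), drain(second)
print(len(first_replies), len(second_replies))  # 1 1
```

Draining only one inbox would leave the other subscriber's copy unread, which in a real test can leak into later assertions.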

Django Testing Examples

Basic Test Setup:

from chanx.ext.channels.testing import WebsocketTestCase
from chanx.constants import EVENT_ACTION_COMPLETE

class TestChatConsumer(WebsocketTestCase):
    consumer = ChatConsumer

    def setUp(self):
        super().setUp()
        self.ws_path = "/ws/chat/"

    def get_ws_headers(self):
        self.user, headers = self.create_user_and_ws_headers()
        return headers

    async def test_authenticated_chat(self):
        await self.auth_communicator.connect()
        await self.auth_communicator.assert_authenticated_status_ok()

        await self.auth_communicator.send_message(
            ChatMessage(payload={"message": "Hello"})
        )

        messages = await self.auth_communicator.receive_all_messages()
        assert len(messages) >= 1

Multi-User Testing:

async def test_multi_user_chat(self):
    # First user
    await self.auth_communicator.connect()
    await self.auth_communicator.assert_authenticated_status_ok()

    # Second user
    second_user, second_headers = await self.acreate_user_and_ws_headers()
    second_comm = self.create_communicator(headers=second_headers)
    await second_comm.connect()
    await second_comm.assert_authenticated_status_ok()

    # Test interaction between users

Key Testing Methods

WebsocketCommunicator (FastAPI/ASGI):

# Context manager (recommended) - auto-connects and auto-disconnects
async with WebsocketCommunicator(app, "/ws/chat", consumer=ChatConsumer) as comm:
    # Connection established automatically here
    await comm.send_message(PingMessage())
    messages = await comm.receive_all_messages()
    # Connection closed automatically when exiting context

# Manual connection management (if needed)
comm = WebsocketCommunicator(app, "/ws/chat", consumer=ChatConsumer)
await comm.connect()
# ... do work ...
await comm.disconnect()

# Message collection with completion signals (call while the connection is open)
messages = await comm.receive_all_messages(
    stop_action=GROUP_ACTION_COMPLETE,
    timeout=2.0
)

Additional Testing Methods:

# Raw JSON methods (for low-level testing)
await comm.send_json({"action": "custom", "payload": {"data": "test"}})
json_response = await comm.receive_json(timeout=1.0)

# Receive exactly one message
single_message = await comm.receive_message(timeout=1.0)

# Verify no messages are sent (useful for negative testing); assert the returned flag
assert await comm.receive_nothing(timeout=0.5)

# Receive all JSON (without message parsing)
all_json = await comm.receive_all_json(timeout=2.0)

# Receive all messages with different completion actions
messages = await comm.receive_all_messages(stop_action="complete")        # Default
messages = await comm.receive_all_messages(stop_action="group_complete")  # For broadcasts
messages = await comm.receive_all_messages(stop_action="event_complete")  # For events
messages = await comm.receive_all_messages(stop_action="custom_action")   # Any custom action

WebsocketTestCase (Django):

# Primary authenticated communicator
await self.auth_communicator.connect()
await self.auth_communicator.assert_authenticated_status_ok()

# Create additional communicators
second_comm = self.create_communicator(headers=different_headers)

# Event broadcasting tests
await ChatConsumer.broadcast_event(
    NotificationEvent(payload={"message": "test"}),
    groups=[f"user_{self.user.id}"]
)

Understanding Completion Actions

Completion actions determine when receive_all_messages() stops collecting messages:

from chanx.constants import (
    MESSAGE_ACTION_COMPLETE,   # "complete"
    GROUP_ACTION_COMPLETE,     # "group_complete"
    EVENT_ACTION_COMPLETE,     # "event_complete"
)

When to use each completion action:

  • MESSAGE_ACTION_COMPLETE (default): For simple request-response patterns with @ws_handler

  • GROUP_ACTION_COMPLETE: When testing message broadcasting to groups

  • EVENT_ACTION_COMPLETE: When testing send_event() or broadcast_event() calls

  • Custom action strings: Any custom message action can be used as a stop condition
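To make the stop condition concrete, here is a minimal, library-independent sketch of "collect until a given action arrives". It is not Chanx's implementation: `collect_until` is a hypothetical helper, messages are plain dicts, and an `asyncio.Queue` stands in for the socket. The stop message is left out of the result, matching the ping/pong test where only the `PongMessage` is returned. Raising on timeout is a choice made for this sketch and may differ from what Chanx does.

```python
import asyncio


async def collect_until(
    queue: asyncio.Queue, stop_action: str, timeout: float = 1.0
) -> list[dict]:
    """Collect messages until one whose "action" equals stop_action arrives.

    The stop message itself is not returned. Hypothetical helper, not Chanx code.
    """
    collected = []
    while True:
        # Raises asyncio.TimeoutError if the stop message never shows up.
        message = await asyncio.wait_for(queue.get(), timeout)
        if message["action"] == stop_action:
            return collected
        collected.append(message)


async def demo() -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue()
    for message in (
        {"action": "pong"},
        {"action": "complete"},
        {"action": "late"},  # queued after the stop signal, never collected
    ):
        queue.put_nowait(message)
    return await collect_until(queue, stop_action="complete")


print(asyncio.run(demo()))  # [{'action': 'pong'}]
```

In this model, waiting for an action that is never sent makes the loop run until the timeout expires, so the stop action should match the code path under test.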

Examples:

# Testing simple echo (ws_handler that returns directly)
await comm.send_message(EchoMessage(payload={"text": "hello"}))
responses = await comm.receive_all_messages()  # Uses MESSAGE_ACTION_COMPLETE

# Testing broadcast functionality
await comm.send_message(ChatMessage(payload={"text": "hello everyone"}))
responses = await comm.receive_all_messages(stop_action=GROUP_ACTION_COMPLETE)

# Testing event broadcasting from outside WebSocket
await ChatConsumer.broadcast_event(NotificationEvent(...), groups=["users"])
responses = await comm.receive_all_messages(stop_action=EVENT_ACTION_COMPLETE)

# Using custom action as stop condition
await comm.send_message(StartProcessMessage())
responses = await comm.receive_all_messages(stop_action="process_complete")  # Custom action

# Wait for specific status message
responses = await comm.receive_all_messages(stop_action="job_status")

Important: The consumer must be specified for completion signals to work:

# ✅ Correct - consumer specified
async with WebsocketCommunicator(app, "/ws/chat", consumer=ChatConsumer) as comm:
    messages = await comm.receive_all_messages(stop_action=GROUP_ACTION_COMPLETE)

# ❌ Incorrect - no consumer specified
async with WebsocketCommunicator(app, "/ws/chat") as comm:
    # Completion signals won't work properly
    ...

Common Testing Patterns

Connection Lifecycle:

async def test_connection_lifecycle(self):
    await self.auth_communicator.connect()
    await self.auth_communicator.assert_authenticated_status_ok()

    # Use connection
    await self.auth_communicator.send_message(TestMessage())
    messages = await self.auth_communicator.receive_all_messages()

    # Cleanup handled automatically

Event Broadcasting:

async def test_event_broadcasting(self):
    await self.auth_communicator.connect()
    await self.auth_communicator.assert_authenticated_status_ok()

    # Send event from outside WebSocket (HTTP view, task, etc.)
    await ChatConsumer.broadcast_event(
        NotificationEvent(payload={"message": "System notification"}),
        groups=[f"user_{self.user.id}"]
    )

    messages = await self.auth_communicator.receive_all_messages(
        stop_action=EVENT_ACTION_COMPLETE
    )
    assert len(messages) == 1

Testing Negative Scenarios:

async def test_no_unauthorized_messages(self):
    # Connect without authentication
    await self.communicator.connect()

    # Send a message that should be rejected
    await self.communicator.send_message(ProtectedMessage())

    # Verify no response is sent (the wait should time out)
    assert await self.communicator.receive_nothing(timeout=0.5)

async def test_invalid_message_ignored(self):
    await self.auth_communicator.connect()
    await self.auth_communicator.assert_authenticated_status_ok()

    # Send well-formed JSON that matches no known message schema
    await self.auth_communicator.send_json({"invalid": "format"})

    # Should receive no messages back
    assert await self.auth_communicator.receive_nothing(timeout=0.5)
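A "nothing arrives" check boils down to waiting with a timeout and reporting whether the wait expired. The sketch below shows that idea with a plain `asyncio.Queue`. `nothing_arrives` is a hypothetical helper and not part of Chanx. It returns a boolean, so the caller has to assert the result for the check to mean anything.

```python
import asyncio


async def nothing_arrives(queue: asyncio.Queue, timeout: float = 0.1) -> bool:
    """Return True if no message arrives within timeout, else False."""
    try:
        await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return True  # the wait expired: silence, as expected
    return False  # something was delivered


async def demo() -> tuple[bool, bool]:
    silent: asyncio.Queue = asyncio.Queue()
    noisy: asyncio.Queue = asyncio.Queue()
    noisy.put_nowait({"action": "error"})
    return await nothing_arrives(silent), await nothing_arrives(noisy)


print(asyncio.run(demo()))  # (True, False)
```

A short timeout keeps the suite fast but can miss slow responses, so the value is a trade-off to tune per consumer.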

Streaming Messages:

@pytest.mark.asyncio
async def test_streaming_response():
    async with WebsocketCommunicator(app, "/ws/ai-chat", consumer=AIConsumer) as comm:
        await comm.send_message(GenerateStoryMessage(payload={"prompt": "Tell me a story"}))

        messages = await comm.receive_all_messages(
            stop_action=EVENT_ACTION_COMPLETE,
            timeout=5.0  # Longer timeout for AI responses
        )

        streaming_messages = [m for m in messages if m.action == "streaming"]
        assert len(streaming_messages) >= 1
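Counting streamed chunks confirms that streaming happened, but not that the chunks arrived in order or formed the expected text. The sketch below reassembles them. The message shape is an assumption for illustration: `FakeMessage` and its `content` field are hypothetical, and the real payload layout depends on your own message types.

```python
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical message shape: an action name plus a text chunk."""

    action: str
    content: str = ""


def join_stream(messages: list[FakeMessage], action: str = "streaming") -> str:
    """Concatenate the chunks of the given action in arrival order."""
    return "".join(m.content for m in messages if m.action == action)


messages = [
    FakeMessage("streaming", "Once "),
    FakeMessage("status", "thinking"),  # unrelated action, skipped
    FakeMessage("streaming", "upon a time"),
]
print(join_stream(messages))  # Once upon a time
```

With a deterministic fake in place of the AI backend, a test can compare the joined text against the expected output.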

Testing Best Practices

1. Use completion signals:

# Always use receive_all_messages() with appropriate stop_action
messages = await comm.receive_all_messages(
    stop_action=GROUP_ACTION_COMPLETE
)

2. Handle async properly:

# FastAPI: Mark tests as async
@pytest.mark.asyncio
async def test_something(): ...

# Django: define test methods with async def; WebsocketTestCase runs them as coroutines
async def test_something(self): ...

3. Clean up connections:

# FastAPI: Use context managers (automatic connect/disconnect)
async with WebsocketCommunicator(...) as comm:
    # WebSocket connects automatically when entering context
    # WebSocket disconnects automatically when exiting context
    pass

# Django: WebsocketTestCase handles cleanup automatically

4. Test both success and failure scenarios:

# Test successful authentication
await comm.assert_authenticated_status_ok()

# Test failed authentication
auth_msg = await comm.wait_for_auth()
assert auth_msg.payload.status_code == 403

Next Steps

With automatic cleanup, completion signals, and framework-specific authentication support, these utilities let you test WebSocket consumers with the same rigor as HTTP endpoints. Continue to Framework Integration for the Django views and FastAPI endpoints that complement your WebSocket consumers.