Part 5: Integration Tests
In this final part, you'll write comprehensive integration tests for all your WebSocket consumers. You'll learn how to:
Use Chanx's
WebsocketTestCasefor testing WebSocket consumersMock external services (OpenAI API)
Test with real Celery workers
Use
receive_all_messages()with stop actions for predictable test assertionsConfigure pytest for Django projects
By the end, you'll have a complete test suite that ensures your WebSocket endpoints work correctly.
Starting Point
Make sure you've completed Part 4. If you want to start fresh from checkpoint 4:
git checkout cp4
git reset --hard
Step 1: Install Test Dependencies
Add pytest and pytest-django to your development dependencies:
uv add --dev pytest pytest-django
This installs:
pytest- Testing frameworkpytest-django- Django integration for pytest
Step 2: Configure Pytest
Create chanx_django/pytest.ini:
[pytest]
DJANGO_SETTINGS_MODULE = config.settings.test
# Warnings
filterwarnings =
ignore::DeprecationWarning
This configures pytest to:
Use the test settings module
Filter deprecation warnings
Create chanx_django/conftest.py:
"""Pytest configuration and fixtures for chanx_django project."""
from typing import Any
import pytest
pytest_plugins = ("celery.contrib.pytest",)
@pytest.fixture(scope="session")
def celery_config() -> dict[str, Any]:
"""Celery configuration for testing."""
return {
"broker_url": "memory://",
"result_backend": "cache+memory://",
"task_always_eager": False,
"task_eager_propagates": True,
}
This sets up Celery for testing:
pytest_plugins- Loads Celery's pytest plugin forcelery_workerfixturecelery_config- Configures Celery to use in-memory broker/backend for teststask_always_eager=False- Tasks run asynchronously (more realistic)
Step 3: Create Test Settings
Create chanx_django/config/settings/test.py:
from .base import * # noqa
CHANX = {"SEND_COMPLETION": True}
The SEND_COMPLETION setting enables completion signals after each message/event, which helps tests know when to stop receiving messages.
Step 4: Test Chat Consumer
Create test directory and files:
mkdir -p chanx_django/chat/tests
touch chanx_django/chat/tests/__init__.py
Create chanx_django/chat/tests/test_chat_consumer.py:
"""Tests for ChatConsumer WebSocket functionality."""
from typing import cast
from chanx.channels.testing import WebsocketTestCase
from chanx.constants import GROUP_ACTION_COMPLETE
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
from chat.consumers.chat_consumer import ChatConsumer
from chat.messages import ChatMessagePayload, NewChatMessage
class ChatConsumerTestCase(WebsocketTestCase):
"""Unit tests for ChatConsumer - focuses on group chat broadcasting."""
consumer = ChatConsumer
def setUp(self) -> None:
super().setUp()
self.group_name = "test-room"
self.ws_path = f"/ws/chat/{self.group_name}/"
async def test_connect_and_ping(self) -> None:
"""Test basic connection and ping/pong functionality."""
await self.auth_communicator.connect()
await self.auth_communicator.send_message(PingMessage())
messages = await self.auth_communicator.receive_all_messages()
assert messages == [PongMessage()]
async def test_broadcast_message_to_group(self) -> None:
"""Test message broadcasting to all group members."""
# Connect two users to the same group
await self.auth_communicator.connect()
communicator2 = self.create_communicator()
await communicator2.connect()
# User 1 sends message
message = NewChatMessage(
payload=ChatMessagePayload(message="Hello everyone!", name="User1")
)
await self.auth_communicator.send_message(message)
# Only user 2 should receive message
messages1 = await self.auth_communicator.receive_all_messages(
stop_action=GROUP_ACTION_COMPLETE, timeout=0.2
)
messages2 = await communicator2.receive_all_messages(
stop_action=GROUP_ACTION_COMPLETE
)
assert len(messages1) == 0 # due to exclude_current=True
assert len(messages2) == 1
received2 = cast(NewChatMessage, messages2[0])
assert received2.action == "new_chat_message"
assert received2.payload.message == "Hello everyone!"
assert received2.payload.name == "User1"
async def test_group_isolation(self) -> None:
"""Test that messages are isolated to specific groups."""
# User 1 in room1
communicator_room1 = self.create_communicator(ws_path="/ws/chat/room1/")
await communicator_room1.connect()
# User 2 in room2
communicator_room2 = self.create_communicator(ws_path="/ws/chat/room2/")
await communicator_room2.connect()
# Send message in room1
message = NewChatMessage(
payload=ChatMessagePayload(message="Room1 message", name="User1")
)
await communicator_room1.send_message(message)
# User in room2 should not receive it
assert await communicator_room2.receive_nothing()
What we're testing:
test_connect_and_ping: Basic WebSocket connectivity
test_broadcast_message_to_group: Group broadcasting with
exclude_current=Truetest_group_isolation: Messages don't leak between different groups
Test flow (broadcast test):
Connect two users to the same group (
test-room)User 1 sends a chat message
User 1 receives nothing (
exclude_current=Truein consumer)User 2 receives the broadcasted message
Critical code:
# Create second communicator for multi-user test
communicator2 = self.create_communicator()
# Use GROUP_ACTION_COMPLETE for broadcast tests
messages = await communicator2.receive_all_messages(
stop_action=GROUP_ACTION_COMPLETE # Waits for group message
)
# Assert no messages received
assert await communicator_room2.receive_nothing()
Step 5: Test Assistants Consumer
Create assistants/tests/__init__.py (empty file):
mkdir -p chanx_django/assistants/tests
touch chanx_django/assistants/tests/__init__.py
Create chanx_django/assistants/tests/test_assistant_consumer.py:
"""Tests for ConversationAssistantConsumer WebSocket functionality."""
from typing import cast
from unittest.mock import Mock, patch
from chanx.channels.testing import WebsocketTestCase
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
from assistants.conversation_consumer import ConversationAssistantConsumer
from assistants.messages import (
AssistantMessage,
CompleteStreamingMessage,
StreamingMessage,
StreamingStartMessage,
)
class TestConversationAssistantConsumer(WebsocketTestCase):
"""Unit tests for ConversationAssistantConsumer - focuses on AI streaming functionality."""
consumer = ConversationAssistantConsumer
def setUp(self) -> None:
super().setUp()
self.ws_path = "/ws/assistants/"
async def test_connect_and_ping(self) -> None:
"""Test basic connection and ping/pong functionality."""
await self.auth_communicator.connect()
await self.auth_communicator.send_message(PingMessage())
messages = await self.auth_communicator.receive_all_messages()
assert messages == [PongMessage()]
@patch("assistants.conversation_consumer.OpenAIService")
async def test_assistant_message_streaming_flow(
self, mock_openai_service: Mock
) -> None:
"""Test complete AI streaming flow: start → chunks → complete."""
# Mock AI service to return streaming tokens
mock_service = Mock()
mock_service.generate_stream.return_value = iter(["Hello", " ", "world"])
mock_openai_service.return_value = mock_service
await self.auth_communicator.connect()
# Send user message
await self.auth_communicator.send_message(
AssistantMessage(payload="Test message")
)
# Receive all streaming messages
messages = await self.auth_communicator.receive_all_messages()
# Verify message sequence: start → streaming chunks → complete
assert len(messages) >= 3
assert cast(StreamingStartMessage, messages[0]).action == "streaming_start"
assert (
cast(CompleteStreamingMessage, messages[-1]).action == "complete_streaming"
)
# Verify streaming chunks
streaming_chunks = messages[1:-1]
for chunk in streaming_chunks:
assert cast(StreamingMessage, chunk).action == "streaming"
# Verify service was called with user message
mock_service.generate_stream.assert_called_once()
call_args = mock_service.generate_stream.call_args
assert call_args[0][0] == "Test message"
assert call_args[0][1] == [] # Empty history on first message
@patch("assistants.conversation_consumer.OpenAIService")
async def test_conversation_history_maintained(
self, mock_openai_service: Mock
) -> None:
"""Test that conversation history is maintained across messages."""
mock_service = Mock()
mock_service.generate_stream.side_effect = [
iter(["First", " response"]),
iter(["Second", " response"]),
]
mock_openai_service.return_value = mock_service
await self.auth_communicator.connect()
# First message
await self.auth_communicator.send_message(
AssistantMessage(payload="First question")
)
await self.auth_communicator.receive_all_messages()
# Second message
await self.auth_communicator.send_message(
AssistantMessage(payload="Second question")
)
await self.auth_communicator.receive_all_messages()
# Verify history was maintained
assert mock_service.generate_stream.call_count == 2
second_call_history = mock_service.generate_stream.call_args_list[1][0][1]
assert len(second_call_history) == 2
assert second_call_history[0] == {"role": "user", "content": "First question"}
assert second_call_history[1] == {
"role": "assistant",
"content": "First response",
}
What we're testing:
test_connect_and_ping: Basic WebSocket connectivity
test_assistant_message_streaming_flow: Streaming sequence (start → chunks → complete) and mock verification
test_conversation_history_maintained: Stateful consumer maintains conversation context across messages
Test flow (streaming test):
Mock OpenAI service to return 3 tokens:
["Hello", " ", "world"]Send user message "Test message"
Receive streaming start signal
Receive 3 streaming chunks (one per token)
Receive streaming complete signal
Verify mock was called with correct message and empty history
Critical code:
# Mock OpenAI to return streaming tokens
@patch("assistants.conversation_consumer.OpenAIService")
mock_service.generate_stream.return_value = iter(["Hello", " ", "world"])
# Verify message sequence
assert messages[0].action == "streaming_start"
assert messages[-1].action == "complete_streaming"
# Inspect mock call arguments
call_args = mock_service.generate_stream.call_args
assert call_args[0][1] == [] # Empty history on first message
Step 6: Test System Consumer
Create system/tests/__init__.py (empty file):
mkdir -p chanx_django/system/tests
touch chanx_django/system/tests/__init__.py
Create chanx_django/system/tests/test_system_consumer.py:
"""Tests for SystemConsumer WebSocket functionality."""
from typing import cast
import pytest
from celery.apps.worker import Worker
from chanx.channels.testing import WebsocketTestCase
from chanx.constants import EVENT_ACTION_COMPLETE
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
from system.consumers.system_consumer import SystemConsumer
from system.messages import (
JobQueued,
JobResult,
SystemMessage,
TaskPayload,
)
class TestSystemConsumer(WebsocketTestCase):
"""Unit tests for SystemConsumer - tests task queueing and event handling."""
consumer = SystemConsumer
@pytest.fixture(autouse=True)
def _inject_fixtures(
self,
celery_worker: Worker,
) -> None:
self.celery_worker: Worker = celery_worker
def setUp(self) -> None:
super().setUp()
self.ws_path = "/ws/system/"
async def test_connect_and_ping(self) -> None:
"""Test basic connection and ping/pong functionality."""
await self.auth_communicator.connect()
await self.auth_communicator.send_message(PingMessage())
messages = await self.auth_communicator.receive_all_messages()
assert messages == [PongMessage()]
async def test_queue_task_returns_acknowledgment(self) -> None:
"""Test that queueing a task returns acknowledgment."""
await self.auth_communicator.connect()
# Queue task
message = SystemMessage(
payload=TaskPayload(task_type="translate", content="hello")
)
await self.auth_communicator.send_message(message)
# Receive acknowledgment
messages = await self.auth_communicator.receive_all_messages(timeout=1)
assert len(messages) == 1
job_queued = cast(JobQueued, messages[0])
assert job_queued.action == "job_queued"
assert "Job queued" in job_queued.payload
assert "translate" in job_queued.payload
async def test_translate_task_end_to_end(self) -> None:
"""Test full translate task execution with real Celery worker."""
await self.auth_communicator.connect()
# Queue translate task
message = SystemMessage(
payload=TaskPayload(task_type="translate", content="hello")
)
await self.auth_communicator.send_message(message)
# Receive acknowledgment
messages = await self.auth_communicator.receive_all_messages()
assert len(messages) == 1
assert messages[0].action == "job_queued"
# Wait for task to complete (2s + buffer)
result_messages = await self.auth_communicator.receive_all_messages(
stop_action=EVENT_ACTION_COMPLETE, timeout=4
)
# Verify result
assert len(result_messages) == 1
job_result = cast(JobResult, result_messages[0])
assert job_result.action == "job_result"
assert "Translated" in job_result.payload
assert "hola" in job_result.payload
What we're testing:
test_connect_and_ping: Basic WebSocket connectivity
test_queue_task_returns_acknowledgment: Task queueing returns immediate acknowledgment
test_translate_task_end_to_end: Full flow with real Celery worker processing task and returning result via channel layer
Test flow (end-to-end test):
Connect to WebSocket
Send
SystemMessagewith task type "translate" and content "hello"Receive immediate
JobQueuedacknowledgmentCelery worker processes task (2 seconds)
Worker sends
JobResultviasend_event_sync()to channel layer@event_handlerreceives event and forwards to WebSocketClient receives
JobResultwith translation
Critical code:
# Inject real Celery worker fixture
@pytest.fixture(autouse=True)
def _inject_fixtures(self, celery_worker: Worker) -> None:
self.celery_worker: Worker = celery_worker
# First receive: immediate acknowledgment
messages = await self.auth_communicator.receive_all_messages()
assert messages[0].action == "job_queued"
# Second receive: wait for event from Celery worker
result_messages = await self.auth_communicator.receive_all_messages(
stop_action=EVENT_ACTION_COMPLETE, # Wait for event completion
timeout=4 # Task takes 2s + buffer
)
Step 7: Run Tests
Run all tests:
pytest chanx_django
Run specific test file:
pytest chanx_django/chat/tests/test_chat_consumer.py
Run with verbose output:
pytest chanx_django -v
Run specific test:
pytest chanx_django/system/tests/test_system_consumer.py::TestSystemConsumer::test_translate_task_end_to_end
You should see output like:
configfile: pytest.ini
plugins: anyio-4.11.0, django-4.11.1, langsmith-0.4.34
collected 9 items
chanx_django/assistants/tests/test_assistant_consumer.py ... [ 33%]
chanx_django/chat/tests/test_chat_consumer.py ... [ 66%]
chanx_django/system/tests/test_system_consumer.py ... [100%]
========================= 9 passed in 20.47s =========================
Testing Summary
Three consumer types, three test strategies:
Chat: Multi-user group broadcasting tests
Use
GROUP_ACTION_COMPLETEfor broadcastsCreate multiple communicators with
self.create_communicator()Test group isolation with different
ws_path
Assistants: Mocking external services (OpenAI)
Mock with
@patch("module.Service")Simulate streaming with
iter(["token1", "token2"])Verify message sequence and mock call arguments
System: Real Celery integration tests
Inject
celery_workerfixture for real task executionUse
EVENT_ACTION_COMPLETEfor channel layer eventsTest WebSocket → Celery → WebSocket flow
Stop actions control when to stop receiving:
MESSAGE_ACTION_COMPLETE- After direct responses (default)GROUP_ACTION_COMPLETE- After group broadcastsEVENT_ACTION_COMPLETE- After channel layer events
What You've Learned
Congratulations! You've completed the Chanx Django tutorial. You've built a full-featured real-time application with:
Part 1: Setup Chanx
✅ Installed and configured Chanx
✅ Set up WebSocket routing
✅ Enabled AsyncAPI documentation
Part 2: Chat WebSocket
✅ Created type-safe message models
✅ Built WebSocket consumer with
@ws_handler✅ Implemented group broadcasting
✅ Dynamic URL routing
Part 3: Assistants WebSocket
✅ Server-initiated streaming messages
✅ Stateful consumers with conversation history
✅ External API integration (OpenAI)
✅ Union types for multiple outputs
✅ Enhanced AsyncAPI metadata
Part 4: System WebSocket
✅ Channel layer events with
@event_handler✅ Celery background task integration
✅ Server-to-server communication
✅ Management commands
✅ Generic type parameters for type safety
Part 5: Integration Tests
✅ Comprehensive integration tests
✅ WebSocket testing with
WebsocketTestCase✅ Mocking external services
✅ Real Celery worker integration
✅ Stop actions for predictable assertions
The complete code is available at the cp5 branch:
git checkout cp5
Next Steps
Now that you've mastered Chanx fundamentals, explore:
User Guide: Deep dive into advanced features and patterns
API Reference: Complete API documentation
Examples: More real-world examples and use cases
AsyncAPI: Learn to customize your API documentation
Happy building with Chanx! 🚀