Part 5: Comprehensive WebSocket Testing
In this final part, you'll learn how to test WebSocket consumers comprehensively. This demonstrates:
Setting up pytest for WebSocket testing
Using Chanx testing utilities
Testing message flows and broadcasting
Testing ARQ worker integration
Testing external messaging
Starting Point
Make sure you've completed Part 4. If you want to start fresh from checkpoint 4:
git checkout cp4
git reset --hard
Test Setup
Create src/pytest.ini for pytest configuration:
[pytest]
filterwarnings =
ignore::DeprecationWarning
env =
SEND_COMPLETION=True
asyncio_default_fixture_loop_scope = function
Understanding SEND_COMPLETION:
The SEND_COMPLETION=True environment variable enables special completion messages that are crucial for testing. When enabled:
After single message replies complete → sends
MESSAGE_ACTION_COMPLETEmessageAfter group broadcasts complete → sends
GROUP_ACTION_COMPLETEmessageAfter event handlers complete → sends
EVENT_ACTION_COMPLETEmessage
These completion messages tell tests when to stop waiting for messages. Without them, tests will wait until they hit the base timeout, making your test suite slower. With stop_action, tests can finish immediately when the expected message arrives, providing early circuit breaking and faster test execution. Our BaseConsumer checks this environment variable to enable this behavior during testing.
# In BaseConsumer
send_completion = bool(os.environ.get("SEND_COMPLETION", None))
In tests, you use these constants with stop_action:
from chanx.constants import MESSAGE_ACTION_COMPLETE, GROUP_ACTION_COMPLETE, EVENT_ACTION_COMPLETE
# Wait for single message reply to complete
replies = await comm.receive_all_messages(stop_action=MESSAGE_ACTION_COMPLETE)
# Wait for broadcast to complete
messages = await comm.receive_all_messages(stop_action=GROUP_ACTION_COMPLETE)
# Wait for event handler to complete
results = await comm.receive_all_messages(stop_action=EVENT_ACTION_COMPLETE)
Create src/conftest.py for shared fixtures:
from typing import Any
import pytest_asyncio
from arq import create_pool
from arq.worker import Worker
from .tasks import REDIS_SETTINGS, WorkerSettings
@pytest_asyncio.fixture(scope="function")
async def bg_worker() -> Any:
"""Create a real ARQ worker for testing."""
redis = await create_pool(REDIS_SETTINGS)
worker = Worker(
functions=WorkerSettings.functions,
redis_pool=redis,
burst=True, # Process jobs immediately and exit
poll_delay=0.1, # Fast polling for tests
)
yield worker
await redis.aclose()
Key points:
bg_workerfixture creates a real ARQ worker for testingburst=True- Worker processes all jobs and exits (perfect for tests)poll_delay=0.1- Fast polling for quicker tests
Create Tests Directory
Create the tests directory and init file:
mkdir -p src/tests
touch src/tests/__init__.py
Testing System Chat Consumer
Create src/tests/test_system_chat.py:
from typing import cast
import pytest
from chanx.fast_channels.testing import WebsocketCommunicator
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
from src.apps.system_chat.consumer import SystemMessageConsumer
from src.apps.system_chat.messages import (
MessagePayload,
SystemEchoMessage,
UserMessage,
)
from src.main import app
@pytest.mark.asyncio
async def test_system_socket() -> None:
async with WebsocketCommunicator(
app, "/ws/system", consumer=SystemMessageConsumer
) as comm:
# Receive connection message
init_messages = await comm.receive_all_messages(stop_action="system_echo")
assert len(init_messages) == 1
init_message = cast(SystemEchoMessage, init_messages[0])
assert init_message.payload.message == "🔧 System: Connection established!"
# Test ping-pong
await comm.send_message(PingMessage())
replies = await comm.receive_all_messages()
assert len(replies) == 1
assert replies == [PongMessage()]
# Test echo
test_message = "This is a test message"
await comm.send_message(
UserMessage(payload=MessagePayload(message=test_message))
)
replies = await comm.receive_all_messages()
assert len(replies) == 1
assert replies == [
SystemEchoMessage(
payload=MessagePayload(message=f"🔧 System Echo: {test_message}")
)
]
Understanding WebsocketCommunicator:
async with WebsocketCommunicator(
app, # FastAPI app
"/ws/system", # WebSocket path
consumer=SystemMessageConsumer # Consumer class (optional but recommended)
) as comm:
Context manager handles connection/disconnection
send_message()- Send messages to WebSocketreceive_all_messages()- Receive messages from WebSocketstop_action- Stop receiving when message with this action arrives
Testing Room Chat Consumer
Create src/tests/test_room_chat.py:
from typing import cast
import pytest
from chanx.constants import GROUP_ACTION_COMPLETE
from chanx.fast_channels.testing import WebsocketCommunicator
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
from src.apps.room_chat.consumer import RoomChatConsumer
from src.apps.room_chat.messages import (
RoomChatMessage,
RoomMessagePayload,
RoomNotificationMessage,
)
from src.main import app
@pytest.mark.asyncio
async def test_room_chat_ping() -> None:
room_name = "my-room"
async with WebsocketCommunicator(
app, f"/ws/room/{room_name}", consumer=RoomChatConsumer
) as comm:
await comm.send_message(PingMessage())
replies = await comm.receive_all_messages()
assert replies == [PongMessage()]
@pytest.mark.asyncio
async def test_room_chat_broadcast_messaging() -> None:
room_name = "my-room"
# Create two clients in the same room
first_comm = WebsocketCommunicator(
app, f"/ws/room/{room_name}", consumer=RoomChatConsumer
)
second_comm = WebsocketCommunicator(
app, f"/ws/room/{room_name}", consumer=RoomChatConsumer
)
# Connect first client
await first_comm.connect()
assert await first_comm.receive_nothing()
# Connect second client
await second_comm.connect()
# First client should receive join notification
notified_messages = await first_comm.receive_all_messages(
stop_action=GROUP_ACTION_COMPLETE
)
assert len(notified_messages) == 1
notified_message = cast(RoomNotificationMessage, notified_messages[0])
assert notified_message.payload.message == f"🚪 Someone joined room '{room_name}'"
# Second client doesn't see their own join (exclude_current=True)
assert await second_comm.receive_nothing()
# First client sends message
room_message = "This is a test message"
expected_message = RoomNotificationMessage(
payload=RoomMessagePayload(message=f"💬 {room_message}", room_name=room_name)
)
await first_comm.send_message(
RoomChatMessage(payload=RoomMessagePayload(message=room_message))
)
# First client receives their own broadcast (exclude_current=False in this consumer)
first_comm_replies = await first_comm.receive_all_messages(
stop_action=GROUP_ACTION_COMPLETE
)
assert len(first_comm_replies) == 1
assert first_comm_replies == [expected_message]
# Second client also receives the message
second_comm_replies = await second_comm.receive_all_messages(
stop_action=GROUP_ACTION_COMPLETE
)
assert len(second_comm_replies) == 1
assert second_comm_replies == [expected_message]
await first_comm.disconnect()
await second_comm.disconnect()
Key testing patterns:
GROUP_ACTION_COMPLETE- Special action sent after group broadcasts completereceive_nothing()- Assert no messages receivedMultiple communicators - Test broadcasting between clients
Manual
connect()/disconnect()- Control connection timing
Testing Background Jobs
Create src/tests/test_background_jobs.py:
from typing import Any, cast
import pytest
from chanx.constants import EVENT_ACTION_COMPLETE
from chanx.fast_channels.testing import WebsocketCommunicator
from src.apps.background_jobs.consumer import BackgroundJobConsumer
from src.apps.background_jobs.messages import (
JobMessage,
JobPayload,
JobStatusMessage,
)
from src.main import app
@pytest.mark.asyncio
async def test_job_success(bg_worker: Any) -> None:
"""Test successful job queuing and processing."""
async with WebsocketCommunicator(
app, "/ws/background_jobs", consumer=BackgroundJobConsumer
) as comm:
# Skip connection message
await comm.receive_all_messages(stop_action="job_status")
# Send job message
message_to_translate = "hello"
job_message = JobMessage(
payload=JobPayload(type="translate", content=message_to_translate)
)
await comm.send_message(job_message)
# Receive queuing and queued messages
replies = await comm.receive_all_messages()
assert len(replies) == 2
queuing_msg = cast(JobStatusMessage, replies[0])
assert queuing_msg.payload["status"] == "queuing"
queued_msg = cast(JobStatusMessage, replies[1])
assert queued_msg.payload["status"] == "queued"
# Process jobs with real ARQ worker
await bg_worker.async_run()
# Receive job result
results = await comm.receive_all_messages(stop_action=EVENT_ACTION_COMPLETE)
assert len(results) == 1
translated_result = cast(JobStatusMessage, results[0])
translated_text = f"🌍 Translated: '{message_to_translate}' → 'hola'"
assert translated_result == JobStatusMessage(
payload={"status": "result", "message": translated_text}
)
Key points:
bg_workerfixture - Real ARQ worker for testingbg_worker.async_run()- Process all queued jobsEVENT_ACTION_COMPLETE- Special action sent after event handler completesTests complete flow: queue → process → result
Testing External Messaging
Create src/tests/test_showcase.py (excerpt):
import pytest
from chanx.constants import GROUP_ACTION_COMPLETE
from chanx.fast_channels.testing import WebsocketCommunicator
from src.apps.showcase.consumer import ChatConsumer
from src.external_sender import send_chat_message
from src.main import app
@pytest.mark.asyncio
async def test_external_sender_broadcast() -> None:
"""Test external sender script broadcasts to consumers."""
chat_comm = WebsocketCommunicator(app, "/ws/chat", consumer=ChatConsumer)
await chat_comm.connect()
# Clear initial connection messages
await chat_comm.receive_all_messages(stop_action=GROUP_ACTION_COMPLETE)
# Call external sender function
await send_chat_message()
# Receive broadcasted message
chat_replies = await chat_comm.receive_all_messages(
stop_action=GROUP_ACTION_COMPLETE
)
assert len(chat_replies) == 1
# Assert message content...
await chat_comm.disconnect()
This tests that external scripts can successfully send messages to WebSocket clients.
Testing AsyncAPI Schema
Create src/tests/test_asyncapi_schema.py:
import json
from pathlib import Path
from fastapi.testclient import TestClient
from src.main import app
client = TestClient(app)
results_dir = Path(__file__).parent / "test_results"
def test_asyncapi_schema_html_doc() -> None:
response = client.get("/asyncapi")
assert response.status_code == 200
assert "AsyncApiStandalone.render" in response.text
assert "Websocket API documentation generated by Chanx" in response.text
def test_asyncapi_schema_json() -> None:
response = client.get("/asyncapi.json")
assert response.status_code == 200
data = response.json()
with open(results_dir / "asyncapi_schema_res.json", "w") as f:
json.dump(data, f, indent=2)
with open(results_dir / "asyncapi_schema.json") as f:
expected_data = json.load(f)
assert data == expected_data
def test_asyncapi_schema_yaml() -> None:
response = client.get("/asyncapi.yaml")
assert response.status_code == 200
data = response.text
with open(results_dir / "asyncapi_schema_res.yaml", "w") as f:
f.write(data)
with open(results_dir / "asyncapi_schema.yaml") as f:
expected_data = f.read()
assert data == expected_data
Understanding the tests:
test_asyncapi_schema_html_doc: Verifies the HTML documentation page renders correctly
test_asyncapi_schema_json: Compares the generated JSON schema against an expected baseline
test_asyncapi_schema_yaml: Compares the generated YAML schema against an expected baseline
The JSON and YAML tests write the actual results to *_res files, which allows you to:
See what was actually generated during the test run
Compare against expected baseline files (
asyncapi_schema.jsonandasyncapi_schema.yaml)Update baselines when you intentionally change your API
Note
You'll need to create the src/tests/test_results/ directory and baseline files. On first run, the test will fail. Copy the generated *_res files to create your baselines:
mkdir -p src/tests/test_results
# Run tests once to generate the _res files
pytest src/tests/test_asyncapi_schema.py || true
# Copy generated files as baselines
cp src/tests/test_results/asyncapi_schema_res.json src/tests/test_results/asyncapi_schema.json
cp src/tests/test_results/asyncapi_schema_res.yaml src/tests/test_results/asyncapi_schema.yaml
Running Tests
Run all tests:
pytest src
Run specific test file:
pytest src/tests/test_system_chat.py
Run with verbose output:
pytest src -v
Run with coverage:
pytest src --cov=src --cov-report=html
Run specific test:
pytest src/tests/test_background_jobs.py::test_job_success
Key Testing Patterns
Pattern 1: Basic message flow
async with WebsocketCommunicator(app, "/ws/path", consumer=Consumer) as comm:
await comm.send_message(InputMessage(...))
replies = await comm.receive_all_messages()
assert replies[0] == ExpectedMessage(...)
Pattern 2: Broadcasting between clients
comm1 = WebsocketCommunicator(app, "/ws/path", consumer=Consumer)
comm2 = WebsocketCommunicator(app, "/ws/path", consumer=Consumer)
await comm1.connect()
await comm2.connect()
await comm1.send_message(Message(...))
replies1 = await comm1.receive_all_messages(stop_action=GROUP_ACTION_COMPLETE)
replies2 = await comm2.receive_all_messages(stop_action=GROUP_ACTION_COMPLETE)
await comm1.disconnect()
await comm2.disconnect()
Pattern 3: Testing with ARQ worker
async def test_with_worker(bg_worker: Any) -> None:
async with WebsocketCommunicator(...) as comm:
await comm.send_message(JobMessage(...))
await comm.receive_all_messages() # Skip queuing messages
await bg_worker.async_run() # Process jobs
results = await comm.receive_all_messages(stop_action=EVENT_ACTION_COMPLETE)
assert results[0] == ExpectedResult(...)
Pattern 4: Testing server-initiated messages
async with WebsocketCommunicator(...) as comm:
# Receive connection message (server-initiated)
init_messages = await comm.receive_all_messages(stop_action="some_action")
assert init_messages[0] == WelcomeMessage(...)
Pattern 5: Testing external messaging
async with WebsocketCommunicator(...) as comm:
await comm.connect()
await comm.receive_all_messages(stop_action=GROUP_ACTION_COMPLETE)
# Call external function
await some_external_function()
# Receive broadcasted message
messages = await comm.receive_all_messages(stop_action=GROUP_ACTION_COMPLETE)
Common Assertions
Assert message count:
replies = await comm.receive_all_messages()
assert len(replies) == 2
Assert message content:
assert replies[0] == ExpectedMessage(payload=...)
assert replies[0].payload.field == "expected_value"
Assert no messages:
assert await comm.receive_nothing()
Assert message type:
from typing import cast
message = cast(ExpectedMessageType, replies[0])
assert message.action == "expected_action"
Troubleshooting
Test hangs waiting for messages:
Check if you're using the correct
stop_actionVerify the consumer actually sends messages
Use timeout:
await comm.receive_all_messages(timeout=1.0)
ARQ tests fail:
Ensure Redis is running:
docker compose up -dCheck ARQ worker fixture is being used
Verify task functions are registered in
WorkerSettings
Connection messages interfere:
Use
stop_actionto skip themOr call
receive_all_messages()to clear them before testing
Type checking issues:
Use
cast()for proper type hintsImport message types correctly
Conclusion
Congratulations! You've completed the entire Chanx FastAPI tutorial. You've learned:
Core Concepts:
✅ Type-safe WebSocket consumers with Pydantic
✅ Automatic message routing with
@ws_handler✅ Event handlers for server-to-server communication
✅ Direct WebSocket and channel layer communication
Advanced Features:
✅ Dynamic URL routing with path parameters
✅ Channel layers with Redis (Pub/Sub and Queue)
✅ Background job processing with ARQ
✅ External messaging from scripts/endpoints
✅ Multi-layer architecture
Testing:
✅ Comprehensive WebSocket testing with pytest
✅ Testing broadcasting and group messaging
✅ Testing ARQ worker integration
✅ Testing external messaging
You now have the knowledge to build production-ready WebSocket applications with FastAPI and Chanx!
Next Steps:
Build your own WebSocket application
Explore the FastAPI & ASGI Frameworks Integration for advanced features
Check out FastAPI Complete Example for more examples
Try the Django Tutorial: Prerequisites if you're interested in Django
Thank you for completing this FastAPI tutorial! Happy building with Chanx! 🚀