Chat Application Example
This example demonstrates a complete real-time chat application built with Chanx. You'll learn how to create:
Chat message types with validation
Room-based WebSocket consumers
Group-based messaging
User presence tracking
Message history
Project Setup
First, let's define our chat room model:
# myapp/models.py
from django.db import models
from django.contrib.auth.models import User
class ChatRoom(models.Model):
"""Chat room model."""
name = models.CharField(max_length=100)
slug = models.SlugField(unique=True)
created_at = models.DateTimeField(auto_now_add=True)
members = models.ManyToManyField(User, related_name="chat_rooms")
def __str__(self):
return self.name
class ChatMessage(models.Model):
"""Model for storing chat message history."""
room = models.ForeignKey(ChatRoom, on_delete=models.CASCADE, related_name="messages")
user = models.ForeignKey(User, on_delete=models.CASCADE)
message = models.TextField()
timestamp = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ["timestamp"]
def __str__(self):
return f"{self.user.username}: {self.message[:20]}"
Message Definitions
Now let's define our WebSocket message types using Pydantic for validation:
# myapp/messages.py
from datetime import datetime
from typing import Literal, Optional, List, Dict, Any
from pydantic import Field
from chanx.messages.base import BaseMessage, BaseGroupMessage
from chanx.messages.incoming import PingMessage
class ChatMessagePayload(BaseMessage):
"""Chat message sent by a user."""
action: Literal["chat"] = "chat"
payload: str
class UserJoinedPayload(BaseGroupMessage):
"""Notification when a user joins a room."""
action: Literal["user_joined"] = "user_joined"
payload: dict = Field(default_factory=dict)
class UserLeftPayload(BaseGroupMessage):
"""Notification when a user leaves a room."""
action: Literal["user_left"] = "user_left"
payload: dict = Field(default_factory=dict)
class MessageHistoryPayload(BaseMessage):
"""Message containing chat history."""
action: Literal["history"] = "history"
payload: List[Dict[str, Any]]
# Define incoming message union
ChatIncomingMessage = PingMessage | ChatMessagePayload
WebSocket Consumer
Now we'll create our chat consumer with proper pattern matching:
# myapp/consumers.py
import json
from typing import Any, Iterable, Optional, cast
from django.contrib.auth.models import User
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAuthenticated
from chanx.generic.websocket import AsyncJsonWebsocketConsumer
from chanx.messages.base import BaseMessage
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
from chanx.utils.asyncio import create_task
from myapp.models import ChatRoom, ChatMessage
from myapp.permissions import IsChatRoomMember
from myapp.messages import (
ChatIncomingMessage,
ChatMessagePayload,
UserJoinedPayload,
UserLeftPayload,
MessageHistoryPayload,
)
class ChatConsumer(AsyncJsonWebsocketConsumer[ChatIncomingMessage, None, ChatRoom]):
"""WebSocket consumer for chat rooms."""
# Authentication configuration
authentication_classes = [SessionAuthentication]
permission_classes = [IsAuthenticated, IsChatRoomMember]
queryset = ChatRoom.objects.all()
# Enable completion messages
send_completion = True
async def build_groups(self) -> Iterable[str]:
"""Build channel groups based on the chat room."""
assert self.obj is not None
return [f"chat_room_{self.obj.id}"]
async def post_authentication(self) -> None:
"""Actions after successful authentication."""
# Ensure we have user and object
assert self.user is not None
assert self.obj is not None
room = self.obj
user = self.user
# Send joined notification to the group
await self.send_group_message(
UserJoinedPayload(
payload={
"username": user.username,
"room_name": room.name,
"timestamp": datetime.now().isoformat(),
}
)
)
# Send message history to the new user
await self.send_message_history()
async def send_message_history(self) -> None:
"""Send recent message history to the user."""
assert self.obj is not None
room = self.obj
# Get last 50 messages
messages = await self.get_message_history(room, limit=50)
# Send history to the user
await self.send_message(
MessageHistoryPayload(payload=messages)
)
async def get_message_history(self, room: ChatRoom, limit: int = 50) -> list:
"""Get message history for a room."""
# Convert to list of dicts for serialization
messages = []
# Use sync_to_async to access the database
from asgiref.sync import sync_to_async
@sync_to_async
def get_messages():
return list(room.messages.select_related('user').order_by(
'-timestamp'
)[:limit])
db_messages = await get_messages()
for msg in reversed(db_messages):
messages.append({
"username": msg.user.username,
"message": msg.message,
"timestamp": msg.timestamp.isoformat(),
})
return messages
async def receive_message(self, message: ChatIncomingMessage, **kwargs: Any) -> None:
"""Handle incoming messages using pattern matching."""
match message:
case ChatMessagePayload(payload=text):
# Handle chat message
await self.handle_chat_message(text)
case PingMessage():
# Handle ping message
await self.send_message(PongMessage())
case _:
# Handle any other message types
pass
async def handle_chat_message(self, text: str) -> None:
"""Process and broadcast a chat message."""
assert self.user is not None
assert self.obj is not None
user = self.user
room = self.obj
# Save message to database
create_task(self.save_message_to_db(user, room, text))
# Create chat message
chat_message = ChatMessagePayload(payload=text)
# Broadcast to the group
await self.send_group_message(chat_message)
async def save_message_to_db(self, user: User, room: ChatRoom, text: str) -> None:
"""Save chat message to database."""
from asgiref.sync import sync_to_async
@sync_to_async
def save_message():
ChatMessage.objects.create(
room=room,
user=user,
message=text
)
await save_message()
async def websocket_disconnect(self, message) -> None:
"""Handle WebSocket disconnect."""
if hasattr(self, 'user') and self.user and not self.user.is_anonymous:
# User was authenticated, send left notification
user = self.user
if hasattr(self, 'obj') and self.obj:
room = self.obj
# Send user left notification
await self.send_group_message(
UserLeftPayload(
payload={
"username": user.username,
"room_name": room.name,
"timestamp": datetime.now().isoformat(),
}
)
)
# Call parent disconnect handler
await super().websocket_disconnect(message)
Define Permissions
Let's create a custom permission class:
# myapp/permissions.py
from rest_framework.permissions import BasePermission
class IsChatRoomMember(BasePermission):
"""
Permission to check if a user is a member of the chat room.
"""
def has_object_permission(self, request, view, obj):
return request.user in obj.members.all()
URL Routing
Set up the WebSocket URL routing:
# myapp/routing.py
from channels.routing import URLRouter
from chanx.routing import path
# Important: Name this variable 'router' for string-based includes
router = URLRouter([
path('<int:pk>/', ChatConsumer.as_asgi()),
])
# myproject/routing.py
from channels.routing import URLRouter
from chanx.routing import include, path
router = URLRouter([
path('ws/chat/', include('myapp.routing')),
])
ASGI Configuration
Configure the ASGI application:
# myproject/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter
from channels.security.websocket import OriginValidator
from channels.sessions import CookieMiddleware
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
django_asgi_app = get_asgi_application()
# Import the main router
from myproject.routing import router
application = ProtocolTypeRouter({
"http": django_asgi_app,
"websocket": OriginValidator(
CookieMiddleware(router),
settings.CORS_ALLOWED_ORIGINS,
),
})
Frontend Implementation
Here's a simple JavaScript client for connecting to our chat:
<!-- templates/chat_room.html -->
<!DOCTYPE html>
<html>
<head>
<title>{{ room.name }} - Chat</title>
<style>
#chat-log {
height: 400px;
overflow-y: scroll;
border: 1px solid #ccc;
padding: 10px;
margin-bottom: 10px;
}
.system-message {
color: #888;
font-style: italic;
}
.chat-message {
margin-bottom: 5px;
}
.message-user {
font-weight: bold;
}
.message-time {
color: #888;
font-size: 0.8em;
}
.message-mine {
color: blue;
}
</style>
</head>
<body>
<h1>{{ room.name }}</h1>
<div id="chat-log"></div>
<form id="chat-form">
<input type="text" id="chat-message-input" size="50">
<button type="submit">Send</button>
</form>
<script>
const roomId = {{ room.id }};
const username = "{{ request.user.username }}";
let chatSocket;
// Connect to WebSocket
function connectWebSocket() {
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${wsProtocol}//${window.location.host}/ws/chat/${roomId}/`;
chatSocket = new WebSocket(wsUrl);
// Connection opened
chatSocket.onopen = function(e) {
console.log('WebSocket connection established');
addSystemMessage('Connected to chat');
};
// Listen for messages
chatSocket.onmessage = function(e) {
const data = JSON.parse(e.data);
console.log('Message received:', data);
// Handle different message types
switch (data.action) {
case 'chat':
addChatMessage(data);
break;
case 'user_joined':
addSystemMessage(`${data.payload.username} joined the room`);
break;
case 'user_left':
addSystemMessage(`${data.payload.username} left the room`);
break;
case 'history':
displayMessageHistory(data.payload);
break;
case 'authentication':
handleAuthentication(data);
break;
case 'error':
handleError(data);
break;
}
};
// Connection closed
chatSocket.onclose = function(e) {
console.log('WebSocket connection closed');
addSystemMessage('Disconnected from chat. Trying to reconnect...');
// Try to reconnect after 2 seconds
setTimeout(function() {
connectWebSocket();
}, 2000);
};
// Connection error
chatSocket.onerror = function(e) {
console.error('WebSocket error:', e);
addSystemMessage('Connection error occurred');
};
}
// Add a system message to the chat log
function addSystemMessage(message) {
const chatLog = document.querySelector('#chat-log');
const messageElement = document.createElement('div');
messageElement.className = 'system-message';
messageElement.textContent = message;
chatLog.appendChild(messageElement);
chatLog.scrollTop = chatLog.scrollHeight;
}
// Add a chat message to the chat log
function addChatMessage(data) {
const chatLog = document.querySelector('#chat-log');
const messageElement = document.createElement('div');
messageElement.className = 'chat-message';
const isMyMessage = data.is_mine === true;
const messageStyle = isMyMessage ? 'message-mine' : '';
const time = new Date().toLocaleTimeString();
messageElement.innerHTML = `
<span class="message-user ${messageStyle}">${isMyMessage ? 'You' : username}:</span>
<span class="message-content">${data.payload}</span>
<span class="message-time">${time}</span>
`;
chatLog.appendChild(messageElement);
chatLog.scrollTop = chatLog.scrollHeight;
}
// Display message history
function displayMessageHistory(messages) {
const chatLog = document.querySelector('#chat-log');
// Add a system message for history separation
const separator = document.createElement('div');
separator.className = 'system-message';
separator.textContent = '--- Previous Messages ---';
chatLog.appendChild(separator);
// Add each history message
messages.forEach(msg => {
const messageElement = document.createElement('div');
messageElement.className = 'chat-message';
const isMyMessage = msg.username === username;
const messageStyle = isMyMessage ? 'message-mine' : '';
const time = new Date(msg.timestamp).toLocaleTimeString();
messageElement.innerHTML = `
<span class="message-user ${messageStyle}">${isMyMessage ? 'You' : msg.username}:</span>
<span class="message-content">${msg.message}</span>
<span class="message-time">${time}</span>
`;
chatLog.appendChild(messageElement);
});
chatLog.scrollTop = chatLog.scrollHeight;
}
// Handle authentication messages
function handleAuthentication(data) {
if (data.payload.status_code === 200) {
console.log('Authentication successful');
} else {
addSystemMessage(`Authentication failed: ${data.payload.status_text}`);
console.error('Authentication failed:', data.payload);
}
}
// Handle error messages
function handleError(data) {
addSystemMessage(`Error: ${JSON.stringify(data.payload)}`);
console.error('Error received:', data);
}
// Send chat message
document.querySelector('#chat-form').addEventListener('submit', function(e) {
e.preventDefault();
const messageInput = document.querySelector('#chat-message-input');
const message = messageInput.value.trim();
if (message) {
// Send message to WebSocket
chatSocket.send(JSON.stringify({
action: 'chat',
payload: message
}));
// Clear input
messageInput.value = '';
}
});
// Connect when page loads
document.addEventListener('DOMContentLoaded', function() {
connectWebSocket();
});
</script>
</body>
</html>
Django View
Create a view to render the chat room page:
# myapp/views.py
from django.contrib.auth.decorators import login_required
from django.shortcuts import render, get_object_or_404
from myapp.models import ChatRoom
@login_required
def chat_room(request, room_id):
"""Render chat room page."""
# Get room and verify membership
room = get_object_or_404(ChatRoom, id=room_id)
# Add user to room members if not already a member
if request.user not in room.members.all():
room.members.add(request.user)
context = {
'room': room,
}
return render(request, 'chat_room.html', context)
URL Configuration
Add the view to your URL patterns:
# myapp/urls.py
from django.urls import path
from myapp import views
urlpatterns = [
path('chat/<int:room_id>/', views.chat_room, name='chat_room'),
]
Testing the Chat Consumer
For proper testing, make sure to configure completion messages in your test settings:
# settings/test.py
CHANX = {
"SEND_COMPLETION": True, # Essential for receive_all_json() to work properly
}
Let's write tests for our chat consumer:
# myapp/tests.py
from django.contrib.auth.models import User
from chanx.testing import WebsocketTestCase
from myapp.models import ChatRoom
from myapp.messages import ChatMessagePayload
class ChatConsumerTest(WebsocketTestCase):
"""Test the chat consumer."""
async def setUp(self):
# Create test user
self.user = await User.objects.acreate_user(
username='testuser',
password='testpassword'
)
# Create chat room
self.room = await ChatRoom.objects.acreate(
name='Test Room',
slug='test-room'
)
# Add user to room
await self.room.members.aadd(self.user)
# Set up WebSocket path
self.ws_path = f'/ws/chat/{self.room.id}/'
# Log in the test client
from asgiref.sync import sync_to_async
await sync_to_async(self.client.login)(username='testuser', password='testpassword')
await super().setUp()
def get_ws_headers(self):
"""Get session cookie for authentication."""
cookies = self.client.cookies
return [
(b"cookie", f"sessionid={cookies['sessionid'].value}".encode()),
]
async def test_connect_and_receive_history(self):
"""Test connecting to chat and receiving history."""
# Connect to WebSocket
await self.auth_communicator.connect()
# Verify authentication success
await self.auth_communicator.assert_authenticated_status_ok()
# Should receive user_joined notification and history
messages = await self.auth_communicator.receive_all_json()
# Verify we received at least 2 messages (user_joined and history)
self.assertGreaterEqual(len(messages), 2)
# Check for user_joined message
join_messages = [m for m in messages if m.get('action') == 'user_joined']
self.assertTrue(join_messages, "No user_joined message received")
# Check username in payload
self.assertEqual(
join_messages[0]['payload']['username'],
'testuser'
)
# Check for history message
history_messages = [m for m in messages if m.get('action') == 'history']
self.assertTrue(history_messages, "No history message received")
# Disconnect
await self.auth_communicator.disconnect()
async def test_chat_message(self):
"""Test sending and receiving chat messages."""
# Connect to WebSocket
await self.auth_communicator.connect()
await self.auth_communicator.assert_authenticated_status_ok()
# Skip authentication, join and history messages
await self.auth_communicator.receive_all_json()
# Send a chat message
message = "Hello, this is a test message!"
await self.auth_communicator.send_message(ChatMessagePayload(payload=message))
# Receive response (should get the same message back)
responses = await self.auth_communicator.receive_all_json(wait_group=True)
# Check if message was received properly
self.assertEqual(len(responses), 1)
self.assertEqual(responses[0]['action'], 'chat')
self.assertEqual(responses[0]['payload'], message)
self.assertTrue(responses[0]['is_mine'])
# Disconnect
await self.auth_communicator.disconnect()
Key Components Explained
This example demonstrates several key Chanx features:
Authentication & Permissions: Uses SessionAuthentication with a custom IsChatRoomMember permission
Structured Messages: Defines message types with Pydantic models for validation
Pattern Matching: Uses Python's match/case syntax for clean message handling
Group Management: Manages chat room groups with build_groups()
Database Integration: Saves messages to database with background tasks
Lifecycle Hooks: Uses post_authentication to send join messages
Type Safety: Employs proper typing and assertions for better code quality
Testing: Uses WebsocketTestCase for comprehensive testing
Additional Features
To enhance this chat application, consider adding:
Typing Indicators: Show when users are typing
Read Receipts: Track which messages have been read
Message Reactions: Allow emoji reactions to messages
File Sharing: Upload and share files in chat
User Presence: Show online/offline status of room members
These could be implemented as additional message types and consumer methods.
Conclusion
This example demonstrates how Chanx simplifies building a real-time chat application with Django. The framework provides:
Structured message handling with validation
Pattern-matching for elegant message processing
Automatic group management for multi-user rooms
Authentication and permission checking
Integration with Django models and database
Clean separation of concerns for maintainability
By following these patterns, you can build robust real-time applications that leverage Django's ecosystem while providing interactive WebSocket experiences.