Django Complete Example
This example demonstrates a complete AI assistant WebSocket application using Django Channels and Chanx. The sandbox showcases real-time chat capabilities with AI streaming responses, user authentication, and database integration across multiple apps including assistants, group chat, and discussion forums.
Overview
The Django sandbox implements a comprehensive real-time system that features:
AI Assistant with streaming OpenAI responses and conversation management
Group Chat with dynamic member management and real-time messaging
Discussion Forums for structured conversations
Anonymous & Authenticated conversation support
Multiple Consumer Types demonstrating different WebSocket patterns
Event Broadcasting from HTTP endpoints to WebSocket clients
Production-ready patterns with structured logging and testing
Quick Start
Clone the repository:
git clone https://github.com/huynguyengl99/chanx cd chanx
Prerequisites: Ensure Docker and uv are installed
Start services: Run
docker compose up -dto start Redis and PostgreSQLInstall dependencies: Run
uv sync --all-extrasActivate virtual environment:
source .venv/bin/activate
Setup environment for AI integration (for ChatGPT functionality):
# Copy the example environment file cp .env.EXAMPLE .env # Edit .env and add your OpenAI credentials: # OPENAI_API_KEY=your_openai_api_key_here # OPENAI_ORG=your_openai_org_id_here (optional)
Setup database:
python sandbox_django/manage.py migrate python sandbox_django/manage.py createsuperuser
Run the server:
python sandbox_django/manage.py runserver
Access the application:
Main interface: http://localhost:8000/chat
Admin interface: http://localhost:8000/admin/
AsyncAPI docs: http://localhost:8000/asyncapi/
Login page: http://localhost:8000/login
Project Structure
The Django sandbox follows a clean app-based architecture:
sandbox_django/
├── config/ # Django project configuration
│ ├── settings/ # Environment-specific settings (dev/test/prod)
│ ├── routing.py # WebSocket URL routing with module includes
│ ├── asgi.py # ASGI application with security middleware
│ └── urls.py # HTTP URL configuration
├── assistants/ # AI assistant with streaming responses
│ ├── consumers/ # WebSocket consumers for AI chat
│ ├── models/ # Conversation & message models (UUID-based)
│ ├── messages/ # Pydantic message & event definitions
│ ├── tasks/ # AI service tasks (inline execution)
│ ├── services/ # OpenAI integration with LangChain
│ ├── views/ # REST API endpoints for HTTP integration
│ └── permissions.py # Custom conversation ownership permissions
├── chat/ # Group chat with member management
│ ├── consumers/ # WebSocket consumers for group messaging
│ ├── models/ # Group & member models with permissions
│ ├── messages/ # Group chat message definitions
│ └── permissions.py # Group membership permissions
├── discussion/ # Discussion forum functionality
├── accounts/ # Custom user model with auth integration
├── asyncapi/ # AsyncAPI documentation endpoints
└── test_utils/ # Shared testing utilities
Key WebSocket Consumers
1. AI Assistant Consumer (assistants/consumers/conversation_consumer.py)
The main AI assistant consumer handles both authenticated and anonymous conversations:
from chanx.channels.authenticator import DjangoAuthenticator
from chanx.channels.websocket import AsyncJsonWebsocketConsumer
from chanx.core.decorators import channel, event_handler, ws_handler
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage
from assistants.messages.assistant import (
AssistantErrorMessage,
AssistantEvent,
CompleteStreamingEvent,
CompleteStreamingMessage,
ErrorEvent,
NewAssistantMessage,
NewAssistantMessageEvent,
StreamingEvent,
StreamingMessage,
)
from assistants.models import AssistantConversation
from assistants.permissions import ConversationOwner
class AssistantAuthenticator(DjangoAuthenticator):
permission_classes = [ConversationOwner]
queryset = AssistantConversation.objects.all()
obj: AssistantConversation
@channel(
name="assistants", description="AI Assistant WebSocket API", tags=["ai", "chat"]
)
class ConversationAssistantConsumer(AsyncJsonWebsocketConsumer[AssistantEvent]):
"""WebSocket consumer for both authenticated and anonymous users with specific conversations."""
authenticator_class = AssistantAuthenticator
authenticator: AssistantAuthenticator
log_ignored_actions = ["streaming"]
async def post_authentication(self) -> None:
assert self.channel_layer
conversation_id = self.authenticator.obj.id
user = self.authenticator.user
if user and user.is_authenticated:
group_name = f"user_{user.pk}_conversation_{conversation_id}"
else:
group_name = f"anonymous_{conversation_id}"
await self.channel_layer.group_add(group_name, self.channel_name)
self.groups.append(group_name)
@ws_handler(
summary="Handle ping requests",
description="Simple ping-pong for connectivity testing",
)
async def handle_ping(self, _message: PingMessage) -> PongMessage:
return PongMessage()
@event_handler
async def handle_streaming(self, event: StreamingEvent) -> StreamingMessage:
return StreamingMessage(payload=event.payload)
@event_handler
async def handle_complete_streaming(
self, event: CompleteStreamingEvent
) -> CompleteStreamingMessage:
return CompleteStreamingMessage(payload=event.payload)
@event_handler
async def handle_new_assistant_message(
self, event: NewAssistantMessageEvent
) -> NewAssistantMessage:
return NewAssistantMessage(payload=event.payload)
@event_handler
async def handle_error_assistant_event(
self, event: ErrorEvent
) -> AssistantErrorMessage:
return AssistantErrorMessage(payload=event.payload)
Key Features:
Dual Authentication: Supports both authenticated users and anonymous conversations
UUID-based Conversations: Uses UUID primary keys for conversation identification
Custom Permission System: ConversationOwner permission ensures users can only access their own conversations
Event-Only Handler: Pure event-driven pattern - only handles events from HTTP endpoints, no direct client messages
2. Group Chat Consumer (chat/consumers/chat_detail.py)
Demonstrates different patterns for group-based real-time messaging:
class ChatDetailConsumer(AsyncJsonWebsocketConsumer[ChatDetailEvent]):
"""WebSocket consumer for group chat details."""
authenticator_class = ChatDetailAuthenticator
async def post_authentication(self) -> None:
"""Join the chat group after authentication."""
chat_member = self.authenticator.obj
group_name = name_group_chat(chat_member.pk)
await self.channel_layer.group_add(group_name, self.channel_name)
self.groups.append(group_name)
@event_handler
async def handle_member_removed(
self, event: NotifyMemberRemovedEvent
) -> UserRemovedFromGroupMessage | MemberRemovedMessage:
# Smart handling: different response for self vs others
if user and str(user.pk) == str(removed_user_pk):
return UserRemovedFromGroupMessage(payload=...)
return MemberRemovedMessage(payload=event.payload)
Authentication & Permissions
Custom DRF Integration:
class AssistantAuthenticator(DjangoAuthenticator):
permission_classes = [ConversationOwner]
queryset = AssistantConversation.objects.all()
obj: AssistantConversation
class ConversationOwner(BasePermission):
def has_object_permission(self, request, view, obj: AssistantConversation) -> bool:
# Allow anonymous conversations (user=None) for everyone
# Restrict user conversations to their owners only
if obj.user is not None and request.user != obj.user:
raise PermissionDenied()
return True
Features:
Object-level permissions using DRF permission classes
Path parameter extraction for conversation/group identification
Anonymous user support with different group naming patterns
Automatic group management in
post_authentication()
Message Types & Events
Streaming AI Response Messages:
# From assistants/messages/assistant.py
class StreamingMessage(BaseMessage):
"""Real-time streaming chunks from AI."""
action: Literal["streaming"] = "streaming"
payload: StreamingPayload
class StreamingPayload(BaseModel):
content: str
is_complete: bool = False
message_id: int
# Channel Events (sent from HTTP endpoints to consumers)
class StreamingEvent(BaseMessage):
action: Literal["handle_streaming"] = "handle_streaming"
payload: StreamingPayload
Event-Driven Architecture:
All AI processing happens in HTTP endpoints, which then broadcast events to WebSocket consumers:
# From assistants/tasks/assistant_tasks.py
def task_handle_new_assistant_message(user_message_id: int) -> None:
"""Called directly from HTTP view when user sends message."""
# Get conversation and build AI context
user_message = AssistantMessage.objects.get(id=user_message_id)
conversation = user_message.conversation
# Determine group name (authenticated vs anonymous)
if conversation.user is None:
group_name = f"anonymous_{conversation_id}"
else:
group_name = f"user_{conversation.user.pk}_conversation_{conversation_id}"
# Stream AI response chunks to WebSocket clients
for token in ai_service.generate_stream(user_content, history):
ConversationAssistantConsumer.broadcast_event_sync(
StreamingEvent(payload=StreamingPayload(
content=token, is_complete=False, message_id=user_message_id
)),
[group_name]
)
HTTP to WebSocket Integration
REST API Triggers WebSocket Events:
# From assistants/views/message_views.py
class AssistantMessageViewSet(ModelViewSet):
"""Handles both authenticated and anonymous message creation."""
def perform_create(self, serializer) -> None:
# Save user message to database
user_message = serializer.save(
conversation=conversation,
message_type=AssistantMessage.MessageType.USER
)
# Trigger AI response (calls task directly, not via queue)
task_handle_new_assistant_message(user_message_id=user_message.pk)
Key Integration Patterns:
HTTP POST to create messages → Task execution → WebSocket events → Client updates
Direct task calls from views (easily adaptable to Celery/ARQ/etc.)
Group-based broadcasting with user/conversation-specific channels
Anonymous conversation support with different group naming
Configuration
Production-Ready Django Settings:
# config/settings/base.py
INSTALLED_APPS = [
# Core Django apps
'django.contrib.admin',
'django.contrib.auth',
# Third-party integrations
'corsheaders',
'rest_framework',
'rest_framework_simplejwt.token_blacklist',
'drf_standardized_errors',
'django_structlog',
'channels',
'chanx.channels',
# Local apps
'accounts', 'assistants', 'chat', 'discussion',
]
# Chanx Configuration
CHANX = {
"CAMELIZE": True, # Convert snake_case ↔ camelCase for frontend
"ASYNCAPI_TITLE": "CHANX AsyncAPI Documentation",
"ASYNCAPI_DESCRIPTION": "Websocket schema of Chanx",
}
# WebSocket Configuration
ASGI_APPLICATION = "config.asgi.application"
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {"hosts": [REDIS_URL]},
},
}
# AI Integration
OPENAI_API_KEY = env.str("OPENAI_API_KEY", "")
OPENAI_ORG = env.str("OPENAI_ORG", "")
WebSocket Routing
Modular WebSocket URL Configuration:
from channels.routing import URLRouter
from chanx.channels.routing import include, path, re_path
ws_router = URLRouter(
[
# Use ws_include which returns a URLRouter
path("assistants/", include("assistants.routing")),
path("discussion/", include("discussion.routing")),
re_path("chat/", include("chat.routing")),
]
)
router = URLRouter(
[
re_path("ws/", include(ws_router)),
]
)
App-Specific Routing:
# assistants/routing.py
from chanx.channels.routing import re_path
router = URLRouter([
# UUID-based conversation routing
re_path(r"(?P<pk>[0-9a-f-]+)/", ConversationAssistantConsumer.as_asgi()),
])
ASGI Application with Security:
# config/asgi.py
application = ProtocolTypeRouter({
"http": django_asgi_app,
"websocket": OriginValidator(
CookieMiddleware(include("config.routing")),
settings.CORS_ALLOWED_ORIGINS + settings.CSRF_TRUSTED_ORIGINS,
),
})
Routing Features:
Modular app-based routing with include() patterns
UUID path parameters for conversation identification
Security middleware with CORS origin validation
Cookie middleware for session-based authentication
Database Models
Assistant Conversation Models:
# assistants/models/assistant_conversation.py
class AssistantConversation(models.Model):
"""A conversation thread with the AI assistant."""
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
user = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
null=True, # Allow anonymous conversations
blank=True,
)
title = models.CharField(max_length=200, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
def generate_title_from_first_message(self) -> None:
"""Auto-generate title using AI if not manually set."""
if not self.title:
first_message = self.messages.filter(
message_type=AssistantMessage.MessageType.USER
).first()
# Uses OpenAI to generate a concise title
generated_title = task_generate_conversation_title(first_message.content)
self.title = generated_title
self.save(update_fields=["title"])
class AssistantMessage(models.Model):
"""Individual messages within a conversation."""
class MessageType(models.TextChoices):
USER = "user", "User"
ASSISTANT = "assistant", "Assistant"
conversation = models.ForeignKey(AssistantConversation, on_delete=models.CASCADE)
message_type = models.CharField(max_length=20, choices=MessageType.choices)
content = models.TextField()
created_at = models.DateTimeField(auto_now_add=True)
Key Model Features:
UUID Primary Keys for secure, non-sequential identifiers
Anonymous Support with nullable user foreign keys
Auto-Generated Titles using AI service integration
Structured Message Types with clear user/assistant distinction
Testing
The Django sandbox uses pytest for comprehensive test coverage:
# Run all tests
pytest sandbox_django/
# Run specific app tests
pytest sandbox_django/assistants/tests/
# Run specific test files
pytest sandbox_django/chat/tests/consumers/test_chat_detail_consumer.py
WebSocket Testing with Chanx:
# From chat/tests/consumers/test_chat_detail_consumer.py
from chanx.channels.testing import WebsocketTestCase
from chanx.constants import EVENT_ACTION_COMPLETE
class TestChatDetailConsumer(WebsocketTestCase):
consumer = ChatDetailConsumer
def setUp(self) -> None:
super().setUp()
# Create test data
self.group_chat = GroupChat.objects.create(title="Test Group Chat")
ChatMemberFactory.create(user=self.user, group_chat=self.group_chat)
self.ws_path = f"/ws/chat/{self.group_chat.pk}/"
async def test_connect_successfully_and_ping(self) -> None:
"""Test basic connection and ping/pong functionality."""
await self.auth_communicator.connect()
await self.auth_communicator.assert_authenticated_status_ok()
await self.auth_communicator.send_message(PingMessage())
all_messages = await self.auth_communicator.receive_all_messages()
assert all_messages == [PongMessage()]
async def test_notify_member_add_event(self) -> None:
"""Test consumer handles member addition events correctly."""
await self.auth_communicator.connect()
await self.auth_communicator.assert_authenticated_status_ok()
# Send event directly to test consumer's event handling
await ChatDetailConsumer.broadcast_event(
NotifyMemberAddedEvent(payload=test_member_payload),
[f"group_chat.{self.group_chat.pk}"],
)
# Verify consumer processed and forwarded the event
all_messages = await self.auth_communicator.receive_all_messages(
stop_action=EVENT_ACTION_COMPLETE
)
assert len(all_messages) == 1
message = cast(MemberAddedMessage, all_messages[0])
assert message.action == "member_added"
Testing Features:
pytest Integration with Django test database
WebsocketTestCase for consumer testing with authentication
Factory Boy for test data generation
Event Broadcasting Tests with completion signals
Multi-user Testing with separate communicators
Async Test Support with proper cleanup
Production Considerations
The Django example demonstrates production-ready patterns:
1. Security
CORS configuration for cross-origin WebSocket connections
Authentication required for sensitive operations
Permission-based access control
2. Scalability
Redis channel layer for multi-server deployments
Task functions ready for background worker integration (Celery, ARQ, TaskIQ, etc.)
Database connection pooling
3. Monitoring
Structured logging with request correlation IDs
AsyncAPI documentation for API contracts
Learning Path
To understand the Django integration:
Start with the consumer (
assistants/consumers/conversation_consumer.py)Examine message types (
assistants/messages/assistant.py)Review authentication (
assistants/permissions.py)Study the routing (
config/routing.py)Look at background tasks (
assistants/tasks/)Check the tests (
assistants/tests/)
This example provides a solid foundation for building production WebSocket applications with Django Channels and Chanx.