Django Complete Example

This example demonstrates a complete AI assistant WebSocket application using Django Channels and Chanx. The sandbox showcases real-time chat capabilities with AI streaming responses, user authentication, and database integration across multiple apps including assistants, group chat, and discussion forums.

Overview

The Django sandbox implements a comprehensive real-time system that features:

  • AI Assistant with streaming OpenAI responses and conversation management

  • Group Chat with dynamic member management and real-time messaging

  • Discussion Forums for structured conversations

  • Anonymous & Authenticated conversation support

  • Multiple Consumer Types demonstrating different WebSocket patterns

  • Event Broadcasting from HTTP endpoints to WebSocket clients

  • Production-ready patterns with structured logging and testing

Quick Start

  1. Clone the repository:

    git clone https://github.com/huynguyengl99/chanx
    cd chanx
    
  2. Prerequisites: Ensure Docker and uv are installed

  3. Start services: Run docker compose up -d to start Redis and PostgreSQL

  4. Install dependencies: Run uv sync --all-extras

  5. Activate virtual environment:

    source .venv/bin/activate
    
  6. Setup environment for AI integration (for ChatGPT functionality):

    # Copy the example environment file
    cp .env.EXAMPLE .env
    # Edit .env and add your OpenAI credentials:
    # OPENAI_API_KEY=your_openai_api_key_here
    # OPENAI_ORG=your_openai_org_id_here (optional)
    
  7. Setup database:

    python sandbox_django/manage.py migrate
    python sandbox_django/manage.py createsuperuser
    
  8. Run the server:

    python sandbox_django/manage.py runserver
    
  9. Access the application:

Project Structure

The Django sandbox follows a clean app-based architecture:

sandbox_django/
├── config/                   # Django project configuration
│   ├── settings/            # Environment-specific settings (dev/test/prod)
│   ├── routing.py           # WebSocket URL routing with module includes
│   ├── asgi.py             # ASGI application with security middleware
│   └── urls.py             # HTTP URL configuration
├── assistants/             # AI assistant with streaming responses
│   ├── consumers/          # WebSocket consumers for AI chat
│   ├── models/             # Conversation & message models (UUID-based)
│   ├── messages/           # Pydantic message & event definitions
│   ├── tasks/              # AI service tasks (inline execution)
│   ├── services/           # OpenAI integration with LangChain
│   ├── views/              # REST API endpoints for HTTP integration
│   └── permissions.py      # Custom conversation ownership permissions
├── chat/                   # Group chat with member management
│   ├── consumers/          # WebSocket consumers for group messaging
│   ├── models/             # Group & member models with permissions
│   ├── messages/           # Group chat message definitions
│   └── permissions.py      # Group membership permissions
├── discussion/             # Discussion forum functionality
├── accounts/               # Custom user model with auth integration
├── asyncapi/               # AsyncAPI documentation endpoints
└── test_utils/             # Shared testing utilities

Key WebSocket Consumers

1. AI Assistant Consumer (assistants/consumers/conversation_consumer.py)

The main AI assistant consumer handles both authenticated and anonymous conversations:

from chanx.channels.authenticator import DjangoAuthenticator
from chanx.channels.websocket import AsyncJsonWebsocketConsumer
from chanx.core.decorators import channel, event_handler, ws_handler
from chanx.messages.incoming import PingMessage
from chanx.messages.outgoing import PongMessage

from assistants.messages.assistant import (
    AssistantErrorMessage,
    AssistantEvent,
    CompleteStreamingEvent,
    CompleteStreamingMessage,
    ErrorEvent,
    NewAssistantMessage,
    NewAssistantMessageEvent,
    StreamingEvent,
    StreamingMessage,
)
from assistants.models import AssistantConversation
from assistants.permissions import ConversationOwner


class AssistantAuthenticator(DjangoAuthenticator):
    permission_classes = [ConversationOwner]
    queryset = AssistantConversation.objects.all()
    obj: AssistantConversation


@channel(
    name="assistants", description="AI Assistant WebSocket API", tags=["ai", "chat"]
)
class ConversationAssistantConsumer(AsyncJsonWebsocketConsumer[AssistantEvent]):
    """WebSocket consumer for both authenticated and anonymous users with specific conversations."""

    authenticator_class = AssistantAuthenticator
    authenticator: AssistantAuthenticator

    log_ignored_actions = ["streaming"]

    async def post_authentication(self) -> None:
        assert self.channel_layer
        conversation_id = self.authenticator.obj.id
        user = self.authenticator.user

        if user and user.is_authenticated:
            group_name = f"user_{user.pk}_conversation_{conversation_id}"
        else:
            group_name = f"anonymous_{conversation_id}"
        await self.channel_layer.group_add(group_name, self.channel_name)
        self.groups.append(group_name)

    @ws_handler(
        summary="Handle ping requests",
        description="Simple ping-pong for connectivity testing",
    )
    async def handle_ping(self, _message: PingMessage) -> PongMessage:
        return PongMessage()

    @event_handler
    async def handle_streaming(self, event: StreamingEvent) -> StreamingMessage:
        return StreamingMessage(payload=event.payload)

    @event_handler
    async def handle_complete_streaming(
        self, event: CompleteStreamingEvent
    ) -> CompleteStreamingMessage:
        return CompleteStreamingMessage(payload=event.payload)

    @event_handler
    async def handle_new_assistant_message(
        self, event: NewAssistantMessageEvent
    ) -> NewAssistantMessage:
        return NewAssistantMessage(payload=event.payload)

    @event_handler
    async def handle_error_assistant_event(
        self, event: ErrorEvent
    ) -> AssistantErrorMessage:
        return AssistantErrorMessage(payload=event.payload)

Key Features:

  • Dual Authentication: Supports both authenticated users and anonymous conversations

  • UUID-based Conversations: Uses UUID primary keys for conversation identification

  • Custom Permission System: ConversationOwner permission ensures users can only access their own conversations

  • Event-Only Handler: Pure event-driven pattern - only handles events from HTTP endpoints, no direct client messages

2. Group Chat Consumer (chat/consumers/chat_detail.py)

Demonstrates different patterns for group-based real-time messaging:

class ChatDetailConsumer(AsyncJsonWebsocketConsumer[ChatDetailEvent]):
    """WebSocket consumer for group chat details."""

    authenticator_class = ChatDetailAuthenticator

    async def post_authentication(self) -> None:
        """Join the chat group after authentication."""
        chat_member = self.authenticator.obj
        group_name = name_group_chat(chat_member.pk)

        await self.channel_layer.group_add(group_name, self.channel_name)
        self.groups.append(group_name)

    @event_handler
    async def handle_member_removed(
        self, event: NotifyMemberRemovedEvent
    ) -> UserRemovedFromGroupMessage | MemberRemovedMessage:
        # Smart handling: different response for self vs others
        if user and str(user.pk) == str(removed_user_pk):
            return UserRemovedFromGroupMessage(payload=...)
        return MemberRemovedMessage(payload=event.payload)

Authentication & Permissions

Custom DRF Integration:

class AssistantAuthenticator(DjangoAuthenticator):
    permission_classes = [ConversationOwner]
    queryset = AssistantConversation.objects.all()
    obj: AssistantConversation

class ConversationOwner(BasePermission):
    def has_object_permission(self, request, view, obj: AssistantConversation) -> bool:
        # Allow anonymous conversations (user=None) for everyone
        # Restrict user conversations to their owners only
        if obj.user is not None and request.user != obj.user:
            raise PermissionDenied()
        return True

Features:

  • Object-level permissions using DRF permission classes

  • Path parameter extraction for conversation/group identification

  • Anonymous user support with different group naming patterns

  • Automatic group management in post_authentication()

Message Types & Events

Streaming AI Response Messages:

# From assistants/messages/assistant.py
class StreamingMessage(BaseMessage):
    """Real-time streaming chunks from AI."""
    action: Literal["streaming"] = "streaming"
    payload: StreamingPayload

class StreamingPayload(BaseModel):
    content: str
    is_complete: bool = False
    message_id: int

# Channel Events (sent from HTTP endpoints to consumers)
class StreamingEvent(BaseMessage):
    action: Literal["handle_streaming"] = "handle_streaming"
    payload: StreamingPayload

Event-Driven Architecture:

All AI processing happens in HTTP endpoints, which then broadcast events to WebSocket consumers:

# From assistants/tasks/assistant_tasks.py
def task_handle_new_assistant_message(user_message_id: int) -> None:
    """Called directly from HTTP view when user sends message."""

    # Get conversation and build AI context
    user_message = AssistantMessage.objects.get(id=user_message_id)
    conversation = user_message.conversation

    # Determine group name (authenticated vs anonymous)
    if conversation.user is None:
        group_name = f"anonymous_{conversation_id}"
    else:
        group_name = f"user_{conversation.user.pk}_conversation_{conversation_id}"

    # Stream AI response chunks to WebSocket clients
    for token in ai_service.generate_stream(user_content, history):
        ConversationAssistantConsumer.broadcast_event_sync(
            StreamingEvent(payload=StreamingPayload(
                content=token, is_complete=False, message_id=user_message_id
            )),
            [group_name]
        )

HTTP to WebSocket Integration

REST API Triggers WebSocket Events:

# From assistants/views/message_views.py
class AssistantMessageViewSet(ModelViewSet):
    """Handles both authenticated and anonymous message creation."""

    def perform_create(self, serializer) -> None:
        # Save user message to database
        user_message = serializer.save(
            conversation=conversation,
            message_type=AssistantMessage.MessageType.USER
        )

        # Trigger AI response (calls task directly, not via queue)
        task_handle_new_assistant_message(user_message_id=user_message.pk)

Key Integration Patterns:

  • HTTP POST to create messages → Task executionWebSocket eventsClient updates

  • Direct task calls from views (easily adaptable to Celery/ARQ/etc.)

  • Group-based broadcasting with user/conversation-specific channels

  • Anonymous conversation support with different group naming

Configuration

Production-Ready Django Settings:

# config/settings/base.py
INSTALLED_APPS = [
    # Core Django apps
    'django.contrib.admin',
    'django.contrib.auth',
    # Third-party integrations
    'corsheaders',
    'rest_framework',
    'rest_framework_simplejwt.token_blacklist',
    'drf_standardized_errors',
    'django_structlog',
    'channels',
    'chanx.channels',
    # Local apps
    'accounts', 'assistants', 'chat', 'discussion',
]

# Chanx Configuration
CHANX = {
    "CAMELIZE": True,  # Convert snake_case ↔ camelCase for frontend
    "ASYNCAPI_TITLE": "CHANX AsyncAPI Documentation",
    "ASYNCAPI_DESCRIPTION": "Websocket schema of Chanx",
}

# WebSocket Configuration
ASGI_APPLICATION = "config.asgi.application"
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [REDIS_URL]},
    },
}

# AI Integration
OPENAI_API_KEY = env.str("OPENAI_API_KEY", "")
OPENAI_ORG = env.str("OPENAI_ORG", "")

WebSocket Routing

Modular WebSocket URL Configuration:

from channels.routing import URLRouter

from chanx.channels.routing import include, path, re_path

ws_router = URLRouter(
    [
        # Use ws_include which returns a URLRouter
        path("assistants/", include("assistants.routing")),
        path("discussion/", include("discussion.routing")),
        re_path("chat/", include("chat.routing")),
    ]
)
router = URLRouter(
    [
        re_path("ws/", include(ws_router)),
    ]
)

App-Specific Routing:

# assistants/routing.py
from chanx.channels.routing import re_path

router = URLRouter([
    # UUID-based conversation routing
    re_path(r"(?P<pk>[0-9a-f-]+)/", ConversationAssistantConsumer.as_asgi()),
])

ASGI Application with Security:

# config/asgi.py
application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": OriginValidator(
        CookieMiddleware(include("config.routing")),
        settings.CORS_ALLOWED_ORIGINS + settings.CSRF_TRUSTED_ORIGINS,
    ),
})

Routing Features:

  • Modular app-based routing with include() patterns

  • UUID path parameters for conversation identification

  • Security middleware with CORS origin validation

  • Cookie middleware for session-based authentication

Database Models

Assistant Conversation Models:

# assistants/models/assistant_conversation.py
class AssistantConversation(models.Model):
    """A conversation thread with the AI assistant."""

    id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        null=True,  # Allow anonymous conversations
        blank=True,
    )
    title = models.CharField(max_length=200, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def generate_title_from_first_message(self) -> None:
        """Auto-generate title using AI if not manually set."""
        if not self.title:
            first_message = self.messages.filter(
                message_type=AssistantMessage.MessageType.USER
            ).first()
            # Uses OpenAI to generate a concise title
            generated_title = task_generate_conversation_title(first_message.content)
            self.title = generated_title
            self.save(update_fields=["title"])

class AssistantMessage(models.Model):
    """Individual messages within a conversation."""

    class MessageType(models.TextChoices):
        USER = "user", "User"
        ASSISTANT = "assistant", "Assistant"

    conversation = models.ForeignKey(AssistantConversation, on_delete=models.CASCADE)
    message_type = models.CharField(max_length=20, choices=MessageType.choices)
    content = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)

Key Model Features:

  • UUID Primary Keys for secure, non-sequential identifiers

  • Anonymous Support with nullable user foreign keys

  • Auto-Generated Titles using AI service integration

  • Structured Message Types with clear user/assistant distinction

Testing

The Django sandbox uses pytest for comprehensive test coverage:

# Run all tests
pytest sandbox_django/

# Run specific app tests
pytest sandbox_django/assistants/tests/

# Run specific test files
pytest sandbox_django/chat/tests/consumers/test_chat_detail_consumer.py

WebSocket Testing with Chanx:

# From chat/tests/consumers/test_chat_detail_consumer.py
from chanx.channels.testing import WebsocketTestCase
from chanx.constants import EVENT_ACTION_COMPLETE

class TestChatDetailConsumer(WebsocketTestCase):
    consumer = ChatDetailConsumer

    def setUp(self) -> None:
        super().setUp()
        # Create test data
        self.group_chat = GroupChat.objects.create(title="Test Group Chat")
        ChatMemberFactory.create(user=self.user, group_chat=self.group_chat)
        self.ws_path = f"/ws/chat/{self.group_chat.pk}/"

    async def test_connect_successfully_and_ping(self) -> None:
        """Test basic connection and ping/pong functionality."""
        await self.auth_communicator.connect()
        await self.auth_communicator.assert_authenticated_status_ok()

        await self.auth_communicator.send_message(PingMessage())
        all_messages = await self.auth_communicator.receive_all_messages()
        assert all_messages == [PongMessage()]

    async def test_notify_member_add_event(self) -> None:
        """Test consumer handles member addition events correctly."""
        await self.auth_communicator.connect()
        await self.auth_communicator.assert_authenticated_status_ok()

        # Send event directly to test consumer's event handling
        await ChatDetailConsumer.broadcast_event(
            NotifyMemberAddedEvent(payload=test_member_payload),
            [f"group_chat.{self.group_chat.pk}"],
        )

        # Verify consumer processed and forwarded the event
        all_messages = await self.auth_communicator.receive_all_messages(
            stop_action=EVENT_ACTION_COMPLETE
        )
        assert len(all_messages) == 1
        message = cast(MemberAddedMessage, all_messages[0])
        assert message.action == "member_added"

Testing Features:

  • pytest Integration with Django test database

  • WebsocketTestCase for consumer testing with authentication

  • Factory Boy for test data generation

  • Event Broadcasting Tests with completion signals

  • Multi-user Testing with separate communicators

  • Async Test Support with proper cleanup

Production Considerations

The Django example demonstrates production-ready patterns:

1. Security

  • CORS configuration for cross-origin WebSocket connections

  • Authentication required for sensitive operations

  • Permission-based access control

2. Scalability

  • Redis channel layer for multi-server deployments

  • Task functions ready for background worker integration (Celery, ARQ, TaskIQ, etc.)

  • Database connection pooling

3. Monitoring

  • Structured logging with request correlation IDs

  • AsyncAPI documentation for API contracts

Learning Path

To understand the Django integration:

  1. Start with the consumer (assistants/consumers/conversation_consumer.py)

  2. Examine message types (assistants/messages/assistant.py)

  3. Review authentication (assistants/permissions.py)

  4. Study the routing (config/routing.py)

  5. Look at background tasks (assistants/tasks/)

  6. Check the tests (assistants/tests/)

This example provides a solid foundation for building production WebSocket applications with Django Channels and Chanx.