Core Module
The core module provides the fundamental building blocks for Chanx WebSocket consumers, including decorators, WebSocket handling, and authentication.
Decorators
- chanx.core.decorators.channel(*, name: str | None = None, description: str | None = None, tags: list[str] | None = None) Callable[[_T], _T]
Decorator for WebSocket consumer classes to add AsyncAPI channel metadata.
This decorator marks consumer classes with channel-level information for AsyncAPI documentation generation. The description will be extracted from the class docstring if not provided explicitly.
- Parameters:
name -- Custom channel name (defaults to auto-generated from class name)
description -- Channel description (overrides docstring)
tags -- List of tags for the channel
- chanx.core.decorators.ws_handler(*, action: str | None = None, input_type: type[BaseMessage] | None = None, output_type: type[BaseMessage] | UnionType | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | None = None, description: str | None = None, summary: str | None = None, tags: list[str] | None = None) Callable[[Callable[[_P], Awaitable[_R]]], Callable[[_P], Awaitable[_R]]]
- chanx.core.decorators.ws_handler(func: Callable[[_P], Awaitable[_R]], *, action: str | None = None, input_type: type[BaseMessage] | None = None, output_type: type[BaseMessage] | UnionType | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | None = None, description: str | None = None, summary: str | None = None, tags: list[str] | None = None) Callable[[_P], Awaitable[_R]]
Decorator for WebSocket message handlers.
This decorator marks methods as WebSocket message handlers and optionally specifies input/output types for automatic validation and documentation.
- Parameters:
func -- The function to decorate (when used without parentheses)
action -- The action name this handler responds to. If not provided, will use function name.
input_type -- Expected input message type for validation.
output_type -- Expected output message type for documentation.
description -- AsyncAPI description for the operation.
summary -- AsyncAPI summary for the operation.
tags -- AsyncAPI tags for the operation.
- chanx.core.decorators.event_handler(*, input_type: type[BaseMessage] | None = None, output_type: type[BaseMessage] | UnionType | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | None = None, description: str | None = None, summary: str | None = None, tags: list[str] | None = None) Callable[[Callable[[_P], Awaitable[_R]]], Callable[[_P], Awaitable[_R]]]
- chanx.core.decorators.event_handler(func: Callable[[_P], Awaitable[_R]], *, input_type: type[BaseMessage] | None = None, output_type: type[BaseMessage] | UnionType | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | None = None, description: str | None = None, summary: str | None = None, tags: list[str] | None = None) Callable[[_P], Awaitable[_R]]
Decorator for channel event handlers.
This decorator marks methods as channel event handlers and uses the method name as the handler identifier. Input types should be BaseMessage subclasses.
- Parameters:
func -- The function to decorate (when used without parentheses)
input_type -- Expected input type (BaseMessage subclass) for validation.
output_type -- Expected output type for documentation.
description -- AsyncAPI description for the operation.
summary -- AsyncAPI summary for the operation.
tags -- AsyncAPI tags for the operation.
WebSocket Consumer
- class chanx.core.websocket.ChanxWebsocketConsumerMixin(*args: Any, **kwargs: Any)
Mixin providing enhanced WebSocket consumer functionality with automatic message routing.
Provides automatic message type discovery from @ws_handler and @event_handler decorators, type-safe message validation using Pydantic discriminated unions, built-in authentication, group broadcasting, channel event system, and comprehensive error handling.
- property all_log_ignored_actions: set[str]
Get the complete set of actions that should be ignored during logging.
Combines instance-specific log_ignored_actions with system-level COMPLETE_ACTIONS to create the full set of actions to exclude from logging.
- Returns:
Set of action names that should not be logged
- async classmethod broadcast_event(event: ReceiveEvent, groups: Collection[str] | str | None = None) None
Broadcast a typed channel event to channel groups.
- Parameters:
event -- The typed event to broadcast (BaseMessage subclass)
groups -- Groups to broadcast the event to
- classmethod broadcast_event_sync(event: ReceiveEvent, groups: Collection[str] | str | None = None) None
Synchronous version of broadcast_event for use in sync contexts.
- Parameters:
event -- The typed event to broadcast (BaseMessage subclass)
groups -- Groups to broadcast to
- async broadcast_message(message: BaseMessage | dict[str, Any], groups: list[str] | None = None, *, exclude_current: bool = False) None
Send a BaseMessage object to one or more channel groups.
Broadcasts a message to all consumers in the specified groups. This is useful for implementing pub/sub patterns where messages need to be distributed to multiple connected clients.
- Parameters:
message -- Message object to send to the groups.
groups -- Group names to send to (defaults to self.groups)
exclude_current -- Whether to exclude the sending consumer from receiving the broadcast (prevents echo effects)
- async handle_channel_event(event_payload: EventPayload) None
Internal dispatcher for typed channel events with completion signal.
This method is called by the channel layer when an event is sent to a group this consumer belongs to. It validates the event data and routes it to the appropriate @event_handler decorated method.
- Parameters:
event_payload -- The message from the channel layer containing event data
- async handle_event_handler_error(error: Exception, action: str, event: BaseMessage) None
Handle errors that occur in event handlers.
- Parameters:
error -- The exception that occurred
action -- The action that was being processed
event -- The event that caused the error
- async handle_group_message(event: GroupMessageEvent) None
Handle incoming group message and relay to client.
Processes group messages from the channel layer, adds metadata (is_current), and forwards to the client. Respects exclude_current flag to prevent echo effects.
- Parameters:
event -- Group message event containing message data and metadata
- async handle_json(content: dict[str, Any], **kwargs: Any) None
Handle received JSON message with validation and routing.
Validates incoming messages using auto-generated discriminated union, routes to appropriate handlers, and sends error responses on failure.
- Parameters:
content -- The JSON content to handle
**kwargs -- Additional keyword arguments
- async handle_json_processing_error(error: Exception) None
Handle general exceptions during JSON message processing.
Subclasses can override this method to customize error responses.
- Parameters:
error -- The exception that occurred
- async handle_message_handler_error(error: Exception, action: str, message: BaseMessage) None
Handle errors that occur in message handlers.
- Parameters:
error -- The exception that occurred
action -- The action that was being handled
message -- The message that caused the error
- async handle_result(result: BaseMessage) None
Process and send the result returned by a message handler.
- Parameters:
result -- The result returned by the handler (BaseMessage)
- async handle_validation_error(error: ValidationError) None
Handle ValidationError exceptions during message processing.
Subclasses can override this method to customize validation error responses.
- Parameters:
error -- The ValidationError that occurred
- async post_authentication() None
Hook for additional actions after successful authentication.
Subclasses can override this method to perform custom actions after a successful authentication.
- async receive_event(event: BaseMessage) None
Process channel events received through the channel layer.
Routes events to @event_handler decorated methods based on action field.
- Parameters:
event -- The validated event object
- async receive_json(content: dict[str, Any], **kwargs: Any) None
Receive and process JSON data from WebSocket.
Logs messages, assigns ID, and creates task for async processing. Also enhances asyncio context with message id and message action.
- Parameters:
content -- The JSON content received from the client
**kwargs -- Additional keyword arguments
- async receive_message(message: BaseMessage) None
Process a validated received message and route it to the appropriate handler.
- Parameters:
message -- The validated message object (BaseMessage instance)
- async classmethod send_event(event: ReceiveEvent, channel_name: str) None
Send a typed channel event to a specific channel.
- Parameters:
event -- The typed event to send (BaseMessage subclass)
channel_name -- Channel name to send the event to
- classmethod send_event_sync(event: ReceiveEvent, channel_name: str) None
Synchronous version of send_event for use in sync contexts.
- Parameters:
event -- The typed event to send (BaseMessage subclass)
channel_name -- Channel name to send to
- async send_json(content: dict[str, Any], close: bool = False) None
Send JSON data to the client.
- Parameters:
content -- The JSON data to send
close -- Whether to close the connection after sending
- async send_message(message: BaseMessage, *, validate: bool = False) None
Send a BaseMessage to the client.
- Parameters:
message -- The BaseMessage instance to send
validate -- Whether to validate the message against the outgoing adapter
- property should_camelize: bool
Determine whether message field names should be camelized.
Checks for instance-level camelize attribute first, then falls back to global configuration setting. When True, snake_case field names are converted to camelCase in outgoing messages.
- Returns:
True if field names should be camelized, False otherwise
- property should_send_completion: bool
Determine whether completion messages should be sent after handling requests.
Checks for instance-level send_completion attribute first, then falls back to global configuration setting.
- Returns:
True if completion messages should be sent, False otherwise
- async websocket_connect(message: Any) None
Handle WebSocket connection request with authentication.
Accepts the connection, authenticates the user, and either adds the user to appropriate groups or closes the connection.
- Parameters:
message -- The connection message from the framework
- async websocket_disconnect(message: Any) None
Handle WebSocket disconnection.
Cleans up context variables and logs the disconnection.
- Parameters:
message -- The disconnection message from the framework
Authenticator
- class chanx.core.authenticator.BaseAuthenticator(send_message: Callable[[BaseMessage], Awaitable[None]])
Abstract base class for WebSocket authentication handlers.
Provides a common interface for implementing custom authentication logic in Chanx WebSocket consumers. Subclasses must implement the authenticate method to define specific authentication rules.
- Parameters:
send_message -- Function to send messages back to the client during authentication
- abstract async authenticate(scope: MutableMapping[str, Any]) bool
Authenticate a WebSocket connection request.
This method must be implemented by subclasses to define the authentication logic. It receives the ASGI scope and should return True for successful authentication or False to reject the connection.
- Parameters:
scope -- ASGI scope dictionary containing connection information, headers, query parameters, and user data
- Returns:
True if authentication succeeds, False if it fails
Configuration
- class chanx.core.config.Config
Configuration class that provides default values and Django integration.
This class defines default configuration values and automatically retrieves settings from Django's chanx_settings when Django is available.
- send_completion
Whether to send completion messages.
- Type:
bool
- send_message_immediately
Whether to send messages immediately.
- Type:
bool
- log_websocket_message
Whether to log websocket received and sent messages.
- Type:
bool
- log_ignored_actions
Collection of action names to ignore in logs.
- Type:
collections.abc.Collection[str]
- camelize
Whether to camelize field names.
- Type:
bool
- discriminator_field
Field name used for message discrimination.
- Type:
str
Registry
- class chanx.core.registry.MessageRegistry
Registry for collecting and managing message types from consumers.
- add(message_type: type[BaseMessage] | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | UnionType, consumer_name: str) None
Add a message type to the registry, handling simple types, unions, lists, and tuples.
- Parameters:
message_type -- The BaseMessage type, union, list, or tuple to add
consumer_name -- Name of the consumer using this message type
- build_message(message_type: type[BaseMessage], consumer_name: str) None
Build and register a message type in the registry.
Note: This method expects a single BaseMessage type, not a union/list/tuple. Union/list/tuple handling is done in the add() method.
- Parameters:
message_type -- The BaseMessage subclass to register
consumer_name -- Name of the consumer using this message type
- build_message_schema(model_type: type[BaseModel] | UnionType, consumer_name: str) None
Build and register a JSON schema for a BaseModel type.
Processes the model type, extracts field type information, handles unions, and generates proper schema references for AsyncAPI documentation.
- Parameters:
model_type -- The BaseModel type or UnionType to process
consumer_name -- Name of the consumer using this type