Core Module

The core module provides the fundamental building blocks for Chanx WebSocket consumers, including decorators, WebSocket handling, and authentication.

Decorators

chanx.core.decorators.channel(*, name: str | None = None, description: str | None = None, tags: list[str] | None = None) -> Callable[[_T], _T]

Decorator for WebSocket consumer classes to add AsyncAPI channel metadata.

This decorator marks consumer classes with channel-level information for AsyncAPI documentation generation. The description will be extracted from the class docstring if not provided explicitly.

Parameters:
  • name -- Custom channel name (defaults to auto-generated from class name)

  • description -- Channel description (overrides docstring)

  • tags -- List of tags for the channel

chanx.core.decorators.ws_handler(*, action: str | None = None, input_type: type[BaseMessage] | None = None, output_type: type[BaseMessage] | UnionType | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | None = None, description: str | None = None, summary: str | None = None, tags: list[str] | None = None) -> Callable[[Callable[[_P], Awaitable[_R]]], Callable[[_P], Awaitable[_R]]]
chanx.core.decorators.ws_handler(func: Callable[[_P], Awaitable[_R]], *, action: str | None = None, input_type: type[BaseMessage] | None = None, output_type: type[BaseMessage] | UnionType | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | None = None, description: str | None = None, summary: str | None = None, tags: list[str] | None = None) -> Callable[[_P], Awaitable[_R]]

Decorator for WebSocket message handlers.

This decorator marks methods as WebSocket message handlers and optionally specifies input/output types for automatic validation and documentation.

Parameters:
  • func -- The function to decorate (when used without parentheses)

  • action -- The action name this handler responds to. If not provided, the function name is used.

  • input_type -- Expected input message type for validation.

  • output_type -- Expected output message type for documentation.

  • description -- AsyncAPI description for the operation.

  • summary -- AsyncAPI summary for the operation.

  • tags -- AsyncAPI tags for the operation.
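
The two signatures above describe a decorator that works both bare and with parentheses. The following is a minimal standalone sketch of that pattern, not the Chanx implementation: sketch_ws_handler and the handler_action attribute are hypothetical names, and only the action-defaults-to-function-name behavior comes from the description above.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, Optional

Handler = Callable[..., Awaitable[Any]]


def sketch_ws_handler(func: Optional[Handler] = None, *, action: Optional[str] = None):
    """Hypothetical stand-in: a decorator usable with or without parentheses."""

    def decorate(target: Handler) -> Handler:
        # Fall back to the function name when no action is given.
        target.handler_action = action or target.__name__  # hypothetical attribute
        return target

    # Bare usage (@sketch_ws_handler): func is the decorated function itself.
    if func is not None:
        return decorate(func)
    # Parenthesized usage (@sketch_ws_handler(...)): return the real decorator.
    return decorate


@sketch_ws_handler
async def ping(payload: dict) -> dict:
    return {"action": "pong"}


@sketch_ws_handler(action="chat")
async def handle_chat(payload: dict) -> dict:
    return {"action": "chat_reply", "echo": payload}


result = asyncio.run(handle_chat({"text": "hi"}))
```

In this sketch the bare form records "ping" as the action, while the parenthesized form records the explicit "chat" value.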

chanx.core.decorators.event_handler(*, input_type: type[BaseMessage] | None = None, output_type: type[BaseMessage] | UnionType | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | None = None, description: str | None = None, summary: str | None = None, tags: list[str] | None = None) -> Callable[[Callable[[_P], Awaitable[_R]]], Callable[[_P], Awaitable[_R]]]
chanx.core.decorators.event_handler(func: Callable[[_P], Awaitable[_R]], *, input_type: type[BaseMessage] | None = None, output_type: type[BaseMessage] | UnionType | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | None = None, description: str | None = None, summary: str | None = None, tags: list[str] | None = None) -> Callable[[_P], Awaitable[_R]]

Decorator for channel event handlers.

This decorator marks methods as channel event handlers and uses the method name as the handler identifier. Input types should be BaseMessage subclasses.

Parameters:
  • func -- The function to decorate (when used without parentheses)

  • input_type -- Expected input type (BaseMessage subclass) for validation.

  • output_type -- Expected output type for documentation.

  • description -- AsyncAPI description for the operation.

  • summary -- AsyncAPI summary for the operation.

  • tags -- AsyncAPI tags for the operation.

WebSocket Consumer

class chanx.core.websocket.ChanxWebsocketConsumerMixin(*args: Any, **kwargs: Any)

Mixin providing enhanced WebSocket consumer functionality with automatic message routing.

Provides automatic message type discovery from @ws_handler and @event_handler decorators, type-safe message validation using Pydantic discriminated unions, built-in authentication, group broadcasting, channel event system, and comprehensive error handling.

property all_log_ignored_actions: set[str]

Get the complete set of actions that should be ignored during logging.

Combines instance-specific log_ignored_actions with system-level COMPLETE_ACTIONS to create the full set of actions to exclude from logging.

Returns:

Set of action names that should not be logged
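
As a rough illustration of the combination described above, the sketch below unions an instance-level collection with a module-level set. The values in COMPLETE_ACTIONS and the SketchConsumer class are assumptions made for the example; the real contents of COMPLETE_ACTIONS are not given here.

```python
# Hypothetical values; the actual system-level actions are not documented here.
COMPLETE_ACTIONS = {"complete", "group_complete"}


class SketchConsumer:
    # Instance-specific actions that should not be logged.
    log_ignored_actions = ["ping"]

    @property
    def all_log_ignored_actions(self) -> set[str]:
        # Union of the per-consumer actions and the system-level ones.
        return set(self.log_ignored_actions) | COMPLETE_ACTIONS


ignored = SketchConsumer().all_log_ignored_actions
```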

async classmethod broadcast_event(event: ReceiveEvent, groups: Collection[str] | str | None = None) -> None

Broadcast a typed channel event to channel groups.

Parameters:
  • event -- The typed event to broadcast (BaseMessage subclass)

  • groups -- Groups to broadcast the event to

classmethod broadcast_event_sync(event: ReceiveEvent, groups: Collection[str] | str | None = None) -> None

Synchronous version of broadcast_event for use in sync contexts.

Parameters:
  • event -- The typed event to broadcast (BaseMessage subclass)

  • groups -- Groups to broadcast to

async broadcast_message(message: BaseMessage | dict[str, Any], groups: list[str] | None = None, *, exclude_current: bool = False) -> None

Send a message (a BaseMessage object or a dict) to one or more channel groups.

Broadcasts a message to all consumers in the specified groups. This is useful for implementing pub/sub patterns where messages need to be distributed to multiple connected clients.

Parameters:
  • message -- Message to send to the groups, as a BaseMessage object or a dict.

  • groups -- Group names to send to (defaults to self.groups)

  • exclude_current -- Whether to exclude the sending consumer from receiving the broadcast (prevents echo effects)

async handle_channel_event(event_payload: EventPayload) -> None

Internal dispatcher for typed channel events with completion signal.

This method is called by the channel layer when an event is sent to a group this consumer belongs to. It validates the event data and routes it to the appropriate @event_handler decorated method.

Parameters:

event_payload -- The message from the channel layer containing event data

async handle_event_handler_error(error: Exception, action: str, event: BaseMessage) -> None

Handle errors that occur in event handlers.

Parameters:
  • error -- The exception that occurred

  • action -- The action that was being processed

  • event -- The event that caused the error

async handle_group_message(event: GroupMessageEvent) -> None

Handle incoming group message and relay to client.

Processes group messages from the channel layer, adds metadata (is_current), and forwards to the client. Respects exclude_current flag to prevent echo effects.

Parameters:

event -- Group message event containing message data and metadata
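
The interplay between exclude_current and the is_current metadata can be sketched with an in-memory group. This is an illustrative model only: it uses no channel layer, and the event keys (from_channel, exclude_current, message) are hypothetical names rather than the library's wire format.

```python
from typing import Any


class SketchConsumer:
    """In-memory stand-in for a consumer that relays group messages."""

    def __init__(self, channel_name: str) -> None:
        self.channel_name = channel_name
        self.sent: list[dict[str, Any]] = []  # what the client would receive

    def handle_group_message(self, event: dict[str, Any]) -> None:
        # A message is "current" when this consumer is the one that sent it.
        is_current = event["from_channel"] == self.channel_name
        if is_current and event["exclude_current"]:
            return  # skip the echo back to the sender
        self.sent.append({**event["message"], "is_current": is_current})


def broadcast(sender, group, message, *, exclude_current=False) -> None:
    """Deliver one event to every consumer in the group."""
    event = {
        "from_channel": sender.channel_name,
        "exclude_current": exclude_current,
        "message": message,
    }
    for consumer in group:
        consumer.handle_group_message(event)


alice, bob = SketchConsumer("alice"), SketchConsumer("bob")
broadcast(alice, [alice, bob], {"text": "hi"})
broadcast(alice, [alice, bob], {"text": "again"}, exclude_current=True)
```

After both calls in this sketch, bob has received two messages and alice only the first, marked as her own.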

async handle_json(content: dict[str, Any], **kwargs: Any) -> None

Handle received JSON message with validation and routing.

Validates incoming messages against an auto-generated discriminated union, routes them to the appropriate handlers, and sends error responses on failure.

Parameters:
  • content -- The JSON content to handle

  • **kwargs -- Additional keyword arguments
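
The validate-then-route flow can be approximated with the standard library alone. The sketch below swaps Pydantic discriminated unions for plain dataclasses keyed on an "action" field, so it shows the control flow only; every name in it (PingMessage, ChatMessage, sketch_handle_json, the error payload shape) is hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class PingMessage:  # hypothetical message type
    action: str


@dataclass
class ChatMessage:  # hypothetical message type
    action: str
    text: str


# Maps each discriminator value to the type that validates it.
MESSAGE_TYPES = {"ping": PingMessage, "chat": ChatMessage}


async def on_ping(message: PingMessage) -> dict[str, Any]:
    return {"action": "pong"}


async def on_chat(message: ChatMessage) -> dict[str, Any]:
    return {"action": "chat_reply", "text": message.text.upper()}


HANDLERS = {"ping": on_ping, "chat": on_chat}


async def sketch_handle_json(content: dict[str, Any]) -> dict[str, Any]:
    """Validate content by its "action" field, then route it to a handler."""
    action = content.get("action")
    message_type = MESSAGE_TYPES.get(action)
    if message_type is None:
        return {"action": "error", "detail": f"unknown action: {action!r}"}
    try:
        message = message_type(**content)  # TypeError on missing or extra fields
    except TypeError as exc:
        return {"action": "error", "detail": str(exc)}
    return await HANDLERS[action](message)


reply = asyncio.run(sketch_handle_json({"action": "chat", "text": "hi"}))
```

A real discriminated union would also check field types; the dataclass constructor here only checks that the expected field names are present.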

async handle_json_processing_error(error: Exception) -> None

Handle general exceptions during JSON message processing.

Subclasses can override this method to customize error responses.

Parameters:

error -- The exception that occurred

async handle_message_handler_error(error: Exception, action: str, message: BaseMessage) -> None

Handle errors that occur in message handlers.

Parameters:
  • error -- The exception that occurred

  • action -- The action that was being handled

  • message -- The message that caused the error

async handle_result(result: BaseMessage) -> None

Process and send the result returned by a message handler.

Parameters:

result -- The result returned by the handler (BaseMessage)

async handle_validation_error(error: ValidationError) -> None

Handle ValidationError exceptions during message processing.

Subclasses can override this method to customize validation error responses.

Parameters:

error -- The ValidationError that occurred

async post_authentication() -> None

Hook for additional actions after successful authentication.

Subclasses can override this method to perform custom actions after a successful authentication.

async receive_event(event: BaseMessage) -> None

Process channel events received through the channel layer.

Routes events to @event_handler decorated methods based on the action field.

Parameters:

event -- The validated event object

async receive_json(content: dict[str, Any], **kwargs: Any) -> None

Receive and process JSON data from WebSocket.

Logs the message, assigns it an ID, and creates a task for asynchronous processing. Also adds the message ID and message action to the asyncio context.

Parameters:
  • content -- The JSON content received from the client

  • **kwargs -- Additional keyword arguments

async receive_message(message: BaseMessage) -> None

Process a validated received message and route it to the appropriate handler.

Parameters:

message -- The validated message object (BaseMessage instance)

async classmethod send_event(event: ReceiveEvent, channel_name: str) -> None

Send a typed channel event to a specific channel.

Parameters:
  • event -- The typed event to send (BaseMessage subclass)

  • channel_name -- Channel name to send the event to

classmethod send_event_sync(event: ReceiveEvent, channel_name: str) -> None

Synchronous version of send_event for use in sync contexts.

Parameters:
  • event -- The typed event to send (BaseMessage subclass)

  • channel_name -- Channel name to send to

async send_json(content: dict[str, Any], close: bool = False) -> None

Send JSON data to the client.

Parameters:
  • content -- The JSON data to send

  • close -- Whether to close the connection after sending

async send_message(message: BaseMessage, *, validate: bool = False) -> None

Send a BaseMessage to the client.

Parameters:
  • message -- The BaseMessage instance to send

  • validate -- Whether to validate the message against the outgoing adapter

property should_camelize: bool

Determine whether message field names should be camelized.

Checks for an instance-level camelize attribute first, then falls back to the global configuration setting. When True, snake_case field names are converted to camelCase in outgoing messages.

Returns:

True if field names should be camelized, False otherwise

property should_send_completion: bool

Determine whether completion messages should be sent after handling requests.

Checks for an instance-level send_completion attribute first, then falls back to the global configuration setting.

Returns:

True if completion messages should be sent, False otherwise

async websocket_connect(message: Any) -> None

Handle WebSocket connection request with authentication.

Accepts the connection, authenticates the user, and either adds the user to the appropriate groups or closes the connection.

Parameters:

message -- The connection message from the framework

async websocket_disconnect(message: Any) -> None

Handle WebSocket disconnection.

Cleans up context variables and logs the disconnection.

Parameters:

message -- The disconnection message from the framework

Authenticator

class chanx.core.authenticator.BaseAuthenticator(send_message: Callable[[BaseMessage], Awaitable[None]])

Abstract base class for WebSocket authentication handlers.

Provides a common interface for implementing custom authentication logic in Chanx WebSocket consumers. Subclasses must implement the authenticate method to define specific authentication rules.

Parameters:

send_message -- Function to send messages back to the client during authentication

abstract async authenticate(scope: MutableMapping[str, Any]) -> bool

Authenticate a WebSocket connection request.

This method must be implemented by subclasses to define the authentication logic. It receives the ASGI scope and should return True for successful authentication or False to reject the connection.

Parameters:

scope -- ASGI scope dictionary containing connection information, headers, query parameters, and user data

Returns:

True if authentication succeeds, False if it fails
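
To show the shape of a subclass, the sketch below defines a local abstract class that mirrors the documented constructor and authenticate() signature, then implements a query-string token check. SketchAuthenticator, TokenAuthenticator and the "token" parameter are hypothetical; in real code the base class would be BaseAuthenticator, whose other members are not covered here.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable, MutableMapping
from typing import Any
from urllib.parse import parse_qs


class SketchAuthenticator(ABC):
    """Local stand-in mirroring the documented interface."""

    def __init__(self, send_message: Callable[[Any], Awaitable[None]]) -> None:
        self.send_message = send_message

    @abstractmethod
    async def authenticate(self, scope: MutableMapping[str, Any]) -> bool: ...


class TokenAuthenticator(SketchAuthenticator):
    """Accepts connections whose ?token=... value is in an allow-list."""

    def __init__(self, send_message, valid_tokens: set[str]) -> None:
        super().__init__(send_message)
        self.valid_tokens = valid_tokens

    async def authenticate(self, scope: MutableMapping[str, Any]) -> bool:
        # ASGI stores the raw query string as bytes under "query_string".
        query = parse_qs(scope.get("query_string", b"").decode())
        token = query.get("token", [""])[0]
        return token in self.valid_tokens


async def discard(message: Any) -> None:
    """No-op sender used only to construct the sketch."""


auth = TokenAuthenticator(discard, {"abc"})
accepted = asyncio.run(auth.authenticate({"type": "websocket", "query_string": b"token=abc"}))
rejected = asyncio.run(auth.authenticate({"type": "websocket", "query_string": b"token=bad"}))
```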

Configuration

class chanx.core.config.Config

Configuration class that provides default values and Django integration.

This class defines default configuration values and automatically retrieves settings from Django's chanx_settings when Django is available.

send_completion

Whether to send completion messages.

Type:

bool

send_message_immediately

Whether to send messages immediately.

Type:

bool

log_websocket_message

Whether to log received and sent WebSocket messages.

Type:

bool

log_ignored_actions

Collection of action names to ignore in logs.

Type:

collections.abc.Collection[str]

camelize

Whether to camelize field names.

Type:

bool

discriminator_field

Field name used for message discrimination.

Type:

str

Registry

class chanx.core.registry.MessageRegistry

Registry for collecting and managing message types from consumers.

add(message_type: type[BaseMessage] | list[type[BaseMessage]] | tuple[type[BaseMessage], ...] | UnionType, consumer_name: str) -> None

Add a message type to the registry, handling simple types, unions, lists, and tuples.

Parameters:
  • message_type -- The BaseMessage type, union, list, or tuple to add

  • consumer_name -- Name of the consumer using this message type

build_message(message_type: type[BaseMessage], consumer_name: str) -> None

Build and register a message type in the registry.

Note: This method expects a single BaseMessage type, not a union/list/tuple. Union/list/tuple handling is done in the add() method.

Parameters:
  • message_type -- The BaseMessage subclass to register

  • consumer_name -- Name of the consumer using this message type

build_message_schema(model_type: type[BaseModel] | UnionType, consumer_name: str) -> None

Build and register a JSON schema for a BaseModel type.

Processes the model type, extracts field type information, handles unions, and generates proper schema references for AsyncAPI documentation.

Parameters:
  • model_type -- The BaseModel type or UnionType to process

  • consumer_name -- Name of the consumer using this type