Advanced Usage

Connection types

The Bayeux protocol used by CometD is a transport-independent protocol, that can be carried over HTTP or over WebSocket (or other transport protocols), so that an application is not bound to a specific transport technology.

aiocometd supports the LONG_POLLING and WEBSOCKET transports.

When a client connects to a CometD server, a so called handshake operation is executed first using the default transport that all CometD servers should support. Based on the types of transports that the server offers and what the client supports, the client picks one of the transports that it will use to communicate with the server.

By default, if the preferred connection types are not specified when the Client is created, it will use the WEBSOCKET transport if it’s supported by the server or otherwise fall back to using LONG_POLLING.

If you prefer a different ordering then it can be specified when the Client is created:

client = Client("http://example.com/cometd",
                connection_types=[ConnectionType.LONG_POLLING,
                                  ConnectionType.WEBSOCKET])

If there is only a single connection type that you would wan’t your client to accept or fail if it’s not available on the server, then instead of a list specify a single connection type:

client = Client("http://example.com/cometd",
                connection_types=ConnectionType.WEBSOCKET)

Extensions

Extensions allow the modification of a message just after receiving it but before the rest of the message processing takes place, or just before sending it. An extension normally adds fields to the message being sent or received in the ext object that the Bayeux protocol specification defines. An extension is not a way to add business fields to a message, but rather a way to process all messages, including the meta messages the Bayeux protocol uses, and to extend the Bayeux protocol itself.

aiocometd provides abstract base classes for implementing custom extensions using the Extension and AuthExtension classes.

Extension

To create a new extension use the Extension class as the base class:

class MyExtension(Extension):
    async def incoming(payload, headers=None):
        pass

    async def outgoing(payload, headers):
        pass

The incoming message payload, which is a list of messages, is first passed to the incoming() method along with the received headers. The incoming headers might or might not be empty, it depends on the type of transport used, whether it receives headers for responses.

The outgoing payload along with the headers are passed to the outgoing() method before sending.

Custom extension implementation can use these two methods to inspect or alter the messages or headers. The list of extension objects that you would want to use should be passed to the Client.

client = Client("http://example.com/cometd",
                extensions=[MyExtension()])

AuthExtension

The AuthExtension class, which is based on Extension, can be used to implement authentication extensions.

For authentication schemes where the credentials are static it doesn’t makes much sense to use AuthExtension instead of Extension. However for schemes where the credentials can expire (like OAuth, JWT…) authenticate() method can be reimplemented to update those credentials. The authenticate() method is called by the client after an authentication failure.

class MyAuthExtension(AuthExtension):
    async def incoming(payload, headers=None):
        pass

    async def outgoing(payload, headers):
        pass

    async def authenticate():
        # get new JWT

An auth extension should be passed to the client separately from the other extensions.

client = Client("http://example.com/cometd",
                extensions=[MyExtension()]
                auth=MyAuthExtension())

Network failures

When a Client object is opened, it will try to maintain a continuous connection in the background with the server. If any network failures happen while waiting to receive() messages, the client will reconnect to the server transparently, it will resubscribe to the subscribed channels, and continue to wait for incoming messages.

To avoid waiting for a server which went offline permanently, a connection_timeout can be passed to the Client, to limit how many seconds the client object should wait before raising a TransportTimeoutError if it can’t reconnect to the server.

client = Client("http://example.com/cometd",
                connection_timeout=60)

try:
    message = await client.receive()
except TransportTimeoutError:
    print("Connection is lost with the server. "
          "Couldn't reconnect in 60 seconds.")

The defaul value is 10 seconds. If you pass None as the connection_timeout value, then the client will keep on trying indefinitely.

Prefetch and backpressure

When a Client is opened it will start and maintain a connection in the background with the server. It will start to fetch messages from the server as soon as it’s connected, even before receive() is called.

Firstly, prefetching messages has the advantage, that incoming messages will wait in a buffer for users to consume them when receive() is called, without any delay. Secondly, the client has no choice but to accept incoming messages.

The Bayeux protocol is modelled very heavily around long-polling type HTTP transports. Which requires from clients to send periodic requests to the server to simulate a continuous connection, otherwise the server will terminate the session. This makes it impossible to use backpressure, even with the type of transports like WebSocket which would otherwise support it. So the connection can not be suspended if the client can’t keep up with receiving the incoming messages, or otherwise the session will be closed.

To avoid consuming all the available memory by the incoming messages, which are not consumed yet, the number of prefetched messages can be limited with the max_pending_count parameter of the Client. The default value is 100.

client = Client("http://example.com/cometd",
                max_pending_count=42)

The current number of messages waiting to be consumed can be obtained from the Client.pending_count attribute.

JSON encoder/decoder

Besides the standard json module, many third party libraries offer JSON serialization/deserilization functionality. To use a different library for handling JSON data types, you can specify the callable to use for serialization with the json_dumps and the callable for deserialization with the json_loads parameters of the Client.

import ujson

client = Client("http://example.com/cometd",
                json_dumps=ujson.dumps,
                json_loads=ujson.loads)