Event protocol (Redis)¶
Here we document the specific interactions between Lightbus and Redis.
The concrete implementation is provided by the RedisEventTransport class.
Before reading you should be familiar with Lightbus' data marshalling.
This documentation may be useful when debugging, developing third-party client libraries, or simply for general interest and review. You do not need to be aware of this protocol in order to use Lightbus.
Sending events¶
The following command will send an event:
XADD {stream_name} [MAXLEN ~ {max_stream_length}] * {field_name_1} {field_value_1} {field_name_2} {field_value_2}...
{stream_name}¶
The {stream_name} value is composed in one of the following ways:
- One stream per event: {api_name}.{event_name}
- One stream per API: {api_name}.*
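The two naming schemes above can be sketched as a small helper. This is an illustrative sketch only: the function name `stream_name` and its signature are hypothetical and are not part of the Lightbus API.

```python
from typing import Optional


def stream_name(api_name: str, event_name: Optional[str] = None) -> str:
    """Compose a stream name (hypothetical helper, not a Lightbus function).

    With an event name this yields one stream per event; without one it
    yields the per-API stream, which uses a literal "*".
    """
    suffix = event_name if event_name is not None else "*"
    return f"{api_name}.{suffix}"
```

For example, `stream_name("auth", "user_registered")` gives the per-event form, while `stream_name("auth")` gives the per-API form.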
MAXLEN¶
The MAXLEN ~ {max_stream_length} component is optional, but will be used by Lightbus to limit the stream to the approximate configured length.
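As a rough sketch of how a third-party client might assemble the XADD command shown above, the helper below builds the argument list, including the optional MAXLEN component. The name `build_xadd` is hypothetical, and the field values are assumed to be already-encoded strings.

```python
from typing import Dict, List, Optional


def build_xadd(
    stream: str,
    fields: Dict[str, str],
    max_stream_length: Optional[int] = None,
) -> List[str]:
    """Build the XADD argument list (hypothetical helper)."""
    args = ["XADD", stream]
    if max_stream_length is not None:
        # "~" asks Redis to trim to approximately this length
        args += ["MAXLEN", "~", str(max_stream_length)]
    # "*" asks Redis to generate the stream entry ID
    args.append("*")
    for name, value in fields.items():
        args += [name, value]
    return args
```

The resulting list could then be handed to whichever Redis client is in use, for instance via a generic "execute command" call if the client offers one.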
Fields¶
Field names are strings. Field values are JSON-encoded strings.
The following metadata fields must be sent:
- :id - A unique message ID
- :api_name - The API name for this event
- :event_name - The name of this event
- :version - The version of this event (1 is a sensible default value)
Note that metadata fields are prefixed by a colon. User-specified fields should not include this colon.
Lightbus does not currently provide specific functionality around the version field, but the field is available to developers via the EventMessage class.
Lightbus may implement functionality around event versions in future (such as event migrations).
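The field rules above might be sketched as follows. This is a minimal illustration, not the Lightbus implementation: the function name `encode_event_fields` is hypothetical, the use of a UUID for the message ID is an assumption, and so is the choice to JSON-encode the metadata values in the same way as user-specified values.

```python
import json
import uuid
from typing import Any, Dict, Optional


def encode_event_fields(
    api_name: str,
    event_name: str,
    kwargs: Dict[str, Any],
    version: int = 1,
    message_id: Optional[str] = None,
) -> Dict[str, str]:
    """Build the field mapping for one event (hypothetical helper)."""
    for name in kwargs:
        # The colon prefix is reserved for metadata fields
        if name.startswith(":"):
            raise ValueError(f"User field {name!r} must not start with a colon")

    metadata = {
        ":id": message_id or str(uuid.uuid4()),  # assumption: UUID as unique ID
        ":api_name": api_name,
        ":event_name": event_name,
        ":version": version,
    }
    # Assumption: metadata values are JSON-encoded like user fields
    return {name: json.dumps(value) for name, value in {**metadata, **kwargs}.items()}
```

Rejecting colon-prefixed user fields up front keeps user data from colliding with the metadata namespace.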
Consuming events¶
Consuming events involves an initial setup stage in which we check for any events which this process has consumed yet failed to process (for example, due to an error, hardware failure, or network problem).
We perform this initial check for events as follows:
XREADGROUP GROUP {group_name} {consumer_name} STREAMS {stream_name} 0
The 0 above indicates we wish to receive unacknowledged events for this consumer (i.e. this Lightbus process).
Once we have received and processed any of these events, we can retrieve further events as follows:
XREADGROUP GROUP {group_name} {consumer_name} STREAMS {stream_name} >
{group_name}¶
The {group_name} value is composed of the service name and listener name as follows: {service_name}-{listener_name}.
{consumer_name}¶
The {consumer_name} is set to the process name of the Lightbus process.
{stream_name}¶
The stream name, as described above in sending events.
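The two-stage read described in this section could be sketched as below, using the argument order of Redis' XREADGROUP command. The helper names `group_name` and `build_xreadgroup` are hypothetical and only illustrate how the pieces fit together.

```python
from typing import List


def group_name(service_name: str, listener_name: str) -> str:
    """Compose the consumer group name (hypothetical helper)."""
    return f"{service_name}-{listener_name}"


def build_xreadgroup(group: str, consumer: str, stream: str, pending: bool) -> List[str]:
    """Build XREADGROUP arguments (hypothetical helper).

    pending=True  -> ID "0": events already delivered to this consumer
                     but not yet acknowledged
    pending=False -> ID ">": events never delivered to any consumer
    """
    last_id = "0" if pending else ">"
    return ["XREADGROUP", "GROUP", group, consumer, "STREAMS", stream, last_id]
```

A client would typically issue the `pending=True` form once at startup, process whatever is returned, and then switch to the `pending=False` form for ongoing consumption.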
Reclaiming timed-out events¶
Events can be considered timed out if another Lightbus process has held onto them for too long. Any client consuming events should check for these from time to time.
Timed out events can be claimed as follows:
# Get pending messages
XPENDING {stream_name} {group_name} - + {batch_size}
# For each message, try to claim it
XCLAIM {stream_name} {group_name} {consumer_name} {timeout} {message_id}
# If successful, process the event. Otherwise ignore it
{batch_size}¶
How many pending messages to fetch in each batch.
{timeout}¶
Timeout in milliseconds. This is the maximum time a Lightbus process will have to process an event before another process assumes it has failed and takes over.
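The reclaim procedure above might be sketched as follows. The helper names are hypothetical. Pending entries are assumed to be tuples in the order Redis returns them from the extended form of XPENDING: message ID, owning consumer, idle time in milliseconds, and delivery count.

```python
from typing import List, Sequence, Tuple

# (message_id, consumer, idle_ms, delivery_count), as in an XPENDING reply
PendingEntry = Tuple[str, str, int, int]


def build_xpending(stream: str, group: str, batch_size: int) -> List[str]:
    """Build the XPENDING arguments for one batch (hypothetical helper)."""
    return ["XPENDING", stream, group, "-", "+", str(batch_size)]


def build_xclaims(
    stream: str,
    group: str,
    consumer: str,
    timeout_ms: int,
    pending: Sequence[PendingEntry],
) -> List[List[str]]:
    """Build one XCLAIM per pending entry that looks timed out."""
    commands = []
    for message_id, _owner, idle_ms, _deliveries in pending:
        # Redis also enforces the minimum idle time on XCLAIM itself;
        # filtering here only avoids sending claims that cannot succeed.
        if idle_ms < timeout_ms:
            continue
        commands.append(
            ["XCLAIM", stream, group, consumer, str(timeout_ms), message_id]
        )
    return commands
```

A claim can still fail if another process claims the message first, which is why the procedure says to ignore unsuccessful claims.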
Encoding & Customisation¶
See also: data marshalling
By default, field values are serialised using JSON. This encoding is customisable within the Lightbus configuration. You are welcome to use something custom here, but be aware that:
- A single API must have a single encoding for all events on that API
- All clients accessing the bus must be configured to use the same custom encoding
Data validation¶
Validation of outgoing and incoming events is optional. However, validation of outgoing events is recommended, because an event message which fails validation may be rejected by any consumer.
This validation can be performed using the schema available through the schema protocol.
Data deformation & casting¶
The Lightbus client provides Python-specific functionality to streamline the process of moving between Python data structures and interoperable JSON data structures.
The level of functionality required in this regard is not specified here, and is left up to individual library developers.