Events
Events are defined as properties on your API classes. Events are useful when:
- You need asynchronous communication between services
- You wish to loosely couple your services
- You need a service to perform a task in the background
See event considerations in the explanations section for further discussion.
Events provide at-least-once delivery semantics. Given this, your event handlers should be idempotent.
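At-least-once delivery means a handler may occasionally be called more than once for the same event. The following is a minimal sketch of one way to make a handler idempotent, assuming each event carries some unique identifier. The `event_id` parameter and the in-memory `processed_ids` set are hypothetical and exist only for illustration; they are not part of the API described on this page.

```python
# Sketch only: `event_id` is a hypothetical unique identifier for an event.
processed_ids = set()      # a real service would keep this in durable storage
welcome_emails_sent = []   # stands in for a side effect that must not repeat


def handle_created(event_id, username, email):
    """Apply the event once and ignore redelivered copies."""
    if event_id in processed_ids:
        return False  # duplicate delivery, nothing to do
    welcome_emails_sent.append(email)
    processed_ids.add(event_id)
    return True


# Simulate the same event being delivered twice
first = handle_created("evt-1", "adam", "adam@example.com")
second = handle_created("evt-1", "adam", "adam@example.com")
print(first, second, welcome_emails_sent)
```

The second call returns early, so the side effect happens only once even though the event arrived twice.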
Defining events
You can define events using the Event class. For example, you could define the following bus.py in your authentication service:
    # auth_service/bus.py
    from lightbus import Api, Event


    class AuthApi(Api):
        user_created = Event(parameters=('username', 'email'))
        user_updated = Event(parameters=('username', 'new_email'))
        # Note the trailing comma: ('username') would be a plain string
        user_deleted = Event(parameters=('username',))

        class Meta:
            name = 'auth'
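When an event has a single parameter, it is worth remembering that Python only treats parentheses as a tuple when a comma is present. The short check below uses plain Python (no Lightbus calls) to show the difference; the variable names are illustrative.

```python
without_comma = ('username')   # just a parenthesised string
with_comma = ('username',)     # a one-element tuple

print(type(without_comma).__name__)
print(type(with_comma).__name__)

# Iterating over the string yields characters, not parameter names
print(list(without_comma)[:3])
print(list(with_comma))
```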
Firing events
You can fire events as follows:
    # Anywhere in your code

    # Import your project's bus instance
    from bus import bus

    bus.auth.user_created.fire(username='adam', email='adam@example.com')
Firing events (asynchronously)
You can also fire events asynchronously using asyncio:
    # Anywhere in your code (within a coroutine)

    # Import your project's bus instance
    from bus import bus

    await bus.auth.user_created.fire_async(
        username='adam',
        email='adam@example.com'
    )
Listening for events
Listening for events is typically a long-running background activity, and is therefore dealt with by the lightbus run command.
You can set up event listeners in your service's bus module as follows:
    # other_service/bus.py
    import lightbus

    bus = lightbus.create()
    user_db = {}


    def handle_created(username, email):
        user_db[username] = email
        print(user_db)


    def handle_updated(username, new_email):
        # Parameter names match those declared on the user_updated event
        user_db[username] = new_email
        print(user_db)


    def handle_deleted(username):
        # Use a default so that a redelivered event does not raise KeyError
        user_db.pop(username, None)
        print(user_db)


    @bus.client.on_start()
    def on_start():
        # Bus client has started up, so register our listeners
        bus.auth.user_created.listen(
            handle_created,
            listener_name="user_created"
        )
        bus.auth.user_updated.listen(
            handle_updated,
            listener_name="user_updated",
        )
        bus.auth.user_deleted.listen(
            handle_deleted,
            listener_name="user_deleted"
        )
Specifying listener_name is required in order to ensure each listener receives all events it is due. See the events explanation page for further discussion.
You cannot have two listeners with the same name for the same API.
For example, this will not work (it will raise a DuplicateListenerName exception):
    bus.competitor_prices.changed.listen(
        send_price_alerts,
        listener_name="price_listener",
    )

    ### ERROR ###
    # This will raise a DuplicateListenerName exception because
    # we have already created a listener named 'price_listener'
    # on the 'competitor_prices' API (above).
    bus.competitor_prices.changed.listen(
        update_prices,
        listener_name="price_listener",  # ⇠ DuplicateListenerName exception
    )
This restriction only applies to listeners within the same service.
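One way to picture the naming rule is as a per-service registry keyed by API name and listener name. The sketch below is a toy model of the rule as it is worded here, not Lightbus's implementation: the `ListenerRegistry` class and the locally defined `DuplicateListenerName` exception are stand-ins written for illustration.

```python
class DuplicateListenerName(Exception):
    """Local stand-in for the exception named in the text."""


class ListenerRegistry:
    """Toy model of one service's listeners, keyed by (API name, listener name)."""

    def __init__(self):
        self._listeners = {}

    def listen(self, api_name, listener_name, handler):
        key = (api_name, listener_name)
        if key in self._listeners:
            raise DuplicateListenerName(
                f"Listener {listener_name!r} already exists on API {api_name!r}"
            )
        self._listeners[key] = handler


registry = ListenerRegistry()
registry.listen("competitor_prices", "price_listener", print)
# In this model, the same name on a different API does not collide
registry.listen("auth", "price_listener", print)

try:
    # Same API and same listener name: rejected
    registry.listen("competitor_prices", "price_listener", print)
    duplicate_rejected = False
except DuplicateListenerName:
    duplicate_rejected = True

print(duplicate_rejected)
```

Because each service would hold its own registry in this model, two different services can reuse a listener name without conflict.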
Errors in event listeners
By default, the Lightbus worker will exit if an event listener encounters an error. The event being handled will remain unacknowledged, and will be re-attempted after acknowledgement_timeout seconds.
You can customise this behaviour by passing the on_error parameter to listen().
Possible values are:
- lightbus.OnError.SHUTDOWN (default): Shut down the Lightbus worker upon encountering an error.
- lightbus.OnError.ACKNOWLEDGE_AND_LOG: Acknowledge erroring messages as being successfully processed, and log the error.
For example:
    # Taken from the above example

    # Worker shutdown on error (the default)
    bus.auth.user_created.listen(
        handle_created,
        listener_name="user_created",
        on_error=lightbus.OnError.SHUTDOWN,
    )

    # Or, log the error and acknowledge the message as being processed
    bus.auth.user_created.listen(
        handle_created,
        listener_name="user_created",
        on_error=lightbus.OnError.ACKNOWLEDGE_AND_LOG,
    )
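The difference between the two settings can be modelled in a few lines of plain Python. This is a sketch of the behaviour described above, not Lightbus's internals: `OnErrorPolicy` is a local stand-in for `lightbus.OnError`, and the `deliver` helper is hypothetical.

```python
import logging
from enum import Enum

logger = logging.getLogger("listener_sketch")


class OnErrorPolicy(Enum):
    """Local stand-in for lightbus.OnError, for illustration only."""
    SHUTDOWN = "shutdown"
    ACKNOWLEDGE_AND_LOG = "acknowledge_and_log"


def deliver(handler, kwargs, policy):
    """Call the handler; return True if the message should be acknowledged."""
    try:
        handler(**kwargs)
    except Exception:
        if policy is OnErrorPolicy.ACKNOWLEDGE_AND_LOG:
            logger.exception("Listener failed; acknowledging the message anyway")
            return True
        # SHUTDOWN: the message stays unacknowledged and the error propagates
        raise
    return True


def failing_handler(username, email):
    raise ValueError("could not store user")


message = {"username": "adam", "email": "adam@example.com"}

acked = deliver(failing_handler, message, OnErrorPolicy.ACKNOWLEDGE_AND_LOG)

try:
    deliver(failing_handler, message, OnErrorPolicy.SHUTDOWN)
    worker_stopped = False
except ValueError:
    worker_stopped = True

print(acked, worker_stopped)
```

With ACKNOWLEDGE_AND_LOG the failed message is treated as processed and will not be retried, so it suits handlers where losing an occasional event is acceptable. With SHUTDOWN the event remains available for a later attempt.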
Type hints
See the typing reference.