Events
Events are defined as properties on your API classes. Events are useful when:
- You need asynchronous communication between services
- You wish to loosely-couple your services
- You need a service to perform a task in the background
See event considerations in the explanations section for further discussion.
Events provide at-least-once delivery semantics. Given this, your event handlers should be idempotent.
Defining events¶
You can define events using the Event
class. For example,
you could define the following bus.py in your authenication service:
# auth_service/bus.py
from lightbus import Api, Event
class AuthApi(Api):
user_created = Event(parameters=('username', 'email'))
user_updated = Event(parameters=('username', 'new_email'))
user_deleted = Event(parameters=('username'))
class Meta:
name = 'auth'
Firing events¶
You can fire events as follows:
# Anywhere in your code
# Import your project's bus instance
from bus import bus
bus.auth.user_created.fire(username='adam', password='adam@example.com')
Firing events (asynchronously)¶
You can also fire events asynchronously using asyncio:
# Anywhere in your code
# Import your project's bus instance
from bus import bus
await bus.auth.user_created.fire_async(
username='adam',
password='adam@example.com'
)
Listening for events¶
Listening for events is typically a long-running background
activity, and is therefore dealt with by the lightbus run
command.
You can setup event listeners in your services' bus
module
as follows:
# other_service/bus.py
import lightbus
bus = lightbus.create()
user_db = {}
def handle_created(username, email):
user_db[username] = email
print(user_db)
def handle_updated(username, email):
user_db[username] = email
print(user_db)
def handle_deleted(username, email):
user_db.pop(username)
print(user_db)
@bus.client.on_start()
def on_start():
# Bus client has started up, so register our listeners
bus.auth.user_created.listen(
handle_created,
listener_name="user_created"
)
bus.auth.user_updated.listen(
handle_updated,
listener_name="user_updated",
)
bus.auth.user_deleted.listen(
handle_deleted,
listener_name="user_deleted"
)
Specifying listener_name
is required in order to ensure
each listeners receives all events it is due.
See the events explanation page for further discussion.
You cannot have two listeners with the same name for the same API.
For example, this will not work (it will raise a DuplicateListenerName
exception:
bus.competitor_prices.changed.listen(
send_price_alerts,
listener_name="price_listener",
)
### ERROR ##
# This will raise a DuplicateListenerName exception because
# we have already created a listener named 'price_listener'
# on the 'competitor_prices' API (above).
bus.competitor_prices.changed.listen(
update_prices,
listener_name="price_listener", # ⇠ DuplicateListenerName exception
)
This restriction only applies to listeners within the same service.
Errors in event listeners¶
By default, the Lightbus worker will exit if an event listener encounters an error. The event being handled will remain unacknowledged, and will be re-attempted after acknowledgement_timeout seconds.
You can customise this behaviour by passing the on_error
parameter to listen()
.
Possible values are:
lightbus.OnError.SHUTDOWN
(Default) - Shutdown the Lightbus work upon encountering an errorlightbus.OnError.ACKNOWLEDGE_AND_LOG
(Default) - Acknowledge erroring messages as being successfully processed, and log the error.
For example:
# Taken from the above example
# Worker shutdown on error (the default)
bus.auth.user_created.listen(
handle_created,
listener_name="user_created",
on_error=lightbus.OnError.SHUTDOWN,
)
# Or, log the error and acknowlegde the message as being processed
bus.auth.user_created.listen(
handle_created,
listener_name="user_created",
on_error=lightbus.OnError.ACKNOWLEDGE_AND_LOG,
)
Listening for events (asynchronous handlers)¶
Event handlers may also be asynchronous. For example:
import lightbus
import asyncio
bus = lightbus.create()
# Asynchronous handles can be used too
async def handle_created(username, email):
await asyncio.sleep(0.1)
print(f"Username {username} created")
@bus.client.on_start()
def on_start():
# Bus client has started up, so register our listeners
bus.auth.user_created.listen(
handle_created,
listener_name="user_created"
)
A note on threads
Asynchronous handlers will be executed in the same thread as Lightbus, whereas synchronous handlers will be executed in their own thread.
Type hints¶
See the typing reference.