We have covered message publishing
in our publish example. Synchronous
publishing is a decent choice while the publishing
rate remains moderate, but its performance
is limited by the round-trip time (RTT).
If the server responds in 1 ms, a single publisher
is limited to about 1000 requests per second.
Bulk publishing overcomes the RTT limitation
by separating request generation,
sending, and acknowledgment.
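The RTT limit above is simple arithmetic, and a short sketch makes it concrete. The `max_rate` helper and its `in_flight` parameter are hypothetical names introduced only for this illustration. The model is idealized and ignores bandwidth and server-side limits.

```python
def max_rate(rtt_seconds: float, in_flight: int = 1) -> float:
    """Idealized upper bound on requests per second.

    in_flight is the number of requests allowed to be outstanding at once;
    1 models a publisher that waits for every reply before sending again.
    """
    return in_flight / rtt_seconds


# One request at a time with a 1 ms round trip.
print(round(max_rate(0.001)))  # 1000

# Ten requests in flight over the same link (idealized).
print(round(max_rate(0.001, in_flight=10)))
```

Under this model the bound grows with the number of outstanding requests, which is the effect bulk publishing aims for.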
Note
The stream and partition must be created before running
the example, so refer to the Liftbridge Docs
or go through the create example first.
Gufo Liftbridge is an async library. In our case
we run the client from a synchronous script,
so we need to import asyncio to use asyncio.run().
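As a minimal sketch of this pattern, the snippet below starts a coroutine from synchronous code with `asyncio.run()`. The `main` coroutine is a placeholder standing in for the client code and does not touch Liftbridge.

```python
import asyncio


async def main() -> str:
    # Placeholder for the asynchronous client code.
    await asyncio.sleep(0)
    return "done"


# asyncio.run() creates an event loop, runs the coroutine to completion,
# and closes the loop afterwards.
result = asyncio.run(main())
print(result)
```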
Liftbridge is a dynamic cluster, synchronized via the
Raft protocol. Cluster members may join and leave, and
the client uses one or more cluster members as a bootstrap
to discover the actual topology. These bootstrap
members are called seeds and are defined as a list
of strings in the host:port format. For our
example, we assume that Liftbridge is running
locally at 127.0.0.1:9292. Note that even if we have
only one seed, we must define it as a list.
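The seed list can be sketched as follows. `BROKERS` is the name used later in this example. The `split_seed` helper is hypothetical and only illustrates the host:port format; it is not part of the library.

```python
from typing import List, Tuple

# Even a single seed is wrapped in a list.
BROKERS: List[str] = ["127.0.0.1:9292"]


def split_seed(seed: str) -> Tuple[str, int]:
    """Split a host:port string into host and numeric port (illustrative)."""
    host, port = seed.rsplit(":", 1)
    return host, int(port)


for seed in BROKERS:
    print(split_seed(seed))

# Iterating over a bare string walks its characters, not seeds.
print(list("127.0.0.1:9292")[:3])
```

The last line shows a general Python pitfall: code that iterates over seeds would see single characters if it were given a bare string instead of a list.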
We need an instance of the client. The instance may be used
directly or operated as an async context manager
with the async with clause. When used as a context manager,
the client automatically closes all connections on exit from the context,
so its lifetime is defined explicitly. LiftbridgeClient requires
a list of seeds to connect to the cluster, so we pass the BROKERS list.
The client is highly configurable. Refer to the
LiftbridgeClient reference for detailed
explanations.
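The lifetime rule can be illustrated with a stand-in class. `DemoClient` below is hypothetical: it only mimics the async context manager protocol and is not LiftbridgeClient, whose constructor options are described in its reference.

```python
import asyncio
from typing import Iterable, Tuple


class DemoClient:
    """Hypothetical stand-in that only tracks whether it has been closed."""

    def __init__(self, seeds: Iterable[str]) -> None:
        self.seeds = list(seeds)
        self.closed = False

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "DemoClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on leaving the async with block, even after an exception.
        await self.close()


async def main() -> Tuple[bool, bool]:
    async with DemoClient(["127.0.0.1:9292"]) as client:
        closed_inside = client.closed
    return closed_inside, client.closed


closed_inside, closed_after = asyncio.run(main())
print(closed_inside, closed_after)
```

The stand-in is open inside the block and closed right after it, which is the explicit lifetime the context manager form provides.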
We have to prepare requests for bulk publishing. In our example
we use a list comprehension to build a list of 10 requests.
Though we use a list in our example, it is possible to use any
kind of iterable, such as a generator function.
Requests are created by the get_publish_request() method.
It accepts the same parameters as the publish() method, but
instead of publishing immediately, it returns a request
that can be passed to publish_bulk() later.
The client does not enforce any specific data format and leaves
it to the application. The only requirement is to pass the message
value as the bytes type. In our example, we construct the message
as a string, so we have to encode it to bytes manually.
Then we must specify the stream and partition to publish to. We use
partition 0 of the stream test in our example. It is possible
to publish into different streams and partitions in a single
publish_bulk() call.
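The request-building step can be sketched without a running cluster. `DemoRequest` and `make_request` below are hypothetical stand-ins for the request object and for get_publish_request(), and the `msg0` ... `msg9` payloads are illustrative. The sketch shows the bytes encoding and the two ways of producing the bulk, as a list and as a generator.

```python
from typing import Iterator, NamedTuple


class DemoRequest(NamedTuple):
    """Hypothetical stand-in for a prepared publish request."""

    value: bytes
    stream: str
    partition: int


def make_request(value: bytes, stream: str, partition: int) -> DemoRequest:
    # The message value must already be bytes; strings are encoded by the caller.
    if not isinstance(value, bytes):
        raise TypeError("value must be bytes")
    return DemoRequest(value, stream, partition)


# A list comprehension builds all 10 requests up front.
bulk = [
    make_request(f"msg{i}".encode("utf-8"), stream="test", partition=0)
    for i in range(10)
]


def lazy_bulk(count: int) -> Iterator[DemoRequest]:
    """Generator variant: requests are produced one by one on demand."""
    for i in range(count):
        yield make_request(f"msg{i}".encode("utf-8"), stream="test", partition=0)


print(len(bulk), bulk[0])
```

The generator variant avoids holding the whole bulk in memory, which may matter for large batches.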
We use the publish_bulk() method to send our prepared bulk of requests.
The method is an asynchronous iterator, so it must be used with
the async for statement. It sends our bulk in one or more batches
and yields an Ack for each request sent. If the wait parameter
is not set, the method returns just after sending the last request
and does not wait for the remaining acknowledgments that are still in flight.
So the number of loop iterations may be less than the length
of the bulk. This scenario is suitable for fire-and-forget tasks.
It is up to the application to track the acknowledgments and
to deal with failures. In our example we just print them.
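The effect of the wait parameter can be sketched with a small simulation. Everything below is hypothetical: `demo_publish_bulk` and `fake_broker` are stand-ins written for this illustration and say nothing about how the library batches or sends requests. They only reproduce the behavior described above, where acknowledgments still in flight are skipped unless wait is set.

```python
import asyncio
from typing import AsyncIterator, Iterable, List


async def demo_publish_bulk(
    requests: Iterable[bytes], wait: bool = False
) -> AsyncIterator[str]:
    """Hypothetical simulation of an ack-yielding bulk publisher."""
    acks: asyncio.Queue = asyncio.Queue()
    tasks: List[asyncio.Task] = []
    received = 0

    async def fake_broker(value: bytes) -> None:
        # Pretend the acknowledgment needs a few event loop turns to arrive.
        for _ in range(3):
            await asyncio.sleep(0)
        acks.put_nowait(f"ack:{value.decode()}")

    for value in requests:
        tasks.append(asyncio.create_task(fake_broker(value)))
        await asyncio.sleep(0)  # let pending acknowledgments make progress
        while not acks.empty():
            received += 1
            yield acks.get_nowait()

    if wait:
        # Block until every request sent so far has been acknowledged.
        while received < len(tasks):
            received += 1
            yield await acks.get()
    else:
        # Fire-and-forget: stop without collecting in-flight acknowledgments.
        for task in tasks:
            task.cancel()


async def collect(wait: bool) -> List[str]:
    bulk = [f"msg{i}".encode("utf-8") for i in range(10)]
    return [ack async for ack in demo_publish_bulk(bulk, wait=wait)]


waited = asyncio.run(collect(wait=True))
not_waited = asyncio.run(collect(wait=False))
print(len(waited), len(not_waited))
```

With wait set, the loop sees one acknowledgment per request. Without it, the count depends on how many acknowledgments happened to arrive before the last request was sent.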