Gufo Liftbridge Example: Subscribing
We have mastered the message publishing process
in our publish example. We also
learned about various optimizations in the
bulk and compression examples.
Now it is time to learn about
receiving messages. To get the published messages,
we need to subscribe to the stream.
Note
The stream and partition must be created before running
the example, so refer to the Liftbridge Docs
or work through the create example first.
subscribe.py

```python
import asyncio

from gufo.liftbridge.client import LiftbridgeClient, StartPosition

BROKERS = ["127.0.0.1:9292"]


async def subscribe():
    async with LiftbridgeClient(BROKERS) as client:
        async for msg in client.subscribe(
            "test", partition=0, start_position=StartPosition.EARLIEST
        ):
            print(f"{msg.offset}: {msg.value}")


asyncio.run(subscribe())
```
Let's see the details.
Gufo Liftbridge is an async library. In our case,
we run the client from a synchronous script,
so we need to import asyncio
to use asyncio.run().
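As a minimal sketch of calling async code from a synchronous script, the snippet below uses only the standard library. The fetch_greeting() coroutine is a hypothetical stand-in for any awaitable client call and is not part of Gufo Liftbridge.

```python
import asyncio


async def fetch_greeting() -> str:
    # Hypothetical stand-in for an awaitable client operation.
    await asyncio.sleep(0)  # yield control to the event loop once
    return "hello"


def main() -> str:
    # asyncio.run() creates an event loop, runs the coroutine to
    # completion, closes the loop, and returns the coroutine's result.
    return asyncio.run(fetch_greeting())


if __name__ == "__main__":
    print(main())
```

The synchronous main() function is the only place that touches the event loop; everything it calls can stay purely async.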
The client is implemented as the LiftbridgeClient class,
which must be imported before use. We also need the
StartPosition enum.
Liftbridge is a dynamic cluster synchronized over
the Raft protocol. Cluster members may join and leave, and
the client uses one or more cluster members as a bootstrap
to discover the actual topology. These bootstrap
members are called seeds and are defined as a list
of strings in the host:port format. For our
example, we assume that Liftbridge is running
locally at 127.0.0.1:9292. Note that even if we have only
one seed, we must define it as a list.
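To illustrate the host:port seed format, here is a small sketch of a helper that splits seeds into (host, port) pairs. The parse_seeds() function is hypothetical and written only for this illustration; it is not part of Gufo Liftbridge and does not describe how the client validates its input.

```python
from typing import List, Tuple


def parse_seeds(seeds: List[str]) -> List[Tuple[str, int]]:
    """Split each "host:port" seed into a (host, port) pair.

    Illustrative helper only; it is not part of Gufo Liftbridge.
    """
    if isinstance(seeds, str):
        # A bare string is iterable too and would be read one
        # character at a time, so reject it explicitly.
        raise TypeError("seeds must be a list of strings, not a string")
    result = []
    for seed in seeds:
        host, sep, port = seed.rpartition(":")
        if not sep or not host:
            raise ValueError(f"expected host:port, got {seed!r}")
        result.append((host, int(port)))
    return result


BROKERS = ["127.0.0.1:9292"]  # one seed, still wrapped in a list

if __name__ == "__main__":
    print(parse_seeds(BROKERS))
```

The explicit string check shows why the list matters: a plain string is also iterable in Python, so code that loops over seeds would otherwise treat every character as a separate seed.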
All async code must run inside async functions,
so our subscribe() function is defined with async def.
We need an instance of the client. The instance may be used
directly or as an async context manager
with the async with clause. When used as a context manager,
the client automatically closes all connections on exiting the context,
so its lifetime is defined explicitly. LiftbridgeClient requires
a list of seeds to connect to the cluster, so we pass the BROKERS list.
The client is highly configurable; refer to the
LiftbridgeClient reference for detailed
explanations.
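The lifetime guarantee of an async context manager can be sketched with a stand-in class. DemoClient below is hypothetical and only mimics the enter and exit protocol; it is not the real LiftbridgeClient and makes no network connections.

```python
import asyncio


class DemoClient:
    """Hypothetical stand-in; not the real LiftbridgeClient."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        # A real client would release its connections here.
        self.closed = True

    async def __aenter__(self) -> "DemoClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and when an exception propagates.
        await self.close()


async def demo() -> bool:
    async with DemoClient() as client:
        assert not client.closed  # usable inside the block
    return client.closed  # True: released when the block ended


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the cleanup lives in __aexit__(), it also runs when the body of the async with block raises, which is the main reason to prefer this form over closing by hand.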
The subscribe() method is used to receive messages. We need to
pass the stream (test), the partition (0), and the position
from which to start receiving messages. In our case, we
use StartPosition.EARLIEST to receive all the messages
still stored in the stream. To learn about other starting
options, refer to the StartPosition documentation.
The client implements subscribing as an async iterator, so
the async for statement is usually used to iterate over it.
Each iteration yields a Message structure.
The loop is endless, so it is up to the application to decide
when to break.
For additional parameters, refer to the subscribe
documentation.
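Leaving an endless async for loop can be sketched as follows. The endless_offsets() generator is a hypothetical stand-in for a subscription that yields increasing offsets forever, and the stop condition (a fixed message count) is just one possible choice.

```python
import asyncio
from typing import AsyncIterator, List


async def endless_offsets() -> AsyncIterator[int]:
    # Hypothetical stand-in for a subscription: never finishes by itself.
    offset = 0
    while True:
        await asyncio.sleep(0)  # let other tasks run between messages
        yield offset
        offset += 1


async def take(limit: int) -> List[int]:
    # Collect messages until the limit is reached (limit >= 1 assumed).
    seen: List[int] = []
    async for offset in endless_offsets():
        seen.append(offset)
        if len(seen) >= limit:
            break  # the application decides when to stop
    return seen


if __name__ == "__main__":
    print(asyncio.run(take(3)))
```

Other stop conditions, such as reaching a known offset or receiving a sentinel payload, fit the same pattern: test inside the loop body and break.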
All further processing is built around the Message
structure. It consists of several fields. The message body is contained
in the value attribute. The body is of the raw bytes type,
and it is up to the application to handle it properly.
We just use print() to display the message body along with
the offset attribute, which holds the message's sequential
number in the partition.
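Since the body arrives as bytes, an application that expects text has to decode it. The sketch below assumes UTF-8 payloads; DemoMessage is a hypothetical stand-in that carries only the two attributes used in this example and is not the library's Message class.

```python
from dataclasses import dataclass


@dataclass
class DemoMessage:
    """Hypothetical stand-in with the two attributes used in the example."""

    offset: int
    value: bytes


def format_message(msg: DemoMessage) -> str:
    # Decode explicitly; errors="replace" keeps malformed payloads printable.
    text = msg.value.decode("utf-8", errors="replace")
    return f"{msg.offset}: {text}"


if __name__ == "__main__":
    print(format_message(DemoMessage(offset=7, value=b"hello")))
```

Without the decode step, formatting a bytes object in an f-string shows its repr, for example b'hello', rather than the text itself.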
Finally, we use the asyncio.run()
function to start our async code.