
Gufo Liftbridge Example: Subscribing with Cursor

We covered the basics of subscribing in our subscribe example, where we processed all the messages still stored in the partition. But what if the subscriber can be restarted and needs to continue from the first unprocessed message? We need to save the current position somewhere. Where to store it is up to the application. In this example, we use Liftbridge cursors, a dedicated position storage kept right inside Liftbridge itself.
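Liftbridge cursors are only one option. The sketch below shows the alternative of storing the position yourself, assuming a local JSON file is acceptable storage. The helpers load_offset() and save_offset() are hypothetical and are not part of Gufo Liftbridge.

```python
import json
import os
import tempfile
from typing import Dict, Optional

# Hypothetical application-side position storage, independent of Liftbridge:
# a JSON file mapping "stream/partition" keys to the last processed offset.


def _read_all(path: str) -> Dict[str, int]:
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def load_offset(path: str, stream: str, partition: int) -> Optional[int]:
    """Return the last processed offset, or None if nothing is stored yet."""
    return _read_all(path).get(f"{stream}/{partition}")


def save_offset(path: str, stream: str, partition: int, offset: int) -> None:
    """Store the last processed offset for the given stream and partition."""
    data = _read_all(path)
    data[f"{stream}/{partition}"] = offset
    # Write to a temporary file in the same directory, then replace the old
    # file in one step (atomic on POSIX), so a crash never leaves a
    # half-written position file behind.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(data, f)
    os.replace(tmp_path, path)
```

With this approach, the application has to handle file placement, backups, and sharing between hosts itself. Keeping the position in Liftbridge avoids that.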

Note

The stream and partition must be created before running the example. Refer to the Liftbridge docs or run the create example first.

subcursor.py
import asyncio
from gufo.liftbridge.client import LiftbridgeClient, StartPosition

BROKERS = ["127.0.0.1:9292"]
CURSOR_ID = "test_cursor"


async def subscribe():
    async with LiftbridgeClient(BROKERS) as client:
        async for msg in client.subscribe(
            "test",
            partition=0,
            start_position=StartPosition.RESUME,
            cursor_id=CURSOR_ID,
        ):
            print(f"{msg.offset}: {msg.value}")
            await client.set_cursor(
                "test", partition=0, cursor_id=CURSOR_ID, offset=msg.offset
            )


asyncio.run(subscribe())

Let's see the details.

Gufo Liftbridge is an async library. Our script is synchronous, so to run the client from it we import asyncio and use asyncio.run().
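Here is a minimal, standard-library-only illustration of bridging synchronous and asynchronous code. fetch_greeting() is a hypothetical stand-in for the real client calls.

```python
import asyncio


async def fetch_greeting() -> str:
    # Stands in for real async work, such as talking to a Liftbridge broker.
    await asyncio.sleep(0.01)
    return "hello from the event loop"


def main() -> None:
    # asyncio.run() creates an event loop, runs the coroutine to completion,
    # closes the loop, and returns the coroutine's result.
    result = asyncio.run(fetch_greeting())
    print(result)


if __name__ == "__main__":
    main()
```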

The client is implemented as the LiftbridgeClient class, which we must import before use. We also need the StartPosition enum.

Liftbridge is a dynamic cluster, synchronized over the Raft protocol. Cluster members may join and leave, and the client uses one or more cluster members as a bootstrap to discover the actual topology. These bootstrap members are called seeds and are defined as a list of strings in the host:port format. For our example, we assume Liftbridge is running locally at 127.0.0.1:9292. Note that even with a single seed, we must define it as a list.
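The list requirement matters because a Python string is itself iterable. Code that loops over a bare "127.0.0.1:9292" would see single characters instead of one address. The hypothetical parse_seeds() helper below is not part of Gufo Liftbridge. It only sketches how host:port seeds break down and how to guard against that mistake.

```python
from typing import List, Tuple


def parse_seeds(seeds: List[str]) -> List[Tuple[str, int]]:
    """Split hypothetical "host:port" seed strings into (host, port) pairs."""
    if isinstance(seeds, str):
        # A bare string is iterable too, so without this check it would be
        # silently treated as a sequence of one-character "seeds".
        raise TypeError("seeds must be a list of strings, not a single string")
    result = []
    for seed in seeds:
        # rpartition() splits on the last colon, leaving the host intact.
        host, sep, port = seed.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"invalid seed {seed!r}, expected host:port")
        result.append((host, int(port)))
    return result


print(parse_seeds(["127.0.0.1:9292"]))  # [('127.0.0.1', 9292)]
```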

Several subscribers may process the same partition at the same time, so multiple cursors may exist on a partition. Each cursor has its own id. We use test_cursor in our example.
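The next sketch shows how independent cursors coexist on one partition: each stored offset is addressed by the stream, the partition, and the cursor id together. CursorStore is a hypothetical in-memory model. Liftbridge keeps cursors on the server side, and the method names here only echo the client's set_cursor().

```python
from typing import Dict, Optional, Tuple

# Hypothetical in-memory model: one offset per (stream, partition, cursor_id).
CursorKey = Tuple[str, int, str]


class CursorStore:
    def __init__(self) -> None:
        self._offsets: Dict[CursorKey, int] = {}

    def set_cursor(
        self, stream: str, partition: int, cursor_id: str, offset: int
    ) -> None:
        self._offsets[(stream, partition, cursor_id)] = offset

    def get_cursor(
        self, stream: str, partition: int, cursor_id: str
    ) -> Optional[int]:
        return self._offsets.get((stream, partition, cursor_id))


store = CursorStore()
# Two independent subscribers read the same partition at different speeds.
store.set_cursor("test", 0, "fast_reader", 120)
store.set_cursor("test", 0, "slow_reader", 15)
print(store.get_cursor("test", 0, "fast_reader"))  # 120
print(store.get_cursor("test", 0, "slow_reader"))  # 15
print(store.get_cursor("test", 0, "new_reader"))  # None
```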

All async code must run inside async functions, so our subscribe() function is declared with async def.
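Declaring a function with async def changes what calling it does. This is why subscribe() is handed to asyncio.run() instead of being called on its own. demo_subscribe() is a hypothetical placeholder, not the example's real body.

```python
import asyncio
import inspect


async def demo_subscribe() -> str:
    # Hypothetical placeholder; the real example iterates messages here.
    return "done"


# Calling an async function does not run its body: it only creates
# a coroutine object.
coro = demo_subscribe()
print(inspect.iscoroutine(coro))  # True

# The body runs only when the coroutine is awaited or handed to an
# event loop, for example with asyncio.run().
print(asyncio.run(coro))  # done
```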

We need an instance of the client. The instance may be used directly or as an async context manager with the async with statement. When used as a context manager, the client automatically closes all connections when the context exits, so its lifetime is explicitly defined. LiftbridgeClient requires a list of seeds to connect to the cluster, so we pass the BROKERS list. The client is highly configurable; refer to the LiftbridgeClient reference for detailed explanations.
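The async with statement relies on the async context manager protocol: __aenter__() runs on entry and __aexit__() runs on exit. The minimal sketch below uses a hypothetical DemoClient. It does not reproduce LiftbridgeClient's internals; it only mimics the "close on exit" behavior described above, including exit through an exception.

```python
import asyncio
from types import TracebackType
from typing import List, Optional, Type


class DemoClient:
    """Hypothetical stand-in for a client that owns network connections."""

    def __init__(self, seeds: List[str]) -> None:
        self.seeds = seeds
        self.closed = True

    async def connect(self) -> None:
        await asyncio.sleep(0)  # pretend to open connections to the seeds
        self.closed = False

    async def close(self) -> None:
        await asyncio.sleep(0)  # pretend to close all connections
        self.closed = True

    async def __aenter__(self) -> "DemoClient":
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Called on normal exit and when an exception leaves the block,
        # so the connections are released in both cases.
        await self.close()


async def normal_exit() -> DemoClient:
    async with DemoClient(["127.0.0.1:9292"]) as client:
        assert not client.closed  # connections are open inside the block
    return client


async def error_exit() -> DemoClient:
    client = DemoClient(["127.0.0.1:9292"])
    try:
        async with client:
            raise RuntimeError("processing failed")
    except RuntimeError:
        pass  # the exception propagates, but only after __aexit__() ran
    return client


print(asyncio.run(normal_exit()).closed)  # True
print(asyncio.run(error_exit()).closed)  # True
```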

The subscribe() method is used to receive messages. We need to pass the stream (test), the partition (0), and the position from which to start receiving messages. In our case, we use StartPosition.RESUME to resume from the last position stored in the cursor. The cursor's id must be passed as the cursor_id parameter.

The client implements subscribing as an async iterator, so the async for statement is usually used to iterate through it. Each iteration yields a Message structure. The loop is endless, so it is up to the application to decide when to break out of it.

For additional parameters refer to the subscribe documentation.
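The endless loop has to be stopped by the application. The sketch below stops after a fixed number of messages; a real service might break on a shutdown signal instead. endless_messages() is a hypothetical stand-in for client.subscribe(), and the stop condition is an arbitrary choice for illustration.

```python
import asyncio
import itertools
from typing import AsyncIterator, List


async def endless_messages() -> AsyncIterator[bytes]:
    """Hypothetical stand-in for client.subscribe(): it never ends by itself."""
    for offset in itertools.count():
        await asyncio.sleep(0)  # pretend to wait for the next message
        yield f"message {offset}".encode()


async def consume(limit: int) -> List[bytes]:
    received: List[bytes] = []
    async for value in endless_messages():
        received.append(value)
        if len(received) >= limit:
            # The stream has no natural end, so the application decides
            # when to leave the loop.
            break
    return received


print(asyncio.run(consume(3)))  # [b'message 0', b'message 1', b'message 2']
```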

All further processing revolves around the Message structure, which consists of several fields. The message body is contained in the value attribute. The body is raw bytes, and it is up to the application to handle it properly. We simply use print() to display the message body along with the message's sequential number in the partition, taken from the offset attribute.
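Because value is raw bytes, printing it directly, as the example does, shows the bytes representation, such as b'hello'. The sketch below decodes the body first, assuming the producer sent UTF-8 text, and falls back to a size summary for binary payloads. DemoMessage is a hypothetical dataclass that carries only the two fields used here. It is not Gufo Liftbridge's Message class.

```python
from dataclasses import dataclass


@dataclass
class DemoMessage:
    """Hypothetical stand-in carrying only the offset and value fields."""

    offset: int
    value: bytes


def describe(msg: DemoMessage) -> str:
    # The payload is raw bytes: the application chooses how to decode it.
    try:
        text = msg.value.decode("utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: report the size instead of printing garbage.
        return f"{msg.offset}: <{len(msg.value)} bytes of binary data>"
    return f"{msg.offset}: {text}"


print(describe(DemoMessage(offset=0, value=b"hello")))  # 0: hello
print(describe(DemoMessage(offset=1, value=b"\xff\xfe\x00")))  # 1: <3 bytes of binary data>
```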

After processing the message, we must store its offset in the cursor. set_cursor accepts the following parameters:

  • stream: The stream name.
  • partition: The partition.
  • cursor_id: The cursor's id.
  • offset: The current message offset.

The function is asynchronous and must be awaited.
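The order of the two steps matters. Storing the offset only after processing means a crash between the two steps replays that message on restart instead of losing it. This gives at-least-once processing. The simulation below models this with a plain dict standing in for the server-side cursor, so nothing in it talks to Liftbridge. It assumes that resuming continues right after the stored offset.

```python
from typing import Dict, List, Optional

# Hypothetical simulation; the dict stands in for the Liftbridge cursor.
PARTITION: List[bytes] = [b"a", b"b", b"c", b"d"]


class SimulatedCrash(Exception):
    pass


def run(cursor: Dict[str, int], processed: List[bytes], crash_at: Optional[int]) -> None:
    # Assumption: resuming continues right after the stored offset.
    start = cursor.get("test_cursor", -1) + 1
    for offset in range(start, len(PARTITION)):
        processed.append(PARTITION[offset])  # 1. process the message
        if offset == crash_at:
            raise SimulatedCrash()  # crash before the cursor is updated
        cursor["test_cursor"] = offset  # 2. only then store the position


cursor: Dict[str, int] = {}
processed: List[bytes] = []
try:
    run(cursor, processed, crash_at=2)
except SimulatedCrash:
    pass
run(cursor, processed, crash_at=None)  # the restarted subscriber
print(processed)  # [b'a', b'b', b'c', b'c', b'd']
```

Message c is processed twice, but no message is skipped. If the offset were stored before processing, the crash would silently lose c instead, so handlers should tolerate duplicates.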

Use the asyncio.run() function to start our async code.