Gufo Liftbridge Example: Create Stream

Liftbridge streams and partitions must be created before use. Let's create a stream with LiftbridgeClient.

create.py
import asyncio
from gufo.liftbridge.client import LiftbridgeClient

BROKERS = ["127.0.0.1:9292"]


async def create():
    async with LiftbridgeClient(BROKERS) as client:
        await client.create_stream("test", partitions=1, wait_for_stream=True)


asyncio.run(create())

Let's see the details.

Gufo Liftbridge is an async library. Because we run the client from a synchronous script, we import asyncio so that we can call asyncio.run().
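
To illustrate how asyncio.run() bridges synchronous and asynchronous code, here is a minimal standard-library sketch. The fake_create() coroutine is a hypothetical stand-in for a client call and is not part of Gufo Liftbridge.

```python
import asyncio


async def fake_create(name: str) -> str:
    # Hypothetical stand-in for an async client call; it only yields
    # control to the event loop once and returns a string.
    await asyncio.sleep(0)
    return f"created {name}"


# Synchronous code cannot use `await`, so asyncio.run() starts an event
# loop, runs the coroutine to completion, and then closes the loop.
result = asyncio.run(fake_create("test"))
print(result)
```

The same pattern applies to any coroutine: the synchronous entry point calls asyncio.run() once, and everything asynchronous happens inside the coroutine passed to it.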

The client is implemented as the LiftbridgeClient class, which must be imported before use.

Liftbridge is a dynamic cluster synchronized over the Raft protocol. Cluster members may join and leave, so the client uses one or more cluster members as a bootstrap to discover the actual topology. These bootstrap members are called seeds and are defined as a list of strings in the host:port format. In our example, we assume Liftbridge is running locally at 127.0.0.1:9292. Note that even with a single seed, we must define it as a list.
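
The seed format can be checked before the list is handed to the client. The sketch below uses a hypothetical helper, check_seeds(), which is not part of Gufo Liftbridge; it only illustrates the "list of host:port strings" convention and the common mistake of passing a bare string. Whether the client performs similar validation itself is not assumed here.

```python
def check_seeds(seeds):
    """Hypothetical helper: validate seeds and split them into (host, port)."""
    # A bare string is iterable too, so reject it explicitly.
    if isinstance(seeds, str):
        raise TypeError("seeds must be a list of strings, not a single string")
    parsed = []
    for seed in seeds:
        # Split on the last colon so the port is always the final part.
        host, sep, port = seed.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"seed {seed!r} is not in host:port format")
        parsed.append((host, int(port)))
    return parsed


BROKERS = ["127.0.0.1:9292"]  # one seed, still written as a list
parsed = check_seeds(BROKERS)
print(parsed)
```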

All async code must run inside async functions, so our create() function is defined with async def.

We need an instance of the client. The instance may be used directly or as an async context manager via the async with clause. When used as a context manager, the client automatically closes all connections when the context exits, so its lifetime is defined explicitly. LiftbridgeClient requires a list of seeds to connect to the cluster, so we pass the BROKERS list. The client is highly configurable; refer to the LiftbridgeClient reference for detailed explanations.
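
The lifetime guarantee of async with can be seen in a small standard-library sketch. DemoClient is a hypothetical stand-in that only tracks a connected flag; it does not reflect how LiftbridgeClient is implemented internally.

```python
import asyncio


class DemoClient:
    """Hypothetical stand-in for a client; not the real LiftbridgeClient."""

    def __init__(self, seeds):
        self.seeds = list(seeds)
        self.connected = False

    async def __aenter__(self):
        # A real client would open its connections here.
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs when the block exits, even if an exception was raised.
        self.connected = False
        return False  # do not suppress exceptions


async def demo():
    async with DemoClient(["127.0.0.1:9292"]) as client:
        inside = client.connected
    # The block has exited, so the client has cleaned up.
    return inside, client.connected


inside, outside = asyncio.run(demo())
print(inside, outside)
```

Because cleanup is tied to the block, there is no separate close call to forget, which is the main reason to prefer the context-manager form in short scripts.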

We use the create_stream() function to create a stream named test with one partition. By default, create_stream() doesn't wait until the partition is replicated across the cluster. The wait_for_stream parameter instructs the client to wait until the new stream is ready for use. create_stream() is an async function and must be awaited.

Refer to the create_stream() reference for details.
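
Why the await matters can be shown without a broker. In this standard-library sketch, create_stream() is a hypothetical stand-in that records the stream name in a list; it is not the library's method. Calling it without await only builds a coroutine object, and nothing runs until that object is awaited.

```python
import asyncio
import inspect

created = []


async def create_stream(name):
    # Hypothetical stand-in: "creates" a stream by recording its name.
    created.append(name)


async def main():
    coro = create_stream("test")  # not awaited yet: the body has not run
    assert inspect.iscoroutine(coro)
    before = list(created)
    await coro  # now the body actually executes
    return before, list(created)


before, after = asyncio.run(main())
print(before, after)
```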

Use the asyncio.run() function to start our async code.