Gufo Liftbridge Example: Create Stream
Liftbridge streams and partitions must be created before use. Let's create
a stream via `LiftbridgeClient`.
create.py

```python
import asyncio

from gufo.liftbridge.client import LiftbridgeClient

# Seed brokers used to bootstrap the connection to the cluster
BROKERS = ["127.0.0.1:9292"]


async def create():
    async with LiftbridgeClient(BROKERS) as client:
        # Create the "test" stream and wait until it is ready for use
        await client.create_stream("test", partitions=1, wait_for_stream=True)


asyncio.run(create())
```
Let's see the details.
Gufo Liftbridge is an async library. In our case, we run the client from
a synchronous script, so we need to import `asyncio` to use `asyncio.run()`.
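To show the pattern in isolation, here is a minimal sketch that uses only the standard library. The `fetch_answer()` coroutine is a hypothetical stand-in for any async library call and is not part of Gufo Liftbridge.

```python
import asyncio


async def fetch_answer() -> int:
    # Hypothetical stand-in for an async library call.
    await asyncio.sleep(0)  # yield control to the event loop once
    return 42


def main() -> int:
    # Synchronous entry point: asyncio.run() creates an event loop,
    # runs the coroutine to completion, and then closes the loop.
    return asyncio.run(fetch_answer())


result = main()
print(result)
```

Note that `asyncio.run()` is meant to be called from synchronous code; it raises an error when called while another event loop is already running in the same thread.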
The client is implemented as the `LiftbridgeClient` class, which must be imported before use.
Liftbridge is a dynamic cluster synchronized over the Raft protocol. Cluster
members may join and leave, and the client uses one or more cluster members
as a bootstrap to discover the actual topology. These bootstrap members are
called seeds and are defined as a list of strings in the `host:port` format.
For our example, we assume Liftbridge is running locally at `127.0.0.1:9292`.
Note that even if we have only one seed, we must define it as a list.
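The sketch below illustrates the `host:port` seed format and why a single seed still belongs in a list. `parse_seeds()` is a hypothetical helper written for this example; it is not part of Gufo Liftbridge and says nothing about how the library validates seeds internally.

```python
from typing import List, Tuple


def parse_seeds(seeds: List[str]) -> List[Tuple[str, int]]:
    """Split each "host:port" seed into a (host, port) pair.

    Hypothetical helper for illustration; not part of Gufo Liftbridge.
    """
    if isinstance(seeds, str):
        # A bare string would otherwise be iterated character by character.
        raise TypeError("seeds must be a list of strings, not a single string")
    parsed = []
    for seed in seeds:
        # Split on the last colon so the port is always the final component.
        host, sep, port = seed.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"invalid seed {seed!r}, expected host:port")
        parsed.append((host, int(port)))
    return parsed


BROKERS = ["127.0.0.1:9292"]
print(parse_seeds(BROKERS))
```

Passing the bare string `"127.0.0.1:9292"` to this helper raises `TypeError`, which is the mistake the one-element list avoids.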
All async code must run inside async functions, so our `create()` function is defined with `async def`.
We need an instance of the client. The instance may be used directly or
operated as an async context manager with the `async with` clause. When used
as a context manager, the client automatically closes all connections on
exit from the context, so its lifetime is defined explicitly.
`LiftbridgeClient` requires a list of seeds to connect to the cluster, so we
pass the `BROKERS` list. The client is highly configurable; refer to the
LiftbridgeClient reference for detailed explanations.
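The lifetime guarantee of `async with` can be seen with a small stand-in class. `DemoClient` below is hypothetical: it only mimics the async context manager protocol (`__aenter__` and `__aexit__`), opens no network connections, and is not the real `LiftbridgeClient`.

```python
import asyncio
from typing import List


class DemoClient:
    """Hypothetical stand-in that mimics the context-manager pattern.

    It is not the real LiftbridgeClient and opens no network connections.
    """

    def __init__(self, seeds: List[str]) -> None:
        self.seeds = seeds
        self.closed = False

    async def close(self) -> None:
        # A real client would release its connections here.
        self.closed = True

    async def __aenter__(self) -> "DemoClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and also when the body raises.
        await self.close()


async def demo() -> DemoClient:
    async with DemoClient(["127.0.0.1:9292"]) as client:
        assert not client.closed  # still open inside the block
    return client  # already closed once the block has been left


client = asyncio.run(demo())
print(client.closed)
```

Because `__aexit__` runs even when the body raises, cleanup does not depend on remembering to close the client on every code path.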
We use the `create_stream()` method to create a stream named `test` with one
partition. By default, `create_stream()` doesn't wait until the partition is
replicated across the cluster. The `wait_for_stream` parameter instructs the
client to wait until the new stream is ready for use. `create_stream()` is an
async function and must be awaited.
Refer to the create_stream() reference for details.
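To see why the `await` matters, consider this standard-library sketch. `fake_create_stream()` is a hypothetical stand-in that only records the stream name; it does not talk to Liftbridge and does not reflect the real method's behavior beyond being a coroutine function.

```python
import asyncio
import inspect

created = []


async def fake_create_stream(name: str) -> None:
    # Hypothetical stand-in for an async API call; it only records the name.
    await asyncio.sleep(0)
    created.append(name)


async def main() -> None:
    # Calling without await only builds a coroutine object.
    pending = fake_create_stream("not-awaited")
    assert inspect.iscoroutine(pending)
    assert created == []  # the function body has not run yet
    pending.close()  # discard it to avoid a "never awaited" warning

    # Awaiting actually runs the body to completion.
    await fake_create_stream("test")
    assert created == ["test"]


asyncio.run(main())
print(created)
```

A forgotten `await` is easy to miss because the call itself raises no error; Python only emits a "coroutine was never awaited" warning when the unused coroutine object is garbage-collected.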
Finally, we use the `asyncio.run()` function to start our async code.