Gufo Liftbridge Example: Transparent Compression
We have already covered two ways of publishing messages:
one by one (publish) and
in bulk (bulk).
The bulk approach reaches a much higher publishing
rate, but what if your messages are large?
Then we can hit the network bandwidth limit.
Gufo Liftbridge offers transparent message
compression. It compresses the message before
publishing and decompresses it on receipt.
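To get a feel for why compression helps with large payloads, here is a minimal standalone sketch using Python's standard zlib and lzma modules, which match the two algorithm names the client accepts. It does not use Gufo Liftbridge and says nothing about the library's wire format. The payload is an invented, highly repetitive sample, so real-world ratios will differ.

```python
import lzma
import zlib

# Invented sample payload: 1000 identical JSON-like lines.
# Repetitive data compresses very well; random data would not.
payload = b'{"sensor": "temp", "value": 21.5}\n' * 1000

packed_zlib = zlib.compress(payload)
packed_lzma = lzma.compress(payload)

# Compare the sizes of the raw and compressed payloads.
print("raw: ", len(payload))
print("zlib:", len(packed_zlib))
print("lzma:", len(packed_lzma))

# Both algorithms are lossless: decompression restores the original bytes.
assert zlib.decompress(packed_zlib) == payload
assert lzma.decompress(packed_lzma) == payload
```

Fewer bytes on the wire is the whole point: the same bandwidth carries more messages, at the cost of some CPU time on both ends.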
Note
Transparent message compression is a non-standard
Gufo Liftbridge feature. It may not be compatible
with other clients unless you handle compression
manually. Use it only when you are sure that publishers
and subscribers always use Gufo Liftbridge.
Note
The stream and partition must be created before running
the example, so refer to the Liftbridge Docs
or work through the create example first.
compression.py |
---|
| import asyncio
from gufo.liftbridge.client import LiftbridgeClient
BROKERS = ["127.0.0.1:9292"]
async def publish():
    async with LiftbridgeClient(
        BROKERS, compression_method="lzma", compression_threshold=0
    ) as client:
        for i in range(10):
            await client.publish(
                f"msg{i}".encode("utf-8"),
                stream="test",
                partition=0,
                auto_compress=True,
            )
asyncio.run(publish())
|
Let's see the details.
Gufo Liftbridge is an async library. In our case
we run the client from a synchronous script,
so we need to import asyncio
to use asyncio.run().
The client is implemented as the LiftbridgeClient class,
which must be imported before use.
Liftbridge is a dynamic cluster synchronized over
the Raft protocol. Cluster members may join and leave, so
the client uses one or more cluster members as a bootstrap
to discover the current topology. These bootstrap
members are called seeds and are defined as a list
of strings in the host:port format. In our
example, we assume that Liftbridge is running
locally at 127.0.0.1:9292. Take note: even if we have
only one seed, we must define it as a list.
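The seed format described above can be illustrated with a small validation helper. This is only a sketch: parse_seeds is a hypothetical function written for this example, not part of Gufo Liftbridge, and the client may validate its seeds differently.

```python
from typing import List, Tuple


def parse_seeds(seeds: List[str]) -> List[Tuple[str, int]]:
    """Split host:port strings into (host, port) pairs.

    Hypothetical helper for illustration only.
    """
    # A bare string is a common mistake: seeds must be a list,
    # even when there is only one of them.
    if isinstance(seeds, str):
        raise TypeError("seeds must be a list of strings, not a single string")
    result = []
    for seed in seeds:
        # Split on the last colon so the port is always the final part.
        host, sep, port = seed.rpartition(":")
        if not sep or not host or not port.isdecimal():
            raise ValueError(f"invalid seed {seed!r}, expected host:port")
        result.append((host, int(port)))
    return result


print(parse_seeds(["127.0.0.1:9292"]))
```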
All async code must run inside async functions,
so our publish() function is defined with async def.
We need an instance of the client. The instance may be used
directly or as an async context manager
with the async with clause. When used as a context manager,
the client automatically closes all connections on exit from the context,
so its lifetime is defined explicitly. LiftbridgeClient requires
a list of seeds to connect to the cluster, so we pass the BROKERS list.
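The lifetime guarantee of an async context manager can be shown with a toy class. DemoClient below is a hypothetical stand-in invented for this sketch. It only flips a flag where a real client would open and close connections, and it is not how LiftbridgeClient is implemented.

```python
import asyncio


class DemoClient:
    """Hypothetical stand-in for a network client (illustration only)."""

    def __init__(self) -> None:
        self.connected = False

    async def __aenter__(self) -> "DemoClient":
        # A real client would open its connections here.
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and on exceptions alike.
        self.connected = False


async def main() -> bool:
    async with DemoClient() as client:
        assert client.connected  # usable only inside the block
    # The context has exited, so the resources are released.
    return client.connected


still_connected = asyncio.run(main())
print(still_connected)
```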
Unlike the publish example, we added two options:
compression_method: must be zlib or lzma. Sets the algorithm
used when message compression is requested.
compression_threshold: sets the minimal message size required
for compression. 0 means that all messages are compressed.
The client is highly configurable; refer to the
LiftbridgeClient reference for detailed
explanations.
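How a method and a threshold might work together can be sketched with the standard library alone. The maybe_compress function and the COMPRESSORS table are hypothetical names for this example, and the "size at least the threshold" comparison is an assumption drawn from the description of the option, not from the library's source.

```python
import lzma
import zlib
from typing import Tuple

# Map the method names from the text to standard library functions.
COMPRESSORS = {"zlib": zlib.compress, "lzma": lzma.compress}


def maybe_compress(value: bytes, method: str, threshold: int) -> Tuple[bytes, bool]:
    """Return (payload, was_compressed). Hypothetical helper."""
    # Assumption: messages shorter than the threshold are sent as-is.
    if len(value) < threshold:
        return value, False
    return COMPRESSORS[method](value), True


# Threshold 0: every message is compressed, even a tiny one.
packed, compressed = maybe_compress(b"msg0", "lzma", 0)
print(compressed, len(packed))

# Threshold 1024: the same tiny message is left untouched.
packed, compressed = maybe_compress(b"msg0", "lzma", 1024)
print(compressed, len(packed))
```

A threshold of 0 is convenient for a demonstration, but tiny messages such as msg0 actually grow when compressed because of the container overhead, so a non-zero threshold is usually the more sensible choice for real traffic.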
Let's publish 10 messages.
The publish() function publishes one message immediately.
The client does not enforce any specific data format and leaves
the choice to the application. The only requirement is that the message
value is passed as bytes. In our example, we build the message
as a string, so we have to encode it to bytes manually.
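Since the data format is left to the application, one common choice is JSON encoded as UTF-8. The sketch below is just one option; encode_event and decode_event are hypothetical helpers, and the event fields are invented for illustration.

```python
import json


def encode_event(event: dict) -> bytes:
    """Serialize a dict to compact UTF-8 JSON bytes (hypothetical helper)."""
    return json.dumps(event, separators=(",", ":")).encode("utf-8")


def decode_event(value: bytes) -> dict:
    """Reverse of encode_event, for the subscriber side."""
    return json.loads(value.decode("utf-8"))


# The resulting bytes object is what would be passed as the message value.
value = encode_event({"id": 1, "text": "msg1"})
print(value)
print(decode_event(value))
```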
Then we must specify the stream and the partition to publish to.
Our example uses partition 0 of the stream test.
Unlike the publish example, we added the auto_compress option.
It instructs the client to compress the message if its size
reaches the compression threshold.
publish() is an async function and must be awaited. It accepts
additional parameters; refer to the
publish() reference for detailed explanations.
Finally, we use the asyncio.run()
function to start our async code.
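The overall control flow of the example, an awaited call inside a loop started by asyncio.run(), can be tried without a running Liftbridge cluster. In this sketch fake_publish is a hypothetical stand-in that only records the message where the real publish() would send it to the broker.

```python
import asyncio
from typing import List

sent: List[bytes] = []


async def fake_publish(value: bytes) -> None:
    """Hypothetical stand-in for a network call (illustration only)."""
    await asyncio.sleep(0)  # yield control to the event loop once
    sent.append(value)


async def publish_all() -> int:
    # Each await completes before the next iteration starts,
    # so messages are recorded strictly in order.
    for i in range(10):
        await fake_publish(f"msg{i}".encode("utf-8"))
    return len(sent)


# asyncio.run() creates an event loop, runs the coroutine to completion,
# and returns its result to the synchronous caller.
count = asyncio.run(publish_all())
print(count)
```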