gufo.liftbridge.client

Python Liftbridge client

Attributes:

Name Type Description
logger

Client logger.

GRPCChannel

Bases: object

gRPC channel wrapper. Wraps connection to broker.

Parameters:

Name Type Description Default
broker str

Target address in the form "<address>:<port>".

required

close() async

Close channel connection.

connect(max_message_size=DEFAULT_MAX_MESSAGE_SIZE, enable_http_proxy=False) async

Connect channel.

Parameters:

Name Type Description Default
max_message_size int

Maximum message size in octets.

DEFAULT_MAX_MESSAGE_SIZE
enable_http_proxy bool

Enable usage of HTTP proxies.

False

LiftbridgeClient

Bases: object

Asynchronous Liftbridge client.

Parameters:

Name Type Description Default
brokers Iterable[str]

Iterable of strings containing hints to cluster members, either as an FQDN or in <address>:<port> format.

required
max_message_size int

Maximum message size in octets.

DEFAULT_MAX_MESSAGE_SIZE
enable_http_proxy bool

Enable usage of HTTP proxies.

False
compression_method Optional[str]

Enable message compression.

None
compression_threshold int

Works only if compression_method is set. Do not compress messages with size below the compression_threshold.

0
encoding_header str

Header name to pass compression_method when message is compressed.

'X-Msg-Encoding'
publish_async_ack_timeout float
10.0
metadata_leader_timeout float

Mean timeout on no metadata leader error.

3.0
metadata_leader_dev float

Deviation of timeout on no metadata leader error.

1.0

Raises:

Type Description
ValueError

If parameters are incorrect.
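
The interplay of compression_method, compression_threshold, and encoding_header described above can be pictured with a small sketch. This is not the client's implementation: the helper name maybe_compress, the "zlib" method name, and the choice to compress a value whose size equals the threshold are assumptions made only for illustration.

```python
import zlib
from typing import Dict, Optional, Tuple


def maybe_compress(
    value: bytes,
    headers: Optional[Dict[str, bytes]] = None,
    *,
    compression_method: Optional[str] = None,
    compression_threshold: int = 0,
    encoding_header: str = "X-Msg-Encoding",
) -> Tuple[bytes, Dict[str, bytes]]:
    """Hypothetical helper mirroring the documented threshold and header rules."""
    out_headers = dict(headers or {})
    # No method configured, or payload below the threshold: pass through as-is.
    if compression_method is None or len(value) < compression_threshold:
        return value, out_headers
    if compression_method != "zlib":
        # This sketch knows a single codec; the real client may accept others.
        raise ValueError(f"unsupported method in this sketch: {compression_method}")
    # Record the method in the header so a consumer can tell how to decode.
    out_headers[encoding_header] = compression_method.encode()
    return zlib.compress(value), out_headers
```

With the default threshold of 0, every message would be compressed once a method is set; raising the threshold skips small payloads where compression overhead outweighs the savings.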

close() async

Close all open channels.

create_stream(name, *, subject=None, group=None, replication_factor=1, minisr=0, partitions=1, enable_compact=False, retention_max_age=0, retention_max_bytes=0, segment_max_age=0, segment_max_bytes=0, auto_pause_time=0, auto_pause_disable_if_subscribers=False, wait_for_stream=False) async

Create stream. Internal implementation.

Parameters:

Name Type Description Default
name str

Stream name.

required
subject Optional[str]

Optional NATS subject.

None
group Optional[str]
None
replication_factor int

Replication factor: the number of cluster members each partition is replicated to.

1
minisr int
0
partitions int

Number of partitions in the stream.

1
enable_compact bool

Enable stream compaction.

False
retention_max_age int
0
retention_max_bytes int
0
segment_max_age int
0
segment_max_bytes int
0
auto_pause_time int
0
auto_pause_disable_if_subscribers bool
False
wait_for_stream bool

Wait until the stream is really created.

False
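
As a usage sketch, several streams could be created concurrently with the keyword arguments listed above. The helper name ensure_streams is hypothetical, and client is assumed to be a LiftbridgeClient (any object exposing a compatible create_stream coroutine works for the sketch).

```python
import asyncio
from typing import Iterable


async def ensure_streams(client, names: Iterable[str], partitions: int = 1) -> None:
    """Create each named stream and return once all of them exist."""
    await asyncio.gather(
        *[
            client.create_stream(
                name,
                partitions=partitions,
                replication_factor=1,  # documented default, spelled out for clarity
                wait_for_stream=True,  # wait until the stream is really created
            )
            for name in names
        ]
    )
```

How the client reports an attempt to create a stream that already exists is not covered here, so callers may want to handle that case themselves.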

delete_stream(name) async

Delete stream.

Parameters:

Name Type Description Default
name str

Stream name.

required

get_cursor(stream, partition, cursor_id) async

Fetch current partition cursor position.

Parameters:

Name Type Description Default
stream str

Stream name.

required
partition int

Partition number.

required
cursor_id str

Cursor identifier.

required

Returns:

Type Description
int

Current cursor position. -1 for the new cursor.

get_metadata(stream=None, *, wait_for_stream=False) async

Get cluster metadata.

Parameters:

Name Type Description Default
stream Optional[str]

Fetch metadata only for particular stream, if set.

None
wait_for_stream bool

Wait until the stream is created.

False

get_partition_metadata(stream, partition, wait_for_stream=False) async

Fetch metadata for particular partition.

Parameters:

Name Type Description Default
stream str

Stream name.

required
partition int

Partition number.

required
wait_for_stream bool

If set, wait for the stream to become available.

False

Returns:

Type Description
PartitionMetadata

Partition metadata.

get_publish_request(value, *, stream=None, key=None, partition=None, headers=None, ack_inbox=None, correlation_id=None, ack_policy=AckPolicy.LEADER, auto_compress=False)

Generate PublishRequest for bulk operations.

Parameters:

Name Type Description Default
value bytes
required
stream Optional[str]
None
key Optional[bytes]
None
partition Optional[int]
None
headers Optional[Dict[str, bytes]]
None
ack_inbox Optional[str]
None
correlation_id Optional[str]
None
ack_policy AckPolicy
LEADER
auto_compress bool

If True, compress value when the client's compression_method is set and the size of value exceeds compression_threshold.

False

publish(value, *, stream=None, key=None, partition=None, headers=None, ack_inbox=None, correlation_id=None, ack_policy=AckPolicy.LEADER, wait_for_stream=False, auto_compress=False) async

Publish single message.

Parameters:

Name Type Description Default
value bytes
required
stream Optional[str]
None
key Optional[bytes]
None
partition Optional[int]
None
headers Optional[Dict[str, bytes]]
None
ack_inbox Optional[str]
None
correlation_id Optional[str]
None
ack_policy AckPolicy
LEADER
wait_for_stream bool
False
auto_compress bool
False
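
A minimal sketch of publishing one message with the keyword arguments from the signature above. The helper name publish_event, the JSON payload, and the Content-Type header are illustrative assumptions; only the publish call itself comes from this reference.

```python
import json


async def publish_event(client, stream: str, event: dict, key: str) -> None:
    """Serialize `event` to JSON and publish it as a single message."""
    await client.publish(
        json.dumps(event).encode(),  # value must be bytes
        stream=stream,
        key=key.encode(),  # key is bytes as well
        headers={"Content-Type": b"application/json"},  # header values are bytes
        wait_for_stream=True,
    )
```

Here client is assumed to be a connected LiftbridgeClient, and the call would be awaited from inside a running event loop.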

publish_bulk(iter_req, wait=True) async

Bulk publishing from iterator.

Parameters:

Name Type Description Default
iter_req Iterable[PublishRequest]

Iterable of PublishRequest.

required
wait bool

Wait for all acks if set to True.

True
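
Bulk publishing pairs naturally with get_publish_request: requests are built lazily and handed to publish_bulk as an iterable. The sketch below assumes client is a LiftbridgeClient; the helper name publish_lines is hypothetical.

```python
from typing import Iterable


async def publish_lines(client, stream: str, lines: Iterable[str]) -> None:
    """Publish every line as its own message and wait for all acks."""
    # get_publish_request is a plain (non-async) method, so a generator
    # expression can build the requests one at a time.
    requests = (
        client.get_publish_request(line.encode(), stream=stream, auto_compress=True)
        for line in lines
    )
    await client.publish_bulk(requests, wait=True)
```

Passing a generator rather than a list avoids holding every request in memory at once, which matters for large batches.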

set_cursor(stream, partition, cursor_id, offset) async

Save cursor position for partition.

Parameters:

Name Type Description Default
stream str

Stream name.

required
partition int

Partition number.

required
cursor_id str

Cursor identifier.

required
offset int

Cursor offset to save.

required
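
get_cursor and set_cursor can be combined so that a stored position only ever moves forward. The sketch below is one possible pattern, not something the library prescribes; the helper name advance_cursor is hypothetical, and it relies on get_cursor returning -1 for a new cursor, as documented above.

```python
async def advance_cursor(
    client, stream: str, partition: int, cursor_id: str, offset: int
) -> int:
    """Save `offset` only if it is ahead of the stored position.

    Returns the cursor position after the call.
    """
    current = await client.get_cursor(stream, partition, cursor_id)
    # A new cursor reports -1, so any non-negative offset is saved.
    if offset > current:
        await client.set_cursor(stream, partition, cursor_id, offset)
        return offset
    return current
```

This costs an extra round trip per update, so a consumer that already tracks its own position may prefer to call set_cursor directly.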

subscribe(stream, *, partition=None, start_position=StartPosition.NEW_ONLY, start_offset=None, start_timestamp=None, resume=False, cursor_id=None, timeout=None, allow_isr=False) async

Subscribe to partition.

Parameters:

Name Type Description Default
stream str

Stream name.

required
partition Optional[int]

Stream partition.

None
start_position StartPosition

Starting position. See StartPosition for details.

NEW_ONLY
start_offset Optional[int]

Starting offset, if start_position is OFFSET.

None
start_timestamp Optional[float]

Starting timestamp, if start_position is TIMESTAMP.

None
resume bool
False
cursor_id Optional[str]

Cursor ID to resume, if start_position is RESUME.

None
timeout Optional[int]

Optional timeout in seconds.

None
allow_isr bool

Allow connections to in-sync replicas (ISR).

False
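
A consumption loop might look like the sketch below. Two things are assumed rather than stated in this reference: that the object returned by subscribe is consumed with async for, and that each yielded message exposes its payload as a value attribute. The helper name collect is hypothetical.

```python
from typing import List


async def collect(client, stream: str, limit: int, partition: int = 0) -> List[bytes]:
    """Read up to `limit` message payloads from one partition."""
    values: List[bytes] = []
    if limit <= 0:
        return values
    # Assumption: subscribe() is an async iterator of message objects.
    async for msg in client.subscribe(stream, partition=partition):
        values.append(msg.value)  # assumption: payload lives in `.value`
        if len(values) >= limit:
            break
    return values
```

With the default start_position of NEW_ONLY, only messages published after the subscription starts would be delivered, so the loop blocks until enough new messages arrive.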

wait_for_cursors() async

Wait until cursors become available.

wait_for_stream(stream) async

Wait until the stream becomes available.

Parameters:

Name Type Description Default
stream str

Stream name.

required
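
wait_for_stream takes no timeout argument in the signature above, so a caller who needs an upper bound can wrap it with the standard asyncio.wait_for. The helper name stream_ready and the 5-second default are illustrative assumptions.

```python
import asyncio


async def stream_ready(client, stream: str, timeout: float = 5.0) -> bool:
    """Return True if the stream became available within `timeout` seconds."""
    try:
        await asyncio.wait_for(client.wait_for_stream(stream), timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the pending wait before raising.
        return False
    return True
```

Returning a boolean keeps the call site simple; re-raising the timeout instead would suit callers that treat a missing stream as a hard error.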