gufo.liftbridge.client¶
Python Liftbridge client
Attributes:
Name | Type | Description |
---|---|---|
logger |
Client logger. |
GRPCChannel
¶
Bases: object
gRPC channel wrapper. Wraps connection to broker.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
broker |
str
|
target address in form " : |
required |
close()
async
¶
Close channel connection.
connect(max_message_size=DEFAULT_MAX_MESSAGE_SIZE, enable_http_proxy=False)
async
¶
Connect channel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
max_message_size |
int
|
Maximal size of message. |
DEFAULT_MAX_MESSAGE_SIZE
|
enable_http_proxy |
bool
|
Enable usage of HTTP proxies. |
False
|
LiftbridgeClient
¶
Bases: object
Asynchronous Liftbridge client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
brokers |
Iterable[str]
|
Iterable of stings containning hints to cluster
members, either as FQDN or in |
required |
max_message_size |
int
|
Maximal size of message in octets. |
DEFAULT_MAX_MESSAGE_SIZE
|
enable_http_proxy |
bool
|
Enable usage of HTTP proxies. |
False
|
compression_method |
Optional[str]
|
Enable message compression. |
None
|
compression_threshold |
int
|
Works only if |
0
|
encoding_header |
str
|
Header name to pass |
'X-Msg-Encoding'
|
publish_async_ack_timeout |
float
|
|
10.0
|
metadata_leader_timeout |
float
|
Mean timeout on |
3.0
|
metadata_leader_dev |
float
|
Deviation of timeout on
|
1.0
|
Raises:
Type | Description |
---|---|
ValueError
|
If parameters are incorrect. |
close()
async
¶
Close all open channels.
create_stream(name, *, subject=None, group=None, replication_factor=1, minisr=0, partitions=1, enable_compact=False, retention_max_age=0, retention_max_bytes=0, segment_max_age=0, segment_max_bytes=0, auto_pause_time=0, auto_pause_disable_if_subscribers=False, wait_for_stream=False)
async
¶
Create stream. Internal implementation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
Stream name. |
required |
subject |
Optional[str]
|
Optional NATS subject. |
None
|
group |
Optional[str]
|
|
None
|
replication_factor |
int
|
Replication factor, amount of cluster members to replicate each partition. |
1
|
minisr |
int
|
|
0
|
partitions |
int
|
Number of partition in the stream. |
1
|
enable_compact |
bool
|
Enable stream compaction. |
False
|
retention_max_age |
int
|
|
0
|
retention_max_bytes |
int
|
|
0
|
segment_max_age |
int
|
|
0
|
segment_max_bytes |
int
|
|
0
|
auto_pause_time |
int
|
|
0
|
auto_pause_disable_if_subscribers |
bool
|
|
False
|
wait_for_stream |
bool
|
Wait until the stream is really created. |
False
|
delete_stream(name)
async
¶
Delete streeam.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
Stream name. |
required |
get_cursor(stream, partition, cursor_id)
async
¶
Fetch current partition cursor position.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
str
|
Stream name. |
required |
partition |
int
|
Partition numbers. |
required |
cursor_id |
str
|
Cursor identifier. |
required |
Returns:
Type | Description |
---|---|
int
|
Current cursor position. -1 for the new cursor. |
get_metadata(stream=None, *, wait_for_stream=False)
async
¶
Get cluster metadata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
Optional[str]
|
Fetch metadata only for particular stream, if set. |
None
|
wait_for_stream |
bool
|
Wait until stream will be created. |
False
|
get_partition_metadata(stream, partition, wait_for_stream=False)
async
¶
Fetch metadata for particular partition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
str
|
Stream name. |
required |
partition |
int
|
Partitionn number. |
required |
wait_for_stream |
bool
|
If set, wait for stream being available. |
False
|
Returns:
Type | Description |
---|---|
PartitionMetadata
|
Partition metadata. |
get_publish_request(value, *, stream=None, key=None, partition=None, headers=None, ack_inbox=None, correlation_id=None, ack_policy=AckPolicy.LEADER, auto_compress=False)
¶
Generate PublishRequest for bulk operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
bytes
|
|
required |
stream |
Optional[str]
|
|
None
|
key |
Optional[bytes]
|
|
None
|
partition |
Optional[int]
|
|
None
|
headers |
Optional[Dict[str, bytes]]
|
|
None
|
ack_inbox |
Optional[str]
|
|
None
|
correlation_id |
Optional[str]
|
|
None
|
ack_policy |
AckPolicy
|
|
LEADER
|
auto_compress |
bool
|
If |
False
|
publish(value, *, stream=None, key=None, partition=None, headers=None, ack_inbox=None, correlation_id=None, ack_policy=AckPolicy.LEADER, wait_for_stream=False, auto_compress=False)
async
¶
Publish single message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
bytes
|
bytes, |
required |
stream |
Optional[str]
|
Optional[str] = None, |
None
|
key |
Optional[bytes]
|
Optional[bytes] = None, |
None
|
partition |
Optional[int]
|
Optional[int] = None, |
None
|
headers |
Optional[Dict[str, bytes]]
|
Optional[Dict[str, bytes]] = None, |
None
|
ack_inbox |
Optional[str]
|
Optional[str] = None, |
None
|
correlation_id |
Optional[str]
|
Optional[str] = None, |
None
|
ack_policy |
AckPolicy
|
AckPolicy = AckPolicy.LEADER, |
LEADER
|
wait_for_stream |
bool
|
bool = False, |
False
|
auto_compress |
bool
|
bool = False, |
False
|
publish_bulk(iter_req, wait=True)
async
¶
Bulk publishing from iterator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
iter_req |
Iterable[PublishRequest]
|
Iterable of PublishRequest. |
required |
wait |
bool
|
Wait for all acks if set to |
True
|
set_cursor(stream, partition, cursor_id, offset)
async
¶
Save cursor position for partition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
str
|
Stream name. |
required |
partition |
int
|
Partition number. |
required |
cursor_id |
str
|
Cursor identifier. |
required |
offset |
int
|
Cursor offset to save. |
required |
subscribe(stream, *, partition=None, start_position=StartPosition.NEW_ONLY, start_offset=None, start_timestamp=None, resume=False, cursor_id=None, timeout=None, allow_isr=False)
async
¶
Subscribe to partition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
str
|
Stream name. |
required |
partition |
Optional[int]
|
Stream partition. |
None
|
start_position |
StartPosition
|
Starting position. See |
NEW_ONLY
|
start_offset |
Optional[int]
|
Starting offset, if |
None
|
start_timestamp |
Optional[float]
|
Starting timestamp,
if |
None
|
resume |
bool
|
|
False
|
cursor_id |
Optional[str]
|
Cursor ID to resume, if |
None
|
timeout |
Optional[int]
|
Optional timeout in seconds. |
None
|
allow_isr |
bool
|
Allow connections to in-state replicas (ISR). |
False
|
wait_for_cursors()
async
¶
Wait until cursors become available
wait_for_stream(stream)
async
¶
Wait until stream become availabble.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
str
|
Stream name |
required |