gufo.liftbridge.client
Python Liftbridge client.
Attributes:
Name | Type | Description |
---|---|---|
logger | | Client logger. |
GRPCChannel
Bases: object
gRPC channel wrapper. Wraps the connection to a broker.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
broker | str | Target address in form " : | required |
__aenter__()
async
Context manager enter.
__aexit__(exc_type, exc, traceback)
async
Context manager exit.
__getattr__(item)
Get wrapped API method.
close()
async
Close channel connection.
connect(max_message_size=DEFAULT_MAX_MESSAGE_SIZE, enable_http_proxy=False)
async
Connect channel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
max_message_size | int | Maximum message size. | DEFAULT_MAX_MESSAGE_SIZE |
enable_http_proxy | bool | Enable usage of HTTP proxies. | False |
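The wrapper pattern described for GRPCChannel (async context manager plus `__getattr__` delegation to the wrapped API) can be illustrated with a minimal stdlib-only sketch. `ChannelWrapper` and `FakeStub` are hypothetical names invented for this illustration; this is not GRPCChannel's actual implementation, and whether the real `__aenter__` connects automatically is an assumption here.

```python
import asyncio


class FakeStub:
    """Stand-in for a generated gRPC stub (hypothetical)."""

    async def FetchMetadata(self, request):
        return {"echo": request}


class ChannelWrapper:
    """Illustrative wrapper only; not the library's GRPCChannel."""

    def __init__(self, broker: str) -> None:
        self.broker = broker
        self._stub = None

    async def connect(self) -> None:
        # A real wrapper would open a gRPC channel to self.broker here.
        self._stub = FakeStub()

    async def close(self) -> None:
        self._stub = None

    async def __aenter__(self) -> "ChannelWrapper":
        await self.connect()  # assumption: entering the context connects
        return self

    async def __aexit__(self, exc_type, exc, traceback) -> None:
        await self.close()

    def __getattr__(self, item: str):
        # Called only when normal lookup fails: forward to the wrapped stub.
        stub = self.__dict__.get("_stub")
        if stub is None:
            raise AttributeError(item)
        return getattr(stub, item)


async def demo():
    # The address is an example value, not taken from this page.
    async with ChannelWrapper("127.0.0.1:9292") as channel:
        return await channel.FetchMetadata("ping")


result = asyncio.run(demo())
```

Delegating through `__getattr__` keeps the wrapper small: it only manages the connection lifecycle and lets every API call pass through unchanged.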
LiftbridgeClient
Bases: object
Asynchronous Liftbridge client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
brokers | Iterable[str] | Iterable of strings containing hints to cluster members, either as FQDN or in | required |
max_message_size | int | Maximum message size in octets. | DEFAULT_MAX_MESSAGE_SIZE |
enable_http_proxy | bool | Enable usage of HTTP proxies. | False |
compression_method | Optional[str] | Enable message compression. | None |
compression_threshold | int | Works only if | 0 |
encoding_header | str | Header name to pass | 'X-Msg-Encoding' |
publish_async_ack_timeout | float | | 10.0 |
metadata_leader_timeout | float | Mean timeout on | 3.0 |
metadata_leader_dev | float | Deviation of timeout on | 1.0 |
Raises:
Type | Description |
---|---|
ValueError | If parameters are incorrect. |
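The `metadata_leader_timeout` and `metadata_leader_dev` parameters describe a timeout by its mean and deviation, which suggests a randomized (jittered) wait. The sketch below shows one possible reading, drawing the timeout uniformly from `mean ± dev`. The function name `jittered_timeout`, the uniform distribution, and the validation rule are all assumptions made for illustration; the page does not state how the client actually computes the value.

```python
import random
from typing import Optional


def jittered_timeout(
    mean: float = 3.0,
    dev: float = 1.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Pick a timeout in [mean - dev, mean + dev] (assumed uniform)."""
    # Hypothetical validation; the real client's checks are not documented here.
    if mean <= 0 or dev < 0 or dev >= mean:
        raise ValueError("expected mean > 0 and 0 <= dev < mean")
    source = rng if rng is not None else random
    return source.uniform(mean - dev, mean + dev)


# Defaults mirror the documented defaults (3.0 and 1.0).
samples = [jittered_timeout(rng=random.Random(seed)) for seed in range(100)]
```

Randomizing the wait spreads retries from many clients over time instead of having them all hit the cluster at the same instant.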
__aenter__()
async
Enter the context manager.
__aexit__(exc_type, exc, traceback)
async
Exit the context manager.
close()
async
Close all open channels.
create_stream(name, *, subject=None, group=None, replication_factor=1, minisr=0, partitions=1, enable_compact=False, retention_max_age=0, retention_max_bytes=0, segment_max_age=0, segment_max_bytes=0, auto_pause_time=0, auto_pause_disable_if_subscribers=False, wait_for_stream=False)
async
Create stream. Internal implementation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Stream name. | required |
subject | Optional[str] | Optional NATS subject. | None |
group | Optional[str] | ??? | None |
replication_factor | int | Replication factor: the number of cluster members each partition is replicated to. | 1 |
minisr | int | Minimum in-sync replicas. | 0 |
partitions | int | Number of partitions in the stream. | 1 |
enable_compact | bool | Enable stream compaction. | False |
retention_max_age | int | ??? | 0 |
retention_max_bytes | int | ??? | 0 |
segment_max_age | int | ??? | 0 |
segment_max_bytes | int | ??? | 0 |
auto_pause_time | int | ??? | 0 |
auto_pause_disable_if_subscribers | bool | ??? | False |
wait_for_stream | bool | Wait until the stream is actually created. | False |
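A minimal usage sketch for `create_stream`, based only on the signature above. The broker address, the stream name, and the helper name `ensure_stream` are assumptions for illustration, and running `main()` requires a reachable Liftbridge broker.

```python
import asyncio

BROKERS = ["127.0.0.1:9292"]  # assumed address of a local broker


async def ensure_stream(name: str, partitions: int = 1) -> None:
    """Create a stream and wait until it is actually available."""
    # Imported lazily so this module loads even without the package installed.
    from gufo.liftbridge.client import LiftbridgeClient

    async with LiftbridgeClient(BROKERS) as client:
        await client.create_stream(
            name,
            partitions=partitions,
            wait_for_stream=True,
        )


def main() -> None:
    # Needs a running broker; "events" is an example stream name.
    asyncio.run(ensure_stream("events", partitions=2))
```

Passing `wait_for_stream=True` avoids racing a later publish or subscribe against stream creation.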
delete_stream(name)
async
Delete stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Stream name. | required |
get_cursor(stream, partition, cursor_id)
async
Fetch current partition cursor position.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | str | Stream name. | required |
partition | int | Partition number. | required |
cursor_id | str | Cursor identifier. | required |
Returns:
Type | Description |
---|---|
int | Current cursor position; -1 for a new cursor. |
get_metadata(stream=None, *, wait_for_stream=False)
async
Get cluster metadata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | Optional[str] | If set, fetch metadata only for the given stream. | None |
wait_for_stream | bool | Wait until the stream is created. | False |
get_partition_metadata(stream, partition, wait_for_stream=False)
async
Fetch metadata for a particular partition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | str | Stream name. | required |
partition | int | Partition number. | required |
wait_for_stream | bool | If set, wait until the stream is available. | False |
Returns:
Type | Description |
---|---|
PartitionMetadata | Partition metadata. |
get_publish_request(value, *, stream=None, key=None, partition=None, headers=None, ack_inbox=None, correlation_id=None, ack_policy=AckPolicy.LEADER, auto_compress=False)
Generate PublishRequest for bulk operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | bytes | Message body. | required |
stream | Optional[str] | Stream to publish to. | None |
key | Optional[bytes] | Optional message key. | None |
partition | Optional[int] | Partition. | None |
headers | Optional[Dict[str, bytes]] | Message headers. | None |
ack_inbox | Optional[str] | Optional inbox to send the acknowledgement to. | None |
correlation_id | Optional[str] | Opaque id to correlate messages. | None |
ack_policy | AckPolicy | Acknowledgement policy. | LEADER |
auto_compress | bool | If | False |
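As a sketch of how `get_publish_request` might be used to prepare a batch, the helper below builds one request per payload using only the documented keyword arguments. `build_requests` and `RecordingClient` are hypothetical names; the recording class is a stand-in so the example runs without a broker, and a real `LiftbridgeClient` would return `PublishRequest` objects instead of dictionaries.

```python
from typing import Any, Iterable, List


def build_requests(
    client: Any,
    stream: str,
    payloads: Iterable[bytes],
    partition: int = 0,
) -> List[Any]:
    """Build one publish request per payload for later bulk publishing."""
    return [
        client.get_publish_request(payload, stream=stream, partition=partition)
        for payload in payloads
    ]


class RecordingClient:
    """Hypothetical stand-in that records the arguments it receives."""

    def get_publish_request(self, value, *, stream=None, partition=None, **kwargs):
        return {"value": value, "stream": stream, "partition": partition}


batch = build_requests(RecordingClient(), "events", [b"a", b"b"])
```

With a real client, the resulting list is the kind of iterable that `publish_bulk` takes as `iter_req`.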
publish(value, *, stream=None, key=None, partition=None, headers=None, ack_inbox=None, correlation_id=None, ack_policy=AckPolicy.LEADER, wait_for_stream=False, auto_compress=False)
async
Publish single message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | bytes | Message body. | required |
stream | Optional[str] | Stream to publish to. | None |
key | Optional[bytes] | Optional message key. | None |
partition | Optional[int] | Partition. | None |
headers | Optional[Dict[str, bytes]] | Message headers. | None |
ack_inbox | Optional[str] | Optional inbox to send the acknowledgement to. | None |
correlation_id | Optional[str] | Opaque id to correlate messages. | None |
ack_policy | AckPolicy | Acknowledgement policy. | LEADER |
wait_for_stream | bool | If set, wait until the stream is available. | False |
auto_compress | bool | | False |
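Since `value` must be `bytes`, callers have to serialize their data before publishing. The sketch below shows one way to do that for JSON payloads. `publish_json` and `FakeClient` are hypothetical names introduced for illustration; the fake only mimics the documented `publish` keyword arguments so the example runs without a broker.

```python
import asyncio
import json
from typing import Any, Optional


async def publish_json(
    client: Any,
    stream: str,
    obj: Any,
    key: Optional[bytes] = None,
) -> bytes:
    """Serialize obj to compact JSON bytes and publish it to the stream."""
    body = json.dumps(obj, separators=(",", ":")).encode("utf-8")
    await client.publish(body, stream=stream, key=key, wait_for_stream=True)
    return body


class FakeClient:
    """Hypothetical stand-in for LiftbridgeClient that records calls."""

    def __init__(self) -> None:
        self.calls = []

    async def publish(self, value, *, stream=None, key=None, **kwargs):
        self.calls.append((value, stream, key))


fake = FakeClient()
sent = asyncio.run(publish_json(fake, "events", {"id": 1}, key=b"k1"))
```

With the real client, the same helper would be called inside an `async with LiftbridgeClient(...) as client:` block.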
publish_bulk(iter_req, wait=True)
async
Bulk publishing from iterator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
iter_req | Iterable[PublishRequest] | Iterable of PublishRequest. | required |
wait | bool | Wait for all acks if set to | True |
set_cursor(stream, partition, cursor_id, offset)
async
Save cursor position for partition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | str | Stream name. | required |
partition | int | Partition number. | required |
cursor_id | str | Cursor identifier. | required |
offset | int | Cursor offset to save. | required |
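`get_cursor` and `set_cursor` together allow a consumer to persist its progress. The sketch below moves a cursor forward only when the new offset is ahead of the stored one, relying on the documented `-1` return value for a new cursor. `advance_cursor` and `InMemoryCursors` are hypothetical names; the in-memory class is a test double with the same two method signatures, not part of the library.

```python
import asyncio
from typing import Any


async def advance_cursor(
    client: Any,
    stream: str,
    partition: int,
    cursor_id: str,
    offset: int,
) -> int:
    """Save offset if it is ahead of the stored position; return the result."""
    current = await client.get_cursor(stream, partition, cursor_id)
    if offset > current:  # a new cursor reports -1, so any offset >= 0 wins
        await client.set_cursor(stream, partition, cursor_id, offset)
        return offset
    return current


class InMemoryCursors:
    """Hypothetical test double mimicking get_cursor/set_cursor."""

    def __init__(self) -> None:
        self._positions = {}

    async def get_cursor(self, stream, partition, cursor_id):
        return self._positions.get((stream, partition, cursor_id), -1)

    async def set_cursor(self, stream, partition, cursor_id, offset):
        self._positions[(stream, partition, cursor_id)] = offset


async def demo():
    client = InMemoryCursors()
    first = await advance_cursor(client, "events", 0, "worker-1", 5)
    second = await advance_cursor(client, "events", 0, "worker-1", 3)
    return first, second


result = asyncio.run(demo())
```

Guarding against backward moves is a design choice of this sketch; nothing on this page says the server rejects a smaller offset.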
subscribe(stream, *, partition=None, start_position=StartPosition.NEW_ONLY, start_offset=None, start_timestamp=None, resume=False, cursor_id=None, timeout=None, allow_isr=False)
async
Subscribe to partition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | str | Stream name. | required |
partition | Optional[int] | Stream partition. | None |
start_position | StartPosition | Starting position. See | NEW_ONLY |
start_offset | Optional[int] | Starting offset, if | None |
start_timestamp | Optional[float] | Starting timestamp, if | None |
resume | bool | Resume start position. | False |
cursor_id | Optional[str] | Cursor ID to resume, if | None |
timeout | Optional[int] | Optional timeout in seconds. | None |
allow_isr | bool | Allow connections to in-sync replicas (ISR). | False |
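A hedged consumption sketch for `subscribe`. It assumes that `subscribe` can be iterated with `async for` and that each message exposes its body as a `value` attribute; neither detail is stated on this page, so check both against the library before relying on them. `collect` and `FakeClient` are hypothetical names, and the fake yields canned messages so the example runs without a broker.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, List


async def collect(client: Any, stream: str, limit: int, partition: int = 0) -> List[bytes]:
    """Read up to `limit` message bodies from one partition."""
    bodies: List[bytes] = []
    # Assumption: subscribe() yields messages as an async iterator.
    async for msg in client.subscribe(stream, partition=partition):
        bodies.append(msg.value)  # assumption: body is exposed as .value
        if len(bodies) >= limit:
            break
    return bodies


class FakeClient:
    """Hypothetical stand-in yielding ten canned messages."""

    async def subscribe(self, stream, *, partition=None, **kwargs):
        for i in range(10):
            yield SimpleNamespace(value=f"msg-{i}".encode())


received = asyncio.run(collect(FakeClient(), "events", limit=3))
```

Bounding the loop with `limit` keeps the example finite; a long-running consumer would typically loop until cancelled and save its position with `set_cursor`.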
wait_for_cursors()
async
Wait until cursors become available.
wait_for_stream(stream)
async
Wait until the stream becomes available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | str | Stream name. | required |