你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
EventHubProducerClient 类
EventHubProducerClient 类定义了一个高级接口,用于将事件发送到Azure 事件中心服务。
- 继承
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubProducerClient
构造函数
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)
参数
- credential
- AsyncTokenCredential 或 AzureSasCredential 或 AzureNamedKeyCredential
用于身份验证的凭据对象,该对象实现用于获取令牌的特定接口。 它接受 EventHubSharedKeyCredential由 azure 标识库生成的 、 或凭据对象,以及实现 *get_token (self、 scopes) 方法的对象。
- buffered_mode
- bool
如果为 True,则生成者客户端将在缓冲区中收集事件,有效地批处理,然后发布。 默认值为 False。
成功发布批后要调用的回调。 回调采用两个参数:
事件:已成功发布的事件列表
partition_id:列表中事件已发布到的分区 ID。
回调函数的定义应如下所示: on_success (事件partition_id) 。 当 buffered_mode 为 True 时是必需的,而 如果buffered_mode 为 False,则为可选。
未能发布批后要调用的回调。 在 buffered_mode 中为 True 时是必需的,如果 buffered_mode 为 False,则为可选。 回调函数的定义应如下所示: on_error (事件、partition_id、错误) ,其中:
事件:未能发布的事件列表,
partition_id:已尝试将列表中的事件发布到 和 的分区 ID
错误:与发送失败相关的异常。
如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下所示处理错误:
如果在生成者客户端实例化期间传递了 on_error 回调,
然后,错误信息将传递到 on_error 回调,然后调用回调。
如果在客户端实例化期间未传入 on_error 回调,
则默认情况下将引发错误。
如果 buffered_mode 为 True,则需要 on_error 回调,并且将按如下所示处理错误:
如果事件无法在给定的超时内排队,则将直接引发错误。
如果在成功排队后未能发送事件,则将调用 on_error 回调。
- max_buffer_length
- int
仅缓冲模式。 在触发刷新之前可以缓冲的每个分区的事件总数。 缓冲模式下的默认值为 1500。
- logging_enable
- bool
是否将网络跟踪日志输出到记录器。 默认值为 False。
- auth_timeout
- float
等待令牌由服务授权的时间(以秒为单位)。 默认值为 60 秒。 如果设置为 0,则不会从客户端强制实施超时。
- user_agent
- str
如果指定,则会将其添加到用户代理字符串的前面。
- retry_total
- int
发生错误时恢复失败操作的尝试总数。 默认值为 3。
- retry_backoff_factor
- float
第二次尝试后在尝试之间应用的退让因素 (大多数错误通过第二次尝试立即解决,无需延迟) 。 在固定模式下,重试策略将始终为 {backoff factor} 休眠。 在“指数”模式下,重试策略将休眠: {backoff factor} * (2 ** ({总重试次数} - 1) ) 秒。 如果backoff_factor为 0.1,则重试将在两次重试之间休眠 [0.0s, 0.2s, 0.4s, ...]。 默认值为 0.8。
- retry_backoff_max
- float
最长回退时间。 默认值为 120 秒 (2 分钟) 。
- retry_mode
- str
重试尝试之间的延迟行为。 支持的值是“fixed”或“exponential”,其中默认值为“exponential”。
- idle_timeout
- float
超时(以秒为单位),之后,如果没有活动,此客户端将关闭基础连接。 默认情况下,该值为 None,这意味着除非由服务启动,否则客户端不会因为处于非活动状态而关闭。
- transport_type
- TransportType
将用于与事件中心服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket 。
- http_proxy
- dict
HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。
用于与事件中心服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address中指定端口,则默认使用端口 443。
SSL 证书的自定义CA_BUNDLE文件的路径,用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where () 。
- uamqp_transport
- bool
是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。
- socket_timeout
- float
连接上的基础套接字在发送和接收数据时应在超时之前等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果 EventHubsConnectionError 错误由于写入超时而发生,则可能需要传入大于默认值的值。 这适用于高级使用方案,通常默认值应足够。
示例
创建 EventHubProducerClient 的新实例。
import os
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
方法
close |
关闭生成者客户端基础 AMQP 连接和链接。 |
create_batch |
使用受max_size_in_bytes约束的所有内容的最大大小创建 EventDataBatch 对象。 max_size_in_bytes不应大于服务定义的允许的最大消息大小。 |
flush |
仅缓冲模式。 刷新缓冲区中的事件,如果客户端在缓冲模式下工作,将立即发送。 |
from_connection_string |
从连接字符串创建 EventHubProducerClient。 |
get_buffered_event_count |
为给定分区缓冲并等待发布的事件数。 在非缓冲模式下返回 None。 注意:事件缓冲区在后台协同例程中处理,因此,此 API 报告的缓冲区中的事件数应仅被视为近似值,并且仅建议在调试中使用。 对于没有缓冲事件的分区 ID,无论该分区 ID 是否实际存在于事件中心内,都将返回 0。 |
get_eventhub_properties |
获取事件中心的属性。 返回的字典中的键包括:
|
get_partition_ids |
获取事件中心的分区 ID。 |
get_partition_properties |
获取指定分区的属性。 属性字典中的键包括:
|
send_batch |
发送一批事件数据。 默认情况下,方法将阻止,直到收到确认或操作超时。如果将 EventHubProducerClient 配置为在缓冲模式下运行,则 方法会将事件排队到本地缓冲区并返回。 生成者将在后台执行自动发送。 如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下所示处理错误:
如果 buffered_mode 为 True,则需要 on_error 回调,并且将按如下所示处理错误:
在缓冲模式下,发送批将保持不变,并作为单个单元发送。 批不会重新排列。 这可能会导致发送事件效率低下。 如果要发送 EventData 或 AmqpAnnotatedMessage 的有限列表,并且知道该列表在事件中心帧大小限制内,则可以使用 send_batch 调用发送它们。 否则,使用 create_batch 创建 EventDataBatch 并将 EventData 或 AmqpAnnotatedMessage 逐个添加到批处理中,直到达到大小限制,然后调用此方法发送批处理。 |
send_event |
发送事件数据。 默认情况下,方法将阻止,直到收到确认或操作超时。如果将 EventHubProducerClient 配置为在缓冲模式下运行,则 方法会将事件排队到本地缓冲区并返回 。 生成者将在后台执行自动批处理和发送。 如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下方式处理错误: * 如果在生成者客户端实例化期间传递 了on_error 回调,
如果 buffered_mode 为 True,则需要 on_error 回调,错误将按如下方式进行处理:* 如果事件无法在给定超时内排队,则将直接引发错误。
|
close
关闭生成者客户端基础 AMQP 连接和链接。
async close(*, flush: bool = True, **kwargs: Any) -> None
参数
- flush
- bool
仅缓冲模式。 如果设置为 True,则会立即发送缓冲区中的事件。 默认值为 True。
返回类型
例外
如果在刷新设置为 True 时刷新缓冲区时出错,或在缓冲模式下关闭基础 AMQP 连接。
示例
关闭处理程序。
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
await producer.close()
create_batch
使用受max_size_in_bytes约束的所有内容的最大大小创建 EventDataBatch 对象。
max_size_in_bytes不应大于服务定义的允许的最大消息大小。
async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch
返回类型
例外
如果在刷新设置为 True 时刷新缓冲区时出错,或在缓冲模式下关闭基础 AMQP 连接。
示例
在有限大小内创建 EventDataBatch 对象
from azure.eventhub import EventData
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
仅缓冲模式。 刷新缓冲区中的事件,如果客户端在缓冲模式下工作,将立即发送。
async flush(**kwargs: Any) -> None
参数
返回类型
例外
如果生成者无法在缓冲模式下的给定超时内刷新缓冲区。
from_connection_string
从连接字符串创建 EventHubProducerClient。
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient
参数
- eventhub_name
- str
要将客户端连接到的特定事件中心的路径。
- buffered_mode
- bool
如果为 True,则生成者客户端将在缓冲区中收集事件,有效地批处理,然后发布。 默认值为 False。
成功发布批处理后要调用的回调。 回调采用两个参数:
events:已成功发布的事件列表
partition_id:列表中事件已发布到的分区 ID。
回调函数的定义应如下所示: on_success (事件,partition_id) 。 当 buffered_mode 为 True 时,它是必需的,如果 buffered_mode 为 False,则 为 可选。
未能发布批后要调用的回调。 回调函数的定义应如下所示: on_error (事件、partition_id、错误) ,其中:
events:未能发布的事件列表,
partition_id:已尝试将列表中的事件发布到 和 的分区 ID
error:与发送失败相关的异常。
如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下所示处理错误:
如果在生成者客户端实例化期间传递on_error回调,
然后,错误信息将传递到 on_error 回调,然后调用该回调。
如果在客户端实例化期间未传入 on_error 回调,
则默认情况下将引发错误。
如果 buffered_mode 为 True,则需要 on_error 回调,并将按如下所示处理错误:
如果事件无法在给定的超时时间内排队,则将直接引发错误。
如果成功排队后无法发送事件,将调用 on_error 回调。
- max_buffer_length
- int
仅缓冲模式。 触发刷新前可以缓冲的每个分区的事件总数。 缓冲模式下的默认值为 1500。
- logging_enable
- bool
是否将网络跟踪日志输出到记录器。 默认值为 False。
- http_proxy
- dict
HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。
- auth_timeout
- float
等待令牌由服务授权的时间(以秒为单位)。 默认值为 60 秒。 如果设置为 0,则不会从客户端强制实施超时。
- user_agent
- str
如果指定,则会将其添加到用户代理字符串的前面。
- retry_total
- int
发生错误时重做失败操作的尝试总数。 默认值为 3。
- retry_backoff_factor
- float
第二次尝试后应用两次尝试之间的退让因素 (大多数错误都会立即通过第二次尝试来解决,且无延迟) 。 在固定模式下,重试策略将始终为 {backoff factor} 休眠。 在“指数”模式下,重试策略的睡眠时间为: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor为 0.1,则在两次重试之间,重试将休眠 [0.0s, 0.2s, 0.4s, ...]。 默认值为 0.8。
- retry_backoff_max
- float
最长回退时间。 默认值为 120 秒 (2 分钟) 。
- retry_mode
- str
重试尝试之间的延迟行为。 支持的值为“fixed”或“exponential”,默认值为“exponential”。
- idle_timeout
- float
超时(以秒为单位),在此之后,如果没有活动,此客户端将关闭基础连接。 默认情况下,该值为 None,这意味着除非由服务启动,否则客户端不会因为处于非活动状态而关闭。
- transport_type
- TransportType
将用于与事件中心服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket 。
用于与事件中心服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address指定端口,则默认使用端口 443。
SSL 证书的自定义CA_BUNDLE文件的路径,该文件用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where () 。
- uamqp_transport
- bool
是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。
返回类型
例外
如果在刷新设置为 True 时刷新缓冲区时出错,或在缓冲模式下关闭基础 AMQP 连接。
示例
从 连接字符串 创建 EventHubProducerClient 的新实例。
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
为给定分区缓冲并等待发布的事件数。 在非缓冲模式下返回 None。 注意:事件缓冲区在后台协同例程中处理,因此,此 API 报告的缓冲区中的事件数应仅被视为近似值,并且仅建议在调试中使用。 对于没有缓冲事件的分区 ID,无论该分区 ID 是否实际存在于事件中心内,都将返回 0。
get_buffered_event_count(partition_id: str) -> int | None
参数
返回类型
例外
如果在刷新设置为 True 时刷新缓冲区时出错,或在缓冲模式下关闭基础 AMQP 连接。
get_eventhub_properties
获取事件中心的属性。
返回的字典中的键包括:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
返回
包含有关事件中心信息的字典。
返回类型
例外
get_partition_ids
get_partition_properties
获取指定分区的属性。
属性字典中的键包括:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
参数
返回
分区属性的 dict。
返回类型
例外
send_batch
发送一批事件数据。 默认情况下,方法将阻止,直到收到确认或操作超时。如果将 EventHubProducerClient 配置为在缓冲模式下运行,则 方法会将事件排队到本地缓冲区并返回。 生成者将在后台执行自动发送。
如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下所示处理错误:
如果在生成者客户端实例化期间传递了 on_error 回调,
然后,错误信息将传递到 on_error 回调,然后调用回调。
如果在客户端实例化期间未传入 on_error 回调,
则默认情况下将引发错误。
如果 buffered_mode 为 True,则需要 on_error 回调,并且将按如下所示处理错误:
如果事件无法在给定的超时内排队,则将直接引发错误。
如果在成功排队后未能发送事件,则将调用 on_error 回调。
在缓冲模式下,发送批将保持不变,并作为单个单元发送。 批不会重新排列。 这可能会导致发送事件效率低下。
如果要发送 EventData 或 AmqpAnnotatedMessage 的有限列表,并且知道该列表在事件中心帧大小限制内,则可以使用 send_batch 调用发送它们。 否则,使用 create_batch 创建 EventDataBatch 并将 EventData 或 AmqpAnnotatedMessage 逐个添加到批处理中,直到达到大小限制,然后调用此方法发送批处理。
async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
参数
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
要发送的 EventDataBatch 对象或要在批处理中发送的 EventData 列表。 列表中或 EventDataBatch 中的所有 EventData 或 AmqpAnnotatedMessage 都将位于同一分区上。
- timeout
- float
在非缓冲模式下发送事件数据的最大等待时间,或在缓冲模式下将事件数据排入缓冲区的最大等待时间。 在非缓冲模式下,将使用创建生成者时指定的默认等待时间。 在缓冲模式下,默认等待时间为 None。
- partition_id
- str
要发送到的特定分区 ID。 默认值为 None,在这种情况下,服务将使用轮循机制分配给所有分区。 如果指定了partition_id并且event_data_batch为 EventDataBatch,则引发 TypeError,因为 EventDataBatch 本身partition_id。
- partition_key
- str
使用给定partition_key,事件数据将发送到服务决定的事件中心的特定分区。 如果指定了partition_key,并且event_data_batch为 EventDataBatch,则会引发 TypeError,因为 EventDataBatch 本身partition_key。 如果同时提供partition_id和partition_key,则优先partition_id。 警告:不建议对要发送的事件设置非字符串值partition_key,因为事件中心服务将忽略partition_key,并且事件将使用轮循机制分配给所有分区。 此外,还有一些 SDK 用于使用预期partition_key仅为字符串类型的事件,它们可能无法分析非字符串值。
返回类型
例外
如果超时参数指定的值在以非缓冲模式发送事件之前已过,或者事件可以在缓冲模式下排队进入缓冲状态。
示例
异步发送事件数据
async with producer:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
send_event
发送事件数据。 默认情况下,方法将阻止,直到收到确认或操作超时。如果将 EventHubProducerClient 配置为在缓冲模式下运行,则 方法会将事件排队到本地缓冲区并返回 。 生成者将在后台执行自动批处理和发送。
如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下方式处理错误: * 如果在生成者客户端实例化期间传递 了on_error 回调,
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
如果 buffered_mode 为 True,则需要 on_error 回调,错误将按如下方式进行处理:* 如果事件无法在给定超时内排队,则将直接引发错误。
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
参数
- timeout
- float
在非缓冲模式下发送事件数据的最大等待时间,或在缓冲模式下将事件数据排队到缓冲区的最大等待时间。 在非缓冲模式下,将使用创建生成者时指定的默认等待时间。 在缓冲模式下,默认等待时间为“无”。
- partition_id
- str
要发送到的特定分区 ID。 默认值为 None,在这种情况下,服务将使用轮循机制分配给所有分区。 如果指定了partition_id并且event_data_batch是 EventDataBatch,则引发 TypeError,因为 EventDataBatch 本身具有partition_id。
- partition_key
- str
使用给定partition_key,事件数据将发送到服务决定的事件中心的特定分区。 如果指定了partition_key,并且event_data_batch为 EventDataBatch,则引发 TypeError,因为 EventDataBatch 本身具有partition_key。 如果同时提供partition_id和partition_key,则partition_id优先。 警告:不建议对要发送的事件设置非字符串值partition_key,因为事件中心服务将忽略partition_key,并且事件将使用轮循机制分配给所有分区。 此外,还有一些 SDK 用于使用预期partition_key仅为字符串类型的事件,它们可能无法分析非字符串值。
返回类型
例外
如果超时参数指定的值在非缓冲模式下发送事件之前已过,或者事件不能排队进入缓冲模式下的缓冲。