你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

EventHubProducerClient 类

EventHubProducerClient 类定义了一个高级接口,用于将事件发送到Azure 事件中心服务。

继承
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

构造函数

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

参数

fully_qualified_namespace
str
必需

事件中心命名空间的完全限定主机名。 这可能类似于 .servicebus.windows.net

eventhub_name
str
必需

要将客户端连接到的特定事件中心的路径。

credential
AsyncTokenCredentialAzureSasCredentialAzureNamedKeyCredential
必需

用于身份验证的凭据对象,该对象实现用于获取令牌的特定接口。 它接受 EventHubSharedKeyCredential由 azure 标识库生成的 、 或凭据对象,以及实现 *get_token (self、 scopes) 方法的对象。

buffered_mode
bool

如果为 True,则生成者客户端将在缓冲区中收集事件,有效地批处理,然后发布。 默认值为 False。

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

成功发布批后要调用的回调。 回调采用两个参数:

  • 事件:已成功发布的事件列表

  • partition_id:列表中事件已发布到的分区 ID。

回调函数的定义应如下所示: on_success (事件partition_id) 。 当 buffered_mode 为 True 时是必需的,而 如果buffered_mode 为 False,则为可选。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

未能发布批后要调用的回调。 在 buffered_mode 中为 True 时是必需的,如果 buffered_mode 为 False,则为可选。 回调函数的定义应如下所示: on_error (事件、partition_id、错误) ,其中:

  • 事件:未能发布的事件列表,

  • partition_id:已尝试将列表中的事件发布到 和 的分区 ID

  • 错误:与发送失败相关的异常。

如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下所示处理错误:

  • 如果在生成者客户端实例化期间传递了 on_error 回调,

    然后,错误信息将传递到 on_error 回调,然后调用回调。

  • 如果在客户端实例化期间未传入 on_error 回调,

    则默认情况下将引发错误。

如果 buffered_mode 为 True,则需要 on_error 回调,并且将按如下所示处理错误:

  • 如果事件无法在给定的超时内排队,则将直接引发错误。

  • 如果在成功排队后未能发送事件,则将调用 on_error 回调。

max_buffer_length
int

仅缓冲模式。 在触发刷新之前可以缓冲的每个分区的事件总数。 缓冲模式下的默认值为 1500。

max_wait_time
Optional[float]

仅缓冲模式。 在发布之前,等待使用缓冲区中的事件生成批处理的时间量。 缓冲模式下的默认值为 1。

logging_enable
bool

是否将网络跟踪日志输出到记录器。 默认值为 False。

auth_timeout
float

等待令牌由服务授权的时间(以秒为单位)。 默认值为 60 秒。 如果设置为 0,则不会从客户端强制实施超时。

user_agent
str

如果指定,则会将其添加到用户代理字符串的前面。

retry_total
int

发生错误时恢复失败操作的尝试总数。 默认值为 3。

retry_backoff_factor
float

第二次尝试后在尝试之间应用的退让因素 (大多数错误通过第二次尝试立即解决,无需延迟) 。 在固定模式下,重试策略将始终为 {backoff factor} 休眠。 在“指数”模式下,重试策略将休眠: {backoff factor} * (2 ** ({总重试次数} - 1) ) 秒。 如果backoff_factor为 0.1,则重试将在两次重试之间休眠 [0.0s, 0.2s, 0.4s, ...]。 默认值为 0.8。

retry_backoff_max
float

最长回退时间。 默认值为 120 秒 (2 分钟) 。

retry_mode
str

重试尝试之间的延迟行为。 支持的值是“fixed”或“exponential”,其中默认值为“exponential”。

idle_timeout
float

超时(以秒为单位),之后,如果没有活动,此客户端将关闭基础连接。 默认情况下,该值为 None,这意味着除非由服务启动,否则客户端不会因为处于非活动状态而关闭。

transport_type
TransportType

将用于与事件中心服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket

http_proxy
dict

HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。

custom_endpoint_address
Optional[str]

用于与事件中心服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address中指定端口,则默认使用端口 443。

connection_verify
Optional[str]

SSL 证书的自定义CA_BUNDLE文件的路径,用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where ()

uamqp_transport
bool

是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。

socket_timeout
float

连接上的基础套接字在发送和接收数据时应在超时之前等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果 EventHubsConnectionError 错误由于写入超时而发生,则可能需要传入大于默认值的值。 这适用于高级使用方案,通常默认值应足够。

示例

创建 EventHubProducerClient 的新实例。


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

方法

close

关闭生成者客户端基础 AMQP 连接和链接。

create_batch

使用受max_size_in_bytes约束的所有内容的最大大小创建 EventDataBatch 对象。

max_size_in_bytes不应大于服务定义的允许的最大消息大小。

flush

仅缓冲模式。 刷新缓冲区中的事件,如果客户端在缓冲模式下工作,将立即发送。

from_connection_string

从连接字符串创建 EventHubProducerClient。

get_buffered_event_count

为给定分区缓冲并等待发布的事件数。 在非缓冲模式下返回 None。 注意:事件缓冲区在后台协同例程中处理,因此,此 API 报告的缓冲区中的事件数应仅被视为近似值,并且仅建议在调试中使用。 对于没有缓冲事件的分区 ID,无论该分区 ID 是否实际存在于事件中心内,都将返回 0。

get_eventhub_properties

获取事件中心的属性。

返回的字典中的键包括:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

获取事件中心的分区 ID。

get_partition_properties

获取指定分区的属性。

属性字典中的键包括:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

发送一批事件数据。 默认情况下,方法将阻止,直到收到确认或操作超时。如果将 EventHubProducerClient 配置为在缓冲模式下运行,则 方法会将事件排队到本地缓冲区并返回。 生成者将在后台执行自动发送。

如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下所示处理错误:

  • 如果在生成者客户端实例化期间传递了 on_error 回调,

    然后,错误信息将传递到 on_error 回调,然后调用回调。

  • 如果在客户端实例化期间未传入 on_error 回调,

    则默认情况下将引发错误。

如果 buffered_mode 为 True,则需要 on_error 回调,并且将按如下所示处理错误:

  • 如果事件无法在给定的超时内排队,则将直接引发错误。

  • 如果在成功排队后未能发送事件,则将调用 on_error 回调。

在缓冲模式下,发送批将保持不变,并作为单个单元发送。 批不会重新排列。 这可能会导致发送事件效率低下。

如果要发送 EventDataAmqpAnnotatedMessage 的有限列表,并且知道该列表在事件中心帧大小限制内,则可以使用 send_batch 调用发送它们。 否则,使用 create_batch 创建 EventDataBatch 并将 EventDataAmqpAnnotatedMessage 逐个添加到批处理中,直到达到大小限制,然后调用此方法发送批处理。

send_event

发送事件数据。 默认情况下,方法将阻止,直到收到确认或操作超时。如果将 EventHubProducerClient 配置为在缓冲模式下运行,则 方法会将事件排队到本地缓冲区并返回 。 生成者将在后台执行自动批处理和发送。

如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下方式处理错误: * 如果在生成者客户端实例化期间传递 了on_error 回调,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

如果 buffered_mode 为 True,则需要 on_error 回调,错误将按如下方式进行处理:* 如果事件无法在给定超时内排队,则将直接引发错误。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

关闭生成者客户端基础 AMQP 连接和链接。

async close(*, flush: bool = True, **kwargs: Any) -> None

参数

flush
bool

仅缓冲模式。 如果设置为 True,则会立即发送缓冲区中的事件。 默认值为 True。

timeout
floatNone

仅缓冲模式。 关闭生成者的超时。 默认值为 None,这意味着没有超时。

返回类型

例外

如果在刷新设置为 True 时刷新缓冲区时出错,或在缓冲模式下关闭基础 AMQP 连接。

示例

关闭处理程序。


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

使用受max_size_in_bytes约束的所有内容的最大大小创建 EventDataBatch 对象。

max_size_in_bytes不应大于服务定义的允许的最大消息大小。

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

返回类型

例外

如果在刷新设置为 True 时刷新缓冲区时出错,或在缓冲模式下关闭基础 AMQP 连接。

示例

在有限大小内创建 EventDataBatch 对象


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

仅缓冲模式。 刷新缓冲区中的事件,如果客户端在缓冲模式下工作,将立即发送。

async flush(**kwargs: Any) -> None

参数

timeout
floatNone

刷新缓冲事件的超时,默认值为 None,这意味着没有超时。

返回类型

例外

如果生成者无法在缓冲模式下的给定超时内刷新缓冲区。

from_connection_string

从连接字符串创建 EventHubProducerClient。

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

参数

conn_str
str
必需

事件中心的连接字符串。

eventhub_name
str

要将客户端连接到的特定事件中心的路径。

buffered_mode
bool

如果为 True,则生成者客户端将在缓冲区中收集事件,有效地批处理,然后发布。 默认值为 False。

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

成功发布批处理后要调用的回调。 回调采用两个参数:

  • events:已成功发布的事件列表

  • partition_id:列表中事件已发布到的分区 ID。

回调函数的定义应如下所示: on_success (事件,partition_id) 。 当 buffered_mode 为 True 时,它是必需的,如果 buffered_mode 为 False,则 可选。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

未能发布批后要调用的回调。 回调函数的定义应如下所示: on_error (事件、partition_id、错误) ,其中:

  • events:未能发布的事件列表,

  • partition_id:已尝试将列表中的事件发布到 和 的分区 ID

  • error:与发送失败相关的异常。

如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下所示处理错误:

  • 如果在生成者客户端实例化期间传递on_error回调,

    然后,错误信息将传递到 on_error 回调,然后调用该回调。

  • 如果在客户端实例化期间未传入 on_error 回调,

    则默认情况下将引发错误。

如果 buffered_mode 为 True,则需要 on_error 回调,并将按如下所示处理错误:

  • 如果事件无法在给定的超时时间内排队,则将直接引发错误。

  • 如果成功排队后无法发送事件,将调用 on_error 回调。

max_buffer_length
int

仅缓冲模式。 触发刷新前可以缓冲的每个分区的事件总数。 缓冲模式下的默认值为 1500。

max_wait_time
Optional[float]

仅缓冲模式。 等待在发布之前使用缓冲区中的事件生成批处理的时间。 缓冲模式下的默认值为 1。

logging_enable
bool

是否将网络跟踪日志输出到记录器。 默认值为 False。

http_proxy
dict

HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。

auth_timeout
float

等待令牌由服务授权的时间(以秒为单位)。 默认值为 60 秒。 如果设置为 0,则不会从客户端强制实施超时。

user_agent
str

如果指定,则会将其添加到用户代理字符串的前面。

retry_total
int

发生错误时重做失败操作的尝试总数。 默认值为 3。

retry_backoff_factor
float

第二次尝试后应用两次尝试之间的退让因素 (大多数错误都会立即通过第二次尝试来解决,且无延迟) 。 在固定模式下,重试策略将始终为 {backoff factor} 休眠。 在“指数”模式下,重试策略的睡眠时间为: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor为 0.1,则在两次重试之间,重试将休眠 [0.0s, 0.2s, 0.4s, ...]。 默认值为 0.8。

retry_backoff_max
float

最长回退时间。 默认值为 120 秒 (2 分钟) 。

retry_mode
str

重试尝试之间的延迟行为。 支持的值为“fixed”或“exponential”,默认值为“exponential”。

idle_timeout
float

超时(以秒为单位),在此之后,如果没有活动,此客户端将关闭基础连接。 默认情况下,该值为 None,这意味着除非由服务启动,否则客户端不会因为处于非活动状态而关闭。

transport_type
TransportType

将用于与事件中心服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket

custom_endpoint_address
Optional[str]

用于与事件中心服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address指定端口,则默认使用端口 443。

connection_verify
Optional[str]

SSL 证书的自定义CA_BUNDLE文件的路径,该文件用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where ()

uamqp_transport
bool

是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。

返回类型

例外

如果在刷新设置为 True 时刷新缓冲区时出错,或在缓冲模式下关闭基础 AMQP 连接。

示例

从 连接字符串 创建 EventHubProducerClient 的新实例。


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

为给定分区缓冲并等待发布的事件数。 在非缓冲模式下返回 None。 注意:事件缓冲区在后台协同例程中处理,因此,此 API 报告的缓冲区中的事件数应仅被视为近似值,并且仅建议在调试中使用。 对于没有缓冲事件的分区 ID,无论该分区 ID 是否实际存在于事件中心内,都将返回 0。

get_buffered_event_count(partition_id: str) -> int | None

参数

partition_id
str
必需

目标分区 ID。

返回类型

int,

例外

如果在刷新设置为 True 时刷新缓冲区时出错,或在缓冲模式下关闭基础 AMQP 连接。

get_eventhub_properties

获取事件中心的属性。

返回的字典中的键包括:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

返回

包含有关事件中心信息的字典。

返回类型

例外

get_partition_ids

获取事件中心的分区 ID。

async get_partition_ids() -> List[str]

返回

分区 ID 的列表。

返回类型

例外

get_partition_properties

获取指定分区的属性。

属性字典中的键包括:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

参数

partition_id
str
必需

目标分区 ID。

返回

分区属性的 dict。

返回类型

例外

send_batch

发送一批事件数据。 默认情况下,方法将阻止,直到收到确认或操作超时。如果将 EventHubProducerClient 配置为在缓冲模式下运行,则 方法会将事件排队到本地缓冲区并返回。 生成者将在后台执行自动发送。

如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下所示处理错误:

  • 如果在生成者客户端实例化期间传递了 on_error 回调,

    然后,错误信息将传递到 on_error 回调,然后调用回调。

  • 如果在客户端实例化期间未传入 on_error 回调,

    则默认情况下将引发错误。

如果 buffered_mode 为 True,则需要 on_error 回调,并且将按如下所示处理错误:

  • 如果事件无法在给定的超时内排队,则将直接引发错误。

  • 如果在成功排队后未能发送事件,则将调用 on_error 回调。

在缓冲模式下,发送批将保持不变,并作为单个单元发送。 批不会重新排列。 这可能会导致发送事件效率低下。

如果要发送 EventDataAmqpAnnotatedMessage 的有限列表,并且知道该列表在事件中心帧大小限制内,则可以使用 send_batch 调用发送它们。 否则,使用 create_batch 创建 EventDataBatch 并将 EventDataAmqpAnnotatedMessage 逐个添加到批处理中,直到达到大小限制,然后调用此方法发送批处理。

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

参数

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
必需

要发送的 EventDataBatch 对象或要在批处理中发送的 EventData 列表。 列表中或 EventDataBatch 中的所有 EventDataAmqpAnnotatedMessage 都将位于同一分区上。

timeout
float

在非缓冲模式下发送事件数据的最大等待时间,或在缓冲模式下将事件数据排入缓冲区的最大等待时间。 在非缓冲模式下,将使用创建生成者时指定的默认等待时间。 在缓冲模式下,默认等待时间为 None。

partition_id
str

要发送到的特定分区 ID。 默认值为 None,在这种情况下,服务将使用轮循机制分配给所有分区。 如果指定了partition_id并且event_data_batch为 EventDataBatch,则引发 TypeError,因为 EventDataBatch 本身partition_id。

partition_key
str

使用给定partition_key,事件数据将发送到服务决定的事件中心的特定分区。 如果指定了partition_key,并且event_data_batch为 EventDataBatch,则会引发 TypeError,因为 EventDataBatch 本身partition_key。 如果同时提供partition_id和partition_key,则优先partition_id。 警告:不建议对要发送的事件设置非字符串值partition_key,因为事件中心服务将忽略partition_key,并且事件将使用轮循机制分配给所有分区。 此外,还有一些 SDK 用于使用预期partition_key仅为字符串类型的事件,它们可能无法分析非字符串值。

返回类型

例外

如果超时参数指定的值在以非缓冲模式发送事件之前已过,或者事件可以在缓冲模式下排队进入缓冲状态。

示例

异步发送事件数据


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

发送事件数据。 默认情况下,方法将阻止,直到收到确认或操作超时。如果将 EventHubProducerClient 配置为在缓冲模式下运行,则 方法会将事件排队到本地缓冲区并返回 。 生成者将在后台执行自动批处理和发送。

如果 buffered_mode 为 False, 则on_error 回调是可选的,将按如下方式处理错误: * 如果在生成者客户端实例化期间传递 了on_error 回调,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

如果 buffered_mode 为 True,则需要 on_error 回调,错误将按如下方式进行处理:* 如果事件无法在给定超时内排队,则将直接引发错误。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

参数

event_data
Union[EventData, AmqpAnnotatedMessage]
必需

要发送的 EventData 对象。

timeout
float

在非缓冲模式下发送事件数据的最大等待时间,或在缓冲模式下将事件数据排队到缓冲区的最大等待时间。 在非缓冲模式下,将使用创建生成者时指定的默认等待时间。 在缓冲模式下,默认等待时间为“无”。

partition_id
str

要发送到的特定分区 ID。 默认值为 None,在这种情况下,服务将使用轮循机制分配给所有分区。 如果指定了partition_id并且event_data_batch是 EventDataBatch,则引发 TypeError,因为 EventDataBatch 本身具有partition_id。

partition_key
str

使用给定partition_key,事件数据将发送到服务决定的事件中心的特定分区。 如果指定了partition_key,并且event_data_batch为 EventDataBatch,则引发 TypeError,因为 EventDataBatch 本身具有partition_key。 如果同时提供partition_id和partition_key,则partition_id优先。 警告:不建议对要发送的事件设置非字符串值partition_key,因为事件中心服务将忽略partition_key,并且事件将使用轮循机制分配给所有分区。 此外,还有一些 SDK 用于使用预期partition_key仅为字符串类型的事件,它们可能无法分析非字符串值。

返回类型

例外

如果超时参数指定的值在非缓冲模式下发送事件之前已过,或者事件不能排队进入缓冲模式下的缓冲。

属性

total_buffered_event_count

所有分区中当前缓冲并等待发布的事件总数。 在非缓冲模式下返回 None。 注意:事件缓冲区在后台协同例程中处理,因此,此 API 报告的缓冲区中的事件数应仅被视为近似值,并且仅建议用于调试。

返回类型

int,