你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

EventHubConsumerClient 类

EventHubConsumerClient 类定义了一个高级接口,用于从Azure 事件中心服务接收事件。

EventHubConsumerClient main目标是使用负载均衡和检查点功能从 EventHub 的所有分区接收事件。

当多个 EventHubConsumerClient 实例针对同一个事件中心、使用者组和检查点位置运行时,分区将在它们之间均匀分布。

若要启用负载均衡和持久化检查点,必须在创建 EventHubConsumerClient 时设置checkpoint_store。 如果未提供检查点存储,则检查点将在内存中内部维护。

当您调用 EventHubConsumerClient 方法 receive () receive_batch () 并指定partition_id时,EventHubConsumerClient 也可以从特定分区接收。 负载均衡在单分区模式下不起作用。 但如果设置了checkpoint_store,用户仍然可以保存检查点。

继承
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

构造函数

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

参数

fully_qualified_namespace
str
必需

事件中心命名空间的完全限定主机名。 命名空间格式为: .servicebus.windows.net

eventhub_name
str
必需

要将客户端连接到的特定事件中心的路径。

consumer_group
str
必需

从事件中心接收此使用者组的事件。

credential
TokenCredentialAzureSasCredentialAzureNamedKeyCredential
必需

用于身份验证的凭据对象,该对象实现用于获取令牌的特定接口。 它接受 EventHubSharedKeyCredential由 azure 标识库生成的 、 或凭据对象,以及实现 *get_token (self、 scopes) 方法的对象。

logging_enable
bool

是否将网络跟踪日志输出到记录器。 默认值为 False。

auth_timeout
float

等待令牌由服务授权的时间(以秒为单位)。 默认值为 60 秒。 如果设置为 0,则不会从客户端强制实施超时。

user_agent
str

如果指定,则会将其添加到用户代理字符串的前面。

retry_total
int

发生错误时恢复失败操作的尝试总数。 默认值为 3。 接收中 retry_total 的上下文很特殊: 接收 方法是通过每次迭代中调用内部接收方法的 while 循环实现的。 在 接收 情况下, retry_total 指定 while 循环中内部接收方法引发的错误后重试次数。 如果重试尝试已用尽,则将调用 on_error 回调 ((如果) 提供错误信息)。 如果提供 ) ,则将关闭失败的内部分区使用者, (将调用on_partition_close并创建新的内部分区使用者, (如果提供 on_partition_initialize) 来恢复接收,则将调用on_partition_initialize。

retry_backoff_factor
float

第二次尝试后在尝试之间应用的退让因素 (大多数错误通过第二次尝试立即解决,无需延迟) 。 在固定模式下,重试策略将始终为 {backoff factor} 休眠。 在“指数”模式下,重试策略将休眠: {backoff factor} * (2 ** ({总重试次数} - 1) ) 秒。 如果backoff_factor为 0.1,则重试将在两次重试之间休眠 [0.0s, 0.2s, 0.4s, ...]。 默认值为 0.8。

retry_backoff_max
float

最长回退时间。 默认值为 120 秒 (2 分钟) 。

retry_mode
str

重试尝试之间的延迟行为。 支持的值是“fixed”或“exponential”,其中默认值为“exponential”。

idle_timeout
float

超时(以秒为单位),之后,如果没有其他活动,此客户端将关闭基础连接。 默认情况下,该值为 None,这意味着除非由服务启动,否则客户端不会因为处于非活动状态而关闭。

transport_type
TransportType

将用于与事件中心服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket

http_proxy
dict[str, strint]

HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。

checkpoint_store
CheckpointStoreNone

一个管理器,用于在接收事件时存储分区负载均衡和检查点数据。 检查点存储将用于从所有分区或单个分区接收的两种情况。 在后一种情况下,负载均衡不适用。 如果未提供检查点存储,检查点将在内存中内部维护, EventHubConsumerClient 实例将接收不进行负载均衡的事件。

load_balancing_interval
float

当负载均衡启动时。 这是两次负载均衡评估之间的间隔(以秒为单位)。 默认值为 30 秒。

partition_ownership_expiration_interval
float

分区所有权将在该秒数后过期。 每次负载均衡评估都会自动延长所有权过期时间。 默认值为 6 * load_balancing_interval,即使用 30 秒的默认load_balancing_interval时为 180 秒。

load_balancing_strategy
strLoadBalancingStrategy

启动负载均衡时,它将使用此策略来声明和平衡分区所有权。 将“greedy”或 LoadBalancingStrategy.GREEDY 用于贪婪策略,对于每次负载均衡评估,该策略将获取均衡负载所需的任意数量的未认领分区。 对于均衡策略,请使用“balanced”或 LoadBalancingStrategy.BALANCED ,该策略每次进行负载均衡评估时,只声明其他 EventHubConsumerClient 未声明的一个分区。 如果 EventHub 的所有分区都由其他 EventHubConsumerClient 声明,并且此客户端声明的分区太少,则无论负载均衡策略如何,此客户端都将从其他客户端中窃取一个分区进行每次负载均衡评估。 默认情况下使用贪婪策略。

custom_endpoint_address
strNone

用于与事件中心服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address中指定端口,则默认使用端口 443。

connection_verify
strNone

SSL 证书的自定义CA_BUNDLE文件的路径,用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where ()

uamqp_transport
bool

是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。

socket_timeout
float

连接上的基础套接字在发送和接收数据时应在超时之前等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果 EventHubsConnectionError 错误由于写入超时而发生,则可能需要传入大于默认值的值。 这适用于高级使用方案,通常默认值应足够。

示例

创建 EventHubConsumerClient 的新实例。


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

方法

close

停止从事件中心检索事件,并关闭基础 AMQP 连接和链接。

from_connection_string

从连接字符串创建 EventHubConsumerClient。

get_eventhub_properties

获取事件中心的属性。

返回的字典中的键包括:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

获取事件中心的分区 ID。

get_partition_properties

获取指定分区的属性。

属性字典中的键包括:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

使用可选的负载均衡和检查点从分区 () 接收事件。

receive_batch

使用可选的负载均衡和检查点从分区 () 接收事件。

close

停止从事件中心检索事件,并关闭基础 AMQP 连接和链接。

close() -> None

返回类型

示例

关闭客户端。


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

从连接字符串创建 EventHubConsumerClient。

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

参数

conn_str
str
必需

事件中心的连接字符串。

consumer_group
str
必需

从事件中心接收此使用者组的事件。

eventhub_name
str

要将客户端连接到的特定事件中心的路径。

logging_enable
bool

是否将网络跟踪日志输出到记录器。 默认值为 False。

auth_timeout
float

等待令牌由服务授权的时间(以秒为单位)。 默认值为 60 秒。 如果设置为 0,则不会从客户端强制实施超时。

user_agent
str

如果指定,则会将其添加到用户代理字符串的前面。

retry_total
int

发生错误时恢复失败操作的尝试总数。 默认值为 3。 接收中 retry_total 的上下文很特殊: 接收 方法是通过每次迭代中调用内部接收方法的 while 循环实现的。 在 接收 情况下, retry_total 指定 while 循环中内部接收方法引发的错误后重试次数。 如果重试尝试已用尽,则将调用 on_error 回调 ((如果) 提供错误信息)。 如果提供 ) ,则将关闭失败的内部分区使用者, (将调用on_partition_close并创建新的内部分区使用者, (如果提供 on_partition_initialize) 来恢复接收,则将调用on_partition_initialize。

retry_backoff_factor
float

第二次尝试后在尝试之间应用的退让因素 (大多数错误通过第二次尝试立即解决,无需延迟) 。 在固定模式下,重试策略将始终为 {backoff factor} 休眠。 在“指数”模式下,重试策略将休眠: {backoff factor} * (2 ** ({总重试次数} - 1) ) 秒。 如果backoff_factor为 0.1,则重试将在两次重试之间休眠 [0.0s, 0.2s, 0.4s, ...]。 默认值为 0.8。

retry_backoff_max
float

最长回退时间。 默认值为 120 秒 (2 分钟) 。

retry_mode
str

重试尝试之间的延迟行为。 支持的值是“fixed”或“exponential”,其中默认值为“exponential”。

idle_timeout
float

超时(以秒为单位),之后,如果没有远端活动,则此客户端将关闭基础连接。 默认情况下,该值为 None,这意味着除非由服务启动,否则客户端不会因为处于非活动状态而关闭。

transport_type
TransportType

将用于与事件中心服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket

http_proxy
dict

HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。

checkpoint_store
CheckpointStoreNone

一个管理器,用于在接收事件时存储分区负载均衡和检查点数据。 检查点存储将用于从所有分区或单个分区接收的两种情况。 在后一种情况下,负载均衡不适用。 如果未提供检查点存储,检查点将在内存中内部维护, EventHubConsumerClient 实例将接收不进行负载均衡的事件。

load_balancing_interval
float

当负载均衡启动时。 这是两次负载均衡评估之间的间隔(以秒为单位)。 默认值为 10 秒。

partition_ownership_expiration_interval
float

分区所有权将在该秒数后过期。 每次负载均衡评估都会自动延长所有权过期时间。 默认值为 6 * load_balancing_interval,即使用 30 秒的默认load_balancing_interval时为 60 秒。

load_balancing_strategy
strLoadBalancingStrategy

启动负载均衡时,它将使用此策略来声明和平衡分区所有权。 将“greedy”或 LoadBalancingStrategy.GREEDY 用于贪婪策略,对于每次负载均衡评估,该策略将获取均衡负载所需的任意数量的未认领分区。 对于均衡策略,请使用“balanced”或 LoadBalancingStrategy.BALANCED ,该策略每次进行负载均衡评估时,只声明其他 EventHubConsumerClient 未声明的一个分区。 如果 EventHub 的所有分区都由其他 EventHubConsumerClient 声明,并且此客户端声明的分区太少,则无论负载均衡策略如何,此客户端都将从其他客户端中窃取一个分区进行每次负载均衡评估。 默认情况下使用贪婪策略。

custom_endpoint_address
strNone

用于与事件中心服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address中指定端口,则默认使用端口 443。

connection_verify
strNone

SSL 证书的自定义CA_BUNDLE文件的路径,用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where ()

uamqp_transport
bool

是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。

返回类型

示例

从 连接字符串 创建 EventHubConsumerClient 的新实例。


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

获取事件中心的属性。

返回的字典中的键包括:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

返回

包含有关事件中心信息的字典。

返回类型

例外

get_partition_ids

获取事件中心的分区 ID。

get_partition_ids() -> List[str]

返回

分区 ID 的列表。

返回类型

例外

get_partition_properties

获取指定分区的属性。

属性字典中的键包括:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

参数

partition_id
str
必需

目标分区 ID。

返回

包含分区属性的字典。

返回类型

例外

receive

使用可选的负载均衡和检查点从分区 () 接收事件。

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

参数

on_event
callable[PartitionContext, EventDataNone]
必需

用于处理收到的事件的回调函数。 回调采用两个参数:包含分区上下文的partition_context和接收的事件。 回调函数的定义应如下所示: on_event (partition_context、事件) 。 有关详细的分区上下文信息,请参阅 PartitionContext

max_wait_time
float

事件处理程序在调用回调之前等待的最大间隔(秒)。 如果在此间隔内未收到任何事件,则将使用 None 调用on_event回调。 如果此值设置为 None 或 0 (默认) ,则在收到事件之前不会调用回调。

partition_id
str

如果指定,客户端将仅从此分区接收。 否则,客户端将从所有分区接收。

owner_level
int

独占使用者的优先级。 如果设置了owner_level,则会创建独占使用者。 具有较高owner_level的使用者具有较高的独占优先级。 所有者级别也称为使用者的“纪元值”。

prefetch
int

要从服务中预提取以进行处理的事件数。 默认值为 300。

track_last_enqueued_event_properties
bool

指示使用者是否应请求有关其关联分区上最后排队的事件的信息,并在收到事件时跟踪该信息。 跟踪有关最后排队事件的分区信息时,从事件中心服务接收的每个事件都将携带有关分区的元数据。 这会导致少量额外的网络带宽消耗,在考虑与使用事件中心客户端定期发出分区属性请求时,这通常是一种有利的权衡。 默认情况下,它设置为 False

starting_position
str, int, datetimedict[str,any]

如果没有分区的检查点数据,请从此事件位置开始接收。 如果可用,将使用检查点数据。 这可以是分区 ID 作为键、位置作为单个分区的值的听写,也可以是所有分区的单个值。 值类型可以是 str、int 或 datetime.datetime。 还支持从流开头接收的值“-1”和“@latest”,用于仅接收新事件。 默认值为“@latest”。

starting_position_inclusive
booldict[str,bool]

确定给定starting_position是否为非独占 (>=) (>) 。 对于非独占,为 True,对于独占,则为 False。 这可以是分区 ID 作为键,bool 作为值的听写,该值指示特定分区的starting_position是否为非独占分区。 对于所有starting_position,也可以是单个布尔值。 默认值为 False。

on_error
callable[[PartitionContext, Exception]]

在重试尝试用尽后接收期间或负载均衡过程中引发错误时调用的回调函数。 回调采用两个参数: 包含 分区信息的partition_context和 异常错误 。 如果在负载均衡过程中引发错误,partition_context可能为 None。 回调的定义应如下所示: on_error (partition_context、错误) 。 如果在 on_event 回调期间引发未经处理的异常,也会调用 on_error 回调。

on_partition_initialize
callable[[PartitionContext]]

在某个分区的使用者完成初始化后将调用的回调函数。 创建新的内部分区使用者以接管失败且已关闭的内部分区使用者的接收进程时,也会调用它。 回调采用单个参数: partition_context ,其中包含分区信息。 回调的定义应如下所示: on_partition_initialize (partition_context)

on_partition_close
callable[[PartitionContext, CloseReason]]

关闭某个分区的使用者后将调用的回调函数。 在重试尝试用尽后接收期间引发错误时,也会调用它。 回调采用两个参数: partition_context ,其中包含分区信息和关闭 原因 。 回调的定义应如下所示: on_partition_close (partition_context、原因) 。 有关各种结束原因, CloseReason 请参阅 。

返回类型

示例

从 EventHub 接收事件。


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

使用可选的负载均衡和检查点从分区 () 接收事件。

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

参数

on_event_batch
callable[PartitionContext, list[EventData]]
必需

用于处理一批收到的事件的回调函数。 回调采用两个参数:包含分区上下文 的partition_contextevent_batch(即接收的事件)。 回调函数的定义应如下所示: on_event_batch (partition_context,event_batch) 。 如果max_wait_time不为 None 或 0,并且max_wait_time后未收到任何事件,则event_batch可能是空列表。 有关详细的分区上下文信息,请参阅 PartitionContext

max_batch_size
int

批处理中传递到回调 on_event_batch的最大事件数。 如果实际接收的事件数大于 max_batch_size,则接收的事件将分为多个批,并为每个批调用回调,最多 max_batch_size 个事件。

max_wait_time
float

事件处理程序在调用回调之前等待的最大间隔(秒)。 如果在此时间间隔内未收到任何事件,则将使用空列表调用 on_event_batch 回调。

partition_id
str

如果指定,客户端将仅从此分区接收。 否则,客户端将从所有分区接收。

owner_level
int

独占使用者的优先级。 如果设置了owner_level,则会创建独占使用者。 具有较高owner_level的使用者具有较高的独占优先级。 所有者级别也称为使用者的“纪元值”。

prefetch
int

要从服务中预提取以进行处理的事件数。 默认值为 300。

track_last_enqueued_event_properties
bool

指示使用者是否应请求有关其关联分区上最后排队的事件的信息,并在收到事件时跟踪该信息。 跟踪有关最后排队事件的分区信息时,从事件中心服务接收的每个事件都将携带有关分区的元数据。 这会导致少量额外的网络带宽消耗,在考虑与使用事件中心客户端定期发出分区属性请求时,这通常是一种有利的权衡。 默认情况下,它设置为 False

starting_position
str, int, datetimedict[str,any]

如果没有分区的检查点数据,请从此事件位置开始接收。 如果可用,将使用检查点数据。 这可以是分区 ID 作为键、位置作为单个分区的值的听写,也可以是所有分区的单个值。 值类型可以是 str、int 或 datetime.datetime。 还支持从流开头接收的值“-1”和“@latest”,用于仅接收新事件。 默认值为“@latest”。

starting_position_inclusive
booldict[str,bool]

确定给定starting_position是否为非独占 (>=) (>) 。 对于非独占,为 True,对于独占,则为 False。 这可以是分区 ID 作为键,bool 作为值的听写,该值指示特定分区的starting_position是否为非独占分区。 对于所有starting_position,也可以是单个布尔值。 默认值为 False。

on_error
callable[[PartitionContext, Exception]]

在重试尝试用尽后接收期间或负载均衡过程中引发错误时调用的回调函数。 回调采用两个参数: 包含 分区信息的partition_context和 异常错误 。 如果在负载均衡过程中引发错误,partition_context可能为 None。 回调的定义应如下所示: on_error (partition_context、错误) 。 如果在 on_event 回调期间引发未经处理的异常,也会调用 on_error 回调。

on_partition_initialize
callable[[PartitionContext]]

在某个分区的使用者完成初始化后将调用的回调函数。 创建新的内部分区使用者以接管失败且已关闭的内部分区使用者的接收进程时,也会调用它。 回调采用单个参数: partition_context ,其中包含分区信息。 回调的定义应如下所示: on_partition_initialize (partition_context)

on_partition_close
callable[[PartitionContext, CloseReason]]

关闭某个分区的使用者后将调用的回调函数。 在重试尝试用尽后接收期间引发错误时,也会调用它。 回调采用两个参数: partition_context ,其中包含分区信息和关闭 原因 。 回调的定义应如下所示: on_partition_close (partition_context、原因) 。 有关各种结束原因, CloseReason 请参阅 。

返回类型

示例

从 EventHub 分批接收事件。


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)