EventHubConsumerClient 類別
EventHubConsumerClient 類別會定義從Azure 事件中樞服務接收事件的高階介面。
EventHubConsumerClient的主要目標是從具有負載平衡和檢查點的 EventHub 的所有分割區接收事件。
當多個 EventHubConsumerClient 實例針對相同的事件中樞、取用者群組和檢查點位置執行時,分割區會平均分散于其中。
若要啟用負載平衡和保存的檢查點,必須在建立 EventHubConsumerClient時設定checkpoint_store。 如果未提供檢查點存放區,則會在記憶體內部維護檢查點。
當您呼叫其方法receive () 或 receive_batch () 並指定partition_id時,EventHubConsumerClient也可以從特定分割區接收。 負載平衡無法在單一資料分割模式中運作。 但如果已設定checkpoint_store,使用者仍然可以儲存檢查點。
- 繼承
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
建構函式
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
參數
- credential
- TokenCredential 或 AzureSasCredential 或 AzureNamedKeyCredential
用於驗證的認證物件,會實作用於取得權杖的特定介面。 它會接受 EventHubSharedKeyCredential azure 身分識別程式庫所產生的認證物件,以及實作 *get_token (自我、 範圍) 方法的物件。
- logging_enable
- bool
是否要將網路追蹤記錄輸出至記錄器。 預設值為 False。
- auth_timeout
- float
等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。
- user_agent
- str
如果指定,這會在使用者代理程式字串前面新增。
- retry_total
- int
發生錯誤時,重做失敗作業的嘗試總數。 預設值為 3。 接收中 retry_total 的內容特別: 接收 方法是由在每次反復專案中呼叫內部接收方法的 while 迴圈實作。 在 接收 案例中, retry_total 指定 while-loop 中內部接收方法引發錯誤之後重試的數目。 如果重試嘗試用盡,如果提供 錯誤資訊) , 則會呼叫on_error回呼 (。 如果提供) ,將會關閉失敗的內部分割區取用者 , (on_partition_close將會呼叫失敗的內部分割區取用者, (on_partition_initialize如果) 提供繼續接收 ,則會呼叫 on_partition_initialize。
- retry_backoff_factor
- float
第二次嘗試之後套用的輪詢因素 (大部分錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間進入 [0.0s、0.2s、0.4s...] 的睡眠狀態。 預設值為 0.8。
- retry_backoff_max
- float
回溯時間上限。 預設值為 120 秒, (2 分鐘) 。
- retry_mode
- str
重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。
- idle_timeout
- float
逾時,以秒為單位,在此之後,如果沒有進一步的活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,這表示除非由服務起始,否則用戶端不會因為閒置而關閉。
- transport_type
- TransportType
將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。
HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' (str 值) 和 'proxy_port' (int 值) 。 此外,也可能有下列金鑰: 'username'、'password'。
- checkpoint_store
- CheckpointStore 或 None
在接收事件時儲存資料分割負載平衡和檢查點資料的管理員。 檢查點存放區將用於從所有分割區或單一分割區接收的兩種情況下。 在後者的情況下,不會套用負載平衡。 如果未提供檢查點存放區,則會在記憶體內部維護檢查點, 而 EventHubConsumerClient 實例將會接收事件,而不會進行負載平衡。
- load_balancing_interval
- float
負載平衡開始時。 這是兩個負載平衡評估之間的間隔,以秒為單位。 預設值為 30 秒。
- partition_ownership_expiration_interval
- float
分割區擁有權會在此秒數之後到期。 每個負載平衡評估都會自動延長擁有權到期時間。 預設值為 6 * load_balancing_interval,也就是使用預設load_balancing_interval為 30 秒時的 180 秒。
- load_balancing_strategy
- str 或 LoadBalancingStrategy
當負載平衡開始時,它會使用此策略來宣告和平衡資料分割擁有權。 針對每個負載平衡評估,請使用 「gredy」 或 LoadBalancingStrategy.GREDY 來取得所有負載平衡評估所需的未重載資料分割。 針對平衡策略使用 「balancer」 或 LoadBalancingStrategy.BALANCER ,針對每個負載平衡評估,只會宣告其他 EventHubConsumerClient未宣告的一個分割區。 如果 EventHub 的所有分割區是由其他 EventHubConsumerClient 宣告,而且此用戶端宣告的資料分割太少,則不論負載平衡策略為何,此用戶端都會從其他用戶端竊取一個分割區,以進行每個負載平衡評估。 預設會使用窮盡策略。
用來建立事件中樞服務的連線的自訂端點位址,允許網路要求透過任何應用程式閘道或其他主機環境所需的路徑路由傳送。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,則會預設使用埠 443。
SSL 憑證的自訂CA_BUNDLE檔案路徑,用來驗證連線端點的身分識別。 預設值為 None, 其中會使用 certifi.where () 。
- uamqp_transport
- bool
是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,且純 Python AMQP 程式庫將作為基礎傳輸。
- socket_timeout
- float
連線上基礎通訊端在傳送和接收資料時應該等待的時間,以秒為單位逾時。TransportType.Amqp 的預設值為 0.2,而 TransportType.AmqpOverWebsocket 的預設值為 1。 如果 EventHubsConnectionError 錯誤因為寫入逾時而發生,可能需要傳入大於預設值。 這適用于進階使用案例,而且預設值通常應該足夠。
範例
建立 EventHubConsumerClient 的新實例。
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
方法
close |
停止從事件中樞擷取事件,並關閉基礎 AMQP 連線和連結。 |
from_connection_string |
從連接字串建立 EventHubConsumerClient。 |
get_eventhub_properties |
取得事件中樞的屬性。 傳回字典中的索引鍵包括:
|
get_partition_ids |
取得事件中樞的資料分割識別碼。 |
get_partition_properties |
取得指定之分割區的屬性。 屬性字典中的索引鍵包括:
|
receive |
使用選擇性的負載平衡和檢查點,從分割區 () 接收事件。 |
receive_batch |
使用選擇性的負載平衡和檢查點,從分割區 () 接收事件。 |
close
停止從事件中樞擷取事件,並關閉基礎 AMQP 連線和連結。
close() -> None
傳回類型
範例
關閉用戶端。
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
從連接字串建立 EventHubConsumerClient。
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
參數
- eventhub_name
- str
要連接用戶端的特定事件中樞路徑。
- logging_enable
- bool
是否要將網路追蹤記錄輸出至記錄器。 預設值為 False。
- auth_timeout
- float
等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。
- user_agent
- str
如果指定,這會在使用者代理程式字串前面新增。
- retry_total
- int
發生錯誤時,重做失敗作業的嘗試總數。 預設值為 3。 接收中 retry_total 的內容特別: 接收 方法是由在每次反復專案中呼叫內部接收方法的 while 迴圈實作。 在 接收 案例中, retry_total 指定 while-loop 中內部接收方法引發錯誤之後重試的數目。 如果重試嘗試用盡,如果提供 錯誤資訊) , 則會呼叫on_error回呼 (。 如果提供) ,將會關閉失敗的內部分割區取用者 , (on_partition_close將會呼叫失敗的內部分割區取用者, (on_partition_initialize如果) 提供繼續接收 ,則會呼叫 on_partition_initialize。
- retry_backoff_factor
- float
第二次嘗試之後套用的輪詢因素 (大部分錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間睡眠 [0.0s, 0.2s, 0.4s, ...]。 預設值為 0.8。
- retry_backoff_max
- float
退班時間上限。 預設值為 120 秒, (2 分鐘) 。
- retry_mode
- str
重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。
- idle_timeout
- float
逾時,以秒為單位,在此之後,如果沒有任何活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,表示除非服務起始,否則用戶端不會因為閒置而關閉。
- transport_type
- TransportType
將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。
- http_proxy
- dict
HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' ( str 值) 和 'proxy_port' (int 值) 。 此外,也可能會出現下列金鑰: 'username'、'password'。
- checkpoint_store
- CheckpointStore 或 None
在接收事件時儲存分割區負載平衡和檢查點資料的管理員。 檢查點存放區將會用於從所有分割區或單一分割區接收的兩種情況。 在後者的情況下,不會套用負載平衡。 如果未提供檢查點存放區,則會在記憶體內部維護檢查點,而且 EventHubConsumerClient 實例將會收到事件,而不會進行負載平衡。
- load_balancing_interval
- float
負載平衡啟動時。 這是兩個負載平衡評估之間的間隔,以秒為單位。 預設值為 10 秒。
- partition_ownership_expiration_interval
- float
分割區擁有權會在此秒數之後過期。 每個負載平衡評估都會自動延長擁有權到期時間。 預設值為 6 * load_balancing_interval,也就是使用預設load_balancing_interval為 30 秒時的 60 秒。
- load_balancing_strategy
- str 或 LoadBalancingStrategy
當負載平衡開始時,它會使用此策略來宣告和平衡分割區擁有權。 使用 「gredy」 或 LoadBalancingStrategy.GREDY 作為窮盡策略,針對每個負載平衡評估,將會擷取平衡負載所需的未重載分割區數目。 針對平衡策略使用 「balanced」 或 LoadBalancingStrategy.BALANCED ,針對每個負載平衡評估,只會宣告其他 EventHubConsumerClient未宣告的一個分割區。 如果 EventHub 的所有分割區都由其他 EventHubConsumerClient 宣告,而且此用戶端宣告了太少的資料分割,則不論負載平衡策略為何,此用戶端都會從其他用戶端竊取一個分割區, 預設會使用窮盡策略。
用來建立事件中樞服務的連線的自訂端點位址,允許透過任何應用程式閘道或其他主機環境所需的路徑路由傳送網路要求。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,預設會使用埠 443。
SSL 憑證之自訂CA_BUNDLE檔案的路徑,用來驗證連線端點的身分識別。 預設值為 None,在此情況下會使用 certifi.where () 。
- uamqp_transport
- bool
是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,而純 Python AMQP 程式庫將作為基礎傳輸。
傳回類型
範例
從 連接字串 建立 EventHubConsumerClient 的新實例。
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
取得事件中樞的屬性。
傳回字典中的索引鍵包括:
eventhub_name str) (
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
傳回
包含事件中樞相關資訊的字典。
傳回類型
例外狀況
get_partition_ids
get_partition_properties
取得指定之分割區的屬性。
屬性字典中的索引鍵包括:
eventhub_name str) (
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset str) (
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
參數
傳回
包含資料分割屬性的字典。
傳回類型
例外狀況
receive
使用選擇性的負載平衡和檢查點,從分割區 () 接收事件。
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
參數
- on_event
- callable[PartitionContext, EventData 或 None]
用於處理已接收事件的回呼函式。 回呼會採用兩個參數: partition_coNtext ,其中包含已接收的事件分割內容和 事件 。 回呼函式的定義方式如下: on_event (partition_coNtext、事件) 。 如需詳細的資料分割內容資訊,請參閱 PartitionContext 。
- max_wait_time
- float
事件處理器在呼叫回呼之前會等候的最大間隔,以秒為單位。 如果未在此間隔內收到任何事件,則會以None呼叫on_event回呼。 如果此值設定為 None 或 0 (預設) ,在收到事件之前,將不會呼叫回呼。
- partition_id
- str
如果指定,用戶端只會從這個分割區接收。 否則,用戶端會從所有分割區接收。
- owner_level
- int
獨佔取用者的優先順序。 如果已設定owner_level,將會建立獨佔取用者。 具有較高owner_level的取用者具有較高的獨佔優先權。 擁有者層級也稱為取用者的「Epoch 值」。
- prefetch
- int
要從服務預先擷取以進行處理的事件數目。 預設值為 300。
- track_last_enqueued_event_properties
- bool
指出取用者是否應該在其相關聯的分割區上要求最後排入佇列事件的資訊,並在收到事件時追蹤該資訊。 當追蹤資料分割最後排入佇列事件的相關資訊時,從事件中樞服務收到的每個事件都會包含有關資料分割的中繼資料。 這會導致少量額外的網路頻寬耗用量,在考慮使用事件中樞用戶端定期提出分割區屬性的要求時,通常會是一個偏好的取捨。 預設會設定為 False 。
如果沒有分割區的檢查點資料,請開始從這個事件位置接收。 如果有的話,將會使用檢查點資料。 這可以是具有分割區識別碼的聽寫,做為個別分割區的值,或是所有分割區的單一值。 實數值型別可以是 str、int 或 datetime.datetime。 此外,支援從資料流程開頭接收的值 「-1」,以及只接收新事件的 「@latest」。 預設值為 「@latest」。
判斷指定的starting_position是否包含 (> =) > () 。 True 表示包含,若為 False 則為獨佔。 這可以是分割區識別碼為索引鍵的聽寫,而 bool 是值,指出特定分割區的starting_position是否包含。 這也可以是所有starting_position的單一 bool 值。 預設值是 False。
- on_error
- callable[[PartitionContext, Exception]]
在重試嘗試用盡或負載平衡程式期間收到錯誤時,將會呼叫的回呼函式。 回呼會採用兩個參數: partition_coNtext ,其中包含分割區資訊和 錯誤 是例外狀況。 如果在 負載平衡過程中引發錯誤,partition_coNtext可能是 None。 回呼的定義應該如下: on_error (partition_coNtext,錯誤) 。 如果在on_event回呼期間引發未處理的例外狀況,也會呼叫on_error回呼。
- on_partition_initialize
- callable[[PartitionContext]]
在特定分割區的取用者完成初始化之後所呼叫的回呼函式。 建立新的內部資料分割取用者以接管失敗和已關閉內部資料分割取用者的接收程式時,也會呼叫它。 回呼會採用單一參數: partition_coNtext 其中包含資料分割資訊。 回呼的定義應該如下: on_partition_initialize (partition_coNtext) 。
- on_partition_close
- callable[[PartitionContext, CloseReason]]
在關閉特定分割區的取用者之後呼叫的回呼函式。 當重試嘗試用盡之後,在接收期間引發錯誤時,也會呼叫它。 回呼會採用兩個參數: partition_coNtext ,其中包含關閉的資料分割資訊和 原因 。 回呼的定義應該如下: on_partition_close (partition_coNtext,原因) 。 如需各種關閉原因,請參閱 CloseReason 。
傳回類型
範例
從 EventHub 接收事件。
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
使用選擇性的負載平衡和檢查點,從分割區 () 接收事件。
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
參數
- on_event_batch
- callable[PartitionContext, list[EventData]]
用於處理已接收事件批次的回呼函式。 回呼會採用兩個參數: 包含 分割區內容和event_batch的 partition_coNtext,也就是所接收的事件。 回呼函式的定義應該如下: on_event_batch (partition_coNtext,event_batch) 。 如果max_wait_time不是 None 或 0,而且max_wait_time之後不會收到任何事件,event_batch可能是空的清單。 如需詳細的資料分割內容資訊,請參閱 PartitionContext 。
- max_batch_size
- int
批次中傳遞至回 呼on_event_batch的事件數目上限。 如果實際收到的事件數目大於 max_batch_size,則接收的事件會分成批次,並呼叫每個批次的回呼,最多 max_batch_size 個事件。
- max_wait_time
- float
事件處理器在呼叫回呼之前會等候的最大間隔,以秒為單位。 如果未在此間隔內收到任何事件,則會使用空清單呼叫 on_event_batch 回呼。
- partition_id
- str
如果指定,則用戶端只會從這個分割區接收。 否則,用戶端會從所有分割區接收。
- owner_level
- int
獨佔取用者的優先順序。 如果已設定owner_level,將會建立獨佔取用者。 具有較高owner_level的取用者具有較高的獨佔優先順序。 擁有者層級也稱為取用者的「epoch 值」。
- prefetch
- int
要從服務預先擷取以進行處理的事件數目。 預設值為 300。
- track_last_enqueued_event_properties
- bool
指出取用者是否應該在其相關聯的分割區上要求最後排入佇列事件的資訊,並在收到事件時追蹤該資訊。 當追蹤分割區上一次排入佇列事件的相關資訊時,從事件中樞服務收到的每個事件都會包含資料分割的相關中繼資料。 這會導致少量額外的網路頻寬耗用量,這通常是在考慮使用事件中樞用戶端定期提出資料分割屬性的要求時,取捨較好。 預設會設定為 False 。
如果沒有資料分割的檢查點資料,請開始從這個事件位置接收。 如果有的話,將會使用檢查點資料。 這可以是具有分割區識別碼的聽寫,做為個別分割區的索引鍵和位置,或是所有分割區的單一值。 實數值型別可以是 str、int 或 datetime.datetime。 此外,支援從資料流程開頭接收的值 「-1」,以及只接收新事件的 「@latest」。 預設值為 「@latest」。
判斷指定的starting_position是否包含 (> =) () > 。 若為 True,則為內含和 False 表示為獨佔。 這可以是分割區識別碼為索引鍵的聽寫,而布林值表示特定分割區的starting_position是否包含。 這可以是所有starting_position的單一 bool 值。 預設值是 False。
- on_error
- callable[[PartitionContext, Exception]]
在重試嘗試用盡或負載平衡程式期間收到錯誤時,將會呼叫的回呼函式。 回呼會採用兩個參數: partition_coNtext ,其中包含資料分割資訊和例外狀況 的錯誤 。 如果負載平衡期間引發錯誤,partition_coNtext可能是 None。 回呼的定義應該如下: on_error (partition_coNtext,錯誤) 。 如果在on_event回呼期間引發未處理的例外狀況,也會呼叫on_error回呼。
- on_partition_initialize
- callable[[PartitionContext]]
在特定分割區的取用者完成初始化之後所呼叫的回呼函式。 建立新的內部資料分割取用者以接管失敗和關閉的內部分割取用者接收程式時,也會呼叫它。 回呼會採用單一參數:包含分割區資訊的 partition_coNtext 。 回呼的定義方式如下: on_partition_initialize (partition_coNtext) 。
- on_partition_close
- callable[[PartitionContext, CloseReason]]
在關閉特定分割區的取用者之後呼叫的回呼函式。 當重試嘗試用盡之後,在接收期間引發錯誤時,也會呼叫它。 回呼會採用兩個參數: partition_coNtext ,其中包含資料分割資訊和關閉 的原因 。 回呼的定義應該如下: on_partition_close (partition_coNtext,原因) 。 如需各種關閉原因,請參閱 CloseReason 。
傳回類型
範例
從 EventHub 批次接收事件。
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)