EventHubProducerClient 類別
EventHubProducerClient 類別會定義將事件傳送至Azure 事件中樞服務的高階介面。
- 繼承
-
azure.eventhub._client_base.ClientBaseEventHubProducerClient
建構函式
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)
參數
- credential
- TokenCredential 或 AzureSasCredential 或 AzureNamedKeyCredential
用於驗證的認證物件,會實作用於取得權杖的特定介面。 它會接受 EventHubSharedKeyCredential azure 身分識別程式庫所產生的認證物件,以及實作 *get_token (自我、 範圍) 方法的物件。
- buffered_mode
- bool
如果為 True,產生者用戶端會在緩衝區中收集事件,並有效率地批次,然後發佈。 預設值是 False。
用來發佈事件的 ThreadPoolExecutor 或 ThreadPoolExecutor 的背景工作角色數目。 預設值為 None,且會針對每個建立預設背景工作角色數目的 ThreadPoolExecutor https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
成功發佈批次後要呼叫的回呼。 回呼採用兩個參數:
events:已成功發行的事件清單
partition_id:清單中事件已發行至的資料分割識別碼。
回呼函式的定義方式如下: on_success (事件,partition_id) 。 當 buffered_mode 為 True 時,如果 buffered_mode 為 False,則為必要專案。
一旦批次無法發佈之後,要呼叫的回呼。 回呼函式的定義方式如下: on_error (事件、partition_id、錯誤) ,其中:
events:無法發佈的事件清單。
partition_id:清單中事件嘗試發行至 和 的分割區識別碼
error:與傳送失敗相關的例外狀況。
如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:
如果在產生者用戶端具現化期間傳遞 on_error 回呼,
然後,錯誤資訊會傳遞至 on_error 回呼,然後呼叫。
如果在用戶端具現化期間未傳入 on_error 回呼,
然後,預設會引發錯誤。
如果 buffered_mode 為 True, 則需要on_error 回呼,而且會處理錯誤,如下所示:
如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
如果成功排入佇列之後無法傳送事件,則會呼叫 on_error 回呼。
- max_buffer_length
- int
僅限緩衝模式。 每個分割區的事件總數,這些事件可以在觸發排清之前進行緩衝處理。 預設值為緩衝模式中的 1500。
- logging_enable
- bool
是否要將網路追蹤記錄輸出至記錄器。 預設值為 False。
- auth_timeout
- float
等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。
- user_agent
- str
如果指定,這會在使用者代理程式字串前面新增。
- retry_total
- int
發生錯誤時,重做失敗作業的嘗試總數。 預設值為 3。
- retry_backoff_factor
- float
第二次嘗試之後套用的輪詢因素 (大部分錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間進入 [0.0s、0.2s、0.4s...] 的睡眠狀態。 預設值為 0.8。
- retry_backoff_max
- float
回溯時間上限。 預設值為 120 秒, (2 分鐘) 。
- retry_mode
- str
重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。
- idle_timeout
- float
逾時,以秒為單位,在此之後,如果沒有活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,這表示除非由服務起始,否則用戶端不會因為閒置而關閉。
- transport_type
- TransportType
將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。
- http_proxy
- Dict
HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' (str 值) 和 'proxy_port' (int 值) 。 此外,也可能有下列金鑰: 'username'、'password'。
用來建立事件中樞服務的連線的自訂端點位址,允許網路要求透過任何應用程式閘道或其他主機環境所需的路徑路由傳送。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,則會預設使用埠 443。
SSL 憑證的自訂CA_BUNDLE檔案路徑,用來驗證連線端點的身分識別。 預設值為 None, 其中會使用 certifi.where () 。
- uamqp_transport
- bool
是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,且純 Python AMQP 程式庫將作為基礎傳輸。
- socket_timeout
- float
連線上基礎通訊端在傳送和接收資料時應該等待的時間,以秒為單位逾時。TransportType.Amqp 的預設值為 0.2,而 TransportType.AmqpOverWebsocket 的預設值為 1。 如果 EventHubsConnectionError 錯誤因為寫入逾時而發生,可能需要傳入大於預設值。 這適用于進階使用案例,而且預設值通常應該足夠。
範例
建立 EventHubProducerClient 的新實例。
import os
from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
producer = EventHubProducerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name, # EventHub name should be specified if it doesn't show up in connection string.
credential=credential
)
方法
close |
關閉產生者用戶端基礎 AMQP 連線和連結。 |
create_batch |
建立 EventDataBatch 物件,其中包含受max_size_in_bytes限制之所有內容的大小上限。 max_size_in_bytes不應大於服務所定義的允許訊息大小上限。 |
flush |
僅限緩衝模式。 如果用戶端在緩衝模式中運作,則會在緩衝區中排清要立即傳送的事件。 |
from_connection_string |
從連接字串建立 EventHubProducerClient。 |
get_buffered_event_count |
已緩衝處理並等候針對指定分割區發行的事件數目。 在非緩衝模式中傳回 None。 注意:事件緩衝區是在背景執行緒中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。 對於未緩衝處理事件的資料分割識別碼,不論該分割區識別碼是否確實存在於事件中樞內,都會傳回 0。 |
get_eventhub_properties |
取得事件中樞的屬性。 傳回字典中的索引鍵包括:
|
get_partition_ids |
取得事件中樞的資料分割識別碼。 |
get_partition_properties |
取得指定資料分割的屬性。 屬性字典中的索引鍵包括:
|
send_batch |
傳送事件資料的批次。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,則如果指定並傳回,方法會嘗試將事件加入至指定時間內的緩衝區。 製作者會在緩衝模式的背景中自動傳送。 如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且錯誤會處理如下:
如果 buffered_mode 為 True,則 on_error回呼是必要的,而且會處理錯誤,如下所示:
在緩衝模式中,傳送批次會保持不變,並以單一單位傳送。 批次不會重新排列。 這可能會導致傳送事件效率不佳。 如果您要傳送 EventData 或 AmqpAnnotatedMessage 的有限清單,而且您知道它位於事件中樞畫面大小限制內,您可以使用 send_batch 呼叫來傳送它們。 否則,請使用 create_batch 建立 EventDataBatch ,並將 EventData 或 AmqpAnnotatedMessage 新增至批次一次,直到大小限制為止,然後呼叫此方法以傳送批次。 |
send_event |
傳送事件資料。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,則如果指定並傳回,方法會嘗試將事件加入至指定時間內的緩衝區。 製作者會在緩衝模式的背景中自動傳送。 如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:* 如果在產生者用戶端具現化期間傳遞 on_error 回呼,
如果 buffered_mode 為 True, 則on_error 回呼是必要的,而且會處理錯誤,如下所示:* 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
|
close
關閉產生者用戶端基礎 AMQP 連線和連結。
close(*, flush: bool = True, **kwargs: Any) -> None
參數
- flush
- bool
僅限緩衝模式。 如果設定為 True,則會立即傳送緩衝區中的事件。 預設值是 True。
傳回類型
例外狀況
如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。
範例
關閉用戶端。
import os
from azure.eventhub import EventHubProducerClient, EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# EventDataBatch object reaches max_size.
# New EventDataBatch object can be created here to send more data
break
producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
producer.close()
create_batch
建立 EventDataBatch 物件,其中包含受max_size_in_bytes限制之所有內容的大小上限。
max_size_in_bytes不應大於服務所定義的允許訊息大小上限。
create_batch(**kwargs: Any) -> EventDataBatch
傳回類型
例外狀況
如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。
範例
在有限的大小內建立 EventDataBatch 物件
event_data_batch = producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
僅限緩衝模式。 如果用戶端在緩衝模式中運作,則會在緩衝區中排清要立即傳送的事件。
flush(**kwargs: Any) -> None
參數
傳回類型
例外狀況
如果產生者無法在緩衝模式的指定逾時內排清緩衝區。
from_connection_string
從連接字串建立 EventHubProducerClient。
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) -> EventHubProducerClient
參數
- eventhub_name
- str
要連接用戶端的特定事件中樞路徑。
- buffered_mode
- bool
如果為 True,產生者用戶端會在緩衝區中收集事件,並有效率地批次,然後發佈。 預設值是 False。
用來發佈事件的 ThreadPoolExecutor 或 ThreadPoolExecutor 的背景工作角色數目。 預設值為 None,且會針對每個建立預設背景工作角色數目的 ThreadPoolExecutor https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
成功發佈批次後要呼叫的回呼。 回呼採用兩個參數:
events:已成功發行的事件清單
partition_id:清單中事件已發行至的資料分割識別碼。
回呼函式的定義方式如下: on_success (事件,partition_id) 。 當 buffered_mode為 True 時為必要條件,如果buffered_mode為 False,則為選擇性。
一旦批次無法發佈之後,要呼叫的回呼。 在 buffered_mode 為 True 時為必要條件,如果 buffered_mode 為 False,則為選擇性。 回呼函式的定義方式如下: on_error (事件、partition_id、錯誤) ,其中:
events:無法發佈的事件清單。
partition_id:清單中事件嘗試發行至 和 的分割區識別碼
error:與傳送失敗相關的例外狀況。
如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:
如果在產生者用戶端具現化期間傳遞 on_error 回呼,
然後,錯誤資訊會傳遞至 on_error 回呼,然後呼叫。
如果在用戶端具現化期間未傳入 on_error 回呼,
然後,預設會引發錯誤。
如果 buffered_mode 為 True, 則需要on_error 回呼,而且會處理錯誤,如下所示:
如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
如果成功排入佇列之後無法傳送事件,則會呼叫 on_error 回呼。
- max_buffer_length
- int
僅限緩衝模式。 每個分割區的事件總數,這些事件可以在觸發排清之前進行緩衝處理。 預設值為緩衝模式中的 1500。
- logging_enable
- bool
是否要將網路追蹤記錄輸出至記錄器。 預設值為 False。
- http_proxy
- Dict
HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' (str 值) 和 'proxy_port' (int 值) 。 此外,也可能有下列金鑰: 'username'、'password'。
- auth_timeout
- float
等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。
- user_agent
- str
如果指定,這會在使用者代理程式字串前面新增。
- retry_total
- int
發生錯誤時,重做失敗作業的嘗試總數。 預設值為 3。
- retry_backoff_factor
- float
第二次嘗試之後套用的輪詢因素 (大部分錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間進入 [0.0s、0.2s、0.4s...] 的睡眠狀態。 預設值為 0.8。
- retry_backoff_max
- float
回溯時間上限。 預設值為 120 秒, (2 分鐘) 。
- retry_mode
- str
重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。
- idle_timeout
- float
逾時,以秒為單位,在此之後,如果沒有活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,這表示除非由服務起始,否則用戶端不會因為閒置而關閉。
- transport_type
- TransportType
將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。
- http_proxy
HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' (str 值) 和 'proxy_port' (int 值) 。 此外,也可能有下列金鑰: 'username'、'password'。
用來建立事件中樞服務的連線的自訂端點位址,允許網路要求透過任何應用程式閘道或其他主機環境所需的路徑路由傳送。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,則會預設使用埠 443。
SSL 憑證的自訂CA_BUNDLE檔案路徑,用來驗證連線端點的身分識別。 預設值為 None, 其中會使用 certifi.where () 。
- uamqp_transport
- bool
是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,且純 Python AMQP 程式庫將作為基礎傳輸。
傳回類型
例外狀況
如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。
範例
從 連接字串 建立 EventHubProducerClient 的新實例。
import os
from azure.eventhub import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
已緩衝處理並等候針對指定分割區發行的事件數目。 在非緩衝模式中傳回 None。 注意:事件緩衝區是在背景執行緒中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。 對於未緩衝處理事件的資料分割識別碼,不論該分割區識別碼是否確實存在於事件中樞內,都會傳回 0。
get_buffered_event_count(partition_id: str) -> int | None
參數
傳回類型
例外狀況
如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。
get_eventhub_properties
取得事件中樞的屬性。
傳回字典中的索引鍵包括:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
傳回
包含 eventhub 屬性的字典。
傳回類型
例外狀況
get_partition_ids
get_partition_properties
取得指定資料分割的屬性。
屬性字典中的索引鍵包括:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
參數
傳回
資料分割屬性的字典。
傳回類型
例外狀況
send_batch
傳送事件資料的批次。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,則如果指定並傳回,方法會嘗試將事件加入至指定時間內的緩衝區。 製作者會在緩衝模式的背景中自動傳送。
如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且錯誤會處理如下:
如果在產生者用戶端具現化期間傳遞 on_error 回呼,
接著,錯誤資訊會傳遞至 on_error回 呼,然後呼叫該回呼。
如果在用戶端具現化期間未傳入 on_error 回呼,
然後,預設會引發錯誤。
如果 buffered_mode 為 True,則 on_error回呼是必要的,而且會處理錯誤,如下所示:
如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
如果成功排入佇列之後無法傳送事件,則會呼叫 on_error回 呼。
在緩衝模式中,傳送批次會保持不變,並以單一單位傳送。 批次不會重新排列。 這可能會導致傳送事件效率不佳。
如果您要傳送 EventData 或 AmqpAnnotatedMessage 的有限清單,而且您知道它位於事件中樞畫面大小限制內,您可以使用 send_batch 呼叫來傳送它們。 否則,請使用 create_batch 建立 EventDataBatch ,並將 EventData 或 AmqpAnnotatedMessage 新增至批次一次,直到大小限制為止,然後呼叫此方法以傳送批次。
send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
參數
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
要傳送的 EventDataBatch 物件,或要批次中傳送 的 EventData 清單。 清單中的所有 EventData 或 AmqpAnnotatedMessage 或 EventDataBatch 都會落在相同的分割區上。
- timeout
- float
以非緩衝模式傳送事件資料的最大等候時間,或將事件資料加入緩衝模式中緩衝區的最大等候時間。 在非緩衝模式中,將會使用建立產生者時所指定的預設等候時間。 在緩衝模式中,預設等候時間為 None。
- partition_id
- str
要傳送至的特定資料分割識別碼。 預設值為 None,在此情況下,服務會使用迴圈配置資源指派給所有分割區。 如果指定了partition_id,而且event_data_batch是EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_id。
- partition_key
- str
使用指定的partition_key,事件資料會傳送至服務所決定之事件中樞的特定分割區。 如果指定partition_key且event_data_batch為EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_key。 如果同時提供partition_id和partition_key,則partition_id優先。 警告:不建議在要傳送的事件上設定非字串值的partition_key,因為事件中樞服務將會忽略partition_key,事件將會使用迴圈配置資源指派給所有分割區。 此外,有 SDK 可用來取用預期只有字串類型的事件partition_key,它們可能無法剖析非字串值。
傳回類型
例外狀況
如果 timeout 參數所指定的值會經過,事件才能以非緩衝模式傳送,或事件無法加入緩衝模式中緩衝處理的事件。
範例
傳送事件資料
with producer:
event_data_batch = producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# EventDataBatch object reaches max_size.
# New EventDataBatch object can be created here to send more data
break
producer.send_batch(event_data_batch)
send_event
傳送事件資料。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,則如果指定並傳回,方法會嘗試將事件加入至指定時間內的緩衝區。 製作者會在緩衝模式的背景中自動傳送。
如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:* 如果在產生者用戶端具現化期間傳遞 on_error 回呼,
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
如果 buffered_mode 為 True, 則on_error 回呼是必要的,而且會處理錯誤,如下所示:* 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
參數
- timeout
- float
以非緩衝模式傳送事件資料的最大等候時間,或將事件資料加入緩衝模式中緩衝區的最大等候時間。 在非緩衝模式中,將會使用建立產生者時所指定的預設等候時間。 在緩衝模式中,預設等候時間為 None。
- partition_id
- str
要傳送至的特定資料分割識別碼。 預設值為 None,在此情況下,服務會使用迴圈配置資源指派給所有分割區。 如果指定了partition_id,而且event_data_batch是EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_id。
- partition_key
- str
使用指定的partition_key,事件資料會傳送至服務所決定之事件中樞的特定分割區。 如果指定partition_key且event_data_batch為EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_key。 如果同時提供partition_id和partition_key,則partition_id優先。 警告:不建議在要傳送的事件上設定非字串值的partition_key,因為事件中樞服務將會忽略partition_key,事件將會使用迴圈配置資源指派給所有分割區。 此外,有 SDK 可用來取用預期只有字串類型的事件partition_key,它們可能無法剖析非字串值。
傳回類型
例外狀況
如果 timeout 參數所指定的值經過,事件才能以非緩衝模式傳送,或事件可以加入緩衝模式的緩衝處理中。