EventHubProducerClient 類別
EventHubProducerClient 類別會定義高階介面,以便將事件傳送至Azure 事件中樞服務。
- 繼承
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubProducerClient
建構函式
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)
參數
- credential
- AsyncTokenCredential 或 AzureSasCredential 或 AzureNamedKeyCredential
用於驗證的認證物件,會實作用於取得權杖的特定介面。 它會接受 EventHubSharedKeyCredential 由 azure-identity 程式庫所產生的認證物件,以及實作 *get_token (自我範圍) 方法的物件。
- buffered_mode
- bool
如果為 True,產生者用戶端會在緩衝區中收集事件、有效率地批次,然後發佈。 預設值是 False。
成功發佈批次後要呼叫的回呼。 回呼會採用兩個參數:
events:已成功發行的事件清單
partition_id:清單中事件已發佈至的資料分割識別碼。
回呼函式的定義方式如下: on_success (事件,partition_id) 。 如果buffered_mode為 False,則buffered_mode為 True 時為必要項。
批次發佈失敗後要呼叫的回呼。 當 buffered_mode 為 True 時為必要項,如果 buffered_mode 為 False 則為選擇性。 回呼函式的定義方式如下: on_error (事件、partition_id、錯誤) ,其中:
events:無法發行的事件清單。
partition_id:清單中事件嘗試發行至 和 的分割區識別碼
error:與傳送失敗相關的例外狀況。
如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且錯誤會處理如下:
如果在產生者用戶端具現化期間傳遞 on_error 回呼,
接著,錯誤資訊會傳遞至 on_error回 呼,然後呼叫該回呼。
如果在用戶端具現化期間未傳入 on_error 回呼,
然後,預設會引發錯誤。
如果 buffered_mode 為 True,則 on_error回呼是必要的,而且會處理錯誤,如下所示:
如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
如果成功排入佇列之後無法傳送事件,則會呼叫 on_error回 呼。
- max_buffer_length
- int
僅限緩衝模式。 每個分割區的事件總數,可在觸發排清之前進行緩衝處理。 預設值為緩衝模式中的 1500。
- logging_enable
- bool
是否要將網路追蹤記錄輸出至記錄器。 預設值為 False。
- auth_timeout
- float
等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。
- user_agent
- str
如果指定,這會新增到使用者代理程式字串前面。
- retry_total
- int
發生錯誤時,嘗試取消復原失敗作業的嘗試總數。 預設值為 3。
- retry_backoff_factor
- float
第二次嘗試之後要套用的輪詢因素, (大部分的錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間睡眠 [0.0s, 0.2s, 0.4s, ...]。 預設值為 0.8。
- retry_backoff_max
- float
退班時間上限。 預設值為 120 秒, (2 分鐘) 。
- retry_mode
- str
重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。
- idle_timeout
- float
逾時,以秒為單位,在此之後,如果沒有任何活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,表示除非服務起始,否則用戶端不會因為閒置而關閉。
- transport_type
- TransportType
將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。
- http_proxy
- dict
HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' ( str 值) 和 'proxy_port' (int 值) 。 此外,也可能會出現下列金鑰: 'username'、'password'。
用來建立事件中樞服務的連線的自訂端點位址,允許透過任何應用程式閘道或其他主機環境所需的路徑路由傳送網路要求。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,預設會使用埠 443。
SSL 憑證之自訂CA_BUNDLE檔案的路徑,用來驗證連線端點的身分識別。 預設值為 None,在此情況下會使用 certifi.where () 。
- uamqp_transport
- bool
是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,而純 Python AMQP 程式庫將作為基礎傳輸。
- socket_timeout
- float
連接上基礎通訊端在傳送和接收資料之前應該等候的時間,以秒為單位。TransportType.Amqp 的預設值為 0.2,而 TransportType.AmqpOverWebsocket 的預設值為 1。 如果 EventHubsConnectionError 錯誤是因為寫入逾時而發生,可能需要傳入大於預設值。 這適用于進階使用案例,而且預設值通常就已足夠。
範例
建立 EventHubProducerClient 的新實例。
import os
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
方法
close |
關閉產生者用戶端基礎 AMQP 連線和連結。 |
create_batch |
建立 EventDataBatch 物件,其大小上限為受max_size_in_bytes限制的所有內容。 max_size_in_bytes不應大於服務所定義的允許訊息大小上限。 |
flush |
僅限緩衝模式。 如果用戶端在緩衝模式中運作,則清除緩衝區中要立即傳送的事件。 |
from_connection_string |
從連接字串建立 EventHubProducerClient。 |
get_buffered_event_count |
已緩衝處理和等待針對指定分割區發行的事件數目。 以非緩衝模式傳回 None。 注意:事件緩衝區是在背景協同程式中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。 對於未緩衝處理事件的分割區識別碼,不論該分割區識別碼是否確實存在於事件中樞內,都會傳回 0。 |
get_eventhub_properties |
取得事件中樞的屬性。 傳回字典中的索引鍵包括:
|
get_partition_ids |
取得事件中樞的資料分割識別碼。 |
get_partition_properties |
取得指定之分割區的屬性。 屬性字典中的索引鍵包括:
|
send_batch |
傳送事件資料的批次。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,方法會將事件加入本機緩衝區並傳回。 製作者會在背景自動傳送。 如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:
如果 buffered_mode 為 True, 則需要on_error 回呼,而且會處理錯誤,如下所示:
在緩衝模式中,傳送批次會保持不變,並以單一單位的形式傳送。 批次不會重新排列。 這可能會導致傳送事件效率不佳。 如果您要傳送 EventData 或 AmqpAnnotatedMessage 的有限清單,而且您知道它位於事件中樞畫面大小限制內,您可以使用 send_batch 呼叫來傳送它們。 否則,請使用 create_batch 來建立 EventDataBatch ,並將 EventData 或 AmqpAnnotatedMessage 新增至批次,直到大小限制為止,然後呼叫此方法以傳送批次。 |
send_event |
傳送事件資料。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,方法會將事件加入本機緩衝區並傳回。 產生者會在背景中自動批次處理和傳送。 如果 buffered_mode 為 False, 則on_error 回呼是選擇性的,而且錯誤會依照下列方式處理:* 如果在產生者用戶端具現化期間傳遞 on_error 回呼,
如果 buffered_mode 為 True, 則 on_error回呼是必要的,而且錯誤會依照下列方式處理:* 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
|
close
關閉產生者用戶端基礎 AMQP 連線和連結。
async close(*, flush: bool = True, **kwargs: Any) -> None
參數
- flush
- bool
僅限緩衝模式。 如果設定為 True,則會立即傳送緩衝區中的事件。 預設值是 True。
傳回類型
例外狀況
如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。
範例
關閉處理常式。
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
await producer.close()
create_batch
建立 EventDataBatch 物件,其大小上限為受max_size_in_bytes限制的所有內容。
max_size_in_bytes不應大於服務所定義的允許訊息大小上限。
async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch
傳回類型
例外狀況
如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。
範例
在有限大小內建立 EventDataBatch 物件
from azure.eventhub import EventData
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
僅限緩衝模式。 如果用戶端在緩衝模式中運作,則清除緩衝區中要立即傳送的事件。
async flush(**kwargs: Any) -> None
參數
傳回類型
例外狀況
如果產生者無法在緩衝模式的指定逾時內排清緩衝區。
from_connection_string
從連接字串建立 EventHubProducerClient。
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient
參數
- eventhub_name
- str
要連接用戶端的特定事件中樞路徑。
- buffered_mode
- bool
如果為 True,產生者用戶端會在緩衝區中收集事件、有效率地批次,然後發佈。 預設值是 False。
成功發佈批次後要呼叫的回呼。 回呼會採用兩個參數:
events:已成功發行的事件清單
partition_id:清單中事件已發佈至的資料分割識別碼。
回呼函式的定義方式如下: on_success (事件,partition_id) 。 當 buffered_mode 為 True 時,如果 buffered_mode 為 False,則為必要項。
批次發佈失敗後要呼叫的回呼。 回呼函式的定義方式如下: on_error (事件、partition_id、錯誤) ,其中:
events:無法發行的事件清單。
partition_id:清單中事件嘗試發行至 和 的分割區識別碼
error:與傳送失敗相關的例外狀況。
如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且錯誤會處理如下:
如果在產生者用戶端具現化期間傳遞 on_error 回呼,
接著,錯誤資訊會傳遞至 on_error回 呼,然後呼叫該回呼。
如果在用戶端具現化期間未傳入 on_error 回呼,
然後,預設會引發錯誤。
如果 buffered_mode 為 True,則 on_error回呼是必要的,而且會處理錯誤,如下所示:
如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
如果成功排入佇列之後無法傳送事件,則會呼叫 on_error回 呼。
- max_buffer_length
- int
僅限緩衝模式。 每個分割區的事件總數,可在觸發排清之前進行緩衝處理。 預設值為緩衝模式中的 1500。
- logging_enable
- bool
是否要將網路追蹤記錄輸出至記錄器。 預設值為 False。
- http_proxy
- dict
HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' ( str 值) 和 'proxy_port' (int 值) 。 此外,也可能會出現下列金鑰: 'username'、'password'。
- auth_timeout
- float
等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。
- user_agent
- str
如果指定,這會新增到使用者代理程式字串前面。
- retry_total
- int
發生錯誤時,嘗試取消復原失敗作業的嘗試總數。 預設值為 3。
- retry_backoff_factor
- float
第二次嘗試之後要套用的輪詢因素, (大部分的錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間睡眠 [0.0s, 0.2s, 0.4s, ...]。 預設值為 0.8。
- retry_backoff_max
- float
退班時間上限。 預設值為 120 秒, (2 分鐘) 。
- retry_mode
- str
重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。
- idle_timeout
- float
逾時,以秒為單位,在此之後,如果沒有任何活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,表示除非服務起始,否則用戶端不會因為閒置而關閉。
- transport_type
- TransportType
將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。
用來建立事件中樞服務的連線的自訂端點位址,允許透過任何應用程式閘道或其他主機環境所需的路徑路由傳送網路要求。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,預設會使用埠 443。
SSL 憑證之自訂CA_BUNDLE檔案的路徑,用來驗證連線端點的身分識別。 預設值為 None,在此情況下會使用 certifi.where () 。
- uamqp_transport
- bool
是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,而純 Python AMQP 程式庫將作為基礎傳輸。
傳回類型
例外狀況
如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。
範例
從 連接字串 建立 EventHubProducerClient 的新實例。
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
已緩衝處理和等待針對指定分割區發行的事件數目。 以非緩衝模式傳回 None。 注意:事件緩衝區是在背景協同程式中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。 對於未緩衝處理事件的分割區識別碼,不論該分割區識別碼是否確實存在於事件中樞內,都會傳回 0。
get_buffered_event_count(partition_id: str) -> int | None
參數
傳回類型
例外狀況
如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。
get_eventhub_properties
取得事件中樞的屬性。
傳回字典中的索引鍵包括:
eventhub_name str) (
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
傳回
包含事件中樞相關資訊的字典。
傳回類型
例外狀況
get_partition_ids
取得事件中樞的資料分割識別碼。
async get_partition_ids() -> List[str]
傳回
資料分割識別碼的清單。
傳回類型
例外狀況
get_partition_properties
取得指定之分割區的屬性。
屬性字典中的索引鍵包括:
eventhub_name str) (
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset str) (
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
參數
傳回
分割區屬性的聽寫。
傳回類型
例外狀況
send_batch
傳送事件資料的批次。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,方法會將事件加入本機緩衝區並傳回。 製作者會在背景自動傳送。
如果 buffered_mode 為 False,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:
如果在產生者用戶端具現化期間傳遞 on_error 回呼,
然後,錯誤資訊會傳遞至 on_error 回呼,然後呼叫。
如果在用戶端具現化期間未傳入 on_error 回呼,
然後,預設會引發錯誤。
如果 buffered_mode 為 True, 則需要on_error 回呼,而且會處理錯誤,如下所示:
如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
如果成功排入佇列之後無法傳送事件,則會呼叫 on_error 回呼。
在緩衝模式中,傳送批次會保持不變,並以單一單位的形式傳送。 批次不會重新排列。 這可能會導致傳送事件效率不佳。
如果您要傳送 EventData 或 AmqpAnnotatedMessage 的有限清單,而且您知道它位於事件中樞畫面大小限制內,您可以使用 send_batch 呼叫來傳送它們。 否則,請使用 create_batch 來建立 EventDataBatch ,並將 EventData 或 AmqpAnnotatedMessage 新增至批次,直到大小限制為止,然後呼叫此方法以傳送批次。
async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
參數
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
要傳送的 EventDataBatch 物件,或要批次傳送的 EventData 清單。 清單中的所有 EventData 或 AmqpAnnotatedMessage 或 EventDataBatch 都會落在相同的分割區上。
- timeout
- float
以非緩衝模式傳送事件資料的最大等候時間,或將事件資料加入緩衝模式中緩衝區的最大等候時間。 在非緩衝模式中,將會使用建立產生者時所指定的預設等候時間。 在緩衝模式中,預設等候時間為 None。
- partition_id
- str
要傳送至的特定資料分割識別碼。 預設值為 None,在此情況下,服務會使用迴圈配置資源指派給所有分割區。 如果指定了partition_id,而且event_data_batch是EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_id。
- partition_key
- str
使用指定的partition_key,事件資料會傳送至服務所決定之事件中樞的特定分割區。 如果指定partition_key且event_data_batch為EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_key。 如果同時提供partition_id和partition_key,則partition_id優先。 警告:不建議在要傳送的事件上設定非字串值的partition_key,因為事件中樞服務將會忽略partition_key,而且事件將會使用迴圈配置資源指派給所有分割區。 此外,有 SDK 可用來取用預期只有字串類型的事件partition_key,它們可能無法剖析非字串值。
傳回類型
例外狀況
如果 timeout 參數所指定的值經過,事件才能以非緩衝模式傳送,或事件可以加入緩衝模式的緩衝處理中。
範例
以非同步方式傳送事件資料
async with producer:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
send_event
傳送事件資料。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,方法會將事件加入本機緩衝區並傳回。 產生者會在背景中自動批次處理和傳送。
如果 buffered_mode 為 False, 則on_error 回呼是選擇性的,而且錯誤會依照下列方式處理:* 如果在產生者用戶端具現化期間傳遞 on_error 回呼,
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
如果 buffered_mode 為 True, 則 on_error回呼是必要的,而且錯誤會依照下列方式處理:* 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
參數
- timeout
- float
以非緩衝模式傳送事件資料的最大等候時間,或將事件資料加入緩衝模式中緩衝區的最大等候時間。 在非緩衝模式中,將會使用建立產生者時所指定的預設等候時間。 在緩衝模式中,預設等候時間為 None。
- partition_id
- str
要傳送至的特定分割區識別碼。 預設值為 None,在此情況下,服務會使用迴圈配置資源指派給所有分割區。 如果指定了 partition_id,且 event_data_batch 是EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_id。
- partition_key
- str
使用指定的partition_key,事件資料將會傳送至服務決定的事件中樞特定分割區。 如果指定了 partition_key,且 event_data_batch 為EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_key。 如果同時提供partition_id和partition_key,則partition_id優先。 警告:不建議在要傳送的事件上設定非字串值的partition_key,因為事件中樞服務將會忽略partition_key,而且事件將會使用迴圈配置資源指派給所有分割區。 此外,有 SDK 可用來取用預期只有字串類型的事件partition_key,它們可能無法剖析非字串值。
傳回類型
例外狀況
如果 timeout 參數所指定的值會經過,事件才能以非緩衝模式傳送,或事件無法加入緩衝模式中緩衝處理的事件。