共用方式為


EventHubProducerClient 類別

EventHubProducerClient 類別會定義高階介面,以便將事件傳送至Azure 事件中樞服務。

繼承
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

建構函式

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

參數

fully_qualified_namespace
str
必要

事件中樞命名空間的完整主機名稱。 這可能是類似 .servicebus.windows.net

eventhub_name
str
必要

要連接用戶端的特定事件中樞路徑。

credential
AsyncTokenCredentialAzureSasCredentialAzureNamedKeyCredential
必要

用於驗證的認證物件,會實作用於取得權杖的特定介面。 它會接受 EventHubSharedKeyCredential 由 azure-identity 程式庫所產生的認證物件,以及實作 *get_token (自我範圍) 方法的物件。

buffered_mode
bool

如果為 True,產生者用戶端會在緩衝區中收集事件、有效率地批次,然後發佈。 預設值是 False。

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

成功發佈批次後要呼叫的回呼。 回呼會採用兩個參數:

  • events:已成功發行的事件清單

  • partition_id:清單中事件已發佈至的資料分割識別碼。

回呼函式的定義方式如下: on_success (事件,partition_id) 。 如果buffered_mode為 False,則buffered_mode為 True 時為必要項。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

批次發佈失敗後要呼叫的回呼。 當 buffered_mode 為 True 時為必要項,如果 buffered_mode 為 False 則為選擇性。 回呼函式的定義方式如下: on_error (事件、partition_id、錯誤) ,其中:

  • events:無法發行的事件清單。

  • partition_id:清單中事件嘗試發行至 和 的分割區識別碼

  • error:與傳送失敗相關的例外狀況。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且錯誤會處理如下:

  • 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

    接著,錯誤資訊會傳遞至 on_error回 呼,然後呼叫該回呼。

  • 如果在用戶端具現化期間未傳入 on_error 回呼,

    然後,預設會引發錯誤。

如果 buffered_modeTrue,則 on_error回呼是必要的,而且會處理錯誤,如下所示:

  • 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  • 如果成功排入佇列之後無法傳送事件,則會呼叫 on_error回 呼。

max_buffer_length
int

僅限緩衝模式。 每個分割區的事件總數,可在觸發排清之前進行緩衝處理。 預設值為緩衝模式中的 1500。

max_wait_time
Optional[float]

僅限緩衝模式。 在發佈之前,等候批次在緩衝區中建置事件的時間量。 預設值為緩衝模式中的 1。

logging_enable
bool

是否要將網路追蹤記錄輸出至記錄器。 預設值為 False

auth_timeout
float

等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。

user_agent
str

如果指定,這會新增到使用者代理程式字串前面。

retry_total
int

發生錯誤時,嘗試取消復原失敗作業的嘗試總數。 預設值為 3。

retry_backoff_factor
float

第二次嘗試之後要套用的輪詢因素, (大部分的錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間睡眠 [0.0s, 0.2s, 0.4s, ...]。 預設值為 0.8。

retry_backoff_max
float

退班時間上限。 預設值為 120 秒, (2 分鐘) 。

retry_mode
str

重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。

idle_timeout
float

逾時,以秒為單位,在此之後,如果沒有任何活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,表示除非服務起始,否則用戶端不會因為閒置而關閉。

transport_type
TransportType

將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。

http_proxy
dict

HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' ( str 值) 和 'proxy_port' (int 值) 。 此外,也可能會出現下列金鑰: 'username'、'password'

custom_endpoint_address
Optional[str]

用來建立事件中樞服務的連線的自訂端點位址,允許透過任何應用程式閘道或其他主機環境所需的路徑路由傳送網路要求。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,預設會使用埠 443。

connection_verify
Optional[str]

SSL 憑證之自訂CA_BUNDLE檔案的路徑,用來驗證連線端點的身分識別。 預設值為 None,在此情況下會使用 certifi.where ()

uamqp_transport
bool

是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,而純 Python AMQP 程式庫將作為基礎傳輸。

socket_timeout
float

連接上基礎通訊端在傳送和接收資料之前應該等候的時間,以秒為單位。TransportType.Amqp 的預設值為 0.2,而 TransportType.AmqpOverWebsocket 的預設值為 1。 如果 EventHubsConnectionError 錯誤是因為寫入逾時而發生,可能需要傳入大於預設值。 這適用于進階使用案例,而且預設值通常就已足夠。

範例

建立 EventHubProducerClient 的新實例。


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

方法

close

關閉產生者用戶端基礎 AMQP 連線和連結。

create_batch

建立 EventDataBatch 物件,其大小上限為受max_size_in_bytes限制的所有內容。

max_size_in_bytes不應大於服務所定義的允許訊息大小上限。

flush

僅限緩衝模式。 如果用戶端在緩衝模式中運作,則清除緩衝區中要立即傳送的事件。

from_connection_string

從連接字串建立 EventHubProducerClient。

get_buffered_event_count

已緩衝處理和等待針對指定分割區發行的事件數目。 以非緩衝模式傳回 None。 注意:事件緩衝區是在背景協同程式中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。 對於未緩衝處理事件的分割區識別碼,不論該分割區識別碼是否確實存在於事件中樞內,都會傳回 0。

get_eventhub_properties

取得事件中樞的屬性。

傳回字典中的索引鍵包括:

  • eventhub_name str) (

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

取得事件中樞的資料分割識別碼。

get_partition_properties

取得指定之分割區的屬性。

屬性字典中的索引鍵包括:

  • eventhub_name str) (

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset str) (

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

傳送事件資料的批次。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,方法會將事件加入本機緩衝區並傳回。 製作者會在背景自動傳送。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:

  • 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

    然後,錯誤資訊會傳遞至 on_error 回呼,然後呼叫。

  • 如果在用戶端具現化期間未傳入 on_error 回呼,

    然後,預設會引發錯誤。

如果 buffered_mode 為 True, 則需要on_error 回呼,而且會處理錯誤,如下所示:

  • 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  • 如果成功排入佇列之後無法傳送事件,則會呼叫 on_error 回呼。

在緩衝模式中,傳送批次會保持不變,並以單一單位的形式傳送。 批次不會重新排列。 這可能會導致傳送事件效率不佳。

如果您要傳送 EventDataAmqpAnnotatedMessage 的有限清單,而且您知道它位於事件中樞畫面大小限制內,您可以使用 send_batch 呼叫來傳送它們。 否則,請使用 create_batch 來建立 EventDataBatch ,並將 EventDataAmqpAnnotatedMessage 新增至批次,直到大小限制為止,然後呼叫此方法以傳送批次。

send_event

傳送事件資料。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,方法會將事件加入本機緩衝區並傳回。 產生者會在背景中自動批次處理和傳送。

如果 buffered_mode 為 False, 則on_error 回呼是選擇性的,而且錯誤會依照下列方式處理:* 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

如果 buffered_mode 為 True, on_error回呼是必要的,而且錯誤會依照下列方式處理:* 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

關閉產生者用戶端基礎 AMQP 連線和連結。

async close(*, flush: bool = True, **kwargs: Any) -> None

參數

flush
bool

僅限緩衝模式。 如果設定為 True,則會立即傳送緩衝區中的事件。 預設值是 True。

timeout
floatNone

僅限緩衝模式。 關閉產生者的逾時。 預設值為 None,表示沒有逾時。

傳回類型

例外狀況

如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。

範例

關閉處理常式。


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

建立 EventDataBatch 物件,其大小上限為受max_size_in_bytes限制的所有內容。

max_size_in_bytes不應大於服務所定義的允許訊息大小上限。

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

傳回類型

例外狀況

如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。

範例

在有限大小內建立 EventDataBatch 物件


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

僅限緩衝模式。 如果用戶端在緩衝模式中運作,則清除緩衝區中要立即傳送的事件。

async flush(**kwargs: Any) -> None

參數

timeout
floatNone

排清緩衝事件的逾時,預設值為 None,表示沒有逾時。

傳回類型

例外狀況

如果產生者無法在緩衝模式的指定逾時內排清緩衝區。

from_connection_string

從連接字串建立 EventHubProducerClient。

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

參數

conn_str
str
必要

事件中樞的連接字串。

eventhub_name
str

要連接用戶端的特定事件中樞路徑。

buffered_mode
bool

如果為 True,產生者用戶端會在緩衝區中收集事件、有效率地批次,然後發佈。 預設值是 False。

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

成功發佈批次後要呼叫的回呼。 回呼會採用兩個參數:

  • events:已成功發行的事件清單

  • partition_id:清單中事件已發佈至的資料分割識別碼。

回呼函式的定義方式如下: on_success (事件,partition_id) 。 當 buffered_mode 為 True 時,如果 buffered_mode 為 False,則為必要項。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

批次發佈失敗後要呼叫的回呼。 回呼函式的定義方式如下: on_error (事件、partition_id、錯誤) ,其中:

  • events:無法發行的事件清單。

  • partition_id:清單中事件嘗試發行至 和 的分割區識別碼

  • error:與傳送失敗相關的例外狀況。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且錯誤會處理如下:

  • 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

    接著,錯誤資訊會傳遞至 on_error回 呼,然後呼叫該回呼。

  • 如果在用戶端具現化期間未傳入 on_error 回呼,

    然後,預設會引發錯誤。

如果 buffered_modeTrue,則 on_error回呼是必要的,而且會處理錯誤,如下所示:

  • 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  • 如果成功排入佇列之後無法傳送事件,則會呼叫 on_error回 呼。

max_buffer_length
int

僅限緩衝模式。 每個分割區的事件總數,可在觸發排清之前進行緩衝處理。 預設值為緩衝模式中的 1500。

max_wait_time
Optional[float]

僅限緩衝模式。 在發佈之前,等候批次在緩衝區中建置事件的時間量。 預設值為緩衝模式中的 1。

logging_enable
bool

是否要將網路追蹤記錄輸出至記錄器。 預設值為 False

http_proxy
dict

HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' ( str 值) 和 'proxy_port' (int 值) 。 此外,也可能會出現下列金鑰: 'username'、'password'

auth_timeout
float

等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。

user_agent
str

如果指定,這會新增到使用者代理程式字串前面。

retry_total
int

發生錯誤時,嘗試取消復原失敗作業的嘗試總數。 預設值為 3。

retry_backoff_factor
float

第二次嘗試之後要套用的輪詢因素, (大部分的錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間睡眠 [0.0s, 0.2s, 0.4s, ...]。 預設值為 0.8。

retry_backoff_max
float

退班時間上限。 預設值為 120 秒, (2 分鐘) 。

retry_mode
str

重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。

idle_timeout
float

逾時,以秒為單位,在此之後,如果沒有任何活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,表示除非服務起始,否則用戶端不會因為閒置而關閉。

transport_type
TransportType

將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。

custom_endpoint_address
Optional[str]

用來建立事件中樞服務的連線的自訂端點位址,允許透過任何應用程式閘道或其他主機環境所需的路徑路由傳送網路要求。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,預設會使用埠 443。

connection_verify
Optional[str]

SSL 憑證之自訂CA_BUNDLE檔案的路徑,用來驗證連線端點的身分識別。 預設值為 None,在此情況下會使用 certifi.where ()

uamqp_transport
bool

是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,而純 Python AMQP 程式庫將作為基礎傳輸。

傳回類型

例外狀況

如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。

範例

從 連接字串 建立 EventHubProducerClient 的新實例。


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

已緩衝處理和等待針對指定分割區發行的事件數目。 以非緩衝模式傳回 None。 注意:事件緩衝區是在背景協同程式中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。 對於未緩衝處理事件的分割區識別碼,不論該分割區識別碼是否確實存在於事件中樞內,都會傳回 0。

get_buffered_event_count(partition_id: str) -> int | None

參數

partition_id
str
必要

目標分割區識別碼。

傳回類型

int,

例外狀況

如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。

get_eventhub_properties

取得事件中樞的屬性。

傳回字典中的索引鍵包括:

  • eventhub_name str) (

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

傳回

包含事件中樞相關資訊的字典。

傳回類型

例外狀況

get_partition_ids

取得事件中樞的資料分割識別碼。

async get_partition_ids() -> List[str]

傳回

資料分割識別碼的清單。

傳回類型

例外狀況

get_partition_properties

取得指定之分割區的屬性。

屬性字典中的索引鍵包括:

  • eventhub_name str) (

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset str) (

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

參數

partition_id
str
必要

目標分割區識別碼。

傳回

分割區屬性的聽寫。

傳回類型

例外狀況

send_batch

傳送事件資料的批次。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,方法會將事件加入本機緩衝區並傳回。 製作者會在背景自動傳送。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:

  • 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

    然後,錯誤資訊會傳遞至 on_error 回呼,然後呼叫。

  • 如果在用戶端具現化期間未傳入 on_error 回呼,

    然後,預設會引發錯誤。

如果 buffered_mode 為 True, 則需要on_error 回呼,而且會處理錯誤,如下所示:

  • 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  • 如果成功排入佇列之後無法傳送事件,則會呼叫 on_error 回呼。

在緩衝模式中,傳送批次會保持不變,並以單一單位的形式傳送。 批次不會重新排列。 這可能會導致傳送事件效率不佳。

如果您要傳送 EventDataAmqpAnnotatedMessage 的有限清單,而且您知道它位於事件中樞畫面大小限制內,您可以使用 send_batch 呼叫來傳送它們。 否則,請使用 create_batch 來建立 EventDataBatch ,並將 EventDataAmqpAnnotatedMessage 新增至批次,直到大小限制為止,然後呼叫此方法以傳送批次。

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

參數

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
必要

要傳送的 EventDataBatch 物件,或要批次傳送的 EventData 清單。 清單中的所有 EventDataAmqpAnnotatedMessageEventDataBatch 都會落在相同的分割區上。

timeout
float

以非緩衝模式傳送事件資料的最大等候時間,或將事件資料加入緩衝模式中緩衝區的最大等候時間。 在非緩衝模式中,將會使用建立產生者時所指定的預設等候時間。 在緩衝模式中,預設等候時間為 None。

partition_id
str

要傳送至的特定資料分割識別碼。 預設值為 None,在此情況下,服務會使用迴圈配置資源指派給所有分割區。 如果指定了partition_id,而且event_data_batch是EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_id。

partition_key
str

使用指定的partition_key,事件資料會傳送至服務所決定之事件中樞的特定分割區。 如果指定partition_key且event_data_batch為EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_key。 如果同時提供partition_id和partition_key,則partition_id優先。 警告:不建議在要傳送的事件上設定非字串值的partition_key,因為事件中樞服務將會忽略partition_key,而且事件將會使用迴圈配置資源指派給所有分割區。 此外,有 SDK 可用來取用預期只有字串類型的事件partition_key,它們可能無法剖析非字串值。

傳回類型

例外狀況

如果 timeout 參數所指定的值經過,事件才能以非緩衝模式傳送,或事件可以加入緩衝模式的緩衝處理中。

範例

以非同步方式傳送事件資料


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

傳送事件資料。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,方法會將事件加入本機緩衝區並傳回。 產生者會在背景中自動批次處理和傳送。

如果 buffered_mode 為 False, 則on_error 回呼是選擇性的,而且錯誤會依照下列方式處理:* 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

如果 buffered_mode 為 True, on_error回呼是必要的,而且錯誤會依照下列方式處理:* 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

參數

event_data
Union[EventData, AmqpAnnotatedMessage]
必要

要傳送 的 EventData 物件。

timeout
float

以非緩衝模式傳送事件資料的最大等候時間,或將事件資料加入緩衝模式中緩衝區的最大等候時間。 在非緩衝模式中,將會使用建立產生者時所指定的預設等候時間。 在緩衝模式中,預設等候時間為 None。

partition_id
str

要傳送至的特定分割區識別碼。 預設值為 None,在此情況下,服務會使用迴圈配置資源指派給所有分割區。 如果指定了 partition_id,且 event_data_batch 是EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_id。

partition_key
str

使用指定的partition_key,事件資料將會傳送至服務決定的事件中樞特定分割區。 如果指定了 partition_key,且 event_data_batch 為EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_key。 如果同時提供partition_id和partition_key,則partition_id優先。 警告:不建議在要傳送的事件上設定非字串值的partition_key,因為事件中樞服務將會忽略partition_key,而且事件將會使用迴圈配置資源指派給所有分割區。 此外,有 SDK 可用來取用預期只有字串類型的事件partition_key,它們可能無法剖析非字串值。

傳回類型

例外狀況

如果 timeout 參數所指定的值會經過,事件才能以非緩衝模式傳送,或事件無法加入緩衝模式中緩衝處理的事件。

屬性

total_buffered_event_count

目前緩衝處理並等待發佈的所有分割區的事件總數。 以非緩衝模式傳回 None。 注意:事件緩衝區是在背景協同程式中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。

傳回類型

int,