共用方式為


EventHubProducerClient 類別

EventHubProducerClient 類別會定義將事件傳送至Azure 事件中樞服務的高階介面。

繼承
azure.eventhub._client_base.ClientBase
EventHubProducerClient

建構函式

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)

參數

fully_qualified_namespace
str
必要

事件中樞命名空間的完整主機名稱。 這可能類似于 .servicebus.windows.net

eventhub_name
str
必要

要連接用戶端的特定事件中樞路徑。

credential
TokenCredentialAzureSasCredentialAzureNamedKeyCredential
必要

用於驗證的認證物件,會實作用於取得權杖的特定介面。 它會接受 EventHubSharedKeyCredential azure 身分識別程式庫所產生的認證物件,以及實作 *get_token (自我、 範圍) 方法的物件。

buffered_mode
bool

如果為 True,產生者用戶端會在緩衝區中收集事件,並有效率地批次,然後發佈。 預設值是 False。

buffer_concurrency
<xref:ThreadPoolExecutor> 或 intNone

用來發佈事件的 ThreadPoolExecutor 或 ThreadPoolExecutor 的背景工作角色數目。 預設值為 None,且會針對每個建立預設背景工作角色數目的 ThreadPoolExecutor https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

成功發佈批次後要呼叫的回呼。 回呼採用兩個參數:

  • events:已成功發行的事件清單

  • partition_id:清單中事件已發行至的資料分割識別碼。

回呼函式的定義方式如下: on_success (事件,partition_id) 。 當 buffered_mode 為 True 時,如果 buffered_mode 為 False,則為必要專案。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

一旦批次無法發佈之後,要呼叫的回呼。 回呼函式的定義方式如下: on_error (事件、partition_id、錯誤) ,其中:

  • events:無法發佈的事件清單。

  • partition_id:清單中事件嘗試發行至 和 的分割區識別碼

  • error:與傳送失敗相關的例外狀況。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:

  • 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

    然後,錯誤資訊會傳遞至 on_error 回呼,然後呼叫。

  • 如果在用戶端具現化期間未傳入 on_error 回呼,

    然後,預設會引發錯誤。

如果 buffered_mode 為 True, 則需要on_error 回呼,而且會處理錯誤,如下所示:

  • 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  • 如果成功排入佇列之後無法傳送事件,則會呼叫 on_error 回呼。

max_buffer_length
int

僅限緩衝模式。 每個分割區的事件總數,這些事件可以在觸發排清之前進行緩衝處理。 預設值為緩衝模式中的 1500。

max_wait_time
Optional[float]

僅限緩衝模式。 在發佈之前,等候批次以緩衝區中的事件建置的時間量。 預設值為緩衝模式中的 1。

logging_enable
bool

是否要將網路追蹤記錄輸出至記錄器。 預設值為 False

auth_timeout
float

等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。

user_agent
str

如果指定,這會在使用者代理程式字串前面新增。

retry_total
int

發生錯誤時,重做失敗作業的嘗試總數。 預設值為 3。

retry_backoff_factor
float

第二次嘗試之後套用的輪詢因素 (大部分錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間進入 [0.0s、0.2s、0.4s...] 的睡眠狀態。 預設值為 0.8。

retry_backoff_max
float

回溯時間上限。 預設值為 120 秒, (2 分鐘) 。

retry_mode
str

重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。

idle_timeout
float

逾時,以秒為單位,在此之後,如果沒有活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,這表示除非由服務起始,否則用戶端不會因為閒置而關閉。

transport_type
TransportType

將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。

http_proxy
Dict

HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' (str 值) 和 'proxy_port' (int 值) 。 此外,也可能有下列金鑰: 'username'、'password'

custom_endpoint_address
Optional[str]

用來建立事件中樞服務的連線的自訂端點位址,允許網路要求透過任何應用程式閘道或其他主機環境所需的路徑路由傳送。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,則會預設使用埠 443。

connection_verify
Optional[str]

SSL 憑證的自訂CA_BUNDLE檔案路徑,用來驗證連線端點的身分識別。 預設值為 None, 其中會使用 certifi.where ()

uamqp_transport
bool

是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,且純 Python AMQP 程式庫將作為基礎傳輸。

socket_timeout
float

連線上基礎通訊端在傳送和接收資料時應該等待的時間,以秒為單位逾時。TransportType.Amqp 的預設值為 0.2,而 TransportType.AmqpOverWebsocket 的預設值為 1。 如果 EventHubsConnectionError 錯誤因為寫入逾時而發生,可能需要傳入大於預設值。 這適用于進階使用案例,而且預設值通常應該足夠。

範例

建立 EventHubProducerClient 的新實例。


   import os
   from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   producer = EventHubProducerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
       credential=credential
   )

方法

close

關閉產生者用戶端基礎 AMQP 連線和連結。

create_batch

建立 EventDataBatch 物件,其中包含受max_size_in_bytes限制之所有內容的大小上限。

max_size_in_bytes不應大於服務所定義的允許訊息大小上限。

flush

僅限緩衝模式。 如果用戶端在緩衝模式中運作,則會在緩衝區中排清要立即傳送的事件。

from_connection_string

從連接字串建立 EventHubProducerClient。

get_buffered_event_count

已緩衝處理並等候針對指定分割區發行的事件數目。 在非緩衝模式中傳回 None。 注意:事件緩衝區是在背景執行緒中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。 對於未緩衝處理事件的資料分割識別碼,不論該分割區識別碼是否確實存在於事件中樞內,都會傳回 0。

get_eventhub_properties

取得事件中樞的屬性。

傳回字典中的索引鍵包括:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

取得事件中樞的資料分割識別碼。

get_partition_properties

取得指定資料分割的屬性。

屬性字典中的索引鍵包括:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

傳送事件資料的批次。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,則如果指定並傳回,方法會嘗試將事件加入至指定時間內的緩衝區。 製作者會在緩衝模式的背景中自動傳送。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且錯誤會處理如下:

  • 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

    接著,錯誤資訊會傳遞至 on_error回 呼,然後呼叫該回呼。

  • 如果在用戶端具現化期間未傳入 on_error 回呼,

    然後,預設會引發錯誤。

如果 buffered_modeTrue,則 on_error回呼是必要的,而且會處理錯誤,如下所示:

  • 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  • 如果成功排入佇列之後無法傳送事件,則會呼叫 on_error回 呼。

在緩衝模式中,傳送批次會保持不變,並以單一單位傳送。 批次不會重新排列。 這可能會導致傳送事件效率不佳。

如果您要傳送 EventDataAmqpAnnotatedMessage 的有限清單,而且您知道它位於事件中樞畫面大小限制內,您可以使用 send_batch 呼叫來傳送它們。 否則,請使用 create_batch 建立 EventDataBatch ,並將 EventDataAmqpAnnotatedMessage 新增至批次一次,直到大小限制為止,然後呼叫此方法以傳送批次。

send_event

傳送事件資料。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,則如果指定並傳回,方法會嘗試將事件加入至指定時間內的緩衝區。 製作者會在緩衝模式的背景中自動傳送。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:* 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

如果 buffered_mode 為 True, 則on_error 回呼是必要的,而且會處理錯誤,如下所示:* 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

關閉產生者用戶端基礎 AMQP 連線和連結。

close(*, flush: bool = True, **kwargs: Any) -> None

參數

flush
bool

僅限緩衝模式。 如果設定為 True,則會立即傳送緩衝區中的事件。 預設值是 True。

timeout
floatNone

僅限緩衝模式。 關閉產生者的逾時。 預設值為 None,表示沒有逾時。

傳回類型

例外狀況

如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。

範例

關閉用戶端。


   import os
   from azure.eventhub import EventHubProducerClient, EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = producer.create_batch()

       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # EventDataBatch object reaches max_size.
               # New EventDataBatch object can be created here to send more data
               break

       producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       producer.close()

create_batch

建立 EventDataBatch 物件,其中包含受max_size_in_bytes限制之所有內容的大小上限。

max_size_in_bytes不應大於服務所定義的允許訊息大小上限。

create_batch(**kwargs: Any) -> EventDataBatch

傳回類型

例外狀況

如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。

範例

在有限的大小內建立 EventDataBatch 物件


       event_data_batch = producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

僅限緩衝模式。 如果用戶端在緩衝模式中運作,則會在緩衝區中排清要立即傳送的事件。

flush(**kwargs: Any) -> None

參數

timeout
Optional[float]

排清緩衝事件的逾時,預設值為 None,表示沒有逾時。

傳回類型

例外狀況

如果產生者無法在緩衝模式的指定逾時內排清緩衝區。

from_connection_string

從連接字串建立 EventHubProducerClient。

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) -> EventHubProducerClient

參數

conn_str
str
必要

事件中樞的連接字串。

eventhub_name
str

要連接用戶端的特定事件中樞路徑。

buffered_mode
bool

如果為 True,產生者用戶端會在緩衝區中收集事件,並有效率地批次,然後發佈。 預設值是 False。

buffer_concurrency
<xref:ThreadPoolExecutor> 或 intNone

用來發佈事件的 ThreadPoolExecutor 或 ThreadPoolExecutor 的背景工作角色數目。 預設值為 None,且會針對每個建立預設背景工作角色數目的 ThreadPoolExecutor https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

成功發佈批次後要呼叫的回呼。 回呼採用兩個參數:

  • events:已成功發行的事件清單

  • partition_id:清單中事件已發行至的資料分割識別碼。

回呼函式的定義方式如下: on_success (事件,partition_id) 當 buffered_mode為 True 時為必要條件,如果buffered_mode為 False,則為選擇性。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

一旦批次無法發佈之後,要呼叫的回呼。 在 buffered_mode 為 True 時為必要條件,如果 buffered_mode 為 False,則為選擇性。 回呼函式的定義方式如下: on_error (事件、partition_id、錯誤) ,其中:

  • events:無法發佈的事件清單。

  • partition_id:清單中事件嘗試發行至 和 的分割區識別碼

  • error:與傳送失敗相關的例外狀況。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:

  • 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

    然後,錯誤資訊會傳遞至 on_error 回呼,然後呼叫。

  • 如果在用戶端具現化期間未傳入 on_error 回呼,

    然後,預設會引發錯誤。

如果 buffered_mode 為 True, 則需要on_error 回呼,而且會處理錯誤,如下所示:

  • 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  • 如果成功排入佇列之後無法傳送事件,則會呼叫 on_error 回呼。

max_buffer_length
int

僅限緩衝模式。 每個分割區的事件總數,這些事件可以在觸發排清之前進行緩衝處理。 預設值為緩衝模式中的 1500。

max_wait_time
Optional[float]

僅限緩衝模式。 在發佈之前,等候批次以緩衝區中的事件建置的時間量。 預設值為緩衝模式中的 1。

logging_enable
bool

是否要將網路追蹤記錄輸出至記錄器。 預設值為 False

http_proxy
Dict

HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' (str 值) 和 'proxy_port' (int 值) 。 此外,也可能有下列金鑰: 'username'、'password'

auth_timeout
float

等候服務授權權杖的秒數。 預設值為 60 秒。 如果設定為 0,則不會從用戶端強制執行逾時。

user_agent
str

如果指定,這會在使用者代理程式字串前面新增。

retry_total
int

發生錯誤時,重做失敗作業的嘗試總數。 預設值為 3。

retry_backoff_factor
float

第二次嘗試之後套用的輪詢因素 (大部分錯誤都會立即由第二次嘗試解決,而不會延遲) 。 在固定模式中,重試原則一律會睡眠 {backoff factor}。 在 「指數」模式中,重試原則會睡眠: {backoff factor} * (2 ** ({total retries} - 1) ) 秒。 如果backoff_factor為 0.1,則重試會在重試之間進入 [0.0s、0.2s、0.4s...] 的睡眠狀態。 預設值為 0.8。

retry_backoff_max
float

回溯時間上限。 預設值為 120 秒, (2 分鐘) 。

retry_mode
str

重試嘗試之間的延遲行為。 支援的值為 'fixed' 或 'exponential',其中預設值為 'exponential'。

idle_timeout
float

逾時,以秒為單位,在此之後,如果沒有活動,此用戶端將會關閉基礎連線。 根據預設,此值為 None,這表示除非由服務起始,否則用戶端不會因為閒置而關閉。

transport_type
TransportType

將用於與事件中樞服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp ,在此情況下會使用埠 5671。 如果網路環境中無法使用/封鎖埠 5671,則可改用 TransportType.AmqpOverWebsocket 來使用埠 443 進行通訊。

http_proxy

HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' (str 值) 和 'proxy_port' (int 值) 。 此外,也可能有下列金鑰: 'username'、'password'

custom_endpoint_address
Optional[str]

用來建立事件中樞服務的連線的自訂端點位址,允許網路要求透過任何應用程式閘道或其他主機環境所需的路徑路由傳送。 預設值為 None。 格式會像 「sb:// < custom_endpoint_hostname > : < custom_endpoint_port > 」。 如果未在 custom_endpoint_address中指定埠,則會預設使用埠 443。

connection_verify
Optional[str]

SSL 憑證的自訂CA_BUNDLE檔案路徑,用來驗證連線端點的身分識別。 預設值為 None, 其中會使用 certifi.where ()

uamqp_transport
bool

是否要使用 uamqp 程式庫作為基礎傳輸。 預設值為 False,且純 Python AMQP 程式庫將作為基礎傳輸。

傳回類型

例外狀況

如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。

範例

從 連接字串 建立 EventHubProducerClient 的新實例。


   import os
   from azure.eventhub import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

已緩衝處理並等候針對指定分割區發行的事件數目。 在非緩衝模式中傳回 None。 注意:事件緩衝區是在背景執行緒中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。 對於未緩衝處理事件的資料分割識別碼,不論該分割區識別碼是否確實存在於事件中樞內,都會傳回 0。

get_buffered_event_count(partition_id: str) -> int | None

參數

partition_id
str
必要

目標分割區識別碼。

傳回類型

int,

例外狀況

如果 flush 設定為 True 或關閉緩衝模式中基礎 AMQP 連線,則排清緩衝區時發生錯誤。

get_eventhub_properties

取得事件中樞的屬性。

傳回字典中的索引鍵包括:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

傳回

包含 eventhub 屬性的字典。

傳回類型

例外狀況

get_partition_ids

取得事件中樞的資料分割識別碼。

get_partition_ids() -> List[str]

傳回

資料分割識別碼的清單。

傳回類型

例外狀況

get_partition_properties

取得指定資料分割的屬性。

屬性字典中的索引鍵包括:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

參數

partition_id
str
必要

目標分割區識別碼。

傳回

資料分割屬性的字典。

傳回類型

例外狀況

send_batch

傳送事件資料的批次。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,則如果指定並傳回,方法會嘗試將事件加入至指定時間內的緩衝區。 製作者會在緩衝模式的背景中自動傳送。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且錯誤會處理如下:

  • 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

    接著,錯誤資訊會傳遞至 on_error回 呼,然後呼叫該回呼。

  • 如果在用戶端具現化期間未傳入 on_error 回呼,

    然後,預設會引發錯誤。

如果 buffered_modeTrue,則 on_error回呼是必要的,而且會處理錯誤,如下所示:

  • 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  • 如果成功排入佇列之後無法傳送事件,則會呼叫 on_error回 呼。

在緩衝模式中,傳送批次會保持不變,並以單一單位傳送。 批次不會重新排列。 這可能會導致傳送事件效率不佳。

如果您要傳送 EventDataAmqpAnnotatedMessage 的有限清單,而且您知道它位於事件中樞畫面大小限制內,您可以使用 send_batch 呼叫來傳送它們。 否則,請使用 create_batch 建立 EventDataBatch ,並將 EventDataAmqpAnnotatedMessage 新增至批次一次,直到大小限制為止,然後呼叫此方法以傳送批次。

send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

參數

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
必要

要傳送的 EventDataBatch 物件,或要批次中傳送 的 EventData 清單。 清單中的所有 EventDataAmqpAnnotatedMessageEventDataBatch 都會落在相同的分割區上。

timeout
float

以非緩衝模式傳送事件資料的最大等候時間,或將事件資料加入緩衝模式中緩衝區的最大等候時間。 在非緩衝模式中,將會使用建立產生者時所指定的預設等候時間。 在緩衝模式中,預設等候時間為 None。

partition_id
str

要傳送至的特定資料分割識別碼。 預設值為 None,在此情況下,服務會使用迴圈配置資源指派給所有分割區。 如果指定了partition_id,而且event_data_batch是EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_id。

partition_key
str

使用指定的partition_key,事件資料會傳送至服務所決定之事件中樞的特定分割區。 如果指定partition_key且event_data_batch為EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_key。 如果同時提供partition_id和partition_key,則partition_id優先。 警告:不建議在要傳送的事件上設定非字串值的partition_key,因為事件中樞服務將會忽略partition_key,事件將會使用迴圈配置資源指派給所有分割區。 此外,有 SDK 可用來取用預期只有字串類型的事件partition_key,它們可能無法剖析非字串值。

傳回類型

例外狀況

如果 timeout 參數所指定的值會經過,事件才能以非緩衝模式傳送,或事件無法加入緩衝模式中緩衝處理的事件。

範例

傳送事件資料


       with producer:
           event_data_batch = producer.create_batch()

           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # EventDataBatch object reaches max_size.
                   # New EventDataBatch object can be created here to send more data
                   break

           producer.send_batch(event_data_batch)

send_event

傳送事件資料。 根據預設,方法會封鎖,直到收到通知或作業逾時為止。如果 EventHubProducerClient 設定為以緩衝模式執行,則如果指定並傳回,方法會嘗試將事件加入至指定時間內的緩衝區。 製作者會在緩衝模式的背景中自動傳送。

如果 buffered_modeFalse,on_error 回呼是選擇性的,而且會處理錯誤,如下所示:* 如果在產生者用戶端具現化期間傳遞 on_error 回呼,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

如果 buffered_mode 為 True, 則on_error 回呼是必要的,而且會處理錯誤,如下所示:* 如果事件無法在指定的逾時內排入佇列,則會直接引發錯誤。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

參數

event_data
Union[EventData, AmqpAnnotatedMessage]
必要

要傳送的 EventData 物件。

timeout
float

以非緩衝模式傳送事件資料的最大等候時間,或將事件資料加入緩衝模式中緩衝區的最大等候時間。 在非緩衝模式中,將會使用建立產生者時所指定的預設等候時間。 在緩衝模式中,預設等候時間為 None。

partition_id
str

要傳送至的特定資料分割識別碼。 預設值為 None,在此情況下,服務會使用迴圈配置資源指派給所有分割區。 如果指定了partition_id,而且event_data_batch是EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_id。

partition_key
str

使用指定的partition_key,事件資料會傳送至服務所決定之事件中樞的特定分割區。 如果指定partition_key且event_data_batch為EventDataBatch,則會引發TypeError,因為EventDataBatch本身有partition_key。 如果同時提供partition_id和partition_key,則partition_id優先。 警告:不建議在要傳送的事件上設定非字串值的partition_key,因為事件中樞服務將會忽略partition_key,事件將會使用迴圈配置資源指派給所有分割區。 此外,有 SDK 可用來取用預期只有字串類型的事件partition_key,它們可能無法剖析非字串值。

傳回類型

例外狀況

如果 timeout 參數所指定的值經過,事件才能以非緩衝模式傳送,或事件可以加入緩衝模式的緩衝處理中。

屬性

total_buffered_event_count

目前已緩衝處理並等候發佈的所有分割區的事件總數。 在非緩衝模式中傳回 None。 注意:事件緩衝區是在背景執行緒中處理,因此此 API 所報告的緩衝區中事件數目應該只視為近似值,而且只建議用於偵錯。

傳回類型

int,