EventHubProducerClient クラス

EventHubProducerClient クラスは、Azure Event Hubs サービスにイベントを送信するための高レベルのインターフェイスを定義します。

継承
azure.eventhub._client_base.ClientBase
EventHubProducerClient

コンストラクター

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)

パラメーター

fully_qualified_namespace
str
必須

Event Hubs 名前空間の完全修飾ホスト名。 これは.servicebus.windows.net に似ている可能性があります

eventhub_name
str
必須

クライアントを接続する特定のイベント ハブのパス。

credential
TokenCredential または AzureSasCredential または AzureNamedKeyCredential
必須

トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された 、または資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れますEventHubSharedKeyCredential

buffered_mode
bool

True の場合、プロデューサー クライアントはバッファー内のイベントを収集し、効率的にバッチ処理してから発行します。 既定値は False です。

buffer_concurrency
<xref:ThreadPoolExecutor> または int または None

イベントの発行に使用する ThreadPoolExecutor、または ThreadPoolExecutor のワーカーの数。 既定値は None で、ワーカーの既定の数を持つ ThreadPoolExecutor は 次のように作成されます。 https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

バッチが正常に発行された後に呼び出されるコールバック。 コールバックは、次の 2 つのパラメーターを受け取ります。

  • events: 正常に発行されたイベントの一覧

  • partition_id: リスト内のイベントが発行されたパーティション ID。

コールバック関数は、次のように定義する必要があります: on_success(events, partition_id)buffered_modeが True の場合は必須ですが、buffered_modeが False の場合は省略可能です。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

バッチの発行に失敗した後に呼び出されるコールバック。 コールバック関数は、次のように定義する必要があります: on_error(events, partition_id, error), where:

  • events: 発行に失敗したイベントの一覧。

  • partition_id: リスト内のイベントが および に発行しようとしたパーティション ID

  • error: 送信エラーに関連する例外。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。

  • プロデューサー クライアントのインスタンス化中に on_error コールバックが渡された場合、

    その後、エラー情報が on_error コールバックに渡され、呼び出されます。

  • クライアントのインスタンス化中に on_error コールバックが渡されない場合は、

    その場合、エラーは既定で発生します。

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。

  • 指定されたタイムアウト内にイベントがエンキューに失敗した場合は、エラーが直接発生します。

  • エンキューに成功した後にイベントの送信に失敗した場合は、 on_error コールバックが呼び出されます。

max_buffer_length
int

バッファー モードのみ。 フラッシュがトリガーされる前にバッファーに格納できるパーティションあたりのイベントの合計数。 既定値は、バッファー モードでは 1500 です。

max_wait_time
Optional[float]

バッファー モードのみ。 発行する前に、バッファー内のイベントを使用してバッチがビルドされるのを待機する時間。 バッファー モードの既定値は 1 です。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

auth_timeout
float

サービスによってトークンが承認されるまでの待機時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。

user_agent
str

指定した場合、これはユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生した場合に失敗した操作をやり直す試行の合計数。 既定値は 3 です。

retry_backoff_factor
float

2 回目の試行後の試行の間に適用されるバックオフ係数 (ほとんどのエラーは、2 回目の試行によって遅延なく直ちに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s, 0.2s, 0.4s, ...] の再試行がスリープ状態になります。 既定値は 0.8 です。

retry_backoff_max
float

最大バックオフ時間。 既定値は 120 秒 (2 分) です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' で、既定値は 'exponential' です。

idle_timeout
float

タイムアウト (秒単位)。その後、アクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。つまり、サービスによって開始されない限り、非アクティブのためクライアントはシャットダウンされません。

transport_type
TransportType

Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

http_proxy
Dict

HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'

custom_endpoint_address
Optional[str]

Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要なアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
Optional[str]

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、Pure Python AMQP ライブラリが基になるトランスポートとして使用されます。

socket_timeout
float

データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みのタイムアウトが原因で EventHubsConnectionError エラーが発生している場合は、既定値より大きい値を渡す必要があります。 これは高度な使用シナリオの場合であり、通常は既定値で十分です。

EventHubProducerClient の新しいインスタンスを作成します。


   import os
   from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   producer = EventHubProducerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
       credential=credential
   )

メソッド

close

プロデューサー クライアントの基になる AMQP 接続とリンクを閉じます。

create_batch

max_size_in_bytesによって制限されているすべてのコンテンツの最大サイズを持つ EventDataBatch オブジェクトを作成します。

max_size_in_bytesは、サービスによって定義された最大許容メッセージ サイズを超えてはなりません。

flush

バッファー モードのみ。 クライアントがバッファー モードで動作している場合は、バッファー内のイベントをフラッシュしてすぐに送信します。

from_connection_string

接続文字列から EventHubProducerClient を作成します。

get_buffered_event_count

バッファーに格納され、特定のパーティションに対して発行されるのを待機しているイベントの数。 バッファーなしモードで None を返します。 注: イベント バッファーはバックグラウンド スレッドで処理されるため、この API によって報告されるバッファー内のイベントの数は近似値のみを考慮する必要があり、デバッグでの使用にのみ推奨されます。 バッファーに入れられたイベントがないパーティション ID の場合、そのパーティション ID がイベント ハブ内に実際に存在するかどうかに関係なく、0 が返されます。

get_eventhub_properties

イベント ハブのプロパティを取得します。

返されるディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

イベント ハブのパーティション ID を取得します。

get_partition_properties

指定したパーティションのプロパティを取得します。

プロパティ ディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

イベント データのバッチを送信します。 既定では、受信確認が受信されるか、操作がタイムアウトするまで、 メソッドはブロックされます。 EventHubProducerClient がバッファー モードで実行されるように構成されている場合、メソッドは、指定された時間内にイベントをバッファーにエンキューして返します。 プロデューサーは、バッファー モードでバックグラウンドで自動送信を行います。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。

  • プロデューサー クライアントのインスタンス化中に on_error コールバックが渡された場合、

    その後、エラー情報が on_error コールバックに渡され、呼び出されます。

  • クライアントのインスタンス化中に on_error コールバックが渡されない場合は、

    その場合、エラーは既定で発生します。

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。

  • 指定されたタイムアウト内にイベントをエンキューできない場合は、エラーが直接発生します。

  • エンキューが正常に完了した後にイベントが送信されない場合は、 on_error コールバックが呼び出されます。

バッファー モードでは、バッチの送信はそのまま残り、1 つのユニットとして送信されます。 バッチは再配置されません。 これにより、イベントを送信する効率が低下する可能性があります。

EventData または AmqpAnnotatedMessage の有限リストを送信していて、それがイベント ハブのフレーム サイズの制限内にあることがわかっている場合は、send_batch呼び出しで送信できます。 それ以外の場合は、 を使用 create_batch して EventDataBatch を作成し、サイズ制限まで EventData または AmqpAnnotatedMessage をバッチに 1 つずつ追加し、このメソッドを呼び出してバッチを送信します。

send_event

イベント データを送信します。 既定では、受信確認が受信されるか、操作がタイムアウトするまで、 メソッドはブロックされます。 EventHubProducerClient がバッファー モードで実行されるように構成されている場合、メソッドは、指定された時間内にイベントをバッファーにエンキューして返します。 プロデューサーは、バッファー モードでバックグラウンドで自動送信を行います。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。 * プロデューサー クライアントのインスタンス化中にon_errorコールバックが渡された場合は、

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。 * イベントが指定されたタイムアウト内にエンキューに失敗した場合は、エラーが直接発生します。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

プロデューサー クライアントの基になる AMQP 接続とリンクを閉じます。

close(*, flush: bool = True, **kwargs: Any) -> None

パラメーター

flush
bool

バッファー モードのみ。 True に設定すると、バッファー内のイベントがすぐに送信されます。 既定値は True です。

timeout
float または None

バッファー モードのみ。 プロデューサーを閉じるにはタイムアウトです。 既定値は None です。これはタイムアウトがないことを意味します。

の戻り値の型 :

例外

flush が True に設定されているか、バッファー モードで基になる AMQP 接続を閉じる場合に、バッファーの フラッシュ 時にエラーが発生した場合。

クライアントを閉じます。


   import os
   from azure.eventhub import EventHubProducerClient, EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = producer.create_batch()

       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # EventDataBatch object reaches max_size.
               # New EventDataBatch object can be created here to send more data
               break

       producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       producer.close()

create_batch

max_size_in_bytesによって制限されているすべてのコンテンツの最大サイズを持つ EventDataBatch オブジェクトを作成します。

max_size_in_bytesは、サービスによって定義された最大許容メッセージ サイズを超えてはなりません。

create_batch(**kwargs: Any) -> EventDataBatch

の戻り値の型 :

例外

flush が True に設定されているか、バッファー モードで基になる AMQP 接続を閉じる場合に、バッファーの フラッシュ 時にエラーが発生した場合。

限られたサイズの EventDataBatch オブジェクトを作成する


       event_data_batch = producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

バッファー モードのみ。 クライアントがバッファー モードで動作している場合は、バッファー内のイベントをフラッシュしてすぐに送信します。

flush(**kwargs: Any) -> None

パラメーター

timeout
Optional[float]

バッファー内のイベントをフラッシュするためのタイムアウト。既定値は None です。これはタイムアウトがないことを意味します。

の戻り値の型 :

例外

プロデューサーがバッファー モードで指定されたタイムアウト内でバッファーをフラッシュできない場合。

from_connection_string

接続文字列から EventHubProducerClient を作成します。

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) -> EventHubProducerClient

パラメーター

conn_str
str
必須

イベント ハブの接続文字列。

eventhub_name
str

クライアントを接続する特定のイベント ハブのパス。

buffered_mode
bool

True の場合、プロデューサー クライアントはバッファー内のイベントを収集し、効率的にバッチ処理してから発行します。 既定値は False です。

buffer_concurrency
<xref:ThreadPoolExecutor> または int または None

イベントの発行に使用する ThreadPoolExecutor、または ThreadPoolExecutor のワーカーの数。 既定値は None で、既定のワーカー数を持つ ThreadPoolExecutor は、次のように作成されます。 https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

バッチが正常に発行された後に呼び出されるコールバック。 コールバックは、次の 2 つのパラメーターを受け取ります。

  • events: 正常に発行されたイベントの一覧

  • partition_id: リスト内のイベントが発行されたパーティション ID。

コールバック関数は、 on_success(events, partition_id)のように定義する必要があります。 buffered_modeが True の場合は必須。buffered_modeが False の場合は省略可能です。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

バッチの発行に失敗した後に呼び出されるコールバック。 buffered_mode で True の場合は必須、buffered_modeが False の場合は省略可能です。 コールバック関数は、次のように定義する必要があります: on_error(events, partition_id, error), where:

  • events: 発行に失敗したイベントの一覧。

  • partition_id: リスト内のイベントが および に発行しようとしたパーティション ID

  • error: 送信エラーに関連する例外。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。

  • プロデューサー クライアントのインスタンス化中に on_error コールバックが渡された場合、

    その後、エラー情報が on_error コールバックに渡され、呼び出されます。

  • クライアントのインスタンス化中に on_error コールバックが渡されない場合は、

    その場合、エラーは既定で発生します。

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。

  • 指定されたタイムアウト内にイベントをエンキューできない場合は、エラーが直接発生します。

  • エンキューが正常に完了した後にイベントが送信されない場合は、 on_error コールバックが呼び出されます。

max_buffer_length
int

バッファー モードのみ。 フラッシュがトリガーされる前にバッファーに格納できるパーティションごとのイベントの合計数。 既定値は、バッファー モードでは 1500 です。

max_wait_time
Optional[float]

バッファー モードのみ。 発行する前に、バッファー内のイベントを使用してバッチがビルドされるまでの待機時間。 既定値は、バッファー モードでは 1 です。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

http_proxy
Dict

HTTP プロキシ設定。 これは、 'proxy_hostname' (str 値) と ' proxy_port' (int 値) のキーを持つディクショナリである必要があります。 さらに、次のキーが存在する場合もあります: 'username'、'password'

auth_timeout
float

サービスによってトークンが承認されるまで待機する時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。

user_agent
str

指定した場合、これはユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生したときに失敗した操作をやり直す試行の合計数。 既定値は 3 です。

retry_backoff_factor
float

2 回目の試行後の試行間に適用するバックオフ係数 (ほとんどのエラーは、遅延なしで 2 回目の試行によってすぐに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s,0.2s, 0.4s, ...] がスリープ状態になります。 既定値は 0.8 です。

retry_backoff_max
float

最大バックオフ時間。 既定値は 120 秒 (2 分) です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' です。既定値は 'exponential' です。

idle_timeout
float

タイムアウト (秒単位)。その後、アクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。これは、サービスによって開始されない限り、非アクティブのためクライアントがシャットダウンしないことを意味します。

transport_type
TransportType

Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

http_proxy

HTTP プロキシ設定。 これは、 'proxy_hostname' (str 値) と ' proxy_port' (int 値) のキーを持つディクショナリである必要があります。 さらに、次のキーが存在する場合もあります: 'username'、'password'

custom_endpoint_address
Optional[str]

Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要な任意のアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
Optional[str]

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、純粋な Python AMQP ライブラリが基になるトランスポートとして使用されます。

の戻り値の型 :

例外

flush が True に設定されているか、バッファー モードで基になる AMQP 接続を閉じる場合に、バッファーの フラッシュ 時にエラーが発生した場合。

接続文字列から EventHubProducerClient の新しいインスタンスを作成します。


   import os
   from azure.eventhub import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

バッファーに格納され、特定のパーティションに対して発行されるのを待機しているイベントの数。 バッファーなしモードで None を返します。 注: イベント バッファーはバックグラウンド スレッドで処理されるため、この API によって報告されるバッファー内のイベントの数は近似値のみを考慮する必要があり、デバッグでの使用にのみ推奨されます。 バッファーに入れられたイベントがないパーティション ID の場合、そのパーティション ID がイベント ハブ内に実際に存在するかどうかに関係なく、0 が返されます。

get_buffered_event_count(partition_id: str) -> int | None

パラメーター

partition_id
str
必須

ターゲット パーティション ID。

の戻り値の型 :

int,

例外

flush が True に設定されているか、バッファー モードで基になる AMQP 接続を閉じる場合に、バッファーの フラッシュ 時にエラーが発生した場合。

get_eventhub_properties

イベント ハブのプロパティを取得します。

返されるディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

戻り値

eventhub プロパティを含むディクショナリ。

の戻り値の型 :

例外

get_partition_ids

イベント ハブのパーティション ID を取得します。

get_partition_ids() -> List[str]

戻り値

パーティション ID の一覧。

の戻り値の型 :

例外

get_partition_properties

指定したパーティションのプロパティを取得します。

プロパティ ディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

パラメーター

partition_id
str
必須

ターゲット パーティション ID。

戻り値

パーティション プロパティのディクショナリ。

の戻り値の型 :

例外

send_batch

イベント データのバッチを送信します。 既定では、受信確認が受信されるか、操作がタイムアウトするまで、 メソッドはブロックされます。 EventHubProducerClient がバッファー モードで実行されるように構成されている場合、メソッドは、指定された時間内にイベントをバッファーにエンキューして返します。 プロデューサーは、バッファー モードでバックグラウンドで自動送信を行います。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。

  • プロデューサー クライアントのインスタンス化中に on_error コールバックが渡された場合、

    その後、エラー情報が on_error コールバックに渡され、呼び出されます。

  • クライアントのインスタンス化中に on_error コールバックが渡されない場合は、

    その場合、エラーは既定で発生します。

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。

  • 指定されたタイムアウト内にイベントをエンキューできない場合は、エラーが直接発生します。

  • エンキューが正常に完了した後にイベントが送信されない場合は、 on_error コールバックが呼び出されます。

バッファー モードでは、バッチの送信はそのまま残り、1 つのユニットとして送信されます。 バッチは再配置されません。 これにより、イベントを送信する効率が低下する可能性があります。

EventData または AmqpAnnotatedMessage の有限リストを送信していて、それがイベント ハブのフレーム サイズの制限内にあることがわかっている場合は、send_batch呼び出しで送信できます。 それ以外の場合は、 を使用 create_batch して EventDataBatch を作成し、サイズ制限まで EventData または AmqpAnnotatedMessage をバッチに 1 つずつ追加し、このメソッドを呼び出してバッチを送信します。

send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

パラメーター

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
必須

送信される EventDataBatch オブジェクト、またはバッチで送信される EventData の一覧。 リストまたは EventDataBatch 内のすべての EventData または AmqpAnnotatedMessage は、同じパーティションに格納されます。

timeout
float

バッファーなしモードでイベント データを送信する最大待機時間、またはバッファー モードでイベント データをバッファーにエンキューする最大待機時間。 バッファーなしモードでは、プロデューサーの作成時に指定された既定の待機時間が使用されます。 バッファー モードでは、既定の待機時間は None です。

partition_id
str

送信する特定のパーティション ID。 既定値は None です。この場合、サービスはすべてのパーティションにラウンド ロビンを使用して割り当てられます。 EventDataBatch 自体にpartition_idがあるため、partition_idが指定され、event_data_batchが EventDataBatch である場合、TypeError が発生します。

partition_key
str

指定したpartition_keyでは、イベント データは、サービスによって決定されたイベント ハブの特定のパーティションに送信されます。 EventDataBatch 自体にpartition_keyがあるため、partition_keyが指定され、event_data_batchが EventDataBatch である場合、TypeError が発生します。 partition_idとpartition_keyの両方が指定されている場合は、partition_idが優先されます。 警告: 送信するイベントに対して文字列以外の値のpartition_keyを設定することはお勧めしません。partition_keyは Event Hub サービスによって無視され、ラウンドロビンを使用するすべてのパーティションにイベントが割り当てられるので、お勧めしません。 さらに、partition_keyが文字列型のみを想定するイベントを使用するための SDK があり、文字列以外の値の解析に失敗する可能性があります。

の戻り値の型 :

例外

タイムアウト・パラメーターで指定された値が経過してから、非バッファー・モードでイベントを送信できる場合、またはイベントをバッファー・モードでバッファー・モードにエンキューできない場合。

イベント データを送信する


       with producer:
           event_data_batch = producer.create_batch()

           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # EventDataBatch object reaches max_size.
                   # New EventDataBatch object can be created here to send more data
                   break

           producer.send_batch(event_data_batch)

send_event

イベント データを送信します。 既定では、受信確認が受信されるか、操作がタイムアウトするまで、 メソッドはブロックされます。 EventHubProducerClient がバッファー モードで実行されるように構成されている場合、メソッドは、指定された時間内にイベントをバッファーにエンキューして返します。 プロデューサーは、バッファー モードでバックグラウンドで自動送信を行います。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。 * プロデューサー クライアントのインスタンス化中にon_errorコールバックが渡された場合は、

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。 * イベントが指定されたタイムアウト内にエンキューに失敗した場合は、エラーが直接発生します。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

パラメーター

event_data
Union[EventData, AmqpAnnotatedMessage]
必須

送信する EventData オブジェクト。

timeout
float

バッファーなしモードでイベント データを送信する最大待機時間、またはバッファー モードでイベント データをバッファーにエンキューする最大待機時間。 バッファーなしモードでは、プロデューサーの作成時に指定された既定の待機時間が使用されます。 バッファー モードでは、既定の待機時間は None です。

partition_id
str

送信する特定のパーティション ID。 既定値は None です。この場合、サービスはすべてのパーティションにラウンド ロビンを使用して割り当てられます。 EventDataBatch 自体にpartition_idがあるため、partition_idが指定され、event_data_batchが EventDataBatch である場合、TypeError が発生します。

partition_key
str

指定したpartition_keyでは、イベント データは、サービスによって決定されたイベント ハブの特定のパーティションに送信されます。 EventDataBatch 自体にpartition_keyがあるため、partition_keyが指定され、event_data_batchが EventDataBatch である場合、TypeError が発生します。 partition_idとpartition_keyの両方が指定されている場合は、partition_idが優先されます。 警告: 送信するイベントに対して文字列以外の値のpartition_keyを設定することはお勧めしません。partition_keyは Event Hub サービスによって無視され、ラウンドロビンを使用するすべてのパーティションにイベントが割り当てられるので、お勧めしません。 さらに、partition_keyが文字列型のみを想定するイベントを使用するための SDK があり、文字列以外の値の解析に失敗する可能性があります。

の戻り値の型 :

例外

タイムアウト パラメーターで指定された値が経過した場合は、イベントをバッファーなしモードで送信するか、バッファーモードでバッファーにイベントをエンキューできます。

属性

total_buffered_event_count

現在バッファーに格納され、すべてのパーティションでパブリッシュされるのを待機しているイベントの合計数。 バッファーなしモードで None を返します。 注: イベント バッファーはバックグラウンド スレッドで処理されるため、この API によって報告されるバッファー内のイベントの数は近似値と見なす必要があり、デバッグでのみ使用することをお勧めします。

の戻り値の型 :

int,