EventHubProducerClient クラス

EventHubProducerClient クラスは、Azure Event Hubs サービスにイベントを送信するための高レベルのインターフェイスを定義します。

継承
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

コンストラクター

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

パラメーター

fully_qualified_namespace
str
必須

Event Hubs 名前空間の完全修飾ホスト名。 これは .servicebus.windows.net に似ている可能性があります

eventhub_name
str
必須

クライアントを接続する特定のイベント ハブのパス。

credential
AsyncTokenCredential または AzureSasCredential または AzureNamedKeyCredential
必須

トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された 、または資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れますEventHubSharedKeyCredential

buffered_mode
bool

True の場合、プロデューサー クライアントはバッファー内のイベントを収集し、効率的にバッチ処理してから発行します。 既定値は False です。

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

バッチが正常に発行された後に呼び出されるコールバック。 コールバックは、次の 2 つのパラメーターを受け取ります。

  • events: 正常に発行されたイベントの一覧

  • partition_id: リスト内のイベントが発行されたパーティション ID。

コールバック関数は、 on_success(events, partition_id)のように定義する必要があります。 buffered_modeが True の場合は必須。buffered_modeが False の場合は省略可能です。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

バッチの発行に失敗した後に呼び出されるコールバック。 buffered_mode で True の場合は必須、buffered_modeが False の場合は省略可能です。 コールバック関数は、次のように定義する必要があります: on_error(events, partition_id, error), where:

  • events: 発行に失敗したイベントの一覧。

  • partition_id: リスト内のイベントが および に発行しようとしたパーティション ID

  • error: 送信エラーに関連する例外。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。

  • プロデューサー クライアントのインスタンス化中に on_error コールバックが渡された場合、

    その後、エラー情報が on_error コールバックに渡され、呼び出されます。

  • クライアントのインスタンス化中に on_error コールバックが渡されない場合は、

    その場合、エラーは既定で発生します。

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。

  • 指定されたタイムアウト内にイベントをエンキューできない場合は、エラーが直接発生します。

  • エンキューが正常に完了した後にイベントが送信されない場合は、 on_error コールバックが呼び出されます。

max_buffer_length
int

バッファー モードのみ。 フラッシュがトリガーされる前にバッファーに格納できるパーティションごとのイベントの合計数。 既定値は、バッファー モードでは 1500 です。

max_wait_time
Optional[float]

バッファー モードのみ。 発行する前に、バッファー内のイベントを使用してバッチがビルドされるまでの待機時間。 既定値は、バッファー モードでは 1 です。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

auth_timeout
float

サービスによってトークンが承認されるまで待機する時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。

user_agent
str

指定した場合、これはユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生したときに失敗した操作をやり直す試行の合計数。 既定値は 3 です。

retry_backoff_factor
float

2 回目の試行後の試行間に適用するバックオフ係数 (ほとんどのエラーは、遅延なしで 2 回目の試行によってすぐに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s,0.2s, 0.4s, ...] がスリープ状態になります。 既定値は 0.8 です。

retry_backoff_max
float

最大バックオフ時間。 既定値は 120 秒 (2 分) です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' です。既定値は 'exponential' です。

idle_timeout
float

タイムアウト (秒単位)。その後、アクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。これは、サービスによって開始されない限り、非アクティブのためクライアントがシャットダウンしないことを意味します。

transport_type
TransportType

Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

http_proxy
dict

HTTP プロキシ設定。 これは、 'proxy_hostname' (str 値) と ' proxy_port' (int 値) のキーを持つディクショナリである必要があります。 さらに、次のキーが存在する場合もあります: 'username'、'password'

custom_endpoint_address
Optional[str]

Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要な任意のアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
Optional[str]

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、純粋な Python AMQP ライブラリが基になるトランスポートとして使用されます。

socket_timeout
float

接続の基になるソケットが、データの送受信時にタイムアウトするまで待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で EventHubsConnectionError エラーが発生している場合は、既定値よりも大きい値を渡す必要がある場合があります。 これは高度な使用シナリオの場合であり、通常は既定値で十分です。

EventHubProducerClient の新しいインスタンスを作成します。


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

メソッド

close

プロデューサー クライアントの基になる AMQP 接続とリンクを閉じます。

create_batch

max_size_in_bytesによって制限されているすべてのコンテンツの最大サイズを持つ EventDataBatch オブジェクトを作成します。

max_size_in_bytesは、サービスで定義されている最大許容メッセージ サイズを超えてはなりません。

flush

バッファー モードのみ。 クライアントがバッファー モードで動作している場合にすぐに送信されるバッファー内のイベントをフラッシュします。

from_connection_string

接続文字列から EventHubProducerClient を作成します。

get_buffered_event_count

バッファーに格納され、特定のパーティションに対して発行されるのを待機しているイベントの数。 バッファーなしモードで None を返します。 注: イベント バッファーはバックグラウンド コルーチンで処理されるため、この API によって報告されるバッファー内のイベントの数は近似値のみを考慮する必要があり、デバッグでの使用のみをお勧めします。 バッファーに入れられたイベントがないパーティション ID の場合、そのパーティション ID がイベント ハブ内に実際に存在するかどうかに関係なく、0 が返されます。

get_eventhub_properties

イベント ハブのプロパティを取得します。

返されるディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

イベント ハブのパーティション ID を取得します。

get_partition_properties

指定したパーティションのプロパティを取得します。

プロパティ ディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

イベント データのバッチを送信します。 既定では、受信確認が受信されるか、操作がタイムアウトするまで、 メソッドはブロックされます。 EventHubProducerClient がバッファー モードで実行されるように構成されている場合、メソッドはイベントをローカル バッファーにエンキューして返します。 プロデューサーはバックグラウンドで自動送信を行います。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。

  • プロデューサー クライアントのインスタンス化中に on_error コールバックが渡された場合、

    その後、エラー情報が on_error コールバックに渡され、呼び出されます。

  • クライアントのインスタンス化中に on_error コールバックが渡されない場合は、

    その場合、エラーは既定で発生します。

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。

  • 指定されたタイムアウト内にイベントがエンキューに失敗した場合は、エラーが直接発生します。

  • エンキューに成功した後にイベントの送信に失敗した場合は、 on_error コールバックが呼び出されます。

バッファー モードでは、バッチの送信はそのまま残り、1 つのユニットとして送信されます。 バッチは再配置されません。 これにより、イベントを送信する効率が低下する可能性があります。

EventData または AmqpAnnotatedMessage の有限リストを送信していて、イベント ハブのフレーム サイズ制限内にあることがわかっている場合は、send_batch呼び出しで送信できます。 それ以外の場合は、 を使用 create_batch して EventDataBatch を作成し、サイズ制限まで EventData または AmqpAnnotatedMessage をバッチに 1 つずつ追加し、このメソッドを呼び出してバッチを送信します。

send_event

イベント データを送信します。 既定では、受信確認が受信されるか、操作がタイムアウトするまで、 メソッドはブロックされます。 EventHubProducerClient がバッファー モードで実行されるように構成されている場合、メソッドはイベントをローカル バッファーにエンキューして返します。 プロデューサーは、バックグラウンドで自動バッチ処理と送信を行います。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。 * プロデューサー クライアントのインスタンス化中にon_errorコールバックが渡された場合は、

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。 * イベントが指定されたタイムアウト内にエンキューに失敗した場合は、エラーが直接発生します。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

プロデューサー クライアントの基になる AMQP 接続とリンクを閉じます。

async close(*, flush: bool = True, **kwargs: Any) -> None

パラメーター

flush
bool

バッファー モードのみ。 True に設定すると、バッファー内のイベントがすぐに送信されます。 既定値は True です。

timeout
float または None

バッファー モードのみ。 プロデューサーを閉じるにはタイムアウトです。 既定値は None です。これはタイムアウトがないことを意味します。

の戻り値の型 :

例外

flush が True に設定されている場合、またはバッファー モードで基になる AMQP 接続を閉じる場合に、バッファーの フラッシュ 時にエラーが発生した場合。

ハンドラーを閉じます。


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

max_size_in_bytesによって制限されているすべてのコンテンツの最大サイズを持つ EventDataBatch オブジェクトを作成します。

max_size_in_bytesは、サービスで定義されている最大許容メッセージ サイズを超えてはなりません。

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

の戻り値の型 :

例外

flush が True に設定されている場合、またはバッファー モードで基になる AMQP 接続を閉じる場合に、バッファーの フラッシュ 時にエラーが発生した場合。

限られたサイズの EventDataBatch オブジェクトを作成する


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

バッファー モードのみ。 クライアントがバッファー モードで動作している場合にすぐに送信されるバッファー内のイベントをフラッシュします。

async flush(**kwargs: Any) -> None

パラメーター

timeout
float または None

バッファー内のイベントをフラッシュするためのタイムアウト。既定値は None です。これはタイムアウトがないことを意味します。

の戻り値の型 :

例外

プロデューサーがバッファー モードで指定されたタイムアウト内にバッファーをフラッシュできない場合。

from_connection_string

接続文字列から EventHubProducerClient を作成します。

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

パラメーター

conn_str
str
必須

イベント ハブの接続文字列。

eventhub_name
str

クライアントを接続する特定のイベント ハブのパス。

buffered_mode
bool

True の場合、プロデューサー クライアントはバッファー内のイベントを収集し、効率的にバッチ処理してから発行します。 既定値は False です。

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

バッチが正常に発行された後に呼び出されるコールバック。 コールバックは、次の 2 つのパラメーターを受け取ります。

  • events: 正常に発行されたイベントの一覧

  • partition_id: リスト内のイベントが発行されたパーティション ID。

コールバック関数は、 on_success(events, partition_id)のように定義する必要があります。 buffered_modeが True の場合は必須ですが、buffered_modeが False の場合は省略可能です。

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

バッチの発行に失敗した後に呼び出されるコールバック。 コールバック関数は、次のように定義する必要があります: on_error(events, partition_id, error), where:

  • events: 発行に失敗したイベントの一覧。

  • partition_id: リスト内のイベントが および に発行しようとしたパーティション ID

  • error: 送信エラーに関連する例外。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。

  • プロデューサー クライアントのインスタンス化中に on_error コールバックが渡された場合、

    その後、エラー情報が on_error コールバックに渡され、呼び出されます。

  • クライアントのインスタンス化中に on_error コールバックが渡されない場合は、

    その場合、エラーは既定で発生します。

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。

  • 指定されたタイムアウト内にイベントをエンキューできない場合は、エラーが直接発生します。

  • エンキューが正常に完了した後にイベントが送信されない場合は、 on_error コールバックが呼び出されます。

max_buffer_length
int

バッファー モードのみ。 フラッシュがトリガーされる前にバッファーに格納できるパーティションごとのイベントの合計数。 既定値は、バッファー モードでは 1500 です。

max_wait_time
Optional[float]

バッファー モードのみ。 発行する前に、バッファー内のイベントを使用してバッチがビルドされるまでの待機時間。 既定値は、バッファー モードでは 1 です。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

http_proxy
dict

HTTP プロキシ設定。 これは、 'proxy_hostname' (str 値) と ' proxy_port' (int 値) のキーを持つディクショナリである必要があります。 さらに、次のキーが存在する場合もあります: 'username'、'password'

auth_timeout
float

サービスによってトークンが承認されるまで待機する時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。

user_agent
str

指定した場合、これはユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生したときに失敗した操作をやり直す試行の合計数。 既定値は 3 です。

retry_backoff_factor
float

2 回目の試行後の試行間に適用するバックオフ係数 (ほとんどのエラーは、遅延なしで 2 回目の試行によってすぐに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s,0.2s, 0.4s, ...] がスリープ状態になります。 既定値は 0.8 です。

retry_backoff_max
float

最大バックオフ時間。 既定値は 120 秒 (2 分) です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' です。既定値は 'exponential' です。

idle_timeout
float

タイムアウト (秒単位)。その後、アクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。これは、サービスによって開始されない限り、非アクティブのためクライアントがシャットダウンしないことを意味します。

transport_type
TransportType

Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

custom_endpoint_address
Optional[str]

Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要な任意のアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
Optional[str]

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、純粋な Python AMQP ライブラリが基になるトランスポートとして使用されます。

の戻り値の型 :

例外

flush が True に設定されている場合、またはバッファー モードで基になる AMQP 接続を閉じる場合に、バッファーの フラッシュ 時にエラーが発生した場合。

接続文字列から EventHubProducerClient の新しいインスタンスを作成します。


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

バッファーに格納され、特定のパーティションに対して発行されるのを待機しているイベントの数。 バッファーなしモードで None を返します。 注: イベント バッファーはバックグラウンド コルーチンで処理されるため、この API によって報告されるバッファー内のイベントの数は近似値のみを考慮する必要があり、デバッグでの使用のみをお勧めします。 バッファーに入れられたイベントがないパーティション ID の場合、そのパーティション ID がイベント ハブ内に実際に存在するかどうかに関係なく、0 が返されます。

get_buffered_event_count(partition_id: str) -> int | None

パラメーター

partition_id
str
必須

ターゲット パーティション ID。

の戻り値の型 :

int,

例外

flush が True に設定されている場合、またはバッファー モードで基になる AMQP 接続を閉じる場合に、バッファーの フラッシュ 時にエラーが発生した場合。

get_eventhub_properties

イベント ハブのプロパティを取得します。

返されるディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

戻り値

イベント ハブに関する情報を含むディクショナリ。

の戻り値の型 :

例外

get_partition_ids

イベント ハブのパーティション ID を取得します。

async get_partition_ids() -> List[str]

戻り値

パーティション ID の一覧。

の戻り値の型 :

例外

get_partition_properties

指定したパーティションのプロパティを取得します。

プロパティ ディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

パラメーター

partition_id
str
必須

ターゲット パーティション ID。

戻り値

パーティションプロパティのディクテーション。

の戻り値の型 :

例外

send_batch

イベント データのバッチを送信します。 既定では、受信確認が受信されるか、操作がタイムアウトするまで、 メソッドはブロックされます。 EventHubProducerClient がバッファー モードで実行されるように構成されている場合、メソッドはイベントをローカル バッファーにエンキューして返します。 プロデューサーはバックグラウンドで自動送信を行います。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。

  • プロデューサー クライアントのインスタンス化中に on_error コールバックが渡された場合、

    その後、エラー情報が on_error コールバックに渡され、呼び出されます。

  • クライアントのインスタンス化中に on_error コールバックが渡されない場合は、

    その場合、エラーは既定で発生します。

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。

  • 指定されたタイムアウト内にイベントがエンキューに失敗した場合は、エラーが直接発生します。

  • エンキューに成功した後にイベントの送信に失敗した場合は、 on_error コールバックが呼び出されます。

バッファー モードでは、バッチの送信はそのまま残り、1 つのユニットとして送信されます。 バッチは再配置されません。 これにより、イベントを送信する効率が低下する可能性があります。

EventData または AmqpAnnotatedMessage の有限リストを送信していて、イベント ハブのフレーム サイズ制限内にあることがわかっている場合は、send_batch呼び出しで送信できます。 それ以外の場合は、 を使用 create_batch して EventDataBatch を作成し、サイズ制限まで EventData または AmqpAnnotatedMessage をバッチに 1 つずつ追加し、このメソッドを呼び出してバッチを送信します。

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

パラメーター

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
必須

送信される EventDataBatch オブジェクト、またはバッチで送信される EventData の一覧。 リストまたは EventDataBatch 内のすべての EventData または AmqpAnnotatedMessage は、同じパーティションに格納されます。

timeout
float

バッファーなしモードでイベント データを送信する最大待機時間、またはバッファー モードでイベント データをバッファーにエンキューする最大待機時間。 バッファーなしモードでは、プロデューサーの作成時に指定された既定の待機時間が使用されます。 バッファー モードでは、既定の待機時間は None です。

partition_id
str

送信する特定のパーティション ID。 既定値は None です。この場合、サービスはすべてのパーティションにラウンド ロビンを使用して割り当てられます。 EventDataBatch 自体にpartition_idがあるため、partition_idが指定され、event_data_batchが EventDataBatch である場合、TypeError が発生します。

partition_key
str

指定したpartition_keyでは、イベント データは、サービスによって決定されたイベント ハブの特定のパーティションに送信されます。 EventDataBatch 自体にpartition_keyがあるため、partition_keyが指定され、event_data_batchが EventDataBatch である場合、TypeError が発生します。 partition_idとpartition_keyの両方が指定されている場合は、partition_idが優先されます。 警告: 送信するイベントに対して文字列以外の値のpartition_keyを設定することはお勧めしません。イベント ハブ サービスによってpartition_keyが無視され、ラウンド ロビンを使用してすべてのパーティションにイベントが割り当てられるので、お勧めしません。 さらに、partition_keyが文字列型のみを想定するイベントを使用するための SDK があり、文字列以外の値の解析に失敗する可能性があります。

の戻り値の型 :

例外

タイムアウト パラメーターで指定された値が経過した場合は、イベントをバッファーなしモードで送信するか、バッファーモードでバッファーにイベントをエンキューできます。

イベント データを非同期的に送信する


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

イベント データを送信します。 既定では、受信確認が受信されるか、操作がタイムアウトするまで、 メソッドはブロックされます。 EventHubProducerClient がバッファー モードで実行されるように構成されている場合、メソッドはイベントをローカル バッファーにエンキューして返します。 プロデューサーは、バックグラウンドで自動バッチ処理と送信を行います。

buffered_modeが False の場合、on_errorコールバックは省略可能であり、エラーは次のように処理されます。 * プロデューサー クライアントのインスタンス化中にon_errorコールバックが渡された場合は、

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

buffered_modeが True の場合、on_errorコールバックが必要であり、エラーは次のように処理されます。 * イベントが指定されたタイムアウト内にエンキューに失敗した場合は、エラーが直接発生します。

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

パラメーター

event_data
Union[EventData, AmqpAnnotatedMessage]
必須

送信する EventData オブジェクト。

timeout
float

バッファーなしモードでイベント データを送信する最大待機時間、またはバッファー モードでイベント データをバッファーにエンキューするための最大待機時間。 バッファーなしモードでは、プロデューサーの作成時に指定された既定の待機時間が使用されます。 バッファー モードでは、既定の待機時間は None です。

partition_id
str

送信する特定のパーティション ID。 既定値は None です。この場合、サービスはラウンド ロビンを使用してすべてのパーティションに割り当てます。 EventDataBatch 自体がpartition_idされているため、partition_idが指定され、event_data_batchが EventDataBatch である場合、TypeError が発生します。

partition_key
str

指定したpartition_keyでは、イベント データは、サービスによって決定された Event Hub の特定のパーティションに送信されます。 EventDataBatch 自体がpartition_keyされているため、partition_keyが指定され、event_data_batchが EventDataBatch である場合、TypeError が発生します。 partition_idとpartition_keyの両方が指定されている場合は、partition_idが優先されます。 警告: 送信するイベントに対して文字列以外の値のpartition_keyを設定することはお勧めしません。イベント ハブ サービスによってpartition_keyが無視され、ラウンド ロビンを使用してすべてのパーティションにイベントが割り当てられるので、お勧めしません。 さらに、partition_keyが文字列型のみを想定するイベントを使用するための SDK があり、文字列以外の値の解析に失敗する可能性があります。

の戻り値の型 :

例外

イベントをバッファーなしモードで送信する前に、タイムアウト パラメーターで指定された値が経過した場合、またはイベントをバッファーモードでバッファー内にエンキューできない場合。

属性

total_buffered_event_count

現在バッファーに格納され、すべてのパーティションにわたって発行されるのを待機しているイベントの合計数。 バッファーなしモードで None を返します。 注: イベント バッファーはバックグラウンド コルーチンで処理されるため、この API によって報告されるバッファー内のイベントの数は近似値のみを考慮する必要があり、デバッグでの使用のみをお勧めします。

の戻り値の型 :

int,