EventHubConsumerClient クラス

EventHubConsumerClient クラスは、Azure Event Hubs サービスからイベントを受信するための高度なインターフェイスを定義します。

EventHubConsumerClient のメインの目標は、負荷分散とチェックポイント処理を使用して EventHub のすべてのパーティションからイベントを受信することです。

同じイベント ハブ、コンシューマー グループ、チェックポイントの場所に対して複数の EventHubConsumerClient インスタンスが実行されている場合、パーティションはそれらの間で均等に分散されます。

負荷分散と永続化されたチェックポイントを有効にするには、 EventHubConsumerClient の作成時にcheckpoint_storeを設定する必要があります。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内で内部的に維持されます。

EventHubConsumerClient は、メソッド receive() または receive_batch() を呼び出してpartition_idを指定するときに、特定のパーティションから受信することもできます。 負荷分散は、単一パーティション モードでは機能しません。 ただし、checkpoint_storeが設定されている場合でも、ユーザーはチェックポイントを保存できます。

継承
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

コンストラクター

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

パラメーター

fully_qualified_namespace
str
必須

Event Hubs 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。

eventhub_name
str
必須

クライアントを接続する特定のイベント ハブのパス。

consumer_group
str
必須

このコンシューマー グループのイベント ハブからイベントを受信します。

credential
AsyncTokenCredential または AzureSasCredential または AzureNamedKeyCredential
必須

トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された 、または資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れますEventHubSharedKeyCredential

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

auth_timeout
float

サービスによってトークンが承認されるまでの待機時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。

user_agent
str

指定した場合、これはユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生した場合に失敗した操作をやり直す試行の合計数。 既定値は 3 です。 受信中の retry_total のコンテキストは特別です。受信メソッドは、各イテレーションで内部 受信 メソッドを呼び出す while ループによって実装されます。 受信ケースでは、retry_totalは while ループの内部受信メソッドによって発生したエラー後の再試行回数を指定します。 再試行が使い果たされると、 on_error コールバックが呼び出され (指定されている場合)、エラー情報が表示されます。 失敗した内部パーティション コンシューマーは閉じられ (指定するとon_partition_close が呼び出されます)、新しい内部パーティション コンシューマーが作成され (指定 された場合はon_partition_initializeが呼び出されます)、受信を再開します。

retry_backoff_factor
float

2 回目の試行後の試行の間に適用されるバックオフ係数 (ほとんどのエラーは、2 回目の試行によって遅延なく直ちに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s, 0.2s, 0.4s, ...] の再試行がスリープ状態になります。 既定値は 0.8 です。

retry_backoff_max
float

最大バックオフ時間。 既定値は 120 秒 (2 分) です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' で、既定値は 'exponential' です。

idle_timeout
float

タイムアウト (秒単位)。 それ以降のアクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。つまり、サービスによって開始されない限り、非アクティブのためクライアントはシャットダウンされません。

transport_type
TransportType

Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

http_proxy

HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。

checkpoint_store
Optional[CheckpointStore]

イベントの受信時にパーティション負荷分散とチェックポイント データを格納するマネージャー。 チェックポイント ストアは、すべてのパーティションまたは 1 つのパーティションから受信する場合の両方で使用されます。 後者の場合、負荷分散は適用されません。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内で内部的に維持され、 EventHubConsumerClient インスタンスは負荷分散なしでイベントを受信します。

load_balancing_interval
float

負荷分散が開始されたとき。 これは、2 つの負荷分散評価の間隔 (秒単位) です。 既定値は 30 秒です。

partition_ownership_expiration_interval
float

パーティションの所有権は、この秒数後に期限切れになります。 すべての負荷分散評価では、所有権の有効期限が自動的に延長されます。 既定値は 6 * load_balancing_interval、既定のload_balancing_interval 30 秒を使用する場合は 180 秒です。

load_balancing_strategy
str または LoadBalancingStrategy

負荷分散が開始されると、この戦略を使用してパーティションの所有権を要求し、バランスを取ります。 "greedy" または LoadBalancingStrategy.GREEDY を greedy 戦略に使用します。これは、負荷分散評価ごとに、負荷のバランスを取るために必要な数の未請求パーティションを取得します。 "balanced" または LoadBalancingStrategy.BALANCED を使用して、負荷分散評価ごとに、他の EventHubConsumerClient によって要求されない 1 つのパーティションのみを要求するバランス戦略を行います。 EventHub のすべてのパーティションが他の EventHubConsumerClient によって要求され、このクライアントが要求するパーティションが少なすぎる場合、このクライアントは負荷分散戦略に関係なく、負荷分散評価ごとに他のクライアントから 1 つのパーティションを盗みます。 Greedy 戦略は既定で使用されます。

custom_endpoint_address
Optional[str]

Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要なアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
Optional[str]

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、Pure Python AMQP ライブラリが基になるトランスポートとして使用されます。

socket_timeout
float

データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みのタイムアウトが原因で EventHubsConnectionError エラーが発生している場合は、既定値より大きい値を渡す必要があります。 これは高度な使用シナリオの場合であり、通常は既定値で十分です。

EventHubConsumerClient の新しいインスタンスを作成します。


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

メソッド

close

イベント ハブからのイベントの取得を停止し、基になる AMQP 接続とリンクを閉じます。

from_connection_string

接続文字列から EventHubConsumerClient を作成します。

get_eventhub_properties

イベント ハブのプロパティを取得します。

返されるディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

イベント ハブのパーティション ID を取得します。

get_partition_properties

指定したパーティションのプロパティを取得します。

プロパティ ディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。

receive_batch

オプションの負荷分散とチェックポイント処理を使用して、バッチでパーティションからイベントを受信します。

close

イベント ハブからのイベントの取得を停止し、基になる AMQP 接続とリンクを閉じます。

async close() -> None

の戻り値の型 :

クライアントを閉じます。


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

接続文字列から EventHubConsumerClient を作成します。

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

パラメーター

conn_str
str
必須

イベント ハブの接続文字列。

consumer_group
str
必須

このコンシューマー グループのイベント ハブからイベントを受信します。

eventhub_name
str

クライアントを接続する特定のイベント ハブのパス。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

http_proxy
dict

HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'

auth_timeout
float

サービスによってトークンが承認されるまでの待機時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。

user_agent
str

指定した場合、これはユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生した場合に失敗した操作をやり直す試行の合計数。 既定値は 3 です。 受信中の retry_total のコンテキストは特別です。受信メソッドは、各イテレーションで内部 受信 メソッドを呼び出す while ループによって実装されます。 受信ケースでは、retry_totalは while ループの内部受信メソッドによって発生したエラー後の再試行回数を指定します。 再試行が使い果たされると、 on_error コールバックが呼び出され (指定されている場合)、エラー情報が表示されます。 失敗した内部パーティション コンシューマーは閉じられ (指定するとon_partition_close が呼び出されます)、新しい内部パーティション コンシューマーが作成され (指定 された場合はon_partition_initializeが呼び出されます)、受信を再開します。

retry_backoff_factor
float

2 回目の試行後の試行の間に適用されるバックオフ係数 (ほとんどのエラーは、2 回目の試行によって遅延なく直ちに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s,0.2s, 0.4s, ...] がスリープ状態になります。 既定値は 0.8 です。

retry_backoff_max
float

最大バックオフ時間。 既定値は 120 秒 (2 分) です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' です。既定値は 'exponential' です。

idle_timeout
float

タイムアウト (秒単位)。 それ以降のアクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。これは、サービスによって開始されない限り、非アクティブのためクライアントがシャットダウンしないことを意味します。

transport_type
TransportType

Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

checkpoint_store
Optional[CheckpointStore]

イベントの受信時にパーティション負荷分散とチェックポイント データを格納するマネージャー。 チェックポイント ストアは、すべてのパーティションまたは 1 つのパーティションから受信する場合の両方で使用されます。 後者の場合、負荷分散は適用されません。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内に内部的に保持され、 EventHubConsumerClient インスタンスは負荷分散なしでイベントを受信します。

load_balancing_interval
float

負荷分散が開始されたとき。 これは、2 つの負荷分散評価の間隔 (秒単位) です。 既定値は 30 秒です。

partition_ownership_expiration_interval
float

パーティションの所有権は、この秒数後に期限切れになります。 すべての負荷分散評価では、所有権の有効期限が自動的に延長されます。 既定値は 6 * load_balancing_interval、つまり 30 秒の既定のload_balancing_intervalを使用する場合は 180 秒です。

load_balancing_strategy
str または LoadBalancingStrategy

負荷分散が開始されると、この戦略を使用してパーティションの所有権を要求し、バランスを取ります。 "greedy" または LoadBalancingStrategy.GREEDY を使用して、負荷分散の評価ごとに、負荷のバランスを取るために必要な解放されていないパーティションを取得します。 "balanced" または LoadBalancingStrategy.BALANCED は、負荷分散の評価ごとに、他の EventHubConsumerClient によって要求されない 1 つのパーティションのみを要求するバランス戦略に使用します。 EventHub のすべてのパーティションが他の EventHubConsumerClient によって要求され、このクライアントが要求するパーティションが少なすぎる場合、このクライアントは負荷分散戦略に関係なく、負荷分散評価ごとに他のクライアントから 1 つのパーティションを盗みます。 Greedy 戦略は既定で使用されます。

custom_endpoint_address
Optional[str]

Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要な任意のアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
Optional[str]

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、純粋な Python AMQP ライブラリが基になるトランスポートとして使用されます。

の戻り値の型 :

接続文字列から EventHubConsumerClient の新しいインスタンスを作成します。


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

イベント ハブのプロパティを取得します。

返されるディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

戻り値

イベント ハブに関する情報を含むディクショナリ。

の戻り値の型 :

例外

get_partition_ids

イベント ハブのパーティション ID を取得します。

async get_partition_ids() -> List[str]

戻り値

パーティション ID の一覧。

の戻り値の型 :

例外

get_partition_properties

指定したパーティションのプロパティを取得します。

プロパティ ディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

パラメーター

partition_id
str
必須

ターゲット パーティション ID。

戻り値

パーティション プロパティを含むディクショナリ。

の戻り値の型 :

例外

receive

オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

パラメーター

on_event
Callable[PartitionContext, Optional[EventData]]
必須

受信したイベントを処理するためのコールバック関数。 コールバックは、パーティション コンテキストを含む partition_context と、受信したイベントである イベント という 2 つのパラメーターを受け取ります。 コールバック関数は、次のように定義する必要があります: on_event(partition_context, event)。 パーティション コンテキストの詳細については、 を PartitionContext参照してください。

max_wait_time
float

イベント プロセッサがコールバックを呼び出す前に待機する最大間隔 (秒単位)。 この期間内にイベントが受信されない場合、 on_event コールバックは None で呼び出されます。 この値が None または 0 (既定値) に設定されている場合、イベントを受信するまでコールバックは呼び出されません。

partition_id
str

指定した場合、クライアントはこのパーティションからのみ受信します。 それ以外の場合、クライアントはすべてのパーティションから受信します。

owner_level
int

排他的コンシューマーの優先順位。 owner_levelが設定されている場合は、排他的コンシューマーが作成されます。 より高いowner_levelを持つコンシューマーは、より高い排他的優先度を持ちます。 所有者レベルは、コンシューマーの "エポック値" とも認識されます。

prefetch
int

処理のためにサービスからプリフェッチするイベントの数。 既定値は 300 です。

track_last_enqueued_event_properties
bool

コンシューマーが、関連付けられたパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡する必要があるかどうかを示します。 パーティションの最後にエンキューされたイベントに関する情報が追跡されている場合、Event Hubs サービスから受信した各イベントは、パーティションに関するメタデータを保持します。 これにより、イベント ハブ クライアントを使用してパーティション プロパティの要求を定期的に行う場合と見なされる場合、通常は有利なトレードオフとなるネットワーク帯域幅の使用量が少なくなります。 既定では False に設定されています。

starting_position
str, int, datetime または dict[str,any]

パーティションのチェックポイント データがない場合は、このイベント位置からの受信を開始します。 チェックポイント データは、使用可能な場合に使用されます。 これは、キーとしてパーティション ID を持ち、個々のパーティションの値としての位置を持つディクテーション、またはすべてのパーティションに対して 1 つの値にすることができます。 値の型には、str、int、datetime.datetime を指定できます。 ストリームの先頭から受信するための値 "-1" と、新しいイベントのみを受信するための "@latest" もサポートされています。

starting_position_inclusive
bool または dict[str,bool]

指定したstarting_positionが inclusive(=) であるかどうかを判断します (>>)。 インクルーシブの場合は True、排他の場合は False。 これは、パーティション ID をキーとして、値として bool を指定して、特定のパーティションのstarting_positionが包括的かどうかを示す dict にすることができます。 これは、すべてのstarting_positionに対して 1 つのブール値にすることもできます。 既定値は False です。

on_error
Callable[[PartitionContext, Exception]]

再試行が使い果たされた後、または負荷分散の処理中に、受信中にエラーが発生したときに呼び出されるコールバック関数。 コールバックは、パーティション情報を含む partition_context と例外である エラー という 2 つのパラメーターを受け取ります。 負荷分散の 処理中にエラーが発生した場合は、partition_context None になる可能性があります。 コールバックは次のように定義する必要があります: on_error(partition_context, error). on_errorコールバックは、on_eventコールバック中にハンドルされない例外が発生した場合にも呼び出されます。

on_partition_initialize
Callable[[PartitionContext]]

特定のパーティションのコンシューマーが初期化を完了した後に呼び出されるコールバック関数。 また、失敗した閉じられた内部パーティション コンシューマーの受信プロセスを引き継ぐ新しい内部パーティション コンシューマーが作成されるときにも呼び出されます。 コールバックは、パーティション情報を含む partition_context という 1 つのパラメーターを受け取ります。 コールバックは、 on_partition_initialize(partition_context)のように定義する必要があります。

on_partition_close
Callable[[PartitionContext, CloseReason]]

特定のパーティションのコンシューマーが閉じられた後に呼び出されるコールバック関数。 再試行が使い果たされた後の受信中にエラーが発生した場合にも呼び出されます。 コールバックは、パーティション情報とクローズの理由を含む partition_context という 2 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_close(partition_context, reason). 閉会の理由はこちらをご覧 CloseReason ください。

の戻り値の型 :

EventHub からイベントを受信します。


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

オプションの負荷分散とチェックポイント処理を使用して、バッチでパーティションからイベントを受信します。

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

パラメーター

on_event_batch
Callable[PartitionContext, List[EventData]]
必須

受信したイベントのバッチを処理するためのコールバック関数。 コールバックは、パーティション コンテキストを含む partition_context と、受信したイベント である event_batch の 2 つのパラメーターを受け取ります。 コールバック関数は、次のように定義する必要があります: on_event_batch(partition_context、event_batch)max_wait_time が None でも 0 でもなく、 max_wait_time 後にイベントが受信されない場合、 event_batchは空のリストになる可能性があります。 パーティション コンテキストの詳細については、 を PartitionContext参照してください。

max_batch_size
int

コールバック on_event_batchに渡されるバッチ内のイベントの最大数。 実際に受信したイベント数が max_batch_sizeを超える場合、受信したイベントはバッチに分割され、最大 max_batch_size イベントを含む各バッチのコールバックが呼び出されます。

max_wait_time
float

イベント プロセッサがコールバックを呼び出す前に待機する最大間隔 (秒単位)。 この間隔でイベントが受信されない場合、 on_event_batch コールバックは空のリストで呼び出されます。 この値が None または 0 (既定値) に設定されている場合、イベントが受信されるまでコールバックは呼び出されません。

partition_id
str

指定した場合、クライアントはこのパーティションからのみ受信します。 それ以外の場合、クライアントはすべてのパーティションから受信します。

owner_level
int

排他的コンシューマーの優先順位。 owner_levelが設定されている場合は、排他的コンシューマーが作成されます。 より高いowner_levelを持つコンシューマーは、より高い排他的優先順位を持ちます。 所有者レベルは、コンシューマーの "エポック値" とも認識されます。

prefetch
int

処理のためにサービスからプリフェッチするイベントの数。 既定値は 300 です。

track_last_enqueued_event_properties
bool

コンシューマーが関連付けられたパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡するかどうかを示します。 パーティションの last-enqueued イベントに関する情報が追跡されている場合、Event Hubs サービスから受信した各イベントは、パーティションに関するメタデータを保持します。 これにより、Event Hub クライアントを使用してパーティション プロパティの要求を定期的に行うことを考慮すると、通常は有利なトレードオフとなる、追加のネットワーク帯域幅消費が少なくなります。 既定では False に設定されています。

starting_position
str, int, datetime または dict[str,any]

パーティションのチェックポイント データがない場合は、このイベント位置からの受信を開始します。 チェックポイント データが使用可能な場合は使用されます。 これは、キーとしてパーティション ID を持ち、個々のパーティションの値として位置を持つディクテーション、またはすべてのパーティションに対して 1 つの値にすることができます。 値の型は、str、int、または datetime.datetime です。 また、ストリームの先頭から受信するための値 "-1" と、新しいイベントのみを受信するための "@latest" もサポートされています。

starting_position_inclusive
bool または dict[str,bool]

指定したstarting_positionが包括的 (=) であるかどうかを判断します (>>)。 包括の場合は True、排他的の場合は False。 これは、キーとしてパーティション ID を持つディクテーション、および特定のパーティションのstarting_positionが包括的かどうかを示す値として bool にすることができます。 これは、すべてのstarting_positionの 1 つのブール値にすることもできます。 既定値は False です。

on_error
Callable[[PartitionContext, Exception]]

再試行が使い果たされた後、または負荷分散の処理中に受信中にエラーが発生したときに呼び出されるコールバック関数。 コールバックは、パーティション情報を含む partition_context と例外である エラー の 2 つのパラメーターを受け取ります。 負荷分散 の処理中にエラーが発生した場合、partition_contextは None になる可能性があります。 コールバックは次のように定義する必要があります: on_error(partition_context, error). on_event コールバック中に未処理の例外が発生した場合は、on_error コールバックも呼び出されます。

on_partition_initialize
Callable[[PartitionContext]]

特定のパーティションのコンシューマーが初期化を完了した後に呼び出されるコールバック関数。 また、障害が発生し、閉じられた内部パーティション コンシューマーの受信プロセスを引き継ぐ新しい内部パーティション コンシューマーが作成されるときにも呼び出されます。 コールバックは、パーティション情報を含む partition_context という 1 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_initialize(partition_context)

on_partition_close
Callable[[PartitionContext, CloseReason]]

特定のパーティションのコンシューマーが閉じられた後に呼び出されるコールバック関数。 再試行が使い果たされた後の受信中にエラーが発生した場合にも呼び出されます。 コールバックは、パーティション情報とクローズの理由を含む partition_context という 2 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_close(partition_context, reason). 閉会の理由はこちらをご覧 CloseReason ください。

の戻り値の型 :

EventHub からバッチでイベントを受信します。


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )