EventHubConsumerClient クラス
EventHubConsumerClient クラスは、Azure Event Hubs サービスからイベントを受信するための高度なインターフェイスを定義します。
EventHubConsumerClient のメインの目標は、負荷分散とチェックポイント処理を使用して EventHub のすべてのパーティションからイベントを受信することです。
同じイベント ハブ、コンシューマー グループ、チェックポイントの場所に対して複数の EventHubConsumerClient インスタンスが実行されている場合、パーティションはそれらの間で均等に分散されます。
負荷分散と永続化されたチェックポイントを有効にするには、 EventHubConsumerClient の作成時にcheckpoint_storeを設定する必要があります。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内で内部的に維持されます。
EventHubConsumerClient は、メソッド receive() または receive_batch() を呼び出してpartition_idを指定するときに、特定のパーティションから受信することもできます。 負荷分散は、単一パーティション モードでは機能しません。 ただし、checkpoint_storeが設定されている場合でも、ユーザーはチェックポイントを保存できます。
- 継承
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
コンストラクター
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
パラメーター
- credential
- AsyncTokenCredential または AzureSasCredential または AzureNamedKeyCredential
トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された 、または資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れますEventHubSharedKeyCredential。
- logging_enable
- bool
ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。
- auth_timeout
- float
サービスによってトークンが承認されるまでの待機時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。
- user_agent
- str
指定した場合、これはユーザー エージェント文字列の前に追加されます。
- retry_total
- int
エラーが発生した場合に失敗した操作をやり直す試行の合計数。 既定値は 3 です。 受信中の retry_total のコンテキストは特別です。受信メソッドは、各イテレーションで内部 受信 メソッドを呼び出す while ループによって実装されます。 受信ケースでは、retry_totalは while ループの内部受信メソッドによって発生したエラー後の再試行回数を指定します。 再試行が使い果たされると、 on_error コールバックが呼び出され (指定されている場合)、エラー情報が表示されます。 失敗した内部パーティション コンシューマーは閉じられ (指定するとon_partition_close が呼び出されます)、新しい内部パーティション コンシューマーが作成され (指定 された場合はon_partition_initializeが呼び出されます)、受信を再開します。
- retry_backoff_factor
- float
2 回目の試行後の試行の間に適用されるバックオフ係数 (ほとんどのエラーは、2 回目の試行によって遅延なく直ちに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s, 0.2s, 0.4s, ...] の再試行がスリープ状態になります。 既定値は 0.8 です。
- retry_backoff_max
- float
最大バックオフ時間。 既定値は 120 秒 (2 分) です。
- retry_mode
- str
再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' で、既定値は 'exponential' です。
- idle_timeout
- float
タイムアウト (秒単位)。 それ以降のアクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。つまり、サービスによって開始されない限り、非アクティブのためクライアントはシャットダウンされません。
- transport_type
- TransportType
Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。
- http_proxy
HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。
- checkpoint_store
- Optional[CheckpointStore]
イベントの受信時にパーティション負荷分散とチェックポイント データを格納するマネージャー。 チェックポイント ストアは、すべてのパーティションまたは 1 つのパーティションから受信する場合の両方で使用されます。 後者の場合、負荷分散は適用されません。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内で内部的に維持され、 EventHubConsumerClient インスタンスは負荷分散なしでイベントを受信します。
- load_balancing_interval
- float
負荷分散が開始されたとき。 これは、2 つの負荷分散評価の間隔 (秒単位) です。 既定値は 30 秒です。
- partition_ownership_expiration_interval
- float
パーティションの所有権は、この秒数後に期限切れになります。 すべての負荷分散評価では、所有権の有効期限が自動的に延長されます。 既定値は 6 * load_balancing_interval、既定のload_balancing_interval 30 秒を使用する場合は 180 秒です。
- load_balancing_strategy
- str または LoadBalancingStrategy
負荷分散が開始されると、この戦略を使用してパーティションの所有権を要求し、バランスを取ります。 "greedy" または LoadBalancingStrategy.GREEDY を greedy 戦略に使用します。これは、負荷分散評価ごとに、負荷のバランスを取るために必要な数の未請求パーティションを取得します。 "balanced" または LoadBalancingStrategy.BALANCED を使用して、負荷分散評価ごとに、他の EventHubConsumerClient によって要求されない 1 つのパーティションのみを要求するバランス戦略を行います。 EventHub のすべてのパーティションが他の EventHubConsumerClient によって要求され、このクライアントが要求するパーティションが少なすぎる場合、このクライアントは負荷分散戦略に関係なく、負荷分散評価ごとに他のクライアントから 1 つのパーティションを盗みます。 Greedy 戦略は既定で使用されます。
Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要なアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。
接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。
- uamqp_transport
- bool
基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、Pure Python AMQP ライブラリが基になるトランスポートとして使用されます。
- socket_timeout
- float
データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みのタイムアウトが原因で EventHubsConnectionError エラーが発生している場合は、既定値より大きい値を渡す必要があります。 これは高度な使用シナリオの場合であり、通常は既定値で十分です。
例
EventHubConsumerClient の新しいインスタンスを作成します。
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
メソッド
close |
イベント ハブからのイベントの取得を停止し、基になる AMQP 接続とリンクを閉じます。 |
from_connection_string |
接続文字列から EventHubConsumerClient を作成します。 |
get_eventhub_properties |
イベント ハブのプロパティを取得します。 返されるディクショナリのキーは次のとおりです。
|
get_partition_ids |
イベント ハブのパーティション ID を取得します。 |
get_partition_properties |
指定したパーティションのプロパティを取得します。 プロパティ ディクショナリのキーは次のとおりです。
|
receive |
オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。 |
receive_batch |
オプションの負荷分散とチェックポイント処理を使用して、バッチでパーティションからイベントを受信します。 |
close
イベント ハブからのイベントの取得を停止し、基になる AMQP 接続とリンクを閉じます。
async close() -> None
の戻り値の型 :
例
クライアントを閉じます。
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
接続文字列から EventHubConsumerClient を作成します。
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
パラメーター
- eventhub_name
- str
クライアントを接続する特定のイベント ハブのパス。
- logging_enable
- bool
ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。
- http_proxy
- dict
HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'。
- auth_timeout
- float
サービスによってトークンが承認されるまでの待機時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。
- user_agent
- str
指定した場合、これはユーザー エージェント文字列の前に追加されます。
- retry_total
- int
エラーが発生した場合に失敗した操作をやり直す試行の合計数。 既定値は 3 です。 受信中の retry_total のコンテキストは特別です。受信メソッドは、各イテレーションで内部 受信 メソッドを呼び出す while ループによって実装されます。 受信ケースでは、retry_totalは while ループの内部受信メソッドによって発生したエラー後の再試行回数を指定します。 再試行が使い果たされると、 on_error コールバックが呼び出され (指定されている場合)、エラー情報が表示されます。 失敗した内部パーティション コンシューマーは閉じられ (指定するとon_partition_close が呼び出されます)、新しい内部パーティション コンシューマーが作成され (指定 された場合はon_partition_initializeが呼び出されます)、受信を再開します。
- retry_backoff_factor
- float
2 回目の試行後の試行の間に適用されるバックオフ係数 (ほとんどのエラーは、2 回目の試行によって遅延なく直ちに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s,0.2s, 0.4s, ...] がスリープ状態になります。 既定値は 0.8 です。
- retry_backoff_max
- float
最大バックオフ時間。 既定値は 120 秒 (2 分) です。
- retry_mode
- str
再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' です。既定値は 'exponential' です。
- idle_timeout
- float
タイムアウト (秒単位)。 それ以降のアクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。これは、サービスによって開始されない限り、非アクティブのためクライアントがシャットダウンしないことを意味します。
- transport_type
- TransportType
Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。
- checkpoint_store
- Optional[CheckpointStore]
イベントの受信時にパーティション負荷分散とチェックポイント データを格納するマネージャー。 チェックポイント ストアは、すべてのパーティションまたは 1 つのパーティションから受信する場合の両方で使用されます。 後者の場合、負荷分散は適用されません。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内に内部的に保持され、 EventHubConsumerClient インスタンスは負荷分散なしでイベントを受信します。
- load_balancing_interval
- float
負荷分散が開始されたとき。 これは、2 つの負荷分散評価の間隔 (秒単位) です。 既定値は 30 秒です。
- partition_ownership_expiration_interval
- float
パーティションの所有権は、この秒数後に期限切れになります。 すべての負荷分散評価では、所有権の有効期限が自動的に延長されます。 既定値は 6 * load_balancing_interval、つまり 30 秒の既定のload_balancing_intervalを使用する場合は 180 秒です。
- load_balancing_strategy
- str または LoadBalancingStrategy
負荷分散が開始されると、この戦略を使用してパーティションの所有権を要求し、バランスを取ります。 "greedy" または LoadBalancingStrategy.GREEDY を使用して、負荷分散の評価ごとに、負荷のバランスを取るために必要な解放されていないパーティションを取得します。 "balanced" または LoadBalancingStrategy.BALANCED は、負荷分散の評価ごとに、他の EventHubConsumerClient によって要求されない 1 つのパーティションのみを要求するバランス戦略に使用します。 EventHub のすべてのパーティションが他の EventHubConsumerClient によって要求され、このクライアントが要求するパーティションが少なすぎる場合、このクライアントは負荷分散戦略に関係なく、負荷分散評価ごとに他のクライアントから 1 つのパーティションを盗みます。 Greedy 戦略は既定で使用されます。
Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要な任意のアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。
接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。
- uamqp_transport
- bool
基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、純粋な Python AMQP ライブラリが基になるトランスポートとして使用されます。
の戻り値の型 :
例
接続文字列から EventHubConsumerClient の新しいインスタンスを作成します。
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
イベント ハブのプロパティを取得します。
返されるディクショナリのキーは次のとおりです。
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
戻り値
イベント ハブに関する情報を含むディクショナリ。
の戻り値の型 :
例外
get_partition_ids
イベント ハブのパーティション ID を取得します。
async get_partition_ids() -> List[str]
戻り値
パーティション ID の一覧。
の戻り値の型 :
例外
get_partition_properties
指定したパーティションのプロパティを取得します。
プロパティ ディクショナリのキーは次のとおりです。
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
パラメーター
戻り値
パーティション プロパティを含むディクショナリ。
の戻り値の型 :
例外
receive
オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
パラメーター
- on_event
- Callable[PartitionContext, Optional[EventData]]
受信したイベントを処理するためのコールバック関数。 コールバックは、パーティション コンテキストを含む partition_context と、受信したイベントである イベント という 2 つのパラメーターを受け取ります。 コールバック関数は、次のように定義する必要があります: on_event(partition_context, event)。 パーティション コンテキストの詳細については、 を PartitionContext参照してください。
- max_wait_time
- float
イベント プロセッサがコールバックを呼び出す前に待機する最大間隔 (秒単位)。 この期間内にイベントが受信されない場合、 on_event コールバックは None で呼び出されます。 この値が None または 0 (既定値) に設定されている場合、イベントを受信するまでコールバックは呼び出されません。
- partition_id
- str
指定した場合、クライアントはこのパーティションからのみ受信します。 それ以外の場合、クライアントはすべてのパーティションから受信します。
- owner_level
- int
排他的コンシューマーの優先順位。 owner_levelが設定されている場合は、排他的コンシューマーが作成されます。 より高いowner_levelを持つコンシューマーは、より高い排他的優先度を持ちます。 所有者レベルは、コンシューマーの "エポック値" とも認識されます。
- prefetch
- int
処理のためにサービスからプリフェッチするイベントの数。 既定値は 300 です。
- track_last_enqueued_event_properties
- bool
コンシューマーが、関連付けられたパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡する必要があるかどうかを示します。 パーティションの最後にエンキューされたイベントに関する情報が追跡されている場合、Event Hubs サービスから受信した各イベントは、パーティションに関するメタデータを保持します。 これにより、イベント ハブ クライアントを使用してパーティション プロパティの要求を定期的に行う場合と見なされる場合、通常は有利なトレードオフとなるネットワーク帯域幅の使用量が少なくなります。 既定では False に設定されています。
パーティションのチェックポイント データがない場合は、このイベント位置からの受信を開始します。 チェックポイント データは、使用可能な場合に使用されます。 これは、キーとしてパーティション ID を持ち、個々のパーティションの値としての位置を持つディクテーション、またはすべてのパーティションに対して 1 つの値にすることができます。 値の型には、str、int、datetime.datetime を指定できます。 ストリームの先頭から受信するための値 "-1" と、新しいイベントのみを受信するための "@latest" もサポートされています。
指定したstarting_positionが inclusive(=) であるかどうかを判断します (>>)。 インクルーシブの場合は True、排他の場合は False。 これは、パーティション ID をキーとして、値として bool を指定して、特定のパーティションのstarting_positionが包括的かどうかを示す dict にすることができます。 これは、すべてのstarting_positionに対して 1 つのブール値にすることもできます。 既定値は False です。
- on_error
- Callable[[PartitionContext, Exception]]
再試行が使い果たされた後、または負荷分散の処理中に、受信中にエラーが発生したときに呼び出されるコールバック関数。 コールバックは、パーティション情報を含む partition_context と例外である エラー という 2 つのパラメーターを受け取ります。 負荷分散の 処理中にエラーが発生した場合は、partition_context None になる可能性があります。 コールバックは次のように定義する必要があります: on_error(partition_context, error). on_errorコールバックは、on_eventコールバック中にハンドルされない例外が発生した場合にも呼び出されます。
- on_partition_initialize
- Callable[[PartitionContext]]
特定のパーティションのコンシューマーが初期化を完了した後に呼び出されるコールバック関数。 また、失敗した閉じられた内部パーティション コンシューマーの受信プロセスを引き継ぐ新しい内部パーティション コンシューマーが作成されるときにも呼び出されます。 コールバックは、パーティション情報を含む partition_context という 1 つのパラメーターを受け取ります。 コールバックは、 on_partition_initialize(partition_context)のように定義する必要があります。
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
特定のパーティションのコンシューマーが閉じられた後に呼び出されるコールバック関数。 再試行が使い果たされた後の受信中にエラーが発生した場合にも呼び出されます。 コールバックは、パーティション情報とクローズの理由を含む partition_context という 2 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_close(partition_context, reason). 閉会の理由はこちらをご覧 CloseReason ください。
の戻り値の型 :
例
EventHub からイベントを受信します。
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
オプションの負荷分散とチェックポイント処理を使用して、バッチでパーティションからイベントを受信します。
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
パラメーター
- on_event_batch
- Callable[PartitionContext, List[EventData]]
受信したイベントのバッチを処理するためのコールバック関数。 コールバックは、パーティション コンテキストを含む partition_context と、受信したイベント である event_batch の 2 つのパラメーターを受け取ります。 コールバック関数は、次のように定義する必要があります: on_event_batch(partition_context、event_batch)。 max_wait_time が None でも 0 でもなく、 max_wait_time 後にイベントが受信されない場合、 event_batchは空のリストになる可能性があります。 パーティション コンテキストの詳細については、 を PartitionContext参照してください。
- max_batch_size
- int
コールバック on_event_batchに渡されるバッチ内のイベントの最大数。 実際に受信したイベント数が max_batch_sizeを超える場合、受信したイベントはバッチに分割され、最大 max_batch_size イベントを含む各バッチのコールバックが呼び出されます。
- max_wait_time
- float
イベント プロセッサがコールバックを呼び出す前に待機する最大間隔 (秒単位)。 この間隔でイベントが受信されない場合、 on_event_batch コールバックは空のリストで呼び出されます。 この値が None または 0 (既定値) に設定されている場合、イベントが受信されるまでコールバックは呼び出されません。
- partition_id
- str
指定した場合、クライアントはこのパーティションからのみ受信します。 それ以外の場合、クライアントはすべてのパーティションから受信します。
- owner_level
- int
排他的コンシューマーの優先順位。 owner_levelが設定されている場合は、排他的コンシューマーが作成されます。 より高いowner_levelを持つコンシューマーは、より高い排他的優先順位を持ちます。 所有者レベルは、コンシューマーの "エポック値" とも認識されます。
- prefetch
- int
処理のためにサービスからプリフェッチするイベントの数。 既定値は 300 です。
- track_last_enqueued_event_properties
- bool
コンシューマーが関連付けられたパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡するかどうかを示します。 パーティションの last-enqueued イベントに関する情報が追跡されている場合、Event Hubs サービスから受信した各イベントは、パーティションに関するメタデータを保持します。 これにより、Event Hub クライアントを使用してパーティション プロパティの要求を定期的に行うことを考慮すると、通常は有利なトレードオフとなる、追加のネットワーク帯域幅消費が少なくなります。 既定では False に設定されています。
パーティションのチェックポイント データがない場合は、このイベント位置からの受信を開始します。 チェックポイント データが使用可能な場合は使用されます。 これは、キーとしてパーティション ID を持ち、個々のパーティションの値として位置を持つディクテーション、またはすべてのパーティションに対して 1 つの値にすることができます。 値の型は、str、int、または datetime.datetime です。 また、ストリームの先頭から受信するための値 "-1" と、新しいイベントのみを受信するための "@latest" もサポートされています。
指定したstarting_positionが包括的 (=) であるかどうかを判断します (>>)。 包括の場合は True、排他的の場合は False。 これは、キーとしてパーティション ID を持つディクテーション、および特定のパーティションのstarting_positionが包括的かどうかを示す値として bool にすることができます。 これは、すべてのstarting_positionの 1 つのブール値にすることもできます。 既定値は False です。
- on_error
- Callable[[PartitionContext, Exception]]
再試行が使い果たされた後、または負荷分散の処理中に受信中にエラーが発生したときに呼び出されるコールバック関数。 コールバックは、パーティション情報を含む partition_context と例外である エラー の 2 つのパラメーターを受け取ります。 負荷分散 の処理中にエラーが発生した場合、partition_contextは None になる可能性があります。 コールバックは次のように定義する必要があります: on_error(partition_context, error). on_event コールバック中に未処理の例外が発生した場合は、on_error コールバックも呼び出されます。
- on_partition_initialize
- Callable[[PartitionContext]]
特定のパーティションのコンシューマーが初期化を完了した後に呼び出されるコールバック関数。 また、障害が発生し、閉じられた内部パーティション コンシューマーの受信プロセスを引き継ぐ新しい内部パーティション コンシューマーが作成されるときにも呼び出されます。 コールバックは、パーティション情報を含む partition_context という 1 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_initialize(partition_context)。
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
特定のパーティションのコンシューマーが閉じられた後に呼び出されるコールバック関数。 再試行が使い果たされた後の受信中にエラーが発生した場合にも呼び出されます。 コールバックは、パーティション情報とクローズの理由を含む partition_context という 2 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_close(partition_context, reason). 閉会の理由はこちらをご覧 CloseReason ください。
の戻り値の型 :
例
EventHub からバッチでイベントを受信します。
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python