ServiceBusReceiver クラス

ServiceBusReceiver クラスは、Azure Service Bus キューまたはトピック サブスクリプションからメッセージを受信するための高レベルのインターフェイスを定義します。

メッセージ受信用の 2 つのプライマリ チャネルは、メッセージに対して 1 つの要求を行う receive() と、 受信側のメッセージ の場合は、継続的な方法で受信メッセージを継続的に受信する receive() です。

~azure.servicebus.ServiceBusClient のメソッドを使用 get_<queue/subscription>_receiver して、ServiceBusReceiver インスタンスを作成してください。

継承
azure.servicebus._base_handler.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

コンストラクター

ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

パラメーター

fully_qualified_namespace
str
必須

Service Bus 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。

credential
TokenCredential または AzureSasCredential または AzureNamedKeyCredential
必須

トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れます。または、AzureSasCredential も指定できます。

queue_name
str

クライアントが接続する特定の Service Bus キューのパス。

topic_name
str

クライアントが接続するサブスクリプションを含む特定の Service Bus トピックのパス。

subscription_name
str

クライアントが接続する指定されたトピックの下にある特定の Service Bus サブスクリプションのパス。

max_wait_time
Optional[float]

受信したメッセージの間のタイムアウト (秒単位)。その後、受信側は自動的に受信を停止します。 既定値は None で、タイムアウトがないことを意味します。

receive_mode
Union[ServiceBusReceiveMode, str]

エンティティからメッセージを取得するモード。 PEEK_LOCKとRECEIVE_AND_DELETEの 2 つのオプションがあります。 PEEK_LOCKで受信したメッセージは、キューから削除される前に、特定のロック期間内に解決する必要があります。 RECEIVE_AND_DELETEで受信したメッセージはすぐにキューから削除され、クライアントがメッセージの処理に失敗した場合は、その後破棄したり、再受信したりすることはできません。 既定のモードはPEEK_LOCK。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

transport_type
TransportType

Service Bus サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です

http_proxy
Dict

HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'

user_agent
str

指定した場合、これは組み込みのユーザー エージェント文字列の前に追加されます。

auto_lock_renewer
Optional[AutoLockRenewer]

メッセージが受信時に自動的に登録されるように、~azure.servicebus.AutoLockRenewer を指定できます。 受信側がセッション レシーバーの場合は、代わりにセッションに適用されます。

prefetch_count
int

サービスへの各要求でキャッシュするメッセージの最大数。 この設定は、高度なパフォーマンス チューニングのみを目的としています。 この値を大きくすると、メッセージスループットのパフォーマンスが向上しますが、十分に高速に処理されていない場合は、メッセージがキャッシュされている間にメッセージが期限切れになる可能性が高くなります。 既定値は 0 です。つまり、メッセージはサービスから受信され、一度に 1 つずつ処理されます。 prefetch_countが 0 の場合、 ServiceBusReceiver.receive はサービスへの要求内 でmax_message_count (指定されている場合) をキャッシュしようとします。

client_identifier
str

クライアント インスタンスを一意に識別するための文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、一部のエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。

socket_timeout
float

データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要があります。

変数

fully_qualified_namespace
str

Service Bus 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。

entity_path
str

クライアントが接続するエンティティのパス。

メソッド

abandon_message

メッセージを破棄します。

このメッセージはキューに返され、再び受信できるようになります。

close
complete_message

メッセージを完了します。

これにより、キューからメッセージが削除されます。

dead_letter_message

メッセージを配信不能キューに移動します。

配信不能キューは、正しく処理できなかったメッセージを格納したり、それ以外の場合はさらなる検査や処理を必要としたりするために使用できるサブキューです。 キューは、期限切れのメッセージを配信不能キューに送信するように構成することもできます。

defer_message

メッセージを延期します。

このメッセージはキューに残りますが、受信するにはシーケンス番号で要求する必要があります。

next
peek_messages

キューで現在保留中のメッセージを参照します。

ピークされたメッセージはキューから削除されず、ロックもされません。 完了、遅延、または配信不能にすることはできません。

receive_deferred_messages

以前に延期されたメッセージを受信します。

パーティション分割されたエンティティから遅延メッセージを受信する場合、指定されたすべてのシーケンス番号は同じパーティションからのメッセージである必要があります。

receive_messages

一度にメッセージのバッチを受信します。

この方法は、複数のメッセージを同時に処理する場合や、アドホック受信を 1 回の呼び出しとして実行する場合に最適です。

1 つのバッチで取得されたメッセージの数は、受信側に prefetch_count が設定されているかどうかによって異なります。 受信側にprefetch_countが設定されていない場合、受信側はサービスへの要求内max_message_count (指定されている場合) メッセージをキャッシュしようとします。

この呼び出しでは、指定したバッチ サイズを満たすよりも迅速に返す優先順位が付きます。そのため、指定されたバッチ サイズに関係なく、少なくとも 1 つのメッセージが受信され、受信メッセージにギャップが発生するとすぐにが返されます。

renew_message_lock

メッセージ ロックを更新します。

これにより、メッセージが再処理されるキューに返されないように、メッセージのロックが維持されます。

メッセージを完了 (またはその他の方法で解決) するには、ロックを維持する必要があり、既に期限切れになっていることはできません。期限切れのロックを更新できません。

RECEIVE_AND_DELETE モードで受信したメッセージはロックされないため、更新できません。 この操作は、セッションフルでないメッセージに対してのみ使用できます。

abandon_message

メッセージを破棄します。

このメッセージはキューに返され、再び受信できるようになります。

abandon_message(message: ServiceBusReceivedMessage) -> None

パラメーター

message
ServiceBusReceivedMessage
必須

破棄するメッセージを受信しました。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

受信したメッセージを破棄します。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.abandon_message(message)

close

close() -> None

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

メッセージを完了します。

これにより、キューからメッセージが削除されます。

complete_message(message: ServiceBusReceivedMessage) -> None

パラメーター

message
ServiceBusReceivedMessage
必須

完了する受信メッセージ。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

受信したメッセージを完了します。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.complete_message(message)

dead_letter_message

メッセージを配信不能キューに移動します。

配信不能キューは、正しく処理できなかったメッセージを格納したり、それ以外の場合はさらなる検査や処理を必要としたりするために使用できるサブキューです。 キューは、期限切れのメッセージを配信不能キューに送信するように構成することもできます。

dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

パラメーター

message
ServiceBusReceivedMessage
必須

配信不能のメッセージを受信しました。

reason
Optional[str]
既定値: None

メッセージを配信不能にする理由。

error_description
Optional[str]
既定値: None

メッセージの配信不能に関する詳細なエラーの説明。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

受信したメッセージの配信不能。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.dead_letter_message(message)

defer_message

メッセージを延期します。

このメッセージはキューに残りますが、受信するにはシーケンス番号で要求する必要があります。

defer_message(message: ServiceBusReceivedMessage) -> None

パラメーター

message
ServiceBusReceivedMessage
必須

遅延する受信メッセージ。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

受信したメッセージを保留します。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.defer_message(message)

next

next()

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

peek_messages

キューで現在保留中のメッセージを参照します。

ピークされたメッセージはキューから削除されず、ロックもされません。 完了、遅延、または配信不能にすることはできません。

peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

パラメーター

max_message_count
int
既定値: 1

試行してピークするメッセージの最大数。 既定値は 1 です。

sequence_number
int

メッセージの参照を開始するメッセージ シーケンス番号。

timeout
Optional[float]

すべての再試行を含む合計操作タイムアウト (秒単位)。 指定する場合、値は 0 より大きくする必要があります。 既定値は None で、タイムアウトがないことを意味します。

戻り値

~azure.servicebus.ServiceBusReceivedMessage の一覧。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

キュー内の保留中のメッセージを確認します。


   with servicebus_receiver:
       messages = servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

以前に延期されたメッセージを受信します。

パーティション分割されたエンティティから遅延メッセージを受信する場合、指定されたすべてのシーケンス番号は同じパーティションからのメッセージである必要があります。

receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

パラメーター

sequence_numbers
Union[int,List[int]]
必須

遅延されたメッセージのシーケンス番号の一覧。

timeout
Optional[float]

すべての再試行を含む合計操作タイムアウト (秒単位)。 指定する場合、値は 0 より大きくする必要があります。 既定値は None で、タイムアウトがないことを意味します。

戻り値

要求された ~azure.servicebus.ServiceBusReceivedMessage インスタンスの一覧。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

ServiceBus から遅延メッセージを受信します。


   with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           servicebus_receiver.defer_message(message)

       received_deferred_msg = servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for msg in received_deferred_msg:
           servicebus_receiver.complete_message(msg)

receive_messages

一度にメッセージのバッチを受信します。

この方法は、複数のメッセージを同時に処理する場合や、アドホック受信を 1 回の呼び出しとして実行する場合に最適です。

1 つのバッチで取得されたメッセージの数は、受信側に prefetch_count が設定されているかどうかによって異なります。 受信側にprefetch_countが設定されていない場合、受信側はサービスへの要求内max_message_count (指定されている場合) メッセージをキャッシュしようとします。

この呼び出しでは、指定したバッチ サイズを満たすよりも迅速に返す優先順位が付きます。そのため、指定されたバッチ サイズに関係なく、少なくとも 1 つのメッセージが受信され、受信メッセージにギャップが発生するとすぐにが返されます。

receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

パラメーター

max_message_count
Optional[int]
既定値: 1

バッチ内のメッセージの最大数。 返される実際の数は、prefetch_countと受信ストリームレートによって異なります。 [なし] に設定すると、プリフェッチ構成に完全に依存します。既定値は 1 です。

max_wait_time
Optional[float]
既定値: None

最初のメッセージが到着するまでの最大待機時間 (秒単位)。 メッセージが到着せず、タイムアウトが指定されていない場合、この呼び出しは接続が閉じられるまで返されません。 指定した場合、タイムアウト期間内にメッセージが到着しない場合は、空のリストが返されます。

戻り値

受信したメッセージの一覧。 使用できるメッセージがない場合は、空のリストになります。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

ServiceBus からメッセージを受信します。


   with servicebus_receiver:
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           servicebus_receiver.complete_message(message)

renew_message_lock

メッセージ ロックを更新します。

これにより、メッセージが再処理されるキューに返されないように、メッセージのロックが維持されます。

メッセージを完了 (またはその他の方法で解決) するには、ロックを維持する必要があり、既に期限切れになっていることはできません。期限切れのロックを更新できません。

RECEIVE_AND_DELETE モードで受信したメッセージはロックされないため、更新できません。 この操作は、セッションフルでないメッセージに対してのみ使用できます。

renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

パラメーター

message
ServiceBusReceivedMessage
必須

ロックを更新するメッセージ。

timeout
Optional[float]

すべての再試行を含む合計操作タイムアウト (秒単位)。 指定する場合、値は 0 より大きくする必要があります。 既定値は None で、タイムアウトがないことを意味します。

戻り値

ロックの有効期限が切れる utc datetime。

の戻り値の型 :

例外

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

受信したメッセージのロックを更新します。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.renew_message_lock(message)

属性

client_identifier

受信側インスタンスに関連付けられている ServiceBusReceiver client_identifierを取得します。

の戻り値の型 :

str

session

受信側にリンクされた ServiceBusSession オブジェクトを取得します。 セッションはセッションが有効なエンティティでのみ使用でき、セッションフルではないレシーバーで呼び出された場合は None を返します。

の戻り値の型 :

レシーバーからセッションを取得する


       with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session