ServiceBusClient クラス

ServiceBusClient クラスは、ServiceBusSender と ServiceBusReceiver を取得するための高度なインターフェイスを定義します。

継承
builtins.object
ServiceBusClient

コンストラクター

ServiceBusClient(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any)

パラメーター

fully_qualified_namespace
str
必須

Service Bus 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。

credential
TokenCredential または AzureSasCredential または AzureNamedKeyCredential
必須

トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れます。または、AzureSasCredential も指定できます。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

transport_type
TransportType

Service Bus サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

http_proxy
Dict

HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'

user_agent
str

指定した場合、これは組み込みのユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生した場合に失敗した操作をやり直す試行の合計数。 既定値は 3 です。

retry_backoff_factor
float

再試行の間の秒単位の内部差分バックオフ。 既定値は 0.8 です。

retry_backoff_max
float

2 番目の単位の最大バックオフ間隔。 既定値は 120 です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は "固定" または "指数" で、既定値は "指数" です。

custom_endpoint_address
str

Service Bus サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要なアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
str

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、Pure Python AMQP ライブラリが基になるトランスポートとして使用されます。

ServiceBusClient の新しいインスタンスを作成します。


   import os
   from azure.identity import DefaultAzureCredential
   from azure.servicebus import ServiceBusClient
   fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
   servicebus_client = ServiceBusClient(
       fully_qualified_namespace=fully_qualified_namespace,
       credential=DefaultAzureCredential()
   )

変数

fully_qualified_namespace
str

Service Bus 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。

メソッド

close

ServiceBus クライアントを閉じます。 生成されたすべての送信者、受信側、および基になる接続がシャットダウンされます。

from_connection_string

接続文字列から ServiceBusClient を作成します。

get_queue_receiver

特定のキューの ServiceBusReceiver を取得します。

get_queue_sender

特定のキューの ServiceBusSender を取得します。

get_subscription_receiver

トピックの特定のサブスクリプションの ServiceBusReceiver を取得します。

get_topic_sender

特定のトピックの ServiceBusSender を取得します。

close

ServiceBus クライアントを閉じます。 生成されたすべての送信者、受信側、および基になる接続がシャットダウンされます。

close() -> None

戻り値

なし

from_connection_string

接続文字列から ServiceBusClient を作成します。

from_connection_string(conn_str: str, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any) -> ServiceBusClient

パラメーター

conn_str
str
必須

Service Bus の接続文字列。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

transport_type
TransportType

Service Bus サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

http_proxy
Dict

HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'

user_agent
str

指定した場合、これは組み込みのユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生した場合に失敗した操作をやり直す試行の合計数。 既定値は 3 です。

retry_backoff_factor
float

再試行の間の秒単位の内部差分バックオフ。 既定値は 0.8 です。

retry_backoff_max
float

2 番目の単位の最大バックオフ間隔。 既定値は 120 です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' で、既定値は 'exponential' です。

custom_endpoint_address
str

Service Bus サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要なアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
str

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、Pure Python AMQP ライブラリが基になるトランスポートとして使用されます。

の戻り値の型 :

接続文字列から ServiceBusClient の新しいインスタンスを作成します。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)

get_queue_receiver

特定のキューの ServiceBusReceiver を取得します。

get_queue_receiver(queue_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver

パラメーター

queue_name
str
必須

クライアントが接続する特定の Service Bus キューのパス。

session_id
str または <xref:azure.servicebus.NEXT_AVAILABLE_SESSION>

受信元の特定のセッション。 これはセッションフル キューに対して指定する必要があります。それ以外の場合は None にする必要があります。 次に使用可能なセッションからメッセージを受信するには、これを ~azure.servicebus.NEXT_AVAILABLE_SESSION に設定します。

sub_queue
str または ServiceBusSubQueue または None

指定した場合、このレシーバーが接続するサブキュー。 これには、DEAD_LETTERキューとTRANSFER_DEAD_LETTERキューが含まれます。処理できない受信者またはメッセージに配信できないメッセージを保持します。 既定値は None です。つまり、プライマリ キューへの接続を意味します。 ServiceBusSubQueue 列挙型または同等の文字列値 "deadletter" と "transferdeadletter" から値を割り当てることができます。

receive_mode
Union[ServiceBusReceiveMode, str]

エンティティからメッセージを取得するreceive_mode。 PEEK_LOCKとRECEIVE_AND_DELETEの 2 つのオプションがあります。 PEEK_LOCKで受信したメッセージは、キューから削除される前に、特定のロック期間内に解決する必要があります。 RECEIVE_AND_DELETEで受信したメッセージはキューから直ちに削除され、クライアントがメッセージの処理に失敗した場合は、その後拒否または再受信することはできません。 既定のreceive_modeはPEEK_LOCKです。

max_wait_time
Optional[float]

受信したメッセージの間のタイムアウト (秒単位)。その後、受信側は自動的に受信を停止します。 既定値は None で、タイムアウトがないことを意味します。 書き込みタイムアウトが原因で接続エラーが発生している場合は、接続タイムアウト値の調整が必要になる場合があります。 詳細については、 省略可能なパラメーター socket_timeout 参照してください。

auto_lock_renewer
Optional[AutoLockRenewer]

メッセージが受信時に自動的に登録されるように、~azure.servicebus.AutoLockRenewer を指定できます。 受信側がセッション レシーバーの場合は、代わりにセッションに適用されます。

prefetch_count
int

サービスへの各要求でキャッシュするメッセージの最大数。 この設定は、高度なパフォーマンス チューニングのみを目的としています。 この値を大きくすると、メッセージスループットのパフォーマンスが向上しますが、十分に高速に処理されていない場合は、メッセージがキャッシュされている間にメッセージが期限切れになる可能性が高くなります。 既定値は 0 です。つまり、メッセージはサービスから受信され、一度に 1 つずつ処理されます。 prefetch_countが 0 の場合、 ServiceBusReceiver.receive はサービスへの要求内 でmax_message_count (指定されている場合) をキャッシュしようとします。

client_identifier
str

受信側インスタンスを一意に識別する文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、一部のエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。

socket_timeout
float

データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要があります。

の戻り値の型 :

ServiceBusClient から ServiceBusReceiver の新しいインスタンスを作成します。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   with servicebus_client:
       queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)

get_queue_sender

特定のキューの ServiceBusSender を取得します。

get_queue_sender(queue_name: str, **kwargs: Any) -> ServiceBusSender

パラメーター

queue_name
str
必須

クライアントが接続する特定の Service Bus キューのパス。

client_identifier
str

送信者インスタンスを一意に識別する文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、一部のエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。

socket_timeout
float

データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要があります。

戻り値

キュー送信者。

の戻り値の型 :

ServiceBusClient から ServiceBusSender の新しいインスタンスを作成します。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   with servicebus_client:
       queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)

get_subscription_receiver

トピックの特定のサブスクリプションの ServiceBusReceiver を取得します。

get_subscription_receiver(topic_name: str, subscription_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver

パラメーター

topic_name
str
必須

クライアントが接続する特定の Service Bus トピックの名前。

subscription_name
str
必須

指定された Service Bus トピックの下にある特定の Service Bus サブスクリプションの名前。

session_id
str または <xref:azure.servicebus.NEXT_AVAILABLE_SESSION>

受信元の特定のセッション。 これはセッションフル サブスクリプションに対して指定する必要があります。それ以外の場合は None にする必要があります。 次に使用可能なセッションからメッセージを受信するには、これを ~azure.servicebus.NEXT_AVAILABLE_SESSION に設定します。

sub_queue
str または ServiceBusSubQueue または None

指定した場合、このレシーバーが接続するサブキュー。 これには、DEAD_LETTERキューとTRANSFER_DEAD_LETTERキューが含まれます。処理できない受信者またはメッセージに配信できないメッセージを保持します。 既定値は None です。つまり、プライマリ キューへの接続を意味します。 ServiceBusSubQueue 列挙型または同等の文字列値 "deadletter" と "transferdeadletter" から値を割り当てることができます。

receive_mode
Union[ServiceBusReceiveMode, str]

エンティティからメッセージを取得するreceive_mode。 PEEK_LOCKとRECEIVE_AND_DELETEの 2 つのオプションがあります。 PEEK_LOCKで受信したメッセージは、サブスクリプションから削除される前に、特定のロック期間内に解決する必要があります。 RECEIVE_AND_DELETEで受信したメッセージはサブスクリプションから直ちに削除され、クライアントがメッセージの処理に失敗した場合は、その後拒否または再受信することはできません。 既定のreceive_modeはPEEK_LOCKです。

max_wait_time
Optional[float]

受信したメッセージの間のタイムアウト (秒単位)。その後、受信側は自動的に受信を停止します。 既定値は None で、タイムアウトがないことを意味します。 書き込みタイムアウトが原因で接続エラーが発生している場合は、接続タイムアウト値の調整が必要になる場合があります。 詳細については、 省略可能なパラメーター socket_timeout 参照してください。

auto_lock_renewer
Optional[AutoLockRenewer]

メッセージが受信時に自動的に登録されるように、~azure.servicebus.AutoLockRenewer を指定できます。 受信側がセッション レシーバーの場合は、代わりにセッションに適用されます。

prefetch_count
int

サービスへの各要求でキャッシュするメッセージの最大数。 この設定は、高度なパフォーマンス チューニングのみを目的としています。 この値を大きくすると、メッセージスループットのパフォーマンスが向上しますが、十分に高速に処理されていない場合は、メッセージがキャッシュされている間にメッセージが期限切れになる可能性が高くなります。 既定値は 0 です。つまり、メッセージはサービスから受信され、一度に 1 つずつ処理されます。 prefetch_countが 0 の場合、 ServiceBusReceiver.receive はサービスへの要求内 でmax_message_count (指定されている場合) をキャッシュしようとします。

client_identifier
str

受信側インスタンスを一意に識別する文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、一部のエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。

socket_timeout
float

データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要があります。

の戻り値の型 :

ServiceBusClient から ServiceBusReceiver の新しいインスタンスを作成します。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
   subscription_name = os.environ["SERVICEBUS_SUBSCRIPTION_NAME"]
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   with servicebus_client:
       subscription_receiver = servicebus_client.get_subscription_receiver(
           topic_name=topic_name,
           subscription_name=subscription_name,
       )

get_topic_sender

特定のトピックの ServiceBusSender を取得します。

get_topic_sender(topic_name: str, **kwargs: Any) -> ServiceBusSender

パラメーター

topic_name
str
必須

クライアントが接続する特定の Service Bus トピックのパス。

client_identifier
str

送信者インスタンスを一意に識別する文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、一部のエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。

socket_timeout
float

データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要があります。

戻り値

トピックの送信者。

の戻り値の型 :

ServiceBusClient から ServiceBusSender の新しいインスタンスを作成します。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   topic_name = os.environ['SERVICEBUS_TOPIC_NAME']
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   with servicebus_client:
       topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)