你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusClient 类

ServiceBusClient 类定义了用于获取 ServiceBusSender 和 ServiceBusReceiver 的高级接口。

继承
builtins.object
ServiceBusClient

构造函数

ServiceBusClient(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any)

参数

fully_qualified_namespace
str
必需

服务总线命名空间的完全限定主机名。 命名空间格式为: .servicebus.windows.net

credential
TokenCredentialAzureSasCredentialAzureNamedKeyCredential
必需

用于身份验证的凭据对象,该对象实现用于获取令牌的特定接口。 它接受 azure 标识库生成的凭据对象,以及实现 *get_token (self、 scopes) 方法的对象,或者也可以提供 AzureSasCredential。

logging_enable
bool

是否将网络跟踪日志输出到记录器。 默认值为 False。

transport_type
TransportType

将用于与服务总线服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket

http_proxy
Dict

HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。

user_agent
str

如果指定,则会将其添加到内置用户代理字符串的前面。

retry_total
int

发生错误时重做失败操作的尝试总数。 默认值为 3。

retry_backoff_factor
float

两次重试之间以秒为单位的内部增量回退。 默认值为 0.8。

retry_backoff_max
float

最大退让间隔(以秒为单位)。 默认值为 120。

retry_mode
str

重试尝试之间的延迟行为。 支持的值为“fixed”或“exponential”,默认值为“exponential”。

custom_endpoint_address
str

用于与服务总线服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address指定端口,则默认使用端口 443。

connection_verify
str

SSL 证书的自定义CA_BUNDLE文件的路径,该文件用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where ()

uamqp_transport
bool

是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。

示例

创建 ServiceBusClient 的新实例。


   import os
   from azure.identity import DefaultAzureCredential
   from azure.servicebus import ServiceBusClient
   fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
   servicebus_client = ServiceBusClient(
       fully_qualified_namespace=fully_qualified_namespace,
       credential=DefaultAzureCredential()
   )

变量

fully_qualified_namespace
str

服务总线命名空间的完全限定主机名。 命名空间格式为: .servicebus.windows.net

方法

close

关闭 ServiceBus 客户端。 所有生成的发送方、接收方和基础连接都将关闭。

from_connection_string

从连接字符串创建 ServiceBusClient。

get_queue_receiver

获取特定队列的 ServiceBusReceiver。

get_queue_sender

获取特定队列的 ServiceBusSender。

get_subscription_receiver

获取主题下特定订阅的 ServiceBusReceiver。

get_topic_sender

获取特定主题的 ServiceBusSender。

close

关闭 ServiceBus 客户端。 所有生成的发送方、接收方和基础连接都将关闭。

close() -> None

返回

from_connection_string

从连接字符串创建 ServiceBusClient。

from_connection_string(conn_str: str, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any) -> ServiceBusClient

参数

conn_str
str
必需

服务总线的连接字符串。

logging_enable
bool

是否将网络跟踪日志输出到记录器。 默认值为 False。

transport_type
TransportType

将用于与服务总线服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket

http_proxy
Dict

HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。

user_agent
str

如果指定,则会将其添加到内置用户代理字符串的前面。

retry_total
int

发生错误时重做失败操作的尝试总数。 默认值为 3。

retry_backoff_factor
float

两次重试之间以秒为单位的内部增量回退。 默认值为 0.8。

retry_backoff_max
float

最大退让间隔(以秒为单位)。 默认值为 120。

retry_mode
str

重试尝试之间的延迟行为。 支持的值为“fixed”或“exponential”,默认值为“exponential”。

custom_endpoint_address
str

用于与服务总线服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在custom_endpoint_address中指定端口,则默认使用端口 443。

connection_verify
str

SSL 证书的自定义CA_BUNDLE文件的路径,该文件用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where ()

uamqp_transport
bool

是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。

返回类型

示例

从连接字符串创建 ServiceBusClient 的新实例。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)

get_queue_receiver

获取特定队列的 ServiceBusReceiver。

get_queue_receiver(queue_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver

参数

queue_name
str
必需

客户端连接到的特定服务总线队列的路径。

session_id
str 或 <xref:azure.servicebus.NEXT_AVAILABLE_SESSION>

要从中接收的特定会话。 必须为会话队列指定此参数,否则必须为 None。 若要从下一个可用会话接收消息,请将此项设置为 ~azure.servicebus.NEXT_AVAILABLE_SESSION。

sub_queue
strServiceBusSubQueueNone

如果指定,则此接收方将连接到的子队列。 这包括DEAD_LETTER和TRANSFER_DEAD_LETTER队列,包含无法传递给任何接收方的消息或无法处理的消息。 默认值为 None,表示连接到主队列。 可以从 ServiceBusSubQueue 枚举或等效的字符串值“deadletter”和“transferdeadletter”分配值。

receive_mode
Union[ServiceBusReceiveMode, str]

用于从实体检索消息的receive_mode。 这两个选项是PEEK_LOCK和RECEIVE_AND_DELETE。 使用 PEEK_LOCK 接收的消息必须在给定的锁定期内解决,然后才能从队列中删除消息。 使用 RECEIVE_AND_DELETE 接收的消息将立即从队列中删除,如果客户端无法处理消息,则以后无法拒绝或重新接收消息。 默认receive_mode为PEEK_LOCK。

max_wait_time
Optional[float]

接收消息之间的超时(以秒为单位),在此之后接收方将自动停止接收。 默认值为 None,表示无超时。 如果由于写入超时而发生连接错误,则可能需要调整连接超时值。 有关更多详细信息,请参阅 socket_timeout 可选参数。

auto_lock_renewer
Optional[AutoLockRenewer]

可以提供 ~azure.servicebus.AutoLockRenewer,以便消息在收到时自动注册。 如果接收方是会话接收器,它将改为应用于会话。

prefetch_count
int

每次向服务发出请求时要缓存的最大消息数。 此设置仅用于高级性能优化。 增大此值将提高消息吞吐量性能,但如果处理速度不够快,则会增加消息在缓存时过期的可能性。 默认值为 0,这意味着将从服务接收消息,并一次处理一个消息。 如果prefetch_count为 0,ServiceBusReceiver.receive 将尝试缓存 max_message_count ((如果) 请求中向服务提供)。

client_identifier
str

基于字符串的标识符,用于唯一标识接收方实例。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。

socket_timeout
float

在超时之前,连接上的基础套接字在发送和接收数据时应等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。

返回类型

示例

从 ServiceBusClient 创建 ServiceBusReceiver 的新实例。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   with servicebus_client:
       queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)

get_queue_sender

获取特定队列的 ServiceBusSender。

get_queue_sender(queue_name: str, **kwargs: Any) -> ServiceBusSender

参数

queue_name
str
必需

客户端连接到的特定服务总线队列的路径。

client_identifier
str

基于字符串的标识符,用于唯一标识发送方实例。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。

socket_timeout
float

在超时之前,连接上的基础套接字在发送和接收数据时应等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。

返回

队列发送方。

返回类型

示例

从 ServiceBusClient 创建 ServiceBusSender 的新实例。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   with servicebus_client:
       queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)

get_subscription_receiver

获取主题下特定订阅的 ServiceBusReceiver。

get_subscription_receiver(topic_name: str, subscription_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver

参数

topic_name
str
必需

客户端连接到的特定服务总线主题的名称。

subscription_name
str
必需

给定服务总线主题下的特定服务总线订阅的名称。

session_id
str 或 <xref:azure.servicebus.NEXT_AVAILABLE_SESSION>

要从中接收的特定会话。 必须为会话订阅指定此项,否则必须为 None。 若要从下一个可用会话接收消息,请将此项设置为 ~azure.servicebus.NEXT_AVAILABLE_SESSION。

sub_queue
strServiceBusSubQueueNone

如果指定,则此接收器将连接到子排队。 这包括DEAD_LETTER和TRANSFER_DEAD_LETTER队列,包含无法传递到任何接收方的消息或无法处理的消息。 默认值为“无”,表示连接到主队列。 可以从 ServiceBusSubQueue 枚举或等效的字符串值“deadletter”和“transferdeadletter”中分配值。

receive_mode
Union[ServiceBusReceiveMode, str]

用于从实体检索消息的receive_mode。 这两个选项是PEEK_LOCK和RECEIVE_AND_DELETE。 使用PEEK_LOCK收到的消息必须在给定的锁定期内解决,然后才能从订阅中删除这些消息。 使用 RECEIVE_AND_DELETE 收到的消息将立即从订阅中删除,如果客户端无法处理消息,则无法随后拒绝或重新接收消息。 默认receive_mode为 PEEK_LOCK。

max_wait_time
Optional[float]

接收的消息之间的超时(以秒为单位),此后接收方将自动停止接收。 默认值为 None,表示无超时。 如果由于写入超时而发生连接错误,则可能需要调整连接超时值。 有关更多详细信息,请参阅 socket_timeout 可选参数。

auto_lock_renewer
Optional[AutoLockRenewer]

可以提供 ~azure.servicebus.AutoLockRenewer,以便邮件在收到时自动注册。 如果接收方是会话接收器,它将改为应用于会话。

prefetch_count
int

要缓存的最大消息数,其中包含对服务的每个请求。 此设置仅适用于高级性能优化。 增加此值将提高消息吞吐量性能,但增加消息在缓存时过期(如果处理速度不够快)的可能性。 默认值为 0,这意味着将从服务接收消息,并一次处理一个消息。 如果prefetch_count为 0,ServiceBusReceiver.receive 将尝试缓存 max_message_count ((如果向服务的请求) )。

client_identifier
str

基于字符串的标识符,用于唯一标识接收方实例。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。

socket_timeout
float

连接上的基础套接字在发送和接收数据时应在超时之前等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。

返回类型

示例

从 ServiceBusClient 创建 ServiceBusReceiver 的新实例。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
   subscription_name = os.environ["SERVICEBUS_SUBSCRIPTION_NAME"]
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   with servicebus_client:
       subscription_receiver = servicebus_client.get_subscription_receiver(
           topic_name=topic_name,
           subscription_name=subscription_name,
       )

get_topic_sender

获取特定主题的 ServiceBusSender。

get_topic_sender(topic_name: str, **kwargs: Any) -> ServiceBusSender

参数

topic_name
str
必需

客户端连接到的特定服务总线主题的路径。

client_identifier
str

基于字符串的标识符,用于唯一标识发送方实例。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。

socket_timeout
float

连接上的基础套接字在发送和接收数据时应在超时之前等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。

返回

主题发送方。

返回类型

示例

从 ServiceBusClient 创建 ServiceBusSender 的新实例。


   import os
   from azure.servicebus import ServiceBusClient
   servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
   topic_name = os.environ['SERVICEBUS_TOPIC_NAME']
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   with servicebus_client:
       topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)