你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
ServiceBusClient 类
ServiceBusClient 类定义了用于获取 ServiceBusSender 和 ServiceBusReceiver 的高级接口。
- 继承
-
builtins.objectServiceBusClient
构造函数
ServiceBusClient(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any)
参数
- credential
- TokenCredential 或 AzureSasCredential 或 AzureNamedKeyCredential
用于身份验证的凭据对象,该对象实现用于获取令牌的特定接口。 它接受 azure 标识库生成的凭据对象,以及实现 *get_token (self、 scopes) 方法的对象,或者也可以提供 AzureSasCredential。
- logging_enable
- bool
是否将网络跟踪日志输出到记录器。 默认值为 False。
- transport_type
- TransportType
将用于与服务总线服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket 。
- http_proxy
- Dict
HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。
- user_agent
- str
如果指定,则会将其添加到内置用户代理字符串的前面。
- retry_total
- int
发生错误时重做失败操作的尝试总数。 默认值为 3。
- retry_backoff_factor
- float
两次重试之间以秒为单位的内部增量回退。 默认值为 0.8。
- retry_backoff_max
- float
最大退让间隔(以秒为单位)。 默认值为 120。
- retry_mode
- str
重试尝试之间的延迟行为。 支持的值为“fixed”或“exponential”,默认值为“exponential”。
- custom_endpoint_address
- str
用于与服务总线服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address指定端口,则默认使用端口 443。
- connection_verify
- str
SSL 证书的自定义CA_BUNDLE文件的路径,该文件用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where () 。
- uamqp_transport
- bool
是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。
示例
创建 ServiceBusClient 的新实例。
import os
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
servicebus_client = ServiceBusClient(
fully_qualified_namespace=fully_qualified_namespace,
credential=DefaultAzureCredential()
)
变量
- fully_qualified_namespace
- str
服务总线命名空间的完全限定主机名。 命名空间格式为: .servicebus.windows.net。
方法
close |
关闭 ServiceBus 客户端。 所有生成的发送方、接收方和基础连接都将关闭。 |
from_connection_string |
从连接字符串创建 ServiceBusClient。 |
get_queue_receiver |
获取特定队列的 ServiceBusReceiver。 |
get_queue_sender |
获取特定队列的 ServiceBusSender。 |
get_subscription_receiver |
获取主题下特定订阅的 ServiceBusReceiver。 |
get_topic_sender |
获取特定主题的 ServiceBusSender。 |
close
关闭 ServiceBus 客户端。 所有生成的发送方、接收方和基础连接都将关闭。
close() -> None
返回
无
from_connection_string
从连接字符串创建 ServiceBusClient。
from_connection_string(conn_str: str, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any) -> ServiceBusClient
参数
- logging_enable
- bool
是否将网络跟踪日志输出到记录器。 默认值为 False。
- transport_type
- TransportType
将用于与服务总线服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket 。
- http_proxy
- Dict
HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。
- user_agent
- str
如果指定,则会将其添加到内置用户代理字符串的前面。
- retry_total
- int
发生错误时重做失败操作的尝试总数。 默认值为 3。
- retry_backoff_factor
- float
两次重试之间以秒为单位的内部增量回退。 默认值为 0.8。
- retry_backoff_max
- float
最大退让间隔(以秒为单位)。 默认值为 120。
- retry_mode
- str
重试尝试之间的延迟行为。 支持的值为“fixed”或“exponential”,默认值为“exponential”。
- custom_endpoint_address
- str
用于与服务总线服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在custom_endpoint_address中指定端口,则默认使用端口 443。
- connection_verify
- str
SSL 证书的自定义CA_BUNDLE文件的路径,该文件用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where () 。
- uamqp_transport
- bool
是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。
返回类型
示例
从连接字符串创建 ServiceBusClient 的新实例。
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
get_queue_receiver
获取特定队列的 ServiceBusReceiver。
get_queue_receiver(queue_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver
参数
- session_id
- str 或 <xref:azure.servicebus.NEXT_AVAILABLE_SESSION>
要从中接收的特定会话。 必须为会话队列指定此参数,否则必须为 None。 若要从下一个可用会话接收消息,请将此项设置为 ~azure.servicebus.NEXT_AVAILABLE_SESSION。
- sub_queue
- str 或 ServiceBusSubQueue 或 None
如果指定,则此接收方将连接到的子队列。 这包括DEAD_LETTER和TRANSFER_DEAD_LETTER队列,包含无法传递给任何接收方的消息或无法处理的消息。 默认值为 None,表示连接到主队列。 可以从 ServiceBusSubQueue 枚举或等效的字符串值“deadletter”和“transferdeadletter”分配值。
- receive_mode
- Union[ServiceBusReceiveMode, str]
用于从实体检索消息的receive_mode。 这两个选项是PEEK_LOCK和RECEIVE_AND_DELETE。 使用 PEEK_LOCK 接收的消息必须在给定的锁定期内解决,然后才能从队列中删除消息。 使用 RECEIVE_AND_DELETE 接收的消息将立即从队列中删除,如果客户端无法处理消息,则以后无法拒绝或重新接收消息。 默认receive_mode为PEEK_LOCK。
接收消息之间的超时(以秒为单位),在此之后接收方将自动停止接收。 默认值为 None,表示无超时。 如果由于写入超时而发生连接错误,则可能需要调整连接超时值。 有关更多详细信息,请参阅 socket_timeout 可选参数。
- auto_lock_renewer
- Optional[AutoLockRenewer]
可以提供 ~azure.servicebus.AutoLockRenewer,以便消息在收到时自动注册。 如果接收方是会话接收器,它将改为应用于会话。
- prefetch_count
- int
每次向服务发出请求时要缓存的最大消息数。 此设置仅用于高级性能优化。 增大此值将提高消息吞吐量性能,但如果处理速度不够快,则会增加消息在缓存时过期的可能性。 默认值为 0,这意味着将从服务接收消息,并一次处理一个消息。 如果prefetch_count为 0,ServiceBusReceiver.receive 将尝试缓存 max_message_count ((如果) 请求中向服务提供)。
- client_identifier
- str
基于字符串的标识符,用于唯一标识接收方实例。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。
- socket_timeout
- float
在超时之前,连接上的基础套接字在发送和接收数据时应等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。
返回类型
示例
从 ServiceBusClient 创建 ServiceBusReceiver 的新实例。
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)
get_queue_sender
获取特定队列的 ServiceBusSender。
get_queue_sender(queue_name: str, **kwargs: Any) -> ServiceBusSender
参数
- client_identifier
- str
基于字符串的标识符,用于唯一标识发送方实例。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。
- socket_timeout
- float
在超时之前,连接上的基础套接字在发送和接收数据时应等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。
返回
队列发送方。
返回类型
示例
从 ServiceBusClient 创建 ServiceBusSender 的新实例。
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
get_subscription_receiver
获取主题下特定订阅的 ServiceBusReceiver。
get_subscription_receiver(topic_name: str, subscription_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver
参数
- session_id
- str 或 <xref:azure.servicebus.NEXT_AVAILABLE_SESSION>
要从中接收的特定会话。 必须为会话订阅指定此项,否则必须为 None。 若要从下一个可用会话接收消息,请将此项设置为 ~azure.servicebus.NEXT_AVAILABLE_SESSION。
- sub_queue
- str 或 ServiceBusSubQueue 或 None
如果指定,则此接收器将连接到子排队。 这包括DEAD_LETTER和TRANSFER_DEAD_LETTER队列,包含无法传递到任何接收方的消息或无法处理的消息。 默认值为“无”,表示连接到主队列。 可以从 ServiceBusSubQueue 枚举或等效的字符串值“deadletter”和“transferdeadletter”中分配值。
- receive_mode
- Union[ServiceBusReceiveMode, str]
用于从实体检索消息的receive_mode。 这两个选项是PEEK_LOCK和RECEIVE_AND_DELETE。 使用PEEK_LOCK收到的消息必须在给定的锁定期内解决,然后才能从订阅中删除这些消息。 使用 RECEIVE_AND_DELETE 收到的消息将立即从订阅中删除,如果客户端无法处理消息,则无法随后拒绝或重新接收消息。 默认receive_mode为 PEEK_LOCK。
接收的消息之间的超时(以秒为单位),此后接收方将自动停止接收。 默认值为 None,表示无超时。 如果由于写入超时而发生连接错误,则可能需要调整连接超时值。 有关更多详细信息,请参阅 socket_timeout 可选参数。
- auto_lock_renewer
- Optional[AutoLockRenewer]
可以提供 ~azure.servicebus.AutoLockRenewer,以便邮件在收到时自动注册。 如果接收方是会话接收器,它将改为应用于会话。
- prefetch_count
- int
要缓存的最大消息数,其中包含对服务的每个请求。 此设置仅适用于高级性能优化。 增加此值将提高消息吞吐量性能,但增加消息在缓存时过期(如果处理速度不够快)的可能性。 默认值为 0,这意味着将从服务接收消息,并一次处理一个消息。 如果prefetch_count为 0,ServiceBusReceiver.receive 将尝试缓存 max_message_count ((如果向服务的请求) )。
- client_identifier
- str
基于字符串的标识符,用于唯一标识接收方实例。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。
- socket_timeout
- float
连接上的基础套接字在发送和接收数据时应在超时之前等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。
返回类型
示例
从 ServiceBusClient 创建 ServiceBusReceiver 的新实例。
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
subscription_name = os.environ["SERVICEBUS_SUBSCRIPTION_NAME"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
subscription_receiver = servicebus_client.get_subscription_receiver(
topic_name=topic_name,
subscription_name=subscription_name,
)
get_topic_sender
获取特定主题的 ServiceBusSender。
get_topic_sender(topic_name: str, **kwargs: Any) -> ServiceBusSender
参数
- client_identifier
- str
基于字符串的标识符,用于唯一标识发送方实例。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。
- socket_timeout
- float
连接上的基础套接字在发送和接收数据时应在超时之前等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。
返回
主题发送方。
返回类型
示例
从 ServiceBusClient 创建 ServiceBusSender 的新实例。
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
topic_name = os.environ['SERVICEBUS_TOPIC_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)