Поделиться через


ServiceBusReceiver Класс

Класс ServiceBusReceiver определяет высокоуровневый интерфейс для получения сообщений из очереди Служебная шина Azure или подписки раздела.

Двумя основными каналами для получения сообщений являются receive() для выполнения единого запроса сообщений и асинхронный канал для сообщения в получателе: для непрерывного получения входящих сообщений.

Чтобы создать экземпляр ServiceBusReceiver, get_<queue/subscription>_receiver используйте метод ~azure.servicebus.aio.ServiceBusClient.

Наследование
ServiceBusReceiver
azure.servicebus.aio._base_handler_async.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

Конструктор

ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

Параметры

fully_qualified_namespace
str
Обязательно

Полное имя узла для пространства имен служебной шины. Формат пространства имен: .servicebus.windows.net.

credential
AsyncTokenCredential или AzureSasCredential или AzureNamedKeyCredential
Обязательно

Объект учетных данных, используемый для проверки подлинности, который реализует определенный интерфейс для получения маркеров. Он принимает объекты учетных данных, созданные библиотекой azure-identity, и объекты, реализующие метод *get_token(self, scopes) или также можно предоставить AzureSasCredential.

queue_name
str

Путь к определенной очереди служебной шины, к которому подключается клиент.

topic_name
str

Путь к определенному разделу служебной шины, который содержит подписку, к которой подключается клиент.

subscription_name
str

Путь к определенной подписке служебной шины в указанном разделе, к которому подключается клиент.

receive_mode
Union[ServiceBusReceiveMode, str]

Режим, в котором сообщения будут извлекаться из сущности. Два варианта: PEEK_LOCK и RECEIVE_AND_DELETE. Сообщения, полученные с PEEK_LOCK, должны быть урегулированы в течение заданного периода блокировки, прежде чем они будут удалены из очереди. Сообщения, полученные с помощью RECEIVE_AND_DELETE, будут немедленно удалены из очереди и не могут быть впоследствии отменены или получены повторно, если клиенту не удается обработать сообщение. Режим по умолчанию — PEEK_LOCK.

max_wait_time
Optional[float]

Время ожидания в секундах между полученными сообщениями, после которого получатель автоматически прекращает получение. Значение по умолчанию — None, что означает отсутствие времени ожидания.

logging_enable
bool

Следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.

transport_type
TransportType

Тип транспортного протокола, который будет использоваться для взаимодействия со службой служебной шины. Значение по умолчанию — TransportType.Amqp.

http_proxy
Dict

Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".

user_agent
str

Если этот параметр указан, он будет добавлен перед строкой встроенного агента пользователя.

auto_lock_renewer
Optional[AutoLockRenewer]

Можно указать ~azure.servicebus.aio.AutoLockRenewer, чтобы сообщения автоматически регистрировались при получении. Если получатель является приемником сеанса, он будет применяться к сеансу.

prefetch_count
int

Максимальное количество сообщений для кэширования при каждом запросе к службе. Этот параметр предназначен только для расширенной настройки производительности. Увеличение этого значения повысит производительность пропускной способности сообщений, но увеличит вероятность истечения срока действия сообщений во время их кэширования, если они не обрабатываются достаточно быстро. Значение по умолчанию — 0, то есть сообщения будут поступать от службы и обрабатываться по одному за раз. Если prefetch_count равно 0, ServiceBusReceiver.receive будет пытаться кэшировать max_message_count (если указано) в своем запросе к службе.

client_identifier
str

Строковый идентификатор для уникальной идентификации экземпляра клиента. Служебная шина связывает ее с некоторыми сообщениями об ошибках, чтобы упростить корреляцию ошибок. Если значение не указано, будет создан уникальный идентификатор.

socket_timeout
float

Время в секундах, в течение времени ожидания базового сокета в подключении при отправке и получении данных до истечения времени ожидания. Значение по умолчанию — 0,2 для TransportType.Amqp и 1 для TransportType.AmqpOverWebsocket. Если возникают ошибки подключения из-за истечения времени ожидания записи, может потребоваться передать значение, превышающее значение по умолчанию.

Переменные

fully_qualified_namespace
str

Полное имя узла для пространства имен служебной шины. Формат пространства имен: .servicebus.windows.net.

entity_path
str

Путь к сущности, к которому подключается клиент.

Методы

abandon_message

Откажитесь от сообщения.

Это сообщение будет возвращено в очередь и станет доступным для получения снова.

close
complete_message

Завершите сообщение.

При этом сообщение будет удалено из очереди.

dead_letter_message

Переместите сообщение в очередь недоставленных сообщений.

Очередь недоставленных сообщений — это вложенная очередь, которую можно использовать для хранения сообщений, которые не удалось правильно обработать или иным образом требуют дополнительной проверки или обработки. Очередь также можно настроить для отправки просроченных сообщений в очередь недоставленных сообщений.

defer_message

Откладывает сообщение.

Это сообщение останется в очереди, но его необходимо запросить специально по его порядковой номеру для получения.

peek_messages

Просмотр сообщений, ожидающих в очереди.

Просматриваемые сообщения не удаляются из очереди и не блокируются. Они не могут быть заполнены, отложены или недоставлены.

receive_deferred_messages

Получать сообщения, которые ранее были отложены.

При получении отложенных сообщений от секционированного объекта все предоставленные порядковые номера должны быть сообщениями из одной секции.

receive_messages

Получение пакета сообщений одновременно.

Этот подход является оптимальным, если вы хотите обрабатывать несколько сообщений одновременно или выполнять нерегламентированное получение в виде одного вызова.

Обратите внимание, что количество сообщений, полученных в одном пакете, будет зависеть от того, задано ли prefetch_count для получателя. Если prefetch_count не задано для получателя, получатель будет пытаться кэшировать max_message_count сообщения (если они предоставлены) в рамках запроса к службе.

Этот вызов будет определять приоритет при быстром возврате по сравнению с заданным размером пакета, и поэтому возвращается, как только будет получено хотя бы одно сообщение и есть пробел во входящих сообщениях независимо от указанного размера пакета.

renew_message_lock

Обновите блокировку сообщения.

Это сохранит блокировку сообщения, чтобы гарантировать, что оно не будет возвращено в очередь для повторной обработки.

Чтобы завершить (или иным образом урегулировать) сообщение, блокировка должна быть сохранена и не может еще истечет; Блокировка с истекшим сроком действия не может быть продлена.

Сообщения, полученные в режиме RECEIVE_AND_DELETE, не блокируются и, следовательно, не могут быть продлены. Эта операция также доступна только для сообщений, не относящихся к сеансу.

abandon_message

Откажитесь от сообщения.

Это сообщение будет возвращено в очередь и станет доступным для получения снова.

async abandon_message(message: ServiceBusReceivedMessage) -> None

Параметры

message
ServiceBusReceivedMessage
Обязательно

Полученное сообщение, которое необходимо прервать.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Отказаться от полученного сообщения.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.abandon_message(message)

close

async close() -> None

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

Завершите сообщение.

При этом сообщение будет удалено из очереди.

async complete_message(message: ServiceBusReceivedMessage) -> None

Параметры

message
ServiceBusReceivedMessage
Обязательно

Полученное сообщение для завершения.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Завершите полученное сообщение.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.complete_message(message)

dead_letter_message

Переместите сообщение в очередь недоставленных сообщений.

Очередь недоставленных сообщений — это вложенная очередь, которую можно использовать для хранения сообщений, которые не удалось правильно обработать или иным образом требуют дополнительной проверки или обработки. Очередь также можно настроить для отправки просроченных сообщений в очередь недоставленных сообщений.

async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Параметры

message
ServiceBusReceivedMessage
Обязательно

Полученное сообщение, которое должно быть недоставлено.

reason
Optional[str]
значение по умолчанию: None

Причина недоставленных сообщений.

error_description
Optional[str]
значение по умолчанию: None

Подробное описание ошибки для недоставленных сообщений.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Недоставленное сообщение.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.dead_letter_message(message)

defer_message

Откладывает сообщение.

Это сообщение останется в очереди, но его необходимо запросить специально по его порядковой номеру для получения.

async defer_message(message: ServiceBusReceivedMessage) -> None

Параметры

message
ServiceBusReceivedMessage
Обязательно

Полученное сообщение, которое будет отложено.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Отложить полученное сообщение.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.defer_message(message)

peek_messages

Просмотр сообщений, ожидающих в очереди.

Просматриваемые сообщения не удаляются из очереди и не блокируются. Они не могут быть заполнены, отложены или недоставлены.

async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Параметры

max_message_count
int
значение по умолчанию: 1

Максимальное количество сообщений для просмотра. Значение по умолчанию — 1.

sequence_number
int

Порядковый номер сообщения, с которого начинается просмотр сообщений.

timeout
Optional[float]

Общее время ожидания операции в секундах, включая все повторные попытки. Значение должно быть больше 0, если указано. Значение по умолчанию — None, что означает отсутствие времени ожидания.

Возвращаемое значение

Список объектов ~azure.servicebus.ServiceBusReceivedMessage.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Просмотр сообщений в очереди.


   async with servicebus_receiver:
       messages = await servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

Получать сообщения, которые ранее были отложены.

При получении отложенных сообщений от секционированного объекта все предоставленные порядковые номера должны быть сообщениями из одной секции.

async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Параметры

sequence_numbers
Union[int, list[int]]
Обязательно

Список порядковых номеров отложенных сообщений.

timeout
Optional[float]

Общее время ожидания операции в секундах, включая все повторные попытки. Значение должно быть больше 0, если указано. Значение по умолчанию — None, что означает отсутствие времени ожидания.

Возвращаемое значение

Список полученных сообщений.

Возвращаемый тип

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Получение отложенных сообщений из Служебной шины.


   async with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           await servicebus_receiver.defer_message(message)

       received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for message in received_deferred_msg:
           await servicebus_receiver.complete_message(message)

receive_messages

Получение пакета сообщений одновременно.

Этот подход является оптимальным, если вы хотите обрабатывать несколько сообщений одновременно или выполнять нерегламентированное получение в виде одного вызова.

Обратите внимание, что количество сообщений, полученных в одном пакете, будет зависеть от того, задано ли prefetch_count для получателя. Если prefetch_count не задано для получателя, получатель будет пытаться кэшировать max_message_count сообщения (если они предоставлены) в рамках запроса к службе.

Этот вызов будет определять приоритет при быстром возврате по сравнению с заданным размером пакета, и поэтому возвращается, как только будет получено хотя бы одно сообщение и есть пробел во входящих сообщениях независимо от указанного размера пакета.

async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

Параметры

max_message_count
Optional[int]
значение по умолчанию: 1

Максимальное количество сообщений в пакете. Фактическое возвращаемое число будет зависеть от размера prefetch_count и скорости входящего потока. Установка значения Нет будет полностью зависеть от конфигурации предварительной выборки. Значение по умолчанию — 1.

max_wait_time
Optional[float]
значение по умолчанию: None

Максимальное время ожидания первого сообщения в секундах. Если сообщения не поступают и не указано время ожидания, этот вызов не будет возвращаться, пока подключение не будет закрыто. Если он указан и сообщения не поступают в течение периода ожидания, возвращается пустой список.

Возвращаемое значение

Список полученных сообщений. Если сообщения недоступны, это будет пустой список.

Возвращаемый тип

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Получение сообщений из Служебной шины.


   async with servicebus_receiver:
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           await servicebus_receiver.complete_message(message)

renew_message_lock

Обновите блокировку сообщения.

Это сохранит блокировку сообщения, чтобы гарантировать, что оно не будет возвращено в очередь для повторной обработки.

Чтобы завершить (или иным образом урегулировать) сообщение, блокировка должна быть сохранена и не может еще истечет; Блокировка с истекшим сроком действия не может быть продлена.

Сообщения, полученные в режиме RECEIVE_AND_DELETE, не блокируются и, следовательно, не могут быть продлены. Эта операция также доступна только для сообщений, не относящихся к сеансу.

async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

Параметры

message
ServiceBusReceivedMessage
Обязательно

Сообщение, для которое нужно обновить блокировку.

timeout
Optional[float]

Общее время ожидания операции в секундах, включая все повторные попытки. Значение должно быть больше 0, если указано. Значение по умолчанию — None, что означает отсутствие времени ожидания.

Возвращаемое значение

Дата и время в формате UTC, в течение которого установлен срок действия блокировки.

Возвращаемый тип

Исключения

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

Примеры

Обновите блокировку полученного сообщения.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.renew_message_lock(message)

Атрибуты

client_identifier

Получите идентификатор клиента ServiceBusReceiver, связанный с экземпляром получателя.

Возвращаемый тип

str

session

Получите объект ServiceBusSession, связанный с получателем. Сеанс доступен только для сущностей с поддержкой сеанса. При вызове для приемника, не являющегося сеансом, он возвращает значение None.

Возвращаемый тип

Примеры

Получение сеанса от получателя


       async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session