Поделиться через


ServiceBusReceiver Класс

Класс ServiceBusReceiver определяет высокоуровневый интерфейс для получения сообщений из очереди Служебная шина Azure или подписки раздела.

Двумя основными каналами для получения сообщений являются receive() для выполнения единого запроса на сообщения и для сообщения в получателе: для непрерывного получения входящих сообщений.

Для создания экземпляра ServiceBusReceiver используйте get_<queue/subscription>_receiver метод ~azure.servicebus.ServiceBusClient.

Наследование
azure.servicebus._base_handler.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

Конструктор

ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

Параметры

fully_qualified_namespace
str
Обязательно

Полное имя узла для пространства имен служебной шины. Формат пространства имен: .servicebus.windows.net.

credential
TokenCredential или AzureSasCredential или AzureNamedKeyCredential
Обязательно

Объект учетных данных, используемый для проверки подлинности, который реализует определенный интерфейс для получения маркеров. Он принимает объекты учетных данных, созданные библиотекой azure-identity, и объекты, реализующие метод *get_token(self, scopes) или также можно предоставить AzureSasCredential.

queue_name
str

Путь к определенной очереди служебной шины, к которому подключается клиент.

topic_name
str

Путь к определенному разделу служебной шины, который содержит подписку, к которой подключается клиент.

subscription_name
str

Путь к определенной подписке служебной шины в указанном разделе, к которому подключается клиент.

max_wait_time
Optional[float]

Время ожидания в секундах между полученными сообщениями, после которого получатель автоматически перестанет получать сообщения. Значение по умолчанию — Нет, что означает отсутствие времени ожидания.

receive_mode
Union[ServiceBusReceiveMode, str]

Режим, в котором сообщения будут извлекаться из сущности. Два варианта: PEEK_LOCK и RECEIVE_AND_DELETE. Сообщения, полученные с PEEK_LOCK, должны быть урегулированы в течение заданного периода блокировки, прежде чем они будут удалены из очереди. Сообщения, полученные с RECEIVE_AND_DELETE, будут немедленно удалены из очереди и не могут быть впоследствии отменены или получены повторно, если клиенту не удается обработать сообщение. Режим по умолчанию — PEEK_LOCK.

logging_enable
bool

Указывает, следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.

transport_type
TransportType

Тип транспортного протокола, который будет использоваться для взаимодействия со службой служебной шины. Значение по умолчанию — TransportType.Amqp.

http_proxy
Dict

Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".

user_agent
str

Если этот параметр указан, он будет добавлен перед встроенной строкой агента пользователя.

auto_lock_renewer
Optional[AutoLockRenewer]

Можно предоставить ~azure.servicebus.AutoLockRenewer, чтобы сообщения автоматически регистрировались при получении. Если получатель является приемником сеанса, он будет применяться к сеансу.

prefetch_count
int

Максимальное количество сообщений для кэширования при каждом запросе к службе. Этот параметр предназначен только для расширенной настройки производительности. Увеличение этого значения повысит производительность пропускной способности сообщений, но увеличит вероятность истечения срока действия сообщений во время их кэширования, если они не обрабатываются достаточно быстро. Значение по умолчанию — 0, то есть сообщения будут получаться от службы и обрабатываться по одному за раз. Если prefetch_count равно 0, ServiceBusReceiver.receive будет пытаться кэшировать max_message_count (если он указан) в своем запросе к службе.

client_identifier
str

Строковый идентификатор для уникальной идентификации экземпляра клиента. Служебная шина свяжет ее с некоторыми сообщениями об ошибках, чтобы упростить корреляцию ошибок. Если этот параметр не указан, будет создан уникальный идентификатор.

socket_timeout
float

Время в секундах, в течение времени ожидания базового сокета подключения при отправке и получении данных до истечения времени ожидания. Значение по умолчанию — 0,2 для TransportType.Amqp и 1 для TransportType.AmqpOverWebsocket. Если возникают ошибки подключения из-за превышения времени ожидания записи, может потребоваться передать значение, превышающее значение по умолчанию.

Переменные

fully_qualified_namespace
str

Полное имя узла для пространства имен служебной шины. Формат пространства имен: .servicebus.windows.net.

entity_path
str

Путь к сущности, к которому подключается клиент.

Методы

abandon_message

Отказаться от сообщения.

Это сообщение будет возвращено в очередь и станет доступным для получения снова.

close
complete_message

Завершите сообщение.

При этом сообщение будет удалено из очереди.

dead_letter_message

Переместите сообщение в очередь недоставленных сообщений.

Очередь недоставленных сообщений — это вложенная очередь, которая может использоваться для хранения сообщений, которые не удалось правильно обработать или которые иным образом требуют дальнейшей проверки или обработки. Очередь также можно настроить для отправки сообщений с истекшим сроком действия в очередь недоставленных сообщений.

defer_message

Откладывает сообщение.

Это сообщение останется в очереди, но его необходимо запросить специально по его порядковой номеру, чтобы его можно было получить.

next
peek_messages

Просмотр сообщений, ожидающих в очереди.

Просматриваемые сообщения не удаляются из очереди и не блокируются. Они не могут быть заполнены, отложены или недоставлены.

receive_deferred_messages

Получать сообщения, которые ранее были отложены.

При получении отложенных сообщений от секционированного объекта все предоставленные порядковые номера должны быть сообщениями из одной секции.

receive_messages

Получение пакета сообщений одновременно.

Этот подход является оптимальным, если вы хотите обрабатывать несколько сообщений одновременно или выполнять нерегламентированный прием в виде одного вызова.

Обратите внимание, что количество сообщений, полученных в одном пакете, будет зависеть от того, задано ли prefetch_count для получателя. Если prefetch_count не задано для получателя, получатель будет пытаться кэшировать max_message_count (если они указаны) сообщений в запросе к службе.

Этот вызов будет отдавать приоритет быстрому возвращению, а не собранию указанного размера пакета, и поэтому возвращается, как только будет получено по крайней мере одно сообщение и есть разрыв во входящих сообщениях независимо от указанного размера пакета.

renew_message_lock

Обновите блокировку сообщения.

Это сохранит блокировку сообщения, чтобы гарантировать, что оно не будет возвращено в очередь для повторной обработки.

Чтобы завершить (или иным образом урегулировать) сообщение, блокировка должна быть сохранена и не может еще истечет; Блокировка с истекшим сроком действия не может быть продлена.

Сообщения, полученные в режиме RECEIVE_AND_DELETE, не блокируются и, следовательно, не могут быть продлены. Эта операция также доступна только для сообщений, не относящихся к сеансу.

abandon_message

Отказаться от сообщения.

Это сообщение будет возвращено в очередь и станет доступным для получения снова.

abandon_message(message: ServiceBusReceivedMessage) -> None

Параметры

message
ServiceBusReceivedMessage
Обязательно

Полученное сообщение, которое необходимо отказаться.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Отказаться от полученного сообщения.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.abandon_message(message)

close

close() -> None

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

Завершите сообщение.

При этом сообщение будет удалено из очереди.

complete_message(message: ServiceBusReceivedMessage) -> None

Параметры

message
ServiceBusReceivedMessage
Обязательно

Полученное сообщение для завершения.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Завершите полученное сообщение.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.complete_message(message)

dead_letter_message

Переместите сообщение в очередь недоставленных сообщений.

Очередь недоставленных сообщений — это вложенная очередь, которая может использоваться для хранения сообщений, которые не удалось правильно обработать или которые иным образом требуют дальнейшей проверки или обработки. Очередь также можно настроить для отправки сообщений с истекшим сроком действия в очередь недоставленных сообщений.

dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Параметры

message
ServiceBusReceivedMessage
Обязательно

Полученное сообщение, которое будет недоставлено.

reason
Optional[str]
значение по умолчанию: None

Причина недоставленных сообщений.

error_description
Optional[str]
значение по умолчанию: None

Подробное описание ошибки при недоставленных сообщениях.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Недоставленное сообщение.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.dead_letter_message(message)

defer_message

Откладывает сообщение.

Это сообщение останется в очереди, но его необходимо запросить специально по его порядковой номеру, чтобы его можно было получить.

defer_message(message: ServiceBusReceivedMessage) -> None

Параметры

message
ServiceBusReceivedMessage
Обязательно

Полученное сообщение, которое требуется отложить.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Отложить полученное сообщение.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.defer_message(message)

next

next()

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

peek_messages

Просмотр сообщений, ожидающих в очереди.

Просматриваемые сообщения не удаляются из очереди и не блокируются. Они не могут быть заполнены, отложены или недоставлены.

peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Параметры

max_message_count
int
значение по умолчанию: 1

Максимальное количество сообщений для просмотра. Значение по умолчанию — 1.

sequence_number
int

Порядковый номер сообщения, с которого начинается просмотр сообщений.

timeout
Optional[float]

Общее время ожидания операции в секундах, включая все повторные попытки. Значение должно быть больше 0, если указано. Значение по умолчанию — Нет, что означает отсутствие времени ожидания.

Возвращаемое значение

Список ~azure.servicebus.ServiceBusReceivedMessage.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Просмотрите ожидающие сообщения в очереди.


   with servicebus_receiver:
       messages = servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

Получать сообщения, которые ранее были отложены.

При получении отложенных сообщений от секционированного объекта все предоставленные порядковые номера должны быть сообщениями из одной секции.

receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Параметры

sequence_numbers
Union[int,List[int]]
Обязательно

Список порядковых номеров отложенных сообщений.

timeout
Optional[float]

Общее время ожидания операции в секундах, включая все повторные попытки. Значение должно быть больше 0, если указано. Значение по умолчанию — Нет, что означает отсутствие времени ожидания.

Возвращаемое значение

Список запрошенных экземпляров ~azure.servicebus.ServiceBusReceivedMessage.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Получение отложенных сообщений из Служебной шины.


   with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           servicebus_receiver.defer_message(message)

       received_deferred_msg = servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for msg in received_deferred_msg:
           servicebus_receiver.complete_message(msg)

receive_messages

Получение пакета сообщений одновременно.

Этот подход является оптимальным, если вы хотите обрабатывать несколько сообщений одновременно или выполнять нерегламентированный прием в виде одного вызова.

Обратите внимание, что количество сообщений, полученных в одном пакете, будет зависеть от того, задано ли prefetch_count для получателя. Если prefetch_count не задано для получателя, получатель будет пытаться кэшировать max_message_count (если они указаны) сообщений в запросе к службе.

Этот вызов будет отдавать приоритет быстрому возвращению, а не собранию указанного размера пакета, и поэтому возвращается, как только будет получено по крайней мере одно сообщение и есть разрыв во входящих сообщениях независимо от указанного размера пакета.

receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

Параметры

max_message_count
Optional[int]
значение по умолчанию: 1

Максимальное количество сообщений в пакете. Фактическое возвращаемое число будет зависеть от prefetch_count и скорости входящего потока. Установка значения Нет полностью зависит от конфигурации предварительной выборки. Значение по умолчанию — 1.

max_wait_time
Optional[float]
значение по умолчанию: None

Максимальное время ожидания первого сообщения в секундах. Если сообщения не поступают и не указано время ожидания, этот вызов не будет возвращаться, пока подключение не будет закрыто. Если этот параметр указан, сообщения не поступают в течение периода ожидания, возвращается пустой список.

Возвращаемое значение

Список полученных сообщений. Если сообщения недоступны, это будет пустой список.

Возвращаемый тип

Исключения

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Примеры

Получение сообщений из Служебной шины.


   with servicebus_receiver:
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           servicebus_receiver.complete_message(message)

renew_message_lock

Обновите блокировку сообщения.

Это сохранит блокировку сообщения, чтобы гарантировать, что оно не будет возвращено в очередь для повторной обработки.

Чтобы завершить (или иным образом урегулировать) сообщение, блокировка должна быть сохранена и не может еще истечет; Блокировка с истекшим сроком действия не может быть продлена.

Сообщения, полученные в режиме RECEIVE_AND_DELETE, не блокируются и, следовательно, не могут быть продлены. Эта операция также доступна только для сообщений, не относящихся к сеансу.

renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

Параметры

message
ServiceBusReceivedMessage
Обязательно

Сообщение для возобновления блокировки.

timeout
Optional[float]

Общее время ожидания операции в секундах, включая все повторные попытки. Значение должно быть больше 0, если указано. Значение по умолчанию — None, что означает отсутствие времени ожидания.

Возвращаемое значение

Дата и время в формате UTC, в течение которого установлен срок действия блокировки.

Возвращаемый тип

Исключения

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

Примеры

Обновите блокировку полученного сообщения.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.renew_message_lock(message)

Атрибуты

client_identifier

Получение client_identifier ServiceBusReceiver, связанного с экземпляром получателя.

Возвращаемый тип

str

session

Получите объект ServiceBusSession, связанный с получателем. Сеанс доступен только для сущностей с поддержкой сеанса. При вызове для несеансового приемника он вернет значение None.

Возвращаемый тип

Примеры

Получение сеанса от получателя


       with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session