ServiceBusReceiver Класс
Класс ServiceBusReceiver определяет высокоуровневый интерфейс для получения сообщений из очереди Служебная шина Azure или подписки раздела.
Двумя основными каналами для получения сообщений являются receive() для выполнения единого запроса сообщений и асинхронный канал для сообщения в получателе: для непрерывного получения входящих сообщений.
Чтобы создать экземпляр ServiceBusReceiver, get_<queue/subscription>_receiver
используйте метод ~azure.servicebus.aio.ServiceBusClient.
- Наследование
-
ServiceBusReceiverazure.servicebus.aio._base_handler_async.BaseHandlerServiceBusReceiverazure.servicebus._common.receiver_mixins.ReceiverMixinServiceBusReceiver
Конструктор
ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)
Параметры
- fully_qualified_namespace
- str
Полное имя узла для пространства имен служебной шины. Формат пространства имен: .servicebus.windows.net.
- credential
- AsyncTokenCredential или AzureSasCredential или AzureNamedKeyCredential
Объект учетных данных, используемый для проверки подлинности, который реализует определенный интерфейс для получения маркеров. Он принимает объекты учетных данных, созданные библиотекой azure-identity, и объекты, реализующие метод *get_token(self, scopes) или также можно предоставить AzureSasCredential.
- queue_name
- str
Путь к определенной очереди служебной шины, к которому подключается клиент.
- topic_name
- str
Путь к определенному разделу служебной шины, который содержит подписку, к которой подключается клиент.
- subscription_name
- str
Путь к определенной подписке служебной шины в указанном разделе, к которому подключается клиент.
- receive_mode
- Union[ServiceBusReceiveMode, str]
Режим, в котором сообщения будут извлекаться из сущности. Два варианта: PEEK_LOCK и RECEIVE_AND_DELETE. Сообщения, полученные с PEEK_LOCK, должны быть урегулированы в течение заданного периода блокировки, прежде чем они будут удалены из очереди. Сообщения, полученные с помощью RECEIVE_AND_DELETE, будут немедленно удалены из очереди и не могут быть впоследствии отменены или получены повторно, если клиенту не удается обработать сообщение. Режим по умолчанию — PEEK_LOCK.
Время ожидания в секундах между полученными сообщениями, после которого получатель автоматически прекращает получение. Значение по умолчанию — None, что означает отсутствие времени ожидания.
- logging_enable
- bool
Следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.
- transport_type
- TransportType
Тип транспортного протокола, который будет использоваться для взаимодействия со службой служебной шины. Значение по умолчанию — TransportType.Amqp.
- http_proxy
- Dict
Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".
- user_agent
- str
Если этот параметр указан, он будет добавлен перед строкой встроенного агента пользователя.
- auto_lock_renewer
- Optional[AutoLockRenewer]
Можно указать ~azure.servicebus.aio.AutoLockRenewer, чтобы сообщения автоматически регистрировались при получении. Если получатель является приемником сеанса, он будет применяться к сеансу.
- prefetch_count
- int
Максимальное количество сообщений для кэширования при каждом запросе к службе. Этот параметр предназначен только для расширенной настройки производительности. Увеличение этого значения повысит производительность пропускной способности сообщений, но увеличит вероятность истечения срока действия сообщений во время их кэширования, если они не обрабатываются достаточно быстро. Значение по умолчанию — 0, то есть сообщения будут поступать от службы и обрабатываться по одному за раз. Если prefetch_count равно 0, ServiceBusReceiver.receive будет пытаться кэшировать max_message_count (если указано) в своем запросе к службе.
- client_identifier
- str
Строковый идентификатор для уникальной идентификации экземпляра клиента. Служебная шина связывает ее с некоторыми сообщениями об ошибках, чтобы упростить корреляцию ошибок. Если значение не указано, будет создан уникальный идентификатор.
- socket_timeout
- float
Время в секундах, в течение времени ожидания базового сокета в подключении при отправке и получении данных до истечения времени ожидания. Значение по умолчанию — 0,2 для TransportType.Amqp и 1 для TransportType.AmqpOverWebsocket. Если возникают ошибки подключения из-за истечения времени ожидания записи, может потребоваться передать значение, превышающее значение по умолчанию.
Переменные
- fully_qualified_namespace
- str
Полное имя узла для пространства имен служебной шины. Формат пространства имен: .servicebus.windows.net.
- entity_path
- str
Путь к сущности, к которому подключается клиент.
Методы
abandon_message |
Откажитесь от сообщения. Это сообщение будет возвращено в очередь и станет доступным для получения снова. |
close | |
complete_message |
Завершите сообщение. При этом сообщение будет удалено из очереди. |
dead_letter_message |
Переместите сообщение в очередь недоставленных сообщений. Очередь недоставленных сообщений — это вложенная очередь, которую можно использовать для хранения сообщений, которые не удалось правильно обработать или иным образом требуют дополнительной проверки или обработки. Очередь также можно настроить для отправки просроченных сообщений в очередь недоставленных сообщений. |
defer_message |
Откладывает сообщение. Это сообщение останется в очереди, но его необходимо запросить специально по его порядковой номеру для получения. |
peek_messages |
Просмотр сообщений, ожидающих в очереди. Просматриваемые сообщения не удаляются из очереди и не блокируются. Они не могут быть заполнены, отложены или недоставлены. |
receive_deferred_messages |
Получать сообщения, которые ранее были отложены. При получении отложенных сообщений от секционированного объекта все предоставленные порядковые номера должны быть сообщениями из одной секции. |
receive_messages |
Получение пакета сообщений одновременно. Этот подход является оптимальным, если вы хотите обрабатывать несколько сообщений одновременно или выполнять нерегламентированное получение в виде одного вызова. Обратите внимание, что количество сообщений, полученных в одном пакете, будет зависеть от того, задано ли prefetch_count для получателя. Если prefetch_count не задано для получателя, получатель будет пытаться кэшировать max_message_count сообщения (если они предоставлены) в рамках запроса к службе. Этот вызов будет определять приоритет при быстром возврате по сравнению с заданным размером пакета, и поэтому возвращается, как только будет получено хотя бы одно сообщение и есть пробел во входящих сообщениях независимо от указанного размера пакета. |
renew_message_lock |
Обновите блокировку сообщения. Это сохранит блокировку сообщения, чтобы гарантировать, что оно не будет возвращено в очередь для повторной обработки. Чтобы завершить (или иным образом урегулировать) сообщение, блокировка должна быть сохранена и не может еще истечет; Блокировка с истекшим сроком действия не может быть продлена. Сообщения, полученные в режиме RECEIVE_AND_DELETE, не блокируются и, следовательно, не могут быть продлены. Эта операция также доступна только для сообщений, не относящихся к сеансу. |
abandon_message
Откажитесь от сообщения.
Это сообщение будет возвращено в очередь и станет доступным для получения снова.
async abandon_message(message: ServiceBusReceivedMessage) -> None
Параметры
Возвращаемый тип
Исключения
Примеры
Отказаться от полученного сообщения.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.abandon_message(message)
close
async close() -> None
Исключения
complete_message
Завершите сообщение.
При этом сообщение будет удалено из очереди.
async complete_message(message: ServiceBusReceivedMessage) -> None
Параметры
Возвращаемый тип
Исключения
Примеры
Завершите полученное сообщение.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.complete_message(message)
dead_letter_message
Переместите сообщение в очередь недоставленных сообщений.
Очередь недоставленных сообщений — это вложенная очередь, которую можно использовать для хранения сообщений, которые не удалось правильно обработать или иным образом требуют дополнительной проверки или обработки. Очередь также можно настроить для отправки просроченных сообщений в очередь недоставленных сообщений.
async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None
Параметры
- message
- ServiceBusReceivedMessage
Полученное сообщение, которое должно быть недоставлено.
Подробное описание ошибки для недоставленных сообщений.
Возвращаемый тип
Исключения
Примеры
Недоставленное сообщение.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.dead_letter_message(message)
defer_message
Откладывает сообщение.
Это сообщение останется в очереди, но его необходимо запросить специально по его порядковой номеру для получения.
async defer_message(message: ServiceBusReceivedMessage) -> None
Параметры
Возвращаемый тип
Исключения
Примеры
Отложить полученное сообщение.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.defer_message(message)
peek_messages
Просмотр сообщений, ожидающих в очереди.
Просматриваемые сообщения не удаляются из очереди и не блокируются. Они не могут быть заполнены, отложены или недоставлены.
async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Параметры
- max_message_count
- int
Максимальное количество сообщений для просмотра. Значение по умолчанию — 1.
- sequence_number
- int
Порядковый номер сообщения, с которого начинается просмотр сообщений.
Общее время ожидания операции в секундах, включая все повторные попытки. Значение должно быть больше 0, если указано. Значение по умолчанию — None, что означает отсутствие времени ожидания.
Возвращаемое значение
Список объектов ~azure.servicebus.ServiceBusReceivedMessage.
Возвращаемый тип
Исключения
Примеры
Просмотр сообщений в очереди.
async with servicebus_receiver:
messages = await servicebus_receiver.peek_messages()
for message in messages:
print(str(message))
receive_deferred_messages
Получать сообщения, которые ранее были отложены.
При получении отложенных сообщений от секционированного объекта все предоставленные порядковые номера должны быть сообщениями из одной секции.
async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Параметры
Общее время ожидания операции в секундах, включая все повторные попытки. Значение должно быть больше 0, если указано. Значение по умолчанию — None, что означает отсутствие времени ожидания.
Возвращаемое значение
Список полученных сообщений.
Возвращаемый тип
Исключения
Примеры
Получение отложенных сообщений из Служебной шины.
async with servicebus_receiver:
deferred_sequenced_numbers = []
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
await servicebus_receiver.defer_message(message)
received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
sequence_numbers=deferred_sequenced_numbers
)
for message in received_deferred_msg:
await servicebus_receiver.complete_message(message)
receive_messages
Получение пакета сообщений одновременно.
Этот подход является оптимальным, если вы хотите обрабатывать несколько сообщений одновременно или выполнять нерегламентированное получение в виде одного вызова.
Обратите внимание, что количество сообщений, полученных в одном пакете, будет зависеть от того, задано ли prefetch_count для получателя. Если prefetch_count не задано для получателя, получатель будет пытаться кэшировать max_message_count сообщения (если они предоставлены) в рамках запроса к службе.
Этот вызов будет определять приоритет при быстром возврате по сравнению с заданным размером пакета, и поэтому возвращается, как только будет получено хотя бы одно сообщение и есть пробел во входящих сообщениях независимо от указанного размера пакета.
async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]
Параметры
Максимальное количество сообщений в пакете. Фактическое возвращаемое число будет зависеть от размера prefetch_count и скорости входящего потока. Установка значения Нет будет полностью зависеть от конфигурации предварительной выборки. Значение по умолчанию — 1.
Максимальное время ожидания первого сообщения в секундах. Если сообщения не поступают и не указано время ожидания, этот вызов не будет возвращаться, пока подключение не будет закрыто. Если он указан и сообщения не поступают в течение периода ожидания, возвращается пустой список.
Возвращаемое значение
Список полученных сообщений. Если сообщения недоступны, это будет пустой список.
Возвращаемый тип
Исключения
Примеры
Получение сообщений из Служебной шины.
async with servicebus_receiver:
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(str(message))
await servicebus_receiver.complete_message(message)
renew_message_lock
Обновите блокировку сообщения.
Это сохранит блокировку сообщения, чтобы гарантировать, что оно не будет возвращено в очередь для повторной обработки.
Чтобы завершить (или иным образом урегулировать) сообщение, блокировка должна быть сохранена и не может еще истечет; Блокировка с истекшим сроком действия не может быть продлена.
Сообщения, полученные в режиме RECEIVE_AND_DELETE, не блокируются и, следовательно, не могут быть продлены. Эта операция также доступна только для сообщений, не относящихся к сеансу.
async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime
Параметры
Общее время ожидания операции в секундах, включая все повторные попытки. Значение должно быть больше 0, если указано. Значение по умолчанию — None, что означает отсутствие времени ожидания.
Возвращаемое значение
Дата и время в формате UTC, в течение которого установлен срок действия блокировки.
Возвращаемый тип
Исключения
Примеры
Обновите блокировку полученного сообщения.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.renew_message_lock(message)
Атрибуты
client_identifier
Получите идентификатор клиента ServiceBusReceiver, связанный с экземпляром получателя.
Возвращаемый тип
session
Получите объект ServiceBusSession, связанный с получателем. Сеанс доступен только для сущностей с поддержкой сеанса. При вызове для приемника, не являющегося сеансом, он возвращает значение None.
Возвращаемый тип
Примеры
Получение сеанса от получателя
async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
session = receiver.session
Обратная связь
https://aka.ms/ContentUserFeedback.
Ожидается в ближайшее время: в течение 2024 года мы постепенно откажемся от GitHub Issues как механизма обратной связи для контента и заменим его новой системой обратной связи. Дополнительные сведения см. в разделеОтправить и просмотреть отзыв по