Compartir a través de


ServiceBusReceiver Clase

La clase ServiceBusReceiver define una interfaz de alto nivel para recibir mensajes de la cola de Azure Service Bus o la suscripción de tema.

Los dos canales principales para la recepción de mensajes son receive() para realizar una única solicitud de mensajes y para el mensaje en el receptor: para recibir continuamente mensajes entrantes de forma continua.

Use el get_<queue/subscription>_receiver método ~azure.servicebus.ServiceBusClient para crear una instancia de ServiceBusReceiver.

Herencia
azure.servicebus._base_handler.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

Constructor

ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

Parámetros

fully_qualified_namespace
str
Requerido

Nombre de host completo para el espacio de nombres de Service Bus. El formato del espacio de nombres es: .servicebus.windows.net.

credential
TokenCredential o AzureSasCredential o AzureNamedKeyCredential
Requerido

Objeto de credencial usado para la autenticación que implementa una interfaz determinada para obtener tokens. Acepta objetos de credenciales generados por la biblioteca azure-identity y los objetos que implementan el método *get_token (self, scopes) o, como alternativa, también se puede proporcionar una instancia de AzureSasCredential.

queue_name
str

Ruta de acceso de la cola de Service Bus específica a la que se conecta el cliente.

topic_name
str

Ruta de acceso del tema de Service Bus específico al que contiene la suscripción a la que se conecta el cliente.

subscription_name
str

Ruta de acceso de una suscripción de Service Bus específica en el tema especificado al que se conecta el cliente.

max_wait_time
Optional[float]

Tiempo de espera en segundos entre los mensajes recibidos después del cual el receptor dejará de recibirse automáticamente. El valor predeterminado es None, lo que significa que no se ha agotado el tiempo de espera.

receive_mode
Union[ServiceBusReceiveMode, str]

Modo con el que se recuperarán los mensajes de la entidad. Las dos opciones son PEEK_LOCK y RECEIVE_AND_DELETE. Los mensajes recibidos con PEEK_LOCK deben liquidarse dentro de un período de bloqueo determinado antes de que se quiten de la cola. Los mensajes recibidos con RECEIVE_AND_DELETE se quitarán inmediatamente de la cola y no se podrán abandonar o volver a recibir posteriormente si el cliente no puede procesar el mensaje. El modo predeterminado es PEEK_LOCK.

logging_enable
bool

Si se van a generar registros de seguimiento de red en el registrador. El valor predeterminado es false.

transport_type
TransportType

Tipo de protocolo de transporte que se usará para comunicarse con el servicio Service Bus. El valor predeterminado es TransportType.Amqp.

http_proxy
Dict

Configuración del proxy HTTP. Debe ser un diccionario con las siguientes claves: "proxy_hostname" (valor str) y "proxy_port" (valor int). Además, las siguientes claves también pueden estar presentes: "username", "password".

user_agent
str

Si se especifica, se agregará delante de la cadena de agente de usuario integrada.

auto_lock_renewer
Optional[AutoLockRenewer]

Se puede proporcionar un ~azure.servicebus.AutoLockRenewer para que los mensajes se registren automáticamente al recibirlos. Si el receptor es un receptor de sesión, se aplicará a la sesión en su lugar.

prefetch_count
int

Número máximo de mensajes que se van a almacenar en caché con cada solicitud al servicio. Esta configuración solo es para el ajuste avanzado del rendimiento. Aumentar este valor mejorará el rendimiento del mensaje, pero aumentará la posibilidad de que los mensajes expiren mientras se almacenan en caché si no se procesan lo suficientemente rápido. El valor predeterminado es 0, lo que significa que los mensajes se recibirán del servicio y se procesarán de uno en uno. En el caso de que prefetch_count sea 0, ServiceBusReceiver.receive intentaría almacenar en caché max_message_count (si se proporciona) dentro de su solicitud al servicio.

client_identifier
str

Identificador basado en cadenas para identificar de forma única la instancia de cliente. Service Bus lo asociará a algunos mensajes de error para facilitar la correlación de errores. Si no se especifica, se generará un identificador único.

socket_timeout
float

Tiempo en segundos que el socket subyacente de la conexión debe esperar al enviar y recibir datos antes de que se agote el tiempo de espera. El valor predeterminado es 0.2 para TransportType.Amqp y 1 para TransportType.AmqpOverWebsocket. Si se producen errores de conexión debido a un tiempo de espera de escritura, es posible que sea necesario pasar un valor mayor que el predeterminado.

Variables

fully_qualified_namespace
str

Nombre de host completo para el espacio de nombres de Service Bus. El formato del espacio de nombres es: .servicebus.windows.net.

entity_path
str

Ruta de acceso de la entidad a la que se conecta el cliente.

Métodos

abandon_message

Abandone el mensaje.

Este mensaje se devolverá a la cola y estará disponible para recibirse de nuevo.

close
complete_message

Complete el mensaje.

Esto quita el mensaje de la cola.

dead_letter_message

Mueva el mensaje a la cola de mensajes fallidos.

La cola de mensajes fallidos es una sub cola que se puede usar para almacenar los mensajes que no se han podido procesar correctamente o, de lo contrario, requieren una mayor inspección o procesamiento. La cola también se puede configurar para enviar mensajes expirados a la cola de mensajes fallidos.

defer_message

Aplaza el mensaje.

Este mensaje permanecerá en la cola, pero debe solicitarse específicamente por su número de secuencia para recibirlo.

next
peek_messages

Examine los mensajes pendientes actualmente en la cola.

Los mensajes inspeccionados no se quitan de la cola ni están bloqueados. No se pueden completar, aplazar o no escribir mensajes fallidos.

receive_deferred_messages

Recibir mensajes que se han aplazado anteriormente.

Al recibir mensajes aplazados de una entidad con particiones, todos los números de secuencia proporcionados deben ser mensajes de la misma partición.

receive_messages

Reciba un lote de mensajes a la vez.

Este enfoque es óptimo si desea procesar varios mensajes simultáneamente o realizar una recepción ad hoc como una sola llamada.

Tenga en cuenta que el número de mensajes recuperados en un solo lote dependerá de si prefetch_count se estableció para el receptor. Si no se establece prefetch_count para el receptor, el receptor intentará almacenar en caché max_message_count (si se proporciona) mensajes dentro de la solicitud al servicio.

Esta llamada dará prioridad a la devolución rápida sobre la reunión de un tamaño de lote especificado, por lo que devolverá tan pronto como se reciba al menos un mensaje y haya una brecha en los mensajes entrantes independientemente del tamaño de lote especificado.

renew_message_lock

Renueve el bloqueo del mensaje.

Esto mantendrá el bloqueo en el mensaje para asegurarse de que no se devuelve a la cola para que se vuelva a procesar.

Para completar (o liquidar) el mensaje, el bloqueo debe mantenerse y no puede haber expirado; No se puede renovar un bloqueo expirado.

Los mensajes recibidos a través del modo RECEIVE_AND_DELETE no están bloqueados y, por tanto, no se pueden renovar. Esta operación solo está disponible para mensajes que no son de sesión.

abandon_message

Abandone el mensaje.

Este mensaje se devolverá a la cola y estará disponible para recibirse de nuevo.

abandon_message(message: ServiceBusReceivedMessage) -> None

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje recibido que se va a abandonar.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Abandone un mensaje recibido.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.abandon_message(message)

close

close() -> None

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

Complete el mensaje.

Esto quita el mensaje de la cola.

complete_message(message: ServiceBusReceivedMessage) -> None

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje recibido que se va a completar.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Complete un mensaje recibido.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.complete_message(message)

dead_letter_message

Mueva el mensaje a la cola de mensajes fallidos.

La cola de mensajes fallidos es una sub cola que se puede usar para almacenar los mensajes que no se han podido procesar correctamente o, de lo contrario, requieren una mayor inspección o procesamiento. La cola también se puede configurar para enviar mensajes expirados a la cola de mensajes fallidos.

dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje recibido que se va a enviar por mensajes fallidos.

reason
Optional[str]
valor predeterminado: None

Motivo para enviar mensajes fallidos al mensaje.

error_description
Optional[str]
valor predeterminado: None

Descripción detallada del error para enviar mensajes fallidos al mensaje.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Mensaje fallido un mensaje recibido.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.dead_letter_message(message)

defer_message

Aplaza el mensaje.

Este mensaje permanecerá en la cola, pero debe solicitarse específicamente por su número de secuencia para recibirlo.

defer_message(message: ServiceBusReceivedMessage) -> None

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje recibido que se va a aplazar.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Aplazar un mensaje recibido.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.defer_message(message)

next

next()

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

peek_messages

Examine los mensajes pendientes actualmente en la cola.

Los mensajes inspeccionados no se quitan de la cola ni están bloqueados. No se pueden completar, aplazar o no escribir mensajes fallidos.

peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parámetros

max_message_count
int
valor predeterminado: 1

Número máximo de mensajes que se van a intentar ver. El valor predeterminado es 1.

sequence_number
int

Número de secuencia de mensajes desde el que se van a empezar a examinar los mensajes.

timeout
Optional[float]

Tiempo de espera total de la operación en segundos, incluidos todos los reintentos. El valor debe ser mayor que 0 si se especifica. El valor predeterminado es None, lo que significa que no se ha agotado el tiempo de espera.

Devoluciones

Lista de ~azure.servicebus.ServiceBusReceivedMessage.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Examine los mensajes pendientes en la cola.


   with servicebus_receiver:
       messages = servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

Recibir mensajes que se han aplazado anteriormente.

Al recibir mensajes aplazados de una entidad con particiones, todos los números de secuencia proporcionados deben ser mensajes de la misma partición.

receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parámetros

sequence_numbers
Union[int,List[int]]
Requerido

Lista de los números de secuencia de mensajes que se han aplazado.

timeout
Optional[float]

Tiempo de espera total de la operación en segundos, incluidos todos los reintentos. El valor debe ser mayor que 0 si se especifica. El valor predeterminado es None, lo que significa que no se ha agotado el tiempo de espera.

Devoluciones

Lista de las instancias de ~azure.servicebus.ServiceBusReceivedMessage solicitadas.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Recibir mensajes diferidos de ServiceBus.


   with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           servicebus_receiver.defer_message(message)

       received_deferred_msg = servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for msg in received_deferred_msg:
           servicebus_receiver.complete_message(msg)

receive_messages

Reciba un lote de mensajes a la vez.

Este enfoque es óptimo si desea procesar varios mensajes simultáneamente o realizar una recepción ad hoc como una sola llamada.

Tenga en cuenta que el número de mensajes recuperados en un solo lote dependerá de si prefetch_count se estableció para el receptor. Si no se establece prefetch_count para el receptor, el receptor intentará almacenar en caché max_message_count (si se proporciona) mensajes dentro de la solicitud al servicio.

Esta llamada dará prioridad a la devolución rápida sobre la reunión de un tamaño de lote especificado, por lo que devolverá tan pronto como se reciba al menos un mensaje y haya una brecha en los mensajes entrantes independientemente del tamaño de lote especificado.

receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

Parámetros

max_message_count
Optional[int]
valor predeterminado: 1

Número máximo de mensajes en el lote. El número real devuelto dependerá de prefetch_count y la velocidad de flujo entrante. Si se establece en Ninguno, dependerá completamente de la configuración de captura previa. El valor predeterminado es 1.

max_wait_time
Optional[float]
valor predeterminado: None

Tiempo máximo de espera en segundos para que llegue el primer mensaje. Si no llegan mensajes y no se especifica ningún tiempo de espera, esta llamada no devolverá hasta que se cierre la conexión. Si se especifica, no llegan mensajes dentro del período de tiempo de espera, se devolverá una lista vacía.

Devoluciones

Lista de mensajes recibidos. Si no hay ningún mensaje disponible, será una lista vacía.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Recibir mensajes de ServiceBus.


   with servicebus_receiver:
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           servicebus_receiver.complete_message(message)

renew_message_lock

Renueve el bloqueo del mensaje.

Esto mantendrá el bloqueo en el mensaje para asegurarse de que no se devuelve a la cola para que se vuelva a procesar.

Para completar (o liquidar) el mensaje, el bloqueo debe mantenerse y no puede haber expirado; No se puede renovar un bloqueo expirado.

Los mensajes recibidos a través del modo RECEIVE_AND_DELETE no están bloqueados y, por tanto, no se pueden renovar. Esta operación solo está disponible para mensajes que no son de sesión.

renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje para el que se renovará el bloqueo.

timeout
Optional[float]

Tiempo de espera total de la operación en segundos, incluidos todos los reintentos. El valor debe ser mayor que 0 si se especifica. El valor predeterminado es None, lo que significa que no se ha agotado el tiempo de espera.

Devoluciones

Fecha y hora utc en la que se establece el bloqueo para expirar.

Tipo de valor devuelto

Excepciones

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

Ejemplos

Renueve el bloqueo en un mensaje recibido.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.renew_message_lock(message)

Atributos

client_identifier

Obtenga el client_identifier serviceBusReceiver asociado a la instancia del receptor.

Tipo de valor devuelto

str

session

Obtenga el objeto ServiceBusSession vinculado con el receptor. La sesión solo está disponible para las entidades habilitadas para sesión; devolvería None si se llama a en un receptor no con sesión.

Tipo de valor devuelto

Ejemplos

Obtener sesión de un receptor


       with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session