Condividi tramite


ServiceBusReceiver Classe

La classe ServiceBusReceiver definisce un'interfaccia di alto livello per la ricezione di messaggi dalla coda bus di servizio di Azure o dalla sottoscrizione dell'argomento.

I due canali principali per la ricezione dei messaggi sono receive() per effettuare una singola richiesta per i messaggi e asincrona per il messaggio nel ricevitore: per ricevere continuamente i messaggi in arrivo in modo continuativo.

Usare il get_<queue/subscription>_receiver metodo di ~azure.servicebus.aio.ServiceBusClient per creare un'istanza di ServiceBusReceiver.

Ereditarietà
ServiceBusReceiver
azure.servicebus.aio._base_handler_async.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

Costruttore

ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

Parametri

fully_qualified_namespace
str
Necessario

Nome host completo per lo spazio dei nomi del bus di servizio. Il formato dello spazio dei nomi è: .servicebus.windows.net.

credential
AsyncTokenCredential oppure AzureSasCredential oppure AzureNamedKeyCredential
Necessario

Oggetto credenziale usato per l'autenticazione che implementa un'interfaccia specifica per ottenere i token. Accetta oggetti credenziali generati dalla libreria e dagli oggetti azure-identity che implementano anche il metodo *get_token(self, ambiti) o in alternativa, è possibile specificare anche AzureSasCredential.

queue_name
str

Percorso di coda del bus di servizio specifico a cui si connette il client.

topic_name
str

Percorso di un argomento specifico del bus di servizio che contiene la sottoscrizione a cui si connette il client.

subscription_name
str

Percorso di sottoscrizione del bus di servizio specifico nell'argomento specificato a cui si connette il client.

receive_mode
Union[ServiceBusReceiveMode, str]

Modalità con cui verranno recuperati i messaggi dall'entità. Le due opzioni sono PEEK_LOCK e RECEIVE_AND_DELETE. I messaggi ricevuti con PEEK_LOCK devono essere risolti entro un determinato periodo di blocco prima che vengano rimossi dalla coda. I messaggi ricevuti con RECEIVE_AND_DELETE verranno immediatamente rimossi dalla coda e non possono essere successivamente abbandonati o re-ricevuti se il client non riesce a elaborare il messaggio. La modalità predefinita è PEEK_LOCK.

max_wait_time
Optional[float]

Timeout in secondi tra i messaggi ricevuti dopo il quale il ricevitore interrompe automaticamente la ricezione. Il valore predefinito è Nessuno, ovvero nessun timeout.

logging_enable
bool

Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.

transport_type
TransportType

Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio bus di servizio. Il valore predefinito è TransportType.Amqp.

http_proxy
Dict

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, le chiavi seguenti possono essere presenti: 'username', 'password'.

user_agent
str

Se specificato, verrà aggiunto davanti alla stringa dell'agente utente predefinita.

auto_lock_renewer
Optional[AutoLockRenewer]

È possibile specificare ~azure.servicebus.aio.AutoLockRenewer in modo che i messaggi vengano registrati automaticamente nella ricezione. Se il ricevitore è un ricevitore di sessione, verrà applicato invece alla sessione.

prefetch_count
int

Numero massimo di messaggi da memorizzare nella cache con ogni richiesta al servizio. Questa impostazione è solo per l'ottimizzazione avanzata delle prestazioni. L'aumento di questo valore migliora le prestazioni della velocità effettiva dei messaggi, ma aumenta la probabilità che i messaggi scadano mentre vengono memorizzati nella cache se non vengono elaborati abbastanza velocemente. Il valore predefinito è 0, ovvero i messaggi verranno ricevuti dal servizio e elaborati uno alla volta. Nel caso di prefetch_count 0, ServiceBusReceiver.receive tenterà di memorizzare nella cache max_message_count (se specificato) all'interno della richiesta al servizio.

client_identifier
str

Identificatore basato su stringa per identificare in modo univoco l'istanza client. Il bus di servizio lo associa ad alcuni messaggi di errore per semplificare la correlazione degli errori. Se non specificato, verrà generato un ID univoco.

socket_timeout
float

Tempo in secondi in cui il socket sottostante sulla connessione deve attendere quando si inviano e ricevono dati prima del timeout. Il valore predefinito è 0.2 per TransportType.Amqp e 1 per TransportType.AmqpOverWebsocket. Se si verificano errori di connessione a causa del timeout di scrittura, potrebbe essere necessario passare un valore maggiore di quello predefinito.

Variabili

fully_qualified_namespace
str

Nome host completo per lo spazio dei nomi del bus di servizio. Il formato dello spazio dei nomi è: .servicebus.windows.net.

entity_path
str

Percorso dell'entità a cui si connette il client.

Metodi

abandon_message

Abbandonare il messaggio.

Questo messaggio verrà restituito alla coda e reso disponibile per essere ricevuto di nuovo.

close
complete_message

Completare il messaggio.

In questo modo il messaggio viene rimosso dalla coda.

dead_letter_message

Spostare il messaggio nella coda Lettera morta.

La coda di lettere non recapitabili è una coda secondaria che può essere usata per archiviare i messaggi che non sono riusciti a elaborare correttamente o in caso contrario richiedono un'ulteriore ispezione o elaborazione. La coda può essere configurata anche per inviare messaggi scaduti alla coda Lettera morta.

defer_message

Deferare il messaggio.

Questo messaggio rimarrà nella coda, ma deve essere richiesto in modo specifico dal relativo numero di sequenza per essere ricevuto.

peek_messages

Esplorare i messaggi attualmente in sospeso nella coda.

I messaggi in anteprima non vengono rimossi dalla coda, né sono bloccati. Non possono essere completati, posticipati o non recapitati.

receive_deferred_messages

Ricevere messaggi che in precedenza sono stati posticipati.

Quando si ricevono messaggi posticipati da un'entità partizionata, tutti i numeri di sequenza forniti devono essere messaggi dalla stessa partizione.

receive_messages

Ricevere un batch di messaggi contemporaneamente.

Questo approccio è ottimale se si desidera elaborare più messaggi contemporaneamente o eseguire una ricezione ad hoc come singola chiamata.

Si noti che il numero di messaggi recuperati in un singolo batch dipenderà dal fatto che prefetch_count sia stato impostato per il ricevitore. Se prefetch_count non è impostato per il ricevitore, il ricevitore tenterà di memorizzare nella cache max_message_count (se specificato) messaggi all'interno della richiesta al servizio.

Questa chiamata assegna la priorità alla restituzione rapidamente rispetto alle dimensioni del batch specificate e quindi restituirà non appena viene ricevuto almeno un messaggio e si verifica un divario nei messaggi in ingresso indipendentemente dalle dimensioni del batch specificate.

renew_message_lock

Rinnovare il blocco del messaggio.

Questo manterrà il blocco sul messaggio per assicurarsi che non venga restituito alla coda da rielaborare.

Per completare (o risolvere in altro modo) il messaggio, il blocco deve essere mantenuto e non può essere già scaduto; Non è possibile rinnovare un blocco scaduto.

I messaggi ricevuti tramite RECEIVE_AND_DELETE modalità non sono bloccati e pertanto non possono essere rinnovati. Questa operazione è disponibile solo per i messaggi non con sessione.

abandon_message

Abbandonare il messaggio.

Questo messaggio verrà restituito alla coda e reso disponibile per essere ricevuto di nuovo.

async abandon_message(message: ServiceBusReceivedMessage) -> None

Parametri

message
ServiceBusReceivedMessage
Necessario

Messaggio ricevuto da abbandonare.

Tipo restituito

Eccezioni

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Esempio

Abbandonare un messaggio ricevuto.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.abandon_message(message)

close

async close() -> None

Eccezioni

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

Completare il messaggio.

In questo modo il messaggio viene rimosso dalla coda.

async complete_message(message: ServiceBusReceivedMessage) -> None

Parametri

message
ServiceBusReceivedMessage
Necessario

Messaggio ricevuto da completare.

Tipo restituito

Eccezioni

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Esempio

Completare un messaggio ricevuto.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.complete_message(message)

dead_letter_message

Spostare il messaggio nella coda Lettera morta.

La coda di lettere non recapitabili è una coda secondaria che può essere usata per archiviare i messaggi che non sono riusciti a elaborare correttamente o in caso contrario richiedono un'ulteriore ispezione o elaborazione. La coda può essere configurata anche per inviare messaggi scaduti alla coda Lettera morta.

async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Parametri

message
ServiceBusReceivedMessage
Necessario

Messaggio ricevuto da ricevere messaggi non recapitabili.

reason
Optional[str]
valore predefinito: None

Motivo della lettera non recapitata al messaggio.

error_description
Optional[str]
valore predefinito: None

Descrizione dettagliata dell'errore per la lettera non recapitata del messaggio.

Tipo restituito

Eccezioni

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Esempio

Lettera morta un messaggio ricevuto.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.dead_letter_message(message)

defer_message

Deferare il messaggio.

Questo messaggio rimarrà nella coda, ma deve essere richiesto in modo specifico dal relativo numero di sequenza per essere ricevuto.

async defer_message(message: ServiceBusReceivedMessage) -> None

Parametri

message
ServiceBusReceivedMessage
Necessario

Messaggio ricevuto da rinviare.

Tipo restituito

Eccezioni

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Esempio

Rinviare un messaggio ricevuto.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.defer_message(message)

peek_messages

Esplorare i messaggi attualmente in sospeso nella coda.

I messaggi in anteprima non vengono rimossi dalla coda, né sono bloccati. Non possono essere completati, posticipati o non recapitati.

async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parametri

max_message_count
int
valore predefinito: 1

Numero massimo di messaggi da provare e visualizzare. Il valore predefinito è 1.

sequence_number
int

Numero di sequenza di messaggi da cui avviare l'esplorazione dei messaggi.

timeout
Optional[float]

Il timeout totale dell'operazione in secondi, inclusi tutti i tentativi. Il valore deve essere maggiore di 0 se specificato. Il valore predefinito è Nessuno, ovvero nessun timeout.

Restituisce

Elenco di oggetti ~azure.servicebus.ServiceBusReceivedMessage.

Tipo restituito

Eccezioni

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Esempio

Visualizzare i messaggi nella coda.


   async with servicebus_receiver:
       messages = await servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

Ricevere messaggi che in precedenza sono stati posticipati.

Quando si ricevono messaggi posticipati da un'entità partizionata, tutti i numeri di sequenza forniti devono essere messaggi dalla stessa partizione.

async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parametri

sequence_numbers
Union[int, list[int]]
Necessario

Elenco dei numeri di sequenza di messaggi posticipati.

timeout
Optional[float]

Il timeout totale dell'operazione in secondi, inclusi tutti i tentativi. Il valore deve essere maggiore di 0 se specificato. Il valore predefinito è Nessuno, ovvero nessun timeout.

Restituisce

Elenco dei messaggi ricevuti.

Tipo restituito

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Eccezioni

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Esempio

Ricevere messaggi posticipati da ServiceBus.


   async with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           await servicebus_receiver.defer_message(message)

       received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for message in received_deferred_msg:
           await servicebus_receiver.complete_message(message)

receive_messages

Ricevere un batch di messaggi contemporaneamente.

Questo approccio è ottimale se si desidera elaborare più messaggi contemporaneamente o eseguire una ricezione ad hoc come singola chiamata.

Si noti che il numero di messaggi recuperati in un singolo batch dipenderà dal fatto che prefetch_count sia stato impostato per il ricevitore. Se prefetch_count non è impostato per il ricevitore, il ricevitore tenterà di memorizzare nella cache max_message_count (se specificato) messaggi all'interno della richiesta al servizio.

Questa chiamata assegna la priorità alla restituzione rapidamente rispetto alle dimensioni del batch specificate e quindi restituirà non appena viene ricevuto almeno un messaggio e si verifica un divario nei messaggi in ingresso indipendentemente dalle dimensioni del batch specificate.

async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

Parametri

max_message_count
Optional[int]
valore predefinito: 1

Numero massimo di messaggi nel batch. Il numero effettivo restituito dipenderà dalle dimensioni prefetch_count e dalla frequenza di flusso in ingresso. L'impostazione su Nessuno dipende completamente dalla configurazione di prefetch. Il valore predefinito è 1.

max_wait_time
Optional[float]
valore predefinito: None

Tempo massimo di attesa in secondi per l'arrivo del primo messaggio. Se non arrivano messaggi e non viene specificato alcun timeout, questa chiamata non restituirà finché la connessione non viene chiusa. Se specificato, e non vengono restituiti messaggi entro il periodo di timeout, verrà restituito un elenco vuoto.

Restituisce

Elenco di messaggi ricevuti. Se non sono disponibili messaggi, questo sarà un elenco vuoto.

Tipo restituito

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Eccezioni

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Esempio

Ricevere messaggi da ServiceBus.


   async with servicebus_receiver:
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           await servicebus_receiver.complete_message(message)

renew_message_lock

Rinnovare il blocco del messaggio.

Questo manterrà il blocco sul messaggio per assicurarsi che non venga restituito alla coda da rielaborare.

Per completare (o risolvere in altro modo) il messaggio, il blocco deve essere mantenuto e non può essere già scaduto; Non è possibile rinnovare un blocco scaduto.

I messaggi ricevuti tramite RECEIVE_AND_DELETE modalità non sono bloccati e pertanto non possono essere rinnovati. Questa operazione è disponibile solo per i messaggi non con sessione.

async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

Parametri

message
ServiceBusReceivedMessage
Necessario

Messaggio per rinnovare il blocco.

timeout
Optional[float]

Il timeout totale dell'operazione in secondi, inclusi tutti i tentativi. Il valore deve essere maggiore di 0 se specificato. Il valore predefinito è None, ovvero nessun timeout.

Restituisce

Il valore utc datetime al quale il blocco è impostato per scadere.

Tipo restituito

Eccezioni

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

Esempio

Rinnovare il blocco in un messaggio ricevuto.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.renew_message_lock(message)

Attributi

client_identifier

Ottenere l'identificatore client ServiceBusReceiver associato all'istanza del ricevitore.

Tipo restituito

str

session

Ottenere l'oggetto ServiceBusSession collegato al ricevitore. La sessione è disponibile solo per le entità abilitate per la sessione, restituisce Nessuno se viene chiamato in un ricevitore non sessione.

Tipo restituito

Esempio

Ottenere la sessione da un ricevitore


       async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session