ServiceBusReceiver Klas
De klasse ServiceBusReceiver definieert een interface op hoog niveau voor het ontvangen van berichten van de Azure Service Bus Queue of Topic Subscription.
De twee primaire kanalen voor de ontvangst van berichten zijn receive() om één aanvraag voor berichten te doen en asynchroon voor bericht in ontvanger: om continu inkomende berichten op een doorlopende manier te ontvangen.
Gebruik de get_<queue/subscription>_receiver
methode van ~azure.servicebus.aio.ServiceBusClient om een ServiceBusReceiver-exemplaar te maken.
- Overname
-
ServiceBusReceiverazure.servicebus.aio._base_handler_async.BaseHandlerServiceBusReceiverazure.servicebus._common.receiver_mixins.ReceiverMixinServiceBusReceiver
Constructor
ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)
Parameters
- fully_qualified_namespace
- str
De volledig gekwalificeerde hostnaam voor de Service Bus-naamruimte. De indeling van de naamruimte is: .servicebus.windows.net.
- credential
- AsyncTokenCredential of AzureSasCredential of AzureNamedKeyCredential
Het referentieobject dat wordt gebruikt voor verificatie, waarmee een bepaalde interface wordt geïmplementeerd voor het ophalen van tokens. Het accepteert referentieobjecten die zijn gegenereerd door de azure-identity-bibliotheek en objecten die de methode *get_token (zelf, bereiken) implementeren, of er kan ook een AzureSasCredential worden opgegeven.
- queue_name
- str
Het pad van een specifieke Service Bus-wachtrij waar de client verbinding mee maakt.
- topic_name
- str
Het pad van een specifiek Service Bus-onderwerp dat het abonnement bevat waarmee de client verbinding maakt.
- subscription_name
- str
Het pad van een specifiek Service Bus-abonnement onder het opgegeven onderwerp waar de client verbinding mee maakt.
- receive_mode
- Union[ServiceBusReceiveMode, str]
De modus waarmee berichten worden opgehaald van de entiteit. De twee opties zijn PEEK_LOCK en RECEIVE_AND_DELETE. Berichten die met PEEK_LOCK worden ontvangen, moeten binnen een bepaalde vergrendelingsperiode worden vereffend voordat ze uit de wachtrij worden verwijderd. Berichten die met RECEIVE_AND_DELETE worden ontvangen, worden onmiddellijk uit de wachtrij verwijderd en kunnen niet worden afgestaan of opnieuw worden ontvangen als de client het bericht niet kan verwerken. De standaardmodus is PEEK_LOCK.
De time-out in seconden tussen ontvangen berichten waarna de ontvanger automatisch stopt met ontvangen. De standaardwaarde is Geen, wat betekent dat er geen time-out is.
- logging_enable
- bool
Of netwerktraceringslogboeken naar de logboekregistratie moeten worden uitgevoerd. De standaardwaarde is Onwaar.
- transport_type
- TransportType
Het type transportprotocol dat wordt gebruikt voor de communicatie met de Service Bus-service. De standaardwaarde is TransportType.Amqp.
- http_proxy
- Dict
HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende toetsen: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen ook de volgende sleutels aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.
- user_agent
- str
Indien opgegeven, wordt deze toegevoegd vóór de ingebouwde tekenreeks van de gebruikersagent.
- auto_lock_renewer
- Optional[AutoLockRenewer]
Een ~azure.servicebus.aio.AutoLockRenewer kan zodanig worden geleverd dat berichten automatisch worden geregistreerd bij ontvangst. Als de ontvanger een sessieontvanger is, is deze in plaats daarvan van toepassing op de sessie.
- prefetch_count
- int
Het maximum aantal berichten dat bij elke aanvraag naar de service in de cache moet worden opgeslagen. Deze instelling is alleen bedoeld voor geavanceerde afstemming van prestaties. Als u deze waarde verhoogt, worden de doorvoerprestaties van berichten verbeterd, maar is de kans groter dat berichten verlopen terwijl ze in de cache worden opgeslagen als ze niet snel genoeg worden verwerkt. De standaardwaarde is 0, wat betekent dat berichten van de service worden ontvangen en één voor één worden verwerkt. In het geval dat prefetch_count 0 is, probeert ServiceBusReceiver.receivemax_message_count (indien opgegeven) in de cache op te nemen in de aanvraag voor de service.
- client_identifier
- str
Een id op basis van tekenreeksen om het clientexemplaar uniek te identificeren. Service Bus koppelt deze aan een aantal foutberichten voor een eenvoudigere correlatie van fouten. Als dit niet wordt opgegeven, wordt er een unieke id gegenereerd.
- socket_timeout
- float
De tijd in seconden dat de onderliggende socket op de verbinding moet wachten bij het verzenden en ontvangen van gegevens voordat er een time-out optreedt. De standaardwaarde is 0,2 voor TransportType.Amqp en 1 voor TransportType.AmqpOverWebsocket. Als er verbindingsfouten optreden vanwege een time-out van schrijfbewerkingen, moet mogelijk een groter dan de standaardwaarde worden doorgegeven.
Variabelen
- fully_qualified_namespace
- str
De volledig gekwalificeerde hostnaam voor de Service Bus-naamruimte. De indeling van de naamruimte is: .servicebus.windows.net.
- entity_path
- str
Het pad van de entiteit waarmee de client verbinding maakt.
Methoden
abandon_message |
Het bericht verlaten. Dit bericht wordt teruggestuurd naar de wachtrij en beschikbaar gesteld om opnieuw te worden ontvangen. |
close | |
complete_message |
Vul het bericht in. Hiermee wordt het bericht uit de wachtrij verwijderd. |
dead_letter_message |
Verplaats het bericht naar de wachtrij Voor onbestelbare berichten. De wachtrij Voor onbestelbare berichten is een subwachtrij die kan worden gebruikt voor het opslaan van berichten die niet correct zijn verwerkt of die anderszins verder moeten worden gecontroleerd of verwerkt. De wachtrij kan ook worden geconfigureerd om verlopen berichten te verzenden naar de wachtrij Met onbestelbare berichten. |
defer_message |
Het bericht wordt uitstellen. Dit bericht blijft in de wachtrij staan, maar moet specifiek worden aangevraagd op basis van het volgnummer om te kunnen worden ontvangen. |
peek_messages |
Blader door berichten die momenteel in behandeling zijn in de wachtrij. Bekeken berichten worden niet verwijderd uit de wachtrij en worden ook niet vergrendeld. Ze kunnen niet worden voltooid, uitgesteld of onbestelbaar gemaakt. |
receive_deferred_messages |
Berichten ontvangen die eerder zijn uitgesteld. Wanneer u uitgestelde berichten van een gepartitioneerde entiteit ontvangt, moeten alle opgegeven volgnummers berichten van dezelfde partitie zijn. |
receive_messages |
Een batch berichten tegelijk ontvangen. Deze benadering is optimaal als u meerdere berichten tegelijk wilt verwerken of een ad-hoc ontvangst wilt uitvoeren als één aanroep. Houd er rekening mee dat het aantal berichten dat in één batch wordt opgehaald, afhankelijk is van het feit of prefetch_count is ingesteld voor de ontvanger. Als prefetch_count niet is ingesteld voor de ontvanger, probeert de ontvanger max_message_count (indien opgegeven) berichten in de cache in de aanvraag bij de service in de cache te plaatsen. Deze aanroep geeft prioriteit aan het snel retourneren boven het voldoen aan een opgegeven batchgrootte en wordt dus geretourneerd zodra ten minste één bericht is ontvangen en er een hiaat is in binnenkomende berichten, ongeacht de opgegeven batchgrootte. |
renew_message_lock |
Vernieuw de berichtvergrendeling. Hiermee blijft de vergrendeling van het bericht behouden om ervoor te zorgen dat het niet wordt teruggestuurd naar de wachtrij om opnieuw te worden verwerkt. Om het bericht te voltooien (of anderszins te vereffenen), moet de vergrendeling worden gehandhaafd en mag deze niet al zijn verlopen; een verlopen vergrendeling kan niet worden vernieuwd. Berichten die via RECEIVE_AND_DELETE-modus worden ontvangen, zijn niet vergrendeld en kunnen daarom niet worden vernieuwd. Deze bewerking is ook alleen beschikbaar voor niet-sessievolle berichten. |
abandon_message
Het bericht verlaten.
Dit bericht wordt teruggestuurd naar de wachtrij en beschikbaar gesteld om opnieuw te worden ontvangen.
async abandon_message(message: ServiceBusReceivedMessage) -> None
Parameters
Retourtype
Uitzonderingen
Voorbeelden
Een ontvangen bericht afbreken.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.abandon_message(message)
close
async close() -> None
Uitzonderingen
complete_message
Vul het bericht in.
Hiermee wordt het bericht uit de wachtrij verwijderd.
async complete_message(message: ServiceBusReceivedMessage) -> None
Parameters
Retourtype
Uitzonderingen
Voorbeelden
Een ontvangen bericht voltooien.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.complete_message(message)
dead_letter_message
Verplaats het bericht naar de wachtrij Voor onbestelbare berichten.
De wachtrij Voor onbestelbare berichten is een subwachtrij die kan worden gebruikt voor het opslaan van berichten die niet correct zijn verwerkt of die anderszins verder moeten worden gecontroleerd of verwerkt. De wachtrij kan ook worden geconfigureerd om verlopen berichten te verzenden naar de wachtrij Met onbestelbare berichten.
async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None
Parameters
De gedetailleerde foutbeschrijving voor het bericht in onbestelbare letters.
Retourtype
Uitzonderingen
Voorbeelden
Onbestelbare brief een ontvangen bericht.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.dead_letter_message(message)
defer_message
Het bericht wordt uitstellen.
Dit bericht blijft in de wachtrij staan, maar moet specifiek worden aangevraagd op basis van het volgnummer om te kunnen worden ontvangen.
async defer_message(message: ServiceBusReceivedMessage) -> None
Parameters
Retourtype
Uitzonderingen
Voorbeelden
Een ontvangen bericht uitstellen.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.defer_message(message)
peek_messages
Blader door berichten die momenteel in behandeling zijn in de wachtrij.
Bekeken berichten worden niet verwijderd uit de wachtrij en worden ook niet vergrendeld. Ze kunnen niet worden voltooid, uitgesteld of onbestelbaar gemaakt.
async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Parameters
- max_message_count
- int
Het maximum aantal berichten dat u kunt proberen te bekijken. De standaardwaarde is 1.
- sequence_number
- int
Een berichtenvolgordenummer van waaruit u door berichten wilt bladeren.
De totale time-out van de bewerking in seconden, inclusief alle nieuwe pogingen. De waarde moet groter zijn dan 0 indien opgegeven. De standaardwaarde is Geen, wat betekent dat er geen time-out is.
Retouren
Een lijst met ~azure.servicebus.ServiceBusReceivedMessage-objecten.
Retourtype
Uitzonderingen
Voorbeelden
Berichten in de wachtrij bekijken.
async with servicebus_receiver:
messages = await servicebus_receiver.peek_messages()
for message in messages:
print(str(message))
receive_deferred_messages
Berichten ontvangen die eerder zijn uitgesteld.
Wanneer u uitgestelde berichten van een gepartitioneerde entiteit ontvangt, moeten alle opgegeven volgnummers berichten van dezelfde partitie zijn.
async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Parameters
Een lijst met de reeksnummers van berichten die zijn uitgesteld.
De totale time-out van de bewerking in seconden, inclusief alle nieuwe pogingen. De waarde moet groter zijn dan 0 indien opgegeven. De standaardwaarde is Geen, wat betekent dat er geen time-out is.
Retouren
Een lijst met de ontvangen berichten.
Retourtype
Uitzonderingen
Voorbeelden
Uitgestelde berichten ontvangen van ServiceBus.
async with servicebus_receiver:
deferred_sequenced_numbers = []
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
await servicebus_receiver.defer_message(message)
received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
sequence_numbers=deferred_sequenced_numbers
)
for message in received_deferred_msg:
await servicebus_receiver.complete_message(message)
receive_messages
Een batch berichten tegelijk ontvangen.
Deze benadering is optimaal als u meerdere berichten tegelijk wilt verwerken of een ad-hoc ontvangst wilt uitvoeren als één aanroep.
Houd er rekening mee dat het aantal berichten dat in één batch wordt opgehaald, afhankelijk is van het feit of prefetch_count is ingesteld voor de ontvanger. Als prefetch_count niet is ingesteld voor de ontvanger, probeert de ontvanger max_message_count (indien opgegeven) berichten in de cache in de aanvraag bij de service in de cache te plaatsen.
Deze aanroep geeft prioriteit aan het snel retourneren boven het voldoen aan een opgegeven batchgrootte en wordt dus geretourneerd zodra ten minste één bericht is ontvangen en er een hiaat is in binnenkomende berichten, ongeacht de opgegeven batchgrootte.
async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]
Parameters
Maximum aantal berichten in de batch. Het werkelijke aantal dat wordt geretourneerd, is afhankelijk van prefetch_count grootte en binnenkomende stroomsnelheid. De instelling op Geen is volledig afhankelijk van de prefetch-configuratie. De standaardwaarde is 1.
Maximale tijd om in seconden te wachten voordat het eerste bericht binnenkomt. Als er geen berichten binnenkomen en er geen time-out is opgegeven, wordt deze oproep pas geretourneerd als de verbinding is verbroken. Als dit is opgegeven en er geen berichten binnen de time-outperiode binnenkomen, wordt er een lege lijst geretourneerd.
Retouren
Een lijst met ontvangen berichten. Als er geen berichten beschikbaar zijn, is dit een lege lijst.
Retourtype
Uitzonderingen
Voorbeelden
Berichten ontvangen van ServiceBus.
async with servicebus_receiver:
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(str(message))
await servicebus_receiver.complete_message(message)
renew_message_lock
Vernieuw de berichtvergrendeling.
Hiermee blijft de vergrendeling van het bericht behouden om ervoor te zorgen dat het niet wordt teruggestuurd naar de wachtrij om opnieuw te worden verwerkt.
Om het bericht te voltooien (of anderszins te vereffenen), moet de vergrendeling worden gehandhaafd en mag deze niet al zijn verlopen; een verlopen vergrendeling kan niet worden vernieuwd.
Berichten die via RECEIVE_AND_DELETE-modus worden ontvangen, zijn niet vergrendeld en kunnen daarom niet worden vernieuwd. Deze bewerking is ook alleen beschikbaar voor niet-sessievolle berichten.
async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime
Parameters
- message
- ServiceBusReceivedMessage
Het bericht waarvoor de vergrendeling moet worden vernieuwd.
De totale time-out van de bewerking in seconden, inclusief alle nieuwe pogingen. De waarde moet groter zijn dan 0 indien opgegeven. De standaardwaarde is Geen, wat betekent dat er geen time-out is.
Retouren
De utc-datum/tijd waarop de vergrendeling is ingesteld om te verlopen.
Retourtype
Uitzonderingen
Voorbeelden
Vernieuw de vergrendeling voor een ontvangen bericht.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.renew_message_lock(message)
Kenmerken
client_identifier
session
Haal het ServiceBusSession-object op dat is gekoppeld aan de ontvanger. Sessie is alleen beschikbaar voor entiteiten met sessiemogelijkheden. De sessie retourneert Geen als deze wordt aangeroepen op een niet-sessievolle ontvanger.
Retourtype
Voorbeelden
Sessie ophalen van een ontvanger
async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
session = receiver.session