biblioteka klienta Azure Service Bus dla języka Python — wersja 7.11.4
Azure Service Bus to usługa do obsługi komunikatów zarządzanych w chmurze o wysokiej wydajności umożliwiająca komunikację w czasie rzeczywistym i odporną na błędy między rozproszonych nadawców i odbiorników.
Usługa Service Bus udostępnia wiele mechanizmów asynchronicznej wysoce niezawodnej komunikacji, takich jak ustrukturyzowana obsługa komunikatów typu "pierwszy na pierwszy w poziomie", możliwości publikowania/subskrybowania oraz możliwość łatwego skalowania w miarę wzrostu potrzeb.
Biblioteka klienta usługi Service Bus dla języka Python umożliwia komunikowanie się między aplikacjami i usługami oraz implementowanie wzorców asynchronicznych obsługi komunikatów.
- Utwórz przestrzenie nazw, kolejki, tematy i subskrypcje usługi Service Bus oraz zmodyfikuj ich ustawienia.
- Wysyłanie i odbieranie komunikatów w kanałach usługi Service Bus.
- Korzystanie z funkcji blokad, sesji i utraconych komunikatów w celu zaimplementowania złożonych wzorców obsługi komunikatów.
Kod | źródłowyPakiet (PyPi) | Pakiet (Conda) | Dokumentacja referencyjna interfejsu | APIDokumentacja | produktuPróbki | Changelog
UWAGA: Jeśli używasz wersji 0.50 lub starszej i chcesz przeprowadzić migrację do najnowszej wersji tego pakietu, zapoznaj się z naszym przewodnikiem migracji, aby przejść z usługi Service Bus V0.50 do usługi Service Bus V7.
Wprowadzenie
Instalowanie pakietu
Zainstaluj bibliotekę klienta Azure Service Bus dla języka Python przy użyciu narzędzia pip:
pip install azure-servicebus
Wymagania wstępne:
Aby użyć tego pakietu, musisz mieć następujące elementy:
- Subskrypcja platformy Azure — można utworzyć bezpłatne konto
- Azure Service Bus — poświadczenia przestrzeni nazw i zarządzania
- Środowisko Python w wersji 3.7 lub nowszej — instalowanie języka Python
Jeśli potrzebujesz przestrzeni nazw usługi Azure Service Bus, możesz ją utworzyć za pośrednictwem witryny Azure Portal. Jeśli nie chcesz używać graficznego interfejsu użytkownika portalu, możesz użyć interfejsu wiersza polecenia platformy Azure za pośrednictwem Cloud Shell lub interfejsu wiersza polecenia platformy Azure uruchomić lokalnie, aby utworzyć go za pomocą tego polecenia interfejsu wiersza polecenia platformy Azure:
az servicebus namespace create --resource-group <resource-group-name> --name <servicebus-namespace-name> --location <servicebus-namespace-location>
Uwierzytelnianie klienta
Interakcja z usługą Service Bus rozpoczyna się od wystąpienia ServiceBusClient
klasy . Potrzebujesz parametry połączenia z kluczem SYGNATURy dostępu współdzielonego lub przestrzeni nazw i jednym z jego kluczy konta, aby utworzyć wystąpienie obiektu klienta.
Zapoznaj się z poniższymi przykładami, aby zapoznać się z przykładami dotyczącymi sposobu uwierzytelniania przy użyciu dowolnego podejścia.
Tworzenie klienta na podstawie parametry połączenia
- Aby uzyskać wymagane poświadczenia, możesz użyć fragmentu interfejsu wiersza polecenia platformy Azure (sformatowanego dla powłoki Bash) w górnej części połączonego przykładu, aby wypełnić zmienną środowiskową za pomocą parametry połączenia usługi Service Bus (możesz również znaleźć te wartości w witrynie Azure Portal, wykonując instrukcje krok po kroku, aby uzyskać parametry połączenia usługi Service Bus).
Utwórz klienta przy użyciu biblioteki azure-identity:
- Ten konstruktor przyjmuje w pełni kwalifikowaną przestrzeń nazw wystąpienia usługi Service Bus i poświadczenia implementujące protokół TokenCredential . Istnieją implementacje
TokenCredential
protokołu dostępnego w pakiecie azure-identity. W pełni kwalifikowana przestrzeń nazw ma format<yournamespace.servicebus.windows.net>
. - Aby użyć typów poświadczeń dostarczonych przez
azure-identity
program , zainstaluj pakiet:pip install azure-identity
- Ponadto aby korzystać z interfejsu API asynchronicznego, należy najpierw zainstalować transport asynchroniczny, taki jak
aiohttp
:pip install aiohttp
- W przypadku korzystania z usługi Azure Active Directory jednostka musi mieć przypisaną rolę, która umożliwia dostęp do usługi Service Bus, takiej jak rola właściciela danych Azure Service Bus. Aby uzyskać więcej informacji na temat używania autoryzacji usługi Azure Active Directory z usługą Service Bus, zapoznaj się z skojarzona dokumentacja.
Uwaga: klient można zainicjować bez menedżera kontekstu, ale należy go zamknąć ręcznie za pośrednictwem metody client.close(), aby nie wyciekać zasobów.
Kluczowe pojęcia
Po zainicjowaniu ServiceBusClient
klasy można wchodzić w interakcje z podstawowymi typami zasobów w przestrzeni nazw usługi Service Bus, z których wiele może istnieć i na których odbywa się rzeczywista transmisja komunikatów, przestrzeń nazw często pełni rolę kontenera aplikacji:
Kolejka: umożliwia wysyłanie i odbieranie komunikatów. Często używane do komunikacji punkt-punkt.
Temat: W przeciwieństwie do kolejek tematy lepiej nadają się do scenariuszy publikowania/subskrybowania. Temat można wysłać do, ale wymaga subskrypcji, z której może być wiele równolegle, do użycia.
Subskrypcja: mechanizm do korzystania z tematu. Każda subskrypcja jest niezależna i otrzymuje kopię każdego komunikatu wysłanego do tematu. Reguły i filtry mogą służyć do dostosowywania komunikatów odbieranych przez określoną subskrypcję.
Aby uzyskać więcej informacji na temat tych zasobów, zobacz Co to jest Azure Service Bus?.
Aby korzystać z tych zasobów, należy zapoznać się z następującymi pojęciami dotyczącymi zestawu SDK:
ServiceBusClient: jest to obiekt, który użytkownik powinien najpierw zainicjować, aby nawiązać połączenie z przestrzenią nazw usługi Service Bus. Aby wchodzić w interakcje z kolejką, tematem lub subskrypcją, jeden zduplikowałby nadawcę lub odbiorcę od tego klienta.
ServiceBusSender: aby wysyłać komunikaty do kolejki lub tematu, należy użyć odpowiedniej
get_queue_sender
metody lubget_topic_sender
wyłączonej zServiceBusClient
wystąpienia, jak pokazano tutaj.ServiceBusReceiver: aby odbierać komunikaty z kolejki lub subskrypcji, należy użyć odpowiedniej
get_queue_receiver
metody lubget_subscription_receiver
metody wyłączonej zServiceBusClient
wystąpienia, jak pokazano tutaj.ServiceBusMessage: Podczas wysyłania jest to typ, który zostanie utworzony tak, aby zawierał ładunek. Podczas odbierania jest to miejsce, w którym będziesz uzyskiwać dostęp do ładunku.
Bezpieczeństwo wątkowe
Nie gwarantujemy, że klasy ServiceBusClient, ServiceBusSender i ServiceBusReceiver są bezpieczne wątkowo. Nie zalecamy ponownego korzystania z tych wystąpień w wątkach. Do uruchomionej aplikacji należy użycie tych klas w sposób bezpieczny wątkowo.
Przykłady
Poniższe sekcje zawierają kilka fragmentów kodu obejmujących niektóre z najbardziej typowych zadań usługi Service Bus, w tym:
- Wysyłanie komunikatów do kolejki
- Odbieranie komunikatów z kolejki
- Wysyłanie i odbieranie komunikatu z kolejki z włączoną sesją
- Praca z tematami i subskrypcjami
- Ureguluj wiadomość po otrzymaniu
- Automatyczne odnawianie blokad wiadomości lub sesji
Aby wykonywać zadania zarządzania, takie jak tworzenie i usuwanie kolejek/tematów/subskrypcji, skorzystaj z biblioteki azure-mgmt-servicebus dostępnej tutaj.
Więcej przykładów można znaleźć w katalogu przykładów pokazujących typowe scenariusze usługi Service Bus, takie jak wysyłanie, odbieranie, zarządzanie sesjami i obsługa komunikatów.
Wysyłanie komunikatów do kolejki
UWAGA: zapoznaj się z dokumentacją referencyjną tutaj.
Ten przykład wysyła pojedynczy komunikat i tablicę komunikatów do kolejki, która została założona, że już istnieje, utworzona za pośrednictwem Azure Portal lub az poleceń.
from azure.servicebus import ServiceBusClient, ServiceBusMessage
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_sender(queue_name) as sender:
# Sending a single message
single_message = ServiceBusMessage("Single message")
sender.send_messages(single_message)
# Sending a list of messages
messages = [ServiceBusMessage("First message"), ServiceBusMessage("Second message")]
sender.send_messages(messages)
UWAGA: Komunikat może być zaplanowany na opóźnione dostarczanie przy użyciu
ServiceBusSender.schedule_messages()
metody lub przez określenie przed wywołaniemServiceBusMessage.scheduled_enqueue_time_utc
metodyServiceBusSender.send_messages()
Aby uzyskać więcej informacji na temat planowania i planowania anulowania, zobacz przykład tutaj.
Odbieranie komunikatów z kolejki
Aby odbierać z kolejki, można wykonać odbieranie ad hoc za pośrednictwem receiver.receive_messages()
lub trwale odbierać za pośrednictwem samego odbiornika.
Odbieranie komunikatów z kolejki przez iterowanie za pośrednictwem elementu ServiceBusReceiver
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
# max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
# Default is None; to receive forever.
with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
for msg in receiver: # ServiceBusReceiver instance is a generator.
print(str(msg))
# If it is desired to halt receiving early, one can break out of the loop here safely.
UWAGA: Każdy komunikat otrzymany
receive_mode=PEEK_LOCK
z (jest to wartość domyślna, z alternatywnym RECEIVE_AND_DELETE usunięcie komunikatu z kolejki natychmiast po otrzymaniu) ma blokadę, która musi zostać odnowiona za pośrednictwem,receiver.renew_message_lock
zanim wygaśnie, jeśli przetwarzanie potrwa dłużej niż czas trwania blokady. Zobacz AutoLockRenewer , aby uzyskać pomocnik, aby wykonać to automatycznie w tle. Czas trwania blokady jest ustawiany na platformie Azure w samej kolejce lub temacie.
Odbieranie komunikatów z kolejki za pośrednictwem ServiceBusReceiver.receive_messages()
UWAGA:
ServiceBusReceiver.receive_messages()
odbiera pojedynczą lub ograniczoną listę komunikatów za pośrednictwem wywołania metody ad hoc, w przeciwieństwie do ciągłego odbierania z generatora. Zawsze zwraca listę.
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
received_message_array = receiver.receive_messages(max_wait_time=10) # try to receive a single message within 10 seconds
if received_message_array:
print(str(received_message_array[0]))
with client.get_queue_receiver(queue_name) as receiver:
received_message_array = receiver.receive_messages(max_message_count=5, max_wait_time=10) # try to receive maximum 5 messages in a batch within 10 seconds
for message in received_message_array:
print(str(message))
W tym przykładzie max_message_count deklaruje maksymalną liczbę komunikatów do odebrania przed naciśnięciem max_wait_time, jak określono w sekundach.
UWAGA: Należy również zauważyć, że
ServiceBusReceiver.peek_messages()
jest subtelnie inny niż odbieranie, ponieważ nie blokuje podglądu komunikatów, a tym samym nie można ich rozstrzygnąć.
Wysyłanie i odbieranie komunikatu z kolejki z włączoną sesją
UWAGA: zapoznaj się z dokumentacją referencyjną dotyczącą wysyłania i odbierania sesji.
Sesje zapewniają semantykę "pierwszy na pierwszym miejscu" i "jeden odbiornik" w górnej części kolejki lub subskrypcji. Chociaż rzeczywista składnia odbierania jest taka sama, inicjowanie różni się nieznacznie.
from azure.servicebus import ServiceBusClient, ServiceBusMessage
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_SESSION_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_sender(queue_name) as sender:
sender.send_messages(ServiceBusMessage("Session Enabled Message", session_id=session_id))
# If session_id is null here, will receive from the first available session.
with client.get_queue_receiver(queue_name, session_id=session_id) as receiver:
for msg in receiver:
print(str(msg))
UWAGA: Komunikaty odebrane z sesji nie wymagają ich blokad odnowionych, takich jak odbiornik nieseksesyjny; zamiast tego zarządzanie blokadą odbywa się na poziomie sesji z blokadą sesji, która może zostać odnowiona za pomocą polecenia
receiver.session.renew_lock()
Praca z tematami i subskrypcjami
UWAGA: zapoznaj się z dokumentacją referencyjną tematów i subskrypcji.
Tematy i subskrypcje zapewniają alternatywę dla kolejek do wysyłania i odbierania komunikatów. Zobacz dokumenty tutaj , aby uzyskać bardziej szczegółowe informacje i jak różnią się one od kolejek.
from azure.servicebus import ServiceBusClient, ServiceBusMessage
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_topic_sender(topic_name) as sender:
sender.send_messages(ServiceBusMessage("Data"))
# If session_id is null here, will receive from the first available session.
with client.get_subscription_receiver(topic_name, subscription_name) as receiver:
for msg in receiver:
print(str(msg))
Ureguluj wiadomość po otrzymaniu
Podczas odbierania z kolejki masz wiele akcji, które można wykonać na odbieranych komunikatach.
UWAGA: Można uregulować
ServiceBusReceivedMessage
tylko obiekty odbierane wServiceBusReceiveMode.PEEK_LOCK
trybie (jest to ustawienie domyślne).ServiceBusReceiveMode.RECEIVE_AND_DELETE
tryb usuwa komunikat z kolejki po otrzymaniu.ServiceBusReceivedMessage
komunikatów zwracanych zpeek_messages()
programu nie można rozstrzygnąć, ponieważ blokada komunikatu nie jest traktowana jak w wyżej wymienionych metodach odbierania.
Jeśli komunikat ma blokadę, jak wspomniano powyżej, rozliczenie zakończy się niepowodzeniem, jeśli blokada komunikatu wygasła.
Jeśli przetwarzanie trwa dłużej niż czas trwania blokady, musi być utrzymywane przez receiver.renew_message_lock
program przed wygaśnięciem.
Czas trwania blokady jest ustawiany na platformie Azure w samej kolejce lub temacie.
Zobacz AutoLockRenewer , aby uzyskać pomocnik, aby wykonać to automatycznie w tle.
Ukończ
Deklaruje pomyślne zakończenie przetwarzania komunikatów, usuwając komunikat z kolejki.
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
receiver.complete_message(msg)
Abandon
Porzucanie przetwarzania komunikatu w tym czasie, zwracając komunikat natychmiast z powrotem do kolejki, która ma zostać odebrana przez inny (lub ten sam) odbiornik.
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
receiver.abandon_message(msg)
DeadLetter
Przenieś komunikat z kolejki podstawowej do specjalnej "kolejki podrzędnej utraconych komunikatów", do której można uzyskać dostęp przy użyciu ServiceBusClient.get_<queue|subscription>_receiver
funkcji z parametrem sub_queue=ServiceBusSubQueue.DEAD_LETTER
i używanej z poziomu dowolnego innego odbiornika. (zobacz przykład tutaj)
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
receiver.dead_letter_message(msg)
Odroczyć
Odroczenie różni się od poprzednich metod rozliczenia. Uniemożliwia odbieranie komunikatu bezpośrednio z kolejki przez ustawienie go na bok, tak aby musiał zostać odebrany przez numer sekwencji w wywołaniu ( ServiceBusReceiver.receive_deferred_messages
zobacz przykład tutaj)
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
receiver.defer_message(msg)
Automatyczne odnawianie blokad wiadomości lub sesji
UWAGA: zobacz dokumentację referencyjną dotyczącą automatycznego odnawiania blokady.
AutoLockRenewer
jest prostą metodą zapewnienia, że komunikat lub sesja pozostaje zablokowana nawet przez długi czas, jeśli wywołanie receiver.renew_message_lock
/receiver.session.renew_lock
jest niepraktyczne lub niepożądane.
Wewnętrznie nie jest to znacznie więcej niż skrót do tworzenia współbieżnego strażnika do odnowienia blokady, jeśli obiekt zbliża się do wygaśnięcia.
Należy go użyć w następujący sposób:
- Automatyczne odnawianie blokady komunikatów
from azure.servicebus import ServiceBusClient, AutoLockRenewer
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
# Can also be called via "with AutoLockRenewer() as renewer" to automate closing.
renewer = AutoLockRenewer()
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver.receive_messages():
renewer.register(receiver, msg, max_lock_renewal_duration=60)
# Do your application logic here
receiver.complete_message(msg)
renewer.close()
- Automatyczne odnawianie blokady sesji
from azure.servicebus import ServiceBusClient, AutoLockRenewer
import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
session_queue_name = os.environ['SERVICE_BUS_SESSION_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']
# Can also be called via "with AutoLockRenewer() as renewer" to automate closing.
renewer = AutoLockRenewer()
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(session_queue_name, session_id=session_id) as receiver:
renewer.register(receiver, receiver.session, max_lock_renewal_duration=300) # Duration for how long to maintain the lock for, in seconds.
for msg in receiver.receive_messages():
# Do your application logic here
receiver.complete_message(msg)
renewer.close()
Jeśli z jakiegoś powodu automatyczne odnawianie zostało przerwane lub nie powiodło się, można to zaobserwować za pośrednictwem auto_renew_error
właściwości obiektu odnawianego lub przez przekazanie wywołania zwrotnego do parametru on_lock_renew_failure
podczas inicjowania odnawiacza.
Manifestowałby się również podczas próby podjęcia akcji (na przykład ukończenia komunikatu) w określonym obiekcie.
Rozwiązywanie problemów
Rejestrowanie
- Włącz
azure.servicebus
rejestrator do zbierania śladów z biblioteki. - Włącz
uamqp
rejestrator do zbierania śladów z bazowej biblioteki uAMQP. - Włącz śledzenie na poziomie ramki protokołu AMQP przez ustawienie
logging_enable=True
podczas tworzenia klienta. - Mogą wystąpić przypadki, w których rejestrowanie
uamqp
jest zbyt pełne. Aby pominąć niepotrzebne rejestrowanie, dodaj następujący fragment kodu na początku kodu:
import logging
# The logging levels below may need to be changed based on the logging that you want to suppress.
uamqp_logger = logging.getLogger('uamqp')
uamqp_logger.setLevel(logging.ERROR)
# or even further fine-grained control, suppressing the warnings in uamqp.connection module
uamqp_connection_logger = logging.getLogger('uamqp.connection')
uamqp_connection_logger.setLevel(logging.ERROR)
Limity czasu
Istnieją różne limity czasu, o których użytkownik powinien wiedzieć w bibliotece.
- 10-minutowe zamknięcie łącza po stronie usługi: po otwarciu link zostanie zamknięty po upływie 10 minut bezczynności, aby chronić usługę przed wyciekiem zasobów. Powinno to być w dużej mierze niewidoczne dla użytkownika, ale jeśli zauważysz ponowne połączenie występujące po takim czasie trwania, jest to dlaczego. Wykonywanie wszystkich operacji, w tym operacji zarządzania, na linku spowoduje wydłużenie tego limitu czasu.
- max_wait_time: podane podczas tworzenia odbiornika lub podczas wywoływania
receive_messages()
metody , czas, po którym odbieranie komunikatów zostanie zatrzymane po braku ruchu. Dotyczy to zarówno funkcji imperatywnejreceive_messages()
, jak i długości, dla których zostanie uruchomiona funkcja odbierania w stylu generatora przed zakończeniem, jeśli nie ma komunikatów. Przekazywanie brak (wartość domyślna) będzie czekać na zawsze, aż do progu 10 minut, jeśli nie zostanie podjęta żadna inna akcja.
UWAGA: Jeśli przetwarzanie komunikatu lub sesji jest wystarczająco długie, aby spowodować przekroczenie limitu czasu, alternatywą dla wywołania
receiver.renew_message_lock
/receiver.session.renew_lock
ręcznego, można skorzystać z funkcji opisanejAutoLockRenewer
w artykule powyżej.
Typowe wyjątki
Interfejsy API usługi Service Bus generują następujące wyjątki w pliku azure.servicebus.exceptions:
- ServiceBusConnectionError: Wystąpił błąd w połączeniu z usługą. Może to być spowodowane przejściowym problemem z siecią lub problemem z usługą. Zaleca się ponowienie próby.
- ServiceBusAuthorizationError: Wystąpił błąd podczas autoryzowania połączenia z usługą. Może to być spowodowane tym, że poświadczenia nie mają odpowiednich uprawnień do wykonania operacji. Zaleca się sprawdzenie uprawnień poświadczeń.
- ServiceBusAuthenticationError: Wystąpił błąd podczas uwierzytelniania połączenia z usługą. Mogło to być spowodowane nieprawidłowymi poświadczeniami. Zaleca się sprawdzenie poświadczeń.
- OperationTimeoutError: Oznacza to, że usługa nie odpowiedziała na operację w oczekiwanym czasie. Może to być spowodowane przejściowym problemem z siecią lub problemem z usługą. Usługa może lub nie została pomyślnie ukończona żądania; stan nie jest znany. Zaleca się podjęcie próby zweryfikowania bieżącego stanu i ponowienia próby w razie potrzeby.
- MessageSizeExceededError: Oznacza to, że zawartość komunikatu jest większa niż rozmiar ramki magistrali usług.
Może się tak zdarzyć, gdy zbyt wiele komunikatów usługi Service Bus jest wysyłanych w partii lub zawartość przekazywana do treści
Message
obiektu jest zbyt duża. Zaleca się zmniejszenie liczby komunikatów wysyłanych w partii lub rozmiaru zawartości przekazywanej do pojedynczegoServiceBusMessage
elementu . - MessageAlreadySettled: Oznacza to, że nie można rozstrzygnąć komunikatu. Może się to zdarzyć podczas próby rozwiązania już rozstrzygniętej wiadomości.
- MessageLockLostError: Blokada komunikatu wygasła i została zwolniona z powrotem do kolejki.
Będzie musiał zostać ponownie odebrany, aby go rozstrzygnąć.
Należy pamiętać o czasie trwania blokady komunikatu i odnawianiu blokady przed wygaśnięciem w przypadku długiego czasu przetwarzania.
AutoLockRenewer
może pomóc w automatycznym utrzymywaniu blokady komunikatu. - SessionLockLostError: Blokada sesji wygasła.
Nie można już rozliczyć wszystkich niezatłoczonych komunikatów, które zostały odebrane.
Zaleca się ponowne nawiązanie połączenia z sesją w razie potrzeby ponownego odbierania komunikatów.
Należy pamiętać o czasie trwania blokady sesji i odnawianiu blokady przed wygaśnięciem w przypadku długiego czasu przetwarzania.
AutoLockRenewer
może pomóc w automatycznym utrzymywaniu blokady sesji. - MessageNotFoundError: Spróbuj odebrać komunikat z określonym numerem sekwencji. Nie można odnaleźć tej wiadomości. Upewnij się, że wiadomość nie została już odebrana. Sprawdź kolejkę deadletter, aby sprawdzić, czy komunikat został utracony.
- MessagingEntityNotFoundError: Jednostka skojarzona z operacją nie istnieje lub została usunięta. Upewnij się, że jednostka istnieje.
- MessagingEntityDisabledError: Żądanie wykonania operacji na wyłączonej jednostce. Aktywuj jednostkę.
- ServiceBusQuotaExceededError: Jednostka obsługi komunikatów osiągnęła maksymalny dozwolony rozmiar lub przekroczono maksymalną liczbę połączeń z przestrzenią nazw. Utwórz miejsce w jednostce, odbierając komunikaty z jednostki lub jej podzapytania.
- ServiceBusServerBusyError: Usługa nie może obecnie przetworzyć żądania. Klient może czekać przez pewien czas, a następnie ponowić próbę wykonania operacji.
- ServiceBusCommunicationError: Klient nie może nawiązać połączenia z usługą Service Bus. Upewnij się, że podana nazwa hosta jest poprawna, a host jest osiągalny. Jeśli kod działa w środowisku z zaporą/serwerem proxy, upewnij się, że ruch do domeny/adresu IP usługi Service Bus i portów nie jest blokowany.
- SessionCannotBeLockedError: Spróbuj nawiązać połączenie z sesją z określonym identyfikatorem sesji, ale sesja jest obecnie zablokowana przez innego klienta. Upewnij się, że sesja jest odblokowana przez innych klientów.
- AutoLockRenewFailed: Próba odnowienia blokady komunikatu lub sesji w tle nie powiodła się.
Może się tak zdarzyć, gdy odbiornik używany przez
AutoLockRenewer
program jest zamknięty lub blokada odnawialnych źródeł energii wygasła. Zaleca się ponowne zarejestrowanie komunikatu lub sesji odnawialnej przez ponowne odebranie komunikatu lub ponowne nawiązanie połączenia z jednostką sesji. - AutoLockRenewTimeout: Czas przydzielony do odnowienia komunikatu lub blokady sesji upłynął. Możesz ponownie zarejestrować obiekt, który ma być odnawiany automatycznie lub wydłużyć limit czasu z wyprzedzeniem.
- ServiceBusError: Wszystkie inne błędy związane z usługą Service Bus. Jest to główna klasa błędów wszystkich błędów opisanych powyżej.
Zapoznaj się z dokumentacją dotyczącą wyjątków , aby uzyskać szczegółowe opisy naszych typowych typów wyjątków.
Następne kroki
Więcej przykładów kodu
Więcej przykładów można znaleźć w katalogu przykładów pokazujących typowe scenariusze usługi Service Bus, takie jak wysyłanie, odbieranie, zarządzanie sesjami i obsługa komunikatów.
Dodatkowa dokumentacja
Aby uzyskać bardziej obszerną dokumentację dotyczącą usługi Service Bus, zobacz dokumentację usługi Service Bus dotyczącą docs.microsoft.com.
Możliwości zarządzania i dokumentacja
W przypadku użytkowników, którzy chcą wykonywać operacje zarządzania względem usługi ServiceBus (tworzenie kolejki/tematu itp., zmienianie reguł filtrowania, wyliczanie jednostek) można znaleźć w dokumentacji interfejsu API azure-mgmt-servicebus . Przykłady użycia można znaleźć również tutaj .
Czysta obsługa transportu AMQP protokołu AMQP i zgodności z poprzednimi wersjami języka Python
Biblioteka klienta Azure Service Bus jest teraz oparta na czystej implementacji protokołu AMQP języka Python. uAMQP
została usunięta jako wymagana zależność.
Aby użyć uAMQP
jako transportu podstawowego:
- Zainstaluj za pomocą narzędzia
uamqp
pip.
$ pip install uamqp
- Przekazywanie
uamqp_transport=True
podczas budowy klienta.
from azure.servicebus import ServiceBusClient
connection_str = '<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>'
queue_name = '<< NAME OF THE QUEUE >>'
client = ServiceBusClient.from_connection_string(
connection_str, uamqp_transport=True
)
Uwaga: message
atrybut naServiceBusMessageBatch
//ServiceBusMessage
ServiceBusReceivedMessage
, który wcześniej uwidocznił uamqp.Message
element , jest przestarzały.
Wprowadzono obiekty "Starsze" zwracane przez message
atrybut, aby ułatwić przejście.
Tworzenie koła uAMQP ze źródła
azure-servicebus
zależy od protokołu uAMQP dla implementacji protokołu AMQP.
Koła uAMQP są dostępne dla większości głównych systemów operacyjnych i będą instalowane automatycznie podczas instalowania programu azure-servicebus
.
Jeśli protokół uAMQP ma być używany jako podstawowa implementacja protokołu AMQP dla azure-servicebus
systemu , można znaleźć koła uAMQP dla większości głównych systemów operacyjnych.
Jeśli korzystasz z platformy, dla której nie podano kół uAMQP, postępuj zgodnie z instrukcjami w sekcji Jeśli zamierzasz używać uAMQP
i korzystasz z platformy, dla której nie podano kół uAMQP, postępuj zgodnie ze wskazówkami dotyczącymi instalacji uAMQP , aby zainstalować ze źródła.
Współtworzenie
W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.
Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.
W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.
Azure SDK for Python