Udostępnij za pośrednictwem


biblioteka klienta Azure Event Hubs dla języka Python — wersja 5.11.5

Azure Event Hubs to wysoce skalowalna usługa publikowania-subskrybowania, która może pozyskiwać miliony zdarzeń na sekundę i przesyłać je strumieniowo do wielu użytkowników. Umożliwia to przetwarzanie i analizowanie ogromnych ilości danych generowanych przez połączone urządzenia i aplikacje. Po zebraniu danych przez usługę Event Hubs można je pobrać, przekształcić i przechowywać za pomocą dowolnego dostawcy analizy w czasie rzeczywistym lub adapterów przetwarzania wsadowego/magazynu. Jeśli chcesz dowiedzieć się więcej o Azure Event Hubs, możesz przejrzeć artykuł: Co to jest usługa Event Hubs?

Biblioteka klienta usługi Azure Event Hubs umożliwia publikowanie oraz korzystanie ze zdarzeń usługi Azure Event Hubs i może być używana w następujących celach:

  • Emitowanie danych telemetrycznych dotyczących aplikacji na potrzeby analizy biznesowej i diagnostyki.
  • Publikowanie faktów dotyczących stanu aplikacji, które zainteresowane strony mogą obserwować i wykorzystywać jako wyzwalacza do podjęcia działań.
  • Obserwacja interesujących operacji i interakcji zachodzących w Twojej firmie lub innym ekosystemie, umożliwiając luźno powiązanym systemom interakcję bez konieczności ich powiązania.
  • Odbieranie zdarzeń od jednego lub większej liczby wydawców, przekształcanie ich w celu lepszego spełniania potrzeb Twojego ekosystemu, a następnie publikowanie przekształconych zdarzeń w nowym strumieniu, aby odbiorcy mogli je obserwować.

Kod | źródłowyPakiet (PyPi) | Pakiet (Conda) | Dokumentacja referencyjna interfejsu | APIDokumentacja | produktuPróbki

Wprowadzenie

Wymagania wstępne

  • Środowisko Python w wersji 3.7 lub nowszej.

  • Subskrypcja platformy Microsoft Azure: Do korzystania z usług platformy Azure, w tym Azure Event Hubs, potrzebna jest subskrypcja. Jeśli nie masz istniejącego konta platformy Azure, możesz utworzyć konto bezpłatnej wersji próbnej lub skorzystać z korzyści dla subskrybentów MSDN podczas tworzenia konta.

  • Przestrzeń nazw usługi Event Hubs z centrum zdarzeń: Aby korzystać z Azure Event Hubs, musisz również mieć dostępną przestrzeń nazw i centrum zdarzeń. Jeśli nie znasz tworzenia zasobów platformy Azure, możesz skorzystać z przewodnika krok po kroku dotyczącego tworzenia centrum zdarzeń przy użyciu Azure Portal. W tym miejscu można również znaleźć szczegółowe instrukcje dotyczące tworzenia centrum zdarzeń przy użyciu interfejsu wiersza polecenia platformy Azure, Azure PowerShell lub szablonów usługi Azure Resource Manager (ARM).

Instalowanie pakietu

Zainstaluj bibliotekę klienta Azure Event Hubs dla języka Python przy użyciu narzędzia pip:

$ pip install azure-eventhub

Uwierzytelnianie klienta

Interakcja z usługą Event Hubs rozpoczyna się od wystąpienia klasy EventHubConsumerClient lub EventHubProducerClient. Potrzebujesz nazwy hosta, poświadczeń SAS/AAD i nazwy centrum zdarzeń lub parametry połączenia, aby utworzyć wystąpienie obiektu klienta.

Utwórz klienta na podstawie parametry połączenia:

W przypadku biblioteki klienta usługi Event Hubs w celu interakcji z centrum zdarzeń najprostszym sposobem jest użycie parametry połączenia, który jest tworzony automatycznie podczas tworzenia przestrzeni nazw usługi Event Hubs. Jeśli nie znasz zasad dostępu współdzielonego na platformie Azure, możesz skorzystać z przewodnika krok po kroku, aby uzyskać parametry połączenia usługi Event Hubs.

  • Metoda from_connection_string przyjmuje parametry połączenia formularza Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> i nazwy jednostki do wystąpienia centrum zdarzeń. Możesz pobrać parametry połączenia z Azure Portal.

Utwórz klienta przy użyciu biblioteki azure-identity:

Alternatywnie można użyć obiektu Credential do uwierzytelniania za pośrednictwem usługi AAD za pomocą pakietu azure-identity.

  • Ten konstruktor przedstawiony w przykładzie połączonym powyżej przyjmuje nazwę hosta i nazwę jednostki wystąpienia centrum zdarzeń oraz poświadczenia, które implementują protokół TokenCredential . Istnieją implementacje TokenCredential protokołu dostępnego w pakiecie azure-identity. Nazwa hosta ma format <yournamespace.servicebus.windows.net>.
  • Aby użyć typów poświadczeń dostarczonych przez azure-identityprogram , zainstaluj pakiet: pip install azure-identity
  • Ponadto aby korzystać z interfejsu API asynchronicznego, należy najpierw zainstalować transport asynchroniczny, taki jak aiohttp: pip install aiohttp
  • W przypadku korzystania z usługi Azure Active Directory podmiot zabezpieczeń musi mieć przypisaną rolę, która umożliwia dostęp do usługi Event Hubs, takiej jak rola właściciela danych Azure Event Hubs. Aby uzyskać więcej informacji na temat używania autoryzacji usługi Azure Active Directory z usługą Event Hubs, zapoznaj się ze skojarzona dokumentacją.

Kluczowe pojęcia

  • EventHubProducerClient jest źródłem danych telemetrycznych, informacji diagnostycznych, dzienników użycia lub innych danych dziennika, w ramach rozwiązania osadzonego urządzenia, aplikacji urządzenia przenośnego, tytułu gry działającego w konsoli lub innego urządzenia, niektórych rozwiązań biznesowych opartych na kliencie lub serwerze albo witryny internetowej.

  • Obiekt EventHubConsumerClient pobiera takie informacje z centrum zdarzeń i przetwarza je. Przetwarzanie może obejmować agregację, złożone obliczenia i filtrowanie. Przetwarzanie może również obejmować dystrybucję lub przechowywanie informacji w sposób pierwotny lub przekształcony. Odbiorcy usługi Event Hub są często niezawodnymi i skalowalnymi składnikami infrastruktury platformy z wbudowanymi funkcjami analitycznymi, takimi jak Azure Stream Analytics, Apache Spark lub Apache Storm.

  • Partycja to uporządkowana sekwencja zdarzeń przechowywanych w centrum zdarzeń. Azure Event Hubs zapewnia przesyłanie strumieniowe komunikatów za pośrednictwem partycjonowanego wzorca konsumenta, w którym każdy odbiorca odczytuje tylko określony podzbiór lub partycję strumienia komunikatów. Po nadejściu nowszych zdarzeń są one dodawane na końcu sekwencji. Liczba partycji jest określana w momencie utworzenia centrum zdarzeń i nie można jej zmienić.

  • Grupa odbiorców jest widokiem całego centrum zdarzeń. Grupy odbiorców umożliwiają wielu aplikacjom korzystającym z każdego z nich oddzielny widok strumienia zdarzeń oraz odczytywanie strumienia niezależnie we własnym tempie i na własną rękę. W partycji na grupę odbiorców może znajdować się co najwyżej 5 jednoczesnych czytników; jednak zaleca się, aby dla danej partycji i grupy odbiorców istniał tylko jeden aktywny odbiorca. Każdy aktywny czytnik odbiera wszystkie zdarzenia z partycji; Jeśli na tej samej partycji znajduje się wiele czytników, otrzymają one zduplikowane zdarzenia.

Aby uzyskać więcej pojęć i głębszą dyskusję, zobacz: Event Hubs Features (Funkcje usługi Event Hubs). Ponadto pojęcia dotyczące protokołu AMQP są dobrze udokumentowane w usłudze OASIS Advanced Messaging Queuing Protocol (AMQP) w wersji 1.0.

Bezpieczeństwo wątkowe

Nie gwarantujemy, że klasa EventHubProducerClient lub EventHubConsumerClient jest bezpieczna wątkowo. Nie zalecamy ponownego korzystania z tych wystąpień w wątkach. Do uruchomionej aplikacji należy użycie tych klas w sposób bezpieczny wątkowo.

Typ EventDataBatch modelu danych nie jest bezpieczny wątkowo. Nie należy go udostępniać między wątkami ani używać jednocześnie z metodami klienta.

Przykłady

Poniższe sekcje zawierają kilka fragmentów kodu obejmujących niektóre z najbardziej typowych zadań usługi Event Hubs, w tym:

Inspekcja centrum zdarzeń

Pobierz identyfikatory partycji centrum zdarzeń.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Publikowanie zdarzeń w centrum zdarzeń

create_batch Użyj metody w metodzie , EventHubProducerClient aby utworzyć EventDataBatch obiekt, który następnie można wysłać przy użyciu send_batch metody . Zdarzenia mogą być dodawane do EventDataBatch metody using do add momentu osiągnięcia maksymalnego limitu rozmiaru partii w bajtach.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Korzystanie ze zdarzeń z centrum zdarzeń

Istnieje wiele sposobów korzystania ze zdarzeń z usługi EventHub. Aby po prostu wyzwolić wywołanie zwrotne po odebraniu zdarzenia, EventHubConsumerClient.receive metoda będzie używana w następujący sposób:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Używanie zdarzeń z centrum zdarzeń w partiach

Podczas gdy powyższy przykład wyzwala wywołanie zwrotne dla każdego komunikatu podczas odbierania, poniższy przykład wyzwala wywołanie zwrotne w partii zdarzeń, próbując jednocześnie odebrać liczbę.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Asynchroniczne publikowanie zdarzeń w centrum zdarzeń

create_batch Użyj metody w metodzie , EventHubProducer aby utworzyć EventDataBatch obiekt, który następnie można wysłać przy użyciu send_batch metody . Zdarzenia mogą być dodawane do EventDataBatch metody using do add momentu osiągnięcia maksymalnego limitu rozmiaru partii w bajtach.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Asynchronicznie zużywaj zdarzenia z centrum zdarzeń

Ten zestaw SDK obsługuje zarówno kod synchroniczny, jak i asynchroniczny. Aby otrzymać dane przedstawione w powyższych przykładach, ale w obrębie aio, potrzebne byłyby następujące elementy:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Asynchronicznie zużywaj zdarzenia z centrum zdarzeń w partiach

Wszystkie funkcje synchroniczne są również obsługiwane w aio. Jak pokazano powyżej w przypadku potwierdzenia partii synchronicznej, można wykonać to samo w asyncio w następujący sposób:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Korzystanie ze zdarzeń i zapisywanie punktów kontrolnych przy użyciu magazynu punktów kontrolnych

EventHubConsumerClient to konstrukcja wysokiego poziomu, która umożliwia odbieranie zdarzeń z wielu partycji jednocześnie i równoważenie obciążenia z innymi użytkownikami przy użyciu tego samego centrum zdarzeń i grupy odbiorców.

Dzięki temu użytkownik może również śledzić postęp przetwarzania zdarzeń przy użyciu punktów kontrolnych.

Punkt kontrolny jest przeznaczony do reprezentowania ostatniego pomyślnie przetworzonego zdarzenia przez użytkownika z określonej partycji grupy odbiorców w wystąpieniu centrum zdarzeń. Program EventHubConsumerClient używa wystąpienia CheckpointStore klasy do aktualizowania punktów kontrolnych i przechowywania odpowiednich informacji wymaganych przez algorytm równoważenia obciążenia.

Wyszukaj plik pypi z prefiksem azure-eventhub-checkpointstore , aby znaleźć pakiety, które obsługują tę usługę, i użyj CheckpointStore implementacji z jednego takiego pakietu. Należy pamiętać, że dostępne są zarówno biblioteki synchronizacji, jak i async.

W poniższym przykładzie utworzymy wystąpienie EventHubConsumerClient klasy i użyjemy klasy BlobCheckpointStore. Aby uruchomić kod, musisz utworzyć konto usługi Azure Storage i kontener obiektów blob .

Azure Blob Storage programu Checkpoint Store Async i Azure Blob Storage Checkpoint Store Sync są jedną z CheckpointStore implementacji, które udostępniamy, które mają zastosowanie Azure Blob Storage jako magazyn trwały.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Używanie elementu EventHubConsumerClient do pracy z IoT Hub

Możesz również użyć EventHubConsumerClient polecenia , aby pracować z IoT Hub. Jest to przydatne w przypadku odbierania danych telemetrycznych IoT Hub z połączonej usługi EventHub. Skojarzone parametry połączenia nie będą miały oświadczeń wysyłania, dlatego wysyłanie zdarzeń nie jest możliwe.

Zwróć uwagę, że parametry połączenia musi być punktem końcowym zgodnym z centrum zdarzeń, np. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Istnieją dwa sposoby uzyskiwania punktu końcowego zgodnego z usługą Event Hubs:

  • Ręcznie pobierz "wbudowane punkty końcowe" IoT Hub w witrynie Azure Portal i odbierz je od niego.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Rozwiązywanie problemów

azure-eventhubs Zobacz przewodnik rozwiązywania problemów, aby uzyskać szczegółowe informacje na temat diagnozowania różnych scenariuszy awarii.

Następne kroki

Więcej przykładów kodu

Zapoznaj się z katalogiem samples , aby uzyskać szczegółowe przykłady użycia tej biblioteki do wysyłania i odbierania zdarzeń do/z usługi Event Hubs.

Dokumentacja

Dokumentacja referencyjna jest dostępna tutaj.

Rejestr schematów i koder Avro

Zestaw SDK usługi EventHubs dobrze integruje się z usługą Rejestru schematów i avro. Aby uzyskać więcej informacji, zobacz Zestaw SDK rejestru schematów i zestaw SDK rejestru schematów Avro Encoder.

Czysta obsługa transportu AMQP protokołu AMQP i zgodności z poprzednimi wersjami języka Python

Biblioteka klienta Azure Event Hubs jest teraz oparta na czystej implementacji protokołu AMQP języka Python. uAMQP została usunięta jako wymagana zależność.

Aby użyć uAMQP jako transportu podstawowego:

  1. Zainstaluj za pomocą narzędzia uamqp pip.
$ pip install uamqp 
  1. Przekazywanie uamqp_transport=True podczas budowy klienta.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Uwaga: message atrybut naEventDataBatchEventData/ , który wcześniej uwidocznił uamqp.Messageelement , jest przestarzały. Wprowadzono obiekty "Starsze", które zostały zwrócone przez EventData.message/EventDataBatch.message program, aby ułatwić przejście.

Tworzenie koła uAMQP ze źródła

Jeśli protokół uAMQP ma być używany jako podstawowa implementacja protokołu AMQP dla azure-eventhubsystemu , można znaleźć koła uAMQP dla większości głównych systemów operacyjnych.

Jeśli zamierzasz korzystać uAMQP z platformy, dla której nie podano kół uAMQP, postępuj zgodnie ze wskazówkami dotyczącymi instalacji uAMQP , aby zainstalować ze źródła.

Przekazywanie opinii

Jeśli napotkasz jakiekolwiek usterki lub masz sugestie, zgłoś problem w sekcji Problemy w projekcie.

Współtworzenie

W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.

Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.

W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.

Wrażenia