Wysyłanie zdarzeń do centrów zdarzeń lub odbieranie z nich zdarzeń za pomocą języka Python

W tym przewodniku Szybki start pokazano, jak wysyłać zdarzenia do centrum zdarzeń i odbierać je z centrum zdarzeń przy użyciu pakietu języka Python azure-eventhub .

Wymagania wstępne

Jeśli dopiero zaczynasz korzystać z usługi Azure Event Hubs, zobacz Omówienie usługi Event Hubs przed wykonaniem tego przewodnika Szybki start.

Do wykonania kroków tego przewodnika Szybki start niezbędne jest spełnienie następujących wymagań wstępnych:

  • Subskrypcja platformy Microsoft Azure. Do korzystania z usług platformy Azure, w tym usługi Azure Event Hubs, potrzebna jest subskrypcja. Jeśli nie masz istniejącego konta platformy Azure, utwórz konto bezpłatnej wersji próbnej.
  • Środowisko Python w wersji 3.8 lub nowszej z zainstalowanym i zaktualizowanym programem pip.
  • Visual Studio Code (zalecane) lub inne zintegrowane środowisko projektowe (IDE).
  • Utwórz przestrzeń nazw usługi Event Hubs i centrum zdarzeń. Pierwszym krokiem jest utworzenie przestrzeni nazw usługi Event Hubs przy użyciu witryny Azure Portal i uzyskanie poświadczeń zarządzania, które aplikacja musi komunikować się z centrum zdarzeń. Aby utworzyć przestrzeń nazw i centrum zdarzeń, wykonaj procedurę opisaną w tym artykule.

Instalowanie pakietów w celu wysyłania zdarzeń

Aby zainstalować pakiety języka Python dla usługi Event Hubs, otwórz wiersz polecenia zawierający język Python w swojej ścieżce. Zmień katalog na folder, w którym chcesz zachować przykłady.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Uwierzytelnianie aplikacji na platformie Azure

W tym przewodniku Szybki start przedstawiono dwa sposoby nawiązywania połączenia z usługą Azure Event Hubs: bez hasła i parametry połączenia. Pierwsza opcja pokazuje, jak używać podmiotu zabezpieczeń w usłudze Microsoft Entra ID i kontroli dostępu opartej na rolach (RBAC) w celu nawiązania połączenia z przestrzenią nazw usługi Event Hubs. Nie musisz martwić się o zakodowane parametry połączenia w kodzie lub w pliku konfiguracji lub w bezpiecznym magazynie, na przykład w usłudze Azure Key Vault. Druga opcja pokazuje, jak używać parametry połączenia do nawiązywania połączenia z przestrzenią nazw usługi Event Hubs. Jeśli dopiero zaczynasz korzystać z platformy Azure, możesz znaleźć opcję parametry połączenia łatwiejszą do naśladowania. Zalecamy użycie opcji bez hasła w rzeczywistych aplikacjach i środowiskach produkcyjnych. Aby uzyskać więcej informacji, zobacz Uwierzytelnianie i autoryzacja. Więcej informacji na temat uwierzytelniania bez hasła można również uzyskać na stronie przeglądu.

Przypisywanie ról do użytkownika firmy Microsoft Entra

Podczas tworzenia aplikacji lokalnie upewnij się, że konto użytkownika, które nawiązuje połączenie z usługą Azure Event Hubs, ma odpowiednie uprawnienia. Aby wysyłać i odbierać komunikaty, musisz mieć rolę Właściciela danych usługi Azure Event Hubs. Aby przypisać sobie tę rolę, musisz mieć rolę Administracja istratora dostępu użytkowników lub inną rolę obejmującą Microsoft.Authorization/roleAssignments/write akcję. Role RBAC platformy Azure można przypisać użytkownikowi przy użyciu witryny Azure Portal, interfejsu wiersza polecenia platformy Azure lub programu Azure PowerShell. Dowiedz się więcej o dostępnych zakresach przypisań ról na stronie przeglądu zakresu.

Poniższy przykład przypisuje Azure Event Hubs Data Owner rolę do konta użytkownika, co zapewnia pełny dostęp do zasobów usługi Azure Event Hubs. W rzeczywistym scenariuszu postępuj zgodnie z zasadą najniższych uprawnień , aby dać użytkownikom tylko minimalne uprawnienia wymagane do bezpieczniejszego środowiska produkcyjnego.

Wbudowane role platformy Azure dla usługi Azure Event Hubs

W przypadku usługi Azure Event Hubs zarządzanie przestrzeniami nazw i wszystkimi powiązanymi zasobami za pośrednictwem witryny Azure Portal i interfejsu API zarządzania zasobami platformy Azure jest już chronione przy użyciu modelu RBAC platformy Azure. Platforma Azure udostępnia poniższe wbudowane role platformy Azure umożliwiające autoryzowanie dostępu do przestrzeni nazw usługi Event Hubs:

  • Właściciel danych usługi Azure Event Hubs: umożliwia dostęp danych do przestrzeni nazw usługi Event Hubs i jej jednostek (kolejek, tematów, subskrypcji i filtrów)
  • Nadawca danych usługi Azure Event Hubs: ta rola umożliwia nadawcy dostęp do przestrzeni nazw usługi Event Hubs i jej jednostek.
  • Odbiornik danych usługi Azure Event Hubs: ta rola umożliwia odbiorcy dostęp do przestrzeni nazw usługi Event Hubs i jej jednostek.

Jeśli chcesz utworzyć rolę niestandardową, zobacz Prawa wymagane dla operacji usługi Event Hubs.

Ważne

W większości przypadków propagacja przypisania roli na platformie Azure potrwa minutę lub dwie. W rzadkich przypadkach może upłynąć do ośmiu minut. Jeśli podczas pierwszego uruchomienia kodu wystąpią błędy uwierzytelniania, zaczekaj chwilę i spróbuj ponownie.

  1. W witrynie Azure Portal znajdź przestrzeń nazw usługi Event Hubs przy użyciu głównego paska wyszukiwania lub nawigacji po lewej stronie.

  2. Na stronie przeglądu wybierz pozycję Kontrola dostępu (Zarządzanie dostępem i tożsamościami) z menu po lewej stronie.

  3. Na stronie Kontrola dostępu (Zarządzanie dostępem i tożsamościami) wybierz kartę Przypisania ról.

  4. Wybierz pozycję + Dodaj z górnego menu, a następnie pozycję Dodaj przypisanie roli z wyświetlonego menu rozwijanego.

    A screenshot showing how to assign a role.

  5. Użyj pola wyszukiwania, aby filtrować wyniki do żądanej roli. W tym przykładzie wyszukaj Azure Event Hubs Data Owner i wybierz pasujący wynik. Następnie wybierz pozycję Dalej.

  6. W obszarze Przypisz dostęp do wybierz pozycję Użytkownik, grupa lub jednostka usługi, a następnie wybierz pozycję + Wybierz członków.

  7. W oknie dialogowym wyszukaj nazwę użytkownika firmy Microsoft Entra (zazwyczaj adres e-mail user@domain ), a następnie wybierz pozycję Wybierz w dolnej części okna dialogowego.

  8. Wybierz pozycję Przejrzyj i przypisz , aby przejść do ostatniej strony, a następnie ponownie przejrzyj i przypisz, aby ukończyć proces.

Wysyłanie zdarzeń

W tej sekcji utwórz skrypt języka Python w celu wysyłania zdarzeń do utworzonego wcześniej centrum zdarzeń.

  1. Otwórz ulubiony edytor języka Python, taki jak Visual Studio Code.

  2. Utwórz skrypt o nazwie send.py. Ten skrypt wysyła partię zdarzeń do utworzonego wcześniej centrum zdarzeń.

  3. Wklej następujący kod do send.py:

    W kodzie użyj rzeczywistych wartości, aby zastąpić następujące symbole zastępcze:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Uwaga

    Przykłady innych opcji wysyłania zdarzeń do centrum zdarzeń asynchronicznie przy użyciu parametry połączenia można znaleźć na stronie usługi GitHub send_async.py. Pokazane wzorce mają również zastosowanie do wysyłania zdarzeń bez hasła.

Odbieranie zdarzeń

Ten przewodnik Szybki start używa usługi Azure Blob Storage jako magazynu punktów kontrolnych. Magazyn punktów kontrolnych służy do utrwalania punktów kontrolnych (czyli ostatnich pozycji odczytu).

Postępuj zgodnie z poniższymi zaleceniami w przypadku korzystania z usługi Azure Blob Storage jako magazynu punktów kontrolnych:

  • Użyj oddzielnego kontenera dla każdej grupy odbiorców. Możesz użyć tego samego konta magazynu, ale użyj jednego kontenera dla każdej grupy.
  • Nie używaj kontenera dla żadnych innych elementów i nie używaj konta magazynu dla innych elementów.
  • Konto magazynu powinno znajdować się w tym samym regionie co wdrożona aplikacja. Jeśli aplikacja jest lokalna, spróbuj wybrać możliwy region najbliżej.

Na stronie Konto magazynu w witrynie Azure Portal w sekcji Blob Service upewnij się, że następujące ustawienia są wyłączone.

  • Hierarchiczna przestrzeń nazw
  • Usuwanie nietrwałe obiektów blob
  • Wersje

Tworzenie konta usługi Azure Storage i kontenera obiektów blob

Utwórz w nim konto usługi Azure Storage i kontener obiektów blob, wykonując następujące czynności:

  1. Tworzenie konta usługi Azure Storage
  2. Utwórz kontener obiektów blob.
  3. Uwierzytelnij się w kontenerze obiektów blob.

Pamiętaj, aby zarejestrować parametry połączenia i nazwę kontenera do późniejszego użycia w kodzie odbioru.

Podczas tworzenia aplikacji lokalnie upewnij się, że konto użytkownika, które uzyskuje dostęp do danych obiektów blob, ma odpowiednie uprawnienia. Będziesz potrzebować współautora danych obiektu blob usługi Storage, aby odczytywać i zapisywać dane obiektów blob. Aby przypisać sobie tę rolę, musisz przypisać rolę Administracja istratora dostępu użytkowników lub inną rolę obejmującą akcję Microsoft.Authorization/roleAssignments/write. Role RBAC platformy Azure można przypisać użytkownikowi przy użyciu witryny Azure Portal, interfejsu wiersza polecenia platformy Azure lub programu Azure PowerShell. Więcej informacji na temat dostępnych zakresów przypisań ról można znaleźć na stronie przeglądu zakresu.

W tym scenariuszu przypiszesz uprawnienia do konta użytkownika w zakresie konta magazynu, aby postępować zgodnie z zasadą najniższych uprawnień. Ta praktyka zapewnia użytkownikom tylko minimalne wymagane uprawnienia i tworzy bezpieczniejsze środowiska produkcyjne.

W poniższym przykładzie do konta użytkownika zostanie przypisana rola Współautor danych obiektu blob usługi Storage, która zapewnia zarówno dostęp do odczytu, jak i zapisu do danych obiektów blob na koncie magazynu.

Ważne

W większości przypadków propagacja przypisania roli na platformie Azure potrwa minutę lub dwie, ale w rzadkich przypadkach może upłynąć do ośmiu minut. Jeśli podczas pierwszego uruchomienia kodu wystąpią błędy uwierzytelniania, zaczekaj chwilę i spróbuj ponownie.

  1. W witrynie Azure Portal znajdź konto magazynu przy użyciu głównego paska wyszukiwania lub nawigacji po lewej stronie.

  2. Na stronie przeglądu konta magazynu wybierz pozycję Kontrola dostępu (Zarządzanie dostępem i tożsamościami) z menu po lewej stronie.

  3. Na stronie Kontrola dostępu (Zarządzanie dostępem i tożsamościami) wybierz kartę Przypisania ról.

  4. Wybierz pozycję + Dodaj z górnego menu, a następnie pozycję Dodaj przypisanie roli z wyświetlonego menu rozwijanego.

    A screenshot showing how to assign a storage account role.

  5. Użyj pola wyszukiwania, aby filtrować wyniki do żądanej roli. W tym przykładzie wyszukaj pozycję Współautor danych obiektu blob usługi Storage i wybierz pasujący wynik, a następnie wybierz pozycję Dalej.

  6. W obszarze Przypisz dostęp do wybierz pozycję Użytkownik, grupa lub jednostka usługi, a następnie wybierz pozycję + Wybierz członków.

  7. W oknie dialogowym wyszukaj nazwę użytkownika firmy Microsoft Entra (zazwyczaj adres e-mail user@domain ), a następnie wybierz pozycję Wybierz w dolnej części okna dialogowego.

  8. Wybierz pozycję Przejrzyj i przypisz , aby przejść do ostatniej strony, a następnie ponownie przejrzyj i przypisz, aby ukończyć proces.

Instalowanie pakietów w celu odbierania zdarzeń

W przypadku strony odbieranej należy zainstalować co najmniej jeden pakiet. W tym przewodniku Szybki start użyjesz usługi Azure Blob Storage do utrwalania punktów kontrolnych, aby program nie odczytywał zdarzeń, które zostały już odczytane. Wykonuje ona punkty kontrolne metadanych na odebranych komunikatach w regularnych odstępach czasu w obiekcie blob. Takie podejście ułatwia kontynuowanie odbierania komunikatów później z miejsca, w którym zostało przerwane.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Tworzenie skryptu języka Python w celu odbierania zdarzeń

W tej sekcji utworzysz skrypt języka Python do odbierania zdarzeń z centrum zdarzeń:

  1. Otwórz ulubiony edytor języka Python, taki jak Visual Studio Code.

  2. Utwórz skrypt o nazwie recv.py.

  3. Wklej następujący kod do recv.py:

    W kodzie użyj rzeczywistych wartości, aby zastąpić następujące symbole zastępcze:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Uwaga

    Przykłady innych opcji odbierania zdarzeń z centrum zdarzeń asynchronicznie przy użyciu parametry połączenia można znaleźć na stronie usługi GitHub recv_with_checkpoint_store_async.py. Pokazane wzorce mają również zastosowanie do odbierania zdarzeń bez hasła.

Uruchamianie aplikacji odbiorcy

Aby uruchomić skrypt, otwórz wiersz polecenia z językiem Python w swojej ścieżce, a następnie uruchom następujące polecenie:

python recv.py

Uruchamianie aplikacji nadawcy

Aby uruchomić skrypt, otwórz wiersz polecenia z językiem Python w swojej ścieżce, a następnie uruchom następujące polecenie:

python send.py

W oknie odbiorcy powinny być wyświetlane komunikaty, które zostały wysłane do centrum zdarzeń.

Rozwiązywanie problemów

Jeśli nie widzisz zdarzeń w oknie odbiorcy lub kod zgłasza błąd, wypróbuj następujące porady dotyczące rozwiązywania problemów:

  • Jeśli nie widzisz wyników z recy.py, uruchom send.py kilka razy.

  • Jeśli podczas korzystania z kodu bez hasła (z poświadczeniami) zostaną wyświetlone błędy dotyczące "coroutine", upewnij się, że używasz importowania z azure.identity.aioprogramu .

  • Jeśli zostanie wyświetlona opcja "Nieujawiona sesja klienta" z kodem bez hasła (z poświadczeniami), upewnij się, że po zakończeniu zamknij poświadczenie. Aby uzyskać więcej informacji, zobacz Async credentials (Poświadczenia asynchroniczne).

  • Jeśli podczas uzyskiwania dostępu do magazynu występują błędy autoryzacji w przypadku recv.py , upewnij się, że wykonano kroki opisane w artykule Tworzenie konta usługi Azure Storage i kontenera obiektów blob oraz przypisano rolę Współautor danych obiektu blob usługi Storage do jednostki usługi.

  • Jeśli otrzymasz zdarzenia z różnymi identyfikatorami partycji, ten wynik będzie oczekiwany. Partycje stanowią mechanizm organizacji danych powiązany z równoległością podrzędną wymaganą w aplikacjach korzystających z tych danych. Liczba partycji w centrum zdarzeń jest bezpośrednio związana z oczekiwaną liczbą jednoczesnych czytników. Aby uzyskać więcej informacji, zobacz Dowiedz się więcej o partycjach.

Następne kroki

W tym przewodniku Szybki start wysłano zdarzenia i odebrano je asynchronicznie. Aby dowiedzieć się, jak synchronicznie wysyłać i odbierać zdarzenia, przejdź do strony usługi GitHub sync_samples.

W przypadku wszystkich przykładów (synchronicznych i asynchronicznych) w usłudze GitHub przejdź do biblioteki klienta usługi Azure Event Hubs dla przykładów języka Python.