Szybki start: wysyłanie zdarzeń do usługi Event Hubs lub odbieranie zdarzeń przy użyciu języka Go

Azure Event Hubs to platforma do pozyskiwania i strumieniowego przesyłania danych, która umożliwia odbieranie i przetwarzanie milionów zdarzeń na sekundę. Usługa Event Hubs pozwala przetwarzać i przechowywać zdarzenia, dane lub dane telemetryczne generowane przez rozproszone oprogramowanie i urządzenia. Dane wysłane do centrum zdarzeń mogą zostać przekształcone i zmagazynowane przy użyciu dowolnego dostawcy analityki czasu rzeczywistego lub adapterów przetwarzania wsadowego/magazynowania. Aby zapoznać się ze szczegółowym omówieniem usługi Event Hubs, zobacz Omówienie usługi Event Hubs i Funkcje usługi Event Hubs.

W tym przewodniku Szybki start opisano sposób pisania aplikacji języka Go w celu wysyłania zdarzeń do centrum zdarzeń lub odbierania zdarzeń.

Uwaga

Ten przewodnik Szybki start jest oparty na przykładach w witrynie GitHub pod adresem https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Wysłanie jest oparte na przykładzie example_producing_events_test.go , a odbieranie jest oparte na przykładzie example_processor_test.go . Kod jest uproszczony dla przewodnika Szybki start i wszystkie szczegółowe komentarze są usuwane, więc zapoznaj się z przykładami, aby uzyskać więcej szczegółów i wyjaśnień.

Wymagania wstępne

Do wykonania kroków tego przewodnika Szybki start niezbędne jest spełnienie następujących wymagań wstępnych:

  • Przejdź do zainstalowanego lokalnie. W razie potrzeby postępuj zgodnie z tymi instrukcjami .
  • Aktywne konto platformy Azure. Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.
  • Utwórz przestrzeń nazw usługi Event Hubs i centrum zdarzeń. Użyj witryny Azure Portal, aby utworzyć przestrzeń nazw typu Event Hubs i uzyskać poświadczenia zarządzania, które aplikacja musi komunikować się z centrum zdarzeń. Aby utworzyć przestrzeń nazw i centrum zdarzeń, wykonaj procedurę opisaną w tym artykule.

Wysyłanie zdarzeń

W tej sekcji pokazano, jak utworzyć aplikację Go w celu wysyłania zdarzeń do centrum zdarzeń.

Instalowanie pakietu Języka Go

Pobierz pakiet Go dla usługi Event Hubs, jak pokazano w poniższym przykładzie.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Kod do wysyłania zdarzeń do centrum zdarzeń

Oto kod do wysyłania zdarzeń do centrum zdarzeń. Główne kroki w kodzie to:

  1. Utwórz klienta producenta usługi Event Hubs przy użyciu parametry połączenia do przestrzeni nazw usługi Event Hubs i nazwy centrum zdarzeń.
  2. Utwórz obiekt wsadowy i dodaj przykładowe zdarzenia do partii.
  3. Wyślij partię zdarzeń do tych zdarzeń.

Ważne

Zastąp NAMESPACE CONNECTION STRING ciąg parametry połączenia przestrzenią nazw usługi Event Hubs i EVENT HUB NAME nazwą centrum zdarzeń w przykładowym kodzie.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Nie uruchamiaj jeszcze aplikacji. Najpierw należy uruchomić aplikację odbiorcy, a następnie aplikację nadawcy.

Odbieranie zdarzeń

Tworzenie konta magazynu i kontenera

Stan, taki jak dzierżawy partycji i punktów kontrolnych w strumieniu zdarzeń, są współużytkowane przez odbiorniki przy użyciu kontenera usługi Azure Storage. Konto magazynu i kontener można utworzyć za pomocą zestawu SDK języka Go, ale możesz je również utworzyć, postępując zgodnie z instrukcjami w temacie About Azure storage accounts (Informacje o kontach usługi Azure Storage).

Postępuj zgodnie z poniższymi zaleceniami w przypadku korzystania z usługi Azure Blob Storage jako magazynu punktów kontrolnych:

  • Użyj oddzielnego kontenera dla każdej grupy odbiorców. Możesz użyć tego samego konta magazynu, ale użyj jednego kontenera dla każdej grupy.
  • Nie używaj kontenera dla żadnych innych elementów i nie używaj konta magazynu dla innych elementów.
  • Konto magazynu powinno znajdować się w tym samym regionie co wdrożona aplikacja. Jeśli aplikacja jest lokalna, spróbuj wybrać możliwy region najbliżej.

Na stronie Konto magazynu w witrynie Azure Portal w sekcji Blob Service upewnij się, że następujące ustawienia są wyłączone.

  • Hierarchiczna przestrzeń nazw
  • Usuwanie nietrwałe obiektów blob
  • Wersje

Pakiety języka Go

Aby odbierać komunikaty, pobierz pakiety Języka Go dla usługi Event Hubs, jak pokazano w poniższym przykładzie.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Kod odbierania zdarzeń z centrum zdarzeń

Oto kod do odbierania zdarzeń z centrum zdarzeń. Główne kroki w kodzie to:

  1. Sprawdź obiekt magazynu punktów kontrolnych reprezentujący usługę Azure Blob Storage używaną przez centrum zdarzeń do tworzenia punktów kontrolnych.
  2. Utwórz klienta odbiorcy usługi Event Hubs przy użyciu parametry połączenia do przestrzeni nazw usługi Event Hubs i nazwy centrum zdarzeń.
  3. Utwórz procesor zdarzeń przy użyciu obiektu klienta i obiektu magazynu punktów kontrolnych. Procesor odbiera i przetwarza zdarzenia.
  4. Dla każdej partycji w centrum zdarzeń utwórz klienta partycji z procesemEvents jako funkcji przetwarzania zdarzeń.
  5. Uruchom wszystkich klientów partycji, aby odbierać i przetwarzać zdarzenia.

Ważne

Zastąp następujące wartości zastępcze rzeczywistymi wartościami:

  • AZURE STORAGE CONNECTION STRINGz parametry połączenia dla konta usługi Azure Storage
  • BLOB CONTAINER NAME z nazwą kontenera obiektów blob utworzonych na koncie magazynu
  • NAMESPACE CONNECTION STRINGz parametry połączenia dla przestrzeni nazw usługi Event Hubs
  • EVENT HUB NAME z nazwą centrum zdarzeń w przykładowym kodzie.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Uruchamianie aplikacji odbiorcy i nadawcy

  1. Najpierw uruchom aplikację odbiorcy.

  2. Uruchom aplikację nadawcy.

  3. Poczekaj minutę, aby zobaczyć następujące dane wyjściowe w oknie odbiorcy.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Następne kroki

Zobacz przykłady w witrynie GitHub pod adresem https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.