Udostępnij przez


Szybki start: wysyłanie zdarzeń do usługi Event Hubs lub odbieranie zdarzeń przy użyciu języka Go

Azure Event Hubs to platforma do pozyskiwania i strumieniowego przesyłania danych, która umożliwia odbieranie i przetwarzanie milionów zdarzeń na sekundę. Usługa Event Hubs pozwala przetwarzać i przechowywać zdarzenia, dane lub dane telemetryczne generowane przez rozproszone oprogramowanie i urządzenia. Dane wysłane do centrum zdarzeń mogą zostać przekształcone i zmagazynowane przy użyciu dowolnego dostawcy analityki czasu rzeczywistego lub adapterów przetwarzania wsadowego/magazynowania. Aby zapoznać się ze szczegółowym omówieniem usługi Event Hubs, zobacz Omówienie usługi Event Hubs i Funkcje usługi Event Hubs.

W tym przewodniku szybkiego startu opisano, jak pisać aplikacje w języku Go w celu wysyłania zdarzeń do centrum zdarzeń lub ich odbierania.

Uwaga / Notatka

Ten szybki start opiera się na przykładach z witryny GitHub pod adresem https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Sekcja wysyłania zdarzeń jest oparta na przykładzie example_producing_events_test.go, a sekcja odbierania zdarzeń jest oparta na przykładzie example_processor_test.go. Kod jest uproszczony dla szybkiego startu, a wszystkie szczegółowe komentarze zostały usunięte, więc zapoznaj się z przykładami, aby uzyskać więcej szczegółów i wyjaśnień.

Wymagania wstępne

Aby ukończyć ten szybki start, konieczne jest spełnienie następujących wymagań wstępnych:

  • Go zainstalowane lokalnie. W razie potrzeby postępuj zgodnie z tymi instrukcjami .
  • Aktywne konto platformy Azure. Jeśli nie masz subskrypcji Azure, przed rozpoczęciem utwórz darmowe konto.
  • Utwórz przestrzeń nazw usługi Event Hubs i centrum zdarzeń. Użyj portalu Azure, aby utworzyć przestrzeń nazw typu Event Hubs i uzyskać poświadczenia zarządzania, których aplikacja potrzebuje, aby komunikować się z centrum zdarzeń. Aby utworzyć przestrzeń nazw i centrum zdarzeń, wykonaj procedurę opisaną w tym artykule.

Wysyłanie zdarzeń

W tej sekcji pokazano, jak utworzyć aplikację Go w celu wysyłania zdarzeń do centrum zdarzeń.

Instalowanie pakietu Go

Pobierz pakiet Go dla usługi Event Hubs, jak pokazano w poniższym przykładzie.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Kod do wysyłania zdarzeń do centrum zdarzeń

Oto kod do wysyłania zdarzeń do centrum zdarzeń. Główne kroki w kodzie to:

  1. Utwórz klienta producenta usługi Event Hubs przy użyciu parametrów połączenia z przestrzenią nazw usługi Event Hubs i nazwą centrum zdarzeń.
  2. Utwórz obiekt wsadowy i dodaj przykładowe zdarzenia do wsadu.
  3. Wyślij partię zdarzeń do tych zdarzeń.

Ważne

Zastąp NAMESPACE CONNECTION STRING ciągiem parametrów połączenia do przestrzeni nazw Event Hubs i EVENT HUB NAME nazwą centrum zdarzeń w przykładowym kodzie.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

Nie uruchamiaj jeszcze aplikacji. Najpierw należy uruchomić aplikację odbiorcy, a następnie aplikację nadawcy.

Odbieranie zdarzeń

Tworzenie konta magazynu i kontenera

Stan, takie jak dzierżawy partycji i punkty kontrolne w zdarzeniach, są współużytkowane przez odbiorniki za pomocą kontenera usługi Azure Storage. Konto magazynu i kontener można utworzyć za pomocą zestawu SDK języka Go, ale możesz je również utworzyć, postępując zgodnie z instrukcjami w temacie About Azure storage accounts (Informacje o kontach usługi Azure Storage).

Postępuj zgodnie z tymi zaleceniami, gdy używasz usługi Azure Blob Storage jako magazynu punktów kontrolnych:

  • Użyj oddzielnego kontenera dla każdej grupy odbiorców. Możesz użyć tego samego konta magazynu, ale użyj jednego kontenera dla każdej grupy.
  • Nie używaj konta magazynowego do niczego innego.
  • Nie używaj kontenera do niczego innego.
  • Utwórz konto magazynu w tym samym regionie co wdrożona aplikacja. Jeśli aplikacja jest na miejscu, spróbuj wybrać najbliższy możliwy region.

Na stronie konta magazynowego w portalu Azure, w sekcji usługi Blob, upewnij się, że następujące ustawienia zostały wyłączone.

  • Hierarchiczna przestrzeń nazw
  • Miękkie usuwanie obiektów blob
  • Wersjonowanie

Pakiety języka Go

Aby odbierać komunikaty, pobierz pakiety Go dla usługi Event Hubs, jak pokazano w przykładzie poniżej.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Kod odbierania zdarzeń z centrum zdarzeń

Oto kod do odbierania zdarzeń z centrum zdarzeń. Główne kroki w kodzie to:

  1. Sprawdź obiekt magazynu punktów kontrolnych reprezentujący usługę Azure Blob Storage używaną przez centrum zdarzeń do tworzenia punktów kontrolnych.
  2. Utwórz klienta odbiorcy usługi Event Hubs przy użyciu parametrów połączenia z przestrzenią nazw usługi Event Hubs i nazwą centrum zdarzeń.
  3. Utwórz procesor zdarzeń przy użyciu obiektu klienta i obiektu magazynu punktów kontrolnych. Procesor odbiera i przetwarza zdarzenia.
  4. Dla każdej partycji w centrum zdarzeń utwórz klienta partycji z processEvents jako funkcją do przetwarzania zdarzeń.
  5. Uruchom wszystkich klientów partycji, aby odbierać i przetwarzać zdarzenia.

Ważne

Zastąp następujące wartości zastępcze rzeczywistymi wartościami:

  • AZURE STORAGE CONNECTION STRING z parametrami połączenia dla konta Azure Storage
  • BLOB CONTAINER NAME z nazwą kontenera obiektów blob utworzonego na koncie magazynowym
  • NAMESPACE CONNECTION STRING z parametrami połączenia dla przestrzeni nazw usługi Event Hubs
  • EVENT HUB NAME z nazwą centrum zdarzeń w przykładowym kodzie.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Uruchamianie aplikacji odbiorcy i nadawcy

  1. Najpierw uruchom aplikację odbiorcy.

  2. Uruchom aplikację nadawcy.

  3. Poczekaj minutę, aby zobaczyć następujące dane wyjściowe w oknie odbiorcy.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Dalsze kroki

Zobacz przykłady w witrynie GitHub pod adresem https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.