Szybki start: wysyłanie zdarzeń do usługi Event Hubs lub odbieranie zdarzeń przy użyciu języka Go
Azure Event Hubs to platforma do pozyskiwania i strumieniowego przesyłania danych, która umożliwia odbieranie i przetwarzanie milionów zdarzeń na sekundę. Usługa Event Hubs pozwala przetwarzać i przechowywać zdarzenia, dane lub dane telemetryczne generowane przez rozproszone oprogramowanie i urządzenia. Dane wysłane do centrum zdarzeń mogą zostać przekształcone i zmagazynowane przy użyciu dowolnego dostawcy analityki czasu rzeczywistego lub adapterów przetwarzania wsadowego/magazynowania. Aby zapoznać się ze szczegółowym omówieniem usługi Event Hubs, zobacz Omówienie usługi Event Hubs i Funkcje usługi Event Hubs.
W tym przewodniku Szybki start opisano sposób pisania aplikacji języka Go w celu wysyłania zdarzeń do centrum zdarzeń lub odbierania zdarzeń.
Uwaga
Ten przewodnik Szybki start jest oparty na przykładach w witrynie GitHub pod adresem https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Sekcja wysyłanie zdarzeń jest oparta na przykładzie example_producing_events_test.go , a odbieranie jest oparte na przykładzie example_processor_test.go . Kod jest uproszczony dla przewodnika Szybki start i wszystkie szczegółowe komentarze są usuwane, więc zapoznaj się z przykładami, aby uzyskać więcej szczegółów i wyjaśnień.
Wymagania wstępne
Do wykonania kroków tego przewodnika Szybki start niezbędne jest spełnienie następujących wymagań wstępnych:
- Przejdź do zainstalowanego lokalnie. W razie potrzeby postępuj zgodnie z tymi instrukcjami .
- Aktywne konto platformy Azure. Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.
- Utwórz przestrzeń nazw usługi Event Hubs i centrum zdarzeń. Użyj witryny Azure Portal, aby utworzyć przestrzeń nazw typu Event Hubs i uzyskać poświadczenia zarządzania, które aplikacja musi komunikować się z centrum zdarzeń. Aby utworzyć przestrzeń nazw i centrum zdarzeń, wykonaj procedurę opisaną w tym artykule.
Wysyłanie zdarzeń
W tej sekcji pokazano, jak utworzyć aplikację Go w celu wysyłania zdarzeń do centrum zdarzeń.
Instalowanie pakietu Języka Go
Pobierz pakiet Go dla usługi Event Hubs, jak pokazano w poniższym przykładzie.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Kod do wysyłania zdarzeń do centrum zdarzeń
Oto kod do wysyłania zdarzeń do centrum zdarzeń. Główne kroki w kodzie to:
- Utwórz klienta producenta usługi Event Hubs przy użyciu parametry połączenia do przestrzeni nazw usługi Event Hubs i nazwy centrum zdarzeń.
- Utwórz obiekt wsadowy i dodaj przykładowe zdarzenia do partii.
- Wyślij partię zdarzeń do tych zdarzeń.
Ważne
Zastąp NAMESPACE CONNECTION STRING
ciąg parametry połączenia przestrzenią nazw usługi Event Hubs i EVENT HUB NAME
nazwą centrum zdarzeń w przykładowym kodzie.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Nie uruchamiaj jeszcze aplikacji. Najpierw należy uruchomić aplikację odbiorcy, a następnie aplikację nadawcy.
Odbieranie zdarzeń
Tworzenie konta magazynu i kontenera
Stan, taki jak dzierżawy partycji i punktów kontrolnych w zdarzeniach, są współużytkowane przez odbiorniki przy użyciu kontenera usługi Azure Storage. Konto magazynu i kontener można utworzyć za pomocą zestawu SDK języka Go, ale możesz je również utworzyć, postępując zgodnie z instrukcjami w temacie About Azure storage accounts (Informacje o kontach usługi Azure Storage).
Postępuj zgodnie z poniższymi zaleceniami w przypadku korzystania z usługi Azure Blob Storage jako magazynu punktów kontrolnych:
- Użyj oddzielnego kontenera dla każdej grupy odbiorców. Możesz użyć tego samego konta magazynu, ale użyj jednego kontenera dla każdej grupy.
- Nie używaj kontenera dla żadnych innych elementów i nie używaj konta magazynu dla innych elementów.
- Konto magazynu powinno znajdować się w tym samym regionie co wdrożona aplikacja. Jeśli aplikacja jest lokalna, spróbuj wybrać możliwy region najbliżej.
Na stronie Konto magazynu w witrynie Azure Portal w sekcji Blob Service upewnij się, że następujące ustawienia są wyłączone.
- Hierarchiczna przestrzeń nazw
- Usuwanie nietrwałe obiektów blob
- Wersje
Pakiety języka Go
Aby odbierać komunikaty, pobierz pakiety Języka Go dla usługi Event Hubs, jak pokazano w poniższym przykładzie.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Kod odbierania zdarzeń z centrum zdarzeń
Oto kod do odbierania zdarzeń z centrum zdarzeń. Główne kroki w kodzie to:
- Sprawdź obiekt magazynu punktów kontrolnych reprezentujący usługę Azure Blob Storage używaną przez centrum zdarzeń do tworzenia punktów kontrolnych.
- Utwórz klienta odbiorcy usługi Event Hubs przy użyciu parametry połączenia do przestrzeni nazw usługi Event Hubs i nazwy centrum zdarzeń.
- Utwórz procesor zdarzeń przy użyciu obiektu klienta i obiektu magazynu punktów kontrolnych. Procesor odbiera i przetwarza zdarzenia.
- Dla każdej partycji w centrum zdarzeń utwórz klienta partycji z procesemEvents jako funkcji przetwarzania zdarzeń.
- Uruchom wszystkich klientów partycji, aby odbierać i przetwarzać zdarzenia.
Ważne
Zastąp następujące wartości zastępcze rzeczywistymi wartościami:
AZURE STORAGE CONNECTION STRING
z parametry połączenia dla konta usługi Azure StorageBLOB CONTAINER NAME
z nazwą kontenera obiektów blob utworzonych na koncie magazynuNAMESPACE CONNECTION STRING
z parametry połączenia dla przestrzeni nazw usługi Event HubsEVENT HUB NAME
z nazwą centrum zdarzeń w przykładowym kodzie.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Uruchamianie aplikacji odbiorcy i nadawcy
Najpierw uruchom aplikację odbiorcy.
Uruchom aplikację nadawcy.
Poczekaj minutę, aby zobaczyć następujące dane wyjściowe w oknie odbiorcy.
Processing 2 event(s) Event received with body hello Event received with body world
Następne kroki
Zobacz przykłady w witrynie GitHub pod adresem https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.