Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Azure Event Hubs to platforma do pozyskiwania i strumieniowego przesyłania danych, która umożliwia odbieranie i przetwarzanie milionów zdarzeń na sekundę. Usługa Event Hubs pozwala przetwarzać i przechowywać zdarzenia, dane lub dane telemetryczne generowane przez rozproszone oprogramowanie i urządzenia. Dane wysłane do centrum zdarzeń mogą zostać przekształcone i zmagazynowane przy użyciu dowolnego dostawcy analityki czasu rzeczywistego lub adapterów przetwarzania wsadowego/magazynowania. Aby zapoznać się ze szczegółowym omówieniem usługi Event Hubs, zobacz Omówienie usługi Event Hubs i Funkcje usługi Event Hubs.
W tym przewodniku szybkiego startu opisano, jak pisać aplikacje w języku Go w celu wysyłania zdarzeń do centrum zdarzeń lub ich odbierania.
Uwaga / Notatka
Ten szybki start opiera się na przykładach z witryny GitHub pod adresem https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Sekcja wysyłania zdarzeń jest oparta na przykładzie example_producing_events_test.go, a sekcja odbierania zdarzeń jest oparta na przykładzie example_processor_test.go. Kod jest uproszczony dla szybkiego startu, a wszystkie szczegółowe komentarze zostały usunięte, więc zapoznaj się z przykładami, aby uzyskać więcej szczegółów i wyjaśnień.
Wymagania wstępne
Aby ukończyć ten szybki start, konieczne jest spełnienie następujących wymagań wstępnych:
- Go zainstalowane lokalnie. W razie potrzeby postępuj zgodnie z tymi instrukcjami .
- Aktywne konto platformy Azure. Jeśli nie masz subskrypcji Azure, przed rozpoczęciem utwórz darmowe konto.
- Utwórz przestrzeń nazw usługi Event Hubs i centrum zdarzeń. Użyj portalu Azure, aby utworzyć przestrzeń nazw typu Event Hubs i uzyskać poświadczenia zarządzania, których aplikacja potrzebuje, aby komunikować się z centrum zdarzeń. Aby utworzyć przestrzeń nazw i centrum zdarzeń, wykonaj procedurę opisaną w tym artykule.
Wysyłanie zdarzeń
W tej sekcji pokazano, jak utworzyć aplikację Go w celu wysyłania zdarzeń do centrum zdarzeń.
Instalowanie pakietu Go
Pobierz pakiet Go dla usługi Event Hubs, jak pokazano w poniższym przykładzie.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Kod do wysyłania zdarzeń do centrum zdarzeń
Oto kod do wysyłania zdarzeń do centrum zdarzeń. Główne kroki w kodzie to:
- Utwórz klienta producenta usługi Event Hubs przy użyciu parametrów połączenia z przestrzenią nazw usługi Event Hubs i nazwą centrum zdarzeń.
- Utwórz obiekt wsadowy i dodaj przykładowe zdarzenia do wsadu.
- Wyślij partię zdarzeń do tych zdarzeń.
Ważne
Zastąp NAMESPACE CONNECTION STRING ciągiem parametrów połączenia do przestrzeni nazw Event Hubs i EVENT HUB NAME nazwą centrum zdarzeń w przykładowym kodzie.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Nie uruchamiaj jeszcze aplikacji. Najpierw należy uruchomić aplikację odbiorcy, a następnie aplikację nadawcy.
Odbieranie zdarzeń
Tworzenie konta magazynu i kontenera
Stan, takie jak dzierżawy partycji i punkty kontrolne w zdarzeniach, są współużytkowane przez odbiorniki za pomocą kontenera usługi Azure Storage. Konto magazynu i kontener można utworzyć za pomocą zestawu SDK języka Go, ale możesz je również utworzyć, postępując zgodnie z instrukcjami w temacie About Azure storage accounts (Informacje o kontach usługi Azure Storage).
Postępuj zgodnie z tymi zaleceniami, gdy używasz usługi Azure Blob Storage jako magazynu punktów kontrolnych:
- Użyj oddzielnego kontenera dla każdej grupy odbiorców. Możesz użyć tego samego konta magazynu, ale użyj jednego kontenera dla każdej grupy.
- Nie używaj konta magazynowego do niczego innego.
- Nie używaj kontenera do niczego innego.
- Utwórz konto magazynu w tym samym regionie co wdrożona aplikacja. Jeśli aplikacja jest na miejscu, spróbuj wybrać najbliższy możliwy region.
Na stronie konta magazynowego w portalu Azure, w sekcji usługi Blob, upewnij się, że następujące ustawienia zostały wyłączone.
- Hierarchiczna przestrzeń nazw
- Miękkie usuwanie obiektów blob
- Wersjonowanie
Pakiety języka Go
Aby odbierać komunikaty, pobierz pakiety Go dla usługi Event Hubs, jak pokazano w przykładzie poniżej.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Kod odbierania zdarzeń z centrum zdarzeń
Oto kod do odbierania zdarzeń z centrum zdarzeń. Główne kroki w kodzie to:
- Sprawdź obiekt magazynu punktów kontrolnych reprezentujący usługę Azure Blob Storage używaną przez centrum zdarzeń do tworzenia punktów kontrolnych.
- Utwórz klienta odbiorcy usługi Event Hubs przy użyciu parametrów połączenia z przestrzenią nazw usługi Event Hubs i nazwą centrum zdarzeń.
- Utwórz procesor zdarzeń przy użyciu obiektu klienta i obiektu magazynu punktów kontrolnych. Procesor odbiera i przetwarza zdarzenia.
- Dla każdej partycji w centrum zdarzeń utwórz klienta partycji z processEvents jako funkcją do przetwarzania zdarzeń.
- Uruchom wszystkich klientów partycji, aby odbierać i przetwarzać zdarzenia.
Ważne
Zastąp następujące wartości zastępcze rzeczywistymi wartościami:
-
AZURE STORAGE CONNECTION STRINGz parametrami połączenia dla konta Azure Storage -
BLOB CONTAINER NAMEz nazwą kontenera obiektów blob utworzonego na koncie magazynowym -
NAMESPACE CONNECTION STRINGz parametrami połączenia dla przestrzeni nazw usługi Event Hubs -
EVENT HUB NAMEz nazwą centrum zdarzeń w przykładowym kodzie.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Uruchamianie aplikacji odbiorcy i nadawcy
Najpierw uruchom aplikację odbiorcy.
Uruchom aplikację nadawcy.
Poczekaj minutę, aby zobaczyć następujące dane wyjściowe w oknie odbiorcy.
Processing 2 event(s) Event received with body hello Event received with body world
Dalsze kroki
Zobacz przykłady w witrynie GitHub pod adresem https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.