Rychlý start: Odesílání událostí nebo příjem událostí ze služby Event Hubs pomocí Go

Azure Event Hubs je platforma pro streamování velkých objemů dat a služba pro ingestování událostí, která je schopná přijmout a zpracovat miliony událostí za sekundu. Služba Event Hubs dokáže zpracovávat a ukládat události, data nebo telemetrické údaje produkované distribuovaným softwarem a zařízeními. Data odeslaná do centra událostí je možné transformovat a uložit pomocí libovolného poskytovatele analýz v reálném čase nebo adaptérů pro dávkové zpracování a ukládání. Podrobnější přehled služby Event Hubs najdete v tématech Přehled služby Event Hubs a Funkce služby Event Hubs.

Tento rychlý start popisuje, jak psát aplikace Go pro odesílání událostí do centra událostí nebo příjem událostí z centra událostí.

Poznámka:

Tento rychlý start vychází z ukázek na GitHubu na webu https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Odeslání vychází z ukázky example_producing_events_test.go a příjem vychází z ukázky example_processor_test.go . Kód je pro rychlý start zjednodušený a odeberou se všechny podrobné komentáře, takže se podívejte na ukázky, kde najdete další podrobnosti a vysvětlení.

Požadavky

K dokončení tohoto rychlého startu potřebujete následující požadavky:

  • Go nainstalovaný místně. V případě potřeby postupujte podle těchto pokynů .
  • Aktivní účet Azure Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.
  • Vytvořte obor názvů služby Event Hubs a centrum událostí. Pomocí webu Azure Portal vytvořte obor názvů typu Event Hubs a získejte přihlašovací údaje pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku.

Odesílání událostí

V této části se dozvíte, jak vytvořit aplikaci Go pro odesílání událostí do centra událostí.

Instalace balíčku Go

Získejte balíček Go pro službu Event Hubs, jak je znázorněno v následujícím příkladu.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Kód pro odesílání událostí do centra událostí

Tady je kód pro odesílání událostí do centra událostí. Hlavní kroky v kódu jsou:

  1. Vytvořte klienta producenta služby Event Hubs pomocí připojovací řetězec do oboru názvů služby Event Hubs a názvu centra událostí.
  2. Vytvořte dávkový objekt a přidejte do dávky ukázkové události.
  3. Odešle dávku událostí do těchto událostí.

Důležité

Nahraďte NAMESPACE CONNECTION STRING připojovací řetězec do oboru názvů služby Event Hubs a EVENT HUB NAME názvem centra událostí v ukázkovém kódu.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Aplikaci zatím nespousíte. Nejdřív musíte spustit aplikaci příjemce a pak aplikaci odesílatele.

Příjem událostí

Vytvoření účtu úložiště a kontejneru

Stav, například zapůjčení oddílů a kontrolních bodů v datovém proudu událostí, se sdílí mezi příjemci pomocí kontejneru Azure Storage. Pomocí sady Go SDK můžete vytvořit účet úložiště a kontejner, ale můžete ho vytvořit také podle pokynů v části O účtech úložiště Azure.

Při používání služby Azure Blob Storage jako úložiště kontrolních bodů postupujte podle těchto doporučení:

  • Pro každou skupinu příjemců použijte samostatný kontejner. Můžete použít stejný účet úložiště, ale pro každou skupinu použít jeden kontejner.
  • Nepoužívejte kontejner pro nic jiného a nepoužívejte účet úložiště pro nic jiného.
  • Účet úložiště by měl být ve stejné oblasti jako nasazená aplikace. Pokud je aplikace místní, zkuste zvolit nejbližší možnou oblast.

Na stránce účtu úložiště na webu Azure Portal v části Blob Service se ujistěte, že jsou zakázaná následující nastavení.

  • Hierarchický obor názvů
  • Obnovitelné odstranění objektu blob
  • Vytváření verzí

Balíčky Go

Pokud chcete přijímat zprávy, získejte balíčky Go pro službu Event Hubs, jak je znázorněno v následujícím příkladu.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Kód pro příjem událostí z centra událostí

Tady je kód pro příjem událostí z centra událostí. Hlavní kroky v kódu jsou:

  1. Zkontrolujte objekt úložiště kontrolních bodů, který představuje službu Azure Blob Storage používanou centrem událostí pro vytváření kontrolních bodů.
  2. Vytvořte klienta příjemce služby Event Hubs pomocí připojovací řetězec do oboru názvů služby Event Hubs a názvu centra událostí.
  3. Vytvořte procesor událostí pomocí objektu klienta a objektu úložiště kontrolních bodů. Procesor přijímá a zpracovává události.
  4. Pro každý oddíl v centru událostí vytvořte klienta oddílu s processEvents jako funkcí pro zpracování událostí.
  5. Spuštěním všech klientů oddílů můžete přijímat a zpracovávat události.

Důležité

Nahraďte následující zástupné hodnoty skutečnými hodnotami:

  • AZURE STORAGE CONNECTION STRINGs připojovací řetězec pro váš účet úložiště Azure
  • BLOB CONTAINER NAME s názvem kontejneru objektů blob, který jste vytvořili v účtu úložiště
  • NAMESPACE CONNECTION STRINGs připojovací řetězec pro váš obor názvů služby Event Hubs
  • EVENT HUB NAME název centra událostí v ukázkovém kódu.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Spuštění aplikací pro příjemce a odesílatele

  1. Nejprve spusťte aplikaci příjemce.

  2. Spusťte aplikaci odesílatele.

  3. Počkejte minutu, než se v okně příjemce zobrazí následující výstup.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Další kroky

Podívejte se na ukázky na GitHubu na adrese https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.