Rövid útmutató: Események küldése az Event Hubsba vagy események fogadása a Go használatával

Az Azure Event Hubs egy Big Data streamplatform és eseményfeldolgozó szolgáltatás, amely másodpercenként több millió esemény fogadására és feldolgozására képes. Az Event Hubs képes az elosztott szoftverek és eszközök által generált események, adatok vagy telemetria feldolgozására és tárolására. Az eseményközpontokba elküldött adatok bármilyen valós idejű elemzési szolgáltató vagy kötegelési/tárolóadapter segítségével átalakíthatók és tárolhatók. Az Event Hubs részletes áttekintéséért lásd az Event Hubs áttekintését és az Event Hubs-szolgáltatásokat ismertető cikket.

Ez a rövid útmutató bemutatja, hogyan írhat Go-alkalmazásokat események eseményközpontba való küldéséhez vagy fogadásához.

Feljegyzés

Ez a rövid útmutató a GitHubon https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubsa következő helyen található mintákon alapul: . A küldés a example_producing_events_test.go mintán alapul, a fogadás pedig a example_processor_test.go mintán alapul. A kód egyszerűbb a rövid útmutatóhoz, és az összes részletes megjegyzés el lesz távolítva, ezért további részletekért és magyarázatért tekintse meg a mintákat.

Előfeltételek

A rövid útmutató elvégzéséhez a következő előfeltételekre van szüksége:

  • A telepítés helyileg történik. Szükség esetén kövesse ezeket az utasításokat .
  • Aktív Azure-fiók. Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.
  • Hozzon létre egy Event Hubs-névteret és egy eseményközpontot. Az Azure Portal használatával hozzon létre egy Event Hubs típusú névteret, és szerezze be az alkalmazás által az eseményközponttal való kommunikációhoz szükséges felügyeleti hitelesítő adatokat. Névtér és eseményközpont létrehozásához kövesse az ebben a cikkben ismertetett eljárást.

Események küldése

Ez a szakasz bemutatja, hogyan hozhat létre Go-alkalmazást események eseményközpontba való küldéséhez.

Go-csomag telepítése

Szerezze be az Event Hubs Go csomagját az alábbi példában látható módon.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Események eseményközpontba küldéséhez használható kód

A következő kód segítségével küldhet eseményeket egy eseményközpontba. A kód fő lépései a következők:

  1. Hozzon létre egy Event Hubs-gyártó ügyfelet egy kapcsolati sztring az Event Hubs-névtérhez és az eseményközpont nevéhez.
  2. Hozzon létre egy kötegobjektumot, és adjon hozzá mintaeseményeket a köteghez.
  3. Küldje el az események kötegét az eseményeknek.

Fontos

Cserélje le NAMESPACE CONNECTION STRING a kapcsolati sztring az Event Hubs-névtérre és EVENT HUB NAME az eseményközpont nevére a mintakódban.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Még ne futtassa az alkalmazást. Először futtatnia kell a fogadó alkalmazást, majd a küldő alkalmazást.

Események fogadása

Tárfiók és tároló létrehozása

Az állapotok, például a partíciók bérletei és az eseménystreamben lévő ellenőrzőpontok egy Azure Storage-tároló használatával vannak megosztva a fogadók között. A Go SDK-val tárfiókot és tárolót is létrehozhat, de létrehozhat egyet is az Azure Storage-fiókokra vonatkozó utasítások követésével.

Kövesse az alábbi javaslatokat az Azure Blob Storage ellenőrzőpont-tárolóként való használatakor:

  • Minden fogyasztói csoporthoz használjon külön tárolót. Használhatja ugyanazt a tárfiókot, de csoportonként egy tárolót.
  • Ne használja a tárolót semmi máshoz, és ne használja a tárfiókot semmi máshoz.
  • A tárfióknak ugyanabban a régióban kell lennie, amelyben az üzembe helyezett alkalmazás található. Ha az alkalmazás helyszíni, próbálja meg kiválasztani a lehető legközelebbi régiót.

Az Azure Portal Tárfiók lapján, a Blob szolgáltatás szakaszában győződjön meg arról, hogy a következő beállítások le vannak tiltva.

  • Hierarchikus névtér
  • Blob áltörlése
  • Verziókezelés

Go-csomagok

Az üzenetek fogadásához szerezze be az Event Hubs Go-csomagjait az alábbi példában látható módon.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Események fogadásának kódja egy eseményközpontból

Az alábbi kód segítségével fogadhatja az eseményeket egy eseményközpontból. A kód fő lépései a következők:

  1. Ellenőrizze az eseményközpont által az ellenőrzéshez használt Azure Blob Storage-objektumot.
  2. Hozzon létre egy Event Hubs fogyasztói ügyfelet egy kapcsolati sztring az Event Hubs-névtérhez és az eseményközpont nevéhez.
  3. Hozzon létre egy eseményfeldolgozót az ügyfélobjektum és az ellenőrzőpont-tároló objektum használatával. A processzor eseményeket fogad és dolgoz fel.
  4. Az eseményközpontban minden partícióhoz hozzon létre egy partícióügyfélt, amely a processEvents függvényt tartalmazza az események feldolgozásához.
  5. Futtassa az összes particionálási ügyfelet az események fogadásához és feldolgozásához.

Fontos

Cserélje le a következő helyőrző értékeket tényleges értékekre:

  • AZURE STORAGE CONNECTION STRINGaz Azure Storage-fiók kapcsolati sztring
  • BLOB CONTAINER NAME a tárfiókban létrehozott blobtároló nevével
  • NAMESPACE CONNECTION STRINGaz Event Hubs-névtér kapcsolati sztring
  • EVENT HUB NAME az eseményközpont nevével a mintakódban.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Fogadó- és feladóalkalmazások futtatása

  1. Először futtassa a fogadóalkalmazást.

  2. Futtassa a küldő alkalmazást.

  3. Várjon egy percig, amíg a következő kimenet jelenik meg a fogadóablakban.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Következő lépések

Minták megtekintése a GitHubon a következő címen https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs: .