Rövid útmutató: Események küldése az Event Hubsba vagy események fogadása a Go használatával
Az Azure Event Hubs egy Big Data streamplatform és eseményfeldolgozó szolgáltatás, amely másodpercenként több millió esemény fogadására és feldolgozására képes. Az Event Hubs képes az elosztott szoftverek és eszközök által generált események, adatok vagy telemetria feldolgozására és tárolására. Az eseményközpontokba elküldött adatok bármilyen valós idejű elemzési szolgáltató vagy kötegelési/tárolóadapter segítségével átalakíthatók és tárolhatók. Az Event Hubs részletes áttekintéséért lásd az Event Hubs áttekintését és az Event Hubs-szolgáltatásokat ismertető cikket.
Ez a rövid útmutató bemutatja, hogyan írhat Go-alkalmazásokat események eseményközpontba való küldéséhez vagy fogadásához.
Feljegyzés
Ez a rövid útmutató a GitHubon https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubsa következő helyen található mintákon alapul: . Az Események küldése szakasz a example_producing_events_test.go mintán alapul, a fogadás pedig a example_processor_test.go mintán alapul. A kód egyszerűbb a rövid útmutatóhoz, és az összes részletes megjegyzés el lesz távolítva, ezért további részletekért és magyarázatért tekintse meg a mintákat.
Előfeltételek
A rövid útmutató elvégzéséhez a következő előfeltételekre van szüksége:
- A telepítés helyileg történik. Szükség esetén kövesse ezeket az utasításokat .
- Aktív Azure-fiók. Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.
- Hozzon létre egy Event Hubs-névteret és egy eseményközpontot. Az Azure Portal használatával hozzon létre egy Event Hubs típusú névteret, és szerezze be az alkalmazás által az eseményközponttal való kommunikációhoz szükséges felügyeleti hitelesítő adatokat. Névtér és eseményközpont létrehozásához kövesse az ebben a cikkben ismertetett eljárást.
Események küldése
Ez a szakasz bemutatja, hogyan hozhat létre Go-alkalmazást események eseményközpontba való küldéséhez.
Go-csomag telepítése
Szerezze be az Event Hubs Go csomagját az alábbi példában látható módon.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Események eseményközpontba küldéséhez használható kód
A következő kód segítségével küldhet eseményeket egy eseményközpontba. A kód fő lépései a következők:
- Hozzon létre egy Event Hubs-gyártó ügyfelet egy kapcsolati sztring az Event Hubs-névtérhez és az eseményközpont nevéhez.
- Hozzon létre egy kötegobjektumot, és adjon hozzá mintaeseményeket a köteghez.
- Küldje el az események kötegét az eseményeknek.
Fontos
Cserélje le NAMESPACE CONNECTION STRING
a kapcsolati sztring az Event Hubs-névtérre és EVENT HUB NAME
az eseményközpont nevére a mintakódban.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Még ne futtassa az alkalmazást. Először futtatnia kell a fogadó alkalmazást, majd a küldő alkalmazást.
Események fogadása
Tárfiók és tároló létrehozása
Az állapotok, például a partíciók bérletei és az események ellenőrzőpontjai egy Azure Storage-tároló használatával vannak megosztva a fogadók között. A Go SDK-val tárfiókot és tárolót is létrehozhat, de létrehozhat egyet is az Azure Storage-fiókokra vonatkozó utasítások követésével.
Kövesse az alábbi javaslatokat az Azure Blob Storage ellenőrzőpont-tárolóként való használatakor:
- Minden fogyasztói csoporthoz használjon külön tárolót. Használhatja ugyanazt a tárfiókot, de csoportonként egy tárolót.
- Ne használja a tárolót semmi máshoz, és ne használja a tárfiókot semmi máshoz.
- A tárfióknak ugyanabban a régióban kell lennie, amelyben az üzembe helyezett alkalmazás található. Ha az alkalmazás helyszíni, próbálja meg kiválasztani a lehető legközelebbi régiót.
Az Azure Portal Tárfiók lapján, a Blob szolgáltatás szakaszában győződjön meg arról, hogy a következő beállítások le vannak tiltva.
- Hierarchikus névtér
- Blob áltörlése
- Verziókezelés
Go-csomagok
Az üzenetek fogadásához szerezze be az Event Hubs Go-csomagjait az alábbi példában látható módon.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Események fogadásának kódja egy eseményközpontból
Az alábbi kód segítségével fogadhatja az eseményeket egy eseményközpontból. A kód fő lépései a következők:
- Ellenőrizze az eseményközpont által az ellenőrzéshez használt Azure Blob Storage-objektumot.
- Hozzon létre egy Event Hubs fogyasztói ügyfelet egy kapcsolati sztring az Event Hubs-névtérhez és az eseményközpont nevéhez.
- Hozzon létre egy eseményfeldolgozót az ügyfélobjektum és az ellenőrzőpont-tároló objektum használatával. A processzor eseményeket fogad és dolgoz fel.
- Az eseményközpontban minden partícióhoz hozzon létre egy partícióügyfélt, amely a processEvents függvényt tartalmazza az események feldolgozásához.
- Futtassa az összes particionálási ügyfelet az események fogadásához és feldolgozásához.
Fontos
Cserélje le a következő helyőrző értékeket tényleges értékekre:
AZURE STORAGE CONNECTION STRING
az Azure Storage-fiók kapcsolati sztringBLOB CONTAINER NAME
a tárfiókban létrehozott blobtároló nevévelNAMESPACE CONNECTION STRING
az Event Hubs-névtér kapcsolati sztringEVENT HUB NAME
az eseményközpont nevével a mintakódban.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Fogadó- és feladóalkalmazások futtatása
Először futtassa a fogadóalkalmazást.
Futtassa a küldő alkalmazást.
Várjon egy percig, amíg a következő kimenet jelenik meg a fogadóablakban.
Processing 2 event(s) Event received with body hello Event received with body world
Következő lépések
Minták megtekintése a GitHubon a következő címen https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs: .