Mulai cepat: Mengirim acara ke atau menerima acara dari Event Hubs menggunakan Go

Azure Event Hubs adalah platform streaming Big Data dan layanan pengolahan peristiwa, yang mampu menerima dan memproses jutaan peristiwa per detik. Event Hubs dapat memproses dan menyimpan peristiwa, data, atau telemetri yang dihasilkan oleh perangkat lunak dan perangkat yang terdistribusi. Data yang dikirim ke pusat aktivitas dapat ditransformasikan dan disimpan menggunakan penyedia analitik real-time atau adapter batching/penyimpanan. Untuk ringkasan detail Azure Event Hubs, lihat Ringkasan Azure Event Hubs dan fitur Azure Event Hubs.

Mulai cepat ini menjelaskan cara menulis aplikasi Go untuk mengirim peristiwa ke atau menerima peristiwa dari pusat aktivitas.

Catatan

Mulai cepat ini didasarkan pada sampel di GitHub di https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Pengiriman didasarkan pada sampel example_producing_events_test.go dan yang diterima didasarkan pada sampel example_processor_test.go . Kode disederhanakan untuk mulai cepat dan semua komentar terperinci dihapus, jadi lihat sampel untuk detail dan penjelasan selengkapnya.

Prasyarat

Untuk menyelesaikan mulai cepat ini, Anda memerlukan prasyarat berikut:

  • Go dipasang secara lokal. Ikuti instruksi ini jika perlu.
  • Akun Azure aktif. Jika Anda tidak memiliki langganan Azure, buat akun gratis sebelum Anda memulai.
  • Membuat ruang nama Azure Event Hubs dan pusat aktivitas. Gunakan portal Microsoft Azure untuk membuat kumpulan nama jenis Event Hubs, dan dapatkan info manajemen yang diperlukan aplikasi Anda untuk berkomunikasi dengan pusat aktivitas. Untuk membuat namespace layanan dan pusat aktivitas, ikuti prosedur dalam artikel ini.

Mengirim aktivitas

Bagian ini menunjukkan kepada Anda cara membuat aplikasi Go untuk mengirim aktivitas ke pusat aktivitas.

Memasang paket Go

Dapatkan paket Go untuk Azure Event Hubs seperti yang ditunjukkan dalam contoh berikut.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Kode untuk mengirim peristiwa ke pusat aktivitas

Berikut adalah kode untuk mengirim peristiwa ke pusat aktivitas. Langkah utama dalam kode adalah:

  1. Buat klien produsen Azure Event Hubs menggunakan string koneksi ke namespace layanan Azure Event Hubs dan nama pusat aktivitas.
  2. Buat objek batch dan tambahkan peristiwa sampel ke batch.
  3. Kirim batch peristiwa ke peristiwa ke th.

Penting

Ganti NAMESPACE CONNECTION STRING dengan string koneksi ke namespace Layanan Pusat Aktivitas Anda dan EVENT HUB NAME dengan nama hub peristiwa dalam kode sampel.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Jangan jalankan aplikasi. Anda harus terlebih dahulu menjalankan aplikasi penerima lalu aplikasi pengirim.

Menerima peristiwa

Membuat kontainer dan akun Azure Storage

Status seperti sewa pada partisi dan titik pemeriksaan dalam aliran acara dibagikan antara penerima menggunakan kontainer Azure Storage. Anda bisa membuat akun penyimpanan dan kontainer dengan Go SDK, tetapi Anda juga bisa membuatnya dengan mengikuti instruksi di Tentang akun Azure Storage.

Ikuti rekomendasi ini saat menggunakan Azure Blob Storage sebagai penyimpanan titik pemeriksaan:

  • Gunakan kontainer terpisah untuk setiap grup konsumen. Anda dapat menggunakan akun penyimpanan yang sama, tetapi menggunakan satu kontainer per setiap grup.
  • Jangan gunakan kontainer untuk hal lain, dan jangan gunakan akun penyimpanan untuk hal lain.
  • Akun penyimpanan harus berada di wilayah yang sama dengan aplikasi yang disebarkan berada. Jika aplikasi lokal, coba pilih wilayah terdekat yang mungkin.

Pada halaman Akun penyimpanan di portal Azure, di bagian Blob service, pastikan bahwa pengaturan berikut dinonaktifkan.

  • Namespace hierarkis
  • Penghapusan sementara blob
  • Penerapan versi

Paket Go

Untuk menerima pesan, dapatkan paket Go untuk Azure Event Hubs seperti yang ditunjukkan dalam contoh berikut.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Kode untuk menerima peristiwa dari pusat aktivitas

Berikut adalah kode untuk menerima peristiwa dari pusat aktivitas. Langkah utama dalam kode adalah:

  1. Periksa objek penyimpanan titik pemeriksaan yang mewakili Azure Blob Storage yang digunakan oleh pusat aktivitas untuk titik pemeriksaan.
  2. Buat klien konsumen Azure Event Hubs menggunakan string koneksi ke namespace layanan Azure Event Hubs dan nama pusat aktivitas.
  3. Buat prosesor peristiwa menggunakan objek klien dan objek penyimpanan titik pemeriksaan. Prosesor menerima dan memproses peristiwa.
  4. Untuk setiap partisi di pusat aktivitas, buat klien partisi dengan processEvents sebagai fungsi untuk memproses peristiwa.
  5. Jalankan semua klien partisi untuk menerima dan memproses peristiwa.

Penting

Ganti nilai tempat penampung berikut dengan nilai aktual:

  • AZURE STORAGE CONNECTION STRINGdengan string koneksi untuk akun penyimpanan Azure Anda
  • BLOB CONTAINER NAME dengan nama kontainer blob yang Anda buat di akun penyimpanan
  • NAMESPACE CONNECTION STRINGdengan string koneksi untuk namespace Layanan Pusat Aktivitas Anda
  • EVENT HUB NAME dengan nama pusat aktivitas dalam kode sampel.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Menjalankan aplikasi penerima dan pengirim

  1. Jalankan aplikasi penerima terlebih dahulu.

  2. Jalankan aplikasi pengirim.

  3. Tunggu sebentar untuk melihat output berikut di jendela penerima.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Langkah berikutnya

Lihat sampel di GitHub di https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.