クイックスタート: Go を使用して Event Hubs との間でイベントを送受信する

Azure Event Hubs はビッグ データ ストリーミング プラットフォームであり、毎秒数百万のイベントを受け取って処理できるイベント インジェスト サービスです。 Event Hubs では、分散されたソフトウェアやデバイスから生成されるイベント、データ、またはテレメトリを処理および格納できます。 イベント ハブに送信されたデータは、任意のリアルタイム分析プロバイダーやバッチ処理/ストレージ アダプターを使用して、変換および保存できます。 Event Hubs の詳しい概要については、Event Hubs の概要Event Hubs の機能に関するページをご覧ください。

このクイックスタートでは、イベント ハブとの間でイベントを送受信する Go アプリケーションを作成する方法について説明します。

注意

このクイックスタートは、https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs にある GitHub 上のサンプルに基づいています。 送信は example_producing_events_test.go サンプルに基づいており、受信は example_processor_test.go サンプルに基づいています。 コードはクイックスタート用に簡略化されており、すべての詳細コメントは削除されているため、詳細と説明についてはサンプルを参照してください。

前提条件

このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。

  • Go がローカルにインストールされていること。 必要に応じて、こちらの手順に従います。
  • アクティブな Azure アカウントアカウントがない場合、Azure 試用版にサインアップして、最大 10 件の無料 Mobile Apps を入手できます。 Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。
  • Event Hubs 名前空間とイベント ハブを作成するAzure portal を使用して Event Hubs 型の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。

送信イベント

このセクションでは、イベント ハブにイベントを送信する Go アプリケーションの作成方法を説明します。

Go パッケージをインストールする

次の例に示すように、Event Hubs 用の Go パッケージを取得します。

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

イベント ハブにイベントを送信するためのコード

イベント ハブにイベントを送信するためのコードを次に示します。 このコードでの主な手順は次のとおりです。

  1. Event Hubs 名前空間への接続文字列とイベント ハブ名を使用して、Event Hubs のプロデューサー クライアントを作成します。
  2. バッチ オブジェクトを作成し、バッチにサンプル イベントを追加します。
  3. イベントのバッチをイベント ハブに送信します。

重要

NAMESPACE CONNECTION STRING を Event Hubs 名前空間への接続文字列に置き換え、EVENT HUB NAME をサンプル コード内のイベント ハブ名に置き換えます。

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

アプリケーションはまだ実行しないでください。 最初に受信側アプリを実行し、次に送信者アプリを実行する必要があります。

受信イベント

Storage アカウントとコンテナーの作成

パーティションのリースやイベント ストリーム内のチェックポイントなどの状態は、Azure Storage コンテナーを使用して受信者間で共有されます。 Go SDK を使用してストレージ アカウントとコンテナーを作成できますが、「Azure ストレージ アカウントについて」の手順に従って作成することもできます。

チェックポイント ストアとして Azure Blob Storage を使用する場合は、次の推奨事項に従ってください。

  • コンシューマー グループごとに別のコンテナーを使用します。 同じストレージ アカウントを使用できますが、各グループごとに 1 つのコンテナーを使用します。
  • コンテナーを他の何かに使用しないでください。また、ストレージ アカウントも他の何かに使用しないでください。
  • ストレージ アカウントは、デプロイされたアプリケーションが配置されているのと同じリージョンに存在する必要があります。 アプリケーションがオンプレミスの場合は、可能な中で最も近いリージョンを選択することを試みてください。

Azure portal の [ストレージ アカウント] ページの [Blob service] セクションで、次の設定が無効になっていることを確認してください。

  • 階層型名前空間
  • BLOB の論理的な削除
  • バージョン管理

Go パッケージ

メッセージを受信するには、次の例に示すように、Event Hubs 用の Go パッケージを取得します。

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

イベント ハブからイベントを受信するためのコード

イベント ハブからイベントを受信するためのコードを次に示します。 このコードでの主な手順は次のとおりです。

  1. イベント ハブでチェックポイント処理に使用される Azure Blob Storage を表すチェックポイント ストア オブジェクトを確認します。
  2. Event Hubs 名前空間への接続文字列とイベント ハブ名を使用して、Event Hubs のコンシューマー クライアントを作成します。
  3. クライアント オブジェクトとチェックポイント ストア オブジェクトを使用してイベント プロセッサを作成します。 プロセッサによってイベントが受信されて処理されます。
  4. イベント ハブ内のパーティションごとに、イベントを処理する関数として processEvents を使用してパーティション クライアントを作成します。
  5. すべてのパーティション クライアントを実行して、イベントを受信して処理します。

重要

次のプレースホルダーの値を実際の値に置き換えます。

  • AZURE STORAGE CONNECTION STRING を、Azure ストレージ アカウントの接続文字列に
  • BLOB CONTAINER NAME を、ストレージ アカウントで作成した BLOB コンテナーの名前に
  • NAMESPACE CONNECTION STRING を、Event Hubs 名前空間への接続文字列に
  • EVENT HUB NAME を、サンプル コードのイベント ハブ名に
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

受信側と送信側アプリを実行する

  1. まず、受信側アプリを実行します。

  2. 送信側アプリを実行します。

  3. 受信側ウィンドウに次の出力が表示されるまで 1 分間待ちます。

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

次の手順

https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs にある GitHub 上のサンプルを参照してください。