Avro (Java) を使用して Apache Kafka アプリケーションのスキーマを検証する

このクイックスタート ガイドでは、Event Hubs 用の Azure スキーマ レジストリを使用して Apache Kafka アプリケーションからのイベントを検証する方法について説明します。

このユース ケースの場合、Kafka プロデューサー アプリケーションでは、Azure スキーマ レジストリに保存されている Avro スキーマを使用して、イベントをシリアル化し、Azure Event Hubs の Kafka トピックまたはイベント ハブに発行します。 Kafka コンシューマーでは、それが使用する、Event Hubs からのイベントが逆シリアル化されます。 そのために、イベントと Avro スキーマのスキーマ ID が使用されます。この ID は Azure スキーマ レジストリに保存されています。

Diagram showing schema serialization/de-serialization for Kafka applications using Avro schema.

前提条件

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。

このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。

イベント ハブの作成

Event Hubs 名前空間とイベント ハブを作成する」のクイックスタートの手順に従って、Event Hubs 名前空間とイベント ハブを作成します。 次に、接続文字列を取得する方法に関するページの手順に従って、使用している Event Hubs 名前空間への接続文字列を取得します。

現在のクイックスタートで使用する次の設定をメモします。

  • Event Hubs 名前空間の接続文字列
  • イベント ハブの名前

スキーマの作成

スキーマ レジストリを使用してスキーマを作成する方法に関するページの手順に従って、スキーマ グループとスキーマを作成します。

  1. スキーマ レジストリ ポータルを使用して、contoso-sg という名前のスキーマ グループを作成します。 シリアル化の種類として Avro を使用し、互換性モードに [なし] を使用します。

  2. そのスキーマ グループで、次のスキーマ コンテンツを使用して、スキーマ名 Microsoft.Azure.Data.SchemaRegistry.example.Order を使用して新しい Avro スキーマを作成します。

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

スキーマ レジストリにアクセスするアプリケーションを登録する

Microsoft Entra ID を使用して、クライアント アプリケーションを Azure portal から Microsoft Entra テナントに登録することで、Kafka プロデューサーおよびコンシューマー アプリケーションによる Azure スキーマ レジストリ リソースへのアクセスを承認できます。

example-app という名前の Microsoft Entra アプリケーションの登録については、「アプリケーションを Microsoft Entra テナントに登録する」をご覧ください。

  • tenant.id - アプリケーションのテナント ID を設定します
  • client.id - アプリケーションのクライアント ID を設定します
  • client.secret - 認証用のクライアント シークレットを設定します

さらに、マネージド ID を使用している場合は、以下が必要になります。

  • use.managed.identity.credential - MSI 資格情報を使用する必要があることを示します。MSI 対応 VM に使用してください
  • managed.identity.clientId - 指定した場合は、指定されたクライアント ID を使用して MSI 資格情報が構築されます
  • managed.identity.clientId - 指定した場合は、指定されたリソース ID を使用して MSI 資格情報が構築されます

スキーマ レジストリ閲覧者ロールにユーザーを追加する

名前空間レベルでスキーマ レジストリ閲覧者ロールにユーザー アカウントを追加します。 スキーマ レジストリ共同作成者ロールを使用することもできますが、このクイックスタートでは必要ありません。

  1. [Event Hubs 名前空間] ページで、左側のメニューの [アクセス制御 (IAM)] を選択します。
  2. [アクセス制御 (IAM)] ページのメニューで、[+ 追加] ->[ロールの割り当てを追加] を選択します。
  3. [割り当ての種類] ページで、[次へ] を選択します。
  4. [ロール] ページで、[スキーマ レジストリ閲覧者 (プレビュー)] を選択し、ページの下部にある [次へ] を選択します。
  5. [+ Select members] (+ メンバーを選択する) リンクを使用して、前の手順で作成した example-app アプリケーションをロールに追加し、[次へ] を選択します。
  6. [確認と割り当て] ページで、[確認と割り当て] を選択します。

Kafka アプリケーションのクライアント アプリケーション構成を更新する

Kafka プロデューサー アプリケーションとコンシューマー アプリケーションのクライアント構成を、作成した Microsoft Entra アプリケーションに関連した構成とスキーマ レジストリの情報を使用して更新する必要があります。

Kafka プロデューサーの構成を更新するには、azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer に移動します。

  1. Event Hubs の Kafka クイック スタート ガイドに従って、src/main/resources/app.properties で Kafka アプリケーションの構成を更新します。

  2. スキーマ レジストリ関連の構成と上記で作成した Microsoft Entra アプリケーションを使用して、src/main/resources/app.properties のプロデューサーの構成の詳細を、次のように更新します。

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. 同じ手順に従って、azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer の構成も更新します。

  4. Kafka プロデューサー アプリケーションとコンシューマー アプリケーションのどちらにも、次の Avro スキーマが使用されます。

    {
      "namespace": "com.azure.schemaregistry.samples",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    }
    

Avro スキーマ検証で Kafka プロデューサーを使用する

Kafka プロデューサー アプリケーションを実行するには、azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer に移動します。

  1. プロデューサー アプリケーションを実行すると、Avro 固有のレコードまたは汎用レコードをそのアプリケーションで生成できます。 特定のレコードのモードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。

    mvn generate-sources
    
  2. その後に、次のコマンドを使用してプロデューサー アプリケーションを実行できます。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. プロデューサー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイックスタートでは、オプション 1 - produce SpecificRecords を選択します。

    Enter case number:
    1 - produce Avro SpecificRecords
    2 - produce Avro GenericRecords
    
  4. データのシリアル化と発行が正常に行われると、プロデューサー アプリケーションに次のコンソール ログが表示されます。

    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

Avro スキーマ検証で Kafka コンシューマーを使用する

Kafka コンシューマー アプリケーションを実行するには、azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer に移動します。

  1. コンシューマー アプリケーションを実行すると、Avro 固有のレコードまたは汎用レコードをそのアプリケーションで使用できます。 特定のレコードのモードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。

    mvn generate-sources
    
  2. その後に、次のコマンドを使用してコンシューマー アプリケーションを実行できます。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. コンシューマー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイックスタートでは、オプション 1 - consume Avro SpecificRecords を選択します。

    Enter case number:
    1 - consume Avro SpecificRecords
    2 - consume Avro GenericRecords
    
  4. データの使用と逆シリアル化が正常に行われると、プロデューサー アプリケーションに次のコンソール ログが表示されます。

    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

リソースをクリーンアップする

Event Hubs 名前空間を削除するか、名前空間を含むリソース グループを削除します。