次の方法で共有


Avro (Java) を使用して Apache Kafka アプリケーションのスキーマを検証する

このクイック スタート ガイドでは、Azure Schema Registry for Event Hubs を使用して Apache Kafka アプリケーションからのイベントを検証する方法について説明します。

このユース ケースでは、Kafka プロデューサー アプリケーションは、Azure Schema Registry に格納されている Avro スキーマを使用して、イベントをシリアル化し、Azure Event Hubs の Kafka トピック/イベント ハブに発行します。 Kafka コンシューマーは、Event Hubs から使用するイベントを逆シリアル化します。 そのため、イベントのスキーマ ID と、Azure Schema Registry に格納されている Avro スキーマを使用します。

Avro スキーマを使用した Kafka アプリケーションのスキーマのシリアル化/シリアル化解除を示す図。

[前提条件]

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に 、Event Hubs の概要 を参照してください。

このクイックスタートを完了するには、次の前提条件が必要です。

イベント ハブの作成

「クイックスタート: Event Hubs 名前空間とイベント ハブを作成して Event Hubs 名前空間とイベント ハブ を作成する」の手順に従います。 次に、「 接続文字列を取得する」の 手順に従って、Event Hubs 名前空間への接続文字列を取得します。

現在のクイック スタートで使用する次の設定に注意してください。

  • Event Hubs 名前空間の接続文字列
  • イベント ハブの名前

スキーマの作成

スキーマ レジストリを使用したスキーマの作成に関する記事の手順に従って、スキーマ グループとスキーマを作成します。

  1. スキーマ レジストリ ポータルを使用して 、contoso-sg という名前のスキーマ グループを作成します。 シリアル化の種類として Avro を使用し、互換モードには None を使用します。

  2. そのスキーマ グループで、スキーマ名を持つ新しい Avro スキーマを作成します。 Microsoft.Azure.Data.SchemaRegistry.example.Order 次のスキーマ コンテンツを使用します。

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

スキーマ レジストリにアクセスするためのアプリケーションの登録

Microsoft Entra ID を使用して、クライアント アプリケーションを Azure portal から Microsoft Entra テナントに登録することで、Kafka プロデューサーおよびコンシューマー アプリケーションが Azure Schema Registry リソースにアクセスすることを承認できます。

example-appという名前の Microsoft Entra アプリケーションを登録するには、「アプリケーションを Microsoft Entra テナントに登録する」を参照してください。

  • tenant.id - アプリケーションのテナント ID を設定します
  • client.id - アプリケーションのクライアント ID を設定します
  • client.secret - 認証用のクライアント シークレットを設定します

マネージド ID を使用している場合は、次のものが必要です。

  • use.managed.identity.credential - MSI 資格情報を使用する必要があることを示し、MSI 対応 VM に使用する必要があります
  • managed.identity.clientId - 指定されている場合は、指定されたクライアント ID を使用して MSI 資格情報を構築します
  • managed.identity.resourceId - 指定した場合は、指定されたリソース ID を使用して MSI 資格情報を構築します

スキーマ レジストリ閲覧者ロールにユーザーを追加する

名前空間レベルで スキーマ レジストリ閲覧者 ロールにユーザー アカウントを追加します。 スキーマ レジストリ共同作成者ロールを使用することもできますが、このクイック スタートでは必要ありません。

  1. [Event Hubs 名前空間] ページで、左側のメニューの [アクセス制御 (IAM)] を選択します。
  2. [アクセス制御 (IAM)] ページで、メニューの [+ 追加] -> [ロールの割り当ての追加] を選択します。
  3. [ 割り当ての種類 ] ページで、[ 次へ] を選択します。
  4. [ ロール ] ページで、 スキーマ レジストリ 閲覧者 (プレビュー) を選択し、ページの下部にある [次へ ] を選択します。
  5. [+ メンバーの選択] リンクを使用して、前の手順で作成したexample-app アプリケーションをロールに追加し、[次へ] を選択します。
  6. [ 確認と割り当て ] ページで、[ 確認と割り当て] を選択します。

Kafka アプリケーションのクライアント アプリケーション構成を更新する

Kafka プロデューサーアプリケーションとコンシューマー アプリケーションのクライアント構成を、作成した Microsoft Entra アプリケーションに関連する構成とスキーマ レジストリ情報で更新する必要があります。

Kafka プロデューサー構成を更新するには、 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer に移動します。

  1. Event Hubs の Kafka クイック スタート ガイドに従って、src/main/resources/app.properties で Kafka アプリケーションの構成を更新します。

  2. スキーマ レジストリ関連の構成と、上記で作成した Microsoft Entra アプリケーションを使用して 、src/main/resources/app.properties のプロデューサーの構成の詳細を次のように更新します。

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. 同じ手順に従い、 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer の構成も更新します。

  4. Kafka プロデューサー アプリケーションとコンシューマー アプリケーションの両方で、次の Avro スキーマが使用されます。

    {
      "namespace": "com.azure.schemaregistry.samples",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    }
    

Avro スキーマ検証での Kafka プロデューサーの使用

Kafka プロデューサー アプリケーションを実行するには、 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer に移動します。

  1. プロデューサー アプリケーションを実行して、Avro 固有のレコードまたは汎用レコードを生成できます。 特定のレコード モードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。

    mvn generate-sources
    
  2. 次のコマンドを使用して、プロデューサー アプリケーションを実行できます。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. プロデューサー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイック スタートでは、オプション 1 - Avro SpecificRecords の生成を選択できます。

    Enter case number:
    1 - produce Avro SpecificRecords
    2 - produce Avro GenericRecords
    
  4. データのシリアル化と発行が成功すると、プロデューサー アプリケーションに次のコンソール ログが表示されます。

    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

Avro スキーマ検証での Kafka コンシューマーの使用

Kafka コンシューマー アプリケーションを実行するには、 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer に移動します。

  1. コンシューマー アプリケーションを実行して、Avro 固有のレコードまたは汎用レコードを使用できるようにします。 特定のレコード モードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。

    mvn generate-sources
    
  2. その後、次のコマンドを使用してコンシューマー アプリケーションを実行できます。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. コンシューマー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイック スタートでは、オプション 1 - Avro SpecificRecords を使用するを選択できます。

    Enter case number:
    1 - consume Avro SpecificRecords
    2 - consume Avro GenericRecords
    
  4. データの使用と逆シリアル化が成功すると、プロデューサー アプリケーションに次のコンソール ログが表示されます。

    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

リソースをクリーンアップする

Event Hubs 名前空間を削除するか、名前空間を含むリソース グループを削除します。