このクイック スタート ガイドでは、Azure Schema Registry for Event Hubs を使用して Apache Kafka アプリケーションからのイベントを検証する方法について説明します。
このユース ケースでは、Kafka プロデューサー アプリケーションは、Azure Schema Registry に格納されている Avro スキーマを使用して、イベントをシリアル化し、Azure Event Hubs の Kafka トピック/イベント ハブに発行します。 Kafka コンシューマーは、Event Hubs から使用するイベントを逆シリアル化します。 そのため、イベントのスキーマ ID と、Azure Schema Registry に格納されている Avro スキーマを使用します。
[前提条件]
Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に 、Event Hubs の概要 を参照してください。
このクイックスタートを完了するには、次の前提条件が必要です。
- Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。
- 開発環境で、次のコンポーネントをインストールします。
- Java Development Kit (JDK) 1.7 以降。
- Maven バイナリ アーカイブをダウンロードしてインストールします。
- Git
- Kafka 用 Azure Schema Registry リポジトリを複製します。
イベント ハブの作成
「クイックスタート: Event Hubs 名前空間とイベント ハブを作成して Event Hubs 名前空間とイベント ハブ を作成する」の手順に従います。 次に、「 接続文字列を取得する」の 手順に従って、Event Hubs 名前空間への接続文字列を取得します。
現在のクイック スタートで使用する次の設定に注意してください。
- Event Hubs 名前空間の接続文字列
- イベント ハブの名前
スキーマの作成
スキーマ レジストリを使用したスキーマの作成に関する記事の手順に従って、スキーマ グループとスキーマを作成します。
スキーマ レジストリ ポータルを使用して 、contoso-sg という名前のスキーマ グループを作成します。 シリアル化の種類として Avro を使用し、互換モードには None を使用します。
そのスキーマ グループで、スキーマ名を持つ新しい Avro スキーマを作成します。
Microsoft.Azure.Data.SchemaRegistry.example.Order次のスキーマ コンテンツを使用します。{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
スキーマ レジストリにアクセスするためのアプリケーションの登録
Microsoft Entra ID を使用して、クライアント アプリケーションを Azure portal から Microsoft Entra テナントに登録することで、Kafka プロデューサーおよびコンシューマー アプリケーションが Azure Schema Registry リソースにアクセスすることを承認できます。
example-appという名前の Microsoft Entra アプリケーションを登録するには、「アプリケーションを Microsoft Entra テナントに登録する」を参照してください。
- tenant.id - アプリケーションのテナント ID を設定します
- client.id - アプリケーションのクライアント ID を設定します
- client.secret - 認証用のクライアント シークレットを設定します
マネージド ID を使用している場合は、次のものが必要です。
- use.managed.identity.credential - MSI 資格情報を使用する必要があることを示し、MSI 対応 VM に使用する必要があります
- managed.identity.clientId - 指定されている場合は、指定されたクライアント ID を使用して MSI 資格情報を構築します
- managed.identity.resourceId - 指定した場合は、指定されたリソース ID を使用して MSI 資格情報を構築します
スキーマ レジストリ閲覧者ロールにユーザーを追加する
名前空間レベルで スキーマ レジストリ閲覧者 ロールにユーザー アカウントを追加します。 スキーマ レジストリ共同作成者ロールを使用することもできますが、このクイック スタートでは必要ありません。
- [Event Hubs 名前空間] ページで、左側のメニューの [アクセス制御 (IAM)] を選択します。
- [アクセス制御 (IAM)] ページで、メニューの [+ 追加] -> [ロールの割り当ての追加] を選択します。
- [ 割り当ての種類 ] ページで、[ 次へ] を選択します。
- [ ロール ] ページで、 スキーマ レジストリ 閲覧者 (プレビュー) を選択し、ページの下部にある [次へ ] を選択します。
-
[+ メンバーの選択] リンクを使用して、前の手順で作成した
example-appアプリケーションをロールに追加し、[次へ] を選択します。 - [ 確認と割り当て ] ページで、[ 確認と割り当て] を選択します。
Kafka アプリケーションのクライアント アプリケーション構成を更新する
Kafka プロデューサーアプリケーションとコンシューマー アプリケーションのクライアント構成を、作成した Microsoft Entra アプリケーションに関連する構成とスキーマ レジストリ情報で更新する必要があります。
Kafka プロデューサー構成を更新するには、 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer に移動します。
Event Hubs の Kafka クイック スタート ガイドに従って、src/main/resources/app.properties で Kafka アプリケーションの構成を更新します。
スキーマ レジストリ関連の構成と、上記で作成した Microsoft Entra アプリケーションを使用して 、src/main/resources/app.properties のプロデューサーの構成の詳細を次のように更新します。
schema.group=contoso-sg schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net tenant.id=<> client.id=<> client.secret=<>同じ手順に従い、 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer の構成も更新します。
Kafka プロデューサー アプリケーションとコンシューマー アプリケーションの両方で、次の Avro スキーマが使用されます。
{ "namespace": "com.azure.schemaregistry.samples", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
Avro スキーマ検証での Kafka プロデューサーの使用
Kafka プロデューサー アプリケーションを実行するには、 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer に移動します。
プロデューサー アプリケーションを実行して、Avro 固有のレコードまたは汎用レコードを生成できます。 特定のレコード モードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。
mvn generate-sources次のコマンドを使用して、プロデューサー アプリケーションを実行できます。
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"プロデューサー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイック スタートでは、オプション 1 - Avro SpecificRecords の生成を選択できます。
Enter case number: 1 - produce Avro SpecificRecords 2 - produce Avro GenericRecordsデータのシリアル化と発行が成功すると、プロデューサー アプリケーションに次のコンソール ログが表示されます。
INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"} INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"} INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
Avro スキーマ検証での Kafka コンシューマーの使用
Kafka コンシューマー アプリケーションを実行するには、 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer に移動します。
コンシューマー アプリケーションを実行して、Avro 固有のレコードまたは汎用レコードを使用できるようにします。 特定のレコード モードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。
mvn generate-sourcesその後、次のコマンドを使用してコンシューマー アプリケーションを実行できます。
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"コンシューマー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイック スタートでは、オプション 1 - Avro SpecificRecords を使用するを選択できます。
Enter case number: 1 - consume Avro SpecificRecords 2 - consume Avro GenericRecordsデータの使用と逆シリアル化が成功すると、プロデューサー アプリケーションに次のコンソール ログが表示されます。
INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"} INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"} INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
リソースをクリーンアップする
Event Hubs 名前空間を削除するか、名前空間を含むリソース グループを削除します。