搭配 Apache Kafka 應用程式使用 JSON 架構
本教學課程會逐步引導您使用 JSON 架構,在事件中樞中使用 Azure 架構登錄來串行化和還原串行化事件。
在此使用案例中,Kafka 產生者應用程式會使用儲存在 Azure 結構描述登錄中的 JSON 結構描述,將事件序列化,並將其發佈至 Azure 事件中樞中的 Kafka 主題/事件中樞。 Kafka 取用者會還原序列化它從事件中樞取用的事件。 為此,它會使用事件的結構描述識別碼和儲存在 Azure 結構描述登錄中的 JSON 架構。
必要條件
如果您對 Azure 事件中樞並不熟悉,在進行此快速入門之前,請先參閱事件中樞概述。
若要完成本快速入門,您必須符合下列必要條件:
- 如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
- 您必須在開發環境中安裝下列元件:
- Java Development Kit (JDK) 1.7+。
- 下載及安裝 Maven 二進位封存檔。
- Git
- 複製 適用於 Kafka 的 Azure 結構描述登錄 存放庫。
建立事件中樞
遵循快速入門: [建立事件中樞命名空間和事件中樞] 中的指示,建立事件中樞命名空間和事件中樞。 然後,遵循 [取得連接字串] 中的指示,以取得事件中樞命名空間的連接字串。
記下您將在目前快速入門中使用的下列設定:
- 事件中樞命名空間的連接字串
- 事件中樞的名稱。
建立結構描述
遵循 [使用結構描述登錄建立結構描述] 的指示,以建立結構描述群組和結構描述。
使用結構描述登錄入口網站來建立名為 contoso-sg 的結構描述群組。 使用 JSON 架構作為序列化類型。
在該結構描述群組中,使用下列結構描述內容建立新的 JSON 結構描述,並具有結構描述名稱:
Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice
。{ "$id": "https://example.com/person.schema.json", "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "CustomerInvoice", "type": "object", "properties": { "invoiceId": { "type": "string" }, "merchantId": { "type": "string" }, "transactionValueUsd": { "type": "integer" }, "userId": { "type": "string" } } }
註冊應用程式以存取結構描述登錄
您可以使用 Microsoft Entra 識別碼來授權 Kafka 產生者和取用者應用程式存取 Azure 架構登錄資源。 若要啟用,您必須從 Azure 入口網站向 Microsoft Entra 租用戶註冊用戶端應用程式。
若要註冊名為 example-app
的 Microsoft Entra 應用程式,請參閱 向 Microsoft Entra 租用戶註冊您的應用程式。
- tenant.id – 設定應用程式的租用戶識別碼
- client.id – 設定應用程式的用戶端識別碼
- client.secret – 設定用於驗證的客戶端密碼
如果您使用受控識別,則需要:
- use.managed.identity.credential – 指出應該使用 MSI 認證,應該用於已啟用 MSI 的 VM
- managed.identity.clientId – 如果指定,它會使用指定的用戶端識別碼 managed.identity.resourceId 組建 MSI 認證 – 如果指定,則會使用指定的資源識別碼組建 MSI 認證
將使用者新增至結構描述登錄讀者角色
將用戶帳戶新增至命名空間層級的結構描述登錄讀者角色。 您也可以使用結構描述登錄參與者角色,但這並非本快速入門的必要專案。
- 在 [事件中樞命名空間] 頁面上,選取左側功能表上的 [存取控制 (IAM)]。
- 在 [存取控制 (IAM)] 頁面上,在功能表上選取 [+ 新增] ->[新增角色指派]。
- 在 [指派類型] 頁面上,選取 [下一步]。
- 在 [角色] 頁面上,選取 [架構登錄讀取器],然後選取頁面底部的 [下一步]。
- 使用 [+ 選取成員] 連結,將您在上一個步驟中建立的
example-app
應用程式新增至角色,然後選取 [下一步]。 - 在 [檢閱 + 指派] 頁面,選取 [檢閱 + 指派]。
更新 Kafka 應用程式的用戶端應用程式設定
您必須使用 Microsoft Entra 應用程式詳細資料和架構登錄資訊來更新 Kafka 產生者和取用者應用程式的用戶端設定。
若要更新 Kafka 產生者設定,請瀏覽至 azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer。
遵循事件中樞的 Kafka 快速入門指南,更新 src/main/resources/app.properties 中的 Kafka 應用程式設定。
使用您在上一個步驟中建立的架構登錄相關設定和 Microsoft Entra 應用程式,src/main/resources/app.properties 更新產生者的設定詳細資料,如下所示:
schema.group=contoso-sg schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net tenant.id=<> client.id=<> client.secret=<>
請遵循相同的指示,並更新 azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer 設定。
針對 Kafka 產生者和取用者應用程式,會使用下列 JSON 結構描述:
{ "$id": "https://example.com/person.schema.json", "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "CustomerInvoice", "type": "object", "properties": { "invoiceId": { "type": "string" }, "merchantId": { "type": "string" }, "transactionValueUsd": { "type": "integer" }, "userId": { "type": "string" } } }
搭配 JSON 結構描述驗證使用 Kafka 產生者
若要執行 Kafka 產生者設定,請瀏覽至 azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer。
您可以執行產生者應用程式,以便產生 JSON 結構描述特定記錄或一般記錄。 針對特定記錄模式,您必須先使用下列 maven 命令,針對產生者結構描述產生類別:
mvn generate-sources
然後,您可以使用下列命令執行產生者應用程式。
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
成功執行產生者應用程式後,它會提示您選擇產生者案例。 在本快速入門中,您可以選擇選項 1 – 產生 SpecificRecords。
Enter case number: 1 - produce SpecificRecords
成功進行資料序列化和發佈之後,您應該會在產生者應用程式中看到下列主控台記錄:
INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0 INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1 INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
搭配 JSON 結構描述驗證使用 Kafka 取用者
若要執行 Kafka 取用者設定,請瀏覽至 azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer。
您可以執行取用者應用程式,以便產生 JSON 結構描述特定記錄或一般記錄。 針對特定記錄模式,您必須先使用下列 maven 命令,針對產生者結構描述產生類別:
mvn generate-sources
接著,您可以使用下列命令執行取用者應用程式。
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
成功執行取用者應用程式後,它會提示您選擇產生者案例。 在本快速入門中,您可以選擇選項 1 – 使用 SpecificRecords。
Enter case number: 1 - consume SpecificRecords
成功進行資料取用和還原序列化之後,您應該會在產生者應用程式中看到下列主控台記錄:
INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0} INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1} INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
清除資源
刪除事件中樞命名空間,或刪除包含命名空間的資源群組。