Verwenden des JSON-Schemas mit Apache Kafka-Anwendungen
Dieses Tutorial führt Sie durch ein Szenario, in dem Sie mithilfe von JSON-Schemas und der Azure-Schemaregistrierung in Event Hubs Ereignisse serialisieren und deserialisieren.
In diesem Anwendungsfall verwendet eine Kafka-Produceranwendung das in der Azure-Schemaregistrierung gespeicherte JSON-Schema, um das Ereignis zu serialisieren und in einem Kafka-Thema/EventHub in Azure Event Hubs zu veröffentlichen. Der Kafka-Consumer deserialisiert die Ereignisse, die er aus Event Hubs nutzt. Dazu werden die Schema-ID des Ereignisses und das JSON-Schema verwendet, das in der Azure-Schemaregistrierung gespeichert ist.
Voraussetzungen
Wenn Sie mit Azure Event Hubs noch nicht vertraut sind, lesen Sie vor dem Durcharbeiten dieser Schnellstartanleitung die Informationen unter Übersicht über Event Hubs.
Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:
- Wenn Sie kein Azure-Abonnement besitzen, erstellen Sie ein kostenloses Konto, bevor Sie beginnen.
- Installieren Sie in Ihrer Entwicklungsumgebung die folgenden Komponenten:
- Java Development Kit (JDK) 1.7 oder höher
- Ein binäres Maven-Archiv (Download/Installationsanleitung)
- Git-Client
- Klonen Sie das Repository für die Azure-Schemaregistrierung für Kafka.
Erstellen eines Ereignis-Hubs
Befolgen Sie die Anweisungen aus der Schnellstartanleitung Erstellen eines Event Hubs-Namespaces und eines Event Hubs, um einen Event Hubs-Namespace und einen Event Hub zu erstellen. Befolgen Sie dann die Anweisungen unter Abrufen der Verbindungszeichenfolge, um eine Verbindungszeichenfolge für Ihren Event Hubs-Namespace abzurufen.
Notieren Sie sich die folgenden Einstellungen, die Sie in der aktuellen Schnellstartanleitung verwenden werden:
- Verbindungszeichenfolge für den Event Hubs-Namespace
- Name des Event Hubs
Erstellen eines Schemas
Befolgen Sie die Anweisungen unter Erstellen von Schemas mithilfe der Schemaregistrierung , um eine Schemagruppe und ein Schema zu erstellen.
Erstellen Sie im Schemaregistrierungsportal eine Schemagruppe namens contoso-sg. Verwenden Sie JSON-Schema als Serialisierungstyp.
Erstellen Sie in dieser Schemagruppe ein neues JSON-Schema mit dem Schemanamen
Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice
. Verwenden Sie den folgenden Schemainhalt.{ "$id": "https://example.com/person.schema.json", "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "CustomerInvoice", "type": "object", "properties": { "invoiceId": { "type": "string" }, "merchantId": { "type": "string" }, "transactionValueUsd": { "type": "integer" }, "userId": { "type": "string" } } }
Registrieren einer Anwendung für den Zugriff auf die Schemaregistrierung
Sie können Microsoft Entra ID verwenden, um Ihre Kafka-Produzent- und Verbraucheranwendung für den Zugriff auf Azure-Schemaregistrierungsressourcen zu autorisieren. Um dies zu aktivieren, müssen Sie Ihre Clientanwendung über das Azure-Portal bei einem Microsoft Entra-Mandanten registrieren.
Informationen zum Registrieren einer Microsoft Entra-Anwendung mit dem Namen example-app
finden Sie unter Registrieren Ihrer Anwendung bei einem Microsoft Entra-Mandanten.
- tenant.id – legt die Mandanten-ID der Anwendung fest
- client.id – legt die Client-ID der Anwendung fest
- client.secret – legt den geheimen Clientschlüssel für die Authentifizierung fest
Und wenn Sie verwaltete Identitäten verwenden, würden Sie Folgendes benötigen:
- use.managed.identity.credential: Gibt an, dass MSI-Anmeldeinformationen verwendet werden sollen. Dies sollte für MSI-fähige VMs verwendet werden.
- managed.identity.clientId: Wenn dies angegeben ist, werden MSI-Anmeldeinformationen mit der angegebenen Client-ID erstellt. managed.identity.resourceId: Wenn dies angegeben ist, werden MSI-Anmeldeinformationen mit der angegebenen Ressourcen-ID erstellt.
Hinzufügen eines Benutzers zur Rolle „Schemaregistrierungsleser“
Fügen Sie Ihr Benutzerkonto auf Namespaceebene der Rolle Schemaregistrierungsleser hinzu. Sie können auch die Rolle Mitwirkender der Schemaregistrierung verwenden, aber dies ist für diese Schnellstartanleitung nicht erforderlich.
- Wählen Sie auf der Seite Event Hubs-Namespace im linken Menü Zugriffssteuerung (IAM) aus.
- Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Option + Hinzufügen ->Rollenzuweisung hinzufügen aus.
- Wählen Sie auf der Seite Zuweisungstyp die Option Weiter aus.
- Wählen Sie auf der Seite Rollen die Option Schemaregistrierungsleseraus und dann unten auf der Seite Weiter aus.
- Verwenden Sie den Link + Mitglieder auswählen, um die Anwendung
example-app
, die Sie im vorherigen Schritt erstellt haben, der Rolle hinzuzufügen. Wählen Sie dann Weiter aus. - Wählen Sie auf der Seite Überprüfen + zuweisen die Option Überprüfen + zuweisen aus.
Aktualisieren der Konfiguration der Clientanwendung von Kafka-Anwendungen
Sie müssen die Clientkonfiguration der Kafka-Produzenten- und Verbraucheranwendungen mit den Microsoft Entra-Anwendungsdetails und den Schemaregistrierungsinformationen aktualisieren.
Navigieren Sie zum Aktualisieren der Kafka Producerkonfiguration zu azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.
Aktualisieren Sie die Konfiguration der Kafka-Anwendung in src/main/resources/app.properties, indem Sie der Kafka-Schnellstartanleitung für Event Hubs folgen.
Aktualisieren Sie die Konfigurationsdetails für den Producer in src/main/resources/app.properties mithilfe der Konfiguration für die Schemaregistrierung und der Microsoft Entra-Anwendung, die Sie im vorherigen Schritt erstellt haben, wie folgt:
schema.group=contoso-sg schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net tenant.id=<> client.id=<> client.secret=<>
Befolgen Sie dieselben Anweisungen, und aktualisieren Sie auch die Konfiguration azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.
Sowohl für Kafka-Producer- als auch für -Consumeranwendungen wird das folgende JSON-Schema verwendet:
{ "$id": "https://example.com/person.schema.json", "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "CustomerInvoice", "type": "object", "properties": { "invoiceId": { "type": "string" }, "merchantId": { "type": "string" }, "transactionValueUsd": { "type": "integer" }, "userId": { "type": "string" } } }
Verwenden des Kafka-Producers mit der JSON-Schemavalidierung
Navigieren Sie zum Ausführen der Kafka-Produceranwendung zu azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.
Sie können die Produceranwendung so ausführen, dass sie JSON-Schema-spezifische Datensätze oder generische Datensätze erstellt. Für einen bestimmten Datensatzmodus müssen Sie zuerst die Klassen für das Producer-Schema mit dem folgenden Maven-Befehl generieren:
mvn generate-sources
Anschließend können Sie die Produceranwendung mit den folgenden Befehlen ausführen.
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
Nach erfolgreicher Ausführung der Produceranwendung werden Sie aufgefordert, das Producerszenario auszuwählen. In dieser Schnellstartanleitung können Sie die Option 1 - produce SpecificRecords auswählen.
Enter case number: 1 - produce SpecificRecords
Nach erfolgreicher Datenserialisierung und Veröffentlichung sollten die folgenden Konsolenprotokolle in Ihrer Produceranwendung angezeigt werden:
INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0 INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1 INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
Verwenden des Kafka-Consumers mit der JSON-Schemavalidierung
Navigieren Sie zum Ausführen der Kafka-Consumeranwendung zu azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.
Sie können die Consumeranwendung so ausführen, dass sie JSON-Schema-spezifische Datensätze oder generische Datensätze nutzt. Für einen bestimmten Datensatzmodus müssen Sie zuerst die Klassen für das Producer-Schema mit dem folgenden Maven-Befehl generieren:
mvn generate-sources
Anschließend können Sie die Consumeranwendung mit dem folgenden Befehl ausführen.
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
Nach erfolgreicher Ausführung der Consumeranwendung werden Sie aufgefordert, das Producerszenario auszuwählen. In diesem Schnellstart können Sie die Option 1 –konsumieren von spezifischen Datensätzen auswählen.
Enter case number: 1 - consume SpecificRecords
Nach erfolgreicher Nutzung und Deserialisierung der Daten sollten die folgenden Konsolenprotokolle in Ihrer Produceranwendung angezeigt werden:
INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0} INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1} INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
Bereinigen von Ressourcen
Löschen Sie den Event Hubs-Namespace, oder löschen Sie die Ressourcengruppe, die den Namespace enthält.