Freigeben über


Verwenden des JSON-Schemas mit Apache Kafka-Anwendungen

Dieses Tutorial führt Sie durch ein Szenario, in dem Sie mithilfe von JSON-Schemas und der Azure-Schemaregistrierung in Event Hubs Ereignisse serialisieren und deserialisieren.

In diesem Anwendungsfall verwendet eine Kafka-Produceranwendung das in der Azure-Schemaregistrierung gespeicherte JSON-Schema, um das Ereignis zu serialisieren und in einem Kafka-Thema/EventHub in Azure Event Hubs zu veröffentlichen. Der Kafka-Consumer deserialisiert die Ereignisse, die er aus Event Hubs nutzt. Dazu werden die Schema-ID des Ereignisses und das JSON-Schema verwendet, das in der Azure-Schemaregistrierung gespeichert ist. Diagramm der Schemaserialisierung/-deserialisierung für Kafka-Anwendungen mit einem JSON-Schema.

Voraussetzungen

Wenn Sie mit Azure Event Hubs noch nicht vertraut sind, lesen Sie vor dem Durcharbeiten dieser Schnellstartanleitung die Informationen unter Übersicht über Event Hubs.

Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:

Erstellen eines Ereignis-Hubs

Befolgen Sie die Anweisungen aus der Schnellstartanleitung Erstellen eines Event Hubs-Namespaces und eines Event Hubs, um einen Event Hubs-Namespace und einen Event Hub zu erstellen. Befolgen Sie dann die Anweisungen unter Abrufen der Verbindungszeichenfolge, um eine Verbindungszeichenfolge für Ihren Event Hubs-Namespace abzurufen.

Notieren Sie sich die folgenden Einstellungen, die Sie in der aktuellen Schnellstartanleitung verwenden werden:

  • Verbindungszeichenfolge für den Event Hubs-Namespace
  • Name des Event Hubs

Erstellen eines Schemas

Befolgen Sie die Anweisungen unter Erstellen von Schemas mithilfe der Schemaregistrierung , um eine Schemagruppe und ein Schema zu erstellen.

  1. Erstellen Sie im Schemaregistrierungsportal eine Schemagruppe namens contoso-sg. Verwenden Sie JSON-Schema als Serialisierungstyp.

  2. Erstellen Sie in dieser Schemagruppe ein neues JSON-Schema mit dem Schemanamen Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice. Verwenden Sie den folgenden Schemainhalt.

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    } 
    

Registrieren einer Anwendung für den Zugriff auf die Schemaregistrierung

Sie können Microsoft Entra ID verwenden, um Ihre Kafka-Produzent- und Verbraucheranwendung für den Zugriff auf Azure-Schemaregistrierungsressourcen zu autorisieren. Um dies zu aktivieren, müssen Sie Ihre Clientanwendung über das Azure-Portal bei einem Microsoft Entra-Mandanten registrieren.

Informationen zum Registrieren einer Microsoft Entra-Anwendung mit dem Namen example-app finden Sie unter Registrieren Ihrer Anwendung bei einem Microsoft Entra-Mandanten.

  • tenant.id – legt die Mandanten-ID der Anwendung fest
  • client.id – legt die Client-ID der Anwendung fest
  • client.secret – legt den geheimen Clientschlüssel für die Authentifizierung fest

Und wenn Sie verwaltete Identitäten verwenden, würden Sie Folgendes benötigen:

  • use.managed.identity.credential: Gibt an, dass MSI-Anmeldeinformationen verwendet werden sollen. Dies sollte für MSI-fähige VMs verwendet werden.
  • managed.identity.clientId: Wenn dies angegeben ist, werden MSI-Anmeldeinformationen mit der angegebenen Client-ID erstellt. managed.identity.resourceId: Wenn dies angegeben ist, werden MSI-Anmeldeinformationen mit der angegebenen Ressourcen-ID erstellt.

Hinzufügen eines Benutzers zur Rolle „Schemaregistrierungsleser“

Fügen Sie Ihr Benutzerkonto auf Namespaceebene der Rolle Schemaregistrierungsleser hinzu. Sie können auch die Rolle Mitwirkender der Schemaregistrierung verwenden, aber dies ist für diese Schnellstartanleitung nicht erforderlich.

  1. Wählen Sie auf der Seite Event Hubs-Namespace im linken Menü Zugriffssteuerung (IAM) aus.
  2. Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Option + Hinzufügen ->Rollenzuweisung hinzufügen aus.
  3. Wählen Sie auf der Seite Zuweisungstyp die Option Weiter aus.
  4. Wählen Sie auf der Seite Rollen die Option Schemaregistrierungsleseraus und dann unten auf der Seite Weiter aus.
  5. Verwenden Sie den Link + Mitglieder auswählen, um die Anwendung example-app, die Sie im vorherigen Schritt erstellt haben, der Rolle hinzuzufügen. Wählen Sie dann Weiter aus.
  6. Wählen Sie auf der Seite Überprüfen + zuweisen die Option Überprüfen + zuweisen aus.

Aktualisieren der Konfiguration der Clientanwendung von Kafka-Anwendungen

Sie müssen die Clientkonfiguration der Kafka-Produzenten- und Verbraucheranwendungen mit den Microsoft Entra-Anwendungsdetails und den Schemaregistrierungsinformationen aktualisieren.

Navigieren Sie zum Aktualisieren der Kafka Producerkonfiguration zu azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Aktualisieren Sie die Konfiguration der Kafka-Anwendung in src/main/resources/app.properties, indem Sie der Kafka-Schnellstartanleitung für Event Hubs folgen.

  2. Aktualisieren Sie die Konfigurationsdetails für den Producer in src/main/resources/app.properties mithilfe der Konfiguration für die Schemaregistrierung und der Microsoft Entra-Anwendung, die Sie im vorherigen Schritt erstellt haben, wie folgt:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. Befolgen Sie dieselben Anweisungen, und aktualisieren Sie auch die Konfiguration azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.

  4. Sowohl für Kafka-Producer- als auch für -Consumeranwendungen wird das folgende JSON-Schema verwendet:

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    }
    

Verwenden des Kafka-Producers mit der JSON-Schemavalidierung

Navigieren Sie zum Ausführen der Kafka-Produceranwendung zu azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Sie können die Produceranwendung so ausführen, dass sie JSON-Schema-spezifische Datensätze oder generische Datensätze erstellt. Für einen bestimmten Datensatzmodus müssen Sie zuerst die Klassen für das Producer-Schema mit dem folgenden Maven-Befehl generieren:

    mvn generate-sources
    
  2. Anschließend können Sie die Produceranwendung mit den folgenden Befehlen ausführen.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. Nach erfolgreicher Ausführung der Produceranwendung werden Sie aufgefordert, das Producerszenario auszuwählen. In dieser Schnellstartanleitung können Sie die Option 1 - produce SpecificRecords auswählen.

    Enter case number:
    1 - produce SpecificRecords
    
  4. Nach erfolgreicher Datenserialisierung und Veröffentlichung sollten die folgenden Konsolenprotokolle in Ihrer Produceranwendung angezeigt werden:

    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
    

Verwenden des Kafka-Consumers mit der JSON-Schemavalidierung

Navigieren Sie zum Ausführen der Kafka-Consumeranwendung zu azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.

  1. Sie können die Consumeranwendung so ausführen, dass sie JSON-Schema-spezifische Datensätze oder generische Datensätze nutzt. Für einen bestimmten Datensatzmodus müssen Sie zuerst die Klassen für das Producer-Schema mit dem folgenden Maven-Befehl generieren:

    mvn generate-sources
    
  2. Anschließend können Sie die Consumeranwendung mit dem folgenden Befehl ausführen.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. Nach erfolgreicher Ausführung der Consumeranwendung werden Sie aufgefordert, das Producerszenario auszuwählen. In diesem Schnellstart können Sie die Option 1 –konsumieren von spezifischen Datensätzen auswählen.

    Enter case number:
    1 - consume SpecificRecords
    
  4. Nach erfolgreicher Nutzung und Deserialisierung der Daten sollten die folgenden Konsolenprotokolle in Ihrer Produceranwendung angezeigt werden:

    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
    
    

Bereinigen von Ressourcen

Löschen Sie den Event Hubs-Namespace, oder löschen Sie die Ressourcengruppe, die den Namespace enthält.