Udostępnij za pośrednictwem


Używanie schematu JSON z aplikacjami platformy Apache Kafka

W tym samouczku przedstawiono scenariusz, w którym używasz schematów JSON do serializacji i deserializacji zdarzeń przy użyciu rejestru schematów platformy Azure w usłudze Event Hubs.

W tym przypadku użycia aplikacja producenta platformy Kafka używa schematu JSON przechowywanego w rejestrze schematów platformy Azure do serializacji zdarzenia i publikowania ich w centrum tematu/zdarzeń platformy Kafka w usłudze Azure Event Hubs. Konsument platformy Kafka deserializuje zdarzenia używane przez usługę Event Hubs. W tym celu używa identyfikatora schematu zdarzenia i schematu JSON, który jest przechowywany w rejestrze schematów platformy Azure. Diagram przedstawiający serializacji/deserializację schematu dla aplikacji platformy Kafka przy użyciu schematu JSON.

Wymagania wstępne

Jeśli dopiero zaczynasz korzystać z usługi Azure Event Hubs, zobacz Omówienie usługi Event Hubs przed wykonaniem tego przewodnika Szybki start.

Do wykonania kroków tego przewodnika Szybki start niezbędne jest spełnienie następujących wymagań wstępnych:

Tworzenie centrum zdarzeń

Postępuj zgodnie z instrukcjami z przewodnika Szybki start: tworzenie przestrzeni nazw usługi Event Hubs i centrum zdarzeń w celu utworzenia przestrzeni nazw usługi Event Hubs i centrum zdarzeń. Następnie postępuj zgodnie z instrukcjami z sekcji Pobierz parametry połączenia, aby uzyskać parametry połączenia do przestrzeni nazw usługi Event Hubs.

Zanotuj następujące ustawienia, które są używane w bieżącym przewodniku Szybki start:

  • ciąg Połączenie ion dla przestrzeni nazw usługi Event Hubs
  • Nazwa centrum zdarzeń

Tworzenie schematu

Postępuj zgodnie z instrukcjami z sekcji Tworzenie schematów przy użyciu rejestru schematów, aby utworzyć grupę schematów i schemat.

  1. Utwórz grupę schematów o nazwie contoso-sg przy użyciu portalu rejestru schematów. Użyj schematu JSON jako typu serializacji.

  2. W tej grupie schematów utwórz nowy schemat JSON o nazwie schematu: Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice przy użyciu następującej zawartości schematu.

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    } 
    

Rejestrowanie aplikacji w celu uzyskania dostępu do rejestru schematów

Możesz użyć identyfikatora Entra firmy Microsoft, aby autoryzować aplikację producenta i konsumenta platformy Kafka w celu uzyskania dostępu do zasobów usługi Azure Schema Registry. Aby ją włączyć, musisz zarejestrować aplikację kliencką w dzierżawie firmy Microsoft Entra w witrynie Azure Portal.

Aby zarejestrować aplikację Firmy Microsoft Entra o nazwie example-app , zobacz Rejestrowanie aplikacji w dzierżawie firmy Microsoft Entra.

  • tenant.id — ustawia identyfikator dzierżawy aplikacji
  • client.id — ustawia identyfikator klienta aplikacji
  • client.secret — ustawia klucz tajny klienta na potrzeby uwierzytelniania

Jeśli używasz tożsamości zarządzanej, potrzebne są następujące elementy:

  • use.managed.identity.credential — wskazuje, że należy użyć poświadczeń tożsamości usługi zarządzanej, które powinny być używane dla maszyny wirtualnej z obsługą tożsamości usługi zarządzanej
  • managed.identity.clientId — jeśli zostanie określony, kompiluje poświadczenia tożsamości usługi zarządzanej przy użyciu podanego identyfikatora klienta managed.identity.resourceId — jeśli określono, kompiluje poświadczenia tożsamości usługi zarządzanej z danym identyfikatorem zasobu

Dodawanie użytkownika do roli Czytelnik rejestru schematów

Dodaj konto użytkownika do roli Czytelnik rejestru schematów na poziomie przestrzeni nazw. Możesz również użyć roli Współautor rejestru schematów, ale nie jest to konieczne w tym przewodniku Szybki start.

  1. Na stronie Przestrzeń nazw usługi Event Hubs wybierz pozycję Kontrola dostępu (Zarządzanie dostępem i tożsamościami) w menu po lewej stronie.
  2. Na stronie Kontrola dostępu (Zarządzanie dostępem i tożsamościami) wybierz pozycję + Dodaj ->Dodaj przypisanie roli w menu.
  3. Na stronie Typ przypisania wybierz pozycję Dalej.
  4. Na stronie Role wybierz pozycję Czytelnik rejestru schematów, a następnie wybierz pozycję Dalej w dolnej części strony.
  5. Użyj linku + Wybierz członków , aby dodać aplikację example-app utworzoną w poprzednim kroku do roli, a następnie wybierz przycisk Dalej.
  6. Na stronie Przeglądanie i przypisywanie wybierz pozycję Przejrzyj i przypisz.

Aktualizowanie konfiguracji aplikacji klienckiej aplikacji platformy Kafka

Należy zaktualizować konfigurację klienta aplikacji producentów i konsumentów platformy Kafka przy użyciu szczegółów aplikacji Microsoft Entra oraz informacji rejestru schematów.

Aby zaktualizować konfigurację producenta platformy Kafka, przejdź do strony azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Zaktualizuj konfigurację aplikacji platformy Kafka w pliku src/main/resources/app.properties , postępując zgodnie z przewodnikiem Szybki start platformy Kafka dla usługi Event Hubs.

  2. Zaktualizuj szczegóły konfiguracji producenta w pliku src/main/resources/app.properties przy użyciu konfiguracji powiązanej z rejestrem schematów i aplikacji Microsoft Entra utworzonej w poprzednim kroku w następujący sposób:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. Postępuj zgodnie z tymi samymi instrukcjami i zaktualizuj również konfigurację azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer .

  4. W przypadku aplikacji producentów i konsumentów platformy Kafka używane są następujące schematy JSON:

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    }
    

Używanie producenta platformy Kafka z weryfikacją schematu JSON

Aby uruchomić aplikację producenta platformy Kafka, przejdź do strony azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Aplikację producenta można uruchomić, aby mogła wygenerować rekordy specyficzne dla schematu JSON lub rekordy ogólne. W przypadku określonego trybu rekordów należy najpierw wygenerować klasy względem schematu producenta przy użyciu następującego polecenia maven:

    mvn generate-sources
    
  2. Następnie możesz uruchomić aplikację producenta przy użyciu następujących poleceń.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. Po pomyślnym wykonaniu aplikacji producenta zostanie wyświetlony monit o wybranie scenariusza producenta. W tym przewodniku Szybki start możesz wybrać opcję 1 — generuj określonerekordy.

    Enter case number:
    1 - produce SpecificRecords
    
  4. Po pomyślnym serializacji i opublikowaniu danych w aplikacji producenta powinny zostać wyświetlone następujące dzienniki konsoli:

    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
    

Używanie konsumenta platformy Kafka z weryfikacją schematu JSON

Aby uruchomić aplikację konsumenta platformy Kafka, przejdź do adresu azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.

  1. Aplikację konsumenta można uruchomić, aby mogła korzystać z rekordów specyficznych dla schematu JSON lub rekordów ogólnych. W przypadku określonego trybu rekordów należy najpierw wygenerować klasy względem schematu producenta przy użyciu następującego polecenia maven:

    mvn generate-sources
    
  2. Następnie możesz uruchomić aplikację konsumenta przy użyciu następującego polecenia.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. Po pomyślnym wykonaniu aplikacji konsumenckiej zostanie wyświetlony monit o wybranie scenariusza producenta. W tym przewodniku Szybki start możesz wybrać opcję 1 — korzystanie z określonychrekordów.

    Enter case number:
    1 - consume SpecificRecords
    
  4. Po pomyślnym użyciu danych i deserializacji powinny zostać wyświetlone następujące dzienniki konsoli w aplikacji producenta:

    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
    
    

Czyszczenie zasobów

Usuń przestrzeń nazw usługi Event Hubs lub usuń grupę zasobów zawierającą przestrzeń nazw.