Udostępnij za pośrednictwem


Używanie schematu JSON z aplikacjami platformy Apache Kafka

W tym samouczku przedstawiono scenariusz, w którym używasz schematów JSON do serializacji i deserializacji zdarzeń przy użyciu rejestru schematów platformy Azure w usłudze Event Hubs.

W tym przypadku użycia aplikacja producenta platformy Kafka używa schematu JSON przechowywanego w rejestrze schematów platformy Azure do serializacji zdarzenia i publikowania ich w centrum tematu/zdarzeń platformy Kafka w usłudze Azure Event Hubs. Konsument platformy Kafka deserializuje zdarzenia, które konsumuje z usługi Event Hubs. W tym celu używa identyfikatora schematu zdarzenia i schematu JSON, który jest przechowywany w rejestrze schematów platformy Azure. Diagram przedstawiający serializacji/deserializację schematu dla aplikacji platformy Kafka przy użyciu schematu JSON.

Wymagania wstępne

Jeśli dopiero zaczynasz korzystać z usługi Azure Event Hubs, zobacz Omówienie usługi Event Hubs przed rozpoczęciem Szybkiego Startu.

Aby ukończyć ten szybki start, konieczne jest spełnienie następujących wymagań wstępnych:

Tworzenie centrum zdarzeń

Postępuj zgodnie z instrukcjami z przewodnika Szybki Start: Utwórz przestrzeń nazw Event Hubs i centrum zdarzeń, aby utworzyć przestrzeń nazw Event Hubs i centrum zdarzeń. Następnie postępuj zgodnie z instrukcjami z sekcji Pobieranie parametrów połączenia , aby pobrać parametry połączenia do przestrzeni nazw usługi Event Hubs.

Zanotuj następujące ustawienia, których używasz w bieżącym szybkim starcie.

  • Parametry połączenia dla przestrzeni nazw usługi Event Hubs
  • Nazwa centrum zdarzeń

Tworzenie schematu

Postępuj zgodnie z instrukcjami z sekcji Tworzenie schematów przy użyciu rejestru schematów , aby utworzyć grupę schematów i schemat.

  1. Utwórz grupę schematów o nazwie contoso-sg przy użyciu portalu rejestru schematów. Użyj schematu JSON jako typu serializacji.

  2. W tej grupie schematów utwórz nowy schemat JSON o nazwie schematu: Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice przy użyciu następującej zawartości schematu.

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    } 
    

Rejestrowanie aplikacji w celu uzyskania dostępu do rejestru schematów

Możesz użyć identyfikatora Entra firmy Microsoft, aby autoryzować aplikację producenta i konsumenta platformy Kafka w celu uzyskania dostępu do zasobów usługi Azure Schema Registry. Aby ją włączyć, musisz zarejestrować aplikację kliencką w dzierżawie Microsoft Entra w portalu Azure.

Aby zarejestrować aplikację Microsoft Entra o nazwie example-app, zobacz Rejestrowanie aplikacji w dzierżawie Microsoft Entra.

  • tenant.id — ustawia identyfikator najemcy aplikacji
  • client.id — ustawia identyfikator klienta aplikacji
  • client.secret — ustawia klucz tajny klienta na potrzeby uwierzytelniania

Jeśli używasz tożsamości zarządzanej, potrzebne są następujące elementy:

  • use.managed.identity.credential — wskazuje, że należy użyć poświadczeń zarządzanej tożsamości, dla maszyn wirtualnych obsługujących MSI
  • managed.identity.clientId — jeśli jest określony, tworzy poświadczenie tożsamości usługi zarządzanej z podanym identyfikatorem klienta managed.identity.resourceId — jeśli jest określony, tworzy poświadczenie tożsamości usługi zarządzanej z podanym identyfikatorem zasobu

Dodawanie użytkownika do roli Czytelnik rejestru schematów

Dodaj konto użytkownika do roli Czytelnik rejestru schematów na poziomie obszaru nazw. Możesz również użyć roli Współautor rejestru schematów , ale nie jest to konieczne w tym przewodniku Szybki start.

  1. Na stronie Przestrzeń nazw usługi Event Hubs wybierz pozycję Kontrola dostępu (IAM) w menu po lewej stronie.
  2. Na stronie Kontrola dostępu (Zarządzanie dostępem i tożsamościami) wybierz pozycję + Dodaj ->Dodaj przypisanie roli w menu.
  3. Na stronie Typ przypisania wybierz pozycję Dalej.
  4. Na stronie Role wybierz pozycję Czytelnik rejestru schematów, a następnie wybierz pozycję Dalej w dolnej części strony.
  5. Użyj linku + Wybierz członków , aby dodać aplikację example-app utworzoną w poprzednim kroku do roli, a następnie wybierz przycisk Dalej.
  6. Na stronie Przeglądanie i przypisywanie wybierz pozycję Przejrzyj i przypisz.

Aktualizowanie konfiguracji aplikacji klienckiej aplikacji platformy Kafka

Należy zaktualizować konfigurację klienta zarówno aplikacji producenta, jak i konsumenta platformy Kafka, korzystając ze szczegółów aplikacji Microsoft Entra oraz informacji dotyczących rejestru schematów.

Aby zaktualizować konfigurację producenta Kafki, przejdź do azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Zaktualizuj konfigurację aplikacji platformy Kafka w pliku src/main/resources/app.properties , postępując zgodnie z przewodnikiem Szybki start platformy Kafka dla usługi Event Hubs.

  2. Zaktualizuj szczegóły konfiguracji producenta w pliku src/main/resources/app.properties przy użyciu konfiguracji powiązanej z rejestrem schematów i aplikacji Microsoft Entra utworzonej w poprzednim kroku w następujący sposób:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. Postępuj zgodnie z tymi samymi instrukcjami i zaktualizuj również konfigurację azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer .

  4. W przypadku aplikacji producentów i konsumentów platformy Kafka używane są następujące schematy JSON:

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    }
    

Używanie producenta platformy Kafka z weryfikacją schematu JSON

Aby uruchomić aplikację producenta Kafka, przejdź do katalogu azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Aplikację producenta można uruchomić, aby mogła wygenerować rekordy specyficzne dla schematu JSON lub rekordy ogólne. W przypadku określonego trybu rekordów należy najpierw wygenerować klasy względem schematu producenta przy użyciu następującego polecenia maven:

    mvn generate-sources
    
  2. Następnie możesz uruchomić aplikację producenta przy użyciu następujących poleceń.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. Po pomyślnym wykonaniu aplikacji producenta zostanie wyświetlony monit o wybranie scenariusza producenta. W tym przewodniku Szybki start możesz wybrać opcję 1 — generuj określonerekordy.

    Enter case number:
    1 - produce SpecificRecords
    
  4. Po pomyślnym serializacji i opublikowaniu danych w aplikacji producenta powinny zostać wyświetlone następujące dzienniki konsoli:

    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
    

Używanie konsumenta platformy Kafka z weryfikacją schematu JSON

Aby uruchomić aplikację Kafka consumer, przejdź do azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.

  1. Aplikację konsumenta można uruchomić, aby mogła korzystać z rekordów specyficznych dla schematu JSON lub rekordów ogólnych. W przypadku określonego trybu rekordów należy najpierw wygenerować klasy względem schematu producenta przy użyciu następującego polecenia maven:

    mvn generate-sources
    
  2. Następnie możesz uruchomić aplikację konsumenta przy użyciu następującego polecenia.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. Po pomyślnym wykonaniu aplikacji konsumenckiej zostanie wyświetlony monit o wybranie scenariusza producenta. W tym przewodniku szybkiego startu możesz wybrać opcję 1 — korzystanie z określonych rekordów.

    Enter case number:
    1 - consume SpecificRecords
    
  4. Po pomyślnym użyciu danych i deserializacji powinny zostać wyświetlone następujące dzienniki konsoli w aplikacji producenta:

    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
    
    

Czyszczenie zasobów

Usuń przestrzeń nazw usługi Event Hubs lub usuń grupę zasobów zawierającą przestrzeń nazw.