Używanie schematu JSON z aplikacjami platformy Apache Kafka
W tym samouczku przedstawiono scenariusz, w którym używasz schematów JSON do serializacji i deserializacji zdarzeń przy użyciu rejestru schematów platformy Azure w usłudze Event Hubs.
W tym przypadku użycia aplikacja producenta platformy Kafka używa schematu JSON przechowywanego w rejestrze schematów platformy Azure do serializacji zdarzenia i publikowania ich w centrum tematu/zdarzeń platformy Kafka w usłudze Azure Event Hubs. Konsument platformy Kafka deserializuje zdarzenia używane przez usługę Event Hubs. W tym celu używa identyfikatora schematu zdarzenia i schematu JSON, który jest przechowywany w rejestrze schematów platformy Azure.
Wymagania wstępne
Jeśli dopiero zaczynasz korzystać z usługi Azure Event Hubs, zobacz Omówienie usługi Event Hubs przed wykonaniem tego przewodnika Szybki start.
Do wykonania kroków tego przewodnika Szybki start niezbędne jest spełnienie następujących wymagań wstępnych:
- Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.
- W środowisku projektowym zainstaluj następujące składniki:
- Zestaw Java Development Kit (JDK) 1.7+
- Pobierz i zainstaluj archiwum binarne Maven.
- Usługa Git
- Sklonuj rejestr schematów platformy Azure dla platformy Kafka .
Tworzenie centrum zdarzeń
Postępuj zgodnie z instrukcjami z przewodnika Szybki start: tworzenie przestrzeni nazw usługi Event Hubs i centrum zdarzeń w celu utworzenia przestrzeni nazw usługi Event Hubs i centrum zdarzeń. Następnie postępuj zgodnie z instrukcjami z sekcji Pobierz parametry połączenia, aby uzyskać parametry połączenia do przestrzeni nazw usługi Event Hubs.
Zanotuj następujące ustawienia, które są używane w bieżącym przewodniku Szybki start:
- ciąg Połączenie ion dla przestrzeni nazw usługi Event Hubs
- Nazwa centrum zdarzeń
Tworzenie schematu
Postępuj zgodnie z instrukcjami z sekcji Tworzenie schematów przy użyciu rejestru schematów, aby utworzyć grupę schematów i schemat.
Utwórz grupę schematów o nazwie contoso-sg przy użyciu portalu rejestru schematów. Użyj schematu JSON jako typu serializacji.
W tej grupie schematów utwórz nowy schemat JSON o nazwie schematu:
Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice
przy użyciu następującej zawartości schematu.{ "$id": "https://example.com/person.schema.json", "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "CustomerInvoice", "type": "object", "properties": { "invoiceId": { "type": "string" }, "merchantId": { "type": "string" }, "transactionValueUsd": { "type": "integer" }, "userId": { "type": "string" } } }
Rejestrowanie aplikacji w celu uzyskania dostępu do rejestru schematów
Możesz użyć identyfikatora Entra firmy Microsoft, aby autoryzować aplikację producenta i konsumenta platformy Kafka w celu uzyskania dostępu do zasobów usługi Azure Schema Registry. Aby ją włączyć, musisz zarejestrować aplikację kliencką w dzierżawie firmy Microsoft Entra w witrynie Azure Portal.
Aby zarejestrować aplikację Firmy Microsoft Entra o nazwie example-app
, zobacz Rejestrowanie aplikacji w dzierżawie firmy Microsoft Entra.
- tenant.id — ustawia identyfikator dzierżawy aplikacji
- client.id — ustawia identyfikator klienta aplikacji
- client.secret — ustawia klucz tajny klienta na potrzeby uwierzytelniania
Jeśli używasz tożsamości zarządzanej, potrzebne są następujące elementy:
- use.managed.identity.credential — wskazuje, że należy użyć poświadczeń tożsamości usługi zarządzanej, które powinny być używane dla maszyny wirtualnej z obsługą tożsamości usługi zarządzanej
- managed.identity.clientId — jeśli zostanie określony, kompiluje poświadczenia tożsamości usługi zarządzanej przy użyciu podanego identyfikatora klienta managed.identity.resourceId — jeśli określono, kompiluje poświadczenia tożsamości usługi zarządzanej z danym identyfikatorem zasobu
Dodawanie użytkownika do roli Czytelnik rejestru schematów
Dodaj konto użytkownika do roli Czytelnik rejestru schematów na poziomie przestrzeni nazw. Możesz również użyć roli Współautor rejestru schematów, ale nie jest to konieczne w tym przewodniku Szybki start.
- Na stronie Przestrzeń nazw usługi Event Hubs wybierz pozycję Kontrola dostępu (Zarządzanie dostępem i tożsamościami) w menu po lewej stronie.
- Na stronie Kontrola dostępu (Zarządzanie dostępem i tożsamościami) wybierz pozycję + Dodaj ->Dodaj przypisanie roli w menu.
- Na stronie Typ przypisania wybierz pozycję Dalej.
- Na stronie Role wybierz pozycję Czytelnik rejestru schematów, a następnie wybierz pozycję Dalej w dolnej części strony.
- Użyj linku + Wybierz członków , aby dodać aplikację
example-app
utworzoną w poprzednim kroku do roli, a następnie wybierz przycisk Dalej. - Na stronie Przeglądanie i przypisywanie wybierz pozycję Przejrzyj i przypisz.
Aktualizowanie konfiguracji aplikacji klienckiej aplikacji platformy Kafka
Należy zaktualizować konfigurację klienta aplikacji producentów i konsumentów platformy Kafka przy użyciu szczegółów aplikacji Microsoft Entra oraz informacji rejestru schematów.
Aby zaktualizować konfigurację producenta platformy Kafka, przejdź do strony azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.
Zaktualizuj konfigurację aplikacji platformy Kafka w pliku src/main/resources/app.properties , postępując zgodnie z przewodnikiem Szybki start platformy Kafka dla usługi Event Hubs.
Zaktualizuj szczegóły konfiguracji producenta w pliku src/main/resources/app.properties przy użyciu konfiguracji powiązanej z rejestrem schematów i aplikacji Microsoft Entra utworzonej w poprzednim kroku w następujący sposób:
schema.group=contoso-sg schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net tenant.id=<> client.id=<> client.secret=<>
Postępuj zgodnie z tymi samymi instrukcjami i zaktualizuj również konfigurację azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer .
W przypadku aplikacji producentów i konsumentów platformy Kafka używane są następujące schematy JSON:
{ "$id": "https://example.com/person.schema.json", "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "CustomerInvoice", "type": "object", "properties": { "invoiceId": { "type": "string" }, "merchantId": { "type": "string" }, "transactionValueUsd": { "type": "integer" }, "userId": { "type": "string" } } }
Używanie producenta platformy Kafka z weryfikacją schematu JSON
Aby uruchomić aplikację producenta platformy Kafka, przejdź do strony azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.
Aplikację producenta można uruchomić, aby mogła wygenerować rekordy specyficzne dla schematu JSON lub rekordy ogólne. W przypadku określonego trybu rekordów należy najpierw wygenerować klasy względem schematu producenta przy użyciu następującego polecenia maven:
mvn generate-sources
Następnie możesz uruchomić aplikację producenta przy użyciu następujących poleceń.
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
Po pomyślnym wykonaniu aplikacji producenta zostanie wyświetlony monit o wybranie scenariusza producenta. W tym przewodniku Szybki start możesz wybrać opcję 1 — generuj określonerekordy.
Enter case number: 1 - produce SpecificRecords
Po pomyślnym serializacji i opublikowaniu danych w aplikacji producenta powinny zostać wyświetlone następujące dzienniki konsoli:
INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0 INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1 INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
Używanie konsumenta platformy Kafka z weryfikacją schematu JSON
Aby uruchomić aplikację konsumenta platformy Kafka, przejdź do adresu azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.
Aplikację konsumenta można uruchomić, aby mogła korzystać z rekordów specyficznych dla schematu JSON lub rekordów ogólnych. W przypadku określonego trybu rekordów należy najpierw wygenerować klasy względem schematu producenta przy użyciu następującego polecenia maven:
mvn generate-sources
Następnie możesz uruchomić aplikację konsumenta przy użyciu następującego polecenia.
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
Po pomyślnym wykonaniu aplikacji konsumenckiej zostanie wyświetlony monit o wybranie scenariusza producenta. W tym przewodniku Szybki start możesz wybrać opcję 1 — korzystanie z określonychrekordów.
Enter case number: 1 - consume SpecificRecords
Po pomyślnym użyciu danych i deserializacji powinny zostać wyświetlone następujące dzienniki konsoli w aplikacji producenta:
INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0} INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1} INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
Czyszczenie zasobów
Usuń przestrzeń nazw usługi Event Hubs lub usuń grupę zasobów zawierającą przestrzeń nazw.