Бөлісу құралы:


Использование схемы JSON с приложениями Apache Kafka

В этом руководстве описан сценарий, в котором схемы JSON используются для сериализации и десериализации событий с помощью реестра схем Azure в Центрах событий.

В этом случае приложение производителя Kafka использует схему JSON, хранящуюся в реестре схем Azure, для сериализации события и публикации их в концентраторе событий Kafka в Центры событий Azure. Потребитель Kafka десериализирует события, которые он потребляет из Центров событий. Для этого используется идентификатор схемы события и схемы JSON, которая хранится в реестре схем Azure. Схема, показывающая сериализацию схемы и десериализацию для приложений Kafka с помощью схемы JSON.

Необходимые компоненты

Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями, прежде чем приступить к работе с этим руководством.

Для работы с данным руководством необходимо следующее:

Создание концентратора событий

Следуйте инструкциям из краткого руководства. Создание пространства имен Центров событий и концентратора событий для создания пространства имен Центров событий и концентратора событий. Затем следуйте инструкциям из получения строка подключения, чтобы получить строка подключения в пространство имен Центров событий.

Запишите следующие параметры, которые вы используете в текущем кратком руководстве.

  • Строка подключения пространства имен Центров событий
  • Имя концентратора событий

Создание схемы

Следуйте инструкциям из руководства по созданию схем с помощью реестра схем, чтобы создать группу схем и схему.

  1. Создайте группу схем с именем contoso-sg с помощью портала реестра схем. Используйте схему JSON в качестве типа сериализации.

  2. В этой группе схем создайте новую схему JSON с именем схемы: Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice используя следующее содержимое схемы.

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    } 
    

Регистрация приложения для доступа к реестру схем

Вы можете использовать идентификатор Microsoft Entra для авторизации производителя Kafka и клиентского приложения для доступа к ресурсам реестра схем Azure. Чтобы включить его, необходимо зарегистрировать клиентское приложение в клиенте Microsoft Entra из портал Azure.

Чтобы зарегистрировать приложение Microsoft Entra с именем example-app , см. статью "Регистрация приложения с помощью клиента Microsoft Entra".

  • tenant.id — задает идентификатор клиента приложения.
  • client.id — задает идентификатор клиента приложения.
  • client.secret — задает секрет клиента для проверки подлинности.

Если вы используете управляемое удостоверение, вам потребуется:

  • use.managed.identity.credential — указывает, что учетные данные MSI следует использовать для виртуальной машины с поддержкой MSI.
  • managed.identity.clientId — если задано, он создает учетные данные MSI с заданным идентификатором клиента managed.identity.resourceId, если он указан, он создает учетные данные MSI с заданным идентификатором ресурса.

Добавление пользователя в роль читателя реестра схем

Добавьте учетную запись пользователя в роль читателя реестра схем на уровне пространства имен. Вы также можете использовать роль участника реестра схем, но это не обязательно для этого краткого руководства.

  1. На странице пространства имен Центров событий выберите элемент управления доступом (IAM) в меню слева.
  2. На странице управления доступом (IAM) выберите +Добавить -Добавить> назначение ролей в меню.
  3. На странице "Тип назначения" нажмите кнопку "Далее".
  4. На странице "Роли" выберите средство чтения реестра схем и нажмите кнопку "Далее" в нижней части страницы.
  5. Используйте ссылку +Выбрать участников , чтобы добавить example-app приложение, созданное на предыдущем шаге, в роль, а затем нажмите кнопку "Далее".
  6. На странице "Рецензирование и назначение" выберите "Рецензирование и назначение".

Обновление конфигурации клиентского приложения kafka

Необходимо обновить клиентскую конфигурацию производителей и потребительских приложений Kafka с сведениями о приложении Microsoft Entra и сведениями о реестре схем.

Чтобы обновить конфигурацию производителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Обновите конфигурацию приложения Kafka в src/main/resources/app.properties , следуя руководству по краткому руководству по Kafka для Центров событий.

  2. Обновите сведения о конфигурации производителя в src/main/resources/app.properties с помощью связанной конфигурации реестра схем и приложения Microsoft Entra, созданного на предыдущем шаге, следующим образом:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. Следуйте тем же инструкциям и обновите конфигурацию azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer .

  4. Для производителей Kafka и потребительских приложений используется следующая схема JSON:

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    }
    

Использование производителя Kafka с проверкой схемы JSON

Чтобы запустить приложение производителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Вы можете запустить приложение-производитель, чтобы создать определенные записи схемы JSON или универсальные записи. Для конкретного режима записей необходимо сначала создать классы для любой схемы производителя с помощью следующей команды maven:

    mvn generate-sources
    
  2. Затем можно запустить приложение производителя с помощью следующих команд.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. После успешного выполнения приложения производителя он предложит выбрать сценарий производителя. В этом кратком руководстве можно выбрать вариант 1 — создать определенныеrecords.

    Enter case number:
    1 - produce SpecificRecords
    
  4. После успешной сериализации и публикации данных в приложении производителя должны появиться следующие журналы консоли:

    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
    

Использование потребителя Kafka с проверкой схемы JSON

Чтобы запустить приложение потребителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.

  1. Вы можете запустить приложение-получатель, чтобы оно потребляло определенные записи или универсальные записи схемы JSON. Для конкретного режима записей необходимо сначала создать классы для любой схемы производителя с помощью следующей команды maven:

    mvn generate-sources
    
  2. Затем можно запустить приложение-получатель с помощью следующей команды.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. После успешного выполнения приложения-получателя он предложит выбрать сценарий производителя. В этом кратком руководстве можно выбрать вариант 1— использовать specificRecords.

    Enter case number:
    1 - consume SpecificRecords
    
  4. После успешного потребления и десериализации данных в приложении производителя должны появиться следующие журналы консоли:

    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
    
    

Очистка ресурсов

Удалите пространство имен Центров событий или удалите группу ресурсов, содержащую пространство имен.