Поделиться через


Проверка схем для приложений Apache Kafka с помощью Avro (Java)

В этом кратком руководстве мы рассмотрим, как проверить событие из приложений Apache Kafka с помощью реестра схем Azure для Центров событий.

В этом случае приложение производителя Kafka использует схему Avro, хранящуюся в реестре схем Azure, для сериализации события и публикации их в разделе или концентраторе событий Kafka в Центрах событий Azure. Потребитель Kafka десериализирует события, которые он потребляет из Центров событий. Для этого он использует идентификатор схемы события и схемы Avro, которая хранится в реестре схем Azure.

Схема, показывающая сериализацию схемы и десериализацию для приложений Kafka с помощью схемы Avro.

Предпосылки

Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями о Центрах событий, прежде чем приступить к работе с этим руководством.

Для завершения этой быстрой установки вам потребуются следующие пререквизиты:

Создание концентратора событий

Следуйте инструкциям из краткого руководства: Создайте пространство имен Центров событий и концентратор событий, чтобы создать пространство имен Центров событий и концентратор событий. Затем следуйте инструкциям из раздела Получите строку подключения, чтобы получить строку подключения для вашего пространства имен Event Hubs.

Запишите следующие параметры, которые вы используете в текущем быстром старте.

  • Строка подключения для пространства имен Event Hubs
  • Имя концентратора событий

Создание схемы

Следуйте инструкциям из руководства по созданию схем с помощью реестра схем, чтобы создать группу схем и схему.

  1. Создайте группу схем с именем contoso-sg с помощью портала реестра схем. Используйте Avro в качестве типа сериализации и None для режима совместимости.

  2. В этой группе схем создайте новую схему Avro с именем схемы: Microsoft.Azure.Data.SchemaRegistry.example.Order используя следующее содержимое схемы.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Регистрация приложения для доступа к реестру схем

Идентификатор Microsoft Entra можно использовать для авторизации производителя и клиентского приложения Kafka для доступа к ресурсам реестра схем Azure, зарегистрировав клиентское приложение с помощью клиента Microsoft Entra на портале Azure.

Чтобы зарегистрировать приложение Microsoft Entra с именем example-app , см. статью "Регистрация приложения с помощью клиента Microsoft Entra".

  • tenant.id — задает идентификатор клиента приложения.
  • client.id — задает идентификатор клиента приложения.
  • client.secret — задает секрет клиента для проверки подлинности.

Если вы используете управляемое удостоверение, вам потребуется:

  • use.managed.identity.credential — указывает, что учетные данные MSI следует использовать, и они должны применяться для виртуальной машины с поддержкой MSI.
  • managed.identity.clientId — если задано, он создает учетные данные MSI с заданным идентификатором клиента.
  • managed.identity.resourceId — если задано, он создает учетные данные MSI с указанным идентификатором ресурса.

Добавление пользователя в роль читателя реестра схем

Добавьте учетную запись пользователя в роль читателя реестра схем на уровне пространства имен. Вы также можете использовать роль Контрибьютора реестра схем, но это не необходимо для данного краткого обзора.

  1. На странице пространства имен Центров событий выберите Управление доступом (IAM) в меню слева.
  2. На странице управления доступом (IAM) выберите + Добавить ->Добавить назначение роли в меню.
  3. На странице "Тип назначения" нажмите кнопку "Далее".
  4. На странице "Роли" выберите средство чтения реестра схем (предварительная версия) и нажмите кнопку "Далее " в нижней части страницы.
  5. Используйте ссылку +Выбрать участников , чтобы добавить example-app приложение, созданное на предыдущем шаге, в роль, а затем нажмите кнопку "Далее".
  6. На странице "Рецензирование и назначение" выберите "Рецензирование и назначение".

Обновление конфигурации клиентского приложения kafka

Необходимо обновить клиентскую конфигурацию приложений производителя и потребителей Kafka с конфигурацией, связанной с созданным приложением Microsoft Entra, и сведениями реестра схем.

Чтобы обновить конфигурацию производителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer.

  1. Обновите конфигурацию приложения Kafka в src/main/resources/app.properties , следуя руководству по краткому руководству по Kafka для Центров событий.

  2. Обновите параметры конфигурации для производителя в src/main/resources/app.properties с помощью конфигурации, связанной с реестром схем, и приложения Microsoft Entra, которые вы создали выше, следующим образом:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. Следуйте тем же инструкциям и обновите конфигурацию azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer .

  4. Для приложений производителя и потребителей Kafka используется следующая схема Avro:

    {
      "namespace": "com.azure.schemaregistry.samples",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    }
    

Использование производителя Kafka с проверкой схемы Avro

Чтобы запустить приложение производителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer.

  1. Вы можете запустить приложение-производитель, чтобы оно создавало как специфичные записи Avro, так и универсальные. Для конкретного режима записей необходимо сначала создать классы для любой схемы производителя с помощью следующей команды maven:

    mvn generate-sources
    
  2. Затем можно запустить приложение производителя с помощью следующих команд.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. После успешного выполнения приложения производителя он предложит выбрать сценарий производителя. В этом кратком руководстве можно выбрать вариант 1— создать Avro SpecificRecords.

    Enter case number:
    1 - produce Avro SpecificRecords
    2 - produce Avro GenericRecords
    
  4. После успешной сериализации и публикации данных в приложении производителя должны появиться следующие журналы консоли:

    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

Использование потребителя Kafka с проверкой схемы Avro

Чтобы запустить приложение потребителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer.

  1. Вы можете запустить приложение-получатель, чтобы оно потребляло записи, специфичные для Avro, или универсальные записи. Для конкретного режима записей необходимо сначала создать классы для любой схемы производителя с помощью следующей команды maven:

    mvn generate-sources
    
  2. Затем можно запустить приложение-получатель с помощью следующей команды.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. После успешного выполнения потребительского приложения программа предложит выбрать сценарий производителя. В этом кратком руководстве можно выбрать опцию 1 — использовать Avro SpecificRecords.

    Enter case number:
    1 - consume Avro SpecificRecords
    2 - consume Avro GenericRecords
    
  4. После успешного потребления и десериализации данных в приложении производителя должны появиться следующие журналы консоли:

    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

Очистка ресурсов

Удалите пространство имен Центров событий или удалите группу ресурсов, содержащую пространство имен.