Руководство по Использование API производителя и потребителя Apache Kafka
Узнайте, как использовать API производителя и потребителя Apache Kafka для Kafka в HDInsight.
API производителя Kafka позволяет приложениям отправлять потоки данных в кластер Kafka. API потребителя Kafka позволяет приложениям считывать потоки данных из кластера.
В этом руководстве описано следующее:
- Предварительные требования
- Изучение кода
- Создание и развертывание приложения.
- Запуск приложения в кластере
Дополнительные сведения об интерфейсах API см. в документации по API производителя и API потребителя.
Предварительные требования
- Apache Kafka в кластере HDInsight. Чтобы узнать, как создать кластер, ознакомьтесь со статьей Краткое руководство. Создание кластера Apache Kafka в Azure HDInsight с помощью портала Azure.
- Пакет Java Developer Kit (JDK) версии 8 или аналогичный пакет, например OpenJDK.
- Средство Apache Maven, установленное согласно инструкций Apache. Maven — система сборки проектов Java.
- Клиент SSH, например, Putty. Дополнительные сведения см. в руководстве по подключению к HDInsight (Apache Hadoop) с помощью SSH.
Изучение кода
Пример приложения расположен в подкаталоге Producer-Consumer
по адресу https://github.com/Azure-Samples/hdinsight-kafka-java-get-started. Если вы используете кластер Kafka с включенным Корпоративным пакетом безопасности (ESP) , следует использовать версию приложения, расположенную в подкаталоге DomainJoined-Producer-Consumer
.
Приложение состоит в основном из четырех файлов:
-
pom.xml
: этот файл определяет зависимости проекта, версию Java и методы упаковки. -
Producer.java
: этот файл отправляет случайные предложения в Kafka, используя API производителя. -
Consumer.java
: этот файл использует API потребителя для чтения данных из Kafka и передачи их в STDOUT. -
AdminClientWrapper.java
: Этот файл использует API администратора для создания, описания и удаления разделов Kafka. -
Run.java
: интерфейс командной строки используется для выполнения кода потребителя и производителя.
Pom.xml
В файле pom.xml
важны следующие элементы:
Зависимости. Этот проект использует API производителя и потребителя Kafka, предоставленные в пакете
kafka-clients
. Приведенный ниже код XML определяет эту зависимость:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
Запись
${kafka.version}
объявлена в разделе<properties>..</properties>
файлаpom.xml
. Она настроена для версии Kafka кластера HDInsight.Подключаемые модули. Подключаемые модули Maven предоставляют различные возможности. В этом проекте используются следующие подключаемые модули:
-
maven-compiler-plugin
: с помощью этого модуля можно задать для проекта Java версии 8. Это версия Java, используемая в HDInsight 3.6. -
maven-shade-plugin
: используется для создания файла типа uber jar, содержащего это приложение, а также любые зависимости. Он также используется для установки точки входа приложения, с помощью которой вы сможете напрямую запускать JAR-файл, не указывая основной класс.
-
Producer.java
Производитель взаимодействует с узлами брокера Kafka (рабочие узлы) и отправляет данные в разделы Kafka. Следующий фрагмент кода взят из файла Producer.java в репозитории GitHub. Этот фрагмент демонстрирует, как задавать свойства производителя. Для кластеров с поддержкой корпоративной безопасности необходимо добавить дополнительное свойство properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Потребитель взаимодействует с узлами брокера Kafka (рабочие узлы) и считывает записи в цикле. В следующем фрагменте кода из файла Consumer.java задаются свойства потребителя. Для кластеров с поддержкой корпоративной безопасности необходимо добавить дополнительное свойство properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
В этом коде потребитель настроен для чтения с самого начала раздела (auto.offset.reset
имеет значение earliest
).
Run.java
Файл Run.java предоставляет интерфейс командной строки, который выполняет код производителя или потребителя. Нужно указать сведения об узле брокера Kafka в качестве параметра. При необходимости можно указать значение идентификатора группы, которое используется процессом потребителя. При создании нескольких экземпляров потребителя с одним и тем же идентификатором группы между ними будет распределяться нагрузка по операциям чтения из раздела.
Создание и развертывание примера
Использование предварительно созданных JAR-файлов
Скачайте JAR-файлы из примера Azure по началу работы с Kafka. Если кластер имеет Корпоративный пакет безопасности (ESP) , используйте файл kafka-producer-consumer-esp.jar. Выполните приведенную ниже команду, чтобы скопировать JAR-файлы в кластер.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Создание JAR-файлов из кода
Если вы хотите пропустить этот шаг, предварительно созданные JAR можно скачать из подкаталога Prebuilt-Jars
. Скачайте файл kafka-producer-consumer.jar. Если кластер имеет Корпоративный пакет безопасности (ESP) , используйте файл kafka-producer-consumer-esp.jar. Выполните шаг 3, чтобы скопировать JAR-файл в кластер HDInsight.
Скачайте и извлеките примеры по ссылке: https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Укажите для текущего каталога расположение каталога
hdinsight-kafka-java-get-started\Producer-Consumer
. Если вы используете кластер Kafka с включенным Корпоративным пакетом безопасности (ESP) , установите расположением подкаталогDomainJoined-Producer-Consumer
. Запустите сборку приложения с помощью следующей команды.mvn clean package
Эта команда создает каталог с именем
target
, который содержит файл с именемkafka-producer-consumer-1.0-SNAPSHOT.jar
. Для кластеров с корпоративным пакетом безопасности будет использоваться файлkafka-producer-consumer-esp-1.0-SNAPSHOT.jar
.Замените
sshuser
именем пользователя SSH для кластера, аCLUSTERNAME
— именем кластера. Используя следующую команду, скопируйте файлkafka-producer-consumer-1.0-SNAPSHOT.jar
в свой кластер HDInsight. При появлении запроса введите пароль пользователя SSH.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Выполнение примера
Замените
sshuser
именем пользователя SSH для кластера, аCLUSTERNAME
— именем кластера. Откройте SSH-подключение к кластеру, выполнив следующую команду. При появлении запроса введите пароль для учетной записи пользователя SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Чтобы получить узлы брокера Kafka, замените значения для
<clustername>
и<password>
в следующей команде и выполните ее. Используйте тот же регистр для<clustername>
, как показано на портале Azure. Замените<password>
паролем для входа в кластер, а затем выполните:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Примечание
Для этой команды требуется доступ к Ambari. Если кластер находится за пределами NSG, выполните эту команду на компьютере с доступом к Ambari.
Создайте раздел Kafka,
myTest
, использовав следующую команду.java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Чтобы запустить производитель и сохранить данные в разделе, выполните следующую команду:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Когда процесс производителя завершится, выполните следующую команду для считывания данных из раздела:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Отобразятся считанные записи, а также их количество.
Нажмите клавиши Ctrl+C, чтобы закрыть потребитель.
Несколько потребителей
При считывании записей объект-получатель Kafka использует группу объектов-получателей. Использование одной группы с несколькими потребителями приведет к считыванию записей с балансировкой нагрузки из раздела. Каждый потребитель в группе получает часть записей.
Приложение-потребитель принимает параметр, который используется как идентификатор группы. Например, следующая команда предназначена для запуска нового потребителя с идентификатором группы myGroup
.
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Нажмите клавиши Ctrl+C, чтобы закрыть потребитель.
Чтобы увидеть этот процесс в действии, выполните следующую команду:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Эта команда использует tmux
, чтобы разделить терминал на два столбца. Потребитель запускается в каждом столбце с тем же значением идентификатора группы. Когда потребители закончат чтение, обратите внимание, что каждый из них считал только часть записей. Дважды нажмите клавиши CTLR+C, чтобы выйти из tmux
.
Потребление по клиентам, входящих в одну группу, обрабатывается по секциям раздела. В этом примере кода созданный ранее раздел test
имеет восемь секций. Если запущено восемь потребителей, каждый из них считывает записи из одной секции раздела.
Важно!
Число экземпляров потребителя, входящих в группу потребителей, должно соответствовать числу секций. В этом примере одна группа объектов-получателей может содержать не больше восьми объектов-получателей, так как это число секций в разделе. Но у вас может быть несколько групп объектов-получателей, в каждую из которых входит не более восьми из них.
Записи хранятся в Kafka в том порядке, в котором они поступают в секцию. Чтобы обеспечить упорядоченную доставку записей в секцию, нужно создать группу потребителей. При этом число входящих в нее экземпляров потребителя должно соответствовать числу секций. Чтобы обеспечить упорядоченную доставку записей в раздел, нужно создать группу потребителей, которая содержит только один экземпляр потребителя.
Распространенные проблемы
Сбой при создании раздела. Если в кластере включен корпоративный пакет безопасности, используйте предварительно созданные JAR-файлы для производителя и потребителя. JAR-файл с корпоративным пакетом безопасности можно создать с помощью кода в подкаталоге
DomainJoined-Producer-Consumer
. В свойствах производителя и потребителя предусмотрено дополнительное свойствоCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
для кластеров с корпоративным пакетом безопасности.Проблемы с кластерами с корпоративным пакетом безопасности. Если произошел сбой в операциях создания и потребления и вы используете кластер с корпоративным пакетом безопасности, убедитесь, что пользователь
kafka
присутствует во всех политиках Ranger. Если нет, добавьте его во все политики Ranger.
Очистка ресурсов
Чтобы очистить ресурсы, созданные при работе с этим руководством, удалите группу ресурсов. При этом будет удален связанный кластер HDInsight и другие ресурсы, связанные с этой группой ресурсов.
Чтобы удалить группу ресурсов с помощью портала Azure, сделайте следующее:
- На портале Azure разверните меню слева, чтобы открыть меню служб, а затем выберите Группы ресурсов, чтобы просмотреть список групп ресурсов.
- Найдите группу ресурсов, которую нужно удалить, и щелкните правой кнопкой мыши кнопку Дополнительно (…) справа от списка.
- Выберите Удалить группу ресурсов и подтвердите выбор.
Дальнейшие действия
Из этого документа вы узнали, как использовать API производителя и потребителя Apache Kafka для Kafka в HDInsight. Дополнительные сведения о работе с Kafka см. в следующих материалах.