Руководство. Использование API потоков Apache Kafka в Azure HDInsight

Узнайте, как создать приложение, использующее API для Apache Kafka Streams, и запустить его с помощью Kafka в HDInsight.

В этом руководстве используется приложение для подсчета слов во время потоковой передачи. Оно считывает текстовые данные из раздела Kafka, извлекает отдельные слова, а затем сохраняет слово и количество слов в другом разделе Kafka.

Обработка потока Kafka часто выполняется с помощью Apache Spark. Kafka версии 2.1.1 и 2.4.1 (в HDInsight 4.0 и 5.0) поддерживает API Kafka Потоки. Этот API позволяет преобразовать потоки данных между входными и выходными разделами.

Дополнительные сведения о Потоках Kafka см. в вводной документации на сайте Apache.org.

В этом руководстве описано следующее:

  • Изучение кода
  • Создание и развертывание приложения.
  • Настройка разделов Kafka.
  • Выполнение кода

Необходимые компоненты

Изучение кода

Пример приложения расположен в подкаталоге Streaming по адресу https://github.com/Azure-Samples/hdinsight-kafka-java-get-started. Приложение состоит из двух файлов:

  • файл pom.xml определяет зависимости проекта, версию Java и методы упаковки;
  • файл Stream.java реализует логику потоковой передачи.

Pom.xml

В файле pom.xml важны следующие элементы:

  • Зависимости. Этот проект использует API Потоков Kafka, предоставленный в пакете kafka-clients. Приведенный ниже код XML определяет эту зависимость:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    Запись ${kafka.version} объявлена в разделе <properties>..</properties> файла pom.xml. Она настроена для версии Kafka кластера HDInsight.

  • Подключаемые модули. Подключаемые модули Maven предоставляют различные возможности. В этом проекте используются следующие подключаемые модули:

    • С помощью модуля maven-compiler-plugin можно задать для проекта Java версии 8. Для HDInsight 4.0 и 5.0 требуется Java 8.
    • maven-shade-plugin: используется для создания файла типа uber jar, содержащего это приложение, а также любые зависимости. Также используется для установки точки входа приложения, с помощью которой вы сможете напрямую запускать JAR-файл, не указывая основной класс.

Stream.java

Файл Stream.java использует API потоков для реализации приложения для подсчета слов. Оно считывает данные из раздела Kafka с именем test и записывает количество слов в раздел с именем wordcounts.

Следующий код определяет приложение для подсчета слов:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Создание и развертывание примера

Чтобы создать и развернуть проект для Kafka в кластере HDInsight, выполните следующие действия:

  1. Укажите для текущего каталога расположение каталога hdinsight-kafka-java-get-started-master\Streaming и выполните следующую команду, чтобы создать пакет JAR:

    mvn clean package
    

    Эта команда создает пакет в файле target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Замените sshuser именем пользователя SSH для кластера, а clustername — именем кластера. Используя следующую команду, скопируйте файл kafka-streaming-1.0-SNAPSHOT.jar в свой кластер HDInsight. При появлении запроса введите пароль для учетной записи пользователя SSH.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Создание разделов Apache Kafka

  1. Замените sshuser именем пользователя SSH для кластера, а CLUSTERNAME — именем кластера. Откройте SSH-подключение к кластеру, выполнив следующую команду. При появлении запроса введите пароль для учетной записи пользователя SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Установите jq — обработчик командной строки JSON. В открытом сеансе SSH-подключения введите следующую команду для установки jq:

    sudo apt -y install jq
    
  3. Настройте переменную пароля. Замените PASSWORD паролем для входа в кластер, а затем введите следующую команду:

    export PASSWORD='PASSWORD'
    
  4. Извлеките имя кластера с правильным регистром. Фактический регистр имени кластера может отличаться от ожидаемого, в зависимости от способа создания кластера. Эта команда получает фактический регистр, а затем сохраняет его в переменной. Введите следующую команду:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Примечание.

    Если вы выполняете этот процесс вне кластера, используйте другой способ хранения имени кластера. Получите имя кластера в нижнем регистре на портале Azure. Затем измените имя кластера на <clustername> в следующей команде и выполните ее: export clusterName='<clustername>'.

  5. Чтобы получить узлы Apache Zookeeper и брокера Kafka, используйте приведенные ниже команды. При появлении запроса введите пароль для учетной записи администратора, чтобы войти на кластер.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Примечание.

    Для этих команд требуется доступ к Ambari. Если кластер находится за пределами NSG, выполните следующие команды на компьютере с доступом к Ambari.

  6. Чтобы создать разделы для операции потоковой передачи, используйте следующие команды:

    Примечание.

    Вы можете получить сообщение-ошибку о том, что раздел test уже существует. Это нормально, так как он, возможно, был создан в руководстве по API производителя и потребителя.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    Разделы используются для следующих целей:

    • test. В этот раздел поступают записи. Здесь приложение потоковой передачи считывает их.
    • wordcounts. В этом разделе приложение потоковой передачи хранит свои выходные данные.
    • RekeyedIntermediateTopic. Этот раздел используется для секционирования данных, так как счетчик обновляется оператором countByKey.
    • wordcount-example-Counts-changelog. Этот раздел является хранилищем состояний, используемым операцией countByKey.

    Кроме того, Kafka в HDInsight можно настроить на автоматическое создание разделов. Дополнительные сведения см. в статье How to configure Apache Kafka on HDInsight to automatically create topics (Настройка автоматического создания разделов в Apache Kafka в HDInsight).

Выполнение кода

  1. Для запуска приложения потоковой передачи в качестве фонового процесса используйте следующую команду:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    Вы можете получить предупреждение об Apache log4j. Его можно проигнорировать.

  2. Чтобы отправить записи в раздел test, используйте следующую команду для запуска приложения-отправителя:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. После завершения работы отправителя просмотрите сведения, хранящиеся в разделе wordcounts, с помощью следующей команды:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    В соответствии с параметрами --property объект-получатель консоли печатает ключ (машинное слово) и число (значение). Кроме того, этот параметр настраивает десериализатор, используемый при считывании этих значений из Kafka.

    Результат будет аналогичен приведенному ниже:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    Параметр --from-beginning настраивает запуск объекта-получателя в начале записей, хранящихся в разделе. Число увеличивается каждый раз, когда встречается слово, поэтому раздел содержит несколько записей для каждого слова с увеличивающимся числом.

  4. Нажмите клавиши Ctrl+C, чтобы закрыть отправитель. Снова нажмите клавиши Ctrl+C, чтобы выйти из приложения и объекта-получателя.

  5. Чтобы удалить разделы для операции потоковой передачи, используйте следующие команды:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Очистка ресурсов

Чтобы очистить ресурсы, созданные при работе с этим руководством, удалите группу ресурсов. При этом будет удален связанный кластер HDInsight и другие ресурсы, связанные с этой группой ресурсов.

Чтобы удалить группу ресурсов с помощью портала Azure, сделайте следующее:

  1. На портале Azure разверните меню слева, чтобы открыть меню служб, а затем выберите Группы ресурсов, чтобы просмотреть список групп ресурсов.
  2. Найдите группу ресурсов, которую нужно удалить, и щелкните правой кнопкой мыши кнопку Дополнительно (…) справа от списка.
  3. Выберите Удалить группу ресурсов и подтвердите выбор.

Следующие шаги

Из этого документа вы узнали, как использовать API для Apache Kafka Streams с Kafka в HDInsight. Дополнительные сведения о работе с Kafka см. в следующих материалах.