Tutorial: Usar a API de streams do Apache Kafka no Azure HDInsight

Saiba como criar um aplicativo que usa a API de Streams do Apache Kafka e executá-lo com o Kafka no HDInsight.

O aplicativo usado neste tutorial é uma contagem de palavras de streaming. Ele lê dados de texto de um tópico Kafka, extrai palavras individuais e, em seguida, armazena a palavra e a contagem em outro tópico Kafka.

O processamento de fluxo do Kafka normalmente é feito usando o Apache Spark. O Kafka versão 2.1.1 e 2.4.1 (no HDInsight 4.0 e 5.0) dá suporte à API de Streams do Kafka. Essa API permite transformar fluxos de dados entre tópicos de entrada e saída.

Para obter mais informações sobre Streams do Kafka, consulte a documentaçãoIntrodução a Streams em Apache.org.

Neste tutorial, você aprenderá como:

  • Compreender o código
  • Compilar e implantar o aplicativo
  • Configurar tópicos Kafka
  • Executar o código

Pré-requisitos

Compreender o código

O aplicativo de exemplo está localizado em https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, além do subdiretório Streaming. O aplicativo consiste de dois arquivos:

  • pom.xml: este arquivo define as dependências do projeto, versão do Java e os métodos de empacotamento.
  • Stream.java: Esse arquivo implementa a lógica de streaming.

Pom.xml

As coisas importantes para entender no arquivo pom.xml são:

  • Dependências: Este projeto depende da API do Kafka Streams, que é fornecida pelo pacote kafka-clients. O seguinte código XML define essa dependência:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    A entrada ${kafka.version} é declarada na seção <properties>..</properties> de pom.xml, e está configurada para a versão Kafka do cluster HDInsight.

  • Plug-ins: os plug-ins do Maven oferecem várias funcionalidades. Neste projeto, são usados os seguintes plug-ins:

    • maven-compiler-plugin: usado para definir a versão do Java usada pelo projeto como 8. O HDInsight 4.0 e 5.0 requer Java 8.
    • maven-shade-plugin: usado para gerar um uber jar que contém esse aplicativo, bem como eventuais dependências. Também é usado para definir o ponto de entrada do aplicativo, para que você possa executar diretamente o arquivo Jar sem a necessidade de especificar a classe principal.

Stream.java

O arquivo Stream.java usa a API do Streams para implementar um aplicativo de contagem de palavras. Ele lê dados de um tópico Kafka chamado test e grava as contagens de palavras em um tópico chamado wordcounts.

O código a seguir define o aplicativo de contagem de palavras:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Compilar e implantar o exemplo

Para criar e implantar o projeto para o Kafka no Cluster HDInsight, utilize as seguintes etapas:

  1. Defina o diretório atual para o local do diretório hdinsight-kafka-java-get-started-master\Streaming e, em seguida, use o seguinte comando para criar um pacote jar:

    mvn clean package
    

    Este comando cria o pacote em target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Substitua sshuser pelo usuário do SSH do cluster e substitua clustername pelo nome do cluster. Use o seguinte comando para copiar o arquivo kafka-streaming-1.0-SNAPSHOT.jar para o Cluster HDInsight. Se solicitado, insira a senha para a conta de usuário SSH.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Criar tópicos do Apache Kafka

  1. Substitua sshuser pelo usuário do SSH do cluster e substitua CLUSTERNAME pelo nome do cluster. Abra uma conexão SSH para o cluster inserindo o seguinte comando. Se solicitado, insira a senha para a conta de usuário SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Instale jq, um processador JSON de linha de comando. Na conexão SSH aberta, digite o seguinte comando para instalar o jq:

    sudo apt -y install jq
    
  3. Configurar variável de senha. Substitua PASSWORD pela senha de logon do cluster e insira o comando:

    export PASSWORD='PASSWORD'
    
  4. Extraia o nome do cluster com grafia correta de maiúsculas e minúsculas. A grafia de maiúsculas e minúsculas real do nome do cluster pode ser diferente do esperado, dependendo de como o cluster foi criado. Esse comando obtém o invólucro real e, em seguida, armazena-o em uma variável. Insira o seguinte comando:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Observação

    Se você estiver realizando esse processo de fora do cluster, haverá um procedimento diferente para armazenar o nome do cluster. Obtenha o nome do cluster em letras minúsculas do portal do Azure. Em seguida, substitua o nome do cluster por <clustername> no comando a seguir e execute-o: export clusterName='<clustername>'.

  5. Para obter os hosts de broker Kafka e os hosts Apache Zookeeper, use os comandos a seguir. Quando solicitado, insira a senha para a conta de logon do cluster (admin).

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Observação

    Esses comandos exigem acesso ao Ambari. Se o cluster estiver atrás de um NSG, execute esses comandos em um computador que possa acessar o Ambari.

  6. Para criar os tópicos usados pela operação de streaming, use os seguintes comandos:

    Observação

    Você pode receber um erro que o tópico test já existe. Não há problema nisso, pois ele pode ter sido criado no tutorial de API de produtor e consumidor.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    Os tópicos são usados para as seguintes finalidades:

    • test: Este tópico é onde os registros são recebidos. O aplicativo de streaming faz a leitura daqui.
    • wordcounts: Este tópico é onde o aplicativo de transmissão armazena sua saída.
    • RekeyedIntermediateTopic: Este tópico é usado para reparticionar dados como a contagem é atualizada pelo operador countByKey.
    • wordcount-example-Counts-changelog: Este tópico é um armazenamento de estado usado pela operação countByKey

    O Kafka no HDInsight também pode ser configurado para criar tópicos automaticamente. Para obter mais informações, consulte o documento Configurar a criação automática de tópicos.

Executar o código

  1. Para iniciar o aplicativo de streaming como processo em segundo plano, use o seguinte comando:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    Você pode receber um aviso sobre o Apache log4j. Você pode ignorar esse aviso.

  2. Para enviar registros para o tópico test, use o seguinte comando para iniciar o aplicativo produtor:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Após a conclusão do produtor, use o seguinte comando para exibir as informações armazenadas no tópico wordcounts:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    Os parâmetros --property informam ao consumidor do console para imprimir a chave (palavra) juntamente com a contagem (valor). Esses parâmetros também configuram o desserializador a ser usado ao fazer a leitura desses valores do Kafka.

    A saída é semelhante ao texto a seguir:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    O parâmetro --from-beginning configura o consumidor para começar do início dos registros armazenados no tópico. A contagem aumenta sempre que uma palavra é encontrada, logo o tópico contém várias entradas para cada palavra, com uma contagem crescente.

  4. Use Ctrl + C para sair do produtor. Continue usando Ctrl + C para sair do aplicativo e do consumidor.

  5. Para excluir os tópicos usados pela operação de streaming, use os seguintes comandos:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Limpar os recursos

Para limpar os recursos criados por este tutorial, você pode excluir o grupo de recursos. A exclusão do grupo de recursos também exclui o cluster HDInsight associado e todos os outros recursos associados ao grupo de recursos.

Para remover o grupo de recursos usando o portal do Azure:

  1. No portal do Azure, expanda o menu à esquerda para abrir o menu de serviços e escolha Grupo de Recursos para exibir a lista dos seus grupos de recursos.
  2. Localize o grupo de recursos a ser excluído e clique com o botão direito do mouse no botão Mais (...) do lado direito da lista.
  3. Selecione Excluir grupo de recursos e confirme.

Próximas etapas

Neste documento, você aprendeu a usar a API de Streams do Apache Kafka com Kafka no HDInsight. Confira o seguinte para saber mais sobre como trabalhar com o Kafka.