Condividi tramite


Esercitazione: Usare l'API di flussi Apache Kafka in Azure HDInsight

Informazioni su come creare un'applicazione che usa l'API Apache Kafka Streams ed eseguirla con Kafka in HDInsight.

In questa esercitazione viene usata un'applicazione di conteggio delle parole. L'applicazione legge i dati di testo da un argomento Kafka, estrae singole parole e quindi archivia il conteggio delle parole in un altro argomento Kafka.

L'elaborazione del flusso Kafka viene spesso eseguita usando Apache Spark. Kafka versione 2.1.1 e 2.4.1 (in HDInsight 4.0 e 5.0) supporta l'API di Flussi Kafka. che consente di trasformare i flussi di dati tra argomenti di input e argomenti di output.

Per altre informazioni su Kafka Streams, vedere la documentazione di introduzione ai flussi su Apache.org.

In questa esercitazione apprenderai a:

  • Informazioni sul codice
  • Compilare e distribuire l'applicazione
  • Configurare gli argomenti Kafka
  • Eseguire il codice

Prerequisiti

Informazioni sul codice

L'applicazione di esempio si trova in https://github.com/Azure-Samples/hdinsight-kafka-java-get-started nella sottodirectory Streaming. L'applicazione è costituita da due file:

  • pom.xml: definisce le dipendenze di progetto, la versione di Java e i metodi di creazione dei pacchetti.
  • Stream.java: implementa la logica di flusso.

Pom.xml

Gli aspetti importanti da comprendere nel file pom.xml sono:

  • Dipendenze: questo progetto si basa sull'API Kafka Streams, che è disponibile nel pacchetto kafka-clients. Il codice XML seguente definisce questa dipendenza:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    La voce ${kafka.version} viene dichiarata nella sezione <properties>..</properties> di pom.xml ed è configurata per la versione Kafka del cluster HDInsight.

  • Plug-in: i plug-in Maven offrono varie funzionalità. In questo progetto vengono usati i plug-in seguenti:

    • maven-compiler-plugin: usato per impostare su 8 la versione di Java usata dal progetto. HDInsight 4.0 e 5.0 richiede Java 8.
    • maven-shade-plugin: usato per generare un file uber jar contenente questa applicazione e tutte le dipendenze. Viene inoltre usato per impostare il punto di ingresso dell'applicazione, in modo che sia possibile eseguire il file Jar direttamente senza dover specificare la classe principale.

Stream.Java

Il file Stream.java usa l'API Streams per implementare un'applicazione di conteggio delle parole. Legge i dati da un argomento Kafka denominato test e scrive il conteggio delle parole in un argomento denominato wordcounts.

Il codice seguente descrive l'applicazione di conteggio delle parole:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Creare e distribuire l'esempio

Per creare e distribuire il progetto in un cluster Kafka in HDInsight, seguire questa procedura:

  1. Impostare la directory corrente sul percorso della directory hdinsight-kafka-java-get-started-master\Streaming e quindi usare il comando seguente per creare un pacchetto JAR:

    mvn clean package
    

    Questo comando crea il pacchetto in target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Sostituire sshuser con il nome utente SSH del cluster e sostituire clustername con il nome del cluster. Usare il comando seguente per copiare il file kafka-streaming-1.0-SNAPSHOT.jar nel cluster HDInsight. Quando richiesto, immettere la password per l'account utente SSH.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Creare gli argomenti di Apache Kafka

  1. Sostituire sshuser con il nome utente SSH del cluster e sostituire CLUSTERNAME con il nome del cluster. Per aprire una connessione SSH al cluster, immettere il comando seguente. Quando richiesto, immettere la password per l'account utente SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Installare jq, un processore JSON da riga di comando. Dalla connessione SSH aperta, immettere il comando seguente per installare jq:

    sudo apt -y install jq
    
  3. Configurare la variabile di password. Sostituire PASSWORD con la password di accesso al cluster e quindi immettere il comando:

    export PASSWORD='PASSWORD'
    
  4. Estrarre il nome del cluster con l'uso corretto di maiuscole e minuscole. L'uso effettivo di maiuscole e minuscole nel nome del cluster può differire dal previsto, a seconda della modalità di creazione del cluster. Questo comando ottiene l'effettiva combinazione di maiuscole e minuscole e quindi la archivia in una variabile. Immettere il comando seguente:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Nota

    Se si esegue questo processo dall'esterno del cluster, è disponibile una procedura diversa per l'archiviazione del nome del cluster. Recuperare il nome del cluster in lettere minuscole dal portale di Azure. Sostituire quindi <clustername> con il nome del cluster nel comando seguente ed eseguire il comando: export clusterName='<clustername>'.

  5. Per ottenere gli host del broker Kafka e gli host Apache Zookeeper, usare i comandi seguenti. Quando richiesto, immettere la password dell'account (admin) di accesso al cluster.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Questi comandi richiedono l'accesso a Ambari. Se il cluster è protetto da un gruppo NSG, eseguire questi comandi da un computer in grado di accedere ad Ambari.

  6. Per creare gli argomenti usati dall'operazione di streaming, usare i comandi seguenti:

    Nota

    È possibile che venga visualizzato un errore che indica che l'argomento test esiste già. Non è un problema poiché potrebbe essere stato creato nell'esercitazione dell'API Producer e Consumer.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    Gli argomenti vengono usati per gli scopi seguenti:

    • test: in questo argomento vengono ricevuti i record. Applicazione di streaming legge i dati da questo argomento.
    • wordcounts: in questo argomento l'applicazione di streaming archivia l'output.
    • RekeyedIntermediateTopic: questo argomento viene usato per partizionare nuovamente i dati mentre il conteggio viene aggiornato dall'operatore countByKey.
    • wordcount-example-Counts-changelog: questo argomento è un archivio di stati usato dall'operazione countByKey.

    È possibile configurare Kafka in HDInsight anche in modo che gli argomenti vengano creati automaticamente. Per altre informazioni, vedere il documento Configure automatic topic creation (Configurare la creazione automatica degli argomenti).

Eseguire il codice

  1. Per avviare l'applicazione di streaming come processo in background, usare il comando seguente:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    È possibile che venga visualizzato un avviso su Apache log4j. È possibile ignorare questo avviso.

  2. Per inviare i record all'argomento test, usare il comando seguente per avviare l'applicazione producer:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Al termine dell'elaborazione del producer, usare il comando seguente per visualizzare le informazioni archiviate nell'argomento wordcounts:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    I parametri --property indicano al consumer di console di stampare sia la chiave (parola) sia il numero (valore). Questo parametro configura anche il deserializzatore da usare durante la lettura dei valori da Kafka.

    L'output è simile al testo seguente:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    Il parametro --from-beginning configura il consumer in modo che venga avviato all'inizio dei record archiviati nell'argomento. Il conteggio viene incrementato ogni volta che viene rilevata una parola, pertanto l'argomento contiene più voci per ogni parola, con un numero crescente.

  4. Usare Ctrl + C per chiudere il producer. Usare ancora Ctrl + C per chiudere l'applicazione e il consumer.

  5. Per eliminare gli argomenti usati dall'operazione di streaming, eseguire questi comandi:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Pulire le risorse

Per pulire le risorse create da questa esercitazione, eliminare il gruppo di risorse. Se si elimina il gruppo di risorse, vengono eliminati anche il cluster HDInsight associato e tutte le altre risorse correlate al gruppo di risorse.

Per rimuovere il gruppo di risorse usando il portale di Azure:

  1. Nel portale di Azure espandere il menu a sinistra per aprire il menu dei servizi e quindi scegliere Gruppi di risorse per visualizzare l'elenco dei gruppi di risorse.
  2. Individuare il gruppo di risorse da eliminare e quindi fare clic con il pulsante destro del mouse su Altro (...) a destra dell'elenco.
  3. Scegliere Elimina gruppo di risorse e quindi confermare.

Passaggi successivi

In questo documento si è appreso come usare l'API Apache Kafka Streams con Kafka in HDInsight. Per altre informazioni sull'uso di Kafka, vedere: