Tutorial: Utilizar as APIs de Produtor e de Consumidor de Apache Kafka

Saiba como utilizar as APIs de Produtor e de Consumidor de Apache Kafka com o Kafka no HDInsight.

A API de Produtor de Kafka permite que as aplicações enviem fluxos de dados para o cluster de Kafka. A API de Consumidor de Kafka permite que as aplicações leiam fluxos de dados a partir do cluster.

Neste tutorial, ficará a saber como:

  • Pré-requisitos
  • Compreender o código
  • Criar e implementar a aplicação
  • Executar a aplicação no cluster

Para obter mais informações sobre as APIs, veja a documentação do Apache dedicada à API de Produtor e à API de Consumidor.

Pré-requisitos

Compreender o código

A aplicação de exemplo está localizada em https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, no subdiretório Producer-Consumer. Se estiver a utilizar um cluster do Kafka com o Pacote de Segurança Enterprise (ESP), deve utilizar a versão da aplicação localizada no DomainJoined-Producer-Consumer subdiretório.

Essencialmente, a aplicação é composta por quatro ficheiros:

  • pom.xml: este ficheiro define as dependências do projeto, a versão de Java e os métodos de empacotamento.
  • Producer.java: este ficheiro envia frases aleatórias para o Kafka através da API de produtor.
  • Consumer.java: este ficheiro utiliza a API de consumidor para ler os dados a partir de Kafka e emiti-los para STDOUT.
  • AdminClientWrapper.java: este ficheiro utiliza a API de administração para criar, descrever e eliminar tópicos do Kafka.
  • Run.java: a interface de linha de comandos utilizada para executar o código do produtor e do consumidor.

Pom.xml

Seguem-se os aspetos importantes a compreender em relação ao ficheiro pom.xml:

  • Dependências: este projeto depende das APIs de produtor e de consumidor de Kafka, fornecidas pelo pacote kafka-clients. Esta dependência é definida pelo seguinte código XML:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    A entrada ${kafka.version} é declarada na secção <properties>..</properties> de pom.xml e está configurada para a versão de Kafka do cluster do HDInsight.

  • Plug-ins: os plug-ins de Maven proporcionam diversas funcionalidades. Neste projeto, são utilizados os seguintes plug-ins:

    • maven-compiler-plugin: utilizado para definir a versão de Java utilizada pelo projeto para a versão 8. Esta é a versão de Java utilizada pelo HDInsight 3.6.
    • maven-shade-plugin: utilizado para gerar um JAR com dependências, que contém precisamente não só esta aplicação como todas as dependências. Também é utilizado para definir o ponto de entrada da aplicação, para que possa executar diretamente o ficheiro JAR sem ter de especificar a classe principal.

Producer.Java

O produtor comunica com os anfitriões de mediador (nós de trabalho) de Kafka e envia os dados para um tópico do Kafka. O fragmento de código seguinte é do ficheiro Producer.java do repositório do GitHub e mostra como definir as propriedades do produtor. Para clusters Com Segurança Empresarial Ativada, tem de ser adicionada uma propriedade adicional "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.Java

O consumidor comunica com os anfitriões de mediador (nós de trabalho) de Kafka e lê os registos de forma cíclica. O fragmento de código seguinte do ficheiro Consumer.java define as propriedades do consumidor. Para clusters Com Segurança Empresarial Ativada, tem de ser adicionada uma propriedade adicional "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

Neste código, o consumidor está configurado para ler a partir do início do tópico (auto.offset.reset está definido como earliest).

Run.Java

O ficheiro Run.java fornece uma interface de linha de comandos que executa o código de produtor ou de consumidor. Tem de fornecer as informações do anfitrião de mediador de Kafka como um parâmetro. Opcionalmente, pode incluir um valor de ID de grupo, que é utilizado pelo processo de consumidor. Se criar várias instâncias de consumidor com o mesmo ID de grupo, estas irão fazer o balanceamento de carga da leitura do tópico.

Criar e implementar o exemplo

Utilizar ficheiros JAR pré-criados

Transfira os jars a partir do exemplo de Introdução ao Azure do Kafka. Se o cluster tiver o Pacote de Segurança Enterprise (ESP) ativado, utilize kafka-producer-consumer-esp.jar. Utilize o comando abaixo para copiar os jars para o cluster.

scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar

Criar os ficheiros JAR a partir do código

Se quiser ignorar este passo, os jars pré-criados podem ser transferidos a Prebuilt-Jars partir do subdiretório. Transfira kafka-producer-consumer.jar. Se o cluster tiver o Pacote de Segurança Enterprise (ESP) ativado, utilize kafka-producer-consumer-esp.jar. Execute o passo 3 para copiar o jar para o cluster do HDInsight.

  1. Transfira e extraia os exemplos de https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Defina o diretório atual para a localização do hdinsight-kafka-java-get-started\Producer-Consumer diretório. Se estiver a utilizar o cluster do Kafka ativado pelo Pacote de Segurança Enterprise (ESP), deve definir a localização como DomainJoined-Producer-Consumersubdiretório. Utilize o seguinte comando para criar a aplicação:

    mvn clean package
    

    Este comando cria um diretório com o nome target, que contém um ficheiro com o nome kafka-producer-consumer-1.0-SNAPSHOT.jar. Para clusters ESP, o ficheiro será kafka-producer-consumer-esp-1.0-SNAPSHOT.jar

  3. Substitua sshuser pelo utilizador SSH do seu cluster e CLUSTERNAME pelo nome do seu cluster. Introduza o seguinte comando para copiar o ficheiro para o kafka-producer-consumer-1.0-SNAPSHOT.jar cluster do HDInsight. Quando lhe for pedido, introduza a palavra-passe do utilizador SSH.

    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

Executar o exemplo

  1. Substitua sshuser pelo utilizador SSH do seu cluster e CLUSTERNAME pelo nome do seu cluster. Abra uma ligação SSH ao cluster ao introduzir o seguinte comando. Se tal lhe for pedido, introduza a palavra-passe da conta de utilizador SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Para obter os anfitriões do mediador kafka, substitua os valores por <clustername> e <password> no seguinte comando e execute-os. Utilize a mesma caixa para <clustername> conforme mostrado na portal do Azure. Substitua <password> pela palavra-passe de início de sessão do cluster e, em seguida, execute:

    sudo apt -y install jq
    export CLUSTER_NAME='<clustername>'
    export PASSWORD='<password>'
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Este comando requer acesso ao Ambari. Se o cluster estiver protegido por um NSG, execute este comando a partir de um computador que possa aceder ao Ambari.

  3. Crie o tópico do Kafka, myTest, ao introduzir o seguinte comando:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  4. Para executar o produtor e escrever dados para o tópico, utilize o seguinte comando:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  5. Quando o produtor tiver terminado, utilize o seguinte comando para ler a partir do tópico:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    A leitura dos registos, juntamente com uma contagem de registos, é apresentada.

  6. Utilize Ctrl + C para sair do consumidor.

Vários consumidores

Os consumidores de Kafka utilizam um grupo de consumidores quando leem os registos. Utilizar o mesmo grupo com vários consumidores resulta em leituras com balanceamento de carga de um tópico. Cada consumidor no grupo recebe uma parte dos registos.

A aplicação de consumidor aceita um parâmetro que é utilizado como o ID de grupo. Por exemplo, o seguinte comando inicia um consumidor através de um ID de grupo de myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Utilize Ctrl + C para sair do consumidor.

Para ver este processo em ação, utilize o seguinte comando:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

Este comando utiliza tmux para dividir o terminal em duas colunas. É iniciado um consumidor em cada coluna, com o mesmo valor de ID de grupo. Após os consumidores concluírem a leitura, tenha em atenção que cada um deles lê apenas uma parte dos registos. Utilize Ctrl + C duas vezes para sair tmuxde .

O consumo pelos clientes dentro do mesmo grupo é processado pelas partições do tópico. Neste exemplo de tópico, o tópico test criado anteriormente, tem oito partições. Se iniciar oito consumidores, cada consumidor lê os registos de uma única partição do tópico.

Importante

Não podem existir mais instâncias de consumidor num grupo de consumidores do que partições. Neste exemplo, um grupo de consumidores pode incluir até oito consumidores, pois esse é o número de partições no tópico. Também pode ter vários grupos de consumidores, em que cada grupo não tem mais do que oito consumidores.

Os registos armazenados no Kafka são armazenados pela ordem em que são recebidos numa partição. Para obter uma entrega por ordem dos registos dentro de uma partição, crie um grupo de consumidores em que o número de instâncias de consumidor corresponde ao número de partições. Para obter uma entrega por ordem dos registos dentro do tópico, crie um grupo de consumidores com apenas uma instância de consumidor.

Problemas Comuns enfrentados

  1. Falha na criação do tópico Se o cluster tiver o Enterprise Security Pack ativado, utilize os ficheiros JAR pré-criados para produtor e consumidor. O jar esp pode ser criado a partir do código no DomainJoined-Producer-Consumer subdiretório. As propriedades de produtor e consumidor têm uma propriedade CommonClientConfigs.SECURITY_PROTOCOL_CONFIG adicional para clusters preparados para ESP.

  2. Falha nos clusters preparados para ESP: se as operações de produção e consumo falharem e estiver a utilizar um cluster preparado para ESP, verifique se o utilizador kafka está presente em todas as políticas do Ranger. Se não estiver presente, adicione-o a todas as políticas do Ranger.

Limpar os recursos

Para limpar os recursos criados por este tutorial, pode eliminar o grupo de recursos. Ao eliminar o grupo de recursos também elimina o cluster do HDInsight associado e quaisquer outros recursos associados ao grupo de recursos.

Para remover o grupo de recursos através do Portal do Azure:

  1. No Portal do Azure, expanda o menu no lado esquerdo para abrir o menu de serviços e, em seguida, escolha Grupos de Recursos, para apresentar a lista dos seus grupos de recursos.
  2. Encontre o grupo de recursos a eliminar e, em seguida, clique com o botão direito do rato em Mais (...) no lado direito da lista.
  3. Selecione Eliminar grupo de recursos e, em seguida, confirme.

Passos seguintes

Neste documento, aprendeu a utilizar a API de Produtor e Consumidor do Apache Kafka com o Kafka no HDInsight. Utilize o seguinte para obter mais informações sobre como trabalhar com o Kafka: