Tutorial: Uso de Producer API y Consumer API de Apache Kafka
Aprenda a usar Producer y Consumer API de Apache Kafka con Kafka en HDInsight.
Producer API de Kafka permite a las aplicaciones enviar flujos de datos al clúster de Kafka. Consumer API de Kafka permite a las aplicaciones leer flujos de datos del clúster.
En este tutorial, aprenderá a:
- Prerrequisitos
- Comprendiendo el código
- Compilar e implementar la aplicación
- Ejecutar la aplicación en el clúster
Para más información acerca de las API, consulte la documentación de Apache relativa a Producer API y a Consumer API.
- Apache Kafka en un clúster de HDInsight. Para obtener información sobre cómo crear el clúster, consulte Inicio con Apache Kafka en HDInsight.
- Java Developer Kit (JDK) versión 8, o un equivalente, como OpenJDK.
- Apache Maven correctamente instalado según Apache. Maven es un sistema de compilación de proyectos de Java.
- Un cliente SSH como Putty. Para más información, consulte Conexión a través de SSH con HDInsight (Apache Hadoop).
La aplicación de ejemplo se encuentra en https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, en el subdirectorio Producer-Consumer
. Si usa un clúster de Kafka habilitado para Enterprise Security Package (ESP) , debería usar la versión de la aplicación ubicada en el subdirectorio DomainJoined-Producer-Consumer
.
Básicamente, la aplicación consta de cuatro archivos:
-
pom.xml
: este archivo define las dependencias del proyecto, la versión de Java y los métodos de empaquetado. -
Producer.java
: este archivo envía oraciones aleatorias a Kafka mediante Producer API. -
Consumer.java
: este archivo usa Consumer API para leer datos de Kafka y emitirlos en STDOUT. -
AdminClientWrapper.java
: Este archivo usa la API de administración para crear, describir y eliminar temas de Kafka. -
Run.java
: la interfaz de línea de comandos que se usa para ejecutar el código de Producer API y Consumer API.
Esto es lo más importante que hay que saber del archivo pom.xml
:
Dependencias: este proyecto utiliza Producer API y Consumer API de Kafka, ambas proporcionadas por el paquete
kafka-clients
. El siguiente código XML define esta dependencia:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
La entrada
${kafka.version}
se declara en la sección<properties>..</properties>
depom.xml
y está configurada para la versión Kafka del clúster de HDInsight.Complementos: los complementos de Maven proporcionan varias funcionalidades. En este proyecto, se utilizan los siguientes complementos:
-
maven-compiler-plugin
: se utiliza para establecer que el proyecto utiliza la versión 8 de Java. Esta es la versión de Java que utiliza HDInsight 3.6. -
maven-shade-plugin
: se utiliza para generar un archivo uber-jar que contiene esta aplicación, así como todas las dependencias. También se usa para establecer el punto de entrada de la aplicación, con el fin de que pueda ejecutar directamente el archivo Jar sin tener que especificar la clase principal.
-
El productor se comunica con los hosts del agente de Kafka (nodos de trabajo) y envía datos a un tema de Kafka. El siguiente fragmento de código es del archivo Producer.java del repositorio de GitHub y muestra cómo establecer las propiedades del productor. En el caso de los clústeres con seguridad empresarial habilitada, se debe agregar una propiedad adicional "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
El consumidor se comunica con los hosts del agente de Kafka (nodos de trabajo) y lee los registros en un bucle. El siguiente fragmento de código del archivo Consumer.java establece las propiedades del consumidor. En el caso de los clústeres con seguridad empresarial habilitada, se debe agregar una propiedad adicional "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
En este código, el consumidor está configurado para leer desde el principio del tema (auto.offset.reset
está establecido en earliest
).
El archivo Run.java proporciona una interfaz de línea de comandos que ejecuta el código del productor o del consumidor. Debe proporcionar la información del host del agente de Kafka como parámetro. También puede incluir un valor del identificador de grupo. Dicho valor lo utiliza el proceso de consumidor. Si crea varias instancias de consumidor con el mismo identificador de grupo, equilibrarán la carga de la lectura del tema.
Descargue los archivos jar del ejemplo Get de Kafka de introducción de Azure. Si el clúster está habilitado para Enterprise Security Package (ESP) , use el archivo kafka-producer-consumer-esp.jar. Use el siguiente comando para copiar los archivos JAR al clúster.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Si desea omitir este paso, puede descargar archivos jar creados previamente desde el subdirectorio Prebuilt-Jars
. Descargue el archivo kafka-producer-consumer.jar. Si el clúster está habilitado para Enterprise Security Package (ESP) , use el archivo kafka-producer-consumer-esp.jar. Ejecute el paso 3 para copiar el archivo jar en el clúster de HDInsight.
Descarga y extracción de los ejemplos de https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Establezca el directorio actual en la ubicación del directorio
hdinsight-kafka-java-get-started\Producer-Consumer
. Si usa un clúster de Kafka habilitado para Enterprise Security Package (ESP) , debería establecer la ubicación en el subdirectorioDomainJoined-Producer-Consumer
. Utilice el siguiente comando para compilar la aplicación:mvn clean package
Este comando crea un directorio denominado
target
, que contiene un archivo denominadokafka-producer-consumer-1.0-SNAPSHOT.jar
. En el caso de los clústeres ESP, el archivo serákafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Reemplace
sshuser
por el usuario de SSH del clúster yCLUSTERNAME
por el nombre de su clúster. Escriba el siguiente comando para copiar el archivokafka-producer-consumer-1.0-SNAPSHOT.jar
en el clúster de HDInsight. Cuando se le solicite, escriba la contraseña del usuario SSH.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Reemplace
sshuser
por el usuario de SSH del clúster yCLUSTERNAME
por el nombre de su clúster. Abra una conexión de SSH con el clúster, para lo que debe escribir el siguiente comando. Si se le solicita, escriba la contraseña de la cuenta de usuario de SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Para obtener los hosts del agente de Kafka, sustituya los valores de
<clustername>
y<password>
en el comando siguiente y ejecútelo. Mantenga el mismo uso de mayúsculas y minúsculas para<clustername>
que se muestra en Azure Portal. Reemplace<password>
por la contraseña de inicio de sesión del clúster y, a continuación, ejecute:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Nota
Este comando requiere acceso a Ambari. Si el clúster se encuentra detrás de un grupo de seguridad de red, ejecute este comando desde una máquina que pueda acceder a Ambari.
Cree un tema de Kafka,
myTest
, para lo que debe usar el siguiente comando:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Para ejecutar el productor y escribir datos en el tema, utilice el siguiente comando:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Cuando finalice el productor, use el comando siguiente para leer desde el tema:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Se muestran los registros leídos, junto con un número de registros.
Use Ctrl + C para salir del consumidor.
Los consumidores de Kafka usan un grupo de consumidores al leer los registros. Usar el mismo grupo con varios consumidores deriva en lecturas de un tema con equilibrio de carga. Cada consumidor del grupo recibe una parte de los registros.
La aplicación del consumidor acepta un parámetro que se usa como identificador del grupo. Por ejemplo, el comando siguiente inicia un consumidor mediante un identificador de grupo myGroup
:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Use Ctrl + C para salir del consumidor.
Para ver este proceso en acción, use el siguiente comando:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Este comando usa tmux
para dividir el terminal en dos columnas. Se inicia un consumidor en cada una de ellas, con el mismo valor de identificador de grupo. Una vez que los consumidores terminan de leer, fíjese en que cada uno de ellos lee solo una parte de los registros. Use Ctrl + C dos veces para salir de tmux
.
El consumo por clientes del mismo grupo se controla mediante las particiones del tema. En este ejemplo de código, el tema test
creado anteriormente tiene ocho particiones. Si inicia ocho consumidores, cada uno de ellos lee los registros de una única partición del tema.
Importante
No puede haber más instancias de consumidor en un grupo de consumidores que particiones. En este ejemplo, un grupo de consumidores puede contener hasta ocho, ya que es el número de particiones del tema. O bien, puede tener varios grupos de consumidores, los cuales no tengan más de ocho consumidores cada uno.
Los registros almacenados en Kafka se almacenan en el orden en el que se reciben dentro de una partición. Para lograr la entrega ordenada de registros dentro de una partición, cree un grupo de consumidores donde el número de instancias de consumidor coincida con el número de particiones. Para lograr la entrega ordenada de registros dentro del tema, cree un grupo de consumidores con solo una instancia de consumidor.
No se puede crear el tema Si el clúster está habilitado para Enterprise Security Pack, use los archivos JAR pregenerados para el productor y el consumidor. El archivo JAR de ESP se puede crear a partir del código del subdirectorio
DomainJoined-Producer-Consumer
. Las propiedades del productor y el consumidor tienen una propiedad adicionalCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
para los clústeres habilitados para ESP.Error en los clústeres habilitados para ESP: si se produce un error en las operaciones de producción y consumo, y se usa un clúster habilitado para ESP, compruebe que el usuario
kafka
existe en todas las directivas de Ranger. Si no está presente, agréguelo a todas las directivas de Ranger.
Para limpiar los recursos creados por este tutorial puede eliminar el grupo de recursos. Al eliminar el grupo de recursos, también se elimina el clúster de HDInsight asociado y otros recursos asociados al grupo.
Para quitar el grupo de recursos mediante Azure Portal:
- En Azure Portal, expanda el menú en el lado izquierdo para abrir el menú de servicios y elija Grupos de recursos para mostrar la lista de sus grupos de recursos.
- Busque el grupo de recursos que desea eliminar y haga clic con el botón derecho en Más (...) en el lado derecho de la lista.
- Seleccione Eliminar grupo de recursos y confirme la elección.
En este documento, aprendió a usar Producer y Consumer API de Apache Kafka con Kafka en HDInsight. Para más información sobre cómo trabajar con Kafka: