Compartir a través de


Conexión de una aplicación de Apache Spark con Azure Event Hubs

Este tutorial le guía a través de la conexión de la aplicación de Spark a Event Hubs para el streaming en tiempo real. Esta integración permite el streaming sin tener que cambiar sus clientes de protocolo o ejecutar sus propios clústeres de Kafka o Zookeeper. Este tutorial necesita Apache Spark v2.4 o posterior y Apache Kafka v2.0 o posterior.

Nota

Este ejemplo está disponible en GitHub.

En este tutorial, aprenderá a:

  • Creación de un espacio de nombres de Event Hubs
  • Clonación del proyecto de ejemplo
  • Ejecución de Spark
  • Lectura de Event Hubs para Kafka
  • Escritura de Event Hubs para Kafka

Prerrequisitos

Antes de comenzar este tutorial, asegúrese de que dispone de lo siguiente:

Nota

El adaptador Spark-Kafka se actualizó para ser compatible con Kafka v2.0 a partir de Spark v2.4. En versiones anteriores de Spark, el adaptador admitía Kafka v0.10 y versiones posteriores, pero confiaba específicamente en las API de Kafka v0.10. Como Event Hubs para Kafka no admite Kafka v0.10, los adaptadores Spark-Kafka de versiones de Spark anteriores a la v2.4 no se admiten en Event Hubs para ecosistemas de Kafka.

Creación de un espacio de nombres de Event Hubs

Se requiere un espacio de nombres de Event Hubs para enviar y recibir de cualquier servicio de Event Hubs. Consulte Creación de un centro de eventos para obtener instrucciones sobre cómo crear un espacio de nombres y un centro de eventos. Obtenga la cadena de conexión de Event Hubs y el nombre de dominio completo (FQDN) para su uso posterior. Para obtener instrucciones, consulte Get an Event Hubs connection string (Obtención de una cadena de conexión de Event Hubs).

Clonación del proyecto de ejemplo

Clone el repositorio de Azure Event Hubs y vaya a la subcarpeta tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Lectura de Event Hubs para Kafka

Con unos pocos cambios de configuración, puede empezar a leer desde Event Hubs para Kafka. Actualice BOOTSTRAP_SERVERS y EH_SASL con los detalles de su espacio de nombres y puede empezar a transmitir con Event Hubs como lo haría con Kafka. Para ver el código de ejemplo completo, consulte el archivo sparkConsumer.scala en GitHub.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Si recibe un error similar al siguiente, agregue .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") a la llamada spark.readStream e inténtelo de nuevo.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Escritura de Event Hubs para Kafka

También puede escribir en Event Hubs de la misma manera que escribe en Kafka. No olvide actualizar su configuración para cambiar BOOTSTRAP_SERVERS y EH_SASL con información de su espacio de nombres de Event Hubs. Para ver el código de ejemplo completo, consulte el archivo sparkProducer.scala en GitHub.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Pasos siguientes

Para obtener más información acerca de Event Hubs y Event Hubs para Kafka, consulte los artículos siguientes: