Sdílet prostřednictvím


Připojení aplikace Apache Spark pomocí Azure Event Hubs

Tento kurz vás provede připojením aplikace Spark ke službě Event Hubs pro streamování v reálném čase. Tato integrace umožňuje streamování bez nutnosti měnit klienty protokolu nebo spouštět vlastní clustery Kafka nebo Zookeeper. Tento kurz vyžaduje Apache Spark verze 2.4 nebo novější a Apache Kafka verze 2.0 nebo novější.

Poznámka

Tato ukázka je dostupná na GitHubu.

V tomto kurzu se naučíte:

  • Vytvoření oboru názvů služby Event Hubs
  • Naklonování ukázkového projektu
  • Spuštění Sparku
  • Čtení ze služby Event Hubs pro ekosystém Kafka
  • Zápis do služby Event Hubs pro ekosystém Kafka

Požadavky

Než začnete s tímto kurzem, ujistěte se, že máte následující:

Poznámka

Adaptér Spark-Kafka byl aktualizován tak, aby od Sparku v2.4 podporoval Kafka v2.0. V předchozích verzích Sparku adaptér podporoval Kafka v0.10 a novější, ale spoléhal se konkrétně na rozhraní API Kafka v0.10. Vzhledem k tomu, že Event Hubs pro ekosystém Kafka nepodporuje Kafka v0.10, nepodporuje ani adaptéry Spark-Kafka ze starších verzí Sparku než v2.4.

Vytvoření oboru názvů služby Event Hubs

K odesílání do jakékoli služby Event Hubs a příjmu z ní se vyžaduje obor názvů služby Event Hubs. Pokyny k vytvoření oboru názvů a centra událostí najdete v tématu Vytvoření centra událostí. Získejte plně kvalifikovaný název domény a připojovací řetězec služby Event Hubs pro pozdější použití. Pokyny najdete v tématu Získání připojovacího řetězce služby Event Hubs.

Naklonování ukázkového projektu

Naklonujte úložiště Azure Event Hubs a přejděte do podsložky tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Čtení ze služby Event Hubs pro ekosystém Kafka

Stačí několik změn konfigurace a můžete začít číst ze služby Event Hubs pro ekosystém Kafka. Aktualizujte hodnoty BOOTSTRAP_SERVERS a EH_SASL s použitím podrobností z vašeho oboru názvů a můžete začít streamovat se službou Event Hubs stejně jako s Kafka. Kompletní vzorový kód najdete v souboru sparkConsumer.scala na GitHubu.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Pokud se zobrazí chyba podobná následující chybě, přidejte .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") do spark.readStream volání a zkuste to znovu.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Zápis do služby Event Hubs pro ekosystém Kafka

Do služby Event Hubs můžete také zapisovat stejným způsobem jako do systému Kafka. Nezapomeňte aktualizovat konfiguraci a změnit hodnoty BOOTSTRAP_SERVERS a EH_SASL s použitím informací z vašeho oboru názvů služby Event Hubs. Kompletní vzorový kód najdete v souboru sparkProducer.scala na GitHubu.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Další kroky

Další informace o službě Event Hubs a Event Hubs pro Kafka najdete v následujících článcích: