Herstellen einer Verbindung zwischen Ihrer Apache Spark-Anwendung und Azure Event Hub

In diesem Tutorial wird ausführlich beschrieben, wie Sie zum Zweck des Echtzeitstreamings eine Verbindung zwischen Ihrer Spark-Anwendung und einer Event Hubs-Instanz herstellen. Diese Integration ermöglicht Streaming, ohne dass die Protokollclients geändert oder eigene Kafka- oder Zookeeper-Cluster ausgeführt werden müssen. Für dieses Tutorial sind mindestens Apache Spark v2.4 und Apache Kafka v2.0 erforderlich.

Hinweis

Dieses Beispiel ist auf GitHub verfügbar.

In diesem Tutorial lernen Sie Folgendes:

  • Erstellen eines Event Hubs-Namespace
  • Klonen des Beispielprojekts
  • Ausführen von Spark
  • Lesen aus Event Hubs für Kafka
  • Schreiben in Event Hubs für Kafka

Voraussetzungen

Stellen Sie vor dem Ausführen dieses Tutorials sicher, dass Sie über Folgendes verfügen:

Hinweis

Der Spark-Kafka-Adapter wurde aktualisiert, um Kafka v2.0 zu unterstützen (ab Spark v2.4). In früheren Spark-Versionen unterstützte der Adapter Kafka v0.10 und höhere Versionen, war jedoch speziell von Kafka v0.10-APIs abhängig. Da Kafka v0.10 von Event Hubs für Kafka nicht unterstützt wird, werden die Spark-Kafka-Adapter aus Spark-Versionen vor v2.4 von Event Hubs für Kafka-Ökosysteme nicht unterstützt.

Erstellen eines Event Hubs-Namespace

Ein Event Hubs-Namespace ist erforderlich, um Nachrichten an einen Event Hubs-Dienst zu senden und von diesem zu empfangen. Anweisungen zum Erstellen eines Namespace und eines Event Hub finden Sie unter Erstellen eines Event Hubs. Rufen Sie die Event Hubs-Verbindungszeichenfolge und den vollqualifizierten Domänennamen (Fully Qualified Domain Name, FQDN) zur späteren Verwendung ab. Anweisungen hierzu finden Sie unter Get an Event Hubs connection string (Abrufen einer Event Hubs-Verbindungszeichenfolge).

Klonen des Beispielprojekts

Klonen Sie das Azure Event Hubs-Repository, und navigieren Sie zum Unterordner tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Lesen aus Event Hubs für Kafka

Mit einigen wenigen Konfigurationsänderungen können Sie mit dem Lesen aus Event Hubs für Kafka beginnen. Aktualisieren Sie BOOTSTRAP_SERVERS und EH_SASL mit Details aus Ihrem Namespace, und Sie können wie mit Kafka mit Event Hubs streamen. Den vollständigen Beispielcode finden Sie auf GitHub in der Datei „sparkConsumer.scala“.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Wenn Sie eine Fehlermeldung ähnlich der folgenden erhalten, fügen Sie dem spark.readStream-Aufruf .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") hinzu, und versuchen Sie es erneut.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Schreiben in Event Hubs für Kafka

Sie können auch in Event Hubs schreiben. Die Vorgehensweise ist dieselbe wie bei Kafka. Vergessen Sie nicht, Ihre Konfiguration anzupassen und BOOTSTRAP_SERVERS und EH_SASL mit den Informationen aus Ihrem Event Hubs-Namespace zu aktualisieren. Den vollständigen Beispielcode finden Sie auf GitHub in der Datei „sparkProducer.scala“.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Nächste Schritte

Weitere Informationen zu Event Hubs und Event Hubs für Kafka finden Sie in folgenden Artikeln: