Uw Apache Spark-toepassing verbinden met Azure Event Hubs

In deze zelfstudie leert u hoe u uw Spark-toepassing verbindt met Event Hubs voor realtime streaming. Deze integratie maakt streaming mogelijk zonder dat u uw protocolclients hoeft te wijzigen of uw eigen Kafka- of Zookeeper-clusters hoeft uit te voeren. Voor deze zelfstudie zijn Apache Spark v2.4+ en Apache Kafka v2.0+ vereist.

Notitie

Dit voorbeeld is beschikbaar op GitHub

In deze zelfstudie leert u het volgende:

  • Een Event Hubs-naamruimte maken
  • Het voorbeeldproject klonen
  • Spark uitvoeren
  • Lezen van Event Hubs voor Kafka
  • Schrijven naar Event Hubs voor Kafka

Vereisten

Zorg ervoor dat u het volgende hebt voordat u aan deze zelfstudie begint:

Notitie

De Spark-Kafka-adapter is vanaf Spark v2.4 bijgewerkt ter ondersteuning van Kafka v2.0. In vorige release van Spark werden Kafka v0.10 en nieuwere versies door de adapter ondersteund, maar werd specifiek vertrouwd op Kafka v0.10-API's. Omdat Event Hubs voor Kafka geen ondersteuning biedt voor Kafka v0.10, worden de Spark-Kafka-adapters van Spark-versies ouder dan v2.4 niet ondersteund door Event Hubs voor Kafka Ecosystems.

Een Event Hubs-naamruimte maken

Er is een Event Hubs-naamruimte vereist om gegevens te verzenden naar en te ontvangen van Event Hubs-services. Zie Een Event Hub maken voor instructies voor het maken van een naamruimte en een Event Hub. Haal de Event Hubs-verbindingsreeks en de Fully Qualified Domain Name (FQDN) op voor later gebruik. Zie Get an Event Hubs connection string. (Een Event Hubs-verbindingsreeks ophalen).

Het voorbeeldproject klonen

Kloon de Azure Event Hubs-opslagplaats en ga naar submap tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Lezen van Event Hubs voor Kafka

Na enkele configuratiewijzigingen kunt u lezen van Event Hubs voor Kafka. Werk BOOTSTRAP_SERVERS en EH_SASL bij met details van uw naamruimte. Vervolgens kunt u met Event Hubs met streamen beginnen, net zoals u dat met Kafka zou doen. Zie het bestand sparkConsumer.scala op GitHub voor de volledige voorbeeldcode.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Als u een fout ontvangt die lijkt op de volgende fout, voegt u toe .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") aan de aanroep en probeert u het spark.readStream opnieuw.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Schrijven naar Event Hubs voor Kafka

U kunt op dezelfde manier naar Event Hubs schrijven als u dat naar Kafka zou doen. Vergeet niet uw configuratie bij te werken door BOOTSTRAP_SERVERS en EH_SASL te wijzigen met gegevens uit uw Event Hubs-naamruimte. Zie het bestand sparkProducer.scala op GitHub voor de volledige voorbeeldcode.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Volgende stappen

Zie de volgende artikelen voor meer informatie over Event Hubs en Event Hubs voor Kafka: