Az Apache Spark-alkalmazás csatlakoztatása Azure Event Hubs

Ez az oktatóanyag bemutatja, hogy a Spark-alkalmazást csatlakoztathatja az Event Hubshoz valós idejű streamelés céljából. Ez az integráció lehetővé teszi a streamelést anélkül, hogy módosítania kellene a protokollügyfeleket, vagy saját Kafka- vagy Zookeeper-fürtöket kellene futtatnia. Ehhez az oktatóanyaghoz az Apache Spark v2.4+ és az Apache Kafka v2.0+ szükséges.

Megjegyzés

Ez a minta elérhető a GitHubon.

Eben az oktatóanyagban az alábbiakkal fog megismerkedni:

  • Event Hubs-névtér létrehozása
  • A példaprojekt klónozása
  • A Spark futtatása
  • Olvasás a Kafkához készült Event Hubsból
  • Írás a Kafkához készült Event Hubsba

Előfeltételek

Mielőtt nekikezdene az oktatóanyagnak, győződjön meg arról, hogy rendelkezik a következőkkel:

Megjegyzés

A Spark-Kafka adapter frissítve lett, hogy a Spark 2.4-es verziójától kezdődően támogassa a Kafka 2.0-s verzióját. A Spark korábbi kiadásaiban az adapter támogatta ugyan a Kafka 0.10-es és újabb verzióit, de elsősorban a Kafka 0.10-es verziójának API-jaira támaszkodott. Mivel a Kafkához készült Event Hubs nem támogatja a Kafka 0.10-es verzióját, az ökoszisztémája nem támogatja a 2.4-es előtti Spark-verziók Spark-Kafka adaptereit.

Event Hubs-névtér létrehozása

Az Event Hubs-szolgáltatásokból való küldéshez és fogadáshoz szükség van egy Event Hubs-névtérre. A névtér és az eseményközpont létrehozásának útmutatásáért lásd: Eseményközpont létrehozása . Szerezze be az Event Hubs kapcsolati sztringjét és teljes tartománynevét (FQDN) későbbi használatra. Útmutatásért lásd az Event Hubs kapcsolati sztring lekérésével foglalkozó témakört.

A példaprojekt klónozása

Klónozza az Azure Event Hubs-adattárat, és keresse meg a tutorials/spark almappát:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Olvasás a Kafkához készült Event Hubsból

Valamennyit módosítani kell a konfiguráció beállításain, hogy megkezdhesse a beolvasást a Kafkához készült Event Hubsból. Frissítse a BOOTSTRAP_SERVERS és az EH_SASL elemet a névtér részleteivel, és már meg is kezdheti a streamelést az Event Hubs segítségével, ugyanúgy, ahogy a Kafkával tenné. A teljes mintakódért tekintse meg a sparkConsumer.scala fájlt a GitHubon.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Ha a következőhöz hasonló hibaüzenetet kap, adja hozzá .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") a spark.readStream hívást, és próbálkozzon újra.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Írás a Kafkához készült Event Hubsba

Az Event Hubsba ugyanúgy írhat, mint a Kafkába. Ne felejtse el frissíteni a konfigurációt, és módosítani a BOOTSTRAP_SERVERS és az EH_SASL elemet az Event Hubs-névtér adataival. A teljes mintakódért tekintse meg a sparkProducer.scala fájlt a GitHubon.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Következő lépések

Az Event Hubsról és a Kafkához készült Event Hubsról az alábbi cikkekben talál további információt: