連線 Apache Spark 應用程式與 Azure 事件中樞

本教學課程會引導您將 Spark 應用程式連線至事件中樞,以進行即時串流。 這項整合可讓您進行串流,而無須變更通訊協定用戶端或執行您自己的 Kafka 或 Zookeeper 叢集。 本教學課程需要 Apache Spark v2.4+ 和 Apache Kafka v2.0+。

注意

您可在 GitHub 上取得此範例

在本教學課程中,您會了解如何:

  • 建立事件中樞命名空間
  • 複製範例專案
  • 執行 Spark
  • 讀取適用於 Kafka 的事件中樞
  • 寫入適用於 Kafka 的事件中樞

必要條件

開始本教學課程之前,請確定您具有:

注意

自 Spark v2.4 起,Spark-Kafka 配接器已更新為支援 Kafka v2.0。 在舊版的 Spark 中,配接器可支援 Kafka v0.10 和更新版本,但須依賴 Kafka v0.10 API。 由於適用於 Kafka 的事件中樞不支援 Kafka v0.10,v2.4 之前的 Spark 版本隨附的 Spark-Kafka 配接器將不受「適用於 Kafka 的事件中樞」生態系統支援。

建立事件中樞命名空間

您需要事件中樞命名空間,才能從任何事件中樞服務傳送和接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 請取得事件中樞連接字串和完整網域名稱 (FQDN) 以供稍後使用。 如需相關指示,請參閱取得事件中樞連接字串

複製範例專案

請複製 Azure 事件中樞存放庫,並瀏覽至 tutorials/spark 子資料夾:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

讀取適用於 Kafka 的事件中樞

透過一些組態變更,您即可開始讀取適用於 Kafka 的事件中樞。 使用您的命名空間詳細資料更新 BOOTSTRAP_SERVERSEH_SASL,您即可開始使用事件中樞進行串流處理,如同使用 Kafka 一般。 如需完整的範例程式碼,請參閱 GitHub 上的 sparkConsumer.scala 檔案。

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

如果您收到類似下列錯誤的錯誤,請將 .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") 新增至 spark.readStream 呼叫,然後再試一次。

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

寫入適用於 Kafka 的事件中樞

您也可以用寫入 Kafka 的相同方式來寫入事件中樞。 別忘了使用您事件中樞命名空間中的資訊來更新組態,以變更 BOOTSTRAP_SERVERSEH_SASL。 如需完整的範例程式碼,請參閱 GitHub 上的 sparkProducer.scala 檔案。

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

下一步

若要深入了解事件中樞和適用於 Kafka 的事件中樞,請參閱下列文章: