連線 Apache Spark 應用程式與 Azure 事件中樞
本教學課程會引導您將 Spark 應用程式連線至事件中樞,以進行即時串流。 這項整合可讓您進行串流,而無須變更通訊協定用戶端或執行您自己的 Kafka 或 Zookeeper 叢集。 本教學課程需要 Apache Spark v2.4+ 和 Apache Kafka v2.0+。
注意
您可在 GitHub 上取得此範例
在本教學課程中,您會了解如何:
- 建立事件中樞命名空間
- 複製範例專案
- 執行 Spark
- 讀取適用於 Kafka 的事件中樞
- 寫入適用於 Kafka 的事件中樞
必要條件
開始本教學課程之前,請確定您具有:
- Azure 訂閱。 如果您沒有訂用帳戶,請建立免費帳戶。
- Apache Spark v2.4
- Apache Kafka v2.0
- Git
注意
自 Spark v2.4 起,Spark-Kafka 配接器已更新為支援 Kafka v2.0。 在舊版的 Spark 中,配接器可支援 Kafka v0.10 和更新版本,但須依賴 Kafka v0.10 API。 由於適用於 Kafka 的事件中樞不支援 Kafka v0.10,v2.4 之前的 Spark 版本隨附的 Spark-Kafka 配接器將不受「適用於 Kafka 的事件中樞」生態系統支援。
建立事件中樞命名空間
您需要事件中樞命名空間,才能從任何事件中樞服務傳送和接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 請取得事件中樞連接字串和完整網域名稱 (FQDN) 以供稍後使用。 如需相關指示,請參閱取得事件中樞連接字串。
複製範例專案
請複製 Azure 事件中樞存放庫,並瀏覽至 tutorials/spark
子資料夾:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
讀取適用於 Kafka 的事件中樞
透過一些組態變更,您即可開始讀取適用於 Kafka 的事件中樞。 使用您的命名空間詳細資料更新 BOOTSTRAP_SERVERS 和 EH_SASL,您即可開始使用事件中樞進行串流處理,如同使用 Kafka 一般。 如需完整的範例程式碼,請參閱 GitHub 上的 sparkConsumer.scala 檔案。
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
如果您收到類似下列錯誤的錯誤,請將 .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
新增至 spark.readStream
呼叫,然後再試一次。
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
寫入適用於 Kafka 的事件中樞
您也可以用寫入 Kafka 的相同方式來寫入事件中樞。 別忘了使用您事件中樞命名空間中的資訊來更新組態,以變更 BOOTSTRAP_SERVERS 和 EH_SASL。 如需完整的範例程式碼,請參閱 GitHub 上的 sparkProducer.scala 檔案。
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
下一步
若要深入了解事件中樞和適用於 Kafka 的事件中樞,請參閱下列文章: