Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tento kurz vás provede propojením aplikace Spark se službou Event Hubs pro streamování v reálném čase. Tato integrace umožňuje streamování bez nutnosti měnit klienty protokolů nebo provozovat vlastní clustery Kafka nebo Zookeeper. Tento kurz vyžaduje Apache Spark verze 2.4+ a Apache Kafka verze 2.0+.
Poznámka:
Tato ukázka je k dispozici na GitHubu.
V tomto kurzu se naučíte:
- Vytvořte obor názvů služby Event Hubs
- Naklonování ukázkového projektu
- Spuštění Sparku
- Čtení ze služby Event Hubs pro Kafka
- Pište do služby Event Hubs pro Kafka
Požadavky
Než začnete s tímto kurzem, ujistěte se, že máte:
- Předplatné Azure. Pokud žádné nemáte, vytvořte si bezplatný účet.
- Apache Spark v2.4
- Apache Kafka v2.0
- Git
Poznámka:
Adaptér Spark-Kafka byl aktualizován tak, aby podporoval Kafka verze 2.0 od Sparku verze 2.4. V předchozí verzi Sparku adaptér podporoval Kafka verze 0.10 a novější, ale byl konkrétně založen na rozhraních API Kafka v0.10. Protože Služba Event Hubs pro Kafka nepodporuje kafka v0.10, služba Event Hubs pro ekosystémy Kafka nepodporuje adaptéry Spark-Kafka z verzí Sparku před v2.4.
Vytvořte obor názvů služby Event Hubs
Pro odesílání a přijímání z jakékoli služby Event Hubs je vyžadován obor názvů služby Event Hubs. Pokyny pro vytvoření oboru názvů a centra událostí najdete v části Vytvoření centra událostí. Získejte připojovací řetězec pro Event Hubs a plně kvalifikovaný název domény (FQDN) pro pozdější použití. Pokyny najdete v tématu Získání připojovacího řetězce služby Event Hubs.
Naklonování ukázkového projektu
Naklonujte úložiště Azure Event Hubs a přejděte do tutorials/spark podsložky:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Čtení ze služby Event Hubs pro Kafka
S několika změnami konfigurace můžete začít číst ze služby Event Hubs pro Kafka. Aktualizujte BOOTSTRAP_SERVERS a EH_SASL s podrobnostmi z oboru názvů a můžete začít streamovat se službou Event Hubs stejně jako se systémem Kafka. Úplný ukázkový kód najdete v souboru sparkConsumer.scala na GitHubu.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Pokud se zobrazí chyba podobná následující chybě, přidejte .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") ke volání spark.readStream a zkuste to znovu.
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
Pište do služby Event Hubs pro Kafka
Do služby Event Hubs můžete také zapisovat stejným způsobem, jakým píšete do Kafka. Nezapomeňte aktualizovat konfiguraci, abyste změnili BOOTSTRAP_SERVERS a EH_SASL na informace z vaší oborové oblasti názvů Event Hubs. Úplný ukázkový kód najdete v souboru sparkProducer.scala na GitHubu.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Další kroky
Další informace o službě Event Hubs a Event Hubs pro Kafka najdete v následujících článcích: