Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ten samouczek przeprowadzi Cię przez proces łączenia aplikacji Spark z usługą Event Hubs na potrzeby przesyłania strumieniowego w czasie rzeczywistym. Ta integracja umożliwia przesyłanie strumieniowe bez konieczności zmiany klientów protokołu czy uruchamiania własnych klastrów platformy Kafka lub Zookeeper. Ten samouczek wymaga oprogramowania Apache Spark w wersji 2.4+ i Apache Kafka w wersji 2.0+.
Uwaga
Ten przykład jest dostępny w witrynie GitHub
Z tego samouczka dowiesz się, jak wykonywać następujące działania:
- Tworzenie przestrzeni nazw usługi Event Hubs
- Klonowanie projektu przykładowego
- Uruchamianie platformy Spark
- Odczyt z usługi Event Hubs dla platformy Kafka
- Zapisywanie w usłudze Event Hubs dla platformy Kafka
Wymagania wstępne
Przed rozpoczęciem tego samouczka upewnij się, że masz następujące elementy:
- Subskrypcja platformy Azure. Jeśli jej nie masz, utwórz bezpłatne konto.
- Apache Spark w wersji 2.4
- Apache Kafka v2.0
- Git
Uwaga
Adapter Spark-Kafka został zaktualizowany do obsługi Kafka w wersji 2.0 od Spark w wersji 2.4. W poprzednich wersjach oprogramowania Spark adapter obsługiwał platformę Kafka w wersji 0.10 lub nowszej, ale używał konkretnie interfejsów API platformy Kafka w wersji 0.10. Ponieważ Event Hubs dla systemu Kafka nie obsługuje wersji Kafka 0.10, adaptery Spark-Kafka z wersji Spark wcześniejszych niż 2.4 nie są obsługiwane przez Event Hubs dla ekosystemów Kafka.
Tworzenie przestrzeni nazw usługi Event Hubs
Przestrzeń nazw Event Hubs jest wymagana do wysyłania i odbierania ze wszystkich usług Event Hubs. Aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń, zobacz Tworzenie centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Get an Event Hubs connection string.
Klonowanie projektu przykładowego
Sklonuj repozytorium usługi Azure Event Hubs i przejdź do podfolderu tutorials/spark :
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Odczyt z usługi Event Hubs dla platformy Kafka
Po kilku zmianach konfiguracji możesz rozpocząć odczytywanie z usługi Event Hubs dla platformy Kafka. Zaktualizuj BOOTSTRAP_SERVERS i EH_SASL przy użyciu szczegółów z przestrzeni nazw i możesz rozpocząć przesyłanie strumieniowe za pomocą usługi Event Hubs, tak jak w przypadku platformy Kafka. Pełny przykładowy kod można znaleźć w pliku sparkConsumer.scala w witrynie GitHub.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Jeśli wystąpi błąd podobny do poniższego błędu, dodaj .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") do wywołania spark.readStream i spróbuj ponownie.
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
Zapisywanie w usłudze Event Hubs dla platformy Kafka
Możesz również pisać do Event Hubs w taki sam sposób, jak piszesz do Kafki. Nie zapomnij zaktualizować konfiguracji w celu zmiany BOOTSTRAP_SERVERS i EH_SASL przy użyciu informacji z przestrzeni nazw Event Hubs. Pełny przykładowy kod można znaleźć w pliku sparkProducer.scala w witrynie GitHub.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Następne kroki
Aby dowiedzieć się więcej o usłudze Event Hubs i usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły:
- Replikacja brokera Kafka w centrum zdarzeń
- Łączenie narzędzia Apache Flink z centrum zdarzeń
- Integrowanie platformy Kafka Connect z centrum zdarzeń
- Odkrywaj przykłady na GitHubie
- Łączenie usługi Akka Streams z centrum zdarzeń
- Przewodnik dla deweloperów platformy Apache Kafka dotyczący usługi Azure Event Hubs