Omówienie przesyłania strumieniowego platformy Apache Spark
Przesyłanie strumieniowe platformy Apache Spark zapewnia przetwarzanie strumienia danych w klastrach platformy Spark w usłudze HDInsight. Dzięki gwarancji, że każde zdarzenie wejściowe jest przetwarzane dokładnie raz, nawet jeśli wystąpi awaria węzła. Usługa Spark Stream to długotrwałe zadanie, które odbiera dane wejściowe z wielu różnych źródeł, w tym z usługi Azure Event Hubs. Ponadto: Azure IoT Hub, Apache Kafka, Apache Flume, X, ZeroMQ
, nieprzetworzone gniazda TCP lub z monitorowania systemów plików Apache Hadoop YARN. W przeciwieństwie do procesu opartego wyłącznie na zdarzeniach usługa Spark Stream wsaduje dane wejściowe do okien czasowych. Na przykład 2-sekundowy fragment, a następnie przekształca każdą partię danych przy użyciu operacji mapowania, redukcji, sprzężenia i wyodrębniania. Następnie usługa Spark Stream zapisuje przekształcone dane w systemach plików, bazach danych, pulpitach nawigacyjnych i konsoli.
Aplikacje przesyłania strumieniowego platformy Spark muszą poczekać ułamek sekundy, aby zebrać wszystkie micro-batch
zdarzenia przed wysłaniem tej partii na potrzeby przetwarzania. Z kolei aplikacja sterowana zdarzeniami natychmiast przetwarza każde zdarzenie. Opóźnienie przesyłania strumieniowego platformy Spark trwa zwykle poniżej kilku sekund. Korzyści wynikające z podejścia mikrosadowego to wydajniejsze przetwarzanie danych i prostsze obliczenia agregujące.
Wprowadzenie do strumienia DStream
Przesyłanie strumieniowe platformy Spark reprezentuje ciągły strumień danych przychodzących przy użyciu zdyretowanego strumienia nazywanego strumieniem DStream. Strumień DStream można utworzyć na podstawie źródeł wejściowych, takich jak Event Hubs lub Kafka. Lub stosując przekształcenia na innym strumieniu DStream.
Strumień DStream zapewnia warstwę abstrakcji na podstawie nieprzetworzonych danych zdarzeń.
Zacznij od jednego zdarzenia, powiedzmy odczyt temperatury z połączonego termostatu. Po nadejściu tego zdarzenia do aplikacji Spark Streaming zdarzenie jest przechowywane w niezawodny sposób, gdzie jest replikowane na wielu węzłach. Ta odporność na uszkodzenia gwarantuje, że awaria żadnego pojedynczego węzła nie spowoduje utraty zdarzenia. Rdzenie platformy Spark używają struktury danych, która dystrybuuje dane między wieloma węzłami w klastrze. W przypadku, gdy każdy węzeł zwykle utrzymuje własne dane w pamięci, aby uzyskać najlepszą wydajność. Ta struktura danych jest nazywana odpornym rozproszonym zestawem danych (RDD).
Każdy RDD reprezentuje zdarzenia zebrane przez przedział czasu zdefiniowany przez użytkownika nazywany interwałem wsadowym. W miarę upływu każdego interwału wsadowego tworzony jest nowy RDD zawierający wszystkie dane z tego interwału. Ciągły zestaw RDD jest zbierany do strumienia DStream. Jeśli na przykład interwał wsadowy to jedna sekunda, strumień DStream emituje partię co sekundę zawierającą jeden RDD zawierający wszystkie dane pozyskane w ciągu tej sekundy. Podczas przetwarzania strumienia DStream zdarzenie temperatury jest wyświetlane w jednej z tych partii. Aplikacja Spark Streaming przetwarza partie zawierające zdarzenia i ostatecznie działa na danych przechowywanych w każdym RDD.
Struktura aplikacji do przesyłania strumieniowego platformy Spark
Aplikacja Spark Streaming to długotrwała aplikacja, która odbiera dane ze źródeł pozyskiwania. Stosuje przekształcenia do przetwarzania danych, a następnie wypycha dane do co najmniej jednego miejsca docelowego. Struktura aplikacji Spark Streaming ma statyczną część i część dynamiczną. Część statyczna definiuje, skąd pochodzą dane, jakie przetwarzanie ma być przetwarzane na danych. I gdzie powinny iść wyniki. Część dynamiczna uruchamia aplikację na czas nieokreślony, czekając na sygnał zatrzymania.
Na przykład następująca prosta aplikacja odbiera wiersz tekstu za pośrednictwem gniazda TCP i zlicza liczbę wyświetleń każdego wyrazu.
Definiowanie aplikacji
Definicja logiki aplikacji ma cztery kroki:
- Utwórz element StreamingContext.
- Utwórz strumień DStream na podstawie elementu StreamingContext.
- Zastosuj przekształcenia do strumienia DStream.
- Wyprowadź wyniki.
Ta definicja jest statyczna i żadne dane nie są przetwarzane do momentu uruchomienia aplikacji.
Tworzenie obiektu StreamingContext
Utwórz element StreamingContext z obiektu SparkContext, który wskazuje klaster. Podczas tworzenia obiektu StreamingContext można określić rozmiar partii w sekundach, na przykład:
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
Tworzenie strumienia DStream
Za pomocą wystąpienia StreamingContext utwórz wejściowy strumień DStream dla źródła danych wejściowych. W takim przypadku aplikacja obserwuje wygląd nowych plików w domyślnym dołączonym magazynie.
val lines = ssc.textFileStream("/uploads/Test/")
Stosowanie przekształceń
Przetwarzanie jest implementowane przez zastosowanie przekształceń w strumieniu DStream. Ta aplikacja odbiera jeden wiersz tekstu w danym momencie z pliku, dzieli każdy wiersz na wyrazy. Następnie używa wzorca redukcji mapy, aby zliczyć liczbę wyświetleń każdego wyrazu.
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
Wyniki wyjściowe
Wypchnij wyniki transformacji do systemów docelowych, stosując operacje wyjściowe. W takim przypadku wynik każdego przebiegu przez obliczenia jest drukowany w danych wyjściowych konsoli.
wordCounts.print()
Uruchamianie aplikacji
Uruchom aplikację do przesyłania strumieniowego i uruchom polecenie do momentu odebrania sygnału zakończenia.
ssc.start()
ssc.awaitTermination()
Aby uzyskać szczegółowe informacje na temat interfejsu API strumienia platformy Spark, zobacz Przewodnik programowania przesyłania strumieniowego platformy Apache Spark.
Poniższa przykładowa aplikacja jest samodzielna, więc można ją uruchomić w notesie Jupyter Notebook. W tym przykładzie utworzono pozorne źródło danych w klasie DummySource, które generuje wartość licznika i bieżącą godzinę w milisekundach co pięć sekund. Nowy obiekt StreamingContext ma interwał wsadowy wynoszący 30 sekund. Za każdym razem, gdy jest tworzona partia, aplikacja do przesyłania strumieniowego sprawdza wygenerowane rdD. Następnie konwertuje rdD na ramkę danych platformy Spark i tworzy tymczasową tabelę na ramce danych.
class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {
/** Start the thread that simulates receiving data */
def onStart() {
new Thread("Dummy Source") { override def run() { receive() } }.start()
}
def onStop() { }
/** Periodically generate a random number from 0 to 9, and the timestamp */
private def receive() {
var counter = 0
while(!isStopped()) {
store(Iterator((counter, System.currentTimeMillis)))
counter += 1
Thread.sleep(5000)
}
}
}
// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))
// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)
// Create the stream
val stream = ssc.receiverStream(new DummySource())
// Process RDDs in the batch
stream.foreachRDD { rdd =>
// Access the SQLContext and create a table called demo_numbers we can query
val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
_sqlContext.createDataFrame(rdd).toDF("value", "time")
.registerTempTable("demo_numbers")
}
// Start the stream processing
ssc.start()
Poczekaj około 30 sekund po uruchomieniu powyższej aplikacji. Następnie można okresowo wykonywać zapytania dotyczące ramki danych, aby wyświetlić bieżący zestaw wartości znajdujących się w partii, na przykład przy użyciu tego zapytania SQL:
%%sql
SELECT * FROM demo_numbers
Wynikowe dane wyjściowe wyglądają podobnie do następujących danych wyjściowych:
wartość | time |
---|---|
10 | 1497314465256 |
11 | 1497314470272 |
12 | 1497314475289 |
13 | 1497314480310 |
14 | 1497314485327 |
15 | 1497314490346 |
Istnieje sześć wartości, ponieważ dummySource tworzy wartość co 5 sekund, a aplikacja emituje partię co 30 sekund.
Okna przesuwne
Aby przeprowadzić agregowanie obliczeń na strumieniu DStream w pewnym okresie, na przykład aby uzyskać średnią temperaturę w ciągu ostatnich dwóch sekund, użyj sliding window
operacji uwzględnionych w usłudze Spark Streaming. Okno przesuwane ma czas trwania (długość okna) i interwał, w którym zawartość okna jest obliczana (interwał slajdu).
Na przykład okna przesuwne mogą nakładać się na siebie, można zdefiniować okno o długości dwóch sekund, które przesuwa się co sekundę. Ta akcja oznacza, że za każdym razem, gdy wykonasz obliczenie agregacji, okno będzie zawierać dane z ostatniej sekundy poprzedniego okna. A wszystkie nowe dane w ciągu następnej sekundy.
Poniższy przykład aktualizuje kod, który używa dummySource, aby zebrać partie w oknie z jednominutowym czasem trwania i jednominutowym slajdem.
class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {
/** Start the thread that simulates receiving data */
def onStart() {
new Thread("Dummy Source") { override def run() { receive() } }.start()
}
def onStop() { }
/** Periodically generate a random number from 0 to 9, and the timestamp */
private def receive() {
var counter = 0
while(!isStopped()) {
store(Iterator((counter, System.currentTimeMillis)))
counter += 1
Thread.sleep(5000)
}
}
}
// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))
// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)
// Create the stream
val stream = ssc.receiverStream(new DummySource())
// Process batches in 1 minute windows
stream.window(org.apache.spark.streaming.Minutes(1)).foreachRDD { rdd =>
// Access the SQLContext and create a table called demo_numbers we can query
val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
_sqlContext.createDataFrame(rdd).toDF("value", "time")
.registerTempTable("demo_numbers")
}
// Start the stream processing
ssc.start()
Po pierwszej minucie istnieje 12 wpisów — sześć wpisów z każdej z dwóch partii zebranych w oknie.
wartość | time |
---|---|
1 | 1497316294139 |
2 | 1497316299158 |
3 | 1497316304178 |
100 | 1497316309204 |
5 | 1497316314224 |
6 | 1497316319243 |
7 | 1497316324260 |
8 | 1497316329278 |
9 | 1497316334293 |
10 | 1497316339314 |
11 | 1497316344339 |
12 | 1497316349361 |
Funkcje okna przewijania dostępne w interfejsie API przesyłania strumieniowego platformy Spark obejmują okno, countByWindow, reduceByWindow i countByValueAndWindow. Aby uzyskać szczegółowe informacje na temat tych funkcji, zobacz Przekształcenia w strumieniach DStream.
Tworzenie punktów kontrolnych
Aby zapewnić odporność i odporność na uszkodzenia, przesyłanie strumieniowe platformy Spark opiera się na punktach kontrolnych, aby zapewnić nieprzerwane przetwarzanie strumienia, nawet w przypadku awarii węzłów. Platforma Spark tworzy punkty kontrolne w magazynie trwałym (Azure Storage lub Data Lake Storage). Te punkty kontrolne przechowują metadane aplikacji przesyłania strumieniowego, takie jak konfiguracja i operacje zdefiniowane przez aplikację. Ponadto wszystkie partie, które zostały w kolejce, ale nie zostały jeszcze przetworzone. Czasami punkty kontrolne będą również obejmować zapisywanie danych w RDD, aby szybciej ponownie skompilować stan danych z rdD zarządzanych przez platformę Spark.
Wdrażanie aplikacji do przesyłania strumieniowego platformy Spark
Zazwyczaj aplikację Spark Streaming można utworzyć lokalnie w pliku JAR. Następnie wdróż go na platformie Spark w usłudze HDInsight, kopiując plik JAR do domyślnego dołączonego magazynu. Aplikację można uruchomić przy użyciu interfejsów API REST usługi LIVY dostępnych w klastrze przy użyciu operacji POST. Treść pliku POST zawiera dokument JSON, który udostępnia ścieżkę do pliku JAR. Nazwa klasy, której główna metoda definiuje i uruchamia aplikację przesyłania strumieniowego oraz opcjonalnie wymagania dotyczące zasobów zadania (takie jak liczba funkcji wykonawczych, pamięci i rdzeni). Ponadto wszystkie ustawienia konfiguracji wymagane przez kod aplikacji.
Stan wszystkich aplikacji można również sprawdzić za pomocą żądania GET względem punktu końcowego usługi LIVY. Na koniec możesz zakończyć działającą aplikację, wysyłając żądanie DELETE względem punktu końcowego usługi LIVY. Aby uzyskać szczegółowe informacje na temat interfejsu API usługi LIVY, zobacz Zdalne zadania za pomocą usługi Apache LIVY