Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Important
Ta funkcja jest dostępna w publicznej wersji testowej.
Na tej stronie opisano tryb czasu rzeczywistego, typ wyzwalacza dla Structured Streaming, który umożliwia przetwarzanie danych przy bardzo niskim poziomie opóźnień, z opóźnieniem end-to-end nawet do 5 ms. Ten tryb jest przeznaczony dla obciążeń operacyjnych, które wymagają natychmiastowej reakcji na dane przesyłane strumieniowo.
Tryb czasu rzeczywistego jest dostępny w środowisku Databricks Runtime 16.4 LTS lub nowszym.
Obciążenia operacyjne
Obciążenia przesyłania strumieniowego można szeroko podzielić na obciążenia analityczne i obciążenia operacyjne:
- Obciążenia analityczne używają pozyskiwania i przekształcania danych, zwykle zgodnie z architekturą medalionu (na przykład pozyskiwaniu danych do tablic brązowych, srebrnych i złotych).
- Obciążenia operacyjne zużywają dane w czasie rzeczywistym, stosują logikę biznesową i wyzwalają akcje podrzędne lub decyzje.
Oto kilka przykładów obciążeń operacyjnych:
- Blokowanie lub flagowanie transakcji karty kredytowej w czasie rzeczywistym, jeśli wynik oszustwa przekracza próg, na podstawie czynników takich jak nietypowa lokalizacja, duży rozmiar transakcji lub szybkie wzorce wydatków.
- Dostarczanie wiadomości promocyjnej, gdy dane strumienia kliknięć pokazują, że użytkownik przegląda dżinsy przez pięć minut, oferując 25% rabat, jeśli kupi w ciągu najbliższych 15 minut.
Ogólnie rzecz biorąc, obciążenia operacyjne charakteryzują się potrzebą opóźnienia end-to-end poniżej jednej sekundy. Można to osiągnąć w trybie czasu rzeczywistego w Strukturalnym Przetwarzaniu Strumieniowym Apache Spark.
Jak tryb czasu rzeczywistego osiąga małe opóźnienie
Tryb czasu rzeczywistego poprawia architekturę wykonywania przez:
- Wykonywanie długotrwałych partii (wartość domyślna to 5 minut), w których dane są przetwarzane, gdy staną się dostępne w źródle.
- Wszystkie etapy zapytania są zaplanowane jednocześnie. Wymaga to, aby liczba dostępnych slotów zadań była równa lub większa od liczby zadań we wszystkich etapach partii.
- Dane są przekazywane między etapami natychmiast po ich utworzeniu przy użyciu przesyłania strumieniowego.
Na końcu przetwarzania partii, a przed rozpoczęciem następnej partii, punkty kontrolne w Structured Streaming zapisują postęp i umożliwiają dostęp do metryk dla ostatniej partii. Jeśli partie są dłuższe, te działania mogą być rzadsze, co prowadzi do dłuższych powtórzeń w przypadku awarii i opóźnienia dostępności metryk. Z drugiej strony, jeśli partie są mniejsze, te działania stają się częstsze, potencjalnie wpływające na opóźnienie. Usługa Databricks zaleca przeprowadzenie testu porównawczego trybu w czasie rzeczywistym względem obciążenia docelowego i wymagań w celu znalezienia odpowiedniego interwału wyzwalacza.
Konfiguracja klastra
Aby używać trybu w czasie rzeczywistym w Strumieniu Ustrukturyzowanym, należy skonfigurować klasyczne zadanie Lakeflow.
W obszarze roboczym usługi Azure Databricks kliknij pozycję Nowy w lewym górnym rogu. Wybierz pozycję Więcej , a następnie kliknij pozycję Klaster.
Usuń przyspieszenie Photon.
Odznacz Włącz skalowanie automatyczne.
W obszarze Zaawansowana wydajność wyczyść pole Użyj wystąpień typu spot.
W obszarach zaawansowany i Tryb dostępu kliknij Ręczne i wybierz Dedykowane (dawniej: Pojedynczy użytkownik).
W obszarze Spark wprowadź następujące polecenie w obszarze Konfiguracja platformy Spark:
spark.databricks.streaming.realTimeMode.enabled trueKliknij pozycję Utwórz.
Wymagania dotyczące rozmiaru klastra
Jeśli klaster ma wystarczającą liczbę miejsc zadań, możesz uruchomić jedno zadanie w czasie rzeczywistym na klaster.
Aby uruchomić w trybie niskiego opóźnienia, łączna liczba dostępnych miejsc zadań musi być większa lub równa liczbie zadań we wszystkich etapach przetwarzania zapytania.
Przykłady obliczeń miejsca
Potok bezstanowy jednokrokowy (źródło i odbiornik Kafka)
Jeśli maxPartitions = 8, potrzebujesz co najmniej 8 miejsc. Jeśli nie ustalono maksymalnej liczby partycji, użyj liczby partycji tematu Kafka.
Dwuetapowy potok z zachowaniem stanu (źródło Kafka + przetasowanie):
Jeśli maxPartitions = 8 i shuffle partitions = 20, potrzebujesz 8 + 20 = 28 gniazd.
Potok trójetapowy (Źródło Kafka + przetasowanie + repartycjonowanie):
Przy maxPartitions = 8 i dwóch etapach mieszania, każdy po 20, potrzebujesz 8 + 20 + 20 = 48 slotów.
Kluczowe zagadnienia
Podczas konfigurowania klastra należy wziąć pod uwagę następujące kwestie:
- W przeciwieństwie do trybu mikrosadowego, zadania w czasie rzeczywistym mogą pozostawać bezczynne podczas oczekiwania na dane, więc odpowiednie dopasowanie rozmiaru jest niezbędne, aby uniknąć marnowania zasobów.
- Celuj w celu osiągnięcia docelowego poziomu wykorzystania (np. 50%) przez dostrajanie:
-
maxPartitions(dla platformy Kafka) -
spark.sql.shuffle.partitions(dla etapów tasowania)
-
- Usługa Databricks zaleca ustawienie wartości maxPartition, aby każde zadanie obsługiwało wiele partycji platformy Kafka w celu zmniejszenia obciążenia.
- Dostosuj gniazda zadań przypadające na pracownika, aby dopasować obciążenie pracą do prostych zadań jednofazowych.
- W przypadku zadań wymagających intensywnego mieszania eksperymentuj, aby znaleźć minimalną liczbę partycji mieszania, które unikają opóźnień, a następnie dostosuj wartość początkową. Zadanie nie zostanie zaplanowane, jeśli klaster nie ma wystarczającej liczby miejsc.
Note
W środowisku Databricks Runtime 16.4 LTS lub nowszym wszystkie potoki czasu rzeczywistego używają punktów kontrolnych w wersji 2, co umożliwia bezproblemowe przełączanie między trybami czasu rzeczywistego i mikrosadowego.
Konfiguracja zapytania
Należy włączyć wyzwalacz czasu rzeczywistego, aby określić, że zapytanie powinno być uruchamiane przy użyciu trybu małych opóźnień. Ponadto wyzwalacze w czasie rzeczywistym są obsługiwane tylko w trybie aktualizacji. Przykład:
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# in PySpark, realTime trigger requires you to specify the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Observability
Wcześniej opóźnienie zapytań końcowych było ściśle związane z czasem trwania partii przetwarzania wsadowego, co czyniło go dobrym wskaźnikiem opóźnienia zapytań. Jednak ta metoda nie jest już stosowana w trybie czasu rzeczywistego, co wymaga alternatywnych metod pomiaru opóźnienia. Kompleksowe opóźnienie jest specyficzne dla obciążenia, a czasami może być dokładnie mierzone tylko za pomocą logiki biznesowej. Jeśli na przykład źródłowy znacznik czasu jest wypisany na platformie Kafka, opóźnienie można obliczyć jako różnicę między sygnaturą czasową danych wyjściowych platformy Kafka a sygnaturą czasową źródła.
Opóźnienie end-to-end można szacować na kilka sposobów na podstawie częściowych informacji zebranych podczas procesu przesyłania strumieniowego.
Korzystanie z przesyłania strumieniowegoQueryProgress
Następujące metryki są uwzględniane w StreamingQueryProgress zdarzeniu, które jest automatycznie rejestrowane w dziennikach sterowników. Dostęp do nich można również uzyskać za pośrednictwem funkcji wywołania zwrotnego StreamingQueryListeneronQueryProgress().
QueryProgressEvent.json() lub toString() uwzględniają dodatkowe metryki trybu w czasie rzeczywistym.
- Opóźnienie przetwarzania (processingLatencyMs). Czas upływa między odczytem rekordu przez zapytanie w trybie rzeczywistym a jego zapisaniem w kolejnym etapie lub dalej w dół procesu. W przypadku zapytań jednoetapowych oznacza to ten sam czas trwania co opóźnienie E2E. Ta metryka jest podawana dla każdego zadania.
- Źródłowe opóźnienie kolejkowania (sourceQueuingLatencyMs). Czas, który minął od pomyślnego zapisu rekordu w magistrali komunikatów, na przykład czas dołączenia dziennika w systemie Kafka, do momentu pierwszego odczytu rekordu przez zapytanie w trybie czasu rzeczywistego. Ta metryka jest podawana dla każdego zadania.
- Opóźnienie E2E (e2eLatencyMs). Czas między pomyślnym zapisaniem rekordu w magistrali komunikatów a zapisaniem rekordu w dalszym etapie przez zapytanie w trybie rzeczywistym. Ta metryka jest agregowana na każdą partię we wszystkich rekordach przetwarzanych przez wszystkie zadania.
Przykład:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Korzystanie z API obserwowania w zadaniach
Interfejs API obserwowania pomaga zmierzyć opóźnienie bez uruchamiania innego zadania. Jeśli masz znacznik czasu, który przybliża czas przybycia danych źródłowych i jest przekazywany przed dotarciem do ujścia, lub jeśli możesz znaleźć sposób na jego przekazanie, możesz oszacować opóźnienie każdej partii za pomocą Observe API.
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
W tym przykładzie bieżący znacznik czasu jest rejestrowany przed wyprowadzeniem wpisu, a opóźnienie jest szacowane przez obliczenie różnicy między tym znacznikiem czasu a znacznikiem czasu źródłowego rekordu. Wyniki są uwzględniane w raportach postępu i udostępniane odbiornikom. Oto przykładowe dane wyjściowe:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Co jest obsługiwane?
Environments
| Typ klastra | Supported |
|---|---|
| Dedykowany (dawniej: dla pojedynczego użytkownika) | Yes |
| Standardowa (dawniej: udostępnione) | No |
| Lakeflow Spark Deklaratywne potoki klasyczne | No |
| Deklaratywne potoki Lakeflow Spark bezserwerowe | No |
| Serverless | No |
Languages
| Język | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Tryby wykonywania
| Tryb wykonywania | Supported |
|---|---|
| Tryb aktualizacji | Yes |
| tryb dołączania | No |
| Tryb ukończenia | No |
Sources
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Eventhub (przy użyciu łącznika platformy Kafka) | Yes |
| Kinesis | Tak (tylko tryb EFO) |
| Google Pub/Sub (usługa przesyłania wiadomości) | No |
| Apache Pulsar | No |
Sinks
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Eventhub (przy użyciu łącznika platformy Kafka) | Yes |
| Kinesis | No |
| Google Pub/Sub (usługa przesyłania wiadomości) | No |
| Apache Pulsar | No |
| Dowolne ujścia (przy użyciu elementu forEachWriter) | Yes |
Operators
| Operators | Supported |
|---|---|
| Operacje bezstanowe | |
|
Yes |
|
Yes |
| UDFs | |
|
Tak (z pewnymi ograniczeniami) |
|
Tak (z pewnymi ograniczeniami) |
| Aggregation | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Funkcje agregacji | Yes |
| Windowing | |
|
Yes |
|
Yes |
|
No |
| Deduplication | |
|
Tak (stan jest niezwiązany) |
|
No |
| Strumień — łączenie tabeli | |
|
Yes |
| Strumień — przyłączanie do strumienia | No |
| (płaskie)MapGroupsWithState | No |
| transformWithState | Tak (z pewnymi różnicami) |
| związek | Tak (z pewnymi ograniczeniami) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | Nie (zobacz ograniczenie) |
Używanie funkcji transformWithState w trybie czasu rzeczywistego
W przypadku tworzenia niestandardowych aplikacji stanowych usługa Databricks obsługuje API transformWithState w strukturze strumieniowej Apache Spark. Aby uzyskać więcej informacji na temat interfejsu API i przykładów kodu, zapoznaj się z Tworzenie niestandardowej aplikacji stanowej.
Istnieją jednak pewne różnice między działaniem interfejsu API w trybie pracy w czasie rzeczywistym a tradycyjnymi zapytaniami strumieniowymi korzystającymi z architektury mikrosadowej.
- Metoda w trybie
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)czasu rzeczywistego jest wywoływana dla każdego wiersza.- Iterator
inputRowszwraca pojedynczą wartość. W trybie mikrosadowym jest wywoływana raz dla każdego klucza, ainputRowsiterator zwraca wszystkie wartości dla klucza w mikropartii. - Musisz być świadomy tej różnicy podczas pisania kodu.
- Iterator
- Czasomierze czasu zdarzeń nie są obsługiwane w trybie czasu rzeczywistego.
- W trybie czasu rzeczywistego czasomierze są opóźniane w wyzwalaniu w zależności od przybycia danych. W przeciwnym razie, jeśli nie ma danych, zostanie ona wyzwolona na końcu długotrwałej partii. Na przykład, jeśli czasomierz ma się uruchomić o godzinie 10:00:00 i w tym samym czasie nie ma przychodzących danych, to nie zostanie uruchomiony. Zamiast tego, jeśli dane docierają do 10:00:10, czasomierz jest uruchamiany z opóźnieniem 10 sekund. Lub, jeśli żadne dane nie zostaną dostarczone, a długotrwała partia ma zostać zakończona, uruchamia czasomierz przed jej zakończeniem.
Funkcje zdefiniowane przez użytkownika języka Python
Usługa Databricks obsługuje większość funkcji zdefiniowanych przez użytkownika (UDF) języka Python w trybie czasu rzeczywistego:
| Typ funkcji zdefiniowanej przez użytkownika | Supported |
|---|---|
| Bezstanowa kontrola funkcji zdefiniowanej przez użytkownika | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Stanowe grupowanie UDF (UDAF) | |
|
Yes |
|
No |
| Niestanowe grupowanie UDF (UDAF) | |
|
No |
|
No |
|
No |
| Funkcja tabeli | |
|
No |
| UC UDF | No |
Istnieje kilka kwestii, które należy wziąć pod uwagę podczas korzystania z funkcji UDF języka Python w trybie czasu rzeczywistego:
- Aby zminimalizować opóźnienie, skonfiguruj rozmiar partii strzałki (spark.sql.execution.arrow.maxRecordsPerBatch) na 1.
- Kompromis: ta konfiguracja optymalizuje opóźnienia kosztem przepływności. W przypadku większości obciążeń to ustawienie jest zalecane.
- Zwiększ rozmiar partii tylko wtedy, gdy wymagana jest większa przepływność w celu uwzględnienia woluminu wejściowego, akceptując potencjalny wzrost opóźnienia.
- Funkcje i funkcje pandas nie działają dobrze z rozmiarem partii Strzałka 1.
- Jeśli używasz funkcji zdefiniowanych przez użytkownika lub funkcji biblioteki pandas, ustaw rozmiar partii strzałki na wyższą wartość (na przykład 100 lub wyższą).
- Należy pamiętać, że oznacza to większe opóźnienie. Usługa Databricks zaleca używanie funkcji UDF strzałki, jeśli jest to możliwe.
- Ze względu na problem z wydajnością biblioteki pandas funkcja transformWithState jest obsługiwana tylko w interfejsie
Row.
Techniki optymalizacji
| Technique | Domyślnie włączone |
|---|---|
| Śledzenie postępu asynchronicznego: przenosi zapis do dziennika przesunięć i dziennika zatwierdzeń do wątku asynchronicznego, skracając czas międzypartiowy między dwiema mikropartiami. Może to pomóc zmniejszyć opóźnienie bezstanowych zapytań strumieniowych. | No |
| Asynchroniczne tworzenie punktów kontrolnych stanu: pomaga zmniejszyć opóźnienia w zapytaniach przesyłania strumieniowego stanowego, umożliwiając rozpoczęcie przetwarzania następnej mikropartii natychmiast po zakończeniu obliczeń poprzedniej, bez czekania na tworzenie punktów kontrolnych stanu. | No |
Limitations
Ograniczenie źródła
W przypadku kinesis tryb sondowania nie jest obsługiwany. Co więcej, częste ponowne partycjonowania mogą mieć negatywny wpływ na opóźnienie.
Ograniczenie unii
W przypadku Unii istnieją pewne ograniczenia:
- Samodzielna unia nie jest obsługiwana:
- Kafka: nie można użyć tego samego obiektu ramki danych źródłowej i łączyć z niego pochodnych ramek danych. Obejście: Użyj różnych ramek danych odczytywanych z tego samego źródła.
- Kinesis: nie można unionować ramek danych pochodzących z tego samego źródła Kinesis z tą samą konfiguracją. Obejście: Oprócz używania różnych ramek danych można przypisać inną opcję "consumerName" do każdej ramki danych.
- Operatory stanowe (na przykład ,
aggregate,deduplicatetransformWithState) zdefiniowane przed Unią nie są obsługiwane. - Łączenie ze źródłami wsadowymi nie jest obsługiwane.
Ograniczenie usługi MapPartitions
mapPartitions W języku Scala i podobnymi interfejsami API języka Python (mapInPandas, mapInArrow) wykonaj iterator całej partycji wejściowej i utwórz iterator całego danych wyjściowych z dowolnym mapowaniem między danymi wejściowymi i wyjściowymi. Te interfejsy programowania aplikacji (API) mogą powodować problemy z wydajnością w trybie przesyłania strumieniowego w czasie rzeczywistym, blokując cały strumień wyjściowy, co zwiększa opóźnienie. Semantyka tych interfejsów API nie wspiera dobrze propagacji znaków wodnych.
Użyj skalarnych funkcji zdefiniowanych przez użytkownika w połączeniu z przekształcaniem złożonych typów danych lub filter zamiast tego, aby uzyskać podobną funkcjonalność.
Examples
W poniższych przykładach przedstawiono obsługiwane zapytania.
Zapytania bezstanowe
Obsługiwane są zapytania bezstanowe, zarówno pojedyncze, jak i wieloetapowe.
Źródło platformy Kafka do ujścia platformy Kafka
W tym przykładzie odczytujesz ze źródła Kafka i zapisujesz je w odbiorniku Kafka.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Podział
W tym przykładzie odczytujesz ze źródła platformy Kafka, ponownie podzielisz dane na 20 partycji i zapiszesz je w ujściu platformy Kafka.
Ustaw konfigurację spark.sql.execution.sortBeforeRepartition platformy Spark na false wartość przed użyciem ponownego partycjonowania.
Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Łączenie migawek strumienia (tylko nadawanie)
W tym przykładzie odczytujesz z platformy Kafka, połączysz dane ze statyczną tabelą i zapiszesz je w ujściu platformy Kafka. Należy pamiętać, że obsługiwane są tylko łączenia stream-static, które rozpowszechniają tabelę statyczną, co oznacza, że tabela statyczna powinna mieścić się w pamięci.
Python
from pyspark.sql.functions import broadcast, expr
# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Kinesis źródło ujścia platformy Kafka
W tym przykładzie odczytujesz ze źródła Kinesis i zapisujesz je w ujściu platformy Kafka.
Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option("region", regionName)
.option("awsAccessKey", awsAccessKeyId)
.option("awsSecretKey", awsSecretAccessKey)
.option("consumerMode", "efo")
.option("consumerName", consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Union
W tym przykładzie łączysz dwie ramki danych Kafka z dwóch różnych tematów i zapisujesz je do ujścia Kafka.
Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Zapytania stanowe
Deduplication
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Aggregation
Python
from pyspark.sql.functions import col
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Łączenie z agregacją
W tym przykładzie najpierw łączysz dwie ramki danych platformy Kafka z dwóch różnych tematów, a następnie wykonujesz agregację. Na koniec zapisujesz do odbiornika Kafka.
Python
from pyspark.sql.functions import col
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
TransformWithState
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Note
Istnieje różnica między sposobem działania trybu w czasie rzeczywistym a innymi trybami operacyjnymi w Structured Streaming StatefulProcessor w transformWithState. Zobacz Use transformWithState in real-time mode (Używanie transformWithState w trybie czasu rzeczywistego)
TransformWithState (PySpark, interfejs wiersza)
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Note
Istnieje różnica między tym, jak tryb czasu rzeczywistego i inne tryby wykonywania w przesyłania strumieniowego ze strukturą uruchamiają StatefulProcessor element w transformWithStateprogramie . Zobacz Use transformWithState in real-time mode (Używanie transformWithState w trybie czasu rzeczywistego)
Sinks
Pisanie do bazy danych Postgres za pośrednictwem foreachSink
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
Display
Important
Ta funkcja jest dostępna w środowisku Databricks Runtime 17.1 lub nowszym.
Źródło szybkości wyświetlania
W tym przykładzie odczytujesz z źródła danych i wyświetlasz Streaming DataFrame w notatniku.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())