Udostępnij przez


Tryb czasu rzeczywistego w streamingu ustrukturyzowanym

Important

Ta funkcja jest dostępna w publicznej wersji testowej.

Na tej stronie opisano tryb czasu rzeczywistego, typ wyzwalacza dla Structured Streaming, który umożliwia przetwarzanie danych przy bardzo niskim poziomie opóźnień, z opóźnieniem end-to-end nawet do 5 ms. Ten tryb jest przeznaczony dla obciążeń operacyjnych, które wymagają natychmiastowej reakcji na dane przesyłane strumieniowo.

Tryb czasu rzeczywistego jest dostępny w środowisku Databricks Runtime 16.4 LTS lub nowszym.

Obciążenia operacyjne

Obciążenia przesyłania strumieniowego można szeroko podzielić na obciążenia analityczne i obciążenia operacyjne:

  • Obciążenia analityczne używają pozyskiwania i przekształcania danych, zwykle zgodnie z architekturą medalionu (na przykład pozyskiwaniu danych do tablic brązowych, srebrnych i złotych).
  • Obciążenia operacyjne zużywają dane w czasie rzeczywistym, stosują logikę biznesową i wyzwalają akcje podrzędne lub decyzje.

Oto kilka przykładów obciążeń operacyjnych:

  • Blokowanie lub flagowanie transakcji karty kredytowej w czasie rzeczywistym, jeśli wynik oszustwa przekracza próg, na podstawie czynników takich jak nietypowa lokalizacja, duży rozmiar transakcji lub szybkie wzorce wydatków.
  • Dostarczanie wiadomości promocyjnej, gdy dane strumienia kliknięć pokazują, że użytkownik przegląda dżinsy przez pięć minut, oferując 25% rabat, jeśli kupi w ciągu najbliższych 15 minut.

Ogólnie rzecz biorąc, obciążenia operacyjne charakteryzują się potrzebą opóźnienia end-to-end poniżej jednej sekundy. Można to osiągnąć w trybie czasu rzeczywistego w Strukturalnym Przetwarzaniu Strumieniowym Apache Spark.

Jak tryb czasu rzeczywistego osiąga małe opóźnienie

Tryb czasu rzeczywistego poprawia architekturę wykonywania przez:

  • Wykonywanie długotrwałych partii (wartość domyślna to 5 minut), w których dane są przetwarzane, gdy staną się dostępne w źródle.
  • Wszystkie etapy zapytania są zaplanowane jednocześnie. Wymaga to, aby liczba dostępnych slotów zadań była równa lub większa od liczby zadań we wszystkich etapach partii.
  • Dane są przekazywane między etapami natychmiast po ich utworzeniu przy użyciu przesyłania strumieniowego.

Na końcu przetwarzania partii, a przed rozpoczęciem następnej partii, punkty kontrolne w Structured Streaming zapisują postęp i umożliwiają dostęp do metryk dla ostatniej partii. Jeśli partie są dłuższe, te działania mogą być rzadsze, co prowadzi do dłuższych powtórzeń w przypadku awarii i opóźnienia dostępności metryk. Z drugiej strony, jeśli partie są mniejsze, te działania stają się częstsze, potencjalnie wpływające na opóźnienie. Usługa Databricks zaleca przeprowadzenie testu porównawczego trybu w czasie rzeczywistym względem obciążenia docelowego i wymagań w celu znalezienia odpowiedniego interwału wyzwalacza.

Konfiguracja klastra

Aby używać trybu w czasie rzeczywistym w Strumieniu Ustrukturyzowanym, należy skonfigurować klasyczne zadanie Lakeflow.

  1. W obszarze roboczym usługi Azure Databricks kliknij pozycję Nowy w lewym górnym rogu. Wybierz pozycję Więcej , a następnie kliknij pozycję Klaster.

  2. Usuń przyspieszenie Photon.

  3. Odznacz Włącz skalowanie automatyczne.

  4. W obszarze Zaawansowana wydajność wyczyść pole Użyj wystąpień typu spot.

  5. W obszarach zaawansowany i Tryb dostępu kliknij Ręczne i wybierz Dedykowane (dawniej: Pojedynczy użytkownik).

  6. W obszarze Spark wprowadź następujące polecenie w obszarze Konfiguracja platformy Spark:

    spark.databricks.streaming.realTimeMode.enabled true
    
  7. Kliknij pozycję Utwórz.

Wymagania dotyczące rozmiaru klastra

Jeśli klaster ma wystarczającą liczbę miejsc zadań, możesz uruchomić jedno zadanie w czasie rzeczywistym na klaster.

Aby uruchomić w trybie niskiego opóźnienia, łączna liczba dostępnych miejsc zadań musi być większa lub równa liczbie zadań we wszystkich etapach przetwarzania zapytania.

Przykłady obliczeń miejsca

Potok bezstanowy jednokrokowy (źródło i odbiornik Kafka)

Jeśli maxPartitions = 8, potrzebujesz co najmniej 8 miejsc. Jeśli nie ustalono maksymalnej liczby partycji, użyj liczby partycji tematu Kafka.

Dwuetapowy potok z zachowaniem stanu (źródło Kafka + przetasowanie):

Jeśli maxPartitions = 8 i shuffle partitions = 20, potrzebujesz 8 + 20 = 28 gniazd.

Potok trójetapowy (Źródło Kafka + przetasowanie + repartycjonowanie):

Przy maxPartitions = 8 i dwóch etapach mieszania, każdy po 20, potrzebujesz 8 + 20 + 20 = 48 slotów.

Kluczowe zagadnienia

Podczas konfigurowania klastra należy wziąć pod uwagę następujące kwestie:

  • W przeciwieństwie do trybu mikrosadowego, zadania w czasie rzeczywistym mogą pozostawać bezczynne podczas oczekiwania na dane, więc odpowiednie dopasowanie rozmiaru jest niezbędne, aby uniknąć marnowania zasobów.
  • Celuj w celu osiągnięcia docelowego poziomu wykorzystania (np. 50%) przez dostrajanie:
    • maxPartitions (dla platformy Kafka)
    • spark.sql.shuffle.partitions (dla etapów tasowania)
  • Usługa Databricks zaleca ustawienie wartości maxPartition, aby każde zadanie obsługiwało wiele partycji platformy Kafka w celu zmniejszenia obciążenia.
  • Dostosuj gniazda zadań przypadające na pracownika, aby dopasować obciążenie pracą do prostych zadań jednofazowych.
  • W przypadku zadań wymagających intensywnego mieszania eksperymentuj, aby znaleźć minimalną liczbę partycji mieszania, które unikają opóźnień, a następnie dostosuj wartość początkową. Zadanie nie zostanie zaplanowane, jeśli klaster nie ma wystarczającej liczby miejsc.

Note

W środowisku Databricks Runtime 16.4 LTS lub nowszym wszystkie potoki czasu rzeczywistego używają punktów kontrolnych w wersji 2, co umożliwia bezproblemowe przełączanie między trybami czasu rzeczywistego i mikrosadowego.

Konfiguracja zapytania

Należy włączyć wyzwalacz czasu rzeczywistego, aby określić, że zapytanie powinno być uruchamiane przy użyciu trybu małych opóźnień. Ponadto wyzwalacze w czasie rzeczywistym są obsługiwane tylko w trybie aktualizacji. Przykład:

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # in PySpark, realTime trigger requires you to specify the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Observability

Wcześniej opóźnienie zapytań końcowych było ściśle związane z czasem trwania partii przetwarzania wsadowego, co czyniło go dobrym wskaźnikiem opóźnienia zapytań. Jednak ta metoda nie jest już stosowana w trybie czasu rzeczywistego, co wymaga alternatywnych metod pomiaru opóźnienia. Kompleksowe opóźnienie jest specyficzne dla obciążenia, a czasami może być dokładnie mierzone tylko za pomocą logiki biznesowej. Jeśli na przykład źródłowy znacznik czasu jest wypisany na platformie Kafka, opóźnienie można obliczyć jako różnicę między sygnaturą czasową danych wyjściowych platformy Kafka a sygnaturą czasową źródła.

Opóźnienie end-to-end można szacować na kilka sposobów na podstawie częściowych informacji zebranych podczas procesu przesyłania strumieniowego.

Korzystanie z przesyłania strumieniowegoQueryProgress

Następujące metryki są uwzględniane w StreamingQueryProgress zdarzeniu, które jest automatycznie rejestrowane w dziennikach sterowników. Dostęp do nich można również uzyskać za pośrednictwem funkcji wywołania zwrotnego StreamingQueryListeneronQueryProgress(). QueryProgressEvent.json() lub toString() uwzględniają dodatkowe metryki trybu w czasie rzeczywistym.

  1. Opóźnienie przetwarzania (processingLatencyMs). Czas upływa między odczytem rekordu przez zapytanie w trybie rzeczywistym a jego zapisaniem w kolejnym etapie lub dalej w dół procesu. W przypadku zapytań jednoetapowych oznacza to ten sam czas trwania co opóźnienie E2E. Ta metryka jest podawana dla każdego zadania.
  2. Źródłowe opóźnienie kolejkowania (sourceQueuingLatencyMs). Czas, który minął od pomyślnego zapisu rekordu w magistrali komunikatów, na przykład czas dołączenia dziennika w systemie Kafka, do momentu pierwszego odczytu rekordu przez zapytanie w trybie czasu rzeczywistego. Ta metryka jest podawana dla każdego zadania.
  3. Opóźnienie E2E (e2eLatencyMs). Czas między pomyślnym zapisaniem rekordu w magistrali komunikatów a zapisaniem rekordu w dalszym etapie przez zapytanie w trybie rzeczywistym. Ta metryka jest agregowana na każdą partię we wszystkich rekordach przetwarzanych przez wszystkie zadania.

Przykład:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Korzystanie z API obserwowania w zadaniach

Interfejs API obserwowania pomaga zmierzyć opóźnienie bez uruchamiania innego zadania. Jeśli masz znacznik czasu, który przybliża czas przybycia danych źródłowych i jest przekazywany przed dotarciem do ujścia, lub jeśli możesz znaleźć sposób na jego przekazanie, możesz oszacować opóźnienie każdej partii za pomocą Observe API.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

W tym przykładzie bieżący znacznik czasu jest rejestrowany przed wyprowadzeniem wpisu, a opóźnienie jest szacowane przez obliczenie różnicy między tym znacznikiem czasu a znacznikiem czasu źródłowego rekordu. Wyniki są uwzględniane w raportach postępu i udostępniane odbiornikom. Oto przykładowe dane wyjściowe:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Co jest obsługiwane?

Environments

Typ klastra Supported
Dedykowany (dawniej: dla pojedynczego użytkownika) Yes
Standardowa (dawniej: udostępnione) No
Lakeflow Spark Deklaratywne potoki klasyczne No
Deklaratywne potoki Lakeflow Spark bezserwerowe No
Serverless No

Languages

Język Supported
Scala Yes
Java Yes
Python Yes

Tryby wykonywania

Tryb wykonywania Supported
Tryb aktualizacji Yes
tryb dołączania No
Tryb ukończenia No

Sources

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Eventhub (przy użyciu łącznika platformy Kafka) Yes
Kinesis Tak (tylko tryb EFO)
Google Pub/Sub (usługa przesyłania wiadomości) No
Apache Pulsar No

Sinks

Sinks Supported
Apache Kafka Yes
Eventhub (przy użyciu łącznika platformy Kafka) Yes
Kinesis No
Google Pub/Sub (usługa przesyłania wiadomości) No
Apache Pulsar No
Dowolne ujścia (przy użyciu elementu forEachWriter) Yes

Operators

Operators Supported
Operacje bezstanowe
  • Selection
Yes
  • Projection
Yes
UDFs
  • Scala UDF
Tak (z pewnymi ograniczeniami)
  • Python UDF
Tak (z pewnymi ograniczeniami)
Aggregation
  • sum
Yes
  • count
Yes
  • max
Yes
  • min
Yes
  • avg
Yes
Funkcje agregacji Yes
Windowing
  • Tumbling
Yes
  • Sliding
Yes
  • Session
No
Deduplication
  • dropDuplicates
Tak (stan jest niezwiązany)
  • dropDuplicatesWithinWatermark
No
Strumień — łączenie tabeli
  • Tabela nadawania (powinna być mała)
Yes
Strumień — przyłączanie do strumienia No
(płaskie)MapGroupsWithState No
transformWithState Tak (z pewnymi różnicami)
związek Tak (z pewnymi ograniczeniami)
forEach Yes
forEachBatch No
mapPartitions Nie (zobacz ograniczenie)

Używanie funkcji transformWithState w trybie czasu rzeczywistego

W przypadku tworzenia niestandardowych aplikacji stanowych usługa Databricks obsługuje API transformWithState w strukturze strumieniowej Apache Spark. Aby uzyskać więcej informacji na temat interfejsu API i przykładów kodu, zapoznaj się z Tworzenie niestandardowej aplikacji stanowej.

Istnieją jednak pewne różnice między działaniem interfejsu API w trybie pracy w czasie rzeczywistym a tradycyjnymi zapytaniami strumieniowymi korzystającymi z architektury mikrosadowej.

  • Metoda w trybie handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) czasu rzeczywistego jest wywoływana dla każdego wiersza.
    • Iterator inputRows zwraca pojedynczą wartość. W trybie mikrosadowym jest wywoływana raz dla każdego klucza, a inputRows iterator zwraca wszystkie wartości dla klucza w mikropartii.
    • Musisz być świadomy tej różnicy podczas pisania kodu.
  • Czasomierze czasu zdarzeń nie są obsługiwane w trybie czasu rzeczywistego.
  • W trybie czasu rzeczywistego czasomierze są opóźniane w wyzwalaniu w zależności od przybycia danych. W przeciwnym razie, jeśli nie ma danych, zostanie ona wyzwolona na końcu długotrwałej partii. Na przykład, jeśli czasomierz ma się uruchomić o godzinie 10:00:00 i w tym samym czasie nie ma przychodzących danych, to nie zostanie uruchomiony. Zamiast tego, jeśli dane docierają do 10:00:10, czasomierz jest uruchamiany z opóźnieniem 10 sekund. Lub, jeśli żadne dane nie zostaną dostarczone, a długotrwała partia ma zostać zakończona, uruchamia czasomierz przed jej zakończeniem.

Funkcje zdefiniowane przez użytkownika języka Python

Usługa Databricks obsługuje większość funkcji zdefiniowanych przez użytkownika (UDF) języka Python w trybie czasu rzeczywistego:

Typ funkcji zdefiniowanej przez użytkownika Supported
Bezstanowa kontrola funkcji zdefiniowanej przez użytkownika
  • Scalar UDF języka Python (link)
Yes
  • Funkcja zdefiniowanej przez użytkownika strzałki
Yes
  • Skalarna funkcja UDF biblioteki Pandas (link)
Yes
  • Arrow, funkcja (mapInArrow)
Yes
  • Pandas, funkcja (link)
Yes
Stanowe grupowanie UDF (UDAF)
  • transformWithState (UWAGA: tylko Row interfejs)
Yes
  • applyInPandasWithState
No
Niestanowe grupowanie UDF (UDAF)
  • apply
No
  • applyInArrow
No
  • applyInPandas
No
Funkcja tabeli
No
UC UDF No

Istnieje kilka kwestii, które należy wziąć pod uwagę podczas korzystania z funkcji UDF języka Python w trybie czasu rzeczywistego:

  • Aby zminimalizować opóźnienie, skonfiguruj rozmiar partii strzałki (spark.sql.execution.arrow.maxRecordsPerBatch) na 1.
    • Kompromis: ta konfiguracja optymalizuje opóźnienia kosztem przepływności. W przypadku większości obciążeń to ustawienie jest zalecane.
    • Zwiększ rozmiar partii tylko wtedy, gdy wymagana jest większa przepływność w celu uwzględnienia woluminu wejściowego, akceptując potencjalny wzrost opóźnienia.
  • Funkcje i funkcje pandas nie działają dobrze z rozmiarem partii Strzałka 1.
    • Jeśli używasz funkcji zdefiniowanych przez użytkownika lub funkcji biblioteki pandas, ustaw rozmiar partii strzałki na wyższą wartość (na przykład 100 lub wyższą).
    • Należy pamiętać, że oznacza to większe opóźnienie. Usługa Databricks zaleca używanie funkcji UDF strzałki, jeśli jest to możliwe.
  • Ze względu na problem z wydajnością biblioteki pandas funkcja transformWithState jest obsługiwana tylko w interfejsie Row .

Techniki optymalizacji

Technique Domyślnie włączone
Śledzenie postępu asynchronicznego: przenosi zapis do dziennika przesunięć i dziennika zatwierdzeń do wątku asynchronicznego, skracając czas międzypartiowy między dwiema mikropartiami. Może to pomóc zmniejszyć opóźnienie bezstanowych zapytań strumieniowych. No
Asynchroniczne tworzenie punktów kontrolnych stanu: pomaga zmniejszyć opóźnienia w zapytaniach przesyłania strumieniowego stanowego, umożliwiając rozpoczęcie przetwarzania następnej mikropartii natychmiast po zakończeniu obliczeń poprzedniej, bez czekania na tworzenie punktów kontrolnych stanu. No

Limitations

Ograniczenie źródła

W przypadku kinesis tryb sondowania nie jest obsługiwany. Co więcej, częste ponowne partycjonowania mogą mieć negatywny wpływ na opóźnienie.

Ograniczenie unii

W przypadku Unii istnieją pewne ograniczenia:

  • Samodzielna unia nie jest obsługiwana:
    • Kafka: nie można użyć tego samego obiektu ramki danych źródłowej i łączyć z niego pochodnych ramek danych. Obejście: Użyj różnych ramek danych odczytywanych z tego samego źródła.
    • Kinesis: nie można unionować ramek danych pochodzących z tego samego źródła Kinesis z tą samą konfiguracją. Obejście: Oprócz używania różnych ramek danych można przypisać inną opcję "consumerName" do każdej ramki danych.
  • Operatory stanowe (na przykład , aggregate, deduplicatetransformWithState) zdefiniowane przed Unią nie są obsługiwane.
  • Łączenie ze źródłami wsadowymi nie jest obsługiwane.

Ograniczenie usługi MapPartitions

mapPartitions W języku Scala i podobnymi interfejsami API języka Python (mapInPandas, mapInArrow) wykonaj iterator całej partycji wejściowej i utwórz iterator całego danych wyjściowych z dowolnym mapowaniem między danymi wejściowymi i wyjściowymi. Te interfejsy programowania aplikacji (API) mogą powodować problemy z wydajnością w trybie przesyłania strumieniowego w czasie rzeczywistym, blokując cały strumień wyjściowy, co zwiększa opóźnienie. Semantyka tych interfejsów API nie wspiera dobrze propagacji znaków wodnych.

Użyj skalarnych funkcji zdefiniowanych przez użytkownika w połączeniu z przekształcaniem złożonych typów danych lub filter zamiast tego, aby uzyskać podobną funkcjonalność.

Examples

W poniższych przykładach przedstawiono obsługiwane zapytania.

Zapytania bezstanowe

Obsługiwane są zapytania bezstanowe, zarówno pojedyncze, jak i wieloetapowe.

Źródło platformy Kafka do ujścia platformy Kafka

W tym przykładzie odczytujesz ze źródła Kafka i zapisujesz je w odbiorniku Kafka.

Python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Podział

W tym przykładzie odczytujesz ze źródła platformy Kafka, ponownie podzielisz dane na 20 partycji i zapiszesz je w ujściu platformy Kafka.

Ustaw konfigurację spark.sql.execution.sortBeforeRepartition platformy Spark na false wartość przed użyciem ponownego partycjonowania.

Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .repartition(20)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .repartition(20)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Łączenie migawek strumienia (tylko nadawanie)

W tym przykładzie odczytujesz z platformy Kafka, połączysz dane ze statyczną tabelą i zapiszesz je w ujściu platformy Kafka. Należy pamiętać, że obsługiwane są tylko łączenia stream-static, które rozpowszechniają tabelę statyczną, co oznacza, że tabela statyczna powinna mieścić się w pamięci.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("joinKey", expr("CAST(value AS STRING)"))
    .join(
        broadcast(spark.read.format("parquet").load(static_table_location)),
        expr("joinKey = lookupKey")
    )
    .selectExpr("value AS key", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Kinesis źródło ujścia platformy Kafka

W tym przykładzie odczytujesz ze źródła Kinesis i zapisujesz je w ujściu platformy Kafka.

Python
query = (
    spark.readStream
        .format("kinesis")
        .option("region", region_name)
        .option("awsAccessKey", aws_access_key_id)
        .option("awsSecretKey", aws_secret_access_key)
        .option("consumerMode", "efo")
        .option("consumerName", consumer_name)
        .load()
        .selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kinesis")
      .option("region", regionName)
      .option("awsAccessKey", awsAccessKeyId)
      .option("awsSecretKey", awsSecretAccessKey)
      .option("consumerMode", "efo")
      .option("consumerName", consumerName)
      .load()
      .select(
        col("partitionKey").alias("key"),
        col("data").cast("string").alias("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Union

W tym przykładzie łączysz dwie ramki danych Kafka z dwóch różnych tematów i zapisujesz je do ujścia Kafka.

Python
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Zapytania stanowe

Deduplication

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .dropDuplicates(["timestamp", "value"])
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .dropDuplicates("timestamp", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Aggregation

Python
from pyspark.sql.functions import col

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Łączenie z agregacją

W tym przykładzie najpierw łączysz dwie ramki danych platformy Kafka z dwóch różnych tematów, a następnie wykonujesz agregację. Na koniec zapisujesz do odbiornika Kafka.

Python
from pyspark.sql.functions import col

df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

TransformWithState

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}

/**
 * This processor counts the number of records it has seen for each key using state variables
 * with TTLs. It redundantly maintains this count with a value, list, and map state to put load
 * on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
 * the count for a given grouping key.)
 *
 * The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
 * The source-timestamp is passed through so that we can calculate end-to-end latency. The output
 * schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
 *
 */

class RTMStatefulProcessor(ttlConfig: TTLConfig)
  extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
  @transient private var _value: ValueState[Long] = _
  @transient private var _map: MapState[Long, String] = _
  @transient private var _list: ListState[String] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Counts the number of records this key has seen
    _value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
    _map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
    _list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Long)],
      timerValues: TimerValues): Iterator[(String, Long, Long)] = {
    inputRows.map { row =>
      val key = row._1
      val sourceTimestamp = row._2

      val oldValue = _value.get()
      _value.update(oldValue + 1)
      _map.updateValue(oldValue, key)
      _list.appendValue(key)

      (key, oldValue + 1, sourceTimestamp)
    }
  }
}

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
      .as[(String, String, Timestamp)]
      .groupByKey(row => row._1)
      .transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
      .as[(String, Long, Long)]
      .select(
            col("_1").as("key"),
            col("_2").as("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Note

Istnieje różnica między sposobem działania trybu w czasie rzeczywistym a innymi trybami operacyjnymi w Structured Streaming StatefulProcessor w transformWithState. Zobacz Use transformWithState in real-time mode (Używanie transformWithState w trybie czasu rzeczywistego)

TransformWithState (PySpark, interfejs wiersza)

from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
  """
  This processor counts the number of records it has seen for each key using state variables
  with TTLs. It redundantly maintains this count with a value, list, and map state to put load
  on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
  the count for a given grouping key.)

  The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
  The source-timestamp is passed through so that we can calculate end-to-end latency. The output
  schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
  """

  def init(self, handle: StatefulProcessorHandle) -> None:
    state_schema = StructType([StructField("value", LongType(), True)])
    self.value_state = handle.getValueState("value", state_schema, 30000)
    map_key_schema = StructType([StructField("key", LongType(), True)])
    map_value_schema = StructType([StructField("value", StringType(), True)])
    self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
    list_schema = StructType([StructField("value", StringType(), True)])
    self.list_state = handle.getListState("list", list_schema, 30000)

  def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
    for row in rows:
      # row is a tuple (key, source_timestamp)
      key_str = row[0]
      source_timestamp = row[1]
      old_value = value.get()
      if old_value is None:
        old_value = 0
      self.value_state.update((old_value + 1,))
      self.map_state.update((old_value,), (key_str,))
      self.list_state.appendValue((key_str,))
      yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

  def close(self) -> None:
    pass


output_schema = StructType(
  [
    StructField("key", StringType(), True),
    StructField("value", LongType(), True),
    StructField("timestamp", TimestampType(), True),
  ]
)

query = (
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("subscribe", input_topic)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .groupBy("key")
  .transformWithState(
    statefulProcessor=RTMStatefulProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="processingTime",
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("topic", output_topic)
  .option("checkpointLocation", checkpoint_location)
  .trigger(realTime="5 minutes")
  .outputMode("Update")
  .start()
)

Note

Istnieje różnica między tym, jak tryb czasu rzeczywistego i inne tryby wykonywania w przesyłania strumieniowego ze strukturą uruchamiają StatefulProcessor element w transformWithStateprogramie . Zobacz Use transformWithState in real-time mode (Używanie transformWithState w trybie czasu rzeczywistego)

Sinks

Pisanie do bazy danych Postgres za pośrednictwem foreachSink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Groups connection properties for
 * the JDBC writers.
 *
 * @param url JDBC url of the form jdbc:subprotocol:subname to connect to
 * @param dbtable database table that should be written into
 * @param username username for authentication
 * @param password password for authentication
 */
class JdbcWriterConfig(
    val url: String,
    val dbtable: String,
    val username: String,
    val password: String,
) extends Serializable

/**
 * Handles streaming data writes to a database sink via JDBC, by:
 *   - connecting to the database
 *   - buffering incoming data rows in batches to reduce write overhead
 *
 * @param config connection parameters and configuration knobs for the writer
 */
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
  extends ForeachWriter[Row] with Serializable {
  // The writer currently only supports this hard-coded schema
  private val UPSERT_STATEMENT_SQL =
    s"""MERGE INTO "${config.dbtable}"
       |USING (
       |  SELECT
       |    CAST(? AS INTEGER) AS "id",
       |    CAST(? AS CHARACTER VARYING) AS "data"
       |) AS "source"
       |ON "test"."id" = "source"."id"
       |WHEN MATCHED THEN
       |  UPDATE SET "data" = "source"."data"
       |WHEN NOT MATCHED THEN
       |  INSERT ("id", "data") VALUES ("source"."id", "source"."data")
       |""".stripMargin

  private val MAX_BUFFER_SIZE = 3
  private val buffer = new Array[Row](MAX_BUFFER_SIZE)
  private var bufferSize = 0

  private var connection: Connection = _

  /**
   * Flushes the [[buffer]] by writing all rows in the buffer to the database.
   */
  private def flushBuffer(): Unit = {
    require(connection != null)

    if (bufferSize == 0) {
      return
    }

    var upsertStatement: PreparedStatement = null

    try {
      upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

      for (i <- 0 until bufferSize) {
        val row = buffer(i)
        upsertStatement.setInt(1, row.getAs[String]("key"))
        upsertStatement.setString(2, row.getAs[String]("value"))
        upsertStatement.addBatch()
      }

      upsertStatement.executeBatch()
      connection.commit()

      bufferSize = 0
    } catch { case e: Exception =>
      if (connection != null) {
        connection.rollback()
      }
      throw e
    } finally {
      if (upsertStatement != null) {
        upsertStatement.close()
      }
    }
  }

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(config.url, config.username, config.password)
    true
  }

  override def process(row: Row): Unit = {
    buffer(bufferSize) = row
    bufferSize += 1
    if (bufferSize >= MAX_BUFFER_SIZE) {
      flushBuffer()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    flushBuffer()
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}


spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .outputMode(OutputMode.Update())
      .trigger(defaultTrigger)
      .foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
      .start()

Display

Important

Ta funkcja jest dostępna w środowisku Databricks Runtime 17.1 lub nowszym.

Źródło szybkości wyświetlania

W tym przykładzie odczytujesz z źródła danych i wyświetlasz Streaming DataFrame w notatniku.

Python
inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())