Korzystanie z trybu czasu rzeczywistego w Lakeflow Spark Declarative Pipelines

Ważna

Tryb czasu rzeczywistego w Lakeflow Spark Declarative Pipelines jest dostępny w publicznej wersji zapoznawczej w środowisku Databricks Runtime 18.1.2 w kanale preview.

Tryb czasu rzeczywistego umożliwia przetwarzanie danych o bardzo małych opóźnieniach z opóźnieniem końcowym nawet o pięciu milisekundach. Używaj trybu czasu rzeczywistego dla obciążeń operacyjnych, które wymagają natychmiastowej reakcji na dane przesyłane strumieniowo, takie jak wykrywanie oszustw i personalizacja w czasie rzeczywistym.

Tryb czasu rzeczywistego jest również dostępny bezpośrednio w Structured Streaming, poza potokami. Zobacz Tryb czasu rzeczywistego w Strukturalnym Przetwarzaniu Strumieniowym.

Jak tryb czasu rzeczywistego osiąga małe opóźnienie

Tryb czasu rzeczywistego różni się od standardowego ciągłego przetwarzania na trzy kluczowe sposoby:

  • Długotrwałe partie: system przetwarza dane, gdy staną się dostępne w źródle w długotrwałych partiach (wartość domyślna to pięć minut).
  • Jednoczesne planowanie etapów: wszystkie etapy zapytań są zaplanowane w tym samym czasie. Zasób obliczeniowy musi mieć wystarczającą ilość dostępnych miejsc zadań, aby uwzględnić wszystkie etapy jednocześnie. Zobacz Ustalanie rozmiaru zasobów obliczeniowych.
  • Tasowanie strumieniowe: dane są przekazywane między etapami zaraz po ich wytworzeniu, zamiast czekać na zakończenie wcześniejszego etapu przed rozpoczęciem kolejnego.

Interwał punktu kontrolnego (skonfigurowany za pośrednictwem pipelines.trigger.interval) określa, jak często są utrwalane przesunięcia stanu i źródła w magazynie trwałym. Dłuższe interwały zmniejszają obciążenie punktów kontrolnych, ale zwiększają czas odzyskiwania po wystąpieniu awarii i opóźnieniu raportowania metryk. Krótsze interwały zwiększają trwałość, ale zwiększają obciążenie.

Tryb czasu rzeczywistego i ciągłe potoki przetwarzania

Tryb czasu rzeczywistego to wyspecjalizowany typ wyzwalacza ciągłego. Tryb ciągły jest nadal wymagany — tryb czasu rzeczywistego jedynie dodaje optymalizacje opóźnień na poziomie przepływu. Aby użyć trybu czasu rzeczywistego, potok musi najpierw działać w trybie ciągłym. Tryb czasu rzeczywistego stosuje następnie dodatkowe optymalizacje na poziomie przepływu, aby osiągnąć opóźnienie poniżej jednej sekundy, wykraczające poza to, co zapewnia standardowe przetwarzanie ciągłe.

Włączenie trybu w czasie rzeczywistym wymaga trzech kroków konfiguracji:

  1. Ustaw potok na tryb ciągły.
  2. Włącz tryb czasu rzeczywistego na poziomie pipeline’u.
  3. Zdefiniuj przepływ aktualizacji w czasie rzeczywistym.

Requirements

Requirement Value
Databricks Runtime 18.1.2 w kanale SDP w wersji zapoznawczej
Typ środowiska obliczeniowego Klasyczne obliczenia lub bezserwerowe

Konfigurowanie trybu czasu rzeczywistego

Krok 1: Ustaw potok na tryb ciągły

W ustawieniach potoku ustaw parametr Tryb potoku na Ciągły, lub ustaw go w kodzie JSON potoku:

{
  "continuous": true
}

Krok 2: Włącz tryb czasu rzeczywistego na poziomie potoku

W ustawieniach potoku dodaj następujący klucz do konfiguracji Spark w sekcji Zaawansowana > konfiguracja Spark:

spark.databricks.streaming.realTimeMode.enabled = true

Możesz też ustawić to w kodzie JSON potoku:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Krok 3. Definiowanie przepływu aktualizacji w czasie rzeczywistym

Tryb czasu rzeczywistego wymaga przepływu aktualizacji. Użyj dp.create_sink(), aby zdefiniować cel wyjściowy, a następnie użyj dekoratora @dp.update_flow z parametrem pipelines.trigger ustawionym na "RealTime" oraz z parametrem target wskazującym ujście.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Parametry konfiguracji na poziomie przepływu:

Parameter Wymagane Domyślny Description
pipelines.trigger Yes Ustaw na "RealTime", aby włączyć tryb czasu rzeczywistego dla tego przepływu.
pipelines.trigger.interval No "5 minutes" Interwał punktu kontrolnego. Określa, jak często zapisywane są stan i przesunięcia. Krótsze wartości zwiększają możliwości odzyskiwania; dłuższe wartości zmniejszają obciążenie.

Przykłady kodu

Kafka do Kafka

Odczyt z tematu Kafka i zapis do wyjściowego miejsca docelowego Kafka:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Wzbogać za pomocą złączenia rozgłoszeniowego

Połącz strumień Kafka ze statyczną tabelą wyszukiwania. Obsługiwane są tylko sprzężenia emisji (strumieniowo-statyczne). Łączenia między strumieniami nie są obsługiwane w trybie czasu rzeczywistego.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Agregacja

Zlicz zdarzenia przy użyciu klucza stanowego groupBy. Ustaw spark.sql.shuffle.partitions wartość zgodną z liczbą partycji wejściowych dla operacji stanowych:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Obsługiwane źródła i odbiorniki

Connector Jako źródło Jako ujście Notatki
Apache Kafka
AWS MSK Używa interfejsu zgodnego z platformą Kafka.
Azure Event Hubs (konektor Kafka) Używa interfejsu zgodnego z platformą Kafka.
Amazon Kinesis Niewspierane Używać tylko w trybie EFO (rozszerzony fan-out).
Delta Niewspierane Niewspierane

Obliczanie rozmiaru

Jeśli zasób obliczeniowy ma wystarczającą liczbę slotów zadań, możesz uruchomić jeden potok przetwarzania w czasie rzeczywistym na każdy zasób obliczeniowy. Dostępne sloty zadań muszą obejmować wszystkie zadania we wszystkich etapach wykonywania zapytań.

Typ potoku Configuration Wymagane miejsca zadań
Jednostopniowy, bezstanowy proces (źródło i odbiornik Kafka) maxPartitions = 8 8
Dwuetapowe stanowiskowe (źródło Kafka + shuffle) maxPartitions = 8, partycje mieszania = 20 28 (8 + 20)
Trzystopniowy (źródło Kafka + dwa przetasowania) maxPartitions = 8, dwa etapy mieszania, każdy po 20 elementów 48 (8 + 20 + 20)

Jeśli nie ustawisz maxPartitions, użyj liczby partycji w temacie w Kafka.

Obsługa operatora

Kategoria Operator Supported
Bezstanowa Wybór, projekcja
UDFs Scala UDF √ (z ograniczeniami)
UDFs Funkcja zdefiniowana przez użytkownika w języku Python √ (z ograniczeniami)
Agregacja suma, liczba, maksimum, minimum, średnia
Windowing Obracanie, przesuwanie
Windowing Session Niewspierane
Deduplication dropDuplicates √ (stan niezwiązany)
Deduplication dropDuplicatesWithinWatermark Niewspierane
Joins Łączenie tabeli przez rozgłoszenie
Joins Łączenie strumienia ze strumieniem Niewspierane
Na zamówienie transformWithState √ (z różnicami behawioralnymi)
Na zamówienie union √ (z ograniczeniami)
Na zamówienie forEach Niewspierane
Na zamówienie flatMapGroupsWithState Niewspierane
Na zamówienie mapPartitions Niewspierane
Na zamówienie forEachBatch Niewspierane

transformWithState w trybie czasu rzeczywistego

transformWithState jest obsługiwany w trybie czasu rzeczywistego z następującymi różnicami względem przetwarzania w mikropartiach:

  • handleInputRows jest wywoływany raz na wiersz, a nie raz dla każdego klucza w każdej partii. Iterator inputRows zwraca jedną wartość przy każdym wywołaniu.
  • Czasomierze czasu zdarzeń nie są obsługiwane. Czasomierze przetwarzania są uruchamiane, gdy długotrwała partia kończy się, jeśli żadne dane nie dotarły.
  • transformWithStateInPandas nie jest obsługiwana.

UDF-y Pandas w czasie rzeczywistym

Aby zminimalizować opóźnienia przy użyciu funkcji UDF biblioteki pandas, ustaw spark.sql.execution.arrow.maxRecordsPerBatch na 1. Pozwala to zoptymalizować pod kątem opóźnień kosztem przepływności. Jeśli przepływność jest również ważna, ustaw tę wartość na 100 lub większą.

Monitorowanie wydajności trybu w czasie rzeczywistym

Tryb czasu rzeczywistego wyświetla metryki opóźnień w StreamingQueryProgress pod polem latencies. Uzyskaj dostęp do tych metryk przez element StreamingQueryListener lub sprawdzając właściwość lastProgress w zapytaniu strumieniowym.

Metric Description
processingLatencyMs Czas między momentem odczytu rekordu przez przepływ a jego pełnym przetworzeniem przez przepływ
sourceQueuingLatencyMs Czas między pomyślnym zapisaniem rekordu do magistrali komunikatów (na przykład czasem dopisania do logu w Kafka) a jego pierwszym odczytem przez przepływ
e2eLatencyMs Łączne całkowite opóźnienie od momentu utworzenia rekordu w źródle do momentu pełnego przetworzenia go przez przepływ

Każda metryka jest podawana w percentylach p50, p90, p95 i p99.

Ograniczenia

Zalecany jest jeden przepływ w czasie rzeczywistym na potok. Dozwolone jest wiele przepływów, ale rywalizacja o miejsca zadań w przepływach zwiększa opóźnienie.

Aby uzyskać pełną listę ograniczeń operatora i źródła, zobacz Ograniczenia trybu w czasie rzeczywistym.

Dodatkowe zasoby