Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważna
Tryb czasu rzeczywistego w Lakeflow Spark Declarative Pipelines jest dostępny w publicznej wersji zapoznawczej w środowisku Databricks Runtime 18.1.2 w kanale preview.
Tryb czasu rzeczywistego umożliwia przetwarzanie danych o bardzo małych opóźnieniach z opóźnieniem końcowym nawet o pięciu milisekundach. Używaj trybu czasu rzeczywistego dla obciążeń operacyjnych, które wymagają natychmiastowej reakcji na dane przesyłane strumieniowo, takie jak wykrywanie oszustw i personalizacja w czasie rzeczywistym.
Tryb czasu rzeczywistego jest również dostępny bezpośrednio w Structured Streaming, poza potokami. Zobacz Tryb czasu rzeczywistego w Strukturalnym Przetwarzaniu Strumieniowym.
Jak tryb czasu rzeczywistego osiąga małe opóźnienie
Tryb czasu rzeczywistego różni się od standardowego ciągłego przetwarzania na trzy kluczowe sposoby:
- Długotrwałe partie: system przetwarza dane, gdy staną się dostępne w źródle w długotrwałych partiach (wartość domyślna to pięć minut).
- Jednoczesne planowanie etapów: wszystkie etapy zapytań są zaplanowane w tym samym czasie. Zasób obliczeniowy musi mieć wystarczającą ilość dostępnych miejsc zadań, aby uwzględnić wszystkie etapy jednocześnie. Zobacz Ustalanie rozmiaru zasobów obliczeniowych.
- Tasowanie strumieniowe: dane są przekazywane między etapami zaraz po ich wytworzeniu, zamiast czekać na zakończenie wcześniejszego etapu przed rozpoczęciem kolejnego.
Interwał punktu kontrolnego (skonfigurowany za pośrednictwem pipelines.trigger.interval) określa, jak często są utrwalane przesunięcia stanu i źródła w magazynie trwałym. Dłuższe interwały zmniejszają obciążenie punktów kontrolnych, ale zwiększają czas odzyskiwania po wystąpieniu awarii i opóźnieniu raportowania metryk. Krótsze interwały zwiększają trwałość, ale zwiększają obciążenie.
Tryb czasu rzeczywistego i ciągłe potoki przetwarzania
Tryb czasu rzeczywistego to wyspecjalizowany typ wyzwalacza ciągłego. Tryb ciągły jest nadal wymagany — tryb czasu rzeczywistego jedynie dodaje optymalizacje opóźnień na poziomie przepływu. Aby użyć trybu czasu rzeczywistego, potok musi najpierw działać w trybie ciągłym. Tryb czasu rzeczywistego stosuje następnie dodatkowe optymalizacje na poziomie przepływu, aby osiągnąć opóźnienie poniżej jednej sekundy, wykraczające poza to, co zapewnia standardowe przetwarzanie ciągłe.
Włączenie trybu w czasie rzeczywistym wymaga trzech kroków konfiguracji:
- Ustaw potok na tryb ciągły.
- Włącz tryb czasu rzeczywistego na poziomie pipeline’u.
- Zdefiniuj przepływ aktualizacji w czasie rzeczywistym.
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | 18.1.2 w kanale SDP w wersji zapoznawczej |
| Typ środowiska obliczeniowego | Klasyczne obliczenia lub bezserwerowe |
Konfigurowanie trybu czasu rzeczywistego
Krok 1: Ustaw potok na tryb ciągły
W ustawieniach potoku ustaw parametr Tryb potoku na Ciągły, lub ustaw go w kodzie JSON potoku:
{
"continuous": true
}
Krok 2: Włącz tryb czasu rzeczywistego na poziomie potoku
W ustawieniach potoku dodaj następujący klucz do konfiguracji Spark w sekcji Zaawansowana > konfiguracja Spark:
spark.databricks.streaming.realTimeMode.enabled = true
Możesz też ustawić to w kodzie JSON potoku:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
Krok 3. Definiowanie przepływu aktualizacji w czasie rzeczywistym
Tryb czasu rzeczywistego wymaga przepływu aktualizacji. Użyj dp.create_sink(), aby zdefiniować cel wyjściowy, a następnie użyj dekoratora @dp.update_flow z parametrem pipelines.trigger ustawionym na "RealTime" oraz z parametrem target wskazującym ujście.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
Parametry konfiguracji na poziomie przepływu:
| Parameter | Wymagane | Domyślny | Description |
|---|---|---|---|
pipelines.trigger |
Yes | — | Ustaw na "RealTime", aby włączyć tryb czasu rzeczywistego dla tego przepływu. |
pipelines.trigger.interval |
No | "5 minutes" |
Interwał punktu kontrolnego. Określa, jak często zapisywane są stan i przesunięcia. Krótsze wartości zwiększają możliwości odzyskiwania; dłuższe wartości zmniejszają obciążenie. |
Przykłady kodu
Kafka do Kafka
Odczyt z tematu Kafka i zapis do wyjściowego miejsca docelowego Kafka:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
Wzbogać za pomocą złączenia rozgłoszeniowego
Połącz strumień Kafka ze statyczną tabelą wyszukiwania. Obsługiwane są tylko sprzężenia emisji (strumieniowo-statyczne). Łączenia między strumieniami nie są obsługiwane w trybie czasu rzeczywistego.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Agregacja
Zlicz zdarzenia przy użyciu klucza stanowego groupBy. Ustaw spark.sql.shuffle.partitions wartość zgodną z liczbą partycji wejściowych dla operacji stanowych:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
Obsługiwane źródła i odbiorniki
| Connector | Jako źródło | Jako ujście | Notatki |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Używa interfejsu zgodnego z platformą Kafka. |
| Azure Event Hubs (konektor Kafka) | ✓ | ✓ | Używa interfejsu zgodnego z platformą Kafka. |
| Amazon Kinesis | ✓ | Niewspierane | Używać tylko w trybie EFO (rozszerzony fan-out). |
| Delta | Niewspierane | Niewspierane | — |
Obliczanie rozmiaru
Jeśli zasób obliczeniowy ma wystarczającą liczbę slotów zadań, możesz uruchomić jeden potok przetwarzania w czasie rzeczywistym na każdy zasób obliczeniowy. Dostępne sloty zadań muszą obejmować wszystkie zadania we wszystkich etapach wykonywania zapytań.
| Typ potoku | Configuration | Wymagane miejsca zadań |
|---|---|---|
| Jednostopniowy, bezstanowy proces (źródło i odbiornik Kafka) |
maxPartitions = 8 |
8 |
| Dwuetapowe stanowiskowe (źródło Kafka + shuffle) |
maxPartitions = 8, partycje mieszania = 20 |
28 (8 + 20) |
| Trzystopniowy (źródło Kafka + dwa przetasowania) |
maxPartitions = 8, dwa etapy mieszania, każdy po 20 elementów |
48 (8 + 20 + 20) |
Jeśli nie ustawisz maxPartitions, użyj liczby partycji w temacie w Kafka.
Obsługa operatora
| Kategoria | Operator | Supported |
|---|---|---|
| Bezstanowa | Wybór, projekcja | ✓ |
| UDFs | Scala UDF | √ (z ograniczeniami) |
| UDFs | Funkcja zdefiniowana przez użytkownika w języku Python | √ (z ograniczeniami) |
| Agregacja | suma, liczba, maksimum, minimum, średnia | ✓ |
| Windowing | Obracanie, przesuwanie | ✓ |
| Windowing | Session | Niewspierane |
| Deduplication | dropDuplicates |
√ (stan niezwiązany) |
| Deduplication | dropDuplicatesWithinWatermark |
Niewspierane |
| Joins | Łączenie tabeli przez rozgłoszenie | ✓ |
| Joins | Łączenie strumienia ze strumieniem | Niewspierane |
| Na zamówienie | transformWithState |
√ (z różnicami behawioralnymi) |
| Na zamówienie | union |
√ (z ograniczeniami) |
| Na zamówienie | forEach |
Niewspierane |
| Na zamówienie | flatMapGroupsWithState |
Niewspierane |
| Na zamówienie | mapPartitions |
Niewspierane |
| Na zamówienie | forEachBatch |
Niewspierane |
transformWithState w trybie czasu rzeczywistego
transformWithState jest obsługiwany w trybie czasu rzeczywistego z następującymi różnicami względem przetwarzania w mikropartiach:
-
handleInputRowsjest wywoływany raz na wiersz, a nie raz dla każdego klucza w każdej partii. IteratorinputRowszwraca jedną wartość przy każdym wywołaniu. - Czasomierze czasu zdarzeń nie są obsługiwane. Czasomierze przetwarzania są uruchamiane, gdy długotrwała partia kończy się, jeśli żadne dane nie dotarły.
-
transformWithStateInPandasnie jest obsługiwana.
UDF-y Pandas w czasie rzeczywistym
Aby zminimalizować opóźnienia przy użyciu funkcji UDF biblioteki pandas, ustaw spark.sql.execution.arrow.maxRecordsPerBatch na 1. Pozwala to zoptymalizować pod kątem opóźnień kosztem przepływności. Jeśli przepływność jest również ważna, ustaw tę wartość na 100 lub większą.
Monitorowanie wydajności trybu w czasie rzeczywistym
Tryb czasu rzeczywistego wyświetla metryki opóźnień w StreamingQueryProgress pod polem latencies. Uzyskaj dostęp do tych metryk przez element StreamingQueryListener lub sprawdzając właściwość lastProgress w zapytaniu strumieniowym.
| Metric | Description |
|---|---|
processingLatencyMs |
Czas między momentem odczytu rekordu przez przepływ a jego pełnym przetworzeniem przez przepływ |
sourceQueuingLatencyMs |
Czas między pomyślnym zapisaniem rekordu do magistrali komunikatów (na przykład czasem dopisania do logu w Kafka) a jego pierwszym odczytem przez przepływ |
e2eLatencyMs |
Łączne całkowite opóźnienie od momentu utworzenia rekordu w źródle do momentu pełnego przetworzenia go przez przepływ |
Każda metryka jest podawana w percentylach p50, p90, p95 i p99.
Ograniczenia
Zalecany jest jeden przepływ w czasie rzeczywistym na potok. Dozwolone jest wiele przepływów, ale rywalizacja o miejsca zadań w przepływach zwiększa opóźnienie.
Aby uzyskać pełną listę ograniczeń operatora i źródła, zobacz Ograniczenia trybu w czasie rzeczywistym.