Udostępnij za pośrednictwem


Optymalizowanie i monitorowanie wydajności zapytań w trybie rzeczywistym

Na tej stronie opisano dostrajanie zasobów obliczeniowych, techniki zmniejszania całkowitego opóźnienia oraz podejścia do mierzenia wydajności zapytań w trybie czasu rzeczywistego.

Dostrajanie zasobów obliczeniowych

Podczas konfigurowania obliczeń należy wziąć pod uwagę następujące kwestie:

  • W przeciwieństwie do trybu mikrosadowego, zadania w czasie rzeczywistym mogą pozostawać bezczynne podczas oczekiwania na dane, więc odpowiednie dopasowanie rozmiaru jest niezbędne, aby uniknąć marnowania zasobów.
  • Celuj w celu osiągnięcia docelowego poziomu wykorzystania klastra, takiego jak 50%, przez dostrajanie:
    • maxPartitions (dla platformy Kafka)
    • spark.sql.shuffle.partitions (dla etapów tasowania)
  • Usługa Databricks zaleca ustawienie maxPartitions tak, aby każde zadanie obsługiwało wiele partycji platformy Kafka w celu zmniejszenia obciążenia.
  • Dostosuj gniazda zadań przypadające na pracownika, aby dopasować obciążenie pracą do prostych zadań jednofazowych.
  • W przypadku zadań wymagających intensywnego mieszania eksperymentuj, aby znaleźć minimalną liczbę partycji mieszania, które unikają opóźnień, a następnie dostosuj wartość początkową. Obliczenia nie będą planować zadania, jeśli nie ma wystarczającej liczby miejsc.

Uwaga / Notatka

W środowisku Databricks Runtime 16.4 LTS i nowszym wszystkie rurki w czasie rzeczywistym używają punktów kontrolnych w wersji 2, aby umożliwić płynne przełączanie z trybów czasu rzeczywistego na mikropartii.

Optymalizacja opóźnienia

Zorganizowane przesyłanie strumieniowe w trybie rzeczywistym oferuje opcjonalne techniki zmniejszania opóźnienia od źródła do odbiorcy. Żadna z nich nie jest domyślnie włączona. Należy je włączyć oddzielnie.

  • Śledzenie postępu asynchronicznego: przenosi zapisy do dzienników przesunięcia i zatwierdzania do wątku asynchronicznego, co zmniejsza czas międzysadowy dla zapytań bezstanowych.
  • Asynchroniczne tworzenie punktów kontrolnych stanu: rozpoczyna przetwarzanie następnej mikropartii natychmiast po zakończeniu obliczeń, bez potrzeby oczekiwania na tworzenie punktów kontrolnych stanu, co zmniejsza opóźnienie zapytań stanowych.

Monitorowanie i obserwowanie

W trybie czasu rzeczywistego tradycyjne metryki czasu trwania partii nie odzwierciedlają rzeczywistego opóźnienia kompleksowego. Skorzystaj z poniższych metod, aby dokładnie zmierzyć opóźnienie i zidentyfikować wąskie gardła w zapytaniach.

Kompleksowe opóźnienie jest specyficzne dla obciążenia, a czasami może być dokładnie mierzone tylko za pomocą logiki biznesowej. Jeśli na przykład źródłowy znacznik czasu jest przesyłany w Kafka, możesz obliczyć opóźnienie jako różnicę między sygnaturą czasową danych wyjściowych w Kafka a sygnaturą czasową źródła.

Wbudowane metryki z StreamingQueryProgress

Zdarzenie StreamingQueryProgress jest automatycznie rejestrowane w dziennikach sterowników i dostępne za pośrednictwem funkcji wywołania zwrotnego StreamingQueryListener.onQueryProgress() Dzięki temu można programowo reagować na zdarzenia postępu, na przykład jeśli chcesz opublikować metryki w zewnętrznym systemie monitorowania. QueryProgressEvent.json() lub toString() obejmują te metryki trybu w czasie rzeczywistym:

  1. Opóźnienie przetwarzania (processingLatencyMs). Czas, który upłynął między momentem, gdy zapytanie w trybie rzeczywistym odczytuje rekord, a momentem, gdy zapytanie zapisuje go w kolejnym etapie przetwarzania. W przypadku zapytań jednoetapowych mierzy ten sam czas trwania co opóźnienie od początku do końca. System raportuje tę metrykę na każde zadanie.
  2. Opóźnienie kolejkowania źródłowego (sourceQueuingLatencyMs). Czas, jaki upłynął między zapisem rekordu przez system w magistrali komunikatów — na przykład czas dołączenia dziennika w Kafce — a momentem, kiedy zapytanie w trybie czasu rzeczywistego po raz pierwszy odczytuje rekord. System raportuje tę metrykę na każde zadanie.
  3. Opóźnienie end-to-end (e2eLatencyMs). Czas między momentem, kiedy system zapisuje rekord do magistrali komunikatów, a momentem, kiedy zapytanie w trybie rzeczywistym zapisuje rekord dalej w procesie. System agreguje tę metrykę na każdą partię we wszystkich rekordach przetwarzanych przez wszystkie zadania.

Przykład:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Niestandardowy pomiar latencji za pomocą API Observe

Interfejs API Observer umożliwia pomiar opóźnienia na miejscu bez uruchamiania oddzielnego zadania. Jeśli masz znacznik czasu źródła, który przybliża czas przybycia danych źródłowych, możesz oszacować opóźnienie na partię, rejestrując znacznik czasu przed miejscem docelowym i obliczając różnicę. Wyniki są wyświetlane w raportach postępów i są dostępne dla słuchaczy.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala - język programowania

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Przykładowe dane wyjściowe:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}