Valós idejű módú lekérdezési teljesítmény optimalizálása és monitorozása

Ez az oldal a számítás finomhangolását, a végpontok közötti késés csökkentésének technikáit és a lekérdezési teljesítmény valós idejű módban történő mérésének módszereit ismerteti.

Számítás finomhangolása

A számítás konfigurálásakor vegye figyelembe a következőket:

  • A mikroköteg módtól eltérően a valós idejű feladatok tétlenek maradhatnak az adatokra való várakozás során, ezért a megfelelő méretezés elengedhetetlen az erőforrások pazarlásának elkerülése érdekében.
  • Törekedjen a célfürt kihasználtsági szintjének elérésére, például 50%-ra, a finomhangolás révén.
    • maxPartitions (a Kafka esetében)
    • spark.sql.shuffle.partitions (shuffle szakaszok esetén)
  • A Databricks azt javasolja, hogy úgy állítsa be a maxPartitions, hogy minden tevékenység több Kafka-partíciót kezeljen a terhelés csökkentése érdekében.
  • Módosítsa a feladathelyeket munkavállalónként úgy, hogy azok illeszkedjenek az egyszerű egyszakaszos feladatok terheléséhez.
  • Shuffle-heavy feladatok esetén kísérletezzen a hátralékok elkerüléséhez szükséges shuffle partíciók minimális számának megkeresésével, és onnan végezzen módosításokat. A számítás nem ütemezi a feladatot, ha nem rendelkezik elegendő tárolóhellyel.

Megjegyzés:

A Databricks Runtime 16.4 LTS-ből és az azt meghaladó verziókból az összes valós idejű feldolgozás a checkpoint v2 használatával teszi lehetővé a zökkenőmentes váltást a valós idejű és a mikrokötegelt mód között.

Késés optimalizálása

A strukturált valós idejű streamelési mód választható technikákkal csökkenti a végpontok közötti késést. Alapértelmezés szerint egyik sem engedélyezett. Ezeket külön kell engedélyeznie.

  • Aszinkron folyamatkövetés: Az írásokat az eltoláshoz és a naplók véglegesítéséhez egy aszinkron szálba helyezi át, csökkentve az állapot nélküli lekérdezések kötegek közötti idejét.
  • Aszinkron állapot-ellenőrzőpontozás: A számítás befejezése után azonnal megkezdi a következő mikroköteg feldolgozását anélkül, hogy az állapotellenőrzésre vár, csökkentve az állapotalapú lekérdezések késését.

Monitorozás és megfigyelhetőség

Valós idejű módban a hagyományos kötegidő-metrikák nem tükrözik a teljes körű késést. Az alábbi módszerekkel pontosan mérheti a késést, és azonosíthatja a lekérdezések szűk keresztmetszeteit.

A végpontok közötti késés munkaterhelés-specifikus, és néha csak üzleti logikával mérhető pontosan. Ha például a forrás időbélyege a Kafka kimenete, akkor a késést a Kafka kimeneti időbélyege és a forrás időbélyege közötti különbségként számíthatja ki.

Beépített metrikák a következővel: StreamingQueryProgress

A StreamingQueryProgress rendszer automatikusan naplózza az eseményt az illesztőprogram-naplókban, és a visszahívási StreamingQueryListeneronQueryProgress() függvényen keresztül érhető el. Ez lehetővé teszi, hogy programozott módon reagáljon a folyamat eseményeire, például ha metrikákat szeretne közzétenni egy külső monitorozási rendszerben. QueryProgressEvent.json() vagy toString() vegye fel a valós idejű mód metrikáit:

  1. Feldolgozási késés (processingLatencyMs). A valós idejű módú lekérdezés egy rekord beolvasása és a lekérdezés következő szakaszba vagy lefelé történő írása között eltelt idő. Az egyfázisú lekérdezések esetében ez a végpontok közötti késés időtartamát méri. A rendszer tevékenységenként jelenti ezt a metrikát.
  2. Forrássorok késése (sourceQueuingLatencyMs). Az az idő, amely között eltelt, amikor a rendszer egy rekordot ír egy üzenetbuszba – például a Kafkában a napló hozzáfűzési idejét –, és amikor a valós idejű lekérdezés először beolvassa a rekordot. A rendszer tevékenységenként jelenti ezt a metrikát.
  3. Végpontok közötti késés (e2eLatencyMs). Az az idő, amikor a rendszer egy üzenetbuszba írja a rekordot, és amikor a valós idejű módú lekérdezés lefelé írja a rekordot. A rendszer kötegenként összesíti ezt a metrikát az összes tevékenység által feldolgozott összes rekordon.

Például:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Egyéni késés mérése a Observe API-val

A Megfigyelési API lehetővé teszi a késés beágyazott mérését anélkül, hogy külön feladatot indítanának el. Ha olyan forrásidőbélyege van, amely a forrásadatok érkezési idejét közelíti meg, a kötegenkénti késés becsléséhez rögzíthet egy időbélyeget a kimeneti csomópont előtt, és a különbség kiszámításra kerül. Az eredmények megjelennek a folyamatban lévő jelentésekben, és elérhetők a figyelők számára.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

A kimenet példája:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

További erőforrások