Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Ez az oldal a számítás finomhangolását, a végpontok közötti késés csökkentésének technikáit és a lekérdezési teljesítmény valós idejű módban történő mérésének módszereit ismerteti.
Számítás finomhangolása
A számítás konfigurálásakor vegye figyelembe a következőket:
- A mikroköteg módtól eltérően a valós idejű feladatok tétlenek maradhatnak az adatokra való várakozás során, ezért a megfelelő méretezés elengedhetetlen az erőforrások pazarlásának elkerülése érdekében.
- Törekedjen a célfürt kihasználtsági szintjének elérésére, például 50%-ra, a finomhangolás révén.
-
maxPartitions(a Kafka esetében) -
spark.sql.shuffle.partitions(shuffle szakaszok esetén)
-
- A Databricks azt javasolja, hogy úgy állítsa be a
maxPartitions, hogy minden tevékenység több Kafka-partíciót kezeljen a terhelés csökkentése érdekében. - Módosítsa a feladathelyeket munkavállalónként úgy, hogy azok illeszkedjenek az egyszerű egyszakaszos feladatok terheléséhez.
- Shuffle-heavy feladatok esetén kísérletezzen a hátralékok elkerüléséhez szükséges shuffle partíciók minimális számának megkeresésével, és onnan végezzen módosításokat. A számítás nem ütemezi a feladatot, ha nem rendelkezik elegendő tárolóhellyel.
Megjegyzés:
A Databricks Runtime 16.4 LTS-ből és az azt meghaladó verziókból az összes valós idejű feldolgozás a checkpoint v2 használatával teszi lehetővé a zökkenőmentes váltást a valós idejű és a mikrokötegelt mód között.
Késés optimalizálása
A strukturált valós idejű streamelési mód választható technikákkal csökkenti a végpontok közötti késést. Alapértelmezés szerint egyik sem engedélyezett. Ezeket külön kell engedélyeznie.
- Aszinkron folyamatkövetés: Az írásokat az eltoláshoz és a naplók véglegesítéséhez egy aszinkron szálba helyezi át, csökkentve az állapot nélküli lekérdezések kötegek közötti idejét.
- Aszinkron állapot-ellenőrzőpontozás: A számítás befejezése után azonnal megkezdi a következő mikroköteg feldolgozását anélkül, hogy az állapotellenőrzésre vár, csökkentve az állapotalapú lekérdezések késését.
Monitorozás és megfigyelhetőség
Valós idejű módban a hagyományos kötegidő-metrikák nem tükrözik a teljes körű késést. Az alábbi módszerekkel pontosan mérheti a késést, és azonosíthatja a lekérdezések szűk keresztmetszeteit.
A végpontok közötti késés munkaterhelés-specifikus, és néha csak üzleti logikával mérhető pontosan. Ha például a forrás időbélyege a Kafka kimenete, akkor a késést a Kafka kimeneti időbélyege és a forrás időbélyege közötti különbségként számíthatja ki.
Beépített metrikák a következővel: StreamingQueryProgress
A StreamingQueryProgress rendszer automatikusan naplózza az eseményt az illesztőprogram-naplókban, és a visszahívási StreamingQueryListeneronQueryProgress() függvényen keresztül érhető el. Ez lehetővé teszi, hogy programozott módon reagáljon a folyamat eseményeire, például ha metrikákat szeretne közzétenni egy külső monitorozási rendszerben.
QueryProgressEvent.json() vagy toString() vegye fel a valós idejű mód metrikáit:
-
Feldolgozási késés (
processingLatencyMs). A valós idejű módú lekérdezés egy rekord beolvasása és a lekérdezés következő szakaszba vagy lefelé történő írása között eltelt idő. Az egyfázisú lekérdezések esetében ez a végpontok közötti késés időtartamát méri. A rendszer tevékenységenként jelenti ezt a metrikát. -
Forrássorok késése (
sourceQueuingLatencyMs). Az az idő, amely között eltelt, amikor a rendszer egy rekordot ír egy üzenetbuszba – például a Kafkában a napló hozzáfűzési idejét –, és amikor a valós idejű lekérdezés először beolvassa a rekordot. A rendszer tevékenységenként jelenti ezt a metrikát. -
Végpontok közötti késés (
e2eLatencyMs). Az az idő, amikor a rendszer egy üzenetbuszba írja a rekordot, és amikor a valós idejű módú lekérdezés lefelé írja a rekordot. A rendszer kötegenként összesíti ezt a metrikát az összes tevékenység által feldolgozott összes rekordon.
Például:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
}
}
Egyéni késés mérése a Observe API-val
A Megfigyelési API lehetővé teszi a késés beágyazott mérését anélkül, hogy külön feladatot indítanának el. Ha olyan forrásidőbélyege van, amely a forrásadatok érkezési idejét közelíti meg, a kötegenkénti késés becsléséhez rögzíthet egy időbélyeget a kimeneti csomópont előtt, és a különbség kiszámításra kerül. Az eredmények megjelennek a folyamatban lévő jelentésekben, és elérhetők a figyelők számára.
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
A kimenet példája:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}