Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tato stránka popisuje ladění výpočetních prostředků, techniky pro snížení celkové latence a přístupy pro měření výkonu dotazů v režimu v reálném čase.
Ladění výpočetních prostředků
Při konfiguraci výpočetních prostředků zvažte následující:
- Na rozdíl od režimu mikrodávkového režimu můžou úkoly v reálném čase zůstat nečinné při čekání na data, takže správné nastavení velikosti je nezbytné, aby nedocházelo k plýtvání zdroji.
- Usilujte o dosažení cílové úrovně využití clusteru, například 50 %, pomocí ladění:
-
maxPartitions(pro Kafka) -
spark.sql.shuffle.partitions(pro fáze náhodného prohazování)
-
- Databricks doporučuje nastavit
maxPartitions, aby každý úkol zpracoval více particí Kafka, aby se snížila režie. - Upravte sloty úkolů na pracovníka tak, aby odpovídaly zátěži pro jednoduché jednofázové úlohy.
- U úloh s vysokými nároky na shuffle experimentujte s nastavením minimálního počtu oddílů, které se vyhnou zdržení, a odtud jej upravte. Výpočetní uzel neplánuje úlohu, pokud nemá dostatek přidělených míst.
Poznámka:
Od Databricks Runtime 16.4 LTS a vyšších používají všechny datové toky v reálném čase checkpoint v2, který umožňuje bezproblémové přepínání mezi režimem reálného času a mikrodávkovým režimem.
Optimalizace latence
Režim strukturovaného streamování v reálném čase má volitelné techniky pro snížení celkové latence. Ve výchozím nastavení není povolená žádná možnost. Musíte je povolit samostatně.
- Asynchronní sledování průběhu: Přesune zápisy do protokolů posunu a potvrzení do asynchronního vlákna, čímž se sníží čas mezi dávkami pro bezstavové dotazy.
- Asynchronní vytváření kontrolních bodů stavu: Začne zpracovávat další mikrodávku hned po dokončení výpočtu, aniž by se čekalo na kontrolní bod stavu, což snižuje latenci stavových dotazů.
Monitorování a pozorovatelnost
V režimu v reálném čase tradiční metriky dávkové doby trvání neodráží skutečnou celkovou latenci. Pomocí následujících přístupů můžete přesně měřit latenci a identifikovat kritické body v dotazech.
Koncová latence je specifická pro úlohy a někdy se dá přesně měřit pouze s podnikovou logikou. Pokud je například zdrojové časové razítko výstupem v systému Kafka, můžete vypočítat latenci jako rozdíl mezi časovým razítkem výstupu Kafka a zdrojovým časovým razítkem.
Integrované metriky s využitím StreamingQueryProgress
Událost StreamingQueryProgress se automaticky zaprotokoluje v protokolech ovladačů a je přístupná prostřednictvím funkce zpětného StreamingQueryListeneronQueryProgress() volání. To vám umožní reagovat na probíhající události prostřednictvím kódu programu, například pokud chcete publikovat metriky do externího monitorovacího systému.
QueryProgressEvent.json() nebo toString() zahrnout tyto metriky režimu v reálném čase:
-
Latence zpracování (
processingLatencyMs). Uplynulý čas mezi tím, kdy dotaz v režimu v reálném čase přečte záznam a kdy ho zapíše do další fáze nebo podřízeného dotazu. U dotazů s jednou fází se měří stejná doba trvání jako celková latence. Systém tuto metriku hlásí na každou úlohu. -
Latence fronty zdroje (
sourceQueuingLatencyMs). Doba, která uplynula mezi tím, kdy systém zapíše záznam do sběrnice zpráv ( například čas připojení protokolu v Systému Kafka) a kdy dotaz v režimu v reálném čase poprvé přečte záznam. Systém tuto metriku hlásí na každou úlohu. -
Kompletní latence (
e2eLatencyMs). Doba mezi tím, kdy systém zapíše záznam do sběrnice zpráv a kdy dotaz v režimu v reálném čase zapíše záznam do podřízeného stavu. Systém agreguje tuto metriku pro každou dávku napříč všemi záznamy zpracovanými úkoly.
Například:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
}
}
Měření vlastní latence pomocí rozhraní API pro sledování
Rozhraní API sledování umožňuje měřit latenci přímo bez spuštění samostatné úlohy. Pokud máte časové razítko zdroje, které se blíží času příjezdu zdrojových dat, můžete odhadnout latenci pro každou dávku zaznamenáním časového razítka před datovým úložištěm a vypočítáním rozdílu. Výsledky se zobrazují ve zprávách o průběhu a jsou dostupné pro posluchače.
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
Ukázkový výstup:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}