Gerçek zamanlı mod sorgu performansını iyileştirme ve izleme

Bu sayfada işlem ayarlama, uçtan uca gecikme süresini azaltma teknikleri ve gerçek zamanlı modda sorgu performansını ölçmeye yönelik yaklaşımlar yer alır.

Hesaplama ayarlama

Hesaplama ortamınızı yapılandırırken aşağıdakileri göz önünde bulundurun:

  • Mikro toplu iş modundan farklı olarak, gerçek zamanlı görevler verileri beklerken boşta kalabilir, bu nedenle kaynakların boşa harcanmasını önlemek için doğru boyutlandırma gereklidir.
  • Ayarlama yaparak 50%gibi bir hedef küme kullanım düzeyini hedefleyin:
    • maxPartitions (Kafka için)
    • spark.sql.shuffle.partitions (karıştırma aşaması için)
  • Databricks, her görevin ek yükü azaltmak için birden fazla Kafka bölümünü işlemesi için maxPartitions'yi ayarlamanızı önerir.
  • Çalışan başına görev yuvalarını, basit tek aşamalı işler için iş yüküyle eşleşecek şekilde ayarlayın.
  • Yoğun karıştırmalı işler için birikimlerden kaçınan en az karıştırma bölümü sayısını bulmak için denemeler yapın ve buradan ayarlayın. Bilgisayar sistemi yeterli yuvaya sahip değilse işi zamanlamaz.

Uyarı

Databricks Runtime 16.4 LTS ve daha üst versiyonlarında, tüm gerçek zamanlı iş hatları, gerçek zamanlı ve mikro toplu iş modları arasında sorunsuz geçiş sağlamak için checkpoint v2 kullanır.

Gecikme süresini iyileştirme

Yapılandırılmış Akış gerçek zamanlı modu, uçtan uca gecikme süresini azaltmak için isteğe bağlı tekniklere sahiptir. Hiçbiri varsayılan olarak etkin değildir. Bunları ayrı olarak etkinleştirmeniz gerekir.

  • Zaman uyumsuz ilerleme izleme: Yazma işlemlerini uzaklığa taşır ve günlükleri zaman uyumsuz bir iş parçacığına işler ve durum bilgisi olmayan sorgular için toplu işler arası süreyi azaltır.
  • Asenkron durum denetim noktası oluşturma: Durum denetim noktasını oluşturmadan, işlem tamamlanır tamamlanmaz bir sonraki mikro toplu işlemi işlemeye başlar ve durum bilgisi olan sorgular için gecikmeyi azaltır.

İzleme ve gözlemlenebilirlik

Gerçek zamanlı modda geleneksel toplu iş süresi ölçümleri gerçek uçtan uca gecikme süresini yansıtmaz. Gecikme süresini doğru bir şekilde ölçmek ve sorgularınızdaki performans sorunlarını belirlemek için aşağıdaki yaklaşımları kullanın.

Uçtan uca gecikme süresi iş yüküne özgüdür ve bazen yalnızca iş mantığıyla doğru bir şekilde ölçülebilir. Örneğin, kaynak zaman damgası Kafka'da çıktı ise, gecikme süresini Kafka'nın çıkış zaman damgası ile kaynak zaman damgası arasındaki fark olarak hesaplayabilirsiniz.

Yerleşik ölçümler StreamingQueryProgress ile

Olay StreamingQueryProgress, sürücü günlüklerine otomatik olarak kaydedilir ve StreamingQueryListener, onQueryProgress() geri çağırma işlevi aracılığıyla erişilebilir. Bu, ilerleme olaylarına program aracılığıyla, örneğin ölçümleri bir dış izleme sisteminde yayımlamak istiyorsanız tepki vermenizi sağlar. QueryProgressEvent.json() veya toString() şu gerçek zamanlı mod ölçümlerini ekleyin:

  1. İşleme gecikme süresi (processingLatencyMs). Gerçek zamanlı mod sorgusunun bir kaydı okuması ile sorgunun bir sonraki aşamaya veya aşağı akışa yazması arasında geçen süre. Tek aşamalı sorgular için bu, uçtan uca gecikme süresiyle aynı süreyi ölçer. Sistem bu ölçümü görev başına bildirir.
  2. Kaynak kuyruğa alma gecikme süresi (sourceQueuingLatencyMs). Sistemin bir ileti veriyoluna kayıt yazması (örneğin, Kafka'daki günlük ekleme süresi) ile gerçek zamanlı mod sorgusunun kaydı ilk kez okuması arasında geçen süre. Sistem bu ölçümü görev başına bildirir.
  3. Uçtan uca gecikme süresi (e2eLatencyMs). Sistemin kaydı bir ileti veri yolu'na yazması ile gerçek zamanlı mod sorgusunun kaydı aşağı akışa yazması arasındaki süre. Sistem, tüm görevler tarafından işlenen tüm kayıtlarda toplu iş başına bu ölçümü birleştirir.

Örneğin:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Gözlem API'siyle özel gecikme süresi ölçümü

Gözlemleme API'si, ayrı bir iş başlatmadan satır içi gecikme süresini ölçmenizi sağlar. Elinizde kaynak veri varış zamanını yaklaşık olarak belirten bir kaynak zaman damgası varsa, alıcı öncesinde bir zaman damgası kaydederek ve farkı hesaplayarak toplu iş başına gecikmeyi tahmin edebilirsiniz. Sonuçlar ilerleme raporlarında görünür ve dinleyiciler tarafından kullanılabilir.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Örnek çıkış:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Ek kaynaklar