Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Akış uygulamalarınızın performansını, maliyetini ve durumunu izlemek, güvenilir ve verimli ETL işlem hatları oluşturmak için gereklidir. Azure Databricks performans sorunlarını tanılamaya, performansı iyileştirmeye ve kaynak kullanımını ve maliyetlerini yönetmeye yardımcı olmak için İşler, Lakeflow Spark Bildirimli İşlem Hatları ve Lakeflow Connect genelinde zengin bir gözlemlenebilirlik özellikleri kümesi sağlar.
Bu makalede aşağıdaki alanlardaki en iyi yöntemler özetlenmiştir:
- Önemli akış performans ölçümleri
- Olay günlüğü şemaları ve örnek sorgular
- Akış sorgusu izleme
- Sistem tablolarını kullanarak maliyet gözlemlenebilirliği
- Günlükleri ve ölçümleri dış araçlara aktarma
Akış gözlemlenebilirliği için temel ölçümler
Akış işlem hatlarını çalıştırırken aşağıdaki temel ölçümleri izleyin:
| Metric | Purpose |
|---|---|
| Backpressure | Dosya sayısını ve ofsetleri (boyutlar) izler. Performans sorunlarının belirlenmesine yardımcı olur ve sistemin geri kalmadan gelen verileri işleyebilmesini sağlar. |
| Throughput | Mikro yığın başına işlenen mesaj sayısını izler. İşlem hattı verimliliğini değerlendirin ve veri alımına ayak uydurduğunu denetleyin. |
| Duration | Mikro işlem paketinde ortalama süreyi ölçer. İşleme hızını gösterir ve toplu iş aralıklarının ayarlanmasına yardımcı olur. |
| Latency | Zaman içinde kaç kaydın/iletinin işlendiğini gösterir. Uçtan uca işlem hattı gecikmelerini anlamaya ve daha düşük gecikme süreleri için iyileştirmeye yardımcı olur. |
| Küme kullanımı | CPU ve bellek kullanımını yansıtır (%). Verimli kaynak kullanımı sağlar ve işleme taleplerini karşılamak için kümeleri ölçeklendirmeye yardımcı olur. |
| Network | Aktarılan ve alınan verileri ölçer. Ağ performans sorunlarını belirlemek ve veri aktarımı performansını geliştirmek için kullanışlıdır. |
| Checkpoint | İşlenen verileri ve uzaklıkları tanımlar. Tutarlılık sağlar ve hatalar sırasında hataya dayanıklılık sağlar. |
| Cost | Akış uygulamasının saatlik, günlük ve aylık maliyetlerini gösterir. Bütçeleme ve kaynak iyileştirmeye yönelik yardımlar. |
| Lineage | Akış uygulamasında oluşturulan veri kümelerini ve katmanları görüntüler. Veri dönüştürmeyi, izlemeyi, kalite güvencesini ve hata ayıklamayı kolaylaştırır. |
Küme günlükleri ve ölçümleri
Azure Databricks küme günlükleri ve ölçümleri, küme performansı ve kullanımı hakkında ayrıntılı içgörüler sağlar. Bu günlükler ve ölçümler CPU, bellek, disk G/Ç, ağ trafiği ve diğer sistem ölçümleri hakkındaki bilgileri içerir. Bu ölçümleri izlemek küme performansını iyileştirmek, kaynakları verimli bir şekilde yönetmek ve sorunları gidermek için çok önemlidir.
Azure Databricks küme günlükleri ve ölçümleri, küme performansı ve kaynak kullanımı hakkında ayrıntılı içgörüler sunar. Bunlar CPU ve bellek kullanımı, disk G/Ç ve ağ trafiğini içerir. Bu ölçümlerin izlenmesi aşağıdakiler için kritik öneme sahiptir:
- Küme performansını iyileştirme.
- Kaynakları verimli bir şekilde yönetme.
- İşletimsel sorunları giderme.
Ölçümler Databricks kullanıcı arabirimi aracılığıyla kullanılabilir veya kişisel izleme araçlarına aktarılabilir. Bkz . Not defteri örneği: Datadog ölçümleri.
Spark Kullanıcı Arabirimi
Spark kullanıcı arabirimi, tamamlanan, bekleyen ve başarısız olan görev sayısı da dahil olmak üzere işlerin ve aşamaların ilerleme durumu hakkında ayrıntılı bilgiler gösterir. Bu, yürütme akışını anlamanıza ve performans sorunlarını belirlemenize yardımcı olur.
Akış uygulamaları için Akış sekmesi giriş hızı, işleme hızı ve toplu iş süresi gibi ölçümleri gösterir. Akış işlerinizin performansını izlemenize ve veri alımı veya işleme sorunlarını belirlemenize yardımcı olur.
Daha fazla bilgi için bkz. Spark kullanıcı arabirimiyle hata ayıklama .
İşlem ölçümleri
İşlem ölçümleri, küme kullanımını anlamanıza yardımcı olur. İşiniz çalıştıkça nasıl ölçeklendirildiğini ve kaynaklarınızın nasıl etkilendiğini görebilirsiniz. Bellek baskısı, OOM hatalarına yol açabilir; CPU baskısı ise uzun gecikmelere neden olabilir. Bu tür baskıları bulabilirsiniz. Göreceğiniz belirli ölçümler şunlardır:
- Sunucu Yükü Dağıtımı: Her düğümün son dakika içindeki CPU kullanımı.
- CPU Kullanımı: CPU'nun çeşitli modlarda harcadığı süre yüzdesi (örneğin, kullanıcı, sistem, boşta ve iowait).
- Bellek Kullanımı: Her moda göre toplam bellek kullanımı (örneğin, kullanılan, boş, arabellek ve önbelleğe alınmış).
- Bellek Değiştirme Kullanımı: Toplam bellek değiştirme kullanımı.
- Boş Dosya Sistemi Alanı: Her bağlama noktasına göre toplam dosya sistemi kullanımı.
- Ağ Aktarım Hızı: Her cihaz tarafından ağ üzerinden alınan ve iletilen bayt sayısı.
- Etkin Düğüm Sayısı: Verilen işlem için her zaman damgasındaki etkin düğüm sayısı.
Daha fazla bilgi için bkz. Performansı izleme ve Donanım ölçüm grafikleri .
Sistem tabloları
Maliyet izleme
Azure Databricks sistem tabloları, iş maliyetini ve performansını izlemek için yapılandırılmış bir yaklaşım sağlar. Bu tablolar aşağıdakileri içerebilir:
- Görev yürütme ayrıntıları.
- Kaynak kullanımı.
- İlişkili maliyetler.
İşletimsel sistem durumunu ve finansal etkiyi anlamak için bu tabloları kullanın.
Requirements
Maliyet izleme için sistem tablolarını kullanmak için:
- Hesap yöneticisinin
system.lakeflow schemaöğesini etkinleştirmesi gerekir. - Kullanıcılar aşağıdakilerden birini yapmalıdır:
- Hem meta veri deposu yöneticisi hem de hesap yöneticisi olun veya
- Sistem şemalarında
USEveSELECTizinlerine sahip olun.
Örnek sorgu: En pahalı işler (son 30 gün)
Bu sorgu, son 30 gün içindeki en pahalı işleri tanımlayarak maliyet analizine ve iyileştirmeye yardımcı olur.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Lakeflow Spark Deklaratif İşlem Hatları
Lakeflow Spark Bildirimli İşlem Hatları olay günlüğü, aşağıdakiler dahil olmak üzere tüm işlem hattı olaylarının kapsamlı bir kaydını yakalar:
- Denetim günlükleri.
- Veri kalitesi denetimleri.
- İşlem hattı ilerleme durumu.
- Veri kökeni.
Olay günlüğü, tüm Lakeflow Spark Deklaratif İşlem Hatları için otomatik olarak etkinleştirilmiştir ve aşağıdaki yoluyla erişilebilir:
- İşlem hattı kullanıcı arabirimi: Günlükleri doğrudan görüntüleyin.
- İşlem hatları API'si: Programlı erişim.
- Doğrudan sorgu: Olay günlüğü tablosunu sorgular.
Daha fazla bilgi için bkz. Lakeflow Spark Bildirimli İşlem Hatları için olay günlüğü şeması.
Örnek sorgular
Bu örnek sorgular toplu iş süresi, aktarım hızı, geri baskı ve kaynak kullanımı gibi önemli ölçümler sağlayarak işlem hatlarının performansını ve sistem durumunu izlemeye yardımcı olur.
Ortalama işlem süresi
Bu sorgu, işlem hattı tarafından işlenen toplu işlemlerin ortalama süresini hesaplar.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Ortalama aktarım hızı
Bu sorgu, saniye başına işlenen satırlar açısından işlem hattının ortalama aktarım hızını hesaplar.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Backpressure
Bu sorgu, veri birikimini kontrol ederek işlem hattının geri basıncını ölçer.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Küme ve yuva kullanımı
Bu sorgu, işlem hattı tarafından kullanılan kümelerin veya yuvaların kullanımıyla ilgili içgörülere sahiptir.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Jobs
Akış Sorgu Dinleyicisi aracılığıyla işlerdeki akış sorgularını izleyebilirsiniz.
Azure Databricks'te Akış Sorgusu Dinleyicisi'ni etkinleştirmek için Spark oturumuna bir dinleyici ekleyin. Bu dinleyici akış sorgularınızın ilerleme durumunu ve ölçümlerini izler. Ölçümleri dış izleme araçlarına aktarmak veya daha ayrıntılı analiz için kaydetmek amacıyla kullanılabilir.
Örnek: Ölçümleri dış izleme araçlarına aktarma
Note
Bu, Python ve Scala için Databricks Runtime 11.3 LTS ve üzerinde kullanılabilir.
Akış verilerini, StreamingQueryListener arabirimini kullanarak uyarı veya pano oluşturma için dış hizmetlere aktarabilirsiniz.
Aşağıda, dinleyici uygulama hakkında temel bir örnek verilmiştir:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Örnek: Azure Databricks içinde sorgu dinleyicisi kullanma
Aşağıda kafkadan Delta Lake'e akış sorgusu için StreamingQueryListener olay günlüğü örneği verilmiştir:
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Daha fazla örnek için bkz. Örnekler.
Sorgu ilerleme durumu ölçümleri
Sorgu ilerleme durumu ölçümleri, akış sorgularınızın performansını ve sistem durumunu izlemek için gereklidir. Bu ölçümler giriş satırlarının sayısını, işleme oranlarını ve sorgu yürütmeyle ilgili çeşitli süreleri içerir. Spark oturumuna bir StreamingQueryListener ekleyerek bu ölçümleri gözlemleyebilirsiniz. Dinleyici, her akış döneminin sonunda bu ölçümleri içeren olayları yayar.
Örneğin, dinleyicinin StreamingQueryProgress.observedMetrics yöntemindeki haritayı onQueryProgress kullanarak ölçümlere erişebilirsiniz. Bu sayede akış sorgularınızın performansını gerçek zamanlı olarak izleyebilir ve analiz edebilirsiniz.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)