Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Monitorowanie wydajności, kosztów i kondycji aplikacji przesyłania strumieniowego jest niezbędne do tworzenia niezawodnych, wydajnych potoków ETL. Usługa Azure Databricks udostępnia bogaty zestaw funkcji obserwowalności w zadaniach, deklaratywnych potokach Lakeflow Spark i Lakeflow Connect, aby ułatwić diagnozowanie wąskich gardeł, optymalizowanie wydajności oraz zarządzanie użyciem zasobów i kosztami.
W tym artykule opisano najlepsze rozwiązania w następujących obszarach:
- Kluczowe metryki wydajności przesyłania strumieniowego
- Schematy dziennika zdarzeń i przykładowe zapytania
- Monitorowanie zapytań przesyłanych strumieniowo
- Możliwość obserwowania kosztów przy użyciu tabel systemowych
- Eksportowanie dzienników i metryk do narzędzi zewnętrznych
Kluczowe metryki dotyczące obserwowalności przesyłania strumieniowego
Podczas obsługi potoków przesyłania strumieniowego monitoruj następujące kluczowe metryki:
| Metric | Purpose |
|---|---|
| Backpressure | Monitoruje liczbę plików i przesunięć (rozmiarów). Pomaga identyfikować wąskie gardła i zapewnia, że system jest w stanie obsługiwać dane przychodzące bez opóźnień. |
| Throughput | Śledzi liczbę komunikatów przetworzonych na mikropartię. Oceń wydajność potoku i sprawdź, czy nadąża za pozyskiwaniem danych. |
| Duration | Mierzy średni czas trwania mikrosadowej partii. Wskazuje szybkość przetwarzania i pomaga dostroić interwały wsadowe. |
| Latency | Wskazuje, ile rekordów/komunikatów jest przetwarzanych w czasie. Pomaga zrozumieć opóźnienia na poziomie całego procesu i zoptymalizować je pod kątem mniejszych opóźnień. |
| Wykorzystanie klastra | Odzwierciedla użycie procesora i pamięci (%). Zapewnia efektywne użycie zasobów i pomaga skalować klastry w celu spełnienia wymagań dotyczących przetwarzania. |
| Network | Mierzy dane przesyłane i odbierane. Przydatne do identyfikowania wąskich gardeł sieci i poprawy wydajności transferu danych. |
| Checkpoint | Identyfikuje przetworzone dane i wartości przesunięcia. Zapewnia spójność i zapewnia odporność na uszkodzenia podczas awarii. |
| Cost | Pokazuje godzinowe, dzienne i miesięczne koszty aplikacji przesyłania strumieniowego. Pomoc w budżetowaniu i optymalizacji zasobów. |
| Lineage | Wyświetla zestawy danych i warstwy utworzone w aplikacji przesyłania strumieniowego. Ułatwia przekształcanie danych, śledzenie, zapewnianie jakości i debugowanie. |
Dzienniki i metryki klastra
Dzienniki i metryki klastra usługi Azure Databricks zapewniają szczegółowy wgląd w wydajność i wykorzystanie klastra. Te dzienniki i metryki obejmują informacje na temat procesora CPU, pamięci, operacji we/wy dysku, ruchu sieciowego i innych metryk systemu. Monitorowanie tych metryk ma kluczowe znaczenie dla optymalizacji wydajności klastra, wydajnego zarządzania zasobami i rozwiązywania problemów.
Dzienniki i metryki klastra usługi Azure Databricks oferują szczegółowy wgląd w wydajność klastra i wykorzystanie zasobów. Obejmują one użycie procesora CPU i pamięci, we/wy dysku i ruch sieciowy. Monitorowanie tych metryk ma kluczowe znaczenie dla:
- Optymalizacja wydajności klastra.
- Efektywne zarządzanie zasobami.
- Rozwiązywanie problemów operacyjnych.
Metryki można wykorzystać za pomocą interfejsu użytkownika usługi Databricks lub wyeksportować je do osobistych narzędzi do monitorowania. Zobacz Przykład notatnika: metryki Datadog.
Interfejs użytkownika platformy Spark
Interfejs użytkownika platformy Spark zawiera szczegółowe informacje o postępie zadań i etapach, w tym liczbę ukończonych zadań, oczekujących i zakończonych niepowodzeniem. Pomaga to zrozumieć przebieg wykonywania i zidentyfikować wąskie gardła.
W przypadku aplikacji przesyłania strumieniowego karta Przesyłanie strumieniowe zawiera metryki, takie jak szybkość danych wejściowych, szybkość przetwarzania i czas trwania partii. Pomaga monitorować wydajność zadań przesyłania strumieniowego i identyfikować wszelkie problemy z pozyskiwaniem lub przetwarzaniem danych.
Aby uzyskać więcej informacji, zobacz Debugowanie za pomocą interfejsu użytkownika platformy Spark .
Metryki obliczeniowe
Metryki obliczeniowe pomogą Ci zrozumieć wykorzystanie klastra. W miarę uruchamiania zadania możesz zobaczyć, jak działa i jak wpływa na zasoby. Będzie można znaleźć ciśnienie pamięci, które może prowadzić do awarii Out Of Memory (OOM) lub obciążenie CPU, które może powodować długie opóźnienia. Poniżej przedstawiono konkretne metryki, które zobaczysz:
- Dystrybucja obciążenia serwera: wykorzystanie procesora CPU każdego węzła w ciągu ostatniej minuty.
- Użycie CPU: procent czasu spędzonego przez CPU w różnych trybach (na przykład użytkownik, system, bezczynny i iowait).
- Wykorzystanie pamięci: łączne użycie pamięci przez każdy tryb (na przykład używane, wolne, buforowe i buforowane).
- Wykorzystanie pamięci wymiennej: łączne użycie pamięci wymiennej.
- Wolne miejsce w systemie plików: łączne użycie systemu plików przez każdy punkt instalacji.
- Przepływność sieci: liczba bajtów odebranych i przesłanych za pośrednictwem sieci przez każde urządzenie.
- Liczba aktywnych węzłów: liczba aktywnych węzłów w każdym znaczniku czasu dla danego obliczenia.
Aby uzyskać więcej informacji, zobacz Monitoruj wydajność i Wykresy metryk sprzętu.
Tabele systemowe
Monitorowanie kosztów
Tabele systemowe usługi Azure Databricks zapewniają ustrukturyzowane podejście do monitorowania kosztów i wydajności zadań. Tabele te obejmują:
- Szczegóły przebiegu zadania.
- Wykorzystanie zasobów.
- Powiązane koszty.
Te tabele umożliwiają zrozumienie kondycji operacyjnej i wpływu finansowego.
Requirements
Aby użyć tabel systemowych do monitorowania kosztów:
- Administrator konta musi włączyć element
system.lakeflow schema. - Użytkownicy muszą albo:
- Bądź zarówno administratorem metadanych, jak i administratorem konta lub
- Mieć uprawnienia
USEiSELECTna schematach systemowych.
Przykładowe zapytanie: najdroższe zadania (ostatnie 30 dni)
To zapytanie identyfikuje najdroższe zadania w ciągu ostatnich 30 dni, co pomaga w analizie kosztów i optymalizacji.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Potoki deklaratywne platformy Spark w usłudze Lakeflow
Dziennik zdarzeń Lakeflow Spark Deklaratywnych Potoków przechwytuje kompleksowy rekord wszystkich zdarzeń potoku, w tym:
- Dzienniki inspekcji.
- Kontrole jakości danych.
- Postęp rurociągu.
- Pochodzenie danych.
Dziennik zdarzeń jest automatycznie włączony dla wszystkich potoków deklaratywnych Lakeflow Spark i można uzyskać do niego dostęp za pośrednictwem:
- Interfejs użytkownika potoku: Przeglądaj dzienniki bezpośrednio.
- API potoków: dostęp za pośrednictwem programu.
- Zapytanie bezpośrednie: Kwerenda tabeli dziennika zdarzeń.
Aby uzyskać więcej informacji, zobacz Schemat dziennika zdarzeń dla Lakeflow Spark Deklaratywnych Potoków.
Przykładowe zapytania
Te przykładowe zapytania pomagają monitorować wydajność i kondycję potoków, udostępniając kluczowe metryki takie jak czas trwania partii, przepustowość, ciśnienie wsteczne oraz wykorzystanie zasobów.
Średni czas trwania serii
To zapytanie oblicza średni czas trwania partii przetwarzanych przez potok.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Średnia przepływność
To zapytanie oblicza średnią przepływność potoku pod względem przetworzonych wierszy na sekundę.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Backpressure
Niniejsze zapytanie mierzy przeciążenie potoku, sprawdzając zaległości danych.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Wykorzystanie klastrów i gniazd
To zapytanie zawiera informacje o wykorzystaniu klastrów lub gniazd używanych przez potok.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Jobs
Można monitorować zapytania przesyłane strumieniowo w zadaniach za pomocą nasłuchiwacza zapytań strumieniowych.
Dołącz odbiornik do sesji platformy Spark, aby włączyć odbiornik zapytań przesyłanych strumieniowo w usłudzeAzure Databricks. Ten odbiornik będzie monitorować postęp i metryki zapytań przesyłanych strumieniowo. Może służyć do wypychania metryk do zewnętrznych narzędzi do monitorowania lub rejestrowania ich w celu dalszej analizy.
Przykład: Eksportowanie metryk do zewnętrznych narzędzi do monitorowania
Note
Jest to dostępne w środowisku Databricks Runtime 11.3 LTS i nowszym dla języków Python i Scala.
Metryki przesyłania strumieniowego można eksportować do usług zewnętrznych na potrzeby alertów lub pulpitów nawigacyjnych przy użyciu interfejsu StreamingQueryListener .
Oto podstawowy przykład implementacji odbiornika:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Przykład: używanie odbiornika zapytań w usłudze Azure Databricks
Poniżej przedstawiono przykład dziennika zdarzeń StreamingQueryListener dla zapytania przesyłania strumieniowego platformy Kafka do usługi Delta Lake:
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Aby uzyskać więcej przykładów, zobacz: Przykłady.
Metryki postępu zapytania
Metryki postępu zapytań są niezbędne do monitorowania wydajności i kondycji zapytań przesyłanych strumieniowo. Te metryki obejmują liczbę wierszy wejściowych, współczynniki przetwarzania i różne czasy trwania związane z wykonywaniem zapytania. Możesz obserwować te metryki, dołączając element StreamingQueryListener do sesji platformy Spark. Odbiornik będzie emitować zdarzenia zawierające te metryki na końcu każdej epoki przesyłania strumieniowego.
Na przykład możesz uzyskać dostęp do metryk używając StreamingQueryProgress.observedMetrics mapy w metodzie odbiornika onQueryProgress. Umożliwia to śledzenie i analizowanie wydajności zapytań przesyłanych strumieniowo w czasie rzeczywistym.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)