Najlepsze praktyki dotyczące usługi Delta Lake

W tym artykule opisano najlepsze rozwiązania dotyczące korzystania z usługi Delta Lake.

Usługa Databricks zaleca korzystanie z optymalizacji predykcyjnej. Zobacz Optymalizacja predykcyjna usługi Delta Lake.

Podczas usuwania i ponownego tworzenia tabeli w tej samej lokalizacji należy zawsze użyć instrukcji CREATE OR REPLACE TABLE . Zobacz Usuwanie lub zastępowanie tabeli delty.

Używanie klastrowania liquid w celu zoptymalizowanego pomijania danych

Usługa Databricks zaleca używanie klastrowania liquid, a nie partycjonowania, kolejności Z lub innych strategii organizacji danych w celu optymalizacji układu danych pod kątem pomijania danych. Zobacz Użyj płynnego klastrowania dla tabel typu Delta).

Kompaktowanie plików

Optymalizacja predykcyjna jest uruchamiana automatycznie OPTIMIZE i VACUUM polecenia w tabelach zarządzanych wykazu aparatu Unity. Zobacz Optymalizacja predykcyjna usługi Delta Lake.

Usługa Databricks zaleca częste uruchamianie polecenia OPTIMIZE w celu kompaktowania małych plików.

Uwaga

Ta operacja nie powoduje usunięcia starych plików. Aby je usunąć, uruchom polecenie VACUUM .

Zastępowanie zawartości lub schematu tabeli

Czasami możesz zastąpić tabelę delty. Na przykład:

  • Dane w tabeli są niepoprawne i chcesz zamienić zawartość.
  • Chcesz ponownie napisać całą tabelę, aby wykonać niezgodne zmiany schematu (takie jak zmiana typów kolumn).

Chociaż można usunąć cały katalog tabeli delty i utworzyć nową tabelę w tej samej ścieżce, nie jest to zalecane , ponieważ:

  • Usuwanie katalogu nie jest wydajne. Usunięcie katalogu zawierającego bardzo duże pliki może potrwać kilka godzin, a nawet dni.
  • Utracisz całą zawartość usuniętych plików; Odzyskanie nieprawidłowej tabeli jest trudne.
  • Usunięcie katalogu nie jest niepodzielne. Podczas usuwania tabeli równoczesne odczytywanie tabeli może zakończyć się niepowodzeniem lub zobaczyć częściową tabelę.

Jeśli nie musisz zmieniać schematu tabeli, możesz usunąć dane z tabeli delty i wstawić nowe dane lub zaktualizować tabelę, aby naprawić nieprawidłowe wartości.

Jeśli chcesz zmienić schemat tabeli, możesz zastąpić całą tabelę niepodziealnie. Na przykład:

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

Istnieje wiele korzyści z tego podejścia:

  • Zastępowanie tabeli jest znacznie szybsze, ponieważ nie musi ponownie wyświetlać listy katalogów ani usuwać żadnych plików.
  • Stara wersja tabeli nadal istnieje. Jeśli usuniesz niewłaściwą tabelę, możesz łatwo pobrać stare dane przy użyciu podróży w czasie. Zobacz Praca z historią tabel platformy Delta Lake.
  • Jest to operacja niepodzielna. Zapytania współbieżne nadal mogą odczytywać tabelę podczas usuwania tabeli.
  • Ze względu na gwarancje transakcji Delta Lake ACID, jeśli zastąpienie tabeli zakończy się niepowodzeniem, tabela będzie w poprzednim stanie.

Ponadto, jeśli chcesz usunąć stare pliki, aby zaoszczędzić koszty magazynowania po zastąpieniu tabeli, możesz użyć funkcji VACUUM , aby je usunąć. Jest ona zoptymalizowana pod kątem usuwania plików i zwykle jest szybsza niż usuwanie całego katalogu.

Buforowanie platformy Spark

Usługa Databricks nie zaleca używania buforowania platformy Spark z następujących powodów:

  • Utracisz wszelkie pomijanie danych, które mogą pochodzić z dodatkowych filtrów dodanych w pamięci podręcznej DataFrame.
  • Dane, które są buforowane, mogą nie zostać zaktualizowane, jeśli do tabeli uzyskuje się dostęp przy użyciu innego identyfikatora.

Różnice między usługą Delta Lake i Parquet na platformie Apache Spark

Usługa Delta Lake automatycznie obsługuje następujące operacje. Nigdy nie należy wykonywać tych operacji ręcznie:

  • REFRESH TABLE: Tabele różnicowe zawsze zwracają najbardziej aktualne informacje, więc nie ma potrzeby ręcznego wywoływania REFRESH TABLE po zmianach.
  • Dodawanie i usuwanie partycji: usługa Delta Lake automatycznie śledzi zestaw partycji znajdujących się w tabeli i aktualizuje listę w miarę dodawania lub usuwania danych. W związku z tym nie ma potrzeby uruchamiania ALTER TABLE [ADD|DROP] PARTITION ani MSCK.
  • Ładowanie pojedynczej partycji: bezpośrednie odczytywanie partycji nie jest konieczne. Na przykład nie trzeba uruchamiać polecenia spark.read.format("parquet").load("/data/date=2017-01-01"). Zamiast tego należy użyć klauzuli WHERE pomijania danych, takiej jak spark.read.table("<table-name>").where("date = '2017-01-01'").
  • Nie modyfikuj ręcznie plików danych: usługa Delta Lake używa dziennika transakcji do zatwierdzania zmian w tabeli niepodzielnie. Nie należy bezpośrednio modyfikować, dodawać ani usuwać plików danych Parquet w tabeli delty, ponieważ może to prowadzić do utraty danych lub uszkodzenia tabeli.

Zwiększanie wydajności scalania usługi Delta Lake

Możesz skrócić czas potrzebny na scalenie, korzystając z następujących metod:

  • Zmniejsz przestrzeń wyszukiwania dla dopasowań: domyślnie merge operacja przeszukuje całą tabelę delty w celu znalezienia dopasowań w tabeli źródłowej. Jednym ze sposobów przyspieszenia merge jest zmniejszenie przestrzeni wyszukiwania przez dodanie znanych ograniczeń w warunku dopasowania. Załóżmy na przykład, że masz tabelę podzieloną na country partycje i date chcesz użyć merge jej do zaktualizowania informacji z ostatniego dnia i określonego kraju. Dodanie następującego warunku sprawia, że zapytanie jest szybsze, ponieważ wyszukuje dopasowania tylko w odpowiednich partycjach:

    events.date = current_date() AND events.country = 'USA'
    

    Ponadto to zapytanie zmniejsza również prawdopodobieństwo konfliktów z innymi operacjami współbieżnymi. Aby uzyskać więcej informacji, zobacz Poziomy izolacji i konflikty zapisu w usłudze Azure Databricks .

  • Pliki kompaktowe: jeśli dane są przechowywane w wielu małych plikach, odczytywanie danych w celu wyszukiwania dopasowań może stać się powolne. W celu zwiększenia przepływności odczytu można skompaktować małe pliki w większe pliki. Aby uzyskać szczegółowe informacje, zobacz Compact data files with optimize on Delta Lake (Kompaktowanie plików danych z optymalizacją w usłudze Delta Lake ).

  • Kontroluj partycje mieszania dla operacji zapisu: merge operacja tasuje dane wiele razy w celu obliczenia i zapisania zaktualizowanych danych. Liczba zadań używanych do mieszania jest kontrolowana przez konfigurację spark.sql.shuffle.partitionssesji platformy Spark. Ustawienie tego parametru nie tylko steruje równoległością, ale także określa liczbę plików wyjściowych. Zwiększenie wartości zwiększa równoległość, ale także generuje większą liczbę mniejszych plików danych.

  • Włącz zoptymalizowane zapisy: w przypadku tabel merge partycjonowanych może utworzyć znacznie większą liczbę małych plików niż liczba partycji mieszania. Dzieje się tak, ponieważ każde zadanie mieszania może zapisywać wiele plików w wielu partycjach i może stać się wąskim gardłem wydajności. Liczbę plików można zmniejszyć, włączając zoptymalizowane zapisy. Zobacz Zoptymalizowane zapisy dla usługi Delta Lake w usłudze Azure Databricks.

  • Dostrajanie rozmiarów plików w tabeli: usługa Azure Databricks może automatycznie wykrywać, czy tabela delty ma częste merge operacje, które ponownie zapisują pliki i mogą zmniejszyć rozmiar plików przepisanych w oczekiwaniu na dalsze ponowne zapisywanie plików w przyszłości. Aby uzyskać szczegółowe informacje, zobacz sekcję dotyczącą dostrajania rozmiarów plików.

  • Low Shuffle Merge: Low Shuffle Merge zapewnia zoptymalizowaną implementację MERGE , która zapewnia lepszą wydajność dla najbardziej typowych obciążeń. Ponadto zachowuje istniejące optymalizacje układu danych, takie jak porządkowanie Z na niezmodyfikowanych danych.

Zarządzanie recencyjnością danych

Na początku każdego zapytania tabele delty są automatycznie aktualizowane do najnowszej wersji tabeli. Ten proces można zaobserwować w notesach, gdy polecenie zgłasza stan polecenia: Updating the Delta table's state. Jednak podczas uruchamiania analizy historycznej w tabeli może nie być konieczne pobieranie danych z ostatnich minut, zwłaszcza w przypadku tabel, w których często pozyskiwane są dane przesyłane strumieniowo. W takich przypadkach zapytania mogą być uruchamiane na nieaktualnych migawkach tabeli delty. Takie podejście może zmniejszyć opóźnienie w uzyskiwaniu wyników z zapytań.

Można skonfigurować tolerancję dla nieaktualnych danych, ustawiając konfigurację spark.databricks.delta.stalenessLimit sesji platformy Spark z wartością ciągu czasu, taką jak 1h lub 15m (odpowiednio przez 1 godzinę lub 15 minut). Ta konfiguracja jest specyficzna dla sesji i nie ma wpływu na innych klientów, którzy uzyskują dostęp do tabeli. Jeśli stan tabeli został zaktualizowany w limicie nieaktualności, zapytanie względem tabeli zwraca wyniki bez oczekiwania na najnowszą aktualizację tabeli. To ustawienie nigdy nie uniemożliwia aktualizowaniu tabeli, a gdy są zwracane nieaktualne dane, proces aktualizacji w tle. Jeśli ostatnia aktualizacja tabeli jest starsza niż limit nieaktualności, zapytanie nie zwraca wyników do momentu zakończenia aktualizacji stanu tabeli.

Ulepszone punkty kontrolne dla zapytań o małych opóźnieniach

Usługa Delta Lake zapisuje punkty kontrolne jako stan agregacji tabeli delty z zoptymalizowaną częstotliwością. Te punkty kontrolne służą jako punkt wyjścia do obliczenia najnowszego stanu tabeli. Bez punktów kontrolnych usługa Delta Lake musiałaby odczytać dużą kolekcję plików JSON ("pliki delta" reprezentujące zatwierdzenia w dzienniku transakcji w celu obliczenia stanu tabeli. Ponadto statystyki na poziomie kolumny usługi Delta Lake używają funkcji delta lake do wykonywania pomijania danych są przechowywane w punkcie kontrolnym.

Ważne

Punkty kontrolne usługi Delta Lake różnią się od punktów kontrolnych przesyłania strumieniowego ze strukturą.

Statystyki na poziomie kolumny są przechowywane jako struktura i kod JSON (w celu zapewnienia zgodności z poprzednimi wersjami). Format struktury sprawia, że funkcja Delta Lake odczytuje znacznie szybciej, ponieważ:

  • Usługa Delta Lake nie wykonuje kosztownej analizy JSON w celu uzyskania statystyk na poziomie kolumny.
  • Możliwości oczyszczania kolumn Parquet znacznie zmniejszają liczbę operacji we/wy wymaganych do odczytania statystyk dla kolumny.

Format struktury umożliwia zbieranie optymalizacji, które zmniejszają obciążenie operacji odczytu usługi Delta Lake od sekund do dziesiątek milisekund, co znacznie zmniejsza opóźnienie w przypadku krótkich zapytań.

Zarządzanie statystykami na poziomie kolumny w punktach kontrolnych

Zarządzasz sposobem zapisywania statystyk w punktach kontrolnych przy użyciu właściwości delta.checkpoint.writeStatsAsJson tabeli i delta.checkpoint.writeStatsAsStruct. Jeśli obie właściwości tabeli to false, usługa Delta Lake nie może wykonać pomijania danych.

  • Usługa Batch zapisuje statystyki zapisu zarówno w formacie JSON, jak i w formacie struktury. Parametr delta.checkpoint.writeStatsAsJson ma wartość true.
  • delta.checkpoint.writeStatsAsStruct jest domyślnie niezdefiniowany.
  • Czytelnicy używają kolumny struktury, gdy są dostępne, a w przeciwnym razie wracają do korzystania z kolumny JSON.

Ważne

Ulepszone punkty kontrolne nie przerywają zgodności z czytnikami usługi Delta Lake typu open source. Jednak ustawienie delta.checkpoint.writeStatsAsJson wartości false może mieć wpływ na zastrzeżone czytniki usługi Delta Lake. Skontaktuj się z dostawcami, aby dowiedzieć się więcej o implikacjach dotyczących wydajności.

Włączanie rozszerzonych punktów kontrolnych dla zapytań przesyłania strumieniowego ze strukturą

Jeśli obciążenia przesyłania strumieniowego ze strukturą nie mają wymagań dotyczących małych opóźnień (opóźnienia podrzędne), możesz włączyć rozszerzone punkty kontrolne, uruchamiając następujące polecenie SQL:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

Możesz również poprawić opóźnienie zapisu punktu kontrolnego, ustawiając następujące właściwości tabeli:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Jeśli pomijanie danych nie jest przydatne w aplikacji, możesz ustawić obie właściwości na wartość false. Następnie nie są zbierane ani zapisywane żadne statystyki. Usługa Databricks nie zaleca tej konfiguracji.