Samouczek: usługa Delta Lake
W tym samouczku przedstawiono typowe operacje usługi Delta Lake w usłudze Azure Databricks, w tym następujące elementy:
- Tworzenie tabeli.
- Wykonanie operacji upsert do tabeli.
- Odczyt z tabeli.
- Wyświetlanie historii tabeli.
- Wykonywanie zapytań względem wcześniejszej wersji tabeli.
- Optymalizowanie tabeli.
- Dodawanie indeksu zamówienia Z.
- Opróżnianie plików bez odniesienia.
W tym artykule możesz uruchomić przykładowy kod Python, R, Scala i SQL z poziomu notesu dołączonego do klastra usługi Azure Databricks. Możesz również uruchomić kod SQL w tym artykule z poziomu zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL.
Uwaga
Niektóre z poniższych przykładów kodu używają notacji dwupoziomowej przestrzeni nazw składającej się ze schematu (nazywanego również bazą danych) i tabeli lub widoku (na przykład default.people10m
). Aby użyć tych przykładów z wykazem aparatu Unity, zastąp dwupoziomową przestrzeń nazw notacją wykazu aparatu Unity składającą się z wykazu, schematu i tabeli lub widoku (na przykład main.default.people10m
).
Tworzenie tabeli
Wszystkie tabele utworzone w usłudze Azure Databricks domyślnie używają usługi Delta Lake.
Uwaga
Usługa Delta Lake jest domyślną wartością dla wszystkich poleceń odczytu, zapisu i tworzenia tabel w usłudze Azure Databricks.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
Poprzednie operacje tworzą nową zarządzaną tabelę przy użyciu schematu, który został wywnioskowany z danych. Aby uzyskać informacje o dostępnych opcjach podczas tworzenia tabeli delty, zobacz CREATE TABLE (TWORZENIE TABELI).
W przypadku tabel zarządzanych usługa Azure Databricks określa lokalizację danych. Aby uzyskać lokalizację, możesz użyć instrukcji DESCRIBE DETAIL , na przykład:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
Czasami warto utworzyć tabelę, określając schemat przed wstawieniem danych. Można to wykonać za pomocą następujących poleceń SQL:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
W środowisku Databricks Runtime 13.0 lub nowszym można utworzyć CREATE TABLE LIKE
nową pustą tabelę delty, która duplikuje właściwości schematu i tabeli źródłowej tabeli delty. Może to być szczególnie przydatne podczas promowania tabel ze środowiska deweloperskiego do środowiska produkcyjnego, na przykład w poniższym przykładzie kodu:
CREATE TABLE prod.people10m LIKE dev.people10m
Do tworzenia tabel można również użyć interfejsu DeltaTableBuilder
API w usłudze Delta Lake. W porównaniu z interfejsami API DataFrameWriter ten interfejs API ułatwia określenie dodatkowych informacji, takich jak komentarze kolumn, właściwości tabeli i wygenerowane kolumny.
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
Upsert do tabeli
Aby scalić zestaw aktualizacji i wstawiania do istniejącej tabeli delty, należy użyć instrukcji MERGE INTO . Na przykład poniższa instrukcja pobiera dane z tabeli źródłowej i scala je z docelową tabelą delty. Jeśli w obu tabelach istnieje pasujący wiersz, usługa Delta Lake aktualizuje kolumnę danych przy użyciu danego wyrażenia. Jeśli nie ma pasującego wiersza, usługa Delta Lake dodaje nowy wiersz. Ta operacja jest nazywana operacją upsert.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Jeśli określisz *
wartość , spowoduje to zaktualizowanie lub wstawienie wszystkich kolumn w tabeli docelowej. Przyjęto założenie, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłosi błąd analizy.
Należy określić wartość dla każdej kolumny w tabeli podczas wykonywania INSERT
operacji (na przykład gdy w istniejącym zestawie danych nie ma pasującego wiersza). Nie trzeba jednak aktualizować wszystkich wartości.
Aby wyświetlić wyniki, wykonaj zapytanie względem tabeli.
SELECT * FROM people_10m WHERE id >= 9999998
Odczytywanie tabeli
Dostęp do danych w tabelach delty można uzyskać według nazwy tabeli lub ścieżki tabeli, jak pokazano w następujących przykładach:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
Zapisywanie w tabeli
Usługa Delta Lake używa standardowej składni do zapisywania danych w tabelach.
Aby niepodziecznie dodać nowe dane do istniejącej tabeli delty, użyj append
trybu, jak w następujących przykładach:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
Aby niepodziealnie zastąpić wszystkie dane w tabeli, użyj overwrite
trybu, jak w następujących przykładach:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
Aktualizowanie tabeli
Możesz zaktualizować dane zgodne z predykatem w tabeli delty. Na przykład w tabeli o nazwie people10m
lub ścieżce w /tmp/delta/people-10m
lokalizacji , aby zmienić skrót w gender
kolumnie z M
lub na Male
lub F
Female
, można uruchomić następujące polecenie:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Usuwanie z tabeli
Możesz usunąć dane zgodne z predykatem z tabeli delty. Na przykład w tabeli o nazwie people10m
lub ścieżce w /tmp/delta/people-10m
lokalizacji , aby usunąć wszystkie wiersze odpowiadające osobom z wartością w birthDate
kolumnie sprzed 1955
, można uruchomić następujące polecenie:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Ważne
delete
Usuwa dane z najnowszej wersji tabeli delty, ale nie usuwa ich z magazynu fizycznego, dopóki stare wersje nie zostaną jawnie opróżnione. Zobacz czyszczenie , aby uzyskać szczegółowe informacje.
Wyświetlanie historii tabeli
Aby wyświetlić historię tabeli, użyj instrukcji DESCRIBE HISTORY , która zawiera informacje o pochodzenia, w tym wersję tabeli, operację, użytkownika itd., dla każdego zapisu w tabeli.
DESCRIBE HISTORY people_10m
Wykonywanie zapytań względem wcześniejszej wersji tabeli (podróż czasowa)
Podróż w czasie usługi Delta Lake umożliwia wykonywanie zapytań dotyczących starszej migawki tabeli delty.
Aby wysłać zapytanie do starszej wersji tabeli, określ wersję lub znacznik czasu w instrukcji SELECT
. Aby na przykład wysłać zapytanie o wersję 0 z powyższej historii, użyj:
SELECT * FROM people_10m VERSION AS OF 0
lub
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
W przypadku znaczników czasu akceptowane są tylko ciągi daty lub znacznika czasu, na przykład "2019-01-01"
i "2019-01-01'T'00:00:00.000Z"
.
Opcje elementu DataFrameReader umożliwiają utworzenie ramki danych na podstawie tabeli delty, która jest stała do określonej wersji tabeli, na przykład w języku Python:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
lub, alternatywnie:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
Aby uzyskać szczegółowe informacje, zobacz Praca z historią tabel usługi Delta Lake.
Optymalizowanie tabeli
Po wykonaniu wielu zmian w tabeli może istnieć wiele małych plików. Aby zwiększyć szybkość zapytań odczytu, możesz użyć OPTIMIZE
polecenia , aby zwinąć małe pliki na większe:
OPTIMIZE people_10m
Kolejność Z według kolumn
Aby jeszcze bardziej zwiększyć wydajność odczytu, możesz zlokalizować powiązane informacje w tym samym zestawie plików według kolejności Z. Ta współlokalność jest automatycznie używana przez algorytmy pomijania danych usługi Delta Lake w celu znacznego zmniejszenia ilości danych, które należy odczytać. W polu Z-Order data (Kolejność Z) należy określić kolumny do kolejności w klauzuli ZORDER BY
. Na przykład, aby współlokować według gender
polecenia , uruchom polecenie:
OPTIMIZE people_10m
ZORDER BY (gender)
Aby uzyskać pełny zestaw opcji dostępnych podczas uruchamiania OPTIMIZE
programu , zobacz Compact data files with optimize on Delta Lake (Kompaktowanie plików danych z optymalizacją w usłudze Delta Lake).
Czyszczenie migawek za pomocą polecenia VACUUM
Usługa Delta Lake zapewnia izolację migawek dla operacji odczytu, co oznacza, że uruchamianie OPTIMIZE
jest bezpieczne nawet wtedy, gdy inni użytkownicy lub zadania wysyłają zapytania do tabeli. W końcu jednak należy wyczyścić stare migawki. Możesz to zrobić, uruchamiając VACUUM
polecenie:
VACUUM people_10m
Aby uzyskać szczegółowe informacje na temat efektywnego używania VACUUM
, zobacz Usuwanie nieużywanych plików danych z próżnią.