Samouczek: usługa Delta Lake

W tym samouczku przedstawiono typowe operacje usługi Delta Lake w usłudze Azure Databricks, w tym następujące elementy:

W tym artykule możesz uruchomić przykładowy kod Python, R, Scala i SQL z poziomu notesu dołączonego do klastra usługi Azure Databricks. Możesz również uruchomić kod SQL w tym artykule z poziomu zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL.

Uwaga

Niektóre z poniższych przykładów kodu używają notacji dwupoziomowej przestrzeni nazw składającej się ze schematu (nazywanego również bazą danych) i tabeli lub widoku (na przykład default.people10m). Aby użyć tych przykładów z wykazem aparatu Unity, zastąp dwupoziomową przestrzeń nazw notacją wykazu aparatu Unity składającą się z wykazu, schematu i tabeli lub widoku (na przykład main.default.people10m).

Tworzenie tabeli

Wszystkie tabele utworzone w usłudze Azure Databricks domyślnie używają usługi Delta Lake.

Uwaga

Usługa Delta Lake jest domyślną wartością dla wszystkich poleceń odczytu, zapisu i tworzenia tabel w usłudze Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Poprzednie operacje tworzą nową zarządzaną tabelę przy użyciu schematu, który został wywnioskowany z danych. Aby uzyskać informacje o dostępnych opcjach podczas tworzenia tabeli delty, zobacz CREATE TABLE (TWORZENIE TABELI).

W przypadku tabel zarządzanych usługa Azure Databricks określa lokalizację danych. Aby uzyskać lokalizację, możesz użyć instrukcji DESCRIBE DETAIL , na przykład:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Czasami warto utworzyć tabelę, określając schemat przed wstawieniem danych. Można to wykonać za pomocą następujących poleceń SQL:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

W środowisku Databricks Runtime 13.0 lub nowszym można utworzyć CREATE TABLE LIKE nową pustą tabelę delty, która duplikuje właściwości schematu i tabeli źródłowej tabeli delty. Może to być szczególnie przydatne podczas promowania tabel ze środowiska deweloperskiego do środowiska produkcyjnego, na przykład w poniższym przykładzie kodu:

CREATE TABLE prod.people10m LIKE dev.people10m

Do tworzenia tabel można również użyć interfejsu DeltaTableBuilder API w usłudze Delta Lake. W porównaniu z interfejsami API DataFrameWriter ten interfejs API ułatwia określenie dodatkowych informacji, takich jak komentarze kolumn, właściwości tabeli i wygenerowane kolumny.

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert do tabeli

Aby scalić zestaw aktualizacji i wstawiania do istniejącej tabeli delty, należy użyć instrukcji MERGE INTO . Na przykład poniższa instrukcja pobiera dane z tabeli źródłowej i scala je z docelową tabelą delty. Jeśli w obu tabelach istnieje pasujący wiersz, usługa Delta Lake aktualizuje kolumnę danych przy użyciu danego wyrażenia. Jeśli nie ma pasującego wiersza, usługa Delta Lake dodaje nowy wiersz. Ta operacja jest nazywana operacją upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Jeśli określisz *wartość , spowoduje to zaktualizowanie lub wstawienie wszystkich kolumn w tabeli docelowej. Przyjęto założenie, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłosi błąd analizy.

Należy określić wartość dla każdej kolumny w tabeli podczas wykonywania INSERT operacji (na przykład gdy w istniejącym zestawie danych nie ma pasującego wiersza). Nie trzeba jednak aktualizować wszystkich wartości.

Aby wyświetlić wyniki, wykonaj zapytanie względem tabeli.

SELECT * FROM people_10m WHERE id >= 9999998

Odczytywanie tabeli

Dostęp do danych w tabelach delty można uzyskać według nazwy tabeli lub ścieżki tabeli, jak pokazano w następujących przykładach:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Zapisywanie w tabeli

Usługa Delta Lake używa standardowej składni do zapisywania danych w tabelach.

Aby niepodziecznie dodać nowe dane do istniejącej tabeli delty, użyj append trybu, jak w następujących przykładach:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Aby niepodziealnie zastąpić wszystkie dane w tabeli, użyj overwrite trybu, jak w następujących przykładach:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Aktualizowanie tabeli

Możesz zaktualizować dane zgodne z predykatem w tabeli delty. Na przykład w tabeli o nazwie people10m lub ścieżce w /tmp/delta/people-10mlokalizacji , aby zmienić skrót w gender kolumnie z M lub na Male lub FFemale, można uruchomić następujące polecenie:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Usuwanie z tabeli

Możesz usunąć dane zgodne z predykatem z tabeli delty. Na przykład w tabeli o nazwie people10m lub ścieżce w /tmp/delta/people-10mlokalizacji , aby usunąć wszystkie wiersze odpowiadające osobom z wartością w birthDate kolumnie sprzed 1955, można uruchomić następujące polecenie:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Ważne

delete Usuwa dane z najnowszej wersji tabeli delty, ale nie usuwa ich z magazynu fizycznego, dopóki stare wersje nie zostaną jawnie opróżnione. Zobacz czyszczenie , aby uzyskać szczegółowe informacje.

Wyświetlanie historii tabeli

Aby wyświetlić historię tabeli, użyj instrukcji DESCRIBE HISTORY , która zawiera informacje o pochodzenia, w tym wersję tabeli, operację, użytkownika itd., dla każdego zapisu w tabeli.

DESCRIBE HISTORY people_10m

Wykonywanie zapytań względem wcześniejszej wersji tabeli (podróż czasowa)

Podróż w czasie usługi Delta Lake umożliwia wykonywanie zapytań dotyczących starszej migawki tabeli delty.

Aby wysłać zapytanie do starszej wersji tabeli, określ wersję lub znacznik czasu w instrukcji SELECT . Aby na przykład wysłać zapytanie o wersję 0 z powyższej historii, użyj:

SELECT * FROM people_10m VERSION AS OF 0

lub

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

W przypadku znaczników czasu akceptowane są tylko ciągi daty lub znacznika czasu, na przykład "2019-01-01" i "2019-01-01'T'00:00:00.000Z".

Opcje elementu DataFrameReader umożliwiają utworzenie ramki danych na podstawie tabeli delty, która jest stała do określonej wersji tabeli, na przykład w języku Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

lub, alternatywnie:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Aby uzyskać szczegółowe informacje, zobacz Praca z historią tabel usługi Delta Lake.

Optymalizowanie tabeli

Po wykonaniu wielu zmian w tabeli może istnieć wiele małych plików. Aby zwiększyć szybkość zapytań odczytu, możesz użyć OPTIMIZE polecenia , aby zwinąć małe pliki na większe:

OPTIMIZE people_10m

Kolejność Z według kolumn

Aby jeszcze bardziej zwiększyć wydajność odczytu, możesz zlokalizować powiązane informacje w tym samym zestawie plików według kolejności Z. Ta współlokalność jest automatycznie używana przez algorytmy pomijania danych usługi Delta Lake w celu znacznego zmniejszenia ilości danych, które należy odczytać. W polu Z-Order data (Kolejność Z) należy określić kolumny do kolejności w klauzuli ZORDER BY . Na przykład, aby współlokować według genderpolecenia , uruchom polecenie:

OPTIMIZE people_10m
ZORDER BY (gender)

Aby uzyskać pełny zestaw opcji dostępnych podczas uruchamiania OPTIMIZEprogramu , zobacz Compact data files with optimize on Delta Lake (Kompaktowanie plików danych z optymalizacją w usłudze Delta Lake).

Czyszczenie migawek za pomocą polecenia VACUUM

Usługa Delta Lake zapewnia izolację migawek dla operacji odczytu, co oznacza, że uruchamianie OPTIMIZE jest bezpieczne nawet wtedy, gdy inni użytkownicy lub zadania wysyłają zapytania do tabeli. W końcu jednak należy wyczyścić stare migawki. Możesz to zrobić, uruchamiając VACUUM polecenie:

VACUUM people_10m

Aby uzyskać szczegółowe informacje na temat efektywnego używania VACUUM , zobacz Usuwanie nieużywanych plików danych z próżnią.