Oktatóanyag: Delta Lake

Ez az oktatóanyag bemutatja az Azure Databricks gyakori Delta Lake-műveleteit, beleértve a következőket:

A cikkben szereplő Python-, R-, Scala- és SQL-kódot egy Azure Databricks-fürthöz csatolt jegyzetfüzetből futtathatja. A cikkben szereplő SQL-kódot a Databricks SQL-ben lévő SQL-raktárhoztársított lekérdezésből is futtathatja.

Feljegyzés

Az alábbi példakódok egy kétszintű névtér-jelölést használnak, amely egy sémából (más néven adatbázisból) és egy táblából vagy nézetből (például default.people10m) áll. Ha ezeket a példákat a Unity Catalogra szeretné használni, cserélje le a kétszintű névteret egy katalógusból, sémából és táblázatból vagy nézetből (példáulmain.default.people10m) álló Unity Catalog háromszintű névtér-jelölésre.

Tábla létrehozása

Az Azure Databricksben létrehozott összes tábla alapértelmezés szerint a Delta Lake-t használja.

Feljegyzés

A Delta Lake az azure Databricks összes olvasási, írási és táblázat-létrehozási parancsának alapértelmezett értéke.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Az előző műveletek létrehoznak egy új felügyelt táblát az adatokból levont sémával. A Delta-tábla létrehozásakor elérhető lehetőségekről további információt a CREATE TABLE (TÁBLA LÉTREHOZÁSA) című témakörben talál.

Felügyelt táblák esetén az Azure Databricks határozza meg az adatok helyét. A hely lekéréséhez használhatja a DESCRIBE DETAIL utasítást, például:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Előfordulhat, hogy az adatok beszúrása előtt a séma megadásával szeretne táblázatot létrehozni. Ezt a következő SQL-parancsokkal hajthatja végre:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

A Databricks Runtime 13.3 LTS-ben és újabb verziókban létrehozhat egy új üres Delta-táblát, CREATE TABLE LIKE amely duplikálja a forrás Delta-tábla séma- és táblatulajdonságait. Ez különösen hasznos lehet a táblák fejlesztési környezetből éles környezetbe való előléptetéséhez, például a következő kód példájában:

CREATE TABLE prod.people10m LIKE dev.people10m

A Delta Lake API-jának használatával DeltaTableBuilder táblákat is létrehozhat. A DataFrameWriter API-khoz képest ez az API megkönnyíti a további információk megadását, például az oszlopbejegyzéseket, a táblatulajdonságokat és a létrehozott oszlopokat.

Fontos

Ez a funkció a nyilvános előzetes verzióban érhető el.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert to a table

Ha frissítéseket és beszúrásokat szeretne egy meglévő Delta-táblába egyesíteni, használja a MERGE INTO utasítást . A következő utasítás például adatokat vesz fel a forrástáblából, és egyesíti őket a cél Delta-táblával. Ha mindkét táblában egyező sor található, a Delta Lake a megadott kifejezéssel frissíti az adatoszlopot. Ha nincs egyező sor, a Delta Lake új sort ad hozzá. Ez a művelet upsert néven ismert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Ha megadja *, ez frissíti vagy beszúrja a céltábla összes oszlopát. Ez feltételezi, hogy a forrástábla oszlopai megegyeznek a céltáblában szereplő oszlopokkal, ellenkező esetben a lekérdezés elemzési hibát jelez.

A művelet végrehajtásakor INSERT (például ha a meglévő adathalmazban nincs egyező sor) meg kell adnia egy értéket a tábla minden oszlopához. Azonban nem kell frissítenie az összes értéket.

Az eredmények megtekintéséhez kérdezze le a táblát.

SELECT * FROM people_10m WHERE id >= 9999998

Táblázat olvasása

A Delta-táblák adatait a táblanév vagy a tábla elérési útja alapján érheti el, ahogyan az alábbi példákban is látható:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Írás táblába

A Delta Lake szabványos szintaxist használ az adatok táblákba való írásához.

Ha új adatokat szeretne atomilag hozzáadni egy meglévő Delta-táblához, használja append a módot az alábbi példákhoz hasonlóan:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

A tábla összes adatának atomi cseréjéhez használja overwrite a módot az alábbi példákhoz hasonlóan:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Tábla frissítése

A Delta-táblák predikátumának megfelelő adatokat frissítheti. Például egy névvel ellátott people10m táblában vagy a következő /tmp/delta/people-10melérési úton módosíthatja az oszlopban MFemaleMaleF lévő gender rövidítést:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Törlés táblából

Egy predikátumnak megfelelő adatokat eltávolíthat egy Delta-táblából. Például egy névvel ellátott people10m táblában vagy a következő /tmp/delta/people-10melérési úton törölheti az oszlopban birthDate1955lévő értékekkel rendelkező személyeknek megfelelő összes sort, a következőt futtathatja:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Fontos

delete Eltávolítja az adatokat a Delta-tábla legújabb verziójából, de nem távolítja el azokat a fizikai tárolóból, amíg a régi verziók nem lesznek explicit módon porszívózva. Részletekért lásd a vákuumot .

Táblaelőzmények megjelenítése

Egy tábla előzményeinek megtekintéséhez használja a DESCRIBE HISTORY utasítást, amely az egyes táblákba történő íráshoz a táblaverziót, a műveletet, a felhasználót stb. is tartalmazza.

DESCRIBE HISTORY people_10m

A tábla egy korábbi verziójának lekérdezése (időutazás)

A Delta Lake időutazással lekérdezheti egy Delta-tábla régebbi pillanatképét.

A tábla régebbi verziójának lekérdezéséhez adjon meg egy verziót vagy időbélyeget egy SELECT utasításban. Ha például a fenti előzményekből szeretné lekérdezni a 0-s verziót, használja a következőt:

SELECT * FROM people_10m VERSION AS OF 0

vagy

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Időbélyegek esetén a rendszer csak dátum- vagy időbélyeg-sztringeket fogad el, például "2019-01-01" és "2019-01-01'T'00:00:00.000Z".

A DataFrameReader beállításai lehetővé teszik, hogy dataFrame-et hozzon létre egy Delta-táblából, amely a tábla egy adott verziójára van rögzítve, például Pythonban:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

vagy másik lehetőségként:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

További részletekért lásd : A Delta Lake táblaelőzményeinek ismertetése.

Táblázat optimalizálása

Ha több módosítást hajtott végre egy táblán, előfordulhat, hogy sok kis fájllal rendelkezik. Az olvasási lekérdezések sebességének javítása érdekében kis méretű fájlokat összecsukhat OPTIMIZE nagyobbakra:

OPTIMIZE people_10m

Z-sorrend oszlopok szerint

Az olvasási teljesítmény további javítása érdekében a Z-Ordering segítségével azonos fájlkészletben keresheti meg a kapcsolódó információkat. Ezt a közös helységet a Delta Lake adatátugrási algoritmusai automatikusan használják az olvasni kívánt adatok mennyiségének drasztikus csökkentésére. A Z-Order adatokhoz meg kell adnia a záradékban a sorrendbe ZORDER BY rendezendő oszlopokat. Például a társkereséshez futtassa a következőt gender:

OPTIMIZE people_10m
ZORDER BY (gender)

A futtatáskor OPTIMIZEelérhető összes beállításért tekintse meg a Delta Lake-beli optimalizálással rendelkező adatfájlok tömörítését.

Pillanatképek törlése a VACUUM

A Delta Lake pillanatkép-elkülönítést biztosít az olvasásokhoz, ami azt jelenti, hogy biztonságosan futtatható OPTIMIZE , még akkor is, ha más felhasználók vagy feladatok kérdezik le a táblát. Végül azonban törölnie kell a régi pillanatképeket. Ezt a parancs futtatásával VACUUM teheti meg:

VACUUM people_10m

A hatékony használatról VACUUM további információt a nem használt adatfájlok eltávolítása vákuummal című témakörben talál.