Megosztás a következőn keresztül:


Oktatóanyag: Delta Lake

Ez az oktatóanyag bemutatja az Azure Databricks gyakori Delta Lake-műveleteit, beleértve a következőket:

A jelen cikkben szereplő Python-, Scala- és SQL-kódot egy Azure Databricks számítási erőforráshoz, például fürthöz csatolt jegyzetfüzetből futtathatja. A cikkben szereplő SQL-kódot a Databricks SQL-ben lévő SQL-raktárhoztársított lekérdezésből is futtathatja.

A forrásadatok előkészítése

Ez az oktatóanyag egy People 10 M nevű adatkészletre támaszkodik. 10 millió fiktív rekordot tartalmaz, amelyek olyan tényeket tartalmaznak az emberekről, mint a vezeték- és utónevek, a születési dátum és a fizetés. Ez az oktatóanyag feltételezi, hogy ez az adatkészlet egy Unity Catalog-kötetben található, amely a cél Azure Databricks-munkaterülethez van társítva.

Ha le szeretné szerezni a People 10 M adatkészletet ehhez az oktatóanyaghoz, tegye a következőket:

  1. Nyissa meg a Személyek 10 M lapot Kaggleben.
  2. Kattintson a Letöltés gombra a helyi gépre elnevezett archive.zip fájl letöltéséhez.
  3. Bontsa ki a fájlból archive.zip elnevezett export.csv fájlt. A export.csv fájl tartalmazza az oktatóanyag adatait.

A fájl kötetbe való feltöltéséhez export.csv tegye a következőket:

  1. Az oldalsávon kattintson a Katalógus elemre.
  2. A Katalóguskezelőben keresse meg és nyissa meg azt a kötetet, ahová fel szeretné tölteni a export.csv fájlt.
  3. Kattintson a Feltöltés erre a kötetre elemre.
  4. Húzza a fájlt a helyi gépen, vagy keresse meg és jelölje ki export.csv .
  5. Kattintson a Feltöltés gombra.

Az alábbi példakódban cserélje le /Volumes/main/default/my-volume/export.csv a export.csv célkötetben lévő fájl elérési útját.

Tábla létrehozása

Az Azure Databricksben létrehozott összes tábla alapértelmezés szerint a Delta Lake-t használja. A Databricks a Unity Catalog által felügyelt táblák használatát javasolja.

Az előző kód példában és az alábbi példakódokban cserélje le a tábla nevét main.default.people_10m a cél háromrészes katalógusra, sémára és táblanévre a Unity Catalogban.

Feljegyzés

A Delta Lake az azure Databricks összes olvasási, írási és táblázat-létrehozási parancsának alapértelmezett értéke.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Az előző műveletek létrehoznak egy új felügyelt táblát. A Delta-tábla létrehozásakor elérhető lehetőségekről további információt a CREATE TABLE (TÁBLA LÉTREHOZÁSA) című témakörben talál.

A Databricks Runtime 13.3 LTS-ben és újabb verziókban a CREATE TABLE LIKE használatával létrehozhat egy új üres Delta-táblát, amely duplikálja egy forrás Delta-tábla séma- és táblatulajdonságait. Ez különösen hasznos lehet a táblák fejlesztési környezetből éles környezetbe való előléptetéséhez, ahogyan az alábbi kód példában is látható:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Üres tábla létrehozásához használhatja az API-t a Pythonhoz és a DeltaTableBuilder Scalához készült Delta Lake-ben is. Az egyenértékű DataFrameWriter API-khoz képest ezek az API-k megkönnyítik a további információk megadását, például az oszlopbejegyzéseket, a táblatulajdonságokat és a létrehozott oszlopokat.

Fontos

Ez a funkció a nyilvános előzetes verzióban érhető el.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert to a table

Ha frissítéseket és beszúrásokat szeretne egy meglévő Delta-táblába egyesíteni, használja a Python és a DeltaTable.merge Scala metódusát, valamint az SQL MERGE INTO utasítását. Az alábbi példa például adatokat vesz fel a forrástáblából, és egyesíti őket a cél Delta-táblával. Ha mindkét táblában egyező sor található, a Delta Lake a megadott kifejezéssel frissíti az adatoszlopot. Ha nincs egyező sor, a Delta Lake új sort ad hozzá. Ez a művelet upsert néven ismert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Ha megadja az SQL-t *, ez frissíti vagy beszúrja a céltábla összes oszlopát, feltéve, hogy a forrástábla oszlopai megegyeznek a céltáblával. Ha a céltábla nem rendelkezik ugyanazokkal az oszlopokkal, a lekérdezés elemzési hibát jelez.

Beszúrási művelet végrehajtásakor meg kell adnia egy értéket a táblázat minden oszlopához (például ha a meglévő adathalmazban nincs egyező sor). Azonban nem kell frissítenie az összes értéket.

Az eredmények megtekintéséhez kérdezze le a táblát.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Táblázat olvasása

A Delta-táblák adatait a táblanév vagy a tábla elérési útja alapján érheti el, ahogyan az alábbi példákban is látható:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Írás táblába

A Delta Lake szabványos szintaxist használ az adatok táblákba való írásához.

Ha új adatokat szeretne atomilag hozzáadni egy meglévő Delta-táblához, használja a hozzáfűzési módot az alábbi példákban látható módon:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

A tábla összes adatának cseréjéhez használja a felülírási módot az alábbi példákhoz hasonlóan:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Tábla frissítése

A Delta-táblák predikátumának megfelelő adatokat frissítheti. A példatáblában people_10m például a következőt futtathatja az oszlop MF rövidítésének gender módosításához MaleFemale:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Törlés táblából

Egy predikátumnak megfelelő adatokat eltávolíthat egy Delta-táblából. Ha például a példatáblában people_10m törölni szeretné az oszlopban birthDate1955lévő értékekkel rendelkező személyeknek megfelelő összes sort, futtassa a következőt:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Fontos

A törlés eltávolítja az adatokat a Delta-tábla legújabb verziójából, de nem távolítja el azokat a fizikai tárolóból, amíg a régi verziók explicit módon ki nem porszívózódnak. Részletekért lásd a vákuumot .

Táblaelőzmények megjelenítése

Egy tábla előzményeinek megtekintéséhez használja a DeltaTable.history Python és a Scala metódusát, valamint az SQL-ben található DESCRIBE HISTORY utasítást, amely az egyes táblákba való írások esetében a táblaverziót, a műveletet, a felhasználót és így tovább, az előzményadatokat tartalmazza.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

A tábla egy korábbi verziójának lekérdezése (időutazás)

A Delta Lake időutazással lekérdezheti egy Delta-tábla régebbi pillanatképét.

A tábla régebbi verziójának lekérdezéséhez adja meg a tábla verzióját vagy időbélyegét. Ha például le szeretné kérdezni a 0-s verziót vagy az időbélyeget 2024-05-15T22:43:15.000+00:00Z az előző előzményekből, használja a következőket:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Időbélyegek esetén a rendszer csak dátum- vagy időbélyeg-sztringeket fogad el, például "2024-05-15T22:43:15.000+00:00""2024-05-15 22:43:15".

A DataFrameReader beállításaival olyan DataFrame-et hozhat létre egy Delta-táblából, amely a tábla egy adott verziójára vagy időbélyegére van rögzítve, például:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

További részletekért lásd : A Delta Lake táblaelőzményeinek ismertetése.

Táblázat optimalizálása

Miután több módosítást végzett egy táblán, sok kis fájllal rendelkezhet. Az olvasási lekérdezések sebességének javítása érdekében az optimalizálási művelettel összecsukhatja a kis fájlokat nagyobbakra:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

SQL

OPTIMIZE main.default.people_10m

Z-sorrend oszlopok szerint

Az olvasási teljesítmény további javítása érdekében a kapcsolódó információkat z-rendezéssel csoportosíthatja ugyanabban a fájlkészletben. A Delta Lake adat-kihagyási algoritmusai ezzel a rendezéssel jelentősen csökkentik az olvasandó adatok mennyiségét. A z-order adatokhoz meg kell adnia a z-sorrendben művelet szerint rendezendő oszlopokat. Például a rendezéshez futtassa a következőt gender:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Az optimalizálási művelet futtatásakor elérhető összes beállításért tekintse meg az adatfájl-elrendezés optimalizálása című témakört.

Pillanatképek törlése a VACUUM

A Delta Lake pillanatkép-elkülönítést biztosít az olvasásokhoz, ami azt jelenti, hogy biztonságosan futtathat optimalizálási műveletet, még akkor is, ha más felhasználók vagy feladatok kérdezik le a táblát. Végül azonban törölnie kell a régi pillanatképeket. Ezt a vákuumművelet futtatásával teheti meg:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

A vákuumművelet hatékony használatáról további információt a nem használt adatfájlok eltávolítása vákuummal című témakörben talál.