Aracılığıyla paylaş


Öğretici: Delta Lake

Bu öğretici, aşağıdakiler de dahil olmak üzere Azure Databricks'te yaygın olarak kullanılan Delta Lake işlemlerini tanıtır:

Bu makaledeki Python, Scala ve SQL kodu örneğini, küme gibi bir Azure Databricks işlem kaynağına bağlı bir not defterinin içinden çalıştırabilirsiniz. Bu makaledeki SQL kodunu Databricks SQL'deki bir SQL ambarıile ilişkilendirilmiş bir sorgunun içinden de çalıştırabilirsiniz.

Kaynak verileri hazırlama

Bu öğretici, Kişiler 10 M adlı bir veri kümesini temel alır. Ad ve soyadı, doğum tarihi ve maaş gibi insanlar hakkında gerçekler barındıran 10 milyon kurgusal kayıt içerir. Bu öğreticide, bu veri kümesinin hedef Azure Databricks çalışma alanınızla ilişkili bir Unity Kataloğu biriminde olduğu varsayılır.

Bu öğreticide Kişiler 10 M veri kümesini almak için aşağıdakileri yapın:

  1. Kaggle'da Kişiler 10 M sayfasına gidin.
  2. Adlı dosyayı archive.zip yerel makinenize indirmek için İndir'e tıklayın.
  3. adlı export.csv dosyayı dosyasından archive.zip ayıklayın. Dosya bu export.csv öğreticinin verilerini içerir.

Dosyayı birime yüklemek export.csv için aşağıdakileri yapın:

  1. Kenar çubuğunda Katalog'a tıklayın.
  2. Katalog Gezgini'nde, dosyayı karşıya yüklemek export.csv istediğiniz birimi bulun ve açın.
  3. Bu birime yükle'ye tıklayın.
  4. Yerel makinenizdeki dosyayı sürükleyip bırakın veya dosyaya export.csv göz atın ve seçin.
  5. Karşıya Yükle'ye tıklayın.

Aşağıdaki kod örneklerinde öğesini hedef biriminizdeki dosyanın yoluyla export.csv değiştirin/Volumes/main/default/my-volume/export.csv.

Tablo oluşturma

Azure Databricks'te oluşturulan tüm tablolar varsayılan olarak Delta Lake kullanır. Databricks, Unity Kataloğu yönetilen tablolarının kullanılmasını önerir.

Önceki kod örneğinde ve aşağıdaki kod örneklerinde, Unity Kataloğu'nda tablo adını main.default.people_10m hedef üç bölümlü kataloğunuz, şemanız ve tablo adınızla değiştirin.

Not

Delta Lake, Azure Databricks'in tüm okuma, yazma ve tablo oluşturma komutları için varsayılandır.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Önceki işlemler yeni bir yönetilen tablo oluşturur. Delta tablosu oluştururken kullanılabilen seçenekler hakkında bilgi için bkz . CREATE TABLE.

Databricks Runtime 13.3 LTS ve üzeri sürümlerde CREATE TABLE LIKE kullanarak kaynak Delta tablosunun şema ve tablo özelliklerini çoğaltan yeni bir boş Delta tablosu oluşturabilirsiniz. Bu, aşağıdaki kod örneğinde gösterildiği gibi tabloları geliştirme ortamından üretime yükseltme sırasında özellikle yararlı olabilir:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Boş bir tablo oluşturmak için Python ve Scala için Delta Lake'teki API'yi de kullanabilirsinizDeltaTableBuilder. Eşdeğer DataFrameWriter API'leriyle karşılaştırıldığında, bu API'ler sütun açıklamaları, tablo özellikleri ve oluşturulan sütunlar gibi ek bilgileri belirtmeyi kolaylaştırır.

Önemli

Bu özellik Genel Önizlemededir.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Tabloya upsert

Bir dizi güncelleştirme ve eklemeyi mevcut delta tablosuyla birleştirmek için Python ve Scala yöntemini ve SQL için MERGE INTO deyimini kullanırsınızDeltaTable.merge. Örneğin, aşağıdaki örnek kaynak tablodaki verileri alır ve hedef Delta tablosuyla birleştirir. Her iki tabloda da eşleşen bir satır olduğunda Delta Lake veri sütununu verilen ifadeyi kullanarak güncelleştirir. Eşleşen satır olmadığında Delta Lake yeni bir satır ekler. Bu işlem upsert olarak bilinir.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

SQL'de, belirtirseniz *, kaynak tablonun hedef tabloyla aynı sütunlara sahip olduğu varsayılarak, bu hedef tablodaki tüm sütunları güncelleştirir veya ekler. Hedef tabloda aynı sütunlar yoksa sorgu bir çözümleme hatası oluşturur.

Ekleme işlemi gerçekleştirirken (örneğin, mevcut veri kümesinde eşleşen satır olmadığında) tablonuzdaki her sütun için bir değer belirtmeniz gerekir. Ancak, tüm değerleri güncelleştirmeniz gerekmez.

Sonuçları görmek için tabloyu sorgula.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Tablo okuma

Delta tablolarındaki verilere, aşağıdaki örneklerde gösterildiği gibi tablo adı veya tablo yolu ile erişebilirsiniz:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Tabloya yazma

Delta Lake, tablolara veri yazmak için standart söz dizimi kullanır.

Mevcut Delta tablosuna atomik olarak yeni veri eklemek için aşağıdaki örneklerde gösterildiği gibi ekleme modunu kullanın:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Bir tablodaki tüm verileri değiştirmek için, aşağıdaki örneklerde olduğu gibi üzerine yazma modunu kullanın:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Tabloyu güncelleştirme

Delta tablosundaki bir koşulla eşleşen verileri güncelleştirebilirsiniz. Örneğin, örnek people_10m tabloda, sütundaki M bir kısaltmayı gender veya F olarak değiştirmek için MaleFemaleaşağıdakileri çalıştırabilirsiniz:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Tablodan silme

Delta tablosundan koşulla eşleşen verileri kaldırabilirsiniz. Örneğin, örnek people_10m tabloda, sütununda değeri olan kişilere karşılık gelen tüm satırları öncesinden birthDate1955silmek için aşağıdakileri çalıştırabilirsiniz:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Önemli

Silme işlemi Delta tablosunun en son sürümündeki verileri kaldırır ancak eski sürümler açıkça vakumlanana kadar fiziksel depolama alanından kaldırmaz. Ayrıntılar için bkz . vakum .

Tablo geçmişini görüntüleme

Tablonun geçmişini görüntülemek için Python ve Scala yöntemini ve SQL'de tablo sürümü, işlem, kullanıcı vb. dahil olmak üzere bir tabloya her yazma işlemi için provenance bilgileri sağlayan DESCRIBE HISTORY deyimini kullanırsınızDeltaTable.history.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Tablonun önceki bir sürümünü sorgulama (zaman yolculuğu)

Delta Lake zaman yolculuğu, Delta tablosunun eski bir anlık görüntüsünü sorgulamanıza olanak tanır.

Tablonun eski bir sürümünü sorgulamak için tablonun sürümünü veya zaman damgasını belirtin. Örneğin, önceki geçmişe ait sürüm 0 veya zaman damgasını 2024-05-15T22:43:15.000+00:00Z sorgulamak için aşağıdakileri kullanın:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Zaman damgaları için, veya gibi "2024-05-15T22:43:15.000+00:00""2024-05-15 22:43:15"yalnızca tarih veya zaman damgası dizeleri kabul edilir.

DataFrameReader seçenekleri, bir Delta tablosundan, tablonun belirli bir sürümüne veya zaman damgasına sabit bir DataFrame oluşturmanıza olanak sağlar, örneğin:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Ayrıntılar için bkz . Delta Lake tablo geçmişiyle çalışma.

Tabloyu iyileştirme

Tabloda birden çok değişiklik yaptıktan sonra çok sayıda küçük dosyanız olabilir. Okuma sorgularının hızını artırmak için iyileştirme işlemini kullanarak küçük dosyaları daha büyük dosyalara daraltabilirsiniz:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

SQL

OPTIMIZE main.default.people_10m

Sütunlara göre Z sırası

Okuma performansını daha da geliştirmek için, z-ordering ile aynı dosya kümesindeki ilgili bilgileri birlikte kullanabilirsiniz. Delta Lake veri atlama algoritmaları, okunması gereken veri miktarını önemli ölçüde azaltmak için bu birlikte bulundurmayı kullanır. Verileri z sıralamak için, z sırasına göre işlemde sıralanması gereken sütunları belirtirsiniz. Örneğin, ile genderbirlikte kullanmak için şunu çalıştırın:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

İyileştirme işlemi çalıştırılırken kullanılabilen tüm seçenekler için bkz . Veri dosyası düzenini iyileştirme.

Ile anlık görüntüleri temizleme VACUUM

Delta Lake, okumalar için anlık görüntü yalıtımı sağlar; başka bir deyişle, diğer kullanıcılar veya işler tabloyu sorgularken bile iyileştirme işlemini çalıştırmanın güvenli olduğu anlamına gelir. Ancak sonunda eski anlık görüntüleri temizlemeniz gerekir. Vakum işlemini çalıştırarak bunu yapabilirsiniz:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Vakum işlemini etkili bir şekilde kullanma hakkında ayrıntılı bilgi için bkz . Kullanılmayan veri dosyalarını vakumla kaldırma.