Öğretici: Delta Lake

Bu öğretici, aşağıdakiler de dahil olmak üzere Azure Databricks'te yaygın olarak kullanılan Delta Lake işlemlerini tanıtır:

Bu makaledeki Python, R, Scala ve SQL kodu örneğini Azure Databricks kümesine bağlı bir not defterinin içinden çalıştırabilirsiniz. Bu makaledeki SQL kodunu Databricks SQL'deki bir SQL ambarıile ilişkilendirilmiş bir sorgunun içinden de çalıştırabilirsiniz.

Not

Aşağıdaki kod örneklerinden bazıları, şema (veritabanı olarak da adlandırılır) ve tablo veya görünümden (örneğin, default.people10m) oluşan iki düzeyli ad alanı gösterimini kullanır. Bu örnekleri Unity Kataloğu ile kullanmak için, iki düzeyli ad alanını katalog, şema ve tablo veya görünümden oluşan Unity Kataloğu üç düzeyli ad alanı gösterimiyle değiştirin (örneğin, main.default.people10m).

Tablo oluşturma

Azure Databricks'te oluşturulan tüm tablolar varsayılan olarak Delta Lake kullanır.

Not

Delta Lake, Azure Databricks'in tüm okuma, yazma ve tablo oluşturma komutları için varsayılandır.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Önceki işlemler, verilerden çıkarılan şemayı kullanarak yeni bir yönetilen tablo oluşturur. Delta tablosu oluştururken kullanılabilen seçenekler hakkında bilgi için bkz . CREATE TABLE.

Yönetilen tablolar için Azure Databricks verilerin konumunu belirler. Konumu almak için DESCRIBE DETAIL deyimini kullanabilirsiniz, örneğin:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Bazen veri eklemeden önce şemayı belirterek bir tablo oluşturmak isteyebilirsiniz. Bunu aşağıdaki SQL komutlarıyla tamamlayabilirsiniz:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

Databricks Runtime 13.3 LTS ve üzeri sürümlerde, kaynak Delta tablosunun şema ve tablo özelliklerini çoğaltan yeni bir boş Delta tablosu oluşturmak için kullanabilirsiniz CREATE TABLE LIKE . Bu, aşağıdaki kod örneğinde olduğu gibi, tabloları geliştirme ortamından üretime yükseltme sırasında özellikle yararlı olabilir:

CREATE TABLE prod.people10m LIKE dev.people10m

Tablo oluşturmak için Delta Lake'teki API'yi de kullanabilirsiniz DeltaTableBuilder . DataFrameWriter API'leriyle karşılaştırıldığında, bu API sütun açıklamaları, tablo özellikleri ve oluşturulan sütunlar gibi ek bilgileri belirtmeyi kolaylaştırır.

Önemli

Bu özellik Genel Önizlemededir.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Tabloya upsert

Bir dizi güncelleştirme ve eklemeyi var olan bir Delta tablosuyla birleştirmek için MERGE INTO deyimini kullanırsınız. Örneğin, aşağıdaki deyim kaynak tablodaki verileri alır ve hedef Delta tablosuyla birleştirir. Her iki tabloda da eşleşen bir satır olduğunda Delta Lake veri sütununu verilen ifadeyi kullanarak güncelleştirir. Eşleşen satır olmadığında Delta Lake yeni bir satır ekler. Bu işlem upsert olarak bilinir.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

belirtirseniz *, bu hedef tablodaki tüm sütunları güncelleştirir veya ekler. Bu, kaynak tablonun hedef tablodaki sütunlarla aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.

Bir işlem gerçekleştirirken (örneğin, mevcut veri kümesinde eşleşen satır olmadığında) tablonuzdaki her sütun için bir INSERT değer belirtmeniz gerekir. Ancak, tüm değerleri güncelleştirmeniz gerekmez.

Sonuçları görmek için tabloyu sorgula.

SELECT * FROM people_10m WHERE id >= 9999998

Tablo okuma

Delta tablolarındaki verilere, aşağıdaki örneklerde gösterildiği gibi tablo adı veya tablo yolu ile erişebilirsiniz:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Tabloya yazma

Delta Lake, tablolara veri yazmak için standart söz dizimi kullanır.

Mevcut Delta tablosuna atomik olarak yeni veri eklemek için aşağıdaki örneklerde olduğu gibi modu kullanın append :

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Tablodaki tüm verileri atomik olarak değiştirmek için aşağıdaki örneklerde olduğu gibi modu kullanın overwrite :

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Tabloyu güncelleştirme

Delta tablosundaki bir koşulla eşleşen verileri güncelleştirebilirsiniz. Örneğin, adlı people10m veya yolunda /tmp/delta/people-10mbulunan bir tabloda, sütundaki M bir kısaltmayı gender veya F veya olarak değiştirmek için MaleFemaleaşağıdakileri çalıştırabilirsiniz:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Tablodan silme

Delta tablosundan koşulla eşleşen verileri kaldırabilirsiniz. Örneğin, adlı people10m bir tabloda veya yolunda /tmp/delta/people-10m, sütununda değeri olan kişilere karşılık gelen tüm satırları öncesinden birthDate1955silmek için aşağıdakileri çalıştırabilirsiniz:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Önemli

delete Delta tablosunun en son sürümündeki verileri kaldırır, ancak eski sürümler açıkça vakumlanana kadar fiziksel depolama alanından kaldırmaz. Ayrıntılar için bkz . vakum .

Tablo geçmişini görüntüleme

Tablonun geçmişini görüntülemek için tabloya yapılan her yazma işlemi için tablo sürümü, işlem, kullanıcı vb. gibi kanıtlanmış bilgiler sağlayan DESCRIBE HISTORY deyimini kullanın.

DESCRIBE HISTORY people_10m

Tablonun önceki bir sürümünü sorgulama (zaman yolculuğu)

Delta Lake zaman yolculuğu, Delta tablosunun eski bir anlık görüntüsünü sorgulamanıza olanak tanır.

Tablonun eski bir sürümünü sorgulamak için deyimde SELECT bir sürüm veya zaman damgası belirtin. Örneğin, yukarıdaki geçmişe ait sürüm 0'ı sorgulamak için şunu kullanın:

SELECT * FROM people_10m VERSION AS OF 0

veya

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Zaman damgaları için, ve gibi "2019-01-01""2019-01-01'T'00:00:00.000Z"yalnızca tarih veya zaman damgası dizeleri kabul edilir.

DataFrameReader seçenekleri, örneğin Python'da tablonun belirli bir sürümüne sabit bir Delta tablosundan DataFrame oluşturmanıza olanak sağlar:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

veya alternatif olarak:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Ayrıntılar için bkz . Delta Lake tablo geçmişiyle çalışma.

Tabloyu iyileştirme

Tabloda birden çok değişiklik yaptıktan sonra çok sayıda küçük dosyanız olabilir. Okuma sorgularının hızını artırmak için küçük dosyaları daha büyük dosyalara daraltmak için kullanabilirsiniz OPTIMIZE :

OPTIMIZE people_10m

Sütunlara göre Z sırası

Okuma performansını daha da geliştirmek için, Z-Ordering ile aynı dosya kümesindeki ilgili bilgileri birlikte bulabilirsiniz. Bu ortak yerellik, Delta Lake veri atlama algoritmaları tarafından okunması gereken veri miktarını önemli ölçüde azaltmak için otomatik olarak kullanılır. Z-Order verilerine, yan tümcesinde ZORDER BY sıralanması gereken sütunları belirtirsiniz. Örneğin, tarafından genderbirlikte bulmak için şunu çalıştırın:

OPTIMIZE people_10m
ZORDER BY (gender)

çalıştırırken OPTIMIZEkullanılabilen tüm seçenekler için bkz . Delta Lake'te iyileştirme ile veri dosyalarını sıkıştırma.

Ile anlık görüntüleri temizleme VACUUM

Delta Lake, okuma işlemleri için anlık görüntü yalıtımı sağlar. Bu, diğer kullanıcılar veya işler tabloyu sorgularken bile çalıştırılmasının OPTIMIZE güvenli olduğu anlamına gelir. Ancak sonunda eski anlık görüntüleri temizlemeniz gerekir. Komutunu çalıştırarak VACUUM bunu yapabilirsiniz:

VACUUM people_10m

Etkili bir şekilde kullanma VACUUM hakkında ayrıntılı bilgi için bkz . Kullanılmayan veri dosyalarını vakumla kaldırma.