Öğretici: Delta Lake
Bu öğretici, aşağıdakiler de dahil olmak üzere Azure Databricks'te yaygın olarak kullanılan Delta Lake işlemlerini tanıtır:
- Bir tablo oluşturun.
- Bir tabloya güncelleştirin/ekleyin.
- Bir tablodan okuyun.
- Tablo geçmişini görüntüleyin.
- Tablonun önceki bir sürümünü sorgulayın.
- Tabloyu iyileştirin.
- Bir Z sırası dizini ekleyin.
- Başvurulmayan dosyaları vakumlayın.
Bu makaledeki Python, R, Scala ve SQL kodu örneğini Azure Databricks kümesine bağlı bir not defterinin içinden çalıştırabilirsiniz. Bu makaledeki SQL kodunu Databricks SQL'deki bir SQL ambarıile ilişkilendirilmiş bir sorgunun içinden de çalıştırabilirsiniz.
Not
Aşağıdaki kod örneklerinden bazıları, şema (veritabanı olarak da adlandırılır) ve tablo veya görünümden (örneğin, default.people10m
) oluşan iki düzeyli ad alanı gösterimini kullanır. Bu örnekleri Unity Kataloğu ile kullanmak için, iki düzeyli ad alanını katalog, şema ve tablo veya görünümden oluşan Unity Kataloğu üç düzeyli ad alanı gösterimiyle değiştirin (örneğin, main.default.people10m
).
Tablo oluşturma
Azure Databricks'te oluşturulan tüm tablolar varsayılan olarak Delta Lake kullanır.
Not
Delta Lake, Azure Databricks'in tüm okuma, yazma ve tablo oluşturma komutları için varsayılandır.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
Önceki işlemler, verilerden çıkarılan şemayı kullanarak yeni bir yönetilen tablo oluşturur. Delta tablosu oluştururken kullanılabilen seçenekler hakkında bilgi için bkz . CREATE TABLE.
Yönetilen tablolar için Azure Databricks verilerin konumunu belirler. Konumu almak için DESCRIBE DETAIL deyimini kullanabilirsiniz, örneğin:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
Bazen veri eklemeden önce şemayı belirterek bir tablo oluşturmak isteyebilirsiniz. Bunu aşağıdaki SQL komutlarıyla tamamlayabilirsiniz:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
Databricks Runtime 13.3 LTS ve üzeri sürümlerde, kaynak Delta tablosunun şema ve tablo özelliklerini çoğaltan yeni bir boş Delta tablosu oluşturmak için kullanabilirsiniz CREATE TABLE LIKE
. Bu, aşağıdaki kod örneğinde olduğu gibi, tabloları geliştirme ortamından üretime yükseltme sırasında özellikle yararlı olabilir:
CREATE TABLE prod.people10m LIKE dev.people10m
Tablo oluşturmak için Delta Lake'teki API'yi de kullanabilirsiniz DeltaTableBuilder
. DataFrameWriter API'leriyle karşılaştırıldığında, bu API sütun açıklamaları, tablo özellikleri ve oluşturulan sütunlar gibi ek bilgileri belirtmeyi kolaylaştırır.
Önemli
Bu özellik Genel Önizlemededir.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
Tabloya upsert
Bir dizi güncelleştirme ve eklemeyi var olan bir Delta tablosuyla birleştirmek için MERGE INTO deyimini kullanırsınız. Örneğin, aşağıdaki deyim kaynak tablodaki verileri alır ve hedef Delta tablosuyla birleştirir. Her iki tabloda da eşleşen bir satır olduğunda Delta Lake veri sütununu verilen ifadeyi kullanarak güncelleştirir. Eşleşen satır olmadığında Delta Lake yeni bir satır ekler. Bu işlem upsert olarak bilinir.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
belirtirseniz *
, bu hedef tablodaki tüm sütunları güncelleştirir veya ekler. Bu, kaynak tablonun hedef tablodaki sütunlarla aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.
Bir işlem gerçekleştirirken (örneğin, mevcut veri kümesinde eşleşen satır olmadığında) tablonuzdaki her sütun için bir INSERT
değer belirtmeniz gerekir. Ancak, tüm değerleri güncelleştirmeniz gerekmez.
Sonuçları görmek için tabloyu sorgula.
SELECT * FROM people_10m WHERE id >= 9999998
Tablo okuma
Delta tablolarındaki verilere, aşağıdaki örneklerde gösterildiği gibi tablo adı veya tablo yolu ile erişebilirsiniz:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
Tabloya yazma
Delta Lake, tablolara veri yazmak için standart söz dizimi kullanır.
Mevcut Delta tablosuna atomik olarak yeni veri eklemek için aşağıdaki örneklerde olduğu gibi modu kullanın append
:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
Tablodaki tüm verileri atomik olarak değiştirmek için aşağıdaki örneklerde olduğu gibi modu kullanın overwrite
:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
Tabloyu güncelleştirme
Delta tablosundaki bir koşulla eşleşen verileri güncelleştirebilirsiniz. Örneğin, adlı people10m
veya yolunda /tmp/delta/people-10m
bulunan bir tabloda, sütundaki M
bir kısaltmayı gender
veya F
veya olarak değiştirmek için Male
Female
aşağıdakileri çalıştırabilirsiniz:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Tablodan silme
Delta tablosundan koşulla eşleşen verileri kaldırabilirsiniz. Örneğin, adlı people10m
bir tabloda veya yolunda /tmp/delta/people-10m
, sütununda değeri olan kişilere karşılık gelen tüm satırları öncesinden birthDate
1955
silmek için aşağıdakileri çalıştırabilirsiniz:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Önemli
delete
Delta tablosunun en son sürümündeki verileri kaldırır, ancak eski sürümler açıkça vakumlanana kadar fiziksel depolama alanından kaldırmaz. Ayrıntılar için bkz . vakum .
Tablo geçmişini görüntüleme
Tablonun geçmişini görüntülemek için tabloya yapılan her yazma işlemi için tablo sürümü, işlem, kullanıcı vb. gibi kanıtlanmış bilgiler sağlayan DESCRIBE HISTORY deyimini kullanın.
DESCRIBE HISTORY people_10m
Tablonun önceki bir sürümünü sorgulama (zaman yolculuğu)
Delta Lake zaman yolculuğu, Delta tablosunun eski bir anlık görüntüsünü sorgulamanıza olanak tanır.
Tablonun eski bir sürümünü sorgulamak için deyimde SELECT
bir sürüm veya zaman damgası belirtin. Örneğin, yukarıdaki geçmişe ait sürüm 0'ı sorgulamak için şunu kullanın:
SELECT * FROM people_10m VERSION AS OF 0
veya
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Zaman damgaları için, ve gibi "2019-01-01"
"2019-01-01'T'00:00:00.000Z"
yalnızca tarih veya zaman damgası dizeleri kabul edilir.
DataFrameReader seçenekleri, örneğin Python'da tablonun belirli bir sürümüne sabit bir Delta tablosundan DataFrame oluşturmanıza olanak sağlar:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
veya alternatif olarak:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
Ayrıntılar için bkz . Delta Lake tablo geçmişiyle çalışma.
Tabloyu iyileştirme
Tabloda birden çok değişiklik yaptıktan sonra çok sayıda küçük dosyanız olabilir. Okuma sorgularının hızını artırmak için küçük dosyaları daha büyük dosyalara daraltmak için kullanabilirsiniz OPTIMIZE
:
OPTIMIZE people_10m
Sütunlara göre Z sırası
Okuma performansını daha da geliştirmek için, Z-Ordering ile aynı dosya kümesindeki ilgili bilgileri birlikte bulabilirsiniz. Bu ortak yerellik, Delta Lake veri atlama algoritmaları tarafından okunması gereken veri miktarını önemli ölçüde azaltmak için otomatik olarak kullanılır. Z-Order verilerine, yan tümcesinde ZORDER BY
sıralanması gereken sütunları belirtirsiniz. Örneğin, tarafından gender
birlikte bulmak için şunu çalıştırın:
OPTIMIZE people_10m
ZORDER BY (gender)
çalıştırırken OPTIMIZE
kullanılabilen tüm seçenekler için bkz . Delta Lake'te iyileştirme ile veri dosyalarını sıkıştırma.
Ile anlık görüntüleri temizleme VACUUM
Delta Lake, okuma işlemleri için anlık görüntü yalıtımı sağlar. Bu, diğer kullanıcılar veya işler tabloyu sorgularken bile çalıştırılmasının OPTIMIZE
güvenli olduğu anlamına gelir. Ancak sonunda eski anlık görüntüleri temizlemeniz gerekir. Komutunu çalıştırarak VACUUM
bunu yapabilirsiniz:
VACUUM people_10m
Etkili bir şekilde kullanma VACUUM
hakkında ayrıntılı bilgi için bkz . Kullanılmayan veri dosyalarını vakumla kaldırma.