Aracılığıyla paylaş


Linux Foundation Delta Lake'e genel bakış

Bu makale buradaki özgün muadili tarafından daha net bir şekilde uyarlanmıştır. Bu makale Delta Lake'in ana özelliklerini hızla keşfetmenize yardımcı olur. Bu makalede etkileşimli, toplu ve akış sorgularından Delta Lake tablolarından nasıl okunup yazıldığını gösteren kod parçacıkları sağlanır. Kod parçacıkları burada PySpark, Scala burada ve C# not defterleri kümesinde de kullanılabilir

Şunları ele alacağız:

  • Bir tablo oluşturma
  • Verileri okuma
  • Tablo verilerini güncelleştirme
  • Tablo verilerinin üzerine yazma
  • Üzerine yazmadan koşullu güncelleştirme
  • Time Travel kullanarak verilerin eski sürümlerini okuma
  • Tabloya veri akışı yazma
  • Tablodan değişiklik akışını okuma
  • SQL Desteği

Yapılandırma

Aşağıdakini ortamınıza uygun şekilde değiştirdiğinizden emin olun.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Sonuçlar:

'/delta/delta-table-335323'

Bir tablo oluşturma

Delta Lake tablosu oluşturmak için bir DataFrame'i delta biçiminde bir DataFrame'e yazın. Biçimi Parquet, CSV, JSON vb. yerine delta olarak değiştirebilirsiniz.

Aşağıdaki kod, DataFrame'inizden çıkardığınız şemayı kullanarak yeni bir Delta Lake tablosu oluşturmayı gösterir.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Sonuçlar:

ID
0
1
2
3
4

Verileri okuma

Delta Lake tablonuzdaki verileri, dosyaların yolunu ve delta biçimini belirterek okursunuz.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Sonuçlar:

ID
1
3
4
0
2

Sonuçların çıktısı alınmadan önce açıkça belirtilen bir sıra olmadığından, sonuçların sırası yukarıdan farklıdır.

Tablo verilerini güncelleştirme

Delta Lake, standart DataFrame API'lerini kullanarak tabloları değiştirmek için çeşitli işlemleri destekler. Bu işlemler, delta biçiminin eklediği geliştirmelerden biridir. Aşağıdaki örnek, tablodaki verilerin üzerine yazmak için bir toplu iş çalıştırır.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Sonuçlar:

ID
7
8
5
9
6

Burada beş kaydın da yeni değerleri barındıracak şekilde güncelleştirildiğini görebilirsiniz.

Katalog tabloları olarak kaydet

Delta Lake, yönetilen veya dış katalog tablolarına yazabilir.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Sonuçlar:

database tableName isTemporary
default externaldeltatable yanlış
default manageddeltatable yanlış

Bu kodla, mevcut bir veri çerçevesinden katalogda yönetilen tablo olarak adlandırılan yeni bir tablo oluşturdunuz. Ardından katalogda dış tablo olarak adlandırılan var olan bir konumu kullanan yeni bir dış tablo tanımlamıştınız. Çıktıda, nasıl oluşturulduklarına bakılmaksızın her iki tablonun da katalogda listelendiğini görebilirsiniz.

Artık bu tabloların her ikisinin de genişletilmiş özelliklerine bakabilirsiniz

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Sonuçlar:

col_name Data_type comment
kimlik bigint null
Ayrıntılı Tablo Bilgileri
Veritabanı default
Tablo manageddeltatable
Sahip trusted-service-user
Oluşturma Zamanı Sat 25 Nisan 00:35:34 UTC 2020
Son Erişim Per Jan 01 00:00:00 UTC 1970
Oluşturan Spark 2.4.4.2.6.99.201-11401300
Tür YÖNETİLEN
Sağlayıcı delta
Tablo Özellikleri [transient_lastDdlTime=1587774934]
İstatistikler 2407 bayt
Konum abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< workspace adı>/warehouse/manageddeltatable
Serde Kitaplığı org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Depolama Özellikleri [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Sonuçlar:

col_name Data_type comment
kimlik bigint null
Ayrıntılı Tablo Bilgileri
Veritabanı default
Tablo externaldeltatable
Sahip trusted-service-user
Oluşturma Zamanı Sat 25 Nisan 00:35:38 UTC 2020
Son Erişim Per Jan 01 00:00:00 UTC 1970
Oluşturan Spark 2.4.4.2.6.99.201-11401300
Tür DIŞ
Sağlayıcı DELTA
Tablo Özellikleri [transient_lastDdlTime=1587774938]
Konum abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152>
Serde Kitaplığı org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Depolama Özellikleri [serialization.format=1]

Üzerine yazmadan koşullu güncelleştirme

Delta Lake koşullu güncelleştirme, silme ve birleştirme (bu komut genellikle upsert olarak adlandırılır) verilerini tablolara eklemek için programlı API'ler sağlar.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Sonuçlar:

ID
106
108
5
7
9

Burada her çift kimlik için 100 eklediniz.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Sonuçlar:

ID
5
7
9

Her çift satırın silindiğini fark edin.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Sonuçlar:

ID
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Burada mevcut verilerin bir bileşimi vardır. Mevcut verilere update(WhenMatched) kod yolunda -1 değeri atanmıştır. Kod parçacığının en üstünde oluşturulan ve ekleme kodu yolu (WhenNotMatched) aracılığıyla eklenen yeni veriler de eklendi.

Geçmiş

Delta Lake's, bir tablonun geçmişinin araştırılabilmesini sağlar. Başka bir ifadeyle, temel delta tablosunda yapılan değişiklikler. Aşağıdaki hücre geçmişi incelemenin ne kadar kolay olduğunu gösterir.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Sonuçlar:

sürüm timestamp userId userName operation operationParameters not defteri clusterId readVersion ısolationlevel isBlindAppend
4 2020-04-25 00:36:27 null null BİRLEŞTİRME [koşul -> (oldData.ID = newData.ID)] null null null 3 null yanlış
3 2020-04-25 00:36:08 null null DELETE [koşul -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] null null null 2 null yanlış
2 2020-04-25 00:35:51 null null UPDATE [koşul -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] null null null 1 null yanlış
1 2020-04-25 00:35:05 null null YAZMAK [mod -> Üzerine yaz, partitionBy -> []] null null null 0 null yanlış
0 2020-04-25 00:34:34 null null YAZMAK [mod -> ErrorIfExists, partitionBy -> []] null null null null null true

Burada yukarıdaki kod parçacıkları üzerinde yapılan tüm değişiklikleri görebilirsiniz.

Time Travel kullanarak verilerin eski sürümlerini okuma

Zaman Yolculuğu adlı bir özelliği kullanarak Delta Lake tablonuzun önceki anlık görüntülerini sorgulamak mümkündür. Üzerine yazdığınız verilere erişmek istiyorsanız, versionAsOf seçeneğini kullanarak ilk veri kümesinin üzerine yazmadan önce tablonun anlık görüntüsünü sorgulayabilirsiniz.

Aşağıdaki hücreyi çalıştırdıktan sonra, üzerine yazmadan önce ilk veri kümesini görmeniz gerekir. Time Travel, delta lake işlem günlüğünün gücünden yararlanarak artık tabloda bulunmayan verilere erişen güçlü bir özelliktir. Sürüm 0 seçeneğinin kaldırılması (veya sürüm 1'in belirtilmesi) daha yeni verileri yeniden görmenize olanak verir. Daha fazla bilgi için bkz . Tablonun eski anlık görüntüsünü sorgulama.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Sonuçlar:

ID
0
1
4
3
2

Burada verilerin en eski sürümüne geri döndüğünüz görebilirsiniz.

Tabloya veri akışı yazma

Spark'ın Yapılandırılmış Akış özelliğini kullanarak delta lake tablosuna da yazabilirsiniz. Delta Lake işlem günlüğü, tabloda eşzamanlı olarak çalışan başka akışlar veya toplu sorgular olsa bile tam olarak bir kez işlemeyi garanti eder. Varsayılan olarak, akışlar tabloya yeni kayıtlar ekleyen ekleme modunda çalışır.

Yapılandırılmış Akış ile Delta Lake tümleştirmesi hakkında daha fazla bilgi için bkz. Tablo Akışı Okumaları ve Yazmaları.

Aşağıdaki hücrelerde şunları yapıyoruz:

  • Hücre 30 Yeni eklenen verileri göster
  • Hücre 31 Geçmişi inceleme
  • Hücre 32 Yapılandırılmış akış işini durdurma
  • Hücre 33 İnceleme geçmişi <--Eklemelerin durdurulduğunu fark edeceksiniz

İlk olarak, bir dizi oluşturmak ve işin Delta Tablonuza yazılması için basit bir Spark Streaming işi ayarlayacaksınız.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Tablodan değişiklik akışını okuma

Akış Delta Lake tablosuna yazılırken, bu tablodan akış kaynağı olarak da okuyabilirsiniz. Örneğin, Delta Lake tablosunda yapılan tüm değişiklikleri yazdıran başka bir akış sorgusu başlatabilirsiniz.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Sonuçlar:

ID
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Sonuçlar:

sürüm timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 AKıŞ GÜNCELLEŞTIRMESI [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 BİRLEŞTİRME [koşul -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [koşul -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [koşul -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 YAZMAK [mod -> Üzerine yaz, partitionBy -> []] 0
0 2020-04-25 00:34:34 YAZMAK [mod -> ErrorIfExists, partitionBy -> []] null

Burada, geçmiş görünümünün görüntüleme deneyimini basitleştirmek için daha az ilgi çekici sütunlardan bazılarını bırakıyorsunuz.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Sonuçlar:

sürüm timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 AKıŞ GÜNCELLEŞTIRMESI [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 BİRLEŞTİRME [koşul -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [koşul -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [koşul -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 YAZMAK [mod -> Üzerine yaz, partitionBy -> []] 0
0 2020-04-25 00:34:34 YAZMAK [mod -> ErrorIfExists, partitionBy -> []] null

Parquet'i Delta'ya Dönüştür

Parquet biçiminden Delta'ya yerinde dönüştürme yapabilirsiniz.

Burada mevcut tablonun delta biçiminde olup olmadığını test edeceksiniz.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Sonuçlar:

Yanlış

Şimdi verileri delta biçimine dönüştürecek ve çalıştığını doğrulayacağız.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Sonuçlar:

Doğru

SQL desteği

Delta, SQL aracılığıyla tablo yardımcı programı komutlarını destekler. SQL'i kullanarak:

  • DeltaTable'ın geçmişini alma
  • DeltaTable'ın vakumu
  • Parquet dosyasını Delta'ya dönüştürme
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Sonuçlar:

sürüm timestamp userId userName operation operationParameters not defteri clusterId readVersion ısolationlevel isBlindAppend
5 2020-04-25 00:37:09 null null AKıŞ GÜNCELLEŞTIRMESI [outputMode -> Ap... null null null 4 null true
4 2020-04-25 00:36:27 null null BİRLEŞTİRME [koşul -> (ol... null null null 3 null yanlış
3 2020-04-25 00:36:08 null null DELETE [koşul -> ["(... null null null 2 null yanlış
2 2020-04-25 00:35:51 null null UPDATE [koşul -> ((i... null null null 1 null yanlış
1 2020-04-25 00:35:05 null null YAZMAK [mod -> Üzerine yaz... null null null 0 null yanlış
0 2020-04-25 00:34:34 null null YAZMAK [mod -> ErrorIfE... null null null null null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Sonuçlar:

path
abfss://data@arca...

Şimdi, tablonun delta biçimli bir tablo olmadığını doğrulayacağız. Ardından Spark SQL kullanarak tabloyu delta biçimine dönüştürecek ve doğru şekilde dönüştürüldüğünü onaylayacaksınız.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Sonuçlar:

Doğru

Tüm belgeler için Bkz. Delta Lake Belgeleri Sayfası

Daha fazla bilgi için bkz . Delta Lake Project.

Sonraki adımlar