Descrição geral do Delta Lake da Fundação Linux

Este artigo foi adaptado para maior clareza por parte do seu homólogo original aqui. Este artigo ajuda-o a explorar rapidamente as principais funcionalidades do Delta Lake. O artigo fornece fragmentos de código que mostram como ler e escrever em tabelas do Delta Lake a partir de consultas interativas, em lote e de transmissão em fluxo. Os fragmentos de código também estão disponíveis num conjunto de blocos de notas PySpark aqui, Scala aqui e C# aqui

Eis o que vamos abordar:

  • Criar uma tabela
  • Ler dados
  • Atualizar dados da tabela
  • Substituir dados da tabela
  • Atualização condicional sem substituição
  • Ler versões mais antigas de dados com o Time Travel
  • Escrever um fluxo de dados numa tabela
  • Ler um fluxo de alterações a partir de uma tabela
  • Suporte do SQL

Configuração

Certifique-se de que modifica o seguinte conforme adequado para o seu ambiente.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Resulta em:

"/delta/delta-table-335323"

Criar uma tabela

Para criar uma tabela do Delta Lake, escreva um DataFrame num DataFrame no formato delta. Pode alterar o formato de Parquet, CSV, JSON, etc., para delta.

O código que se segue mostra-lhe como criar uma nova tabela do Delta Lake com o esquema inferido do DataFrame.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Resulta em:

ID
0
1
2
3
4

Ler dados

Pode ler dados na sua tabela do Delta Lake ao especificar o caminho para os ficheiros e o formato delta.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Resulta em:

ID
1
3
4
0
2

A ordem dos resultados é diferente de acima, uma vez que não foi especificada explicitamente nenhuma ordem antes de exportar os resultados.

Atualizar dados da tabela

O Delta Lake suporta várias operações para modificar tabelas com APIs de DataFrame padrão. Estas operações são um dos melhoramentos que o formato delta adiciona. O exemplo seguinte executa uma tarefa de lote para substituir os dados na tabela.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Resulta em:

ID
7
8
5
9
6

Aqui, pode ver que todos os cinco registos foram atualizados para conter novos valores.

Guardar como tabelas de catálogo

O Delta Lake pode escrever em tabelas de catálogo geridas ou externas.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Resulta em:

base de dados tableName isTemporary
predefinição externaldeltatable false
predefinição geridodeltatable false

Com este código, criou uma nova tabela no catálogo a partir de um dataframe existente, referido como uma tabela gerida. Em seguida, definiu uma nova tabela externa no catálogo que utiliza uma localização existente, referida como uma tabela externa. No resultado, pode ver que ambas as tabelas, independentemente da forma como foram criadas, são listadas no catálogo.

Agora, pode ver as propriedades expandidas de ambas as tabelas

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Resulta em:

col_name data_type comentário
ID bigint nulo
Informações Detalhadas da Tabela
Base de Dados predefinição
Tabela geridodeltatable
Proprietário trusted-service-user
Hora de Criação Sáb 25 00:35:34 UTC 2020
Último Acesso Qui Jan 01 00:00:00 UTC 1970
Criada Por Spark 2.4.4.2.6.99.201-11401300
Tipo GERIDO
Fornecedor delta
Propriedades da Tabela [transient_lastDdlTime=1587774934]
Estatísticas 2407 bytes
Localização abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< workspace name>/warehouse/manageddeltatable
Biblioteca serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Propriedades de Armazenamento [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Resultados em:

col_name data_type comentário
ID bigint nulo
Informações Detalhadas da Tabela
Base de Dados predefinição
Tabela externaldeltatable
Proprietário trusted-service-user
Hora de Criação Sáb 25 00:35:38 UTC 2020
Último Acesso Thu Jan 01 00:00:00 UTC 1970
Criada Por Spark 2.4.4.2.6.99.201-11401300
Tipo EXTERNO
Fornecedor DELTA
Propriedades da Tabela [transient_lastDdlTime=1587774938]
Localização abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152>
Biblioteca serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Propriedades de Armazenamento [serialization.format=1]

Atualização condicional sem substituição

O Delta Lake fornece APIs programáticas para atualização condicional, eliminação e intercalação (este comando é normalmente referido como um upsert) em tabelas.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Resultados em:

ID
106
108
5
7
9

Aqui acabou de adicionar 100 a cada ID par.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Resultados em:

ID
5
7
9

Repare que todas as linhas pares foram eliminadas.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Resultados em:

ID
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Aqui tem uma combinação dos dados existentes. Os dados existentes foram atribuídos ao valor -1 no caminho de código update(WhenMatched). Os novos dados que foram criados na parte superior do fragmento e que foram adicionados através do caminho de código de inserção (WhenNotMatched), também foram adicionados.

Histórico

O Delta Lake's tem a capacidade de permitir olhar para a história de uma tabela. Ou seja, as alterações efetuadas à Tabela Delta subjacente. A célula abaixo mostra como é simples inspecionar o histórico.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Resultados em:

versão carimbo de data/hora userId userName operation operationParameters tarefa bloco de notas clusterId readVersion isolationLevel isBlindAppend
4 2020-04-25 00:36:27 nulo nulo INTERCALAR [predicado -> (oldData.ID = newData.ID)] nulo nulo nulo 3 nulo false
3 2020-04-25 00:36:08 nulo nulo DELETE [predicado -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] nulo nulo nulo 2 nulo false
2 2020-04-25 00:35:51 nulo nulo UPDATE [predicado -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] nulo nulo nulo 1 nulo false
1 2020-04-25 00:35:05 nulo nulo ESCREVER [modo -> Substituir, partitionBy -> []] nulo nulo nulo 0 nulo false
0 2020-04-25 00:34:34 nulo nulo ESCREVER [modo -> ErrorIfExists, partitionBy -> []] nulo nulo nulo nulo nulo true

Aqui, pode ver todas as modificações efetuadas nos fragmentos de código acima.

Ler versões mais antigas de dados com o Time Travel

É possível consultar instantâneos anteriores da sua tabela do Delta Lake através de uma funcionalidade denominada Viagem no Tempo. Se quiser aceder aos dados que substituiu, pode consultar um instantâneo da tabela antes de substituir o primeiro conjunto de dados com a opção versionAsOf.

Depois de executar a célula abaixo, deverá ver o primeiro conjunto de dados de antes de a substituir. O Time Travel é uma funcionalidade avançada que tira partido do poder do registo de transações do Delta Lake para aceder a dados que já não estão na tabela. Remover a opção da versão 0 (ou especificar a versão 1) permitir-lhe-ia ver novamente os dados mais recentes. Para obter mais informações, veja Consultar um instantâneo mais antigo de uma tabela.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Resulta em:

ID
0
1
4
3
2

Aqui, pode ver que voltou à versão mais antiga dos dados.

Escrever um fluxo de dados numa tabela

Também pode escrever numa tabela do Delta Lake com a Transmissão em Fluxo Estruturada do Spark. O registo de transações do Delta Lake garante exatamente uma vez o processamento, mesmo quando existem outros fluxos ou consultas em lote em execução em simultâneo na tabela. Por predefinição, os fluxos são executados no modo de acréscimo, o que adiciona novos registos à tabela.

Para obter mais informações sobre a integração do Delta Lake com a Transmissão em Fluxo Estruturada, veja Leituras e Escritas de Transmissão em Fluxo de Tabelas.

Nas células abaixo, eis o que estamos a fazer:

  • Célula 30 Mostrar os dados recém-anexados
  • Célula 31 Inspecionar histórico
  • Célula 32 Parar a tarefa de transmissão em fluxo estruturada
  • Célula 33 Inspecionar histórico <- Verá que os acréscimos pararam

Primeiro, vai configurar uma tarefa simples de Transmissão em Fluxo do Spark para gerar uma sequência e fazer com que a tarefa escreva na sua Tabela Delta.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Ler um fluxo de alterações a partir de uma tabela

Enquanto o fluxo está a escrever na tabela do Delta Lake, também pode ler a partir dessa tabela como uma origem de transmissão em fluxo. Por exemplo, pode iniciar outra consulta de transmissão em fluxo que imprima todas as alterações efetuadas à tabela do Delta Lake.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Resulta em:

ID
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Resulta em:

versão carimbo de data/hora operation operationParameters readVersion
5 2020-04-25 00:37:09 ATUALIZAÇÃO DE TRANSMISSÃO EM FLUXO [outputMode -> Acrescentar, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 INTERCALAR [predicado -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predicado -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicado -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 ESCREVER [modo -> Substituir, partitionBy -> []] 0
0 2020-04-25 00:34:34 ESCREVER [modo -> ErrorIfExists, partitionBy -> []] nulo

Aqui, está a remover algumas das colunas menos interessantes para simplificar a experiência de visualização da vista do histórico.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Resulta em:

versão carimbo de data/hora operation operationParameters readVersion
5 2020-04-25 00:37:09 ATUALIZAÇÃO DE TRANSMISSÃO EM FLUXO [outputMode -> Acrescentar, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 INTERCALAR [predicado -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predicado -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicado -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 ESCREVER [modo -> Substituir, partitionBy -> []] 0
0 2020-04-25 00:34:34 ESCREVER [modo -> ErrorIfExists, partitionBy -> []] nulo

Converter Parquet em Delta

Pode fazer uma conversão no local do formato Parquet para Delta.

Aqui, irá testar se a tabela existente está ou não no formato delta.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Resulta em:

Falso

Agora, vai converter os dados em formato delta e verificar se funcionaram.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Resulta em:

Verdadeiro

Suporte do SQL

A Delta suporta comandos utilitários de tabela através do SQL. Pode utilizar o SQL para:

  • Obter o histórico de uma DeltaTable
  • Aspirar uma DeltaTable
  • Converter um ficheiro Parquet em Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Resultados em:

versão carimbo de data/hora userId userName operation operationParameters tarefa bloco de notas clusterId readVersion isolationLevel isBlindAppend
5 2020-04-25 00:37:09 nulo nulo ATUALIZAÇÃO DE TRANSMISSÃO EM FLUXO [outputMode -> Ap... nulo nulo nulo 4 nulo true
4 2020-04-25 00:36:27 nulo nulo INTERCALAR [predicado -> (ol... nulo nulo nulo 3 nulo false
3 2020-04-25 00:36:08 nulo nulo DELETE [predicado -> ["(... nulo nulo nulo 2 nulo false
2 2020-04-25 00:35:51 nulo nulo UPDATE [predicado -> ((i... nulo nulo nulo 1 nulo false
1 2020-04-25 00:35:05 nulo nulo ESCREVER [modo -> Substituir... nulo nulo nulo 0 nulo false
0 2020-04-25 00:34:34 nulo nulo ESCREVER [modo -> ErrorIfE... nulo nulo nulo nulo nulo true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Resultados em:

caminho
abfss://data@arca...

Agora, vai verificar se uma tabela não é uma tabela de formato delta. Em seguida, irá converter a tabela em formato delta com o SQL do Spark e confirmar que foi convertida corretamente.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Resultados em:

Verdadeiro

Para obter a documentação completa, consulte a Página de Documentação do Delta Lake

Para obter mais informações, veja Delta Lake Project.

Passos seguintes