Delta Live Tables Python-språkreferens

Den här artikeln innehåller information om Programmeringsgränssnittet för Delta Live Tables Python.

Information om SQL-API:et finns i sql-språkreferensen Delta Live Tables.

Mer information om hur du konfigurerar automatisk inläsning finns i Vad är automatisk inläsning?.

Begränsningar

Delta Live Tables Python-gränssnittet har följande begränsningar:

  • Python table och view funktionerna måste returnera en DataFrame. Vissa funktioner som körs på DataFrames returnerar inte DataFrames och bör inte användas. Eftersom DataFrame-transformeringar körs efter att det fullständiga dataflödesdiagrammet har lösts kan användning av sådana åtgärder ha oavsiktliga biverkningar. Dessa åtgärder omfattar funktioner som collect(), count(), toPandas(), save()och saveAsTable(). Du kan dock inkludera dessa funktioner utanför table eller view funktionsdefinitioner eftersom den här koden körs en gång under grafinitieringsfasen.
  • Funktionen pivot() stöds inte. Åtgärden pivot i Spark kräver ivrig inläsning av indata för att beräkna schemat för utdata. Den här funktionen stöds inte i Delta Live Tables.

dlt Importera Python-modulen

Python-funktioner för Delta Live Tables definieras i modulen dlt . Dina pipelines som implementeras med Python-API:et måste importera den här modulen:

import dlt

Skapa en materialiserad vy eller en strömmande tabell för Delta Live Tables

I Python avgör Delta Live Tables om en datauppsättning ska uppdateras som en materialiserad vy eller en strömmande tabell baserat på den definierande frågan. Dekoratören @table används för att definiera både materialiserade vyer och strömmande tabeller.

Om du vill definiera en materialiserad vy i Python gäller du @table för en fråga som utför en statisk läsning mot en datakälla. Om du vill definiera en strömningstabell gäller du @table för en fråga som utför en direktuppspelningsläsning mot en datakälla. Båda datauppsättningstyperna har samma syntaxspecifikation enligt följande:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Skapa en Delta Live Tables-vy

Om du vill definiera en vy i Python använder du dekoratören @view . Precis som dekoratören @table kan du använda vyer i Delta Live Tables för antingen statiska eller strömmande datauppsättningar. Följande är syntaxen för att definiera vyer med Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Exempel: Definiera tabeller och vyer

Om du vill definiera en tabell eller vy i Python använder du dekoratören @dlt.view eller @dlt.table på en funktion. Du kan använda funktionsnamnet eller parametern name för att tilldela tabellen eller visningsnamnet. I följande exempel definieras två olika datauppsättningar: en vy med namnet taxi_raw som tar en JSON-fil som indatakälla och en tabell med namnet filtered_data som tar taxi_raw vyn som indata:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Exempel: Få åtkomst till en datauppsättning som definierats i samma pipeline

Förutom att läsa från externa datakällor kan du komma åt datauppsättningar som definierats i samma pipeline med funktionen Delta Live Tables read() . I följande exempel visas hur du skapar en customers_filtered datauppsättning med hjälp av read() funktionen:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Du kan också använda spark.table() funktionen för att komma åt en datauppsättning som definierats i samma pipeline. När du använder spark.table() funktionen för att komma åt en datauppsättning som definierats i pipelinen, förbereder nyckelordet i funktionsargumentet LIVE till datamängdens namn:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Exempel: Läsa från en tabell som är registrerad i ett metaarkiv

Om du vill läsa data från en tabell som är registrerad i Hive-metaarkivet utelämnar du nyckelordet i funktionsargumentet LIVE och kvalificerar eventuellt tabellnamnet med databasnamnet:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Ett exempel på läsning från en Unity Catalog-tabell finns i Mata in data i en Unity Catalog-pipeline.

Exempel: Få åtkomst till en datauppsättning med hjälp av spark.sql

Du kan också returnera en datauppsättning med ett spark.sql uttryck i en frågefunktion. Om du vill läsa från en intern datauppsättning förbereder du LIVE. till datamängdens namn:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Skriva till en strömmande tabell från flera källströmmar

Viktigt!

Delta Live Tables-stöd för @append_flow finns i offentlig förhandsversion.

Du kan använda dekoratören @append_flow för att skriva till en strömmande tabell från flera strömmande källor för att göra följande:

  • Lägg till och ta bort strömmande källor som lägger till data i en befintlig strömmande tabell utan att kräva en fullständig uppdatering. Du kan till exempel ha en tabell som kombinerar regionala data från varje region som du arbetar i. När nya regioner distribueras kan du lägga till nya regiondata i tabellen utan att utföra en fullständig uppdatering.
  • Uppdatera en strömmande tabell genom att lägga till saknade historiska data (återfyllnad). Du har till exempel en befintlig strömningstabell som skrivs till av ett Apache Kafka-ämne. Du har också historiska data lagrade i en tabell som du behöver infoga exakt en gång i strömningstabellen och du kan inte strömma data eftersom du behöver utföra en komplex aggregering innan du infogar data.

Om du vill skapa en måltabell för posternas utdata genom bearbetningen @append_flowanvänder du funktionen create_streaming_table().

Kommentar

Om du behöver definiera datakvalitetsbegränsningar med förväntningar definierar du förväntningarna på måltabellen som en del av create_streaming_table() funktionen. Du kan inte definiera förväntningar i @append_flow definitionen.

Följande är syntaxen för @append_flow:

import dlt

dlt.create_streaming_table("<target-table-name>")

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment") # optional
def <function-name>():
  return (<streaming query>)

Exempel: Skriva till en strömmande tabell från flera Kafka-ämnen

I följande exempel skapas en strömningstabell med namnet kafka_target och skrivningar till den strömmande tabellen från två Kafka-ämnen:

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

Exempel: Kör en engångsdatapåfyllning

I följande exempel körs en fråga för att lägga till historiska data i en strömmande tabell:

Kommentar

Ta bort frågan när du har kört pipelinen en gång för att säkerställa en sann engångsefterfyllnad när återfyllnadsfrågan är en del av en pipeline som körs enligt schemat eller kontinuerligt. Om du vill lägga till nya data om de kommer till katalogen för återfyllnad lämnar du frågan på plats.

import dlt

@dlt.table()
def csv_target():
  return spark.readStream.format("csv").load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream.format("csv").load("path/to/backfill/data/dir")

Skapa en tabell som ska användas som mål för strömningsåtgärder

create_streaming_table() Använd funktionen för att skapa en måltabell för posters utdata genom strömningsåtgärder, inklusive apply_changes() och @append_flow utdataposter.

Kommentar

Funktionerna create_target_table() och create_streaming_live_table() är inaktuella. Databricks rekommenderar att du uppdaterar befintlig kod för att använda create_streaming_table() funktionen.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argument
name

Typ: str

Tabellnamnet.

Den här parametern krävs.
comment

Typ: str

En valfri beskrivning för tabellen.
spark_conf

Typ: dict

En valfri lista över Spark-konfigurationer för körning av den här frågan.
table_properties

Typ: dict

En valfri lista över tabellegenskaper för tabellen.
partition_cols

Typ: array

En valfri lista över en eller flera kolumner som ska användas för partitionering av tabellen.
path

Typ: str

En valfri lagringsplats för tabelldata. Om det inte anges kommer systemet som standard att vara platsen för pipelinelagringen.
schema

Typ: str eller StructType

En valfri schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med en Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Typ: dict

Valfria datakvalitetsbegränsningar för tabellen. Se flera förväntningar.

Kontrollera hur tabeller materialiseras

Tabeller ger också ytterligare kontroll över materialiseringen:

  • Ange hur tabeller partitioneras med .partition_cols Du kan använda partitionering för att påskynda frågor.
  • Du kan ange tabellegenskaper när du definierar en vy eller tabell. Se Tabellegenskaper för Delta Live Tables.
  • Ange en lagringsplats för tabelldata med hjälp av inställningen path . Som standard lagras tabelldata på lagringsplatsen för pipelinen om path de inte har angetts.
  • Du kan använda genererade kolumner i schemadefinitionen. Se Exempel: Ange ett schema och partitionskolumner.

Kommentar

För tabeller som är mindre än 1 TB i storlek rekommenderar Databricks att Delta Live Tables kan styra dataorganisationen. Om du inte förväntar dig att tabellen ska växa utöver en terabyte bör du vanligtvis inte ange partitionskolumner.

Exempel: Ange ett schema och partitionskolumner

Du kan också ange ett tabellschema med hjälp av en Python StructType - eller SQL DDL-sträng. När den anges med en DDL-sträng kan definitionen innehålla genererade kolumner.

I följande exempel skapas en tabell med namnet sales med ett schema som angetts med hjälp av en Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

I följande exempel anges schemat för en tabell med hjälp av en DDL-sträng, definierar en genererad kolumn och definierar en partitionskolumn:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Som standard härleder Delta Live Tables schemat från table definitionen om du inte anger något schema.

Konfigurera en strömmande tabell för att ignorera ändringar i en källströmningstabell

Kommentar

  • Flaggan skipChangeCommits fungerar bara med spark.readStream funktionen option() . Du kan inte använda den här flaggan i en dlt.read_stream() funktion.
  • Du kan inte använda skipChangeCommits flaggan när källuppspelningstabellen definieras som mål för en apply_changes() -funktion.

Som standard kräver strömmande tabeller tilläggskällor. När en strömmande tabell använder en annan strömmande tabell som källa, och källströmningstabellen kräver uppdateringar eller borttagningar, till exempel GDPR-bearbetningen skipChangeCommits "rätt att bli bortglömd", kan flaggan anges när du läser källströmningstabellen för att ignorera dessa ändringar. Mer information om den här flaggan finns i Ignorera uppdateringar och borttagningar.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Egenskaper för Python Delta Live Tables

Följande tabeller beskriver de alternativ och egenskaper som du kan ange när du definierar tabeller och vyer med Delta Live Tables:

@table eller @view
name

Typ: str

Ett valfritt namn för tabellen eller vyn. Om det inte har definierats används funktionsnamnet som tabell- eller vynamn.
comment

Typ: str

En valfri beskrivning för tabellen.
spark_conf

Typ: dict

En valfri lista över Spark-konfigurationer för körning av den här frågan.
table_properties

Typ: dict

En valfri lista över tabellegenskaper för tabellen.
path

Typ: str

En valfri lagringsplats för tabelldata. Om det inte anges kommer systemet som standard att vara platsen för pipelinelagringen.
partition_cols

Typ: a collection of str

En valfri samling, till exempel en list, av en eller flera kolumner som ska användas för partitionering av tabellen.
schema

Typ: str eller StructType

En valfri schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med en Python
StructType.
temporary

Typ: bool

Skapa en tabell men publicera inte metadata för tabellen. Nyckelordet temporary instruerar Delta Live Tables att skapa en tabell som är tillgänglig för pipelinen men som inte ska nås utanför pipelinen. För att minska bearbetningstiden bevaras en tillfällig tabell under pipelinens livslängd som skapar den, och inte bara en enda uppdatering.

Standardvärdet är "False".
Tabell- eller vydefinition
def <function-name>()

En Python-funktion som definierar datauppsättningen. Om parametern name inte har angetts <function-name> används den som måldatauppsättningens namn.
query

En Spark SQL-instruktion som returnerar en Spark Dataset eller Koalas DataFrame.

Använd dlt.read() eller spark.table() för att utföra en fullständig läsning från en datauppsättning som definierats i samma pipeline. När du använder spark.table() funktionen för att läsa från en datauppsättning som definierats i samma pipeline, förbereder du nyckelordet LIVE till datamängdens namn i funktionsargumentet. Om du till exempel vill läsa från en datauppsättning med namnet customers:

spark.table("LIVE.customers")

Du kan också använda spark.table() funktionen för att läsa från en tabell som är registrerad i metaarkivet genom att utelämna nyckelordet LIVE och eventuellt kvalificera tabellnamnet med databasnamnet:

spark.table("sales.customers")

Använd dlt.read_stream() för att utföra en direktuppspelningsläsning från en datauppsättning som definierats i samma pipeline.

spark.sql Använd funktionen för att definiera en SQL-fråga för att skapa returdatauppsättningen.

Använd PySpark-syntax för att definiera Delta Live Tables-frågor med Python.
Förväntningar
@expect("description", "constraint")

Deklarera en datakvalitetsbegränsning som identifieras av
description. Om en rad bryter mot förväntningarna inkluderar du raden i måldatauppsättningen.
@expect_or_drop("description", "constraint")

Deklarera en datakvalitetsbegränsning som identifieras av
description. Om en rad bryter mot förväntningarna släpper du raden från måldatauppsättningen.
@expect_or_fail("description", "constraint")

Deklarera en datakvalitetsbegränsning som identifieras av
description. Om en rad bryter mot förväntningarna stoppar du omedelbart körningen.
@expect_all(expectations)

Deklarera en eller flera datakvalitetsbegränsningar.
expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna ska du inkludera raden i måldatauppsättningen.
@expect_all_or_drop(expectations)

Deklarera en eller flera datakvalitetsbegränsningar.
expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna släpper du raden från måldatauppsättningen.
@expect_all_or_fail(expectations)

Deklarera en eller flera datakvalitetsbegränsningar.
expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna stoppar du omedelbart körningen.

Ändra datainsamling med Python i Delta Live Tables

apply_changes() Använd funktionen i Python-API:et för att använda DELTA Live Tables CDC-funktioner. Python-gränssnittet i Delta Live Tables innehåller även funktionen create_streaming_table(). Du kan använda den här funktionen för att skapa den måltabell som krävs av apply_changes() funktionen.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Kommentar

Standardbeteendet för INSERT och UPDATE händelser är att uppdatera CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE händelser kan anges med villkoret APPLY AS DELETE WHEN .

Viktigt!

Du måste deklarera en måluppspelningstabell för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för måltabellen apply_changes måste du även inkludera kolumnerna __START_AT och __END_AT med samma datatyp som fältet sequence_by .

Se Förenklad insamling av ändringsdata med API:et APPLY CHANGES i Delta Live Tables.

Argument
target

Typ: str

Namnet på tabellen som ska uppdateras. Du kan använda funktionen create_streaming_table() för att skapa måltabellen innan du apply_changes() kör funktionen.

Den här parametern krävs.
source

Typ: str

Datakällan som innehåller CDC-poster.

Den här parametern krävs.
keys

Typ: list

Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.

Du kan ange något av följande:

* En lista över strängar: ["userId", "orderId"]
* En lista över Spark SQL-funktioner col() : [col("userId"), col("orderId"]

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den här parametern krävs.
sequence_by

Typ: str eller col()

Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Delta Live Tables använder den här sekvenseringen för att hantera ändringshändelser som kommer i fel ordning.

Du kan ange något av följande:

* En sträng: "sequenceNum"
* En Spark SQL-funktion col() : col("sequenceNum")

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den här parametern krävs.
ignore_null_updates

Typ: bool

Tillåt inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och ignore_null_updates är True, behåller kolumner med en null sina befintliga värden i målet. Detta gäller även kapslade kolumner med värdet null. När ignore_null_updates är Falseskrivs befintliga värden över med null värden.

Den här parametern är valfri.

Standardvärdet är False.
apply_as_deletes

Typ: str eller expr()

Anger när en CDC-händelse ska behandlas som en DELETE i stället för en upsert. För att hantera oordnade data behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras med
pipelines.cdc.tombstoneGCThresholdInSecondstabellegenskap.

Du kan ange något av följande:

* En sträng: "Operation = 'DELETE'"
* En Spark SQL-funktion expr() : expr("Operation = 'DELETE'")

Den här parametern är valfri.
apply_as_truncates

Typ: str eller expr()

Anger när en CDC-händelse ska behandlas som en fullständig tabell TRUNCATE. Eftersom den här satsen utlöser en fullständig trunkering av måltabellen bör den endast användas för specifika användningsfall som kräver den här funktionen.

Parametern apply_as_truncates stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunkering.

Du kan ange något av följande:

* En sträng: "Operation = 'TRUNCATE'"
* En Spark SQL-funktion expr() : expr("Operation = 'TRUNCATE'")

Den här parametern är valfri.
column_list

except_column_list

Typ: list

En delmängd av kolumner som ska inkluderas i måltabellen. Använd column_list för att ange den fullständiga listan över kolumner som ska inkluderas. Använd except_column_list för att ange vilka kolumner som ska undantas. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL-funktioner col() :

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den här parametern är valfri.

Standardvärdet är att inkludera alla kolumner i måltabellen när inget column_list eller except_column_list argument skickas till funktionen.
stored_as_scd_type

Typ: str eller int

Om poster ska lagras som SCD-typ 1 eller SCD typ 2.

Ange till 1 för SCD typ 1 eller 2 för SCD typ 2.

Den här satsen är valfri.

Standardvärdet är SCD typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

En delmängd av utdatakolumner som ska spåras för historik i måltabellen. Använd track_history_column_list för att ange den fullständiga listan över kolumner som ska spåras. Använd
track_history_except_column_list för att ange vilka kolumner som ska undantas från spårning. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL-funktioner col() : - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den här parametern är valfri.

Standardvärdet är att inkludera alla kolumner i måltabellen när nej track_history_column_list eller
track_history_except_column_list skickas argumentet till funktionen.