Aracılığıyla paylaş


Delta Live Tables Python dil başvurusu

Bu makalede Delta Live Tables Python programlama arabiriminin ayrıntıları yer alır.

SQL API'si hakkında bilgi için Delta Live Tables SQL dil başvurusuna bakın.

Otomatik Yükleyici'yi yapılandırmaya özgü ayrıntılar için bkz . Otomatik Yükleyici nedir?.

Başlamadan önce

Delta Live Tables Python arabirimiyle işlem hatlarını uygularken dikkat edilmesi gereken önemli noktalar şunlardır:

  • İşlem hattı güncelleştirmesinin planlanması ve çalıştırılması sırasında Python table() ve view() işlevler birden çok kez çağrıldığından, yan etkileri olabilecek bu işlevlerden birine kod eklemeyin (örneğin, verileri değiştiren veya e-posta gönderen kod). Beklenmeyen davranışlardan kaçınmak için veri kümelerini tanımlayan Python işlevlerinizin yalnızca tabloyu veya görünümü tanımlamak için gereken kodu içermesi gerekir.
  • Özellikle veri kümelerini tanımlayan işlevlerde e-posta gönderme veya dış izleme hizmetiyle tümleştirme gibi işlemleri gerçekleştirmek için olay kancalarını kullanın. Bu işlemlerin veri kümelerinizi tanımlayan işlevlerde uygulanması beklenmeyen davranışlara neden olur.
  • Python table ve view işlevleri bir DataFrame döndürmelidir. DataFrame'lerde çalışan bazı işlevler DataFrame döndürmez ve kullanılmamalıdır. Bu işlemler , , count(), toPandas()save()ve saveAsTable()gibi collect()işlevleri içerir. DataFrame dönüştürmeleri, tam veri akışı grafiği çözümlendikten sonra yürütüldüğünden, bu tür işlemlerin kullanılması istenmeyen yan etkilere neden olabilir. Ancak, bu kod grafik başlatma aşamasında bir kez çalıştırıldığından table , bu işlevleri veya view işlev tanımlarının dışına ekleyebilirsiniz.

Python modülünü dlt içeri aktarma

Delta Live Tables Python işlevleri modülde dlt tanımlanır. Python API'si ile uygulanan işlem hatlarınızın şu modülü içeri aktarması gerekir:

import dlt

Delta Live Tabloları gerçekleştirilmiş görünüm veya akış tablosu oluşturma

Python'da Delta Live Tables, tanımlama sorgusuna göre bir veri kümesini gerçekleştirilmiş görünüm olarak mı yoksa akış tablosu olarak mı güncelleştireceğini belirler. Dekoratör @table hem gerçekleştirilmiş görünümleri hem de akış tablolarını tanımlamak için kullanılır.

Python'da gerçekleştirilmiş bir görünüm tanımlamak için, veri kaynağında statik okuma gerçekleştiren bir sorguya uygulayın @table . Akış tablosu tanımlamak için, veri kaynağında okuma akışı gerçekleştiren bir sorguya uygulayın @table . Her iki veri kümesi türü de aşağıdaki gibi aynı söz dizimi belirtimine sahiptir:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Delta Live Tables görünümü oluşturma

Python'da bir görünüm tanımlamak için dekoratörü uygulayın @view . @table Dekoratörde olduğu gibi, statik veya akış veri kümeleri için Delta Live Tablolarındaki görünümleri kullanabilirsiniz. Python ile görünümleri tanımlamaya yönelik söz dizimi aşağıdadır:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Örnek: Tabloları ve görünümleri tanımlama

Python'da tablo veya görünüm tanımlamak için veya @dlt.table dekoratörlerini @dlt.view bir işleve uygulayın. Tablo veya name görünüm adını atamak için işlev adını veya parametresini kullanabilirsiniz. Aşağıdaki örnek iki farklı veri kümesini tanımlar: JSON dosyasını giriş kaynağı olarak alan adlı taxi_raw bir görünüm ve görünümü giriş olarak alan taxi_raw adlı filtered_data bir tablo:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Örnek: Aynı işlem hattında tanımlanan bir veri kümesine erişme

Dış veri kaynaklarından okumanın yanı sıra Delta Live Tables read() işleviyle aynı işlem hattında tanımlanan veri kümelerine erişebilirsiniz. Aşağıdaki örnekte işlevini kullanarak veri customers_filtered kümesi oluşturma gösterilmektedir read() :

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

İşlevi spark.table() aynı işlem hattında tanımlanan bir veri kümesine erişmek için de kullanabilirsiniz. İşlevi spark.table() kullanarak işlem hattında tanımlanan bir veri kümesine erişirken işlev bağımsız değişkeninde LIVE anahtar sözcüğü veri kümesi adına ekleyin:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Örnek: Meta veri deposuna kayıtlı bir tablodan okuma

Hive meta veri deposunda kayıtlı bir tablodan verileri okumak için işlev bağımsız değişkeninde anahtar sözcüğü atlayıp LIVE isteğe bağlı olarak tablo adını veritabanı adıyla niteleyin:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Unity Kataloğu tablosundan okuma örneği için bkz . Unity Kataloğu işlem hattına veri alma.

Örnek: Kullanarak bir veri kümesine erişme spark.sql

Sorgu işlevindeki bir spark.sql ifadeyi kullanarak da veri kümesi döndürebilirsiniz. Bir iç veri kümesinden okumak için veri kümesi adına ekleyin LIVE. :

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Akış işlemlerinin hedefi olarak kullanılacak bir tablo oluşturma

create_streaming_table() apply_changes() ve @append_flow çıkış kayıtları dahil olmak üzere akış işlemleriyle kayıt çıktısı için hedef tablo oluşturmak için işlevini kullanın.

Not

create_target_table() ve create_streaming_live_table() işlevleri kullanım dışıdır. Databricks, işlevi kullanmak için mevcut kodun güncelleştirilmesini create_streaming_table() önerir.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Bağımsız değişkenler
name

Tür: str

Tablo adı.

Bu parametre zorunludur.
comment

Tür: str

Tablo için isteğe bağlı bir açıklama.
spark_conf

Tür: dict

Bu sorgunun yürütülmesi için isteğe bağlı spark yapılandırmaları listesi.
table_properties

Tür: dict

Tablo için isteğe bağlı tablo özellikleri listesi.
partition_cols

Tür: array

Tabloyu bölümlendirmek için kullanılacak isteğe bağlı bir veya daha fazla sütun listesi.
path

Tür: str

Tablo verileri için isteğe bağlı bir depolama konumu. Ayarlanmazsa sistem varsayılan olarak işlem hattı depolama konumuna ayarlanır.
schema

Tür: str veya StructType

Tablo için isteğe bağlı bir şema tanımı. Şemalar SQL DDL dizesi olarak veya Python ile tanımlanabilir
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Tür: dict

Tablo için isteğe bağlı veri kalitesi kısıtlamaları. Birden çok beklentiyi görün.

Tabloların nasıl gerçekleştirilmesini denetleme

Tablolar ayrıca bunların gerçekleştirilmesi için ek denetim sunar:

Not

Boyutu 1 TB'tan küçük tablolar için Databricks, Delta Live Tables'ın veri düzenlemesini denetlemesine izin vermenizi önerir. Tablonuzun bir terabayttan fazla büyümesini beklemiyorsanız, genellikle bölüm sütunlarını belirtmemelisiniz.

Örnek: Şema ve bölüm sütunları belirtme

İsteğe bağlı olarak Python StructType veya SQL DDL dizesi kullanarak bir tablo şeması belirtebilirsiniz. Bir DDL dizesiyle belirtildiğinde, tanım oluşturulan sütunları içerebilir.

Aşağıdaki örnek, Python StructTypekullanılarak belirtilen şema ile adlı sales bir tablo oluşturur:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Aşağıdaki örnek, DDL dizesi kullanan bir tablonun şemasını belirtir, oluşturulan bir sütunu tanımlar ve bir bölüm sütunu tanımlar:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Varsayılan olarak, bir şema belirtmezseniz Delta Live Tables şemayı table tanımdan çıkartır.

Kaynak akış tablosundaki değişiklikleri yoksaymak için akış tablosu yapılandırma

Not

  • skipChangeCommits bayrağı yalnızca işlevini kullanarak option() çalışırspark.readStream. Bu bayrağı bir dlt.read_stream() işlevde kullanamazsınız.
  • Kaynak akış tablosu bir apply_changes() işlevinin hedefi olarak tanımlandığında bayrağını kullanamazsınızskipChangeCommits.

Varsayılan olarak, akış tabloları yalnızca ekleme kaynakları gerektirir. Akış tablosu kaynak olarak başka bir akış tablosu kullanıyorsa ve kaynak akış tablosu güncelleştirmeleri veya silmeleri gerektiriyorsa ( örneğin GDPR "unutulma hakkı" işlemesi), skipChangeCommits bu değişiklikleri yoksaymak için kaynak akış tablosu okunurken bayrak ayarlanabilir. Bu bayrak hakkında daha fazla bilgi için bkz . Güncelleştirmeleri ve silmeleri yoksayma.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Python Delta Live Tables özellikleri

Aşağıdaki tablolarda, Delta Live Tables ile tablo ve görünüm tanımlarken belirtebileceğiniz seçenekler ve özellikler açıklanmaktadır:

@table veya @view
name

Tür: str

Tablo veya görünüm için isteğe bağlı bir ad. Tanımlanmamışsa, işlev adı tablo veya görünüm adı olarak kullanılır.
comment

Tür: str

Tablo için isteğe bağlı bir açıklama.
spark_conf

Tür: dict

Bu sorgunun yürütülmesi için isteğe bağlı spark yapılandırmaları listesi.
table_properties

Tür: dict

Tablo için isteğe bağlı tablo özellikleri listesi.
path

Tür: str

Tablo verileri için isteğe bağlı bir depolama konumu. Ayarlanmazsa sistem varsayılan olarak işlem hattı depolama konumuna ayarlanır.
partition_cols

Tür: a collection of str

Tabloyu bölümlendirmek için kullanılacak bir veya daha fazla sütundan oluşan isteğe bağlı bir listkoleksiyon.
schema

Tür: str veya StructType

Tablo için isteğe bağlı bir şema tanımı. Şemalar SQL DDL dizesi olarak veya Python ile tanımlanabilir
StructType.
temporary

Tür: bool

Bir tablo oluşturun, ancak tablo için meta verileri yayımlamayın. temporary anahtar sözcüğü Delta Live Tables'a işlem hattı tarafından kullanılabilen ancak işlem hattı dışında erişilmemesi gereken bir tablo oluşturmasını sağlar. İşlem süresini kısaltmak için geçici bir tablo, yalnızca bir güncelleştirme değil, bunu oluşturan işlem hattının ömrü boyunca kalır.

Varsayılan değer 'False'tur.
Tablo veya görünüm tanımı
def <function-name>()

Veri kümesini tanımlayan bir Python işlevi. name Parametresi ayarlanmadıysa hedef <function-name> veri kümesi adı olarak kullanılır.
query

Spark Veri Kümesi veya Koalas DataFrame döndüren bir Spark SQL deyimi.

Aynı işlem hattında tanımlanan bir veri kümesinden tam okuma gerçekleştirmek için veya spark.table() kullanındlt.read(). aynı işlem hattında tanımlanan bir veri kümesinden okumak için işlevini kullanırken spark.table() , LIVE anahtar sözcüğünü işlev bağımsız değişkenindeki veri kümesi adına ekleyin. Örneğin, adlı customersbir veri kümesinden okumak için:

spark.table("LIVE.customers")

işlevini, anahtar sözcüğünü atlayarak LIVE ve isteğe bağlı olarak tablo adını veritabanı adıyla niteleyerek meta veri deposunda kayıtlı bir tablodan okumak için de kullanabilirsinizspark.table():

spark.table("sales.customers")

Aynı işlem hattında tanımlanan bir veri kümesinden okuma akışı gerçekleştirmek için kullanın dlt.read_stream() .

spark.sql dönüş veri kümesini oluşturmak üzere bir SQL sorgusu tanımlamak için işlevini kullanın.

Python ile Delta Live Tables sorgularını tanımlamak için PySpark söz dizimlerini kullanın.
Beklenti
@expect("description", "constraint")

Tarafından tanımlanan bir veri kalitesi kısıtlaması bildirme
description. Bir satır beklentiyi ihlal ederse, satırı hedef veri kümesine ekleyin.
@expect_or_drop("description", "constraint")

Tarafından tanımlanan bir veri kalitesi kısıtlaması bildirme
description. Bir satır beklentiyi ihlal ederse, satırı hedef veri kümesinden bırakın.
@expect_or_fail("description", "constraint")

Tarafından tanımlanan bir veri kalitesi kısıtlaması bildirme
description. Bir satır beklentiyi ihlal ederse yürütmeyi hemen durdurun.
@expect_all(expectations)

Bir veya daha fazla veri kalitesi kısıtlaması bildirin.
expectations , anahtarın beklenti açıklaması, değeri ise beklenti kısıtlaması olduğu bir Python sözlüğüdür. Bir satır beklentileri ihlal ederse, satırı hedef veri kümesine ekleyin.
@expect_all_or_drop(expectations)

Bir veya daha fazla veri kalitesi kısıtlaması bildirin.
expectations , anahtarın beklenti açıklaması, değeri ise beklenti kısıtlaması olduğu bir Python sözlüğüdür. Bir satır beklentileri ihlal ederse, satırı hedef veri kümesinden bırakın.
@expect_all_or_fail(expectations)

Bir veya daha fazla veri kalitesi kısıtlaması bildirin.
expectations , anahtarın beklenti açıklaması, değeri ise beklenti kısıtlaması olduğu bir Python sözlüğüdür. Bir satır beklentileri ihlal ederse yürütmeyi hemen durdurun.

Delta Live Tablolarında Python ile veri yakalamayı değiştirme

apply_changes() Delta Live Tables CDC işlevini kullanmak için Python API'sindeki işlevini kullanın. Delta Live Tables Python arabirimi de create_streaming_table() işlevini sağlar. İşlevin gerektirdiği apply_changes() hedef tabloyu oluşturmak için bu işlevi kullanabilirsiniz.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Not

ve UPDATE olayları için INSERT varsayılan davranış, kaynaktan CDC olayları eklemektir: hedef tablodaki belirtilen anahtarlarla eşleşen satırları güncelleştirin veya hedef tabloda eşleşen bir kayıt olmadığında yeni bir satır ekleyin. Olaylar için DELETE işleme koşuluyla APPLY AS DELETE WHEN belirtilebilir.

Önemli

Değişiklikleri uygulamak için bir hedef akış tablosu bildirmeniz gerekir. İsteğe bağlı olarak hedef tablonuzun şemasını belirtebilirsiniz. Hedef tablonun şemasını apply_changes belirtirken, alanla aynı veri türüne __START_AT sequence_by sahip ve __END_AT sütunlarını da eklemeniz gerekir.

Bkz . DEĞIŞIKLIKLERI UYGULAMA API'si: Delta Live Tablolarında değişiklik verilerini yakalamayı basitleştirme.

Bağımsız değişkenler
target

Tür: str

Güncelleştirilecek tablonun adı. İşlevi yürütmeden apply_changes() önce hedef tabloyu oluşturmak için create_streaming_table() işlevini kullanabilirsiniz.

Bu parametre zorunludur.
source

Tür: str

CDC kayıtlarını içeren veri kaynağı.

Bu parametre zorunludur.
keys

Tür: list

Kaynak verilerdeki bir satırı benzersiz olarak tanımlayan sütun veya sütun bileşimi. Bu, hedef tablodaki belirli kayıtlara hangi CDC olaylarının uygulanacağını belirlemek için kullanılır.

Şunları belirtebilirsiniz:

* Dizelerin listesi: ["userId", "orderId"]
* Spark SQL col() işlevlerinin listesi: [col("userId"), col("orderId"]

İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId).

Bu parametre zorunludur.
sequence_by

Tür: str veya col()

Kaynak verilerdeki CDC olaylarının mantıksal sırasını belirten sütun adı. Delta Live Tables, sıra dışı gelen değişiklik olaylarını işlemek için bu sıralamayı kullanır.

Şunları belirtebilirsiniz:

* Bir dize: "sequenceNum"
* Spark SQL col() işlevi: col("sequenceNum")

İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId).

Bu parametre zorunludur.
ignore_null_updates

Tür: bool

Hedef sütunların bir alt kümesini içeren güncelleştirmelerin alımına izin verin. BIR CDC olayı var olan bir satırla null eşleştiğinde ve ignore_null_updates olan Truesütunlar hedefteki mevcut değerlerini korur. Bu, değeri nullolan iç içe sütunlar için de geçerlidir. olduğunda ignore_null_updates False, mevcut değerlerin üzerine değer yazılır null .

Bu parametre isteğe bağlıdır.

Varsayılan değer: False.
apply_as_deletes

Tür: str veya expr()

Bir CDC olayının ne zaman upsert yerine bir DELETE olarak ele alınacağı belirtir. Sıra dışı verileri işlemek için, silinen satır geçici olarak temel delta tablosunda kaldırıldı olarak tutulur ve meta veri deposunda bu silinmiş öğe taşlarını filtreleyen bir görünüm oluşturulur. Bekletme aralığı,
pipelines.cdc.tombstoneGCThresholdInSecondstable özelliği.

Şunları belirtebilirsiniz:

* Bir dize: "Operation = 'DELETE'"
* Spark SQL expr() işlevi: expr("Operation = 'DELETE'")

Bu parametre isteğe bağlıdır.
apply_as_truncates

Tür: str veya expr()

Bir CDC olayının tam tablo TRUNCATEolarak ne zaman ele alınacağı belirtir. Bu yan tümce hedef tablonun tam kesilmesini tetiklediğinden, yalnızca bu işlevi gerektiren belirli kullanım örnekleri için kullanılmalıdır.

apply_as_truncates parametresi yalnızca SCD türü 1 için desteklenir. SCD tür 2 kesmeyi desteklemez.

Şunları belirtebilirsiniz:

* Bir dize: "Operation = 'TRUNCATE'"
* Spark SQL expr() işlevi: expr("Operation = 'TRUNCATE'")

Bu parametre isteğe bağlıdır.
column_list

except_column_list

Tür: list

Hedef tabloya eklenecek sütunların alt kümesi. Eklenecek sütunların tam listesini belirtmek için kullanın column_list . Dışlanması gereken sütunları belirtmek için kullanın except_column_list . Değeri dize listesi olarak veya Spark SQL col() işlevleri olarak bildirebilirsiniz:

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId).

Bu parametre isteğe bağlıdır.

Varsayılan değer, işleve hiçbir column_list veya except_column_list bağımsız değişken geçirilmediğinde hedef tabloya tüm sütunları eklemektir.
stored_as_scd_type

Tür: str veya int

Kayıtların SCD türü 1 veya SCD tür 2 olarak depolanması.

1 SCD türü 1 veya 2 SCD türü 2 için olarak ayarlayın.

Bu yan tümce isteğe bağlıdır.

Varsayılan değer SCD tür 1'dir.
track_history_column_list

track_history_except_column_list

Tür: list

Hedef tablodaki geçmiş için izlenecek çıktı sütunlarının bir alt kümesi. İzlenecek sütunların tam listesini belirtmek için kullanın track_history_column_list . Kullanma
track_history_except_column_list izlemenin dışında tutulacak sütunları belirtmek için. Değeri dize listesi olarak veya Spark SQL col() işlevleri olarak bildirebilirsiniz: - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId).

Bu parametre isteğe bağlıdır.

Varsayılan değer, hedef tablodaki tüm sütunların veya yok track_history_column_list olduğunda eklenmesidir
track_history_except_column_list bağımsız değişkeni işleve geçirilir.

Sınırlama

Delta Live Tables Python arabirimi aşağıdaki sınırlamalara sahiptir:

pivot() İşlev desteklenmez. pivot Spark'taki işlem, çıkışın şemasını hesaplamak için giriş verilerinin hevesle yüklenmesini gerektirir. Bu özellik Delta Live Tables'da desteklenmez.