Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Deklaratywne potoki Lakeflow wprowadzają kilka nowych konstrukcji kodu w języku Python, dzięki którym można definiować zmaterializowane widoki i tabele strumieniowe w potokach. Wsparcie dla Pythona w opracowywaniu potoków opiera się na podstawach PySpark DataFrame i Structured Streaming APIs.
Dla użytkowników nieznających Python oraz DataFrames, Databricks zaleca użycie interfejsu SQL. Zobacz Twórz kod potoku przy użyciu języka SQL.
Aby zapoznać się z pełną dokumentacją składni języka Python dla deklaratywnych potoków Lakeflow, zobacz Dokumentacja składni języka Python dla potoków Deklaratywnych Lakeflow.
Podstawy języka Python w rozwoju potoków
Kod języka Python, który tworzy zestawy danych potoków deklaratywnych lakeflow, musi zwracać ramki danych.
Wszystkie interfejsy API języka Python dla deklaratywnych potoków Lakeflow są implementowane w module dlt
. Kod deklaratywnych potoków Lakeflow zaimplementowany w Pythonie musi jawnie zaimportować moduł dlt
na górze notesów i plików Pythona.
Odczyty i zapisy domyślnie wykorzystują katalog i schemat określony podczas konfiguracji potoku. Zobacz Ustaw katalog docelowy i schemat.
Kod języka Python specyficzny dla potoków deklaratywnych usługi Lakeflow różni się od innych typów kodu języka Python w jeden krytyczny sposób: kod potoku języka Python nie wywołuje bezpośrednio funkcji wykonujących pozyskiwanie i przekształcanie danych w celu utworzenia zestawów danych potoków deklaratywnych lakeflow. Zamiast tego potoki deklaratywne Lakeflow interpretują funkcje dekoracyjne z modułu dlt
we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzą graf przepływu danych.
Ważne
Aby uniknąć nieoczekiwanego zachowania podczas działania potoku, nie umieszczaj kodu, który może mieć efekty uboczne, w funkcjach definiujących zestawy danych. Aby dowiedzieć się więcej, zobacz odniesienie do Python.
Tworzenie zmaterializowanego widoku lub tabeli przesyłania strumieniowego przy użyciu języka Python
Dekorator @dlt.table
instruuje potoki deklaratywne Lakeflow, aby utworzyły zmaterializowany widok lub tabelę strumieniową na podstawie wyników zwracanych przez funkcję. Wyniki odczytu wsadowego tworzą zmaterializowany widok, natomiast wyniki odczytu strumieniowego tworzą tabelę strumieniową.
Domyślnie nazwy zmaterializowanego widoku i tabeli strumieniowej są wnioskowane z nazw funkcji. Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku i tabeli strumieniowej:
Uwaga
Obie funkcje odwołują się do tej samej tabeli w katalogu samples
i używają tej samej funkcji dekoratora. Te przykłady podkreślają, że jedyną różnicą w podstawowej składni zmaterializowanych widoków i tabel przesyłania strumieniowego jest użycie spark.read
versus spark.readStream
.
Nie wszystkie źródła danych obsługują odczyty strumieniowe. Niektóre źródła danych powinny zawsze być przetwarzane z zastosowaniem semantyki strumieniowej.
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Opcjonalnie możesz określić nazwę tabeli przy użyciu argumentu name
w dekoratorze @dlt.table
. W poniższym przykładzie pokazano ten wzorzec dla zmaterializowanego widoku i tabeli strumieniowej.
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Ładowanie danych z magazynu obiektów
Deklaratywne potoki Lakeflow obsługują ładowanie danych ze wszystkich formatów wspieranych przez Azure Databricks. Zobacz Opcje formatu danych.
Uwaga
W tych przykładach używane są dane dostępne w /databricks-datasets
, które są automatycznie zamontowane w obszarze roboczym. Databricks zaleca używanie ścieżek woluminów lub URI chmury do odniesienia się do danych przechowywanych w chmurowej pamięci masowej. Zobacz Czym są wolumeny Unity Catalog?.
Databricks zaleca używanie funkcji Auto Loader i tabel strumieniowych przy konfigurowaniu zadań pozyskiwania przyrostowego z danych przechowywanych w chmurowym magazynie obiektów. Zobacz Co to jest moduł automatycznego ładowania?.
W poniższym przykładzie tworzona jest tabela strumieniowa z plików JSON przy użyciu Auto Loader.
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Walidacja danych względem oczekiwań
Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzanie jakością danych przy użyciu oczekiwań dotyczących przepływu danych.
Następujący kod używa @dlt.expect_or_drop
do zdefiniowania oczekiwania o nazwie valid_data
, które usuwa rekordy będące wartościami null podczas pobierania danych.
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Wykonywanie zapytań dotyczących zmaterializowanych widoków i tabel strumieniowych zdefiniowanych w potoku
W poniższym przykładzie zdefiniowano cztery zestawy danych:
- Tabela strumieniowa nazwana
orders
, która ładuje dane JSON. - Zmaterializowany widok o nazwie
customers
, który ładuje dane CSV. - Zmaterializowany widok o nazwie
customer_orders
, łączący rekordy z zestawów danychorders
icustomers
, przekształca znacznik czasu zamówienia na datę oraz wybiera polacustomer_id
,order_number
,state
iorder_date
. - Zmaterializowany widok o nazwie
daily_orders_by_state
, który agreguje dzienną liczbę zamówień dla każdego stanu.
Uwaga
Podczas zapytań dotyczących widoków lub tabel w potoku danych, można określić katalog i schemat bezpośrednio lub użyć wartości domyślnych skonfigurowanych w potoku danych. W tym przykładzie tabele orders
, customers
i customer_orders
są zapisywane i odczytywane z domyślnego wykazu i schematu skonfigurowanego dla przepływu danych.
Tryb publikowania Legacy używa schematu LIVE
do wykonywania zapytań dotyczących innych zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku. W nowych potokach składnia schematu LIVE
jest ignorowana w trybie dyskretnym. Zobacz live schema (starsza wersja).
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Tworzenie tabel w pętli for
Za pomocą pętli for
języka Python można programowo tworzyć wiele tabel. Może to być przydatne, gdy masz wiele źródeł danych lub docelowych zestawów danych, które różnią się tylko o kilka parametrów, co zmniejsza ilość kodu do utrzymania i redukuje jego redundancję.
Pętla for
ocenia logikę w kolejności szeregowej, ale gdy planowanie zestawów danych jest zakończone, potok wykonuje logikę równolegle.
Ważne
W przypadku używania tego wzorca do definiowania zestawów danych upewnij się, że lista wartości przekazanych do pętli for
jest zawsze dodawalna. Jeśli zestaw danych zdefiniowany wcześniej w potoku zostanie pominięty w przyszłym uruchomieniu potoku, zostanie on usunięty automatycznie ze schematu docelowego.
Poniższy przykład tworzy pięć tabel filtrujących zamówienia klientów według regionów. W tym miejscu nazwa regionu służy do ustawiania nazwy zmaterializowanych widoków docelowych i filtrowania danych źródłowych. Widoki tymczasowe służą do definiowania sprzężeń z tabel źródłowych używanych w konstruowaniu końcowych zmaterializowanych widoków.
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
Poniżej znajduje się przykład grafu przepływu danych dla tej rury:
Rozwiązywanie problemów: pętla for
tworzy wiele tabel z tymi samymi wartościami
Model leniwego wykonywania, którego potoki używają do oceny kodu Python, wymaga, aby Twoja logika bezpośrednio odnosiła się do poszczególnych wartości, gdy wywoływana jest funkcja udekorowana przez @dlt.table()
.
W poniższym przykładzie pokazano dwa poprawne podejścia do definiowania tabel z pętlą for
. W obu przykładach każda nazwa tabeli z listy tables
jest jawnie przywołyowana w funkcji ozdobionej przez @dlt.table()
.
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
Następujący przykład nie odnosi się poprawnie do wartości. W tym przykładzie są tworzone tabele o różnych nazwach, ale wszystkie tabele ładują dane z ostatniej wartości w pętli for
:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)
Trwale usuń rekordy z zmaterializowanego widoku lub tabeli strumieniowej
Aby trwale usunąć rekordy z zmaterializowanego widoku lub tabeli przesyłania strumieniowego z włączonymi wektorami usuwania, niezbędnymi na przykład dla zgodności z RODO, należy wykonać dodatkowe operacje na podstawowych tabelach Delta obiektu. Aby zapewnić usunięcie rekordów z zmaterializowanego widoku, zobacz Trwałe usuwanie rekordów z zmaterializowanego widoku z włączonymi wektorami usuwania. Aby zapewnić usunięcie rekordów z tabeli przesyłania strumieniowego, zobacz Trwałe usuwanie rekordów z tabeli przesyłania strumieniowego.