Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule opisano sposób używania potoków deklaratywnych lakeflow do deklarowania przekształceń w zestawach danych i określania sposobu przetwarzania rekordów za pomocą logiki zapytań. Zawiera również przykłady typowych wzorców transformacji do tworzenia deklaratywnych potoków Lakeflow.
Zestaw danych można zdefiniować względem dowolnego zapytania, które zwraca ramkę danych. Możesz użyć wbudowanych operacji platformy Apache Spark, funkcji zdefiniowanych przez użytkownika, niestandardowej logiki i modeli MLflow jako przekształceń w potokach deklaratywnych Lakeflow. Po zaimportowaniu danych do potoku można zdefiniować nowe zestawy danych względem źródeł położonych wyżej w hierarchii w celu utworzenia nowych tabel przesyłania strumieniowego, zmaterializowanych widoków i zwykłych widoków.
Aby dowiedzieć się, jak skutecznie wykonywać przetwarzanie stanowe za pomocą Deklaratywnych Potoków Lakeflow, zobacz Optymalizowanie przetwarzania stanowego w Deklaratywnych Potokach Lakeflow przy użyciu znaków wodnych.
Kiedy używać widoków, zmaterializowanych widoków i tabel strumieniowych
Podczas implementowania zapytań w ramach potoków wybierz najlepszy typ zestawu danych, aby upewnić się, że są wydajne i możliwe do utrzymania.
Rozważ użycie widoku, aby wykonać następujące czynności:
- Podziel duże lub złożone zapytania na łatwiejsze do zarządzania części.
- Zweryfikuj wyniki pośrednie przy użyciu oczekiwań.
- Zmniejsz koszty magazynowania i zasobów obliczeniowych, aby uzyskać wyniki, których nie trzeba utrwalać. Ponieważ tabele są zmaterializowane, wymagają dodatkowych zasobów obliczeniowych i magazynu.
Rozważ użycie zmaterializowanego widoku, gdy:
- Wiele zapytań podrzędnych korzysta z tabeli. Ponieważ widoki są obliczane na żądanie, widok jest obliczany ponownie za każdym razem, gdy jest wykonywane zapytanie dotyczące widoku.
- Inne potoki, zadania lub zapytania wykorzystują tabelę. Ponieważ widoki nie są zmaterializowane, można ich używać tylko w tym samym przepływie.
- Chcesz wyświetlić wyniki zapytania podczas opracowywania. Ponieważ tabele są zmaterializowane i mogą być wyświetlane i odpytywane poza potokiem, użycie tabel podczas programowania może pomóc zweryfikować poprawność obliczeń. Po zweryfikowaniu przekonwertuj zapytania, które nie wymagają materializacji w widoki.
Rozważ użycie tabeli przesyłania strumieniowego, gdy:
- Zapytanie jest definiowane względem źródła danych, które stale lub przyrostowo rośnie.
- Wyniki zapytania powinny być obliczane przyrostowo.
- Rurociąg wymaga wysokiej przepływności i niskiej opóźnieni.
Notatka
Tabele przesyłania strumieniowego są zawsze definiowane względem źródeł transmitowanych strumieniowo. Można również używać źródeł przesyłania strumieniowego z AUTO CDC ... INTO
do stosowania aktualizacji z kanałów CDC. Zobacz Interfejsy API AUTO CDC: Uproszczenie przechwytywania danych zmian za pomocą deklaratywnych potoków Lakeflow.
Wykluczanie tabel ze schematu docelowego
Jeśli musisz obliczyć tabele pośrednie, które nie są przeznaczone do użycia zewnętrznego, możesz uniemożliwić ich publikowanie w schemacie przy użyciu słowa kluczowego TEMPORARY
. Tabele tymczasowe nadal przechowują i przetwarzają dane zgodnie z semantyką Lakeflow Declarative Pipelines, ale nie powinny być dostępne poza bieżącym potokiem. Tabela tymczasowa jest utrwalana przez okres istnienia przepływu, który ją tworzy. Użyj następującej składni, aby zadeklarować tabele tymczasowe:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Pyton
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Połącz tabele strumieniowe i widoki zmaterializowane w jednym potoku
Tabele przesyłania strumieniowego dziedziczą gwarancje przetwarzania oferowane przez Apache Spark Structured Streaming i są skonfigurowane do przetwarzania zapytań z danych źródłowych, do których zawsze dodawane są nowe wiersze zamiast modyfikowanych.
Notatka
Mimo że domyślnie tabele przesyłania strumieniowego wymagają źródeł danych tylko do dołączania, gdy źródło przesyłania strumieniowego jest inną tabelą przesyłania strumieniowego, która wymaga aktualizacji lub usuwania, można zastąpić to zachowanie flagą skipChangeCommits
Typowy wzorzec przesyłania strumieniowego obejmuje pozyskiwanie danych źródłowych w celu utworzenia początkowych zestawów danych w potoku. Te początkowe zestawy danych są często nazywane tabelami z brązu i często wykonują proste przekształcenia.
Natomiast ostateczne tabele w potoku, nazywane często złotymi tabelami, często wymagają złożonych agregacji lub odczytywania z celów operacji typu AUTO CDC ... INTO
. Ponieważ te operacje z natury tworzą aktualizacje, a nie dołączają, nie są one obsługiwane jako dane wejściowe do tabel przesyłania strumieniowego. Te przekształcenia są bardziej odpowiednie dla zmaterializowanych widoków.
Łącząc tabele strumieniowe i widoki zmaterializowane w jedną linię przetwarzania, można uprościć przetwarzanie, uniknąć kosztownego ponownego pozyskiwania lub ponownego przetwarzania surowych danych oraz mieć pełną moc języka SQL do obliczania złożonych agregacji w wydajnie zakodowanym i filtrowanym zbiorze danych. W poniższym przykładzie przedstawiono ten typ przetwarzania mieszanego:
Notatka
W tych przykładach użyto modułu automatycznego ładującego do ładowania plików z magazynu w chmurze. Aby załadować pliki za pomocą automatycznego modułu ładującego w potoku z włączonym Unity Catalog, należy użyć lokalizacji zewnętrznych. Aby dowiedzieć się więcej na temat korzystania z wykazu aparatu Unity z potokami deklaratywnymi usługi Lakeflow, zobacz Use Unity Catalog with your Lakeflow Deklaratative Pipelines (Używanie wykazu aparatu Unity z potokami deklaratywnymi usługi Lakeflow).
Pyton
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"abfss://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Dowiedz się więcej na temat używania Auto Loader do przyrostowego pozyskiwania plików JSON z Azure Storage.
statyczne łączenia strumieniowe
Statyczne sprzężenia strumienia są dobrym wyborem w przypadku denormalizacji ciągłego strumienia danych tylko do dodawania z głównie statyczną tabelą wymiarów.
Po każdej aktualizacji potoku nowe rekordy ze strumienia są łączone z najnowszym aktualnym stanem tabeli statycznej. Jeśli rekordy są dodawane lub aktualizowane w tabeli statycznej po przetworzeniu odpowiednich danych z tabeli przesyłania strumieniowego, wynikowe rekordy nie zostaną ponownie obliczone, chyba że zostanie wykonane pełne odświeżenie.
W potokach skonfigurowanych do wyzwalanego wykonywania tabela statyczna zwraca wyniki z chwilą rozpoczęcia aktualizacji. W potokach skonfigurowanych do ciągłego wykonywania najbardziej aktualna wersja tabeli statycznej jest odpytywana za każdym razem, gdy tabela przetwarza aktualizację.
Poniżej przedstawiono przykład łączenia strumienia z danymi statycznymi:
Pyton
@dlt.table
def customer_sales():
return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
Wydajne obliczanie agregacji
Tabele przesyłania strumieniowego umożliwiają przyrostowe obliczanie prostych agregacji dystrybucyjnych, takich jak liczba, minimalna, maksymalna lub suma oraz agregacje algebraiczne, takie jak średnia lub odchylenie standardowe. Usługa Databricks zaleca agregację przyrostową dla zapytań z ograniczoną liczbą grup, takich jak zapytanie z klauzulą GROUP BY country
. Tylko nowe dane wejściowe są odczytywane przy każdej aktualizacji.
Aby dowiedzieć się więcej na temat pisania zapytań Lakeflow Declarative Pipelines wykonujących przyrostowe agregacje, zobacz Wykonywanie agregacji okienkowych przy użyciu znaczników czasu.
Używanie modeli MLflow w deklaratywnych potokach Lakeflow
Notatka
Aby używać modeli MLflow w potoku z włączonym katalogiem Unity, potok musi być skonfigurowany do korzystania z kanału preview
. Aby użyć kanału current
, należy skonfigurować swój potok danych do publikowania w magazynie metadanych Hive.
W potokach deklaratywnych usługi Lakeflow można używać modeli wytrenowanych przez platformę MLflow. Modele MLflow są traktowane jako przekształcenia w usłudze Azure Databricks, co oznacza, że działają na wejściowej ramce danych Apache Spark i zwracają wyniki jako ramkę danych Apache Spark. Ponieważ Potoki Deklaratywne Lakeflow definiują zestawy danych względem ramek danych, można przekonwertować zadania Apache Spark, które używają biblioteki MLflow, na Potoki Deklaratywne Lakeflow, używając zaledwie kilku wierszy kodu. Aby uzyskać więcej informacji na temat platformy MLflow, zobacz MLflow for ML model lifecycle (Cykl życia modelu uczenia maszynowego MLflow).
Jeśli masz już notes języka Python wywołujący model MLflow, możesz dostosować ten kod do Lakeflow Declarative Pipelines przy użyciu dekoratora @dlt.table
i zapewnieniu, że funkcje są zdefiniowane w celu zwrócenia wyników transformacji. Lakeflow Declarative Pipelines nie instalują MLflow domyślnie, dlatego upewnij się, że biblioteki MLflow zostały zainstalowane z %pip install mlflow
i że zaimportowano mlflow
oraz dlt
na górze notesu. Aby zapoznać się z wprowadzeniem do składni potoków deklaratywnych usługi Lakeflow, zobacz Develop pipeline code with Python (Opracowywanie kodu potoku przy użyciu języka Python).
Aby użyć modeli MLflow w potokach deklaratywnych usługi Lakeflow, wykonaj następujące kroki:
- Uzyskaj identyfikator przebiegu i nazwę modelu modelu MLflow. Identyfikator przebiegu i nazwa modelu służą do tworzenia identyfikatora URI modelu MLflow.
- Użyj URI, aby zdefiniować funkcję UDF Spark do załadowania modelu MLflow.
- Wywołaj funkcję UDF w definicjach tabeli, aby użyć modelu MLflow.
W poniższym przykładzie przedstawiono podstawową składnię dla tego wzorca:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Jako kompletny przykład poniższy kod definiuje funkcję UDF platformy Spark o nazwie loaded_model_udf
, która ładuje model MLflow wyszkolony na podstawie danych o ryzyku kredytowym. Kolumny danych używane do przewidywania są przekazywane jako argument do funkcji UDF. Tabela loan_risk_predictions
oblicza przewidywania dla każdego wiersza w loan_risk_input_data
.
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
zachowywać ręczne usuwanie oraz aktualizacje
Potoki Deklaratywne Lakeflow umożliwiają ręczne usuwanie lub aktualizowanie rekordów z tabeli oraz wykonywanie operacji odświeżania w celu ponownego przeliczenia tabel wynikowych.
Domyślnie potoki deklaratywne Lakeflow ponownie obliczają wyniki tabeli na podstawie danych wejściowych przy każdej aktualizacji potoku, więc należy upewnić się, że usunięty rekord nie zostanie ponownie załadowany z danych źródłowych. Ustawienie właściwości tabeli pipelines.reset.allowed
na wartość false
uniemożliwia odświeżanie tabeli, ale nie zapobiega przyrostowym zapisom w tabelach lub nowym danych przepływanych do tabeli.
Na poniższym diagramie przedstawiono przykład użycia dwóch tabel przesyłania strumieniowego:
-
raw_user_table
pozyskuje nieprzetworzone dane użytkownika ze źródła. -
bmi_table
przyrostowo oblicza wyniki BMI przy użyciu wagi i wysokości zraw_user_table
.
Chcesz ręcznie usunąć lub zaktualizować rekordy użytkowników z raw_user_table
i ponownie skompilować bmi_table
.
Poniższy kod pokazuje, jak ustawić właściwość tabeli pipelines.reset.allowed
na false
, aby wyłączyć pełne odświeżanie dla raw_user_table
, dzięki czemu zamierzone zmiany są zachowywane w czasie, natomiast tabele podrzędne są ponownie obliczane przy uruchomieniu aktualizacji potoku.
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);