Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Moduł pyspark.pipelines (tutaj aliasowany jako dp) implementuje większość podstawowych funkcji przy użyciu dekoratorów. Te dekoratory akceptują funkcję, która definiuje zapytanie przesyłane strumieniowo lub wsadowe i zwraca ramkę danych platformy Apache Spark. Poniższa składnia przedstawia prosty przykład na potrzeby definiowania zestawu danych potoku:
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
Ta strona zawiera omówienie funkcji i zapytań, które definiują zestawy danych w potokach. Aby uzyskać pełną listę dostępnych dekoratorów, zobacz Referencję dewelopera potoku.
Funkcje używane do definiowania zestawów danych nie powinny zawierać dowolnej logiki języka Python niepowiązanej z zestawem danych, w tym wywołań interfejsów API innych firm. Potoki uruchamiają te funkcje wiele razy podczas planowania, walidacji i aktualizacji. Uwzględnienie dowolnej logiki może prowadzić do nieoczekiwanych wyników.
Odczytywanie danych w celu rozpoczęcia definicji zestawu danych
Funkcje używane do definiowania zestawów danych potoku zwykle zaczynają się od operacji spark.read lub spark.readStream. Te operacje odczytu zwracają statyczny lub przesyłany strumieniowo obiekt ramki danych używany do definiowania dodatkowych przekształceń przed zwróceniem ramki danych. Inne przykłady operacji platformy Spark, które zwracają ramkę danych, to spark.table, lub spark.range.
Funkcje nigdy nie powinny odwoływać się do ramek danych zdefiniowanych poza funkcją. Próba odwołania się do ramek danych zdefiniowanych w innym zakresie może spowodować nieoczekiwane zachowanie. Aby zapoznać się z przykładem wzorca programowania metadanych do tworzenia wielu tabel, zobacz Tworzenie tabel w for pętli.
W poniższych przykładach przedstawiono podstawową składnię odczytywania danych przy użyciu logiki wsadowej lub przesyłania strumieniowego:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
Jeśli musisz odczytać dane z zewnętrznego interfejsu API REST, zaimplementuj to połączenie przy użyciu niestandardowego źródła danych języka Python. Zobacz Niestandardowe źródła danych PySpark.
Uwaga / Notatka
Istnieje możliwość utworzenia dowolnych ramek danych platformy Apache Spark z kolekcji danych języka Python, w tym ramek danych biblioteki pandas, dykt i list. Te wzorce mogą być przydatne podczas programowania i testowania, ale większość definicji zestawu danych potoku produkcyjnego powinna zaczynać się od ładowania danych z plików, systemu zewnętrznego lub istniejącej tabeli lub widoku.
Przekształcenia łańcuchowe
Potoki obsługują prawie wszystkie przekształcenia ramek danych platformy Apache Spark. W funkcji definicji zestawu danych można uwzględnić dowolną liczbę przekształceń, ale należy upewnić się, że metody, których używasz, zawsze zwracają obiekt DataFrame.
Jeśli masz pośrednie przekształcenie, które napędza kilka obciążeń podrzędnych, ale nie musisz go zmaterializować jako tabelę, użyj @dp.temporary_view(), aby dodać tymczasowy widok do potoku. Następnie możesz odwołać się do tego widoku za pomocą spark.read.table("temp_view_name") w wielu podrzędnych definicjach zestawów danych. Następująca składnia demonstruje ten wzorzec:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
Dzięki temu potok ma pełną świadomość przekształceń w widoku podczas planowania potoku i zapobiega potencjalnym problemom związanym z dowolnym kodem języka Python uruchomionym poza definicjami zestawu danych.
W ramach funkcji można łączyć ramki danych w celu tworzenia nowych ramek danych bez zapisywania wyników przyrostowych jako widoków, zmaterializowanych widoków lub tabel przesyłania strumieniowego, jak w poniższym przykładzie:
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
Jeśli wszystkie ramki danych wykonują początkowe operacje odczytu przy użyciu logiki wsadowej, zwracany wynik jest statyczną ramką danych. Jeśli masz zapytania strumieniowe, zwracany wynik jest strumieniowym obiektem DataFrame.
Zwracanie ramki danych
Użyj @dp.table, aby utworzyć tabelę przesyłania strumieniowego na podstawie wyników odczytu przesyłania strumieniowego. Użyj @dp.materialized_view polecenia, aby utworzyć zmaterializowany widok na podstawie wyników odczytu wsadowego. Większość innych dekoratorów działa zarówno w przypadku ramek danych przesyłania strumieniowego, jak i statycznych, a kilka z nich wymaga ramki danych przesyłania strumieniowego.
Funkcja używana do definiowania zestawu danych musi zwracać ramkę danych platformy Spark. Nigdy nie używaj metod zapisywania lub pisania w plikach ani tabelach w ramach kodu zbioru danych potoku.
Przykłady operacji platformy Apache Spark, które nigdy nie powinny być używane w kodzie w potoku przetwarzania:
collect()count()toPandas()save()saveAsTable()start()toTable()
Uwaga / Notatka
Potoki obsługują również używanie biblioteki Pandas na platformie Spark do funkcji definiowania zestawu danych. Zobacz Interfejs API biblioteki Pandas na platformie Spark.
Użycie SQL w potoku Python
Narzędzie PySpark obsługuje operator spark.sql do pisania kodu DataFrame przy użyciu SQL. W przypadku użycia tego wzorca w kodzie źródłowym potoku, jest on kompilowany do zmaterializowanych widoków lub tabel strumieniowych.
Poniższy przykład kodu jest odpowiednikiem użycia spark.read.table("catalog_name.schema_name.table_name") dla logiki zapytań zestawu danych:
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read i dlt.read_stream (starsza wersja)
Starszy moduł dlt zawiera funkcje dlt.read() i dlt.read_stream(), które zostały wprowadzone aby wspierać funkcjonalność w trybie publikowania potoku przeznaczonym dla starszych wersji. Te metody są obsługiwane, ale usługa Databricks zaleca zawsze używanie funkcji spark.read.table() oraz spark.readStream.table() z następujących powodów:
- Funkcje
dltmają ograniczoną obsługę odczytywania zestawów danych zdefiniowanych poza bieżącym potokiem. - Funkcje
sparkobsługują określanie opcji, takich jakskipChangeCommits, do odczytu operacji. Określanie opcji nie jest obsługiwane przez funkcjedlt. - Moduł
dltzostał zastąpiony przez modułpyspark.pipelines. Databricks zaleca użyciefrom pyspark import pipelines as dpdo importowaniapyspark.pipelinespodczas pisania kodu potoków w języku Python.