Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Modul pyspark.pipelines (zde označený jako dp) implementuje většinu své základní funkčnosti pomocí dekorátorů. Tyto dekorátory přijímají funkci, která definuje streamovací nebo dávkové dotazování a vrací datový rámec Apache Spark. Následující syntaxe ukazuje jednoduchý příklad definice datové sady kanálu:
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
Tato stránka obsahuje přehled funkcí a dotazů, které definují datové sady v kanálech. Úplný seznam dostupných dekorátorů najdete v referenci pro vývojáře pipeline.
Funkce, které používáte k definování datových sad, by neměly obsahovat libovolnou logiku Pythonu nesouvisející s datovou sadou, včetně volání rozhraní API třetích stran. Kanály tyto funkce spouští několikrát během plánování, ověřování a aktualizací. Zahrnutí libovolné logiky může vést k neočekávaným výsledkům.
Čtení dat pro zahájení definice datové sady
Funkce používané k definování datových sad kanálu obvykle začínají operací spark.read nebo spark.readStream. Tyto operace čtení vrací statický nebo streamující objekt DataFrame, který použijete k definování dalších transformací před vrácením datového rámce. Další příklady operací Sparku, které vracejí datový rámec, zahrnují spark.table, nebo spark.range.
Funkce by nikdy neměly odkazovat na datové rámce definované mimo funkci. Pokus o odkazování na datové rámce definované v jiném oboru může vést k neočekávanému chování. Příklad vzoru metaprogramování pro vytváření více tabulek naleznete v tématu Vytvoření tabulek ve smyčcefor.
Následující příklady ukazují základní syntaxi čtení dat pomocí logiky dávky nebo streamování:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
Pokud potřebujete číst data z externího rozhraní REST API, implementujte toto připojení pomocí vlastního zdroje dat Pythonu. Viz vlastní zdroje dat PySpark.
Poznámka:
Z kolekcí dat Pythonu je možné vytvořit libovolné datové rámce Apache Sparku, včetně datových rámců pandas, diktování a seznamů. Tyto vzory můžou být užitečné při vývoji a testování, ale většina definic datových sad v produkčním kanálu by měla začínat načítáním dat ze souborů, externího systému nebo existující tabulky nebo zobrazení.
Řetězení transformací
Kanály podporují téměř všechny transformace datového rámce Apache Spark. Do funkce definice datové sady můžete zahrnout libovolný počet transformací, ale měli byste zajistit, aby používané metody vždy vracely objekt DataFrame.
Pokud máte zprostředkující transformaci, která řídí několik podřízených úloh, ale nemusíte ji materializovat jako tabulku, použijte @dp.temporary_view() k přidání dočasného pohledu do vaší pipeline. Pak můžete na toto zobrazení odkazovat pomocí spark.read.table("temp_view_name") ve více různých definicích podřízených datových sad. Následující syntaxe ukazuje tento vzor:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
Tím zajistíte, že pipeline bude mít během plánování pipeline úplné povědomí o transformacích ve vašem zobrazení a zabrání potenciálním problémům souvisejícím s libovolným kódem Pythonu běžícím mimo definice datových sad.
Ve své funkci můžete zřetězit DataFramy a vytvořit nové DataFramy, aniž byste museli zapisovat přírůstkové výsledky jako zobrazení, materializovaná zobrazení nebo streamované tabulky, jako v následujícím příkladu:
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
Pokud všechny datové rámce provádějí počáteční čtení pomocí dávkové logiky, je výsledkem vrácení statický datový rámec. Pokud máte dotazy, které se streamují, výsledkem je streamovaný datový rámec.
Vrátit datový rámec
Použijte @dp.table, abyste vytvořili tabulku pro streamování z výsledků čtení streamu. Použijte @dp.materialized_view k vytvoření materializovaného zobrazení z výsledků dávkového čtení. Většina ostatních dekorátorů pracuje na streamovaných i statických datových rámcích, zatímco několik vyžaduje streamované datové rámce.
Funkce použitá k definování datové sady musí vrátit datový rámec Sparku. Nikdy nepoužívejte metody, které ukládají nebo zapisují do souborů nebo tabulek jako součást kódu datové sady pipeline.
Příklady operací Apache Sparku, které by se nikdy neměly používat v kódu zpracovatelského řetězce:
collect()count()toPandas()save()saveAsTable()start()toTable()
Poznámka:
Kanály také podporují použití Pandas ve Sparku pro funkce definice datové sady. Vizte Pandas API on Spark.
Použití SQL v Pythonovém zpracovatelském řetězci
PySpark podporuje spark.sql operátor pro zápis kódu datového rámce pomocí SQL. Pokud tento vzor použijete ve zdrojovém kódu pipeline, zkompiluje se do materializovaných zobrazení nebo streamovacích tabulek.
Následující příklad kódu je ekvivalentní použití spark.read.table("catalog_name.schema_name.table_name") pro logiku dotazu datové sady:
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read a dlt.read_stream (starší verze)
Starší dlt modul obsahuje dlt.read() a dlt.read_stream() funkce, které byly zavedeny pro podporu funkcí v režimu publikování v mode pipeline. Tyto metody jsou podporované, ale Databricks doporučuje vždy používat funkce spark.read.table() a spark.readStream.table() z následujících důvodů:
- Funkce
dltmají omezenou podporu čtení datových sad definovaných mimo aktuální kanál. - Funkce
sparkpodporují zadávání možností k operacím čtení, napříkladskipChangeCommits. Funkcedltnepodporují specifikaci možností. - Modul
dltbyl sám nahrazen modulempyspark.pipelines. Databricks doporučuje použítfrom pyspark import pipelines as dpk importupyspark.pipelinespro použití při psaní kódu kanálů v Pythonu.