Sdílet prostřednictvím


Funkce pro definování datových sad

Modul pyspark.pipelines (zde označený jako dp) implementuje většinu své základní funkčnosti pomocí dekorátorů. Tyto dekorátory přijímají funkci, která definuje streamovací nebo dávkové dotazování a vrací datový rámec Apache Spark. Následující syntaxe ukazuje jednoduchý příklad definice datové sady kanálu:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

Tato stránka obsahuje přehled funkcí a dotazů, které definují datové sady v kanálech. Úplný seznam dostupných dekorátorů najdete v referenci pro vývojáře pipeline.

Funkce, které používáte k definování datových sad, by neměly obsahovat libovolnou logiku Pythonu nesouvisející s datovou sadou, včetně volání rozhraní API třetích stran. Kanály tyto funkce spouští několikrát během plánování, ověřování a aktualizací. Zahrnutí libovolné logiky může vést k neočekávaným výsledkům.

Čtení dat pro zahájení definice datové sady

Funkce používané k definování datových sad kanálu obvykle začínají operací spark.read nebo spark.readStream. Tyto operace čtení vrací statický nebo streamující objekt DataFrame, který použijete k definování dalších transformací před vrácením datového rámce. Další příklady operací Sparku, které vracejí datový rámec, zahrnují spark.table, nebo spark.range.

Funkce by nikdy neměly odkazovat na datové rámce definované mimo funkci. Pokus o odkazování na datové rámce definované v jiném oboru může vést k neočekávanému chování. Příklad vzoru metaprogramování pro vytváření více tabulek naleznete v tématu Vytvoření tabulek ve smyčcefor.

Následující příklady ukazují základní syntaxi čtení dat pomocí logiky dávky nebo streamování:

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.read
    .format("cloudFiles")
    .option("cloudFile.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

Pokud potřebujete číst data z externího rozhraní REST API, implementujte toto připojení pomocí vlastního zdroje dat Pythonu. Viz vlastní zdroje dat PySpark.

Poznámka:

Z kolekcí dat Pythonu je možné vytvořit libovolné datové rámce Apache Sparku, včetně datových rámců pandas, diktování a seznamů. Tyto vzory můžou být užitečné při vývoji a testování, ale většina definic datových sad v produkčním kanálu by měla začínat načítáním dat ze souborů, externího systému nebo existující tabulky nebo zobrazení.

Řetězení transformací

Kanály podporují téměř všechny transformace datového rámce Apache Spark. Do funkce definice datové sady můžete zahrnout libovolný počet transformací, ale měli byste zajistit, aby používané metody vždy vracely objekt DataFrame.

Pokud máte zprostředkující transformaci, která řídí několik podřízených úloh, ale nemusíte ji materializovat jako tabulku, použijte @dp.temporary_view() k přidání dočasného pohledu do vaší pipeline. Pak můžete na toto zobrazení odkazovat pomocí spark.read.table("temp_view_name") ve více různých definicích podřízených datových sad. Následující syntaxe ukazuje tento vzor:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

Tím zajistíte, že pipeline bude mít během plánování pipeline úplné povědomí o transformacích ve vašem zobrazení a zabrání potenciálním problémům souvisejícím s libovolným kódem Pythonu běžícím mimo definice datových sad.

Ve své funkci můžete zřetězit DataFramy a vytvořit nové DataFramy, aniž byste museli zapisovat přírůstkové výsledky jako zobrazení, materializovaná zobrazení nebo streamované tabulky, jako v následujícím příkladu:

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

Pokud všechny datové rámce provádějí počáteční čtení pomocí dávkové logiky, je výsledkem vrácení statický datový rámec. Pokud máte dotazy, které se streamují, výsledkem je streamovaný datový rámec.

Vrátit datový rámec

Použijte @dp.table, abyste vytvořili tabulku pro streamování z výsledků čtení streamu. Použijte @dp.materialized_view k vytvoření materializovaného zobrazení z výsledků dávkového čtení. Většina ostatních dekorátorů pracuje na streamovaných i statických datových rámcích, zatímco několik vyžaduje streamované datové rámce.

Funkce použitá k definování datové sady musí vrátit datový rámec Sparku. Nikdy nepoužívejte metody, které ukládají nebo zapisují do souborů nebo tabulek jako součást kódu datové sady pipeline.

Příklady operací Apache Sparku, které by se nikdy neměly používat v kódu zpracovatelského řetězce:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

Poznámka:

Kanály také podporují použití Pandas ve Sparku pro funkce definice datové sady. Vizte Pandas API on Spark.

Použití SQL v Pythonovém zpracovatelském řetězci

PySpark podporuje spark.sql operátor pro zápis kódu datového rámce pomocí SQL. Pokud tento vzor použijete ve zdrojovém kódu pipeline, zkompiluje se do materializovaných zobrazení nebo streamovacích tabulek.

Následující příklad kódu je ekvivalentní použití spark.read.table("catalog_name.schema_name.table_name") pro logiku dotazu datové sady:

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read a dlt.read_stream (starší verze)

Starší dlt modul obsahuje dlt.read() a dlt.read_stream() funkce, které byly zavedeny pro podporu funkcí v režimu publikování v mode pipeline. Tyto metody jsou podporované, ale Databricks doporučuje vždy používat funkce spark.read.table() a spark.readStream.table() z následujících důvodů:

  • Funkce dlt mají omezenou podporu čtení datových sad definovaných mimo aktuální kanál.
  • Funkce spark podporují zadávání možností k operacím čtení, například skipChangeCommits. Funkce dlt nepodporují specifikaci možností.
  • Modul dlt byl sám nahrazen modulem pyspark.pipelines . Databricks doporučuje použít from pyspark import pipelines as dp k importu pyspark.pipelines pro použití při psaní kódu kanálů v Pythonu.