Freigeben über


Funktionen zum Definieren von Datasets

Das pyspark.pipelines (hier als Alias versehene dp) Modul implementiert einen Großteil seiner Kernfunktionen mithilfe von Dekoratoren. Diese Dekorateure akzeptieren eine Funktion, die entweder eine Streaming- oder Batchabfrage definiert und einen Apache Spark DataFrame zurückgibt. Die folgende Syntax zeigt ein einfaches Beispiel zum Definieren eines Pipeline-Datasets:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

Diese Seite bietet eine Übersicht über die Funktionen und Abfragen, die Datasets in Pipelines definieren. Eine vollständige Liste der verfügbaren Dekorateure finden Sie in der Pipeline-Entwicklerreferenz.

Die Funktionen, die Sie zum Definieren von Datasets verwenden, sollten keine beliebige Python-Logik enthalten, die nicht mit dem Dataset verknüpft ist, einschließlich Aufrufe von APIs von Drittanbietern. Pipelines führen diese Funktionen während der Planung, Validierung und Updates mehrmals aus. Das Einschließen beliebiger Logik kann zu unerwarteten Ergebnissen führen.

Daten lesen, um eine Datensatzdefinition zu starten

Funktionen, die zum Definieren von Pipeline-Datasets verwendet werden, beginnen in der Regel mit einem spark.read Oder spark.readStream Vorgang. Diese Lesevorgänge geben ein statisches oder streaming-DataFrame-Objekt zurück, das Sie verwenden, um zusätzliche Transformationen zu definieren, bevor Der DataFrame zurückgegeben wird. Weitere Beispiele für Spark-Vorgänge, die einen DataFrame zurückgeben, sind spark.tableoder spark.range.

Funktionen sollten niemals auf DataFrames verweisen, die außerhalb der Funktion definiert sind. Der Versuch, auf DataFrames zu verweisen, die in einem anderen Bereich definiert sind, kann zu unerwartetem Verhalten führen. Ein Beispiel für ein Metaprogrammierungsmuster zum Erstellen mehrerer Tabellen finden Sie unter Erstellen von Tabellen in einer for Schleife.

Die folgenden Beispiele zeigen die grundlegende Syntax zum Lesen von Daten mithilfe von Batch- oder Streaminglogik:

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.read
    .format("cloudFiles")
    .option("cloudFile.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

Wenn Sie Daten aus einer externen REST-API lesen müssen, implementieren Sie diese Verbindung mit einer benutzerdefinierten Python-Datenquelle. Siehe pySpark benutzerdefinierte Datenquellen.

Hinweis

Es ist möglich, beliebige Apache Spark DataFrames aus Python-Sammlungen von Daten zu erstellen, einschließlich Pandas DataFrames, Diktieren und Listen. Diese Muster können während der Entwicklung und beim Testen nützlich sein, aber die meisten Definitionen von Datasetdefinitionen für Produktionspipeline sollten mit dem Laden von Daten aus Dateien, einem externen System oder einer vorhandenen Tabelle oder Ansicht beginnen.

Verketten von Transformationen

Pipelines unterstützen fast alle Apache Spark DataFrame-Transformationen. Sie können eine beliebige Anzahl von Transformationen in die Datasetdefinitionsfunktion einschließen, aber Sie sollten sicherstellen, dass die methoden, die Sie verwenden, immer ein DataFrame-Objekt zurückgeben.

Wenn Sie über eine zwischengeschaltete Transformation verfügen, die mehrere nachgeschaltete Workloads steuert, aber nicht als Tabelle materialisiert werden muss, verwenden Sie @dp.temporary_view(), um Ihrer Pipeline eine temporäre Ansicht hinzuzufügen. Anschließend können Sie diese Ansicht mit spark.read.table("temp_view_name") in mehreren nachgeschalteten Dataset-Definitionen referenzieren. Die folgende Syntax veranschaulicht dieses Muster:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

Dadurch wird sichergestellt, dass die Pipeline die Transformationen in Ihrer Ansicht während der Pipelineplanung vollständig kennt und potenzielle Probleme im Zusammenhang mit beliebigem Python-Code verhindert, der außerhalb von Datasetdefinitionen ausgeführt wird.

Innerhalb Ihrer Funktion können Sie DataFrames miteinander verketten, um neue DataFrames zu erstellen, ohne inkrementelle Ergebnisse als Ansichten, materialisierte Ansichten oder Streamingtabellen zu schreiben, wie im folgenden Beispiel gezeigt:

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

Wenn alle Ihre DataFrames ihre anfänglichen Lesevorgänge mit Batchlogik ausführen, ist das Rückgabeergebnis ein statischer DataFrame. Wenn Sie Abfragen haben, die gestreamt werden, ist Ihr Ergebnis ein Streaming-DataFrame.

Zurückgeben eines DataFrames

Verwenden Sie @dp.table, um eine Streamingtabelle aus den Ergebnissen eines Streaming-Lesevorgangs zu erstellen. Verwenden Sie @dp.materialized_view, um eine materialisierte Ansicht aus den Ergebnissen einer Batch-Lesung zu erstellen. Die meisten anderen Decoratoren funktionieren sowohl für Streaming- als auch für statische DataFrames, während einige einen Streaming-DataFrame erfordern.

Die Zum Definieren eines Datasets verwendete Funktion muss einen Spark DataFrame zurückgeben. Verwenden Sie niemals Methoden, die als Teil des Pipeline-Datasetcodes in Dateien oder Tabellen speichern oder schreiben.

Beispiele für Apache Spark-Vorgänge, die niemals im Pipelinecode verwendet werden sollten:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

Hinweis

Pipelines unterstützen auch die Verwendung von Pandas auf Spark für Datasetdefinitionsfunktionen. Siehe Pandas-API auf Spark.

Verwenden von SQL in einer Python-Pipeline

PySpark unterstützt den spark.sql Operator zum Schreiben von DataFrame-Code mit SQL. Wenn Sie dieses Muster im Pipelinequellcode verwenden, wird es zu materialisierten Ansichten oder Streamingtabellen kompiliert.

Das folgende Codebeispiel entspricht der Verwendung von spark.read.table("catalog_name.schema_name.table_name") für die Abfragelogik des Datasets:

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read und dlt.read_stream (Vorversion)

Das ältere dlt Modul enthält dlt.read() und dlt.read_stream() Funktionen, die eingeführt wurden, um Funktionen im Legacy-Pipeline-Veröffentlichungsmodus zu unterstützen. Diese Methoden werden unterstützt, Databricks empfiehlt jedoch, die Funktionen spark.read.table() und spark.readStream.table() zu verwenden.

  • Die dlt Funktionen haben eingeschränkte Unterstützung für das Lesen von Datasets, die außerhalb der aktuellen Pipeline definiert sind.
  • Die spark Funktionen unterstützen das Angeben von Optionen, wie z. B. skipChangeCommits, für Leseoperationen. Das Angeben von Optionen wird von den dlt Funktionen nicht unterstützt.
  • Das dlt Modul selbst wurde durch das pyspark.pipelines Modul ersetzt. Databricks empfiehlt, from pyspark import pipelines as dp zu verwenden, um pyspark.pipelines beim Schreiben von Pipelines-Code in Python zu importieren.