共用方式為


定義資料集的函數

pyspark.pipelines (此處別名為dp)模組使用裝飾器實現其大部分核心功能。 這些裝飾器接受一個定義串流或批次查詢的程式函式,並傳回 Apache Spark DataFrame。 下列語法顯示定義管線資料集的簡單範例:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

此頁面提供定義管線中資料集之函式和查詢的概觀。 如需可用裝飾器的完整清單,請參閱 管線開發人員參考

您用來定義資料集的函數不應包含與資料集無關的任意 Python 邏輯,包括對協力廠商 API 的呼叫。 管線會在規劃、驗證和更新期間多次執行這些函式。 包含任意邏輯可能會導致意想不到的結果。

讀取資料以開始資料集定義

用來定義管線資料集的函式通常以 spark.readspark.readStream 作業開始。 這些讀取作業會傳回靜態或串流 DataFrame 物件,您在傳回 DataFrame 之前用來定義其他轉換。 傳回 DataFrame 的 Spark 作業的其他範例包括 spark.table、 或 spark.range

函式絕不應該參考函式外部定義的 DataFrame。 嘗試參考不同範圍定義的 DataFrame 可能會導致非預期的行為。 如需建立多個資料表的元程式設計模式範例,請參閱在 迴圈中 for 建立資料表

下列範例顯示使用批次或串流邏輯讀取資料的基本語法:

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.read
    .format("cloudFiles")
    .option("cloudFile.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

如果您需要從外部 REST API 讀取資料,請使用 Python 自訂資料來源實作此連線。 請參閱 PySpark 自訂資料來源

備註

可以從 Python 資料集合建立任意 Apache Spark DataFrame,包括 pandas DataFrame、dicts 和清單。 這些模式在開發和測試期間可能很有用,但大多數生產管線資料集定義應該從檔案、外部系統或現有資料表或檢視載入資料開始。

串接轉換

管線支援幾乎所有 Apache Spark DataFrame 轉換。 您可以在資料集定義函式中包含任意數目的轉換,但您應該確保您使用的方法一律傳回 DataFrame 物件。

如果您有驅動多個下游工作負載的中繼轉換,但不需要將其具體化為資料表,請使用 @dp.temporary_view() 將暫存檢視新增到您的管線中。 然後,您可以在多個下游資料集定義中使用spark.read.table("temp_view_name")來參考此檢視。 下列語法示範此模式:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

這可確保管線在管線規劃期間完全了解視圖中的轉換,並防止與在資料集定義之外執行的任意 Python 程式碼相關的潛在問題。

在您的函數中,您可以將 DataFrame 鏈結在一起以建立新的 DataFrame,而無需將累加結果寫入為檢視、具體化檢視或串流資料表,如下列範例所示:

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

如果您所有的 DataFrame 都使用批次邏輯執行其初始讀取,則傳回結果是靜態 DataFrame。 如果您有任何查詢正在進行串流,您的返回結果將是一個串流的資料框。

傳回 DataFrame

@dp.table 來從串流讀取的結果建立串流資料表。 使用 @dp.materialized_view 從批次讀取的結果來建立具體化檢視。 大多數其他裝飾器適用於流式和靜態 DataFrame,但有少數只需流式 DataFrame。

用來定義資料集的函式必須傳回 Spark DataFrame。 切勿使用將檔案或資料表儲存或寫入為管線資料集程式碼一部分的方法。

絕不應在管線程式碼中使用的 Apache Spark 作業範例:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

備註

管線也支援在 Spark 上使用 Pandas 來進行資料集定義函式。 請參閱 Spark 上的 Pandas API

在 Python 管線中使用 SQL

PySpark 支援運算子 spark.sql 使用 SQL 撰寫 DataFrame 程式碼。 當您在管線原始程式碼中使用此模式時,它會編譯為具體化檢視或串流資料表。

下列程式碼範例相當於使用 spark.read.table("catalog_name.schema_name.table_name") 的資料集查詢邏輯:

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.readdlt.read_stream (舊版)

較舊的 dlt 模組包含由 dlt.read()dlt.read_stream() 引入的函式,以支援舊版管線發佈模式中的功能。 支援這些方法,但 Databricks 建議一律使用 spark.read.table() and spark.readStream.table() 函式,因為下列原因:

  • 這些 dlt 函式對讀取目前管線外部定義的資料集的支援有限。
  • 這些 spark 功能支援指定選項,例如 skipChangeCommits,以進行讀取操作。 函數不支援 dlt 指定選項。
  • 模組 dlt 已被模組 pyspark.pipelines 取代。 Databricks 建議使用 from pyspark import pipelines as dp 來匯入 pyspark.pipelines,以便在撰寫 Python 管線程式碼時加以使用。