常見的資料載入模式

自動載入器可簡化一些常見的資料擷取工作。 此快速參考提供數種熱門模式的範例。

使用 Glob 模式篩選目錄或檔案

Glob 模式可用於在路徑中提供時篩選目錄和檔案。

模式 描述
? 比對任何單一字元
* 比對零或多個字元
[abc] 比對字元集 {a,b,c} 中的單一字元。
[a-z] 比對字元範圍 {a... 中的單一字元z}.
[^a] 比對不是來自字元集或範圍 {a} 的單一字元。 請注意, ^ 字元必須緊接在左括弧右邊。
{ab,cd} 比對字串集 {ab, cd} 中的字串。
{ab,c{de, fh}} 比對字串集 {ab, cde, cfh} 中的字串。

path使用 提供前置詞模式的 ,例如:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base_path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base_path>/*/files")

重要事項

您必須使用 選項 pathGlobFilter 來明確提供尾碼模式。 只會 path 提供前置詞篩選。

例如,如果您想要只 png 剖析目錄中含有不同尾碼之檔案的檔案,您可以執行:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base_path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base_path>)

啟用簡易 ETL

將資料放入 Delta Lake 而不遺失任何資料的簡單方式,就是使用下列模式,並使用自動載入器啟用架構推斷。 Databricks 建議在 Azure Databricks 作業中執行下列程式碼,以在來源資料的架構變更時自動重新開機串流。 根據預設,架構會推斷為字串類型,任何剖析錯誤 (如果一切都維持為字串,則) 會移至 _rescued_data ,而且任何新的資料行都會使資料流程失敗併發展架構。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

防止資料在結構良好的資料中遺失

當您知道架構,但想要知道何時收到非預期的資料時,Databricks 建議使用 rescuedDataColumn

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

如果您想要讓串流在導入不符合架構的新欄位時停止處理,您可以新增:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

啟用彈性半結構化資料管線

當您從引進新資料行的廠商接收資料給它們提供的資訊時,您可能不知道它們執行時,或者您可能沒有頻寬來更新資料管線。 您現在可以利用架構演進來重新開機資料流程,並讓自動載入器自動更新推斷的架構。 您也可以利用 schemaHints 廠商可能提供的一些「無架構」欄位。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

轉換巢狀 JSON 資料

由於自動載入器會將最上層 JSON 資料行推斷為字串,因此您可以保留需要進一步轉換的巢狀 JSON 物件。 您可以使用 半結構化資料存取 API 來進一步轉換複雜的 JSON 內容。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<source_data_with_nested_json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<source_data_with_nested_json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

推斷巢狀 JSON 資料

當您有巢狀資料時,可以使用 cloudFiles.inferColumnTypes 選項來推斷資料和其他資料行類型的巢狀結構。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source_data_with_nested_json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source_data_with_nested_json>")

載入不含標頭的 CSV 檔案

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

在具有標頭的 CSV 檔案上強制執行架構

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

將影像或二進位資料擷取至 Delta Lake for ML

一旦資料儲存在 Delta Lake 中,您就可以對資料執行分散式推斷。 請參閱 使用 pandas UDF 執行分散式推斷

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")