常见数据加载模式

自动加载程序简化了许多常见的数据引入任务。 此快速参考提供了几种流行模式的示例。

使用 glob 模式筛选目录或文件

在路径中提供 Glob 模式时,可用于筛选目录和文件。

模式 说明
? 匹配任何单一字符
* 与零个或多个字符匹配
[abc] 匹配字符集中的单个字符 {a,b,c}。
[a-z] 匹配字符范围 {a...z} 中的单个字符。
[^a] 匹配不是来自字符集或范围 {a} 的单个字符。 请注意,^ 字符必须立即出现在左括号的右侧。
{ab,cd} 匹配字符串集 {ab, cd} 中的字符串。
{ab,c{de, fh}} 匹配字符串集 {ab, cde, cfh} 中的字符串。

使用 path 来提供前缀模式,例如:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

重要

你需要使用 pathGlobFilter 选项来显式提供后缀模式。 path 仅提供前缀筛选器。

例如,如果只想分析包含采用不同后缀的文件的目录中的 png 文件,则可以执行以下操作:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

注意

自动加载程序的默认通配符行为不同于其他 Spark 文件源的默认行为。 将 .option("useStrictGlobber", "true") 添加到读取中,以使用将默认 Spark 行为与文件源相匹配的通配符。 有关通配符的详细信息,请参阅下表:

模式 文件路径 默认通配程序 严格通配程序
/a/b /a/b/c/file.txt
/a/b /a/b_dir/c/file.txt
/a/b /a/b.txt
/a/b/ /a/b.txt
/a/*/c/ /a/b/c/file.txt
/a/*/c/ /a/b/c/d/file.txt
/a/*/c/ /a/b/x/y/c/file.txt
/a/*/c /a/b/c_file.txt
/a/*/c/ /a/b/c_file.txt
/a/*/c/ /a/*/cookie/file.txt
/a/b* /a/b.txt
/a/b* /a/b/file.txt
/a/{0.txt,1.txt} /a/0.txt
/a/*/{0.txt,1.txt} /a/0.txt
/a/b/[cde-h]/i/ /a/b/c/i/file.txt

启用简易 ETL

将数据引入 Delta Lake 而不丢失任何数据的一种简单方法是使用以下模式并在自动加载程序中启用架构推理。 Databricks 建议为此在某个 Azure Databricks 作业中运行以下代码,以便在源数据架构发生更改时自动重启流。 默认情况下,架构将推理为字符串类型,所有分析错误(如果所有内容保留为字符串,则不应会出现任何错误)将进入 _rescued_data,任何新列将导致流失败并使架构演变。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

防止结构良好的数据丢失

如果你知道自己的架构,但希望每次收到意外数据时知道这一情况,则 Databricks 建议使用 rescuedDataColumn

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

如果你希望在引入了与架构不匹配的新字段时流停止处理,可以添加:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

启用灵活的半结构化数据管道

当你从供应商收到了数据,而这些数据会将新列引入到他们提供的信息时,你可能不知道他们何时提供了信息,或者没有带宽来更新数据管道。 你现在可以利用架构演变来重启流,并让自动加载程序自动更新推理的架构。 对于供应商可能提供的一些“无架构”字段,还可以利用 schemaHints

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

转换嵌套的 JSON 数据

由于自动加载程序将顶级 JSON 列推断为字符串,因此可能会留下需要进一步转换的嵌套 JSON 对象。 可以使用半结构化数据访问 API 进一步转换复杂的 JSON 内容。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

推断嵌套的 JSON 数据

如果你有嵌套数据,可以使用 cloudFiles.inferColumnTypes 选项推断数据和其他列类型的嵌套结构。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

加载不带标头的 CSV 文件

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

使用标头在 CSV 文件上强制实施架构

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

将图像或二进制数据引入 Delta Lake for ML

将数据存储在 Delta Lake 中后,就可以对数据运行分布式推理。 请参阅使用 pandas UDF 执行分布式推理

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")