一般的なデータ読み込みパターン

自動ローダーを使用すると、一般的なデータ インジェスト タスクが簡単になります。 このクイック リファレンスでは、いくつかの一般的なパターンの例を示します。

glob パターンを使用したディレクトリまたはファイルのフィルター処理

glob パターンは、パスに指定されているときに、ディレクトリとファイルのフィルター処理に使用できます。

Pattern 説明
? 任意の 1 文字と一致します
* 0 個以上の文字と一致します
[abc] 文字セット {a, b, c} の 1 文字と一致します。
[a-z] 文字範囲 {a…z} の 1 文字と一致します。
[^a] 文字セットまたは範囲 {a} からのものではない 1 文字と一致します。 ^ 文字は左角かっこのすぐ右側に表示されることに注意してください。
{ab,cd} 文字列セット {ab, cd} の文字列と一致します。
{ab,c{de, fh}} 文字列セット {ab, cde, cfh} の文字列と一致します。

次の例のように、プレフィックス パターンを指定するには path を使用します。

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

重要

サフィックス パターンを明示的に指定するには、オプション pathGlobFilter を使用する必要があります。 path は、プレフィックス フィルターのみを提供します。

たとえば、さまざまなサフィックスのファイルが含まれているディレクトリ内で png ファイルのみを解析する場合は、次のようにします。

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

注意

自動ローダーの既定のグロビング動作は、他の Spark ファイル ソースの既定の動作とは異なります。 ファイル ソースに対する既定の Spark 動作に一致するグロビングを使用するには、読み取りに .option("cloudFiles.useStrictGlobber", "true") を追加します。 グロビングについて詳しくは、次の表をご覧ください。

Pattern [ファイル パス] 既定の globber 厳密な globber
/a/b /a/b/c/file.txt はい はい
/a/b /a/b_dir/c/file.txt いいえ いいえ
/a/b /a/b.txt いいえ いいえ
/a/b/ /a/b.txt いいえ いいえ
/a/*/c/ /a/b/c/file.txt はい はい
/a/*/c/ /a/b/c/d/file.txt はい はい
/a/*/c/ /a/b/x/y/c/file.txt はい いいえ
/a/*/c /a/b/c_file.txt はい いいえ
/a/*/c/ /a/b/c_file.txt はい いいえ
/a/*/c/ /a/*/cookie/file.txt はい いいえ
/a/b* /a/b.txt はい はい
/a/b* /a/b/file.txt はい はい
/a/{0.txt,1.txt} /a/0.txt はい はい
/a/*/{0.txt,1.txt} /a/0.txt いいえ いいえ
/a/b/[cde-h]/i/ /a/b/c/i/file.txt はい はい

簡単な ETL を有効にする

データが失われることなく、Delta Lake にデータを取り込む簡単な方法は、次のパターンを使用し、自動ローダーによるスキーマ推論を有効にすることです。 Databricks では、ソース データのスキーマが変更されたときにストリームを自動的に再起動するために、Azure Databricks ジョブで次のコードを実行することを推奨しています。 既定では、スキーマは文字列型として推論され、解析エラー (すべてが文字列として保持されていれば発生しません) は _rescued_data に移されます。また、新しい列により、ストリームが失敗し、スキーマが展開されます。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

適切に構造化されたデータのデータ損失を防ぐ

スキーマは把握しているが、予期しないデータを受け取ったときにそれがわかるようにしたい場合、Databricks では rescuedDataColumn を使用することを推奨しています。

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

スキーマと一致しない新しいフィールドが導入された場合にストリームの処理を停止する場合は、次のコードを追加します。

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

柔軟な半構造化データ パイプラインを有効にする

提供する情報に新しい列を導入するベンダーからデータを受け取っている場合、それがいつ行われたのかが正確にわからない場合や、データ パイプラインを更新するための帯域幅がない場合があります。 スキーマの展開を利用してストリームを再起動し、自動ローダーによって、推論されたスキーマが自動的に更新されるようにすることができるようになりました。 ベンダーが提供する可能性のある "スキーマレス" フィールドに schemaHints を利用することもできます。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

入れ子になった JSON データを変換する

Auto Loader によって最上位の JSON 列が文字列として推論されるため、入れ子になった JSON オブジェクトが、さらに変換が必要な状態で残される可能性があります。 半構造化データ アクセス API を使用して、複雑な JSON コンテンツをさらに変換できます。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

入れ子になった JSON データを推論する

入れ子になったデータがある場合は、cloudFiles.inferColumnTypes オプションを使用して、データ型およびその他の列の型の入れ子構造を推論できます。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

ヘッダーのない CSV ファイルを読み込む

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

ヘッダーがある CSV ファイルにスキーマを適用する

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

イメージまたはバイナリ データを ML 用の Delta Lake に取り込む

データが Delta Lake に格納されると、データに対して分散推論を実行できます。 「pandas UDF を使用して分散推論を実行する」を参照してください。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

DLT の自動ローダー構文

Delta Live Tables は、自動ローダーの SQL サポートを追加する、自動ローダー用の少し変更された Python 構文を提供しています。

次の例では、自動ローダーを使用して CSV と JSON ファイルからデータセットを作成します。

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

自動ローダーではサポートされている形式オプションを使用できます。 map() 関数を使用すると、cloud_files() メソッドにオプションを渡すことができます。 オプションはキーと値のペアで、キーと値は文字列です。 SQL で自動ローダーを操作するための構文を次に示します。

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

次の例では、ヘッダーのあるタブ区切りの CSV ファイルからデータを読み取ります。

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

schema を使用して書式を手動で指定できます。スキーマ推論をサポートしていない形式の場合は schema を指定する必要があります。

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

注意

Delta Live Tables は、自動ローダーを使用してファイルを読み取る際にスキーマとチェックポイント ディレクトリを自動的に構成して管理します。 ただし、これらのディレクトリのいずれかを手動で構成した場合は、完全な更新を実行しても、構成されたディレクトリの内容には影響しません。 Databricks では、処理中の予期しない副作用を回避するために、自動的に構成されたディレクトリを使用することが推奨されています。