自動ローダーでのスキーマの推論と展開の構成

読み込まれたデータのスキーマを自動的に検出するように自動ローダーを構成できます。これにより、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されたときにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を常時手動で追跡して適用する必要がなくなります。

また、自動ローダーでは、予期しないデータ (データ型が異なるデータなど) を JSON BLOB 列に "復旧" することもできます。この場合、後で半構造化データ アクセス API を使用してアクセスすることを選択できます。

スキーマ推論と展開に対しては、以下の形式がサポートされています。

ファイル形式 サポートされているバージョン
JSON すべてのバージョン
CSV すべてのバージョン
XML Databricks Runtime 14.3 LTS 以降
Avro Databricks Runtime 10.4 LTS 以降
Parquet Databricks Runtime 11.3 LTS 以降
ORC サポートされていない
Text 該当なし (固定スキーマ)
Binaryfile 該当なし (固定スキーマ)

スキーマの推論と展開の構文

オプション cloudFiles.schemaLocation にターゲット ディレクトリを指定すると、スキーマの推論と進化が可能になります。 checkpointLocation に指定したのと同じディレクトリを使用することを選択できます。 Delta Live テーブルを使用する場合、Azure Databricks はスキーマの場所とその他のチェックポイント情報を自動的に管理します。

注意

ターゲット テーブルに複数のソース データの場所が読み込まれている場合、各自動ローダー インジェスト ワークロードには個別のストリーミング チェックポイントが必要です。

次の例では、cloudFiles.formatparquet を使用します。 他のファイル ソースには、csvavro、または json を使用します。 読み取りと書き込みの他のすべての設定の既定の動作は、各形式で同じままです。

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

自動ローダー スキーマ推論のしくみ

自動ローダーでは、初めてデータを読み込んだときにスキーマを推論するために、検出された最初の 50 GB と 1000 ファイルのうち、先に上限を超えた方をサンプリングします。 自動ローダーは、入力データに対するスキーマの変更を時間の経過に合わせて追跡するために、構成済みの cloudFiles.schemaLocation にある _schemas ディレクトリにスキーマ情報を格納します。

注意

使用するサンプルのサイズを変更するには、次の SQL 構成を設定します。

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(バイト文字列 (例: 10gb))

and

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(整数)

既定では、自動ローダー スキーマ推論では、型の不一致によるスキーマの進化の問題を回避します。 データ型をエンコードしない形式 (JSON、CSV、XML) の場合、自動ローダーはすべての列を文字列として推論します (JSON ファイル内の入れ子になったフィールドを含む)。 型指定されたスキーマ (Parquet および Avro) を含む形式の場合、自動ローダーはファイルのサブセットをサンプリングし、個々のファイルのスキーマをマージします。 この動作の概要を次の表に示します。

ファイル形式 既定の推論されたデータ型
JSON String
CSV String
XML String
Avro Avro スキーマでエンコードされた型
Parquet Parquet スキーマでエンコードされた型

Apache Spark DataFrameReader では、スキーマ推論に異なる動作が使用され、サンプル データに基づいて JSON、CSV、XML ソース内の列のデータ型が選択されます。 自動ローダーでこの動作を有効にするには、オプション cloudFiles.inferColumnTypestrue に設定します。

Note

CSV データのスキーマを推論する場合、自動ローダーでは、ファイルにヘッダーが含まれていると見なされます。 CSV ファイルにヘッダーが含まれていない場合は、オプション .option("header", "false") を指定します。 また、自動ローダーは、サンプル内のすべてのファイルのスキーマをマージして、グローバル スキーマを取得します。 自動ローダーは、ヘッダーに従って各ファイルを読み取り、CSV を正しく解析できます。

Note

2 つの Parquet ファイル内に異なるデータ型の列がある場合、自動ローダーは最も広い種類を選択します。 schemaHints を使用して、この選択をオーバーライドできます。 スキーマ ヒントを指定すると、自動ローダーは列を指定された型にキャストするのではなく、Parquet リーダーに指定した型として列を読み取るように指示します。 不一致が発生した場合、その列は、復旧されたデータ列で復旧されます。

自動ローダー スキーマの進化のしくみ

自動ローダーでは、データを処理する際に新しい列の追加を検出します。 自動ローダーで新しい列が検出されると、ストリームが UnknownFieldException で停止します。 ストリームからこのエラーがスローされる前に、自動ローダーによって、データの最新のマイクロバッチに対してスキーマ推論が実行され、新しい列をスキーマの末尾にマージすることによって、スキーマの場所が最新のスキーマで更新されます。 既存の列のデータ型は変更されません。

Databricks では、このようなスキーマの変更後に自動的に再起動するように、ワークフローを使用して自動ローダー ストリームを構成することをお勧めします。

自動ローダーでは、スキーマの展開について次のモードがサポートされています。これは、cloudFiles.schemaEvolutionMode オプションで設定します。

モード 新しい列を読み取るときの動作
addNewColumns (既定値) ストリームが失敗します。 新しい列がスキーマに追加されます。 既存の列ではデータ型は展開されません。
rescue スキーマは進化せず、スキーマの変更によりストリームが失敗することはありません。 すべての新しい列が、復旧されたデータ列に記録されます。
failOnNewColumns ストリームが失敗します。 提供されたスキーマが更新されるか、問題のあるデータ ファイルが削除されない限り、ストリームは再起動しません。
none スキーマは進化せず、新しい列は無視されます。また、rescuedDataColumn オプションが設定されない限り、データは復旧されません。 スキーマの変更によりストリームが失敗することはありません。

自動ローダー使用時のパーティションの動作

データが Hive 形式のパーティション分割でレイアウトされている場合、自動ローダーはデータの基になるディレクトリ構造からパーティション列の推論を試みます。 たとえば、ファイル パスが base_path/event=click/date=2021-04-01/f0.json である場合、dateevent がパーティション列として推論されます。 基になるディレクトリ構造に競合する Hive パーティションが含まれている場合、または Hive 形式のパーティション分割が含まれていない場合、パーティション列は無視されます。

バイナリ ファイル形式 (binaryFile) と text ファイル形式では、データ スキーマが固定されていますが、パーティション列の推論がサポートされています。 Databricks では、これらのファイル形式に対して cloudFiles.schemaLocation 設定をお勧めします。 これにより、潜在的なエラーや情報の損失が回避され、自動ローダーが開始されるたびにパーティション列が推論されるのを防ぐことができます。

スキーマの展開では、パーティション列は考慮されません。 base_path/event=click/date=2021-04-01/f0.json のような初期ディレクトリ構造があり、base_path/event=click/date=2021-04-01/hour=01/f1.json として新しいファイルの受信を開始した場合、自動ローダーは hour 列を無視します。 新しいパーティション列の情報を取得するには、cloudFiles.partitionColumnsevent,date,hour に設定します。

注意

cloudFiles.partitionColumns オプションは、列名のコンマ区切り一覧を取得します。 ディレクトリ構造に key=value ペアとして存在する列のみが解析されます。

復旧されたデータ列とは

自動ローダーによってスキーマが推論されると、復旧されたデータ列が _rescued_data としてスキーマに自動的に追加されます。 オプション rescuedDataColumn を設定することにより、列の名前を変更することも、スキーマを提供する場合に列を含めることができます。

復旧されたデータ列を使用すると、スキーマと一致しない列が削除される変わりに、復旧されます。 復旧されたデータ列には、次の理由で解析されないデータが含まれています。

  • スキーマに列がない。
  • 型が一致しない。
  • 大文字と小文字が一致しない。

復旧されたデータ列には、復旧された列と、レコードのソース ファイル パスを含む JSON が含まれています。

注意

JSON および CSV パーサーでは、レコードを解析するときに、PERMISSIVEDROPMALFORMEDFAILFAST の 3 つのモードがサポートされます。 rescuedDataColumn と共に使用すると、データ型の不一致によって、DROPMALFORMED モードでレコードが削除されたり、FAILFAST モードでエラーがスローされたりすることはありません。 不完全であるか形式に誤りがある JSON や CSV などの、破損したレコードのみが削除されたり、エラーをスローしたりします。 JSON または CSV を解析するときに badRecordsPath を使用する場合、rescuedDataColumn を使用すると、データ型の不一致は無効なレコードとは見なされません。 badRecordsPath には、不完全で形式に誤りがある JSON または CSV レコードだけが格納されます。

大文字と小文字が区別される動作を変更する

大文字小文字の区別を有効にしない限り、abcAbcABC の各列は、スキーマ推論の目的で、同じ列と見なされます。 大文字または小文字の選択は任意であり、サンプリングされたデータによって異なります。 スキーマ ヒントを使用すると、使用する文字種 (大文字または小文字) を強制できます。 選択が行われ、スキーマが推論されると、自動ローダーでは、大文字または小文字の選択されなかった方が、スキーマと一致するとは考慮されなくなります。

復旧されたデータ列が有効になっているときは、スキーマの大文字または小文字とは異なる名前が付けられたフィールドが _rescued_data 列に読み込まれます。 この動作を変更するには、オプション readerCaseSensitive を false に設定します。これにより、自動ローダーは大文字と小文字を区別せずにデータを読み取ります。

スキーマヒントを使用してスキーマ推論をオーバーライドする

スキーマ ヒントを使用して、推論されたスキーマに対して知っているか予想されるスキーマ情報を適用できます。 列が特定のデータ型であることがわかっている場合や、さらに一般的なデータ型 (たとえば、integer ではなく double) を選択する場合は、次のように、SQL スキーマ仕様構文を使用して文字列として、列のデータ型に任意の数のヒントを指定できます。

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

サポートされているデータ型の一覧については、データ型に関するドキュメントを参照してください。

ストリームの開始時に列が存在しない場合は、スキーマ ヒントを使用して、その列を推論されたスキーマに追加することもできます。

スキーマ ヒントを使用したときの動作を確認するために、推論されたスキーマの例を次に示します。

推論されたスキーマは次のとおりです。

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

次のスキーマ ヒントを指定します。

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

次が得られます。

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

注意

配列とマップのスキーマ ヒントは、Databricks Runtime 9.1 LTS 以降でサポートされています。

スキーマ ヒントを使用したときの動作を確認するために、複雑なデータ型で推論されたスキーマの例を次に示します。

推論されたスキーマは次のとおりです。

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

次のスキーマ ヒントを指定します。

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

次が得られます。

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

注意

スキーマ ヒントが使用されるのは、自動ローダーにスキーマが提供されて "いない" 場合だけです。 cloudFiles.inferColumnTypes が有効か無効かに関係なく、スキーマ ヒントを使用できます。