自動ローダーでのスキーマの推論と展開の構成
読み込まれたデータのスキーマを自動的に検出するように自動ローダーを構成できます。これにより、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されたときにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を常時手動で追跡して適用する必要がなくなります。
また、自動ローダーでは、予期しないデータ (データ型が異なるデータなど) を JSON BLOB 列に "復旧" することもできます。この場合、後で半構造化データ アクセス API を使用してアクセスすることを選択できます。
スキーマ推論と展開に対しては、以下の形式がサポートされています。
ファイル形式 | サポートされているバージョン |
---|---|
JSON |
すべてのバージョン |
CSV |
すべてのバージョン |
XML |
Databricks Runtime 14.3 LTS 以降 |
Avro |
Databricks Runtime 10.4 LTS 以降 |
Parquet |
Databricks Runtime 11.3 LTS 以降 |
ORC |
サポートされていない |
Text |
該当なし (固定スキーマ) |
Binaryfile |
該当なし (固定スキーマ) |
スキーマの推論と展開の構文
オプション cloudFiles.schemaLocation
にターゲット ディレクトリを指定すると、スキーマの推論と進化が可能になります。 checkpointLocation
に指定したのと同じディレクトリを使用することを選択できます。 Delta Live テーブルを使用する場合、Azure Databricks はスキーマの場所とその他のチェックポイント情報を自動的に管理します。
注意
ターゲット テーブルに複数のソース データの場所が読み込まれている場合、各自動ローダー インジェスト ワークロードには個別のストリーミング チェックポイントが必要です。
次の例では、cloudFiles.format
に parquet
を使用します。 他のファイル ソースには、csv
、avro
、または json
を使用します。 読み取りと書き込みの他のすべての設定の既定の動作は、各形式で同じままです。
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
自動ローダー スキーマ推論のしくみ
自動ローダーでは、初めてデータを読み込んだときにスキーマを推論するために、検出された最初の 50 GB と 1000 ファイルのうち、先に上限を超えた方をサンプリングします。 自動ローダーは、入力データに対するスキーマの変更を時間の経過に合わせて追跡するために、構成済みの cloudFiles.schemaLocation
にある _schemas
ディレクトリにスキーマ情報を格納します。
注意
使用するサンプルのサイズを変更するには、次の SQL 構成を設定します。
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(バイト文字列 (例: 10gb
))
and
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(整数)
既定では、自動ローダー スキーマ推論では、型の不一致によるスキーマの進化の問題を回避します。 データ型をエンコードしない形式 (JSON、CSV、XML) の場合、自動ローダーはすべての列を文字列として推論します (JSON ファイル内の入れ子になったフィールドを含む)。 型指定されたスキーマ (Parquet および Avro) を含む形式の場合、自動ローダーはファイルのサブセットをサンプリングし、個々のファイルのスキーマをマージします。 この動作の概要を次の表に示します。
ファイル形式 | 既定の推論されたデータ型 |
---|---|
JSON |
String |
CSV |
String |
XML |
String |
Avro |
Avro スキーマでエンコードされた型 |
Parquet |
Parquet スキーマでエンコードされた型 |
Apache Spark DataFrameReader では、スキーマ推論に異なる動作が使用され、サンプル データに基づいて JSON、CSV、XML ソース内の列のデータ型が選択されます。 自動ローダーでこの動作を有効にするには、オプション cloudFiles.inferColumnTypes
を true
に設定します。
Note
CSV データのスキーマを推論する場合、自動ローダーでは、ファイルにヘッダーが含まれていると見なされます。 CSV ファイルにヘッダーが含まれていない場合は、オプション .option("header", "false")
を指定します。 また、自動ローダーは、サンプル内のすべてのファイルのスキーマをマージして、グローバル スキーマを取得します。 自動ローダーは、ヘッダーに従って各ファイルを読み取り、CSV を正しく解析できます。
Note
2 つの Parquet ファイル内に異なるデータ型の列がある場合、自動ローダーは最も広い種類を選択します。 schemaHints を使用して、この選択をオーバーライドできます。 スキーマ ヒントを指定すると、自動ローダーは列を指定された型にキャストするのではなく、Parquet リーダーに指定した型として列を読み取るように指示します。 不一致が発生した場合、その列は、復旧されたデータ列で復旧されます。
自動ローダー スキーマの進化のしくみ
自動ローダーでは、データを処理する際に新しい列の追加を検出します。 自動ローダーで新しい列が検出されると、ストリームが UnknownFieldException
で停止します。 ストリームからこのエラーがスローされる前に、自動ローダーによって、データの最新のマイクロバッチに対してスキーマ推論が実行され、新しい列をスキーマの末尾にマージすることによって、スキーマの場所が最新のスキーマで更新されます。 既存の列のデータ型は変更されません。
Databricks では、このようなスキーマの変更後に自動的に再起動するように、Databricks ジョブを使用して自動ローダー ストリームを構成することをお勧めします。
自動ローダーでは、スキーマの展開について次のモードがサポートされています。これは、cloudFiles.schemaEvolutionMode
オプションで設定します。
モード | 新しい列を読み取るときの動作 |
---|---|
addNewColumns (既定値) |
ストリームが失敗します。 新しい列がスキーマに追加されます。 既存の列ではデータ型は展開されません。 |
rescue |
スキーマは進化せず、スキーマの変更によりストリームが失敗することはありません。 すべての新しい列が、復旧されたデータ列に記録されます。 |
failOnNewColumns |
ストリームが失敗します。 提供されたスキーマが更新されるか、問題のあるデータ ファイルが削除されない限り、ストリームは再起動しません。 |
none |
スキーマは進化せず、新しい列は無視されます。また、rescuedDataColumn オプションが設定されない限り、データは復旧されません。 スキーマの変更によりストリームが失敗することはありません。 |
自動ローダー使用時のパーティションの動作
データが Hive 形式のパーティション分割でレイアウトされている場合、自動ローダーはデータの基になるディレクトリ構造からパーティション列の推論を試みます。 たとえば、ファイル パスが base_path/event=click/date=2021-04-01/f0.json
である場合、date
と event
がパーティション列として推論されます。 基になるディレクトリ構造に競合する Hive パーティションが含まれている場合、または Hive 形式のパーティション分割が含まれていない場合、パーティション列は無視されます。
バイナリ ファイル形式 (binaryFile
) と text
ファイル形式では、データ スキーマが固定されていますが、パーティション列の推論がサポートされています。 Databricks では、これらのファイル形式に対して cloudFiles.schemaLocation
設定をお勧めします。 これにより、潜在的なエラーや情報の損失が回避され、自動ローダーが開始されるたびにパーティション列が推論されるのを防ぐことができます。
スキーマの展開では、パーティション列は考慮されません。 base_path/event=click/date=2021-04-01/f0.json
のような初期ディレクトリ構造があり、base_path/event=click/date=2021-04-01/hour=01/f1.json
として新しいファイルの受信を開始した場合、自動ローダーは hour 列を無視します。 新しいパーティション列の情報を取得するには、cloudFiles.partitionColumns
を event,date,hour
に設定します。
注意
cloudFiles.partitionColumns
オプションは、列名のコンマ区切り一覧を取得します。 ディレクトリ構造に key=value
ペアとして存在する列のみが解析されます。
復旧されたデータ列とは
自動ローダーによってスキーマが推論されると、復旧されたデータ列が _rescued_data
としてスキーマに自動的に追加されます。 オプション rescuedDataColumn
を設定することにより、列の名前を変更することも、スキーマを提供する場合に列を含めることができます。
復旧されたデータ列を使用すると、スキーマと一致しない列が削除される変わりに、復旧されます。 復旧されたデータ列には、次の理由で解析されないデータが含まれています。
- スキーマに列がない。
- 型が一致しない。
- 大文字と小文字が一致しない。
復旧されたデータ列には、復旧された列と、レコードのソース ファイル パスを含む JSON が含まれています。
注意
JSON および CSV パーサーでは、レコードを解析するときに、PERMISSIVE
、DROPMALFORMED
、FAILFAST
の 3 つのモードがサポートされます。 rescuedDataColumn
と共に使用すると、データ型の不一致によって、DROPMALFORMED
モードでレコードが削除されたり、FAILFAST
モードでエラーがスローされたりすることはありません。 不完全であるか形式に誤りがある JSON や CSV などの、破損したレコードのみが削除されたり、エラーをスローしたりします。 JSON または CSV を解析するときに badRecordsPath
を使用する場合、rescuedDataColumn
を使用すると、データ型の不一致は無効なレコードとは見なされません。 badRecordsPath
には、不完全で形式に誤りがある JSON または CSV レコードだけが格納されます。
大文字と小文字が区別される動作を変更する
大文字小文字の区別を有効にしない限り、abc
、Abc
、ABC
の各列は、スキーマ推論の目的で、同じ列と見なされます。 大文字または小文字の選択は任意であり、サンプリングされたデータによって異なります。 スキーマ ヒントを使用すると、使用する文字種 (大文字または小文字) を強制できます。 選択が行われ、スキーマが推論されると、自動ローダーでは、大文字または小文字の選択されなかった方が、スキーマと一致するとは考慮されなくなります。
復旧されたデータ列が有効になっているときは、スキーマの大文字または小文字とは異なる名前が付けられたフィールドが _rescued_data
列に読み込まれます。 この動作を変更するには、オプション readerCaseSensitive
を false に設定します。これにより、自動ローダーは大文字と小文字を区別せずにデータを読み取ります。
スキーマヒントを使用してスキーマ推論をオーバーライドする
スキーマ ヒントを使用して、推論されたスキーマに対して知っているか予想されるスキーマ情報を適用できます。 列が特定のデータ型であることがわかっている場合や、さらに一般的なデータ型 (たとえば、integer
ではなく double
) を選択する場合は、次のように、SQL スキーマ仕様構文を使用して文字列として、列のデータ型に任意の数のヒントを指定できます。
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
サポートされているデータ型の一覧については、データ型に関するドキュメントを参照してください。
ストリームの開始時に列が存在しない場合は、スキーマ ヒントを使用して、その列を推論されたスキーマに追加することもできます。
スキーマ ヒントを使用したときの動作を確認するために、推論されたスキーマの例を次に示します。
推論されたスキーマは次のとおりです。
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
次のスキーマ ヒントを指定します。
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
次が得られます。
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
注意
配列とマップのスキーマ ヒントは、Databricks Runtime 9.1 LTS 以降でサポートされています。
スキーマ ヒントを使用したときの動作を確認するために、複雑なデータ型で推論されたスキーマの例を次に示します。
推論されたスキーマは次のとおりです。
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
次のスキーマ ヒントを指定します。
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
次が得られます。
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
注意
スキーマ ヒントが使用されるのは、自動ローダーにスキーマが提供されて "いない" 場合だけです。 cloudFiles.inferColumnTypes
が有効か無効かに関係なく、スキーマ ヒントを使用できます。