このページでは、データの読み取りと書き込みを行う Spark API で使用できる入力オプションと出力オプションの一覧を示します。
DataFrameReader オプション
これらのオプションは、DataFrameReader.option()、DataFrameReader.options()、read_files、COPY INTO、および Auto Loader と共に使用Azure Databricksデータ ファイルの読み取り方法を制御します。
Example
次の例では、JSON ファイルを読み取るためのTrueにmultiLineを設定します。
Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
共通
次のオプションは、すべてのファイル形式に適用されます。
| Key | デフォルト | Description |
|---|---|---|
ignoreCorruptFiles |
false |
破損したファイルを無視するかどうか。 true の場合、破損したファイルが検出されても Spark ジョブは引き続き実行され、読み取られた内容は引き続き返されます。
COPY INTOでは、Delta Lake 履歴のnumSkippedCorruptFiles列にoperationMetricsとして、スキップされた破損したファイルを観察できます。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
ignoreMissingFiles |
false自動ローダーの場合は、COPY INTOのtrue (レガシ) |
行方不明のファイルを無視するかどうかを指定します。 true の場合、不足しているファイルが検出されても Spark ジョブは引き続き実行され、内容は引き続き返されます。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
modifiedAfter |
None | 指定されたタイムスタンプの後に変更タイムスタンプを持つファイルのみを取り込むためのフィルターとしてのオプションのタイムスタンプ。 |
modifiedBefore |
None | 指定されたタイムスタンプの前に変更タイムスタンプを持つファイルのみを取り込むためのフィルターとしてのオプションのタイムスタンプ。 |
pathGlobFilter または fileNamePattern |
None | ファイルを選択するために使用できる可能性のある glob パターン。
COPY INTO (レガシ) でのPATTERNに相当します。
fileNamePattern では read_files を使用できます。 |
recursiveFileLookup |
false |
true場合、このオプションは、名前が date=2019-07-01 のようなパーティションの名前付けスキームに従っていない場合でも、入れ子になったディレクトリを検索します。 |
アブロ
| Key | デフォルト | Description |
|---|---|---|
avroSchema |
None | ユーザーによって Avro 形式で指定される省略可能なスキーマ。 Avro を読み取る場合、このオプションは互換性があるが、実際の Avro スキーマとは異なる進化したスキーマに設定できます。 逆シリアル化スキーマは、進化したスキーマと一致します。 たとえば、既定値を持つ追加の列を 1 つ含む進化したスキーマを設定した場合、読み取り結果にも新しい列が含まれます。 |
avroSchemaEvolutionMode |
none |
スキーマ レジストリを使用するときにスキーマの進化を処理する方法。 有効な値: none (スキーマの変更を無視してジョブを続行)、 restart (スキーマの変更が検出され、 UnknownFieldException が発生し、ジョブの再起動が必要な場合)。 |
datetimeRebaseMode |
LEGACY |
ユリウス暦と予期的グレゴリオ暦の間の日付値とタイムスタンプ値のリベースを制御します。 有効な値: EXCEPTION、LEGACY、CORRECTED。 |
enableStableIdentifiersForUnionType |
false |
Avro 共用体型に安定したフィールド名を使用するかどうか。 有効にすると、共用体の型フィールド名は、小文字の型名から派生します (たとえば、 member_int、 member_string)。 2 つの型名が小文字の後で同一の場合、例外をスローします。 |
mergeSchema |
false |
複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 Avro に対して mergeSchema を有効にしても、データ型は緩和されません。 |
mode |
FAILFAST |
破損したレコードを処理するためのパーサー モード。 有効な値: FAILFAST (例外がスローされます)、 PERMISSIVE (形式が正しくないフィールドを null に設定)、 DROPMALFORMED (警告なく無効なレコードを削除します)。 |
readerCaseSensitive |
true |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 |
recursiveFieldMaxDepth |
None | 再帰 Avro フィールドの最大再帰深度。 すべての再帰フィールドを切り捨てる 1 に設定し、1 レベルの再帰を許可する 2 など、最大 15に設定します。 設定解除または 0場合、再帰フィールドは許可されません。 有効な値: 15する0。 |
rescuedDataColumn |
None | データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。詳細については、「復旧されたデータ列とは」を参照してください。 |
stableIdentifierPrefixForUnionType |
member_ |
enableStableIdentifiersForUnionType=trueするときに、安定した共用体型のフィールド名に使用するプレフィックス。 |
Csv
| Key | デフォルト | Description |
|---|---|---|
badRecordsPath |
None | 不正な CSV レコードに関する情報を記録するためのファイルを格納するパス。 |
charToEscapeQuoteEscaping |
\0 |
引用符のエスケープに使用する文字をエスケープするために使用する文字。 たとえば、レコードが [ " a\\", b ] の場合は次のようになります。
|
columnNameOfCorruptRecord |
_corrupt_record |
自動ローダーがサポートされています。
COPY INTO (レガシ) ではサポートされていません。形式に誤りがあり、解析できないレコードを格納するための列。 解析の mode を DROPMALFORMED に設定する場合、この列は空になります。 |
comment |
\0 |
テキスト行の先頭に配置した場合に行コメントを表す文字を定義します。 コメントのスキップを無効にするには、'\0' を使用します。 |
dateFormat |
yyyy-MM-dd |
日付文字列を解析するための形式。 |
emptyValue |
空の文字列 | 空の値の文字列表現。 |
enableDateTimeParsingFallback |
false |
指定した形式で値を解析できない場合に、従来の日付とタイムスタンプの解析動作にフォールバックするかどうかを指定します。
falseすると、解析エラーが発生するか、modeに応じて null が生成されます。 |
encoding または charset |
UTF-8 |
CSV ファイルのエンコードの名前。 オプションの一覧については、java.nio.charset.Charset を参照してください。
UTF-16 が UTF-32 の場合、multiline と true を使用することはできません。 |
enforceSchema |
true |
指定または推論されたスキーマを CSV ファイルに強制的に適用するかどうか。 このオプションを有効にすると、CSV ファイルのヘッダーは無視されます。 自動ローダーを使用してデータをレスキューし、スキーマの展開を許可する場合、このオプションは既定では無視されます。 |
escape |
\ |
データの解析時に使用するエスケープ文字。 |
extension |
csv |
予期されるファイル名拡張子。 この拡張子のないファイルは、読み取り中に除外されます。 |
failOnUnknownFields |
false |
CSV レコードにスキーマに存在しない列が含まれている場合に失敗するかどうか。
falseすると、認識されない列は、rescuedDataColumnに応じて自動的に削除または救助されます。 |
failOnWidenedFields |
false |
フィールド値を、拡大せずに宣言されたスキーマ型として解析できない場合に失敗するかどうか。
falseすると、rescuedDataColumnに応じて、型が拡大された値が自動的に復旧されます。
failOnUnknownFields=true設定すると、このオプションの効果をマスクできます。 |
header |
false |
CSV ファイルにヘッダーが含まれているかどうか。 自動ローダーによって、スキーマの推論時にファイルにヘッダーが含まれているものと見なされます。 |
ignoreLeadingWhiteSpace |
false |
解析対象の各値の先頭の空白文字を無視するかどうか。 |
ignoreTrailingWhiteSpace |
false |
解析対象の各値の末尾の空白文字を無視するかどうか。 |
inferSchema |
false |
解析対象の CSV レコードのデータ型を推論するか、すべての列が StringType であると見なすか。
true に設定した場合は、追加でデータを渡す必要があります。 自動ローダーの場合は、代わりに cloudFiles.inferColumnTypes を使います。 |
inputBufferSize |
1048576 (1 MB) |
CSV パーサーのバッファー サイズ (バイト単位)。 大きな CSV ファイルを解析するときにメモリ使用量を調整する場合に便利です。 有効な値: 正の整数。 |
lineSep |
なし。 \r、 \r\n、および \n |
連続する 2 つの CSV レコードの間の文字列。 |
locale |
US |
java.util.Locale 識別子。 CSV 内の既定の日付、タイムスタンプ、および 10 進数の解析に影響します。 |
maxCharsPerColumn |
-1 |
解析する値の予想最大文字数。 メモリ エラーを回避するために使用できます。 既定値は -1 で、無制限を意味します。 有効な値: 正の整数、または無制限の -1 。 |
maxColumns |
20480 |
レコードに含めることができる列数のハード制限。 有効な値: 正の整数。 |
mergeSchema |
false |
複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 スキーマの推論時に、自動ローダーに対して既定で有効になります。 |
mode |
PERMISSIVE |
形式に誤りがあるレコードの処理に関するパーサーのモード。 有効な値: PERMISSIVE、 DROPMALFORMED、 FAILFAST。 |
multiLine |
false |
CSV レコードが複数の行にまたがるかどうか。 |
nanValue |
NaN |
FloatType および DoubleType 列を解析する際の非数値の文字列表現。 |
negativeInf |
-Inf |
FloatType または DoubleType 列を解析する際の負の無限大の文字列表現。 |
nullValue |
空の文字列 | null 値の文字列表現。 |
parserCaseSensitive (非推奨) |
false |
ファイルの読み取り中に、ヘッダーに宣言されている列をスキーマの大文字と小文字の区別に合わせるかどうか。 自動ローダーについては、これは既定で true となります。 有効にすると、大文字と小文字が異なる列は rescuedDataColumn に救出されます。
readerCaseSensitive が優先されるため、このオプションは非推奨となりました。 |
positiveInf |
Inf |
FloatType または DoubleType 列を解析する際の正の無限大の文字列表現。 |
preferDate |
true |
可能な場合、タイムスタンプではなく日付として文字列を推論しようとします。 また、 inferSchema を有効にするか、自動ローダーで cloudFiles.inferColumnTypes を使用して、スキーマ推論を使用する必要があります。 |
quote |
" |
フィールド区切り記号が値に含まれる場合に、値のエスケープに使用する文字。 |
readerCaseSensitive |
true |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 |
rescuedDataColumn |
None | データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。 |
sep または delimiter |
, |
列の間の区切り文字列。 |
singleVariantColumn |
None | 列名に設定すると、各フィールドを独自の列に解析するのではなく、CSV レコード全体をその名前の単一の VariantType 列に読み取ります。
header=true が必要です。 |
skipRows |
0 |
無視する必要がある CSV ファイルの先頭からの行数 (コメント化された行や空の行を含みます)。
header が true の場合、ヘッダーは最初にスキップされていない行とコメントされていない行になります。 有効な値: 正の整数または 0。 |
timeFormat |
HH:mm:ss |
列の値 TimeType 解析するための形式。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ文字列を解析するための形式。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイム ゾーン (TimestampNTZType) 文字列を含まないタイムスタンプを解析するための形式。 |
timeZone |
None | タイムスタンプと日付を解析するときに使用する java.time.ZoneId。 |
unescapedQuoteHandling |
STOP_AT_DELIMITER |
エスケープされていない引用符を処理するための方策。 使用可能なオプション:
|
Excel
| Key | デフォルト | Description |
|---|---|---|
dataAddress |
None | Excel構文で読み取るセル範囲。 省略した場合は、最初のシートからすべての有効なセルを読み取ります。
"SheetName!C5:H10"を使用して、名前付きシートから範囲を読み取るか、最初のシートから範囲を読み取"C5:H10"するか、特定のシートからすべてのデータを読み取"SheetName"します。 |
headerRows |
0 |
列名ヘッダーとして使用する初期行の数。
dataAddressを指定すると、セル範囲内に適用されます。
0すると、列名は_c1、_c2、_c3などとして自動生成されます。有効な値: 0、1。 |
ignoreMissingSheet |
false |
dataAddressで指定されたシートを含まないファイルをサイレント スキップするかどうかを指定します。
falseすると、ファイルに要求されたシートがない場合にエラーがスローされます。 シート名が dataAddressで指定されている場合にのみ適用されます。 有効な値: true、 false。 |
includePhoneticRuns |
false |
XLSX ファイルを読み取るときに、セル文字列値に連結されたふりがな (pinyin やふりがななど) 発音注釈を含めるかどうか。 有効な値: true、 false。 |
operation |
readSheet |
Excel ブックに対して実行する操作。 有効な値: readSheet (シートからデータを読み取る)、 listSheets (各シートの sheetIndex: long フィールドと sheetName: String を持つ構造体を返します)。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
Excelに文字列として格納されるタイムスタンプなしのタイムゾーン値のカスタム書式指定文字列。 カスタム日付形式は Datetime パターンの形式に従います。 |
dateFormat |
yyyy-MM-dd |
Dateとして読み取られた文字列値のカスタム書式指定文字列。 カスタム日付形式は Datetime パターンの形式に従います。 |
Json
| Key | デフォルト | Description |
|---|---|---|
allowBackslashEscapingAnyCharacter |
false |
バックスラッシュを使用して、後続の任意の 1 文字をエスケープすることを許可するかどうか。 有効にしない場合は、JSON の仕様に明示されている文字のみをエスケープできます。 |
allowComments |
false |
解析対象のコンテンツ内で Java、C、および C++ スタイルのコメント ('/'、'*'、および '//' の種類) の使用を許可するかどうか。 |
allowNonNumericNumbers |
true |
非数値 (NaN) トークンのセットを有効な浮動小数点数値として許可するかどうか。 |
allowNumericLeadingZeros |
false |
追加の (無視できる) ゼロで始まる整数値を許可するかどうか (例: 000001)。 |
allowSingleQuotes |
true |
単一引用符 (アポストロフィ、'\' 文字) を使用して、文字列 (名前と文字列値) を囲むことを許可するかどうか。 |
allowUnquotedControlChars |
false |
JSON 文字列に、エスケープされていない制御文字 (タブや改行文字など、値が 32 未満の ASCII 文字) を含めることを許可するかどうか。 |
allowUnquotedFieldNames |
false |
引用符で囲まれていないフィールド名の使用を許可するかどうか。これは JavaScript では許可されますが、JSON 仕様では許可されません。 |
alternateVariantEncoding |
None | ソース JSON の Variant 値に使用されるエンコード。 インライン JSON として格納されるのではなく、Base85 でエンコードされた Variant 値をデコードするには、 Z85 に設定します。 |
badRecordsPath |
None | 不正な JSON レコードに関する情報を記録するためのファイルを格納するパス。 ファイル ベースのデータ ソースで badRecordsPath オプションを使用する場合、次の制限があります。
|
columnNameOfCorruptRecord |
_corrupt_record |
形式に誤りがあり、解析できないレコードを格納するための列。 解析の mode を DROPMALFORMED に設定する場合、この列は空になります。 |
dateFormat |
yyyy-MM-dd |
日付文字列を解析するための形式。 |
dropFieldIfAllNull |
false |
スキーマの推論中に、すべて null 値の列または空の配列および構造体を無視するかどうか。 |
encoding または charset |
UTF-8 |
JSON ファイルのエンコードの名前。 オプションの一覧については、java.nio.charset.Charset を参照してください。
UTF-16 が UTF-32 の場合、multiline と true を使用することはできません。 |
inferTimestamp |
false |
タイムスタンプ文字列を TimestampType として推論を試みるかどうか。
trueに設定すると、スキーマの推論に著しく長い時間がかかる場合があります。 自動ローダーで使うには cloudFiles.inferColumnTypes を有効にする必要があります。 |
lineSep |
なし。 \r、 \r\n、および \n |
連続する 2 つの JSON レコードの間の文字列。 |
locale |
US |
java.util.Locale 識別子。 JSON 内の既定の日付、タイムスタンプ、および 10 進数の解析に影響します。 |
maxNestingDepth |
500 |
JSON オブジェクトと配列で許容される入れ子の深さの最大値。 深く入れ子になったドキュメントの場合は、この値を大きくします。 有効な値: 正の整数。 |
maxNumLen |
1000 |
JSON 入力内のトークン数の最大長。 大きな数値リテラルを含む JSON の場合は、この値を増やします。 有効な値: 正の整数。 |
maxStringLen |
無制限 | JSON 入力内の文字列値の最大長。 大きな文字列で JSON を解析するときのメモリ使用量を制限するように設定します。 有効な値: 正の整数。 |
mode |
PERMISSIVE |
形式に誤りがあるレコードの処理に関するパーサーのモード。 有効な値: PERMISSIVE、 DROPMALFORMED、 FAILFAST。 |
multiLine |
false |
JSON レコードが複数の行にまたがるかどうか。 |
prefersDecimal |
false |
可能な場合は float 型や double 型の代わりに DecimalType として文字列を推論しようとします。 また、 inferSchema を有効にするか、自動ローダーで cloudFiles.inferColumnTypes を使用して、スキーマ推論を使用する必要があります。 |
primitivesAsString |
false |
数値やブール値などのプリミティブ型を StringType として推論するかどうか。 |
readerCaseSensitive |
true |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 Databricks Runtime 13.3 以降で使用できます。 |
rescuedDataColumn |
None | データ型の不一致またはスキーマの不一致 (列の大文字と小文字の区別を含む) によって解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。 |
singleVariantColumn |
None | JSON ドキュメント全体を取り込むかどうか。列の名前として指定された文字列を持つ単一の Variant 列に解析されます。 設定されていない場合、JSON フィールドは独自の列に取り込まれます。 有効な値: 任意の文字列。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ文字列を解析するための形式。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイム ゾーン (TimestampNTZType) 文字列を含まないタイムスタンプを解析するための形式。 |
timeZone |
None | タイムスタンプと日付を解析するときに使用する java.time.ZoneId。 |
upgradeExceptionAsBadRecord |
false |
型アップグレード例外 (たとえば、宣言された列型に値を拡大できない場合) を、例外をスローするのではなく、無効なレコードとして扱うかどうか。 |
カフカ
Kafka リーダー オプションの完全な一覧については、「 DataStreamReader Kafka オプション」を参照してください。 次のオプションは、 spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されます。
| Key | デフォルト | Description |
|---|---|---|
endingOffsets |
latest |
読み取りを停止する場所。 有効な値: latest、または各パーティションのオフセットの JSON 文字列 ( {"topicA":{"0":50,"1":-1}}など)。JSON 文字列では、 -1 は最新のオフセットです。
-2(最も早いオフセット) は、終了オフセットとして使用できません。 |
endingOffsetsByTimestamp |
None | タイムスタンプとして指定されたパーティションごとの終了オフセット (ミリ秒単位)。 有効な値: {"topicA":{"0":2000,"1":3000}}など、各パーティションのタイムスタンプの JSON 文字列。 |
endingTimestamp |
None | すべてのパーティションに適用されるグローバル終了タイムスタンプ (ミリ秒単位)。 有効な値: 負以外の整数。 |
オーク
| Key | デフォルト | Description |
|---|---|---|
mergeSchema |
false |
複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 |
寄木細工
| Key | デフォルト | Description |
|---|---|---|
datetimeRebaseMode |
LEGACY |
ユリウス暦と予期的グレゴリオ暦の間の日付値とタイムスタンプ値のリベースを制御します。 有効な値: EXCEPTION、LEGACY、CORRECTED。 |
int96RebaseMode |
LEGACY |
ユリウス暦と予期的グレゴリオ暦の間の INT96 タイムスタンプ値のリベースを制御します。 有効な値: EXCEPTION、LEGACY、CORRECTED。 |
mergeSchema |
false |
複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 |
readerCaseSensitive |
true |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 |
rescuedDataColumn |
None | データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。 |
状態ストア
構造化ストリーミング状態データを読み取るために、 spark.read.format("statestore") またはテーブル値関数 read_statestore これらのオプションを使用します。 「構造化ストリーミング状態情報の読み取り」をご覧ください。
| Key | デフォルト | Description |
|---|---|---|
batchId |
最新のバッチ ID | 読み取り対象のバッチ。 クエリの以前の状態を照会するために使用します。 バッチはコミットされるべきですが、まだクリーンアップされていません。 有効な値: 負以外の整数。 |
operatorId |
0 |
読み取り対象の演算子。 クエリに複数のステートフル演算子がある場合に使用します。 有効な値: 負以外の整数。 |
storeName |
DEFAULT |
読み取り対象の状態ストア名。 ステートフル 演算子に複数の状態ストア インスタンスがある場合に使用します。 ストリーム ストリーム結合には storeName または joinSide を指定する必要がありますが、両方を指定する必要はありません。 有効な値: 任意の文字列。 |
joinSide |
None | ストリーム ストリーム結合の読み取り先となるターゲット側。 ストリーム ストリーム結合には storeName または joinSide を指定する必要がありますが、両方を指定する必要はありません。 有効な値: left、 right。 |
snapshotStartBatchId |
None | 状態を読み取るときに開始点として使用するスナップショットのバッチ ID。 リーダーは、このスナップショットからの変更を batchIdまで再生することで、状態を再構築します。 スナップショットが破損している場合に便利です。
snapshotPartitionIdと共に指定する必要があります。
readChangeFeedでは使用できません。 変更ログのチェックポイント処理が有効になっている HDFS ベースの状態ストアと RocksDB 状態ストアをサポートします。 Databricks Runtime 15.4 LTS 以降で使用できます。 有効な値: 負以外の整数。 |
snapshotPartitionId |
None | 指定した場合、クエリはこのパーティションのみを読み取ります。
snapshotStartBatchIdと共に指定する必要があります。
readChangeFeedでは使用できません。 Databricks Runtime 15.4 LTS 以降で使用できます。 有効な値: 負以外の整数。 |
readChangeFeed |
false |
trueすると、changeStartBatchIdとchangeEndBatchIdの間のバッチの指定された範囲にわたって状態の変化を返します。
changeStartBatchId が必要です。
joinSide、batchId、snapshotStartBatchId、またはsnapshotPartitionIdでは使用できません。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: true、 false。詳細については、「 構造化ストリーミング状態の変更の読み取り」を参照してください。 |
changeStartBatchId |
None | 変更フィード範囲の開始バッチ ID。
readChangeFeed が true の場合に必要です。
readChangeFeedが true に設定されている場合にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: 負以外の整数。 |
changeEndBatchId |
最新のバッチ ID | 変更フィード範囲の終了バッチ ID。
changeStartBatchId以上である必要があります。
readChangeFeedが true に設定されている場合にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: 負以外の整数。 |
stateVarName |
None | 読み取る状態変数名。 状態変数名は、transformWithState 演算子によって使用されるStatefulProcessorのinit関数内の各変数の一意の名前です。
transformWithState演算子を使用する場合は必須です。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: 任意の文字列。 |
readRegisteredTimers |
false |
trueすると、transformWithState演算子によって使用される登録済みタイマーを読み取ります。
transformWithState演算子にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: true、 false。 |
flattenCollectionTypes |
true |
trueすると、マップとリストの状態変数に対して返されるレコードがフラット化されます。
falseすると、Spark SQL ArrayまたはMapとしてレコードが返されます。
transformWithState演算子にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: true、 false。 |
テキスト
| Key | デフォルト | Description |
|---|---|---|
encoding |
UTF-8 |
テキスト ファイルの行区切り記号のエンコードの名前。 オプションの一覧については、「 java.nio.charset.Charset」を参照してください。 ファイルの内容はこのオプションの影響を受けず、as-is読み取られます。 |
lineSep |
なし。 \r、 \r\n 、および \n |
連続する 2 つのテキスト レコード間の文字列。 |
wholeText |
false |
ファイルを単一レコードとして読み取るかどうか。 |
Xml
| Key | デフォルト | Description |
|---|---|---|
rowTag |
None | 行として扱う XML ファイルの行タグ。 XML <books> <book><book>...<books> の例では、適切な値は book です。 これは必須オプションです。 |
samplingRatio |
1.0 |
スキーマ推論に使用される行の割合を定義します。 XML 組み込み関数はこのオプションを無視します。 有効な値: 1.0する0.0。 |
excludeAttribute |
false |
要素内の属性を除外するかどうか。 |
mode |
None | 解析中に破損したレコードを処理するモードを許可します。
PERMISSIVE: 破損したレコードの場合は、columnNameOfCorruptRecord によって構成されたフィールドに形式に誤りがある文字列を格納し、形式に誤りがあるフィールドを null に設定します。 破損したレコードを保持するには、ユーザー定義スキーマで string という名前の columnNameOfCorruptRecord 型フィールドを設定できます。 スキーマにこのフィールドがない場合、破損したレコードは解析中に削除されます。 スキーマを推論すると、パーサーは出力スキーマに columnNameOfCorruptRecord フィールドを暗黙的に追加します。
DROPMALFORMED: 破損したレコードを無視します。 このモードは XML 組み込み関数ではサポートされていません。
FAILFAST: パーサーが破損したレコードを検出すると、例外をスローします。 |
inferSchema |
true |
true の場合は、結果として得られる各データフレーム列に対して適切な型を推論しようとします。
false の場合、結果の列はすべて string 型です。 XML 組み込み関数はこのオプションを無視します。 |
columnNameOfCorruptRecord |
spark.sql.columnNameOfCorruptRecord |
PERMISSIVE モードで作成された形式の正しくない文字列を含む新しいフィールドの名前を変更できます。 |
attributePrefix |
None | 属性と要素を区別するための属性のプレフィックス。 これはフィールド名のプレフィックスになります。 既定値は _ です。 XML の読み取り時は空にすることができますが、書き込み時は空にすることはできません。
DataFrameWriter XML オプションにも適用されます。 |
valueTag |
_VALUE |
属性または子要素の要素も持つ要素内の文字データに使用されるタグ。 ユーザーがスキーマで valueTag フィールドを指定することもできますが、文字データが他の要素や属性と一緒に要素に存在する場合、スキーマ推論中に自動的に追加されます。
DataFrameWriter XML オプションにも適用されます。 |
encoding |
UTF-8 |
読み取りの場合は、指定されたエンコードの種類で XML ファイルをデコードします。 書き込みの場合は、保存される XML ファイルのエンコード (文字セット) を指定します。 XML 組み込み関数はこのオプションを無視します。 DataFrameWriter XML オプションにも適用されます。 |
ignoreSurroundingSpaces |
true |
値を囲む空白をスキップする必要があるかどうか。 空白のみの文字データは無視されます。 |
rowValidationXSDPath |
None | 各行の省略可能な XML を個別に検証するために使用される XSD ファイルへのパス。 検証に失敗した行は、解析エラーのように扱われます。 XSD は、指定または推論されるかどうかにかかわらず、スキーマには影響しません。 |
ignoreNamespace |
false |
true場合、XML 要素と属性に対する名前空間のプレフィックスは無視されます。 たとえば、タグ <abc:author> と <def:author> は、どちらも単なる <author> として扱われます。
rowTag 要素では名前空間を無視することはできないため、その子要素の読み込みのみを取り扱うことが可能です。
false の場合でも、XML 解析は名前空間を認識しません。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
カスタムタイムスタンプの形式文字列であり、datetime パターンに従います。 これは timestamp 型に適用されます。
DataFrameWriter XML オプションにも適用されます。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイムゾーンを含まないタイムスタンプのカスタム形式の文字列で、datetime パターン形式に従っています。 これは TimestampNTZType 型に適用されます。 DataFrameWriter XML オプションにも適用されます。 |
dateFormat |
yyyy-MM-dd |
カスタム日付形式の文字列であり、datetime パターン形式に従います。 これは、日付型に適用されます。 DataFrameWriter XML オプションにも適用されます。 |
locale |
en-US |
IETF BCP 47 形式の言語タグとしてロケールを設定します。 たとえば、locale は日付とタイムスタンプの解析中に使用されます。 |
nullValue |
糸 null |
null 値の文字列表記を設定します。 これが null である場合、パーサーはフィールドの属性と要素を書き込みません。
DataFrameWriter XML オプションにも適用されます。 |
readerCaseSensitive |
true |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 |
rescuedDataColumn |
None | データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) のために解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。 |
singleVariantColumn |
none |
1 つのバリアント列の名前を指定します。 このオプションが読み取り用に指定されている場合は、指定されたオプション文字列値を列の名前として使用して、XML レコード全体を 1 つの Variant 列に解析します。 このオプションが書き込み用に指定されている場合は、単一の Variant 列の値を XML ファイルに書き込みます。 DataFrameWriter XML オプションにも適用されます。 |
useLegacyXMLParser |
true |
レガシ XML パーサーを使用するかどうか。 レガシ パーサーでは、形式が正しくないコンテンツに対する検証の厳格さが低くなりますが、メモリ効率は低くなります。 より厳密な既定のパーサーをオプトインするには、 false に設定します。 |
wildcardColName |
xs_any |
ワイルドカード (xs:any) スキーマ要素に一致する XML 要素をキャプチャするために使用される列名。
rescuedDataColumnと一緒に使用することはできません。 |
DataStreamReader オプション
これらのオプションを DataStreamReader.option() と共に使用して、Delta Lake テーブルやその他のファイル ベースのソースからのストリーミング読み取りを構成します。
ファイル形式のオプション (JSON、CSV、Parquet など) については、「 DataFrameReader オプション」を参照してください。
自動ローダー (cloudFiles.*) オプションについては、「 自動ローダー」を参照してください。
Example
次の例では、Delta Lake テーブル ストリームの10にmaxFilesPerTriggerを設定します。
Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
共通
Delta Lake テーブルとその他のファイル ベースのストリーミング ソースには、次のオプションが適用されます。
| Key | デフォルト | Description |
|---|---|---|
cleanSource |
off |
ソース ファイルがストリームによって処理された後で処理する方法。 有効な値: off (アクションなし)、 delete (ソース ファイルを完全に削除)、 archive ( sourceArchiveDir に移動)。
archiveに設定する場合は、sourceArchiveDirも設定する必要があります。 Delta Lake テーブル ストリーミングには適用されません。 |
fileNameOnly |
false |
既に処理されているファイルを、完全なパスではなくファイル名のみで識別するかどうか。
trueすると、同じファイル名を持つ異なるパスにあるファイルは同じファイルとして扱われ、再処理されません。 Delta Lake テーブル ストリーミングには適用されません。 |
latestFirst |
false |
各マイクロバッチ内で、最後に変更されたファイルを最初に処理するかどうか。 可能な限り迅速に最新のデータを処理する場合に便利です。
trueとmaxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合、maxFileAgeは無視されます。 Delta Lake テーブル ストリーミングには適用されません。 |
maxBytesPerTrigger |
None | マイクロバッチごとに処理されるデータ量のソフト最大値。 最小の入力ユニットが上限を超えると、バッチが制限を超えて処理される場合があります。
maxFilesPerTriggerと共に使用すると、マイクロバッチは、いずれかの制限に最初に達するまでデータを処理します。 有効な値: 正の整数。自動ローダーの場合は、代わりに cloudFiles.maxBytesPerTrigger を使います。
「共通」を参照してください。 |
maxCachedFiles |
10000 |
後続のマイクロバッチ用にキャッシュする未処理のファイルの最大数。 キャッシュをオフにするには、 0 に設定します。 ソース ディレクトリにトリガーごとに多数の新しいファイルが含まれている場合は、この値を大きくします。 Delta Lake テーブル ストリーミングには適用されません。 有効な値: 正の整数または 0。 |
maxFileAge |
7d |
現在のシステム時刻ではなく、最近変更されたファイルのタイムスタンプを基準とした、処理対象と見なされるファイルの最大有効期間。 このしきい値より古いファイルは無視されます。
7dや4hなどの期間文字列を受け入れます。
latestFirstがtrueされ、maxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合は無視されます。 Delta Lake テーブル ストリーミングには適用されません。 |
maxFilesPerTrigger |
1000 Delta Lake と自動ローダー用。 その他のファイル ベースのソースに対する最大値はありません。 |
各マイクロバッチで処理される新しいファイルの数の上限。
maxBytesPerTriggerと共に使用すると、マイクロバッチは、いずれかの制限に最初に達するまでデータを処理します。 有効な値: 正の整数。自動ローダーの場合は、代わりに cloudFiles.maxFilesPerTrigger を使います。
「共通」を参照してください。 |
sourceArchiveDir |
None |
cleanSourceが archive に設定されている場合のアーカイブ ディレクトリへのパス。 ソース ファイルは、処理後にこのパスに移動され、相対ディレクトリ構造が維持されます。 Delta Lake テーブル ストリーミングには適用されません。 |
自動ローダー
これらのオプションを cloudFiles ソースと共に使用して、クラウド ストレージからのインジェストをストリーミングするように 自動ローダー を構成します。
cloudFiles ソースに固有のオプションには、他の構造化ストリーミング ソース オプションとは別の名前空間に保持するために、cloudFilesが付いています。
共通
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.allowOverwrites |
false |
入力ディレクトリ ファイルの変更による既存のデータの上書きを許可するかどうか。 構成に関する注意事項については、「 ファイルが追加または上書きされたときに、自動ローダーによってファイルが再び処理されますか?」を参照してください。 |
cloudFiles.backfillInterval |
None | 自動ローダーは、特定の間隔で非同期バックフィルをトリガーできます。 たとえば、毎日バックフィルする 1 day や、毎週バックフィルする 1 week などです。 詳細については、「 cloudFiles.backfillInterval を使用して通常のバックフィルをトリガーする」を参照してください。cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 |
cloudFiles.cleanSource |
OFF |
入力ディレクトリから処理されたファイルを自動的に削除するかどうかを指定します。
OFF (既定値) に設定すると、ファイルは削除されません。DELETEに設定すると、自動ローダーは処理されてから 30 日後に自動的にファイルを削除します。 これを行うには、自動ローダーにソース ディレクトリへの書き込みアクセス許可が必要です。MOVEに設定すると、自動ローダーは、ファイルが処理されてから 30 日後cloudFiles.cleanSource.moveDestination指定した場所に自動的に移動します。 これを行うには、自動ローダーには、ソース ディレクトリと移動場所に対する書き込みアクセス許可が必要です。ファイルは、 commit_timeテーブル値関数の結果でcloud_files_stateに null 以外の値がある場合に処理されたと見なされます。
テーブル値関数cloud_files_state参照してください。 処理後の 30 日間の追加待機は、 cloudFiles.cleanSource.retentionDurationを使用して構成できます。cloudFiles.cleanSourceを有効にする前に、次の考慮事項を確認してください。
Databricks Runtime 16.4 以降で使用できます。 |
cloudFiles.cleanSource.retentionDuration |
30 days |
処理されたファイルが cleanSourceを使用したアーカイブの候補になるまでの待機時間。
DELETEの場合は 7 日を超える必要があります。
MOVEの最小制限はありません。値は CalendarInterval 文字列です 。 たとえば、 "14 days"、"30 days"、"2 weeks"、または "1 month" です。Databricks Runtime 16.4 以降で使用できます。 |
cloudFiles.cleanSource.moveDestination |
None |
cloudFiles.cleanSourceがMOVEに設定されている場合に処理されたファイルをアーカイブするパス。 クラウド ストレージ パスまたは Unity カタログ ボリューム パス ( /Volumes/my_catalog/my_schema/my_volume/archive/ など) を指定できます。移動場所は次の条件を満たす必要があります。
自動ローダーには、このディレクトリへの書き込みアクセス許可が必要です。 Databricks Runtime 16.4 以降で使用できます。 |
cloudFiles.format |
なし (必須オプション) | ソース パスのデータ ファイル形式。 有効な値は次のとおりです。 |
cloudFiles.includeExistingFiles |
true |
ストリーム処理入力パスに既存のファイルを含めるか、初期セットアップ後に到着した新しいファイルのみを処理するか。 このオプションは、初めてストリームを開始するときにのみ評価されます。 ストリームの再起動後にこのオプションを変更した場合、効果はありません。 |
cloudFiles.inferColumnTypes |
false |
スキーマの推論を利用するときに、正確な列型を推論するかどうか。 既定では、列は JSON および CSV データセットを推論するときに文字列として推論されます。 詳細については、スキーマの推論に関する説明を参照してください。 |
cloudFiles.maxBytesPerTrigger |
None | 各トリガーで処理される新しいバイトの最大数。
10g などのバイト文字列を指定して、各マイクロバッチを 10 GB のデータに制限できます。 これはソフト最大値です。 それぞれ 3 GB のファイルがある場合、Azure Databricks は 1 マイクロバッチで 12 GB を処理します。
cloudFiles.maxFilesPerTrigger と使用すると、Azure Databricks では、cloudFiles.maxFilesPerTrigger または cloudFiles.maxBytesPerTrigger の下限のうち、最初に到達した方までを消費します。 このオプションは、Trigger.Once() (Trigger.Once() は非推奨) と共に使用しても無効です。Databricks Runtime 18.0 以降では、このオプションは動的に構成され、手動で設定する必要はありません。 |
cloudFiles.maxFileAge |
None | 重複排除を目的としてファイル イベントを追跡する期間。 Databricks では、1 時間に数百万のファイルの順序でデータを取り込む場合でない限り、このパラメーターのチューニングは推奨しません。 詳細については、 ファイル イベントの追跡 に関するセクションを参照してください。cloudFiles.maxFileAge のチューニングがアグレッシブすぎると、重複取り込みやファイル欠如など、データ品質の問題を引き起こすことがあります。 そのため、Databricks は cloudFiles.maxFileAge に 90 日間などの控えめな設定を推奨しています。同等のデータ インジェスト ソリューションもこのくらいを推奨しています。 |
cloudFiles.maxFilesPerTrigger |
1000 |
各トリガーで処理される新しいファイルの最大数。
cloudFiles.maxBytesPerTrigger と使用すると、Azure Databricks では、cloudFiles.maxFilesPerTrigger または cloudFiles.maxBytesPerTrigger の下限のうち、最初に到達した方までを消費します。
Trigger.Once() (非推奨) と一緒に使用すると、このオプションは無効です。Databricks Runtime 18.0 以降では、このオプションは動的に構成され、手動で設定する必要はありません。 |
cloudFiles.partitionColumns |
None | ファイルのディレクトリ構造から推論する Hive スタイルのパーティション列のコンマ区切りの一覧。 Hive スタイルのパーティション列は、 <base-path>/a=x/b=1/c=y/file.formatなどの等値記号で結合されたキーと値のペアです。 この例では、パーティション列は a、b、c です。 既定では、スキーマ推論を使用し、データを読み込む <base-path> を指定している場合、これらの列は自動的にスキーマに追加されます。 スキーマを指定すると、自動ローダーにより、これらの列がこのスキーマに含まれると想定されます。 これらの列をスキーマの一部に含めない場合は、"" を指定して、これらの列を無視することができます。 さらに、下の例のような複雑なディレクトリ構造のファイル パスから列を推論するときに、このオプションを使用できます。<base-path>/year=2022/week=1/file1.csv<base-path>/year=2022/month=2/day=3/file2.csv<base-path>/year=2022/month=2/day=4/file3.csvcloudFiles.partitionColumnsとしてyear,month,dayを指定すると、year=2022のfile1.csvが返されますが、month列とday列はnull。month と day は、 file2.csv と file3.csvに対して正しく解析されます。 |
cloudFiles.schemaEvolutionMode |
addNewColumns スキーマが指定されていない場合は none それ以外の場合 |
新しい列がデータで検出された場合にスキーマを展開するモード。 既定では、列は JSON データセットを推論するときに文字列として推論されます。 詳細については、スキーマの展開に関する説明を参照してください。 |
cloudFiles.schemaHints |
None | スキーマの推論中に自動ローダーに提供するスキーマ情報。 詳細については、スキーマ ヒントに関するページを参照してください。 |
cloudFiles.schemaLocation |
なし (スキーマを推論するために必要) | 推論されたスキーマとそれ以降の変更を保存する場所。 詳細については、スキーマの推論に関する説明を参照してください。 |
cloudFiles.useStrictGlobber |
false |
Apache Spark の他のファイル ソースの既定のグロビング動作に一致する厳密な globber を使用するかどうか。 詳細については、「一般的なデータ読み込みパターン」を参照してください。 Databricks Runtime 12.2 LTS 以降で使用できます。 |
cloudFiles.validateOptions |
true |
自動ローダー オプションを検証し、不明なオプションまたは一貫性のないオプションに対してエラーを返すかどうか。 |
ディレクトリの一覧
ファイル通知
必要なクラウドのアクセス許可、セットアップ手順、認証方法など、ファイル通知モードの構成については、「 ファイル通知モードでの自動ローダー ストリームの構成」を参照してください。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.fetchParallelism |
1 |
キュー サービスからメッセージを取得するときに使用するスレッドの数。cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 |
cloudFiles.pathRewrites |
None | 複数の S3 バケットからファイル通知を受信する queueUrl を指定し、これらのコンテナー内のデータにアクセスするように構成されたマウント ポイントを使用する場合にのみ必要です。
bucket/key パスのプレフィックスをマウント ポイントで書き換える場合は、このオプションを使用します。 プレフィックスのみ書き換え可能です。 たとえば、構成 {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}の場合、パス s3://<databricks-mounted-bucket>/path/2017/08/fileA.json は dbfs:/mnt/data-warehouse/2017/08/fileA.jsonに書き換えられます。cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 |
cloudFiles.resourceTag |
None | 関連するリソースの関連付けと識別に役立つ一連のキーと値のタグ ペア。次に例を示します。cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")AWS の詳細については、Amazon SQS コスト割り当てタグと Amazon SNS のタグの構成に関するページを参照してください。 (1) Azure の詳細については、キューとメタデータの名前付けと、 properties.labelsのイベント サブスクリプションに関する情報をご覧ください。 自動ローダーは、これらのキーと値のタグ ペアを JSON にラベルとして保存します。
(1)GCP の詳細については、「ラベル付き使用状況の報告」を参照してください。 (1) cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 代わりに、クラウド プロバイダー コンソールを使用してリソース タグを設定します。 |
cloudFiles.useManagedFileEvents |
false |
trueに設定すると、自動ローダーはファイル イベント サービスを使用して、外部の場所にあるファイルを検出します。 このオプションは、読み込みパスがファイル イベントが有効になっている外部の場所にある場合にのみ使用できます。
ファイル イベントでファイル通知モードを使用するを参照してください。自動ローダーは前回の実行後に新しいファイルを検出できるため、ファイル イベントはファイル検出で通知レベルのパフォーマンスを提供します。 ディレクトリ一覧とは異なり、このプロセスではディレクトリ内のすべてのファイルを一覧表示する必要はありません。 ファイル イベント オプションが有効になっている場合でも、自動ローダーでディレクトリ 一覧が使用される場合があります。
自動ローダーでこのオプション を指定してディレクトリ一覧を使用 する場合の状況の包括的な一覧については、「ファイル イベントを含む自動ローダーでディレクトリ一覧を使用するタイミング」を参照してください。 Databricks Runtime 14.3 LTS 以降で使用できます。 |
cloudFiles.listOnStart |
false |
trueに設定すると、自動ローダーは、チェックポイントの継続トークンで始まるのではなく、ストリームの開始時に完全なディレクトリ一覧を実行します。
CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKENなどのエラーから回復するには、このオプションを使用します。
「CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN エラーから回復する方法」を参照してください。 |
cloudFiles.useNotifications |
false |
ファイル通知モードを使用して、新しいファイルがあるときを判断するかどうか。
false の場合は、ディレクトリ一覧モードを使用します。 「自動ローダー ファイル検出モードを比較する」を参照してください。cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 |
(1) 自動ローダーでは、ベスト エフォート ベースで、次のキーと値のタグ ペアが既定で追加されます。
-
vendor:Databricks -
path: データが読み込まれる場所。 ラベル付けの制限のため、GCP では使用できません。 -
checkpointLocation: ストリームのチェックポイントの場所。 ラベル付けの制限のため、GCP では使用できません。 -
streamId: ストリームのグローバルに一意な識別子。
Databricks はこれらのキー名を予約し、それらの値を上書きすることはできません。
クラウド固有
自動ローダーには、ファイル通知モード用にクラウド インフラストラクチャを構成するためのオプションが用意されています。 必要なクラウドのアクセス許可とセットアップ手順については、「 ファイル通知モードで自動ローダー ストリームを構成する」を参照してください。
Aws
[ cloudFiles.useNotifications = true ] を選択し、自動ローダーで通知サービスを設定する場合にのみ、次のオプションを指定します。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.region |
EC2 インスタンスのリージョン | ソース S3 バケットが存在し、AWS SNS および SQS サービスを作成するリージョン。 |
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.restrictNotificationSetupToSameAWSAccountId |
false |
SNS トピックと同じアカウント内の AWS S3 バケットからのイベント通知のみを許可します。 true の場合、自動ローダーは、SNS トピックと同じアカウント内の AWS S3 バケットからのイベント通知のみを受け入れます。false場合、アクセス ポリシーはクロスアカウント バケットと SNS トピックの設定を制限しません。 これは、SNS トピックとバケットパスが異なるアカウントに関連付けられている場合に便利です。Databricks Runtime 17.2 以降で使用できます。 |
cloudFiles.useNotifications
=
true を選択し、既に設定したキューを自動ローダーで使用する場合にのみ、次のオプションを指定します。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.queueUrl |
None | SQS キューの URL。 指定がある場合、自動ローダーは独自の AWS SNS および SQS サービスを設定せずに、このキューから直接イベントを消費します。 |
AWS 認証オプション
Databricks サービス資格情報を使用するには、次の認証オプションを指定します。
| Key | デフォルト | Description |
|---|---|---|
databricks.serviceCredential |
None | Databricks のサービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。 |
Databricks サービスの資格情報または IAM ロールを使用できない場合は、代わりに次の認証オプションを指定できます。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.awsAccessKey |
None | ユーザーの AWS アクセス キー ID。
cloudFiles.awsSecretKeyを指定する必要があります。 |
cloudFiles.awsSecretKey |
None | ユーザーの AWS シークレット アクセス キー。
cloudFiles.awsAccessKeyを指定する必要があります。 |
cloudFiles.roleArn |
None | 引き受ける IAM ロールの ARN (必要な場合)。 ロールは、クラスターのインスタンス プロファイルから、または cloudFiles.awsAccessKey と cloudFiles.awsSecretKeyで資格情報を指定することによって引き受けることができます。 |
cloudFiles.roleExternalId |
None |
cloudFiles.roleArn を使用してロールを引き受ける際に指定する識別子。 |
cloudFiles.roleSessionName |
None |
cloudFiles.roleArnを使用してロールを想定するときに使用するオプションのセッション名。 |
cloudFiles.stsEndpoint |
None |
cloudFiles.roleArn を使用してロールを引き受ける際に AWS STS にアクセスするために指定するオプションのエンドポイント。 |
Azure
cloudFiles.useNotifications
=
true を指定し、自動ローダーで通知サービスを設定する場合は、次のすべてのオプションの値を指定する必要があります。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.resourceGroup |
None | ストレージ アカウントが作成されるAzure リソース グループ。 |
cloudFiles.subscriptionId |
None | リソース グループが作成されるAzure サブスクリプション ID。 |
databricks.serviceCredential |
None | Databricks のサービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。 |
Databricks サービスの資格情報を使用できない場合は、代わりに次の認証オプションを指定できます。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.clientId |
None | サービス プリンシパルのクライアント ID またはアプリケーション ID。 |
cloudFiles.clientSecret |
None | サービス プリンシパルのクライアント シークレット。 |
cloudFiles.connectionString |
None | アカウント アクセス キーまたは Shared Access Signature (SAS) に基づくストレージ アカウントの接続文字列。 |
cloudFiles.tenantId |
None | サービス プリンシパルが作成されるAzure テナント ID。 |
cloudFiles.useNotifications
=
trueを設定し、自動ローダーで既存のキューを使用する場合にのみ、次のオプションを指定します。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.queueName |
None | Azure キューの名前。 指定した場合、クラウド ファイル ソースは、独自の Azure Event Grid サービスと Queue Storage サービスを設定する代わりに、このキューのイベントを直接消費します。 その場合、databricks.serviceCredential または cloudFiles.connectionString では、キューに対する読み取りアクセス許可のみが必要です。 |
Gcp
自動ローダーは、Databricks サービス資格情報を利用して通知サービスを自動的に設定できます。 Databricks サービス資格情報を使用して作成されたサービス アカウントには、 ファイル通知モードでの自動ローダー ストリームの構成で指定されたアクセス許可が必要です。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.projectId |
None | GCS バケットが存在するプロジェクトの ID。 Google Cloud Pub/Sub サブスクリプションもこのプロジェクト内に作成されます。 |
databricks.serviceCredential |
None | Databricks のサービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。 |
Databricks サービスの資格情報を使用できない場合は、Google サービス アカウントを直接使用できます。 Google サービスのセットアップ に従って、サービス アカウント 想定するようにクラスターを構成するか、次の認証オプションを直接指定できます。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.client |
None | Google Service Account のクライアント ID。 |
cloudFiles.clientEmail |
None | Google Service Account のメール アドレス。 |
cloudFiles.privateKey |
None | Google サービス アカウント用に生成される秘密キー。 |
cloudFiles.privateKeyId |
None | Google サービス アカウント用に生成された秘密キーの ID。 |
cloudFiles.useNotifications
=
true を選択し、既に設定したキューを自動ローダーで使用する場合にのみ、次のオプションを指定します。
| Key | デフォルト | Description |
|---|---|---|
cloudFiles.subscription |
None | Google Cloud Pub/Sub サブスクリプションの名前。 指定されている場合、クラウド ファイル ソースは、独自の GCS 通知と Google Cloud Pub/Sub サービスを設定する代わりに、このキューからのイベントを消費します。 |
Delta Lake
次のオプションは、 spark.readStreamを使用して Delta Lake テーブルから読み取るときに適用されます。
| Key | デフォルト | Description |
|---|---|---|
allowSourceColumnDrop |
None | 差分テーブルのバージョン番号または "always" に設定すると、ソース テーブル スキーマから列が削除された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。
schemaTrackingLocation が必要です。
「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。 |
allowSourceColumnRename |
None | 差分テーブルのバージョン番号または "always" に設定すると、ソース テーブル内の列の名前が変更された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。
schemaTrackingLocation が必要です。
「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。 |
allowSourceColumnTypeChange |
None | 差分テーブルのバージョン番号または "always" に設定すると、ソース テーブルで列の種類が変更された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。
schemaTrackingLocation が必要です。 「タイプ拡張」を参照してください。 |
excludeRegex |
None | 正規表現パターン。 パスがパターンと一致するファイルは、ストリーミング読み取りから除外されます。 予期される名前付け規則に準拠していないファイルをフィルターで除外する場合に便利です。 |
failOnDataLoss |
true |
ログの保持 (logRetentionDuration) のためにソース データが削除された場合にストリーミング クエリを失敗させるかどうか。 不足しているデータをスキップして処理を続行するには、 false に設定します。 「タイム トラベル クエリのデータ保持を構成する」を参照してください。 |
ignoreChanges (非推奨) |
false |
Databricks Runtime 11.3 LTS 以前で使用できます。
UPDATE、MERGE INTO、DELETE、OVERWRITEなどの変更操作後に、書き換えられたデータ ファイルを再出力します。 変更されていない行は新しい行と共に出力される可能性があるため、ダウンストリーム コンシューマーは重複を処理する必要があります。 削除はダウンストリームには反映されません。 Databricks Runtime 12.2 LTS 以降の skipChangeCommits に置き換えられました。 |
ignoreDeletes (非推奨) |
false |
パーティション境界でデータを削除するトランザクションを無視します (パーティションの完全な削除のみ)。 パーティション以外の削除、更新、またはその他の変更は処理しません。
skipChangeCommits を代わりに使用します。 |
readChangeFeed または readChangeData |
false |
ストリーミング クエリの変更データ フィードの読み取りを有効にするかどうかを指定します。 有効にすると、ストリームは、追加のメタデータ列を含む行レベルの変更 (挿入、更新、および削除) を出力します。 Azure Databricks で Delta Lake 変更データ フィードを使用する を参照してください。 |
schemaTrackingLocation |
None | Delta Lake がストリーミング読み取りのスキーマ変更を追跡するディレクトリへのパス。 列マッピングが有効になっているテーブルからストリーミングし、スキーマの進化を処理するために allowSourceColumn* オプションを使用する場合に必要です。 ストリーミング クエリの checkpointLocation 内にある必要があります。
「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。 |
skipChangeCommits |
false |
既存のレコードを削除または変更するトランザクションを無視し、追加のみを処理します。 Databricks では、変更データ フィードを使用しないほとんどのワークロードに対して、このオプションをお勧めします。 Databricks Runtime 12.2 LTS 以降で使用できます。
skipChangeCommitsを使用したアップストリーム変更コミットのスキップを参照してください。 |
startingTimestamp |
利用可能な最新 | 読み取りを開始するタイムスタンプ。 ストリームは、指定されたタイムスタンプ以降にコミットされたすべてのテーブル変更を読み取ります。 使用可能なすべてのテーブルコミットの前にタイムスタンプがある場合、ストリームは使用可能な最も早いコミットから開始されます。
startingVersionと一緒に使用することはできません。 ストリーミング チェックポイントが既に存在する場合は無視されます。有効な値: "2019-01-01T00:00:00.000Z" などのタイムスタンプ文字列、または "2019-01-01"などの日付文字列。 |
startingVersion |
利用可能な最新 | 読み取りを開始する差分テーブルのバージョン。 ストリームは、指定したバージョン以降にコミットされたすべての変更を読み取ります。 最新の変更からのみ開始する "latest" を指定します。
startingTimestampと一緒に使用することはできません。 ストリーミング チェックポイントが既に存在する場合は無視されます。
テーブル履歴の操作を参照してください。 |
withEventTimeOrder |
false |
初期テーブル スナップショットをイベント時間バケットに分割して、レコードが誤って遅延イベントとしてマークされ、ウォーターマーク付きのステートフル クエリで削除されないようにします。 チェックポイントを削除しないと、初期スナップショット処理が開始された後は変更できません。 Databricks Runtime 11.3 LTS 以降で使用できます。 「データを削除せずに初期スナップショットを処理する」を参照してください。 |
カフカ
spark.readStream.format("kafka")またはspark.read.format("kafka")で次のオプションを使用します。
| Key | デフォルト | Description |
|---|---|---|
assign |
None | 使用する特定のパーティション。
subscribe、subscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 有効な値: {"topicA":[0,1],"topicB":[2,4]}などの JSON 文字列。 |
failOnDataLoss |
true |
削除されたトピックやオフセットの切り捨てなどによってデータが失われた可能性がある場合にクエリを失敗させるかどうか。 不足しているデータをスキップして続行するには、 false に設定します。 有効な値: true、 false。Databricks は、データが失われた可能性があるかどうかを保守的に見積もります。 ただし、誤ったアラームが発生する可能性があります。 |
fetchoffset.numretries |
3 |
Kafka オフセットのフェッチが失敗したときの再試行回数。 有効な値: 負以外の整数。 |
fetchoffset.retryintervalms |
1000 |
オフセット フェッチの再試行間隔 (ミリ秒単位)。 有効な値: 負以外の整数。 |
groupIdPrefix |
spark-kafka-source (ストリーミング)、 spark-kafka-relation (バッチ) |
自動生成された Kafka コンシューマー グループ ID に使用するカスタマイズされたプレフィックス。
kafka.group.idが明示的に設定されている場合、コネクタはこのオプションを無視します。 有効な値: 任意の文字列。 |
includeHeaders |
false |
Kafka メッセージ ヘッダーを列として出力に含めるかどうか。 有効な値: true、 false。 |
kafkaconsumer.polltimeoutms |
None | Kafka コンシューマー poll() 呼び出しのタイムアウト (ミリ秒)。 有効な値: 正の整数。 |
kafka.bootstrap.servers |
None | Kafka ブローカーの host:port アドレスのコンマ区切りのリスト。 Kafka クライアントの bootstrap.servers プロパティを設定します。Kafka からのデータがない場合は、このブローカーのアドレス一覧で正しくないアドレスを確認してください。 ブローカーのアドレス一覧が正しくない場合は、エラーがない可能性があります。 Kafka クライアントは、ブローカーが最終的に使用可能になると想定し、ネットワーク エラーを受信したときに永久に再試行します。 |
maxRecordsPerPartition |
None | 各 Spark パーティションのレコードの最大数。 設定すると、コネクタは Kafka パーティションを分割して、各 Spark パーティションが最大でこの多くのレコードを読み取るようにします。 有効な値: 正の整数。 このオプションは、 minPartitionsで使用することもできます。 両方のオプションが設定されている場合、Spark はどちらのオプションを使用してもパーティションが増えます。 |
minPartitions |
None | Kafka から読み取る Spark パーティションの最小数。 設定すると、コネクタは大きな Kafka パーティションを分割して並列処理を向上させます。 設定しない場合、Spark は Kafka トピック パーティションごとに 1 つのパーティションを作成します。 データ スキューやピーク時の負荷を処理する場合に便利です。 有効な値: 正の整数。 このオプションは、トリガーごとに Kafka コンシューマーを再初期化します。これは、SSL のパフォーマンスに影響する可能性があります。 |
startingOffsets |
latest (ストリーミング)、 earliest (バッチ) |
クエリが読み取りを開始するオフセット。 有効な値: earliest、 latest、または各パーティションのオフセットの JSON 文字列 ( {"topicA":{"0":23,"1":-2}}など)。 JSON 文字列では、 -1 は最新のオフセットです。
-2 は最も早いオフセットです。ストリーミング クエリの場合、このオプションは新しいクエリの開始時にのみ適用されます。 再開されたクエリでは、常にチェックポイントが使用されます。 クエリ中、新しいパーティションは最も早いオフセットで読み取りを開始します。 バッチ クエリの場合、 latest は許可されません。 |
startingOffsetsByTimestamp |
None | 各パーティションの開始オフセットのリスト。タイムスタンプとしてミリ秒単位で指定されます。 タイムスタンプのオフセットが存在しない場合、クエリの動作は startingOffsetsByTimestampStrategyによって決定されます。 有効な値: {"topicA":{"0":1000,"1":2000}}など、各パーティションのタイムスタンプの JSON 文字列。ストリーミング クエリの場合、このオプションは新しいクエリの開始時にのみ適用されます。 再開されたクエリでは、常にチェックポイントが使用されます。 クエリ中、新しいパーティションは最も早いオフセットで読み取りを開始します。 |
startingOffsetsByTimestampStrategy |
error |
startingOffsetsByTimestampまたはstartingTimestampで指定されたタイムスタンプのオフセットが見つからない場合に使用する戦略。 有効な値: error (例外が発生します)、 latest (使用可能な最新のオフセットが使用されます)。 |
startingTimestamp |
None | すべてのパーティションに適用されるグローバル開始タイムスタンプ (ミリ秒単位)。 タイムスタンプのオフセットが存在しない場合、動作は startingOffsetsByTimestampStrategyによって制御されます。 有効な値: 負以外の整数。 |
subscribe |
None | サブスクライブするトピック。
subscribe、subscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 有効な値: トピック名のコンマ区切りのリスト。 |
subscribePattern |
None | トピックのサブスクライブに使用されるパターン。
subscribe、subscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 たとえば、「 topic.* 」のように入力します。 有効な値: 任意のJava正規表現文字列。 |
次のオプションは、 spark.readStream.format("kafka")を使用したストリーミング読み取りにのみ適用されます。
| Key | デフォルト | Description |
|---|---|---|
bytesEstimateWindowLength |
300s |
estimatedTotalBytesBehindLatest メトリックの残りのバイト数を見積もるために使用される時間枠。 有効な値: 10m や 600sなどの期間文字列。 詳しくは、Kafka メトリックを取得するをご覧ください。 |
maxOffsetsPerTrigger |
None | トリガー間隔ごとに処理するオフセットの最大数。 オフセットは、トピック パーティション間で比例して分散されます。 有効な値: 正の整数。 |
maxTriggerDelay |
15m |
トリガーする前に minOffsetsPerTrigger が蓄積されるまで待機する最大時間。 有効な値: 10m や 600sなどの期間文字列。 |
minOffsetsPerTrigger |
None | マイクロバッチをトリガーする前に累積するオフセットの最小数。
maxTriggerDelayに達すると、マイクロバッチは関係なく実行されます。 有効な値: 正の整数。 |
spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されるオフセット オプションについては、「DataFrameReader Kafka オプション」を参照してください。
Kafka クライアント (kafka.*) と認証オプションについては、「 オプション」を参照してください。
DataFrameWriter オプション
これらのオプションを DataFrameWriter.option() および DataFrameWriterV2.option() と共に使用して、データAzure Databricks書き込む方法を制御します。
Example
次の例では、Delta Lake テーブルを書き込むためのTrueにmergeSchemaを設定します。
Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
アブロ
| Key | デフォルト | Description |
|---|---|---|
avroSchema |
None | JSON 文字列としての完全な Avro スキーマ。 Spark SQL 型を特定の Avro 型に変換するには、このオプションを使用します。 Avro ファイルに適用されます。 |
avroSchemaUrl |
None | Avro スキーマ ファイルを指す URL。 スキーマが外部に格納されている場合は、 avroSchema の代わりに使用します。
avroSchemaと相互に排他的です。
Avro ファイルに適用されます。 |
compression |
snappy |
書き込み時に使用する圧縮コーデック。 有効な値: uncompressed、 deflate、 snappy、 bzip2、 xz、 zstandard。
Avro ファイルに適用されます。 |
recordName |
topLevelRecord |
出力 Avro スキーマの最上位レベルのレコード名。 Avro ファイルに適用されます。 |
positionalFieldMatching |
false |
Spark スキーマと Avro スキーマの間の列を名前ではなくフィールドの位置で照合するかどうか。 Avro ファイルに適用されます。 |
recordNamespace |
空の文字列 | 出力 Avro スキーマの最上位レコードの名前空間。 Avro ファイルに適用されます。 |
Delta Lake と Apache Iceberg
| Key | デフォルト | Description |
|---|---|---|
clusterByAuto |
false |
自動液体クラスタリングを有効にするかどうか。Azure Databricksクエリ パターンに基づいてクラスタリング列が選択されます。
mode("overwrite")でのみ有効です。
append モードでは使用できません。 Databricks Runtime 16.4 以降で使用できます。
[テーブルに液体クラスタリングを使用する] に適用されます。 |
mergeSchema |
None | 書き込み操作でスキーマの進化を有効にするかどうかを指定します。 ソース DataFrame の新しい列がターゲット テーブル スキーマに追加されます。 バッチおよびストリーミングの追加に適用されます。 テーブル スキーマの更新に適用されます。 |
overwriteSchema |
None | 上書き時にテーブル スキーマとパーティション分割を置き換えるかどうか。
mode("overwrite")なしでreplaceWhereが必要です。
partitionOverwriteModeでは使用できません。
テーブル スキーマの更新に適用されます。 |
partitionOverwriteMode |
None | パーティション上書きモード。 これを dynamic に設定すると、新しいデータを含むパーティションのみが上書きされ、他のすべてのパーティションは変更されません。 レガシ モード。サーバーレス コンピューティングまたは Databricks SQL ではサポートされていません。 有効な値: static、 dynamic。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
replaceOn |
None | ソース クエリの行に置き換えるターゲット テーブル内の行と一致するブール式。 ターゲット テーブルとソース クエリの両方の列を参照できます。 ソース行と一致するターゲット内の行は削除され、置き換えられます。 ソースが空の場合、削除は行われません。 列参照のあいまいさを解消するには、 targetAlias を使用します。 Databricks Runtime 17.1 以降で使用できます。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
replaceUsing |
None | ターゲット テーブルとソース クエリの間の行を照合するために使用される列名のコンマ区切りのリスト。 ターゲットとソースの両方に、一覧表示されているすべての列が含まれている必要があります。 等値比較でソース行と一致するターゲット内の行は削除され、置き換えられます。
NULL 値は等しくないとして扱われ、一致しません。 Databricks Runtime 16.3 以降で使用できます。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
replaceWhere |
None | 述語式。 述語に一致するレコードのみをアトミックに上書きします。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
targetAlias |
None | ターゲット テーブルの文字列エイリアス。 条件がターゲット テーブルとソース クエリの両方の列を参照する場合に、 replaceOn または replaceWhere を使用して列参照を明確にします。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
txnAppId |
None |
foreachBatch操作でのべき等書き込みのアプリケーションを識別する一意の文字列。 複数の Delta Lake テーブルへの正確な 1 回の書き込みを保証するには、 txnVersion と共に使用します。
べき等テーブル書き込みにforeachBatchを使用するために適用されます。 |
txnVersion |
None |
foreachBatch操作でのべき等書き込みのトランザクション バージョンとして使用される単調に増加する数。 複数の Delta Lake テーブルへの正確な 1 回の書き込みを保証するには、 txnAppId と共に使用します。
べき等テーブル書き込みにforeachBatchを使用するために適用されます。 |
optimizeWrite |
None | この書き込み操作で自動最適化書き込みを有効にするかどうかを指定します。
spark.databricks.delta.optimizeWrite.enabled構成をオーバーライドします。
Azure Databricks?の Delta Lake に適用されます。 |
userMetadata |
None | 書き込み操作のコミット メタデータに追加されたユーザー定義文字列。
DESCRIBE HISTORYの出力に表示されます。
カスタム メタデータを使用したテーブルのエンリッチに適用されます。 |
Csv
| Key | デフォルト | Description |
|---|---|---|
charToEscapeQuoteEscaping |
\0 (無効) |
エスケープ文字が引用符文字と異なる場合にエスケープするために使用される文字。 csv (DataFrameWriter) に適用されます。 |
compression |
none |
書き込み時に使用する圧縮コーデック。 有効な値: none、 bzip2、 gzip、 lz4、 snappy、 deflate、 zstd。
csv (DataFrameWriter) に適用されます。 |
dateFormat |
yyyy-MM-dd |
日付列の値の書式指定文字列。 csv (DataFrameWriter) に適用されます。 |
emptyValue |
空の文字列 | 空の (null 以外の) 値に対して書き込まれた文字列。 csv (DataFrameWriter) に適用されます。 |
encoding |
UTF-8 |
出力ファイルの文字エンコード。 csv (DataFrameWriter) に適用されます。 |
escape |
\ |
引用符で囲まれた値をエスケープするために使用される文字。 csv (DataFrameWriter) に適用されます。 |
escapeQuotes |
true |
引用符で囲まれたフィールド値内の引用符文字をエスケープするかどうか。 csv (DataFrameWriter) に適用されます。 |
header |
false |
出力の最初の行として列名を書き込むかどうか。 csv (DataFrameWriter) に適用されます。 |
ignoreLeadingWhiteSpace |
false |
書き込み時に先頭の空白を値からトリミングするかどうか。 csv (DataFrameWriter) に適用されます。 |
ignoreTrailingWhiteSpace |
false |
書き込み時に値から末尾の空白をトリミングするかどうかを指定します。 csv (DataFrameWriter) に適用されます。 |
lineSep |
\n |
レコード間で使用される行区切り文字列。 csv (DataFrameWriter) に適用されます。 |
locale |
en-US |
java.util.Locale 識別子。 書き込み時の日付とタイムスタンプの値の書式設定に影響します。 |
nullValue |
空の文字列 | null 値に対して書き込まれた文字列。 csv (DataFrameWriter) に適用されます。 |
quote |
" |
区切り記号を含むフィールド値を引用符で囲む文字。 csv (DataFrameWriter) に適用されます。 |
quoteAll |
false |
内容に関係なく、すべてのフィールド値を引用符で囲むかどうか。 csv (DataFrameWriter) に適用されます。 |
sep |
, |
フィールド区切り文字。 csv (DataFrameWriter) に適用されます。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ列の値の書式指定文字列。 csv (DataFrameWriter) に適用されます。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイム ゾーン (TimestampNTZType) 列の値を含まないタイムスタンプの書式指定文字列。 |
Excel
| Key | デフォルト | Description |
|---|---|---|
dataAddress |
None | 書き込みのシート名または開始セル。 省略した場合、セル A1から始まるSheet1という名前のシートに書き込みます。 シート名 ("SheetName") または単一セル参照 ("SheetName!A1") を受け入れます。 セル範囲は書き込みではサポートされていません。 |
dateFormatInWrite |
yyyy-mm-dd |
Excel c0 /< > 列に適用されるセル書式指定文字列です。 Excel書式構文を使用します。 |
headerRows |
0 |
列名を最初の行として書き込むかどうか。 有効な値: 0、 1。 |
timestampNTZFormat |
yyyy-mm-dd hh:mm:ss |
Excel TimestampNTZ および Timestamp 列に適用されるセル書式指定文字列。 Excel書式構文を使用します。 |
version |
xlsx |
書き込むExcelファイル形式のバージョン。 有効な値: xlsx、 xls。 |
Json
| Key | デフォルト | Description |
|---|---|---|
compression |
none |
書き込み時に使用する圧縮コーデック。 有効な値: none、 bzip2、 gzip、 lz4、 snappy、 deflate、 zstd。
json (DataFrameWriter) に適用されます。 |
dateFormat |
yyyy-MM-dd |
日付列の値の書式指定文字列。 json (DataFrameWriter) に適用されます。 |
encoding |
UTF-8 |
出力ファイルの文字エンコード。 json (DataFrameWriter) に適用されます。 |
ignoreNullFields |
の値 spark.sql.jsonGenerator.ignoreNullFields |
JSON 出力から null 値を持つフィールドを省略するかどうか。 json (DataFrameWriter) に適用されます。 |
lineSep |
\n |
レコード間で使用される行区切り文字列。 json (DataFrameWriter) に適用されます。 |
locale |
en-US |
java.util.Locale 識別子。 書き込み時の日付とタイムスタンプの値の書式設定に影響します。 |
pretty |
false |
美しい (インデントされた複数行の) JSON 出力を有効にするかどうか。 |
sortKeys |
false |
出力で JSON オブジェクトのキーをアルファベット順に並べ替えるかどうか。 確定的な出力を生成する場合に便利です。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ列の値の書式指定文字列。 json (DataFrameWriter) に適用されます。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイム ゾーン (TimestampNTZType) 列の値を含まないタイムスタンプの書式指定文字列。 |
writeNonAsciiCharacterAsCodePoint |
false |
出力内のリテラル UTF-8 文字ではなく、非 ASCII 文字 \uXXXX Unicode エスケープ シーケンスとしてエンコードするかどうか。 |
オーク
| Key | デフォルト | Description |
|---|---|---|
compression |
zstd |
書き込み時に使用する圧縮コーデック。 有効な値: none、 uncompressed、 snappy、 zlib、 lzo、 zstd、 lz4、 brotli。
orc (DataFrameWriter) に適用されます。 |
寄木細工
| Key | デフォルト | Description |
|---|---|---|
compression |
snappy |
書き込み時に使用する圧縮コーデック。 有効な値: none、 uncompressed、 snappy、 gzip、 lzo、 brotli、 lz4、 lz4_raw、 zstd。
Parquet (DataFrameWriter) に適用されます。 |
spark.sql.parquet.outputTimestampType |
INT96 |
タイムスタンプ列のエンコードに使用される物理型。 有効な値: INT96、 TIMESTAMP_MICROS、 TIMESTAMP_MILLIS。 標準のタイムスタンプ型をサポートしていない従来の Parquet リーダーとの互換性を保つには、 INT96 を使用します。 |
テキスト
| Key | デフォルト | Description |
|---|---|---|
compression |
none |
書き込み時に使用する圧縮コーデック。 有効な値: none、 bzip2、 gzip、 lz4、 snappy、 deflate、 zstd。 テキスト ( DataFrameWriter) に適用されます。 |
encoding |
UTF-8 |
出力ファイルの文字エンコード。 |
lineSep |
\n |
レコード間で使用される行区切り文字列。 テキスト ( DataFrameWriter) に適用されます。 |
Xml
| Key | デフォルト | Description |
|---|---|---|
arrayElementName |
item |
明示的な名前を持たない配列要素の要素名。 xml (DataFrameWriter) に適用されます。 |
attributePrefix |
_ |
XML 属性に対応するフィールド名の前に付加されるプレフィックス。 xml (DataFrameWriter) に適用されます。 |
compression |
none |
書き込み時に使用する圧縮コーデック。 有効な値: none、 bzip2、 gzip、 lz4、 snappy、 deflate、 zstd。
xml (DataFrameWriter) に適用されます。 |
dateFormat |
yyyy-MM-dd |
日付列の値の書式指定文字列。 xml (DataFrameWriter) に適用されます。 |
declaration |
version="1.0" encoding="UTF-8" standalone="yes" |
各出力ファイルの先頭に書き込まれた XML 宣言文字列。 宣言を抑制する空の文字列に設定します。 xml (DataFrameWriter) に適用されます。 |
encoding |
UTF-8 |
出力ファイルの文字エンコード。 xml (DataFrameWriter) に適用されます。 |
indent |
4 スペース | 出力内の子要素のインデントに使用される文字列。 インデントをオフにし、各行を 1 行に書き込むには、空の文字列に設定します。 |
locale |
en-US |
java.util.Locale 識別子。 書き込み時の日付とタイムスタンプの値の書式設定に影響します。 |
nullValue |
null |
null 値に対して書き込まれた文字列。
nullに設定すると、null フィールドの属性と子要素は省略されます。
xml (DataFrameWriter) に適用されます。 |
rootTag |
ROWS |
出力内のすべての行要素をラップするルート要素タグ。 xml (DataFrameWriter) に適用されます。 |
rowTag |
ROW |
出力内の行を表す要素タグ。 xml (DataFrameWriter) に適用されます。 |
singleVariantColumn |
None | XML ファイルに書き込む 1 つの Variant 列の名前。 xml (DataFrameWriter) に適用されます。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ列の値の書式指定文字列。 xml (DataFrameWriter) に適用されます。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイム ゾーン列の値を含まないタイムスタンプの書式指定文字列。 xml (DataFrameWriter) に適用されます。 |
validateName |
true |
列名が有効な XML 要素識別子でない場合に例外をスローするかどうか。 xml (DataFrameWriter) に適用されます。 |
valueTag |
_VALUE |
属性または子要素を持つ XML 要素の文字データに使用されるフィールド名。 xml (DataFrameWriter) に適用されます。 |
DataStreamWriter オプション
ストリーミング書き込みを構成するには、これらのオプションと DataStreamWriter.option() を使用します。
Example
次の例では、ストリームのチェックポイントの場所を設定します。
Python
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
Scala
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
共通
| Key | デフォルト | Description |
|---|---|---|
checkpointLocation |
なし (必須) | ストリーミング クエリのチェックポイント ディレクトリへのパス。 フォールト トレランスと正確に 1 回の処理の保証に必要です。 各ストリーミング クエリでは、一意のチェックポイントの場所を使用する必要があります。 Databricks では、Unity カタログ ボリュームまたはクラウド ストレージ パスにチェックポイントを格納することをお勧めします。 「構造化ストリーミング チェックポイント」を参照してください。 |
path |
None | Parquet などのファイル ベースのストリーミング シンクの出力パス。 ファイル ベースの形式にのみ適用されます。 |
コンソール シンク
| Key | デフォルト | Description |
|---|---|---|
numRows |
20 |
コンソール シンクに書き込むときにマイクロバッチごとに表示する行数。 |
truncate |
true |
行を表示するときに長い文字列を切り捨てるかどうかを指定します。 完全な文字列値を表示するには、 false に設定します。 |
Delta Lake
format("delta")を使用して Delta Lake テーブルにストリームを書き込む場合は、次のオプションが適用されます。
overwriteSchema、replaceWhere、partitionOverwriteModeなどの上書き専用オプションは、ストリーミング書き込みではサポートされていません。
| Key | デフォルト | Description |
|---|---|---|
mergeSchema |
false |
ストリーミング DataFrame に新しい列が含まれている場合に Delta Lake テーブル スキーマを進化させるかどうか。 追加出力モードにのみ適用されます。 テーブル スキーマの更新に適用されます。 |
userMetadata |
None | 書き込み操作のコミット メタデータに追加されたユーザー定義文字列。
DESCRIBE HISTORYの出力に表示されます。
カスタム メタデータを使用したテーブルのエンリッチに適用されます。 |
ファイル シンク
次のオプションは、ストリームをファイル ベースの形式 (Parquet、JSON、CSV、ORC、text) に書き込む場合に適用されます。 形式固有のオプションについては、「 DataFrameWriter オプション」を参照してください。
| Key | デフォルト | Description |
|---|---|---|
retention |
None | フォールト トレランスと圧縮に使用されるシンク メタデータ ファイルを保持する期間。
7 daysや24 hoursなどの時刻文字列を受け入れます。 設定しない場合、メタデータ ファイルは無期限に保持されます。 |
Kafka シンク
Kafka にストリームを書き込むためのオプションの完全な一覧については、「 オプション」を参照してください。
| Key | デフォルト | Description |
|---|---|---|
kafka.bootstrap.servers |
None | 必須。 Kafka ブローカー host:port アドレスのコンマ区切りのリスト。 |
topic |
None | すべての行のターゲット Kafka トピック。 DataFrame に topic 列が含まれていない場合は必須です。 |
kafka.* |
None |
kafka.プレフィックスが付いた Kafka プロデューサー構成。 たとえば、「 kafka.compression.type 」のように入力します。 |
メモリ シンク
| Key | デフォルト | Description |
|---|---|---|
queryName |
なし (必須) | クエリが書き込むメモリ内テーブルの名前。 メモリ シンクに必要です。
.queryName()を介して構成することもできます。 |
mode |
exactlyonce |
メモリ シンクの配信保証。
exactlyonce は、厳密に 1 回のセマンティクスを持つマイクロバッチ モードを使用します。
atleastonce は、少なくとも 1 回のセマンティクスを持つ連続モードを使用します。 有効な値: exactlyonce、 atleastonce。 |
Spark 関数のオプション
一部の Spark SQL 組み込み関数は、解析またはシリアル化の動作を制御する options マップを受け入れます。 オプションを Python dict または Scala Map[String, String] として渡します。
Example
次の例では、形式が正しくないレコードを削除しながら JSON 列を解析します。
Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
アブロ
Avro 関数は、対応する DataFrame オプションと同じオプションを受け入れます。
-
from_avroDataFrameReader Avro オプションを使用schema_of_avro。 -
to_avroでは 、DataFrameWriter Avro オプションが使用されます。
Example
次の例では、スキーマの進化が有効になっている Avro 列をデコードします。
Python
from pyspark.sql.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro
val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))
さらに、 from_avro と to_avro のスキーマ レジストリバリアントでは、次のオプションを受け入れます。
| Key | デフォルト | Description |
|---|---|---|
schemaId |
None |
jsonFormatSchemaと互換性のないスキーマでエンコードされた Avro データをデコードするときに使用する Confluent スキーマ レジストリのスキーマ ID。
from_avroにのみ適用されます。 |
confluent.schema.registry.* |
None | Confluent Schema Registry クライアント構成プロパティ。 基本認証資格情報の confluent.schema.registry.basic.auth.user.info など、このプレフィックスを使用して Confluent SR クライアント プロパティを渡します。
from_avroとto_avroのスキーマ レジストリバリアントに必要です。 |
Csv
CSV 関数は、対応する DataFrame オプションと同じオプションを受け入れます。
-
from_csvDataFrameReader CSV オプションを使用schema_of_csv。 -
to_csvでは 、DataFrameWriter CSV オプションが使用されます。
Example
次の例では、カスタム区切り記号と NULL 値を含む CSV を読み取ります。
Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
Json
JSON 関数は、対応する DataFrame オプションと同じオプションを受け入れます。
-
from_jsonDataFrameReader JSON オプションを使用schema_of_json。 -
to_jsonでは 、DataFrameWriter JSON オプションが使用されます。
Example
次の例では、 NULL フィールドが無視され、整形書式設定が有効になっている JSON を書き込みます。
Python
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json
val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf と to_protobuf はファイルベースのデータソースを使用しません。 Protobuf データは、これらの関数を使用して常にバイナリ列として読み書きされます。 オプションは Map[String, String] として渡され、大文字と小文字が区別されます。
Example
次の例では、PERMISSIVE モードを使用して Protobuf 列をデコードします。
Python
from pyspark.sql.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf
val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))
Protobuf 関数では、次のオプションを使用します。
| Key | デフォルト | Description |
|---|---|---|
mode |
FAILFAST |
破損したレコードを処理する方法。
FAILFAST で例外がスローされました。
PERMISSIVE は、形式が正しくないフィールドを null に設定します。 有効な値: FAILFAST、 PERMISSIVE。
from_protobufに適用されます。 |
recursive.fields.max.depth |
-1 (無効) |
再帰 Protobuf フィールドの最大再帰深度。 再帰フィールドのサポートをオフにするには、 0 に設定します。 有効な値: 10する0。
from_protobufに適用されます。 |
convert.any.fields.to.json |
false |
STRUCTではなく、Protobuf Anyフィールドを JSON 文字列に変換するかどうかを指定します。
from_protobufに適用されます。 |
emit.default.values |
false |
ゼロまたは既定値 (proto3 セマンティクス) を持つフィールドを出力するかどうか。
falseすると、既定値のフィールドは出力から省略されます。
from_protobufに適用されます。 |
enums.as.ints |
false |
列挙型フィールドを文字列の代わりに整数値としてレンダリングするかどうかを指定します。
from_protobufに適用されます。 |
upcast.unsigned.ints |
false |
整数オーバーフローを防ぐためにuint32をLongにアップキャストし、Decimal(20,0)にuint64するかどうか。
from_protobufに適用されます。 |
unwrap.primitive.wrapper.types |
false |
google.protobufラッパー型 (Int32Value、StringValueなど) を対応するプリミティブ Spark 型にラップ解除するかどうか。
from_protobufに適用されます。 |
retain.empty.message.types |
false |
ダミー列を挿入して、空の Protobuf メッセージ型を出力スキーマに保持するかどうかを指定します。
from_protobufに適用されます。 |
schema.registry.subject |
None | スキーマ レジストリのサブジェクト名。
from_protobufとto_protobufのスキーマ レジストリバリアントを使用する場合に必要です。 |
schema.registry.address |
None | スキーマ レジストリ アドレス (ホストとポート)。
from_protobufとto_protobufのスキーマ レジストリバリアントを使用する場合に必要です。 |
schema.registry.protobuf.name |
None | スキーマ レジストリのサブジェクトに複数のメッセージが含まれている場合に使用する Protobuf メッセージを指定します。 オプション。 |
Xml
XML 関数は、対応する DataFrame オプションと同じオプションを受け入れます。
-
from_xmlDataFrameReader XML オプションを使用schema_of_xml。 -
to_xmlでは 、DataFrameWriter XML オプションが使用されます。
Example
次の例では、カスタム ルートタグと行タグを使用して XML を書き込みます。
Python
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml
val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))