Spark API オプション リファレンス

このページでは、データの読み取りと書き込みを行う Spark API で使用できる入力オプションと出力オプションの一覧を示します。

DataFrameReader オプション

これらのオプションは、DataFrameReader.option()DataFrameReader.options()read_filesCOPY INTO、および Auto Loader と共に使用Azure Databricksデータ ファイルの読み取り方法を制御します。

Example

次の例では、JSON ファイルを読み取るためのTruemultiLineを設定します。

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)

共通

次のオプションは、すべてのファイル形式に適用されます。

Key デフォルト Description
ignoreCorruptFiles false 破損したファイルを無視するかどうか。 true の場合、破損したファイルが検出されても Spark ジョブは引き続き実行され、読み取られた内容は引き続き返されます。 COPY INTOでは、Delta Lake 履歴のnumSkippedCorruptFiles列にoperationMetricsとして、スキップされた破損したファイルを観察できます。 Databricks Runtime 11.3 LTS 以降で使用できます。
ignoreMissingFiles false自動ローダーの場合は、COPY INTOtrue (レガシ) 行方不明のファイルを無視するかどうかを指定します。 true の場合、不足しているファイルが検出されても Spark ジョブは引き続き実行され、内容は引き続き返されます。 Databricks Runtime 11.3 LTS 以降で使用できます。
modifiedAfter None 指定されたタイムスタンプの後に変更タイムスタンプを持つファイルのみを取り込むためのフィルターとしてのオプションのタイムスタンプ。
modifiedBefore None 指定されたタイムスタンプの前に変更タイムスタンプを持つファイルのみを取り込むためのフィルターとしてのオプションのタイムスタンプ。
pathGlobFilter または fileNamePattern None ファイルを選択するために使用できる可能性のある glob パターン。 COPY INTO (レガシ) でのPATTERNに相当します。 fileNamePattern では read_files を使用できます。
recursiveFileLookup false true場合、このオプションは、名前が date=2019-07-01 のようなパーティションの名前付けスキームに従っていない場合でも、入れ子になったディレクトリを検索します。

アブロ

Key デフォルト Description
avroSchema None ユーザーによって Avro 形式で指定される省略可能なスキーマ。 Avro を読み取る場合、このオプションは互換性があるが、実際の Avro スキーマとは異なる進化したスキーマに設定できます。 逆シリアル化スキーマは、進化したスキーマと一致します。 たとえば、既定値を持つ追加の列を 1 つ含む進化したスキーマを設定した場合、読み取り結果にも新しい列が含まれます。
avroSchemaEvolutionMode none スキーマ レジストリを使用するときにスキーマの進化を処理する方法。 有効な値: none (スキーマの変更を無視してジョブを続行)、 restart (スキーマの変更が検出され、 UnknownFieldException が発生し、ジョブの再起動が必要な場合)。
datetimeRebaseMode LEGACY ユリウス暦と予期的グレゴリオ暦の間の日付値とタイムスタンプ値のリベースを制御します。 有効な値: EXCEPTIONLEGACYCORRECTED
enableStableIdentifiersForUnionType false Avro 共用体型に安定したフィールド名を使用するかどうか。 有効にすると、共用体の型フィールド名は、小文字の型名から派生します (たとえば、 member_intmember_string)。 2 つの型名が小文字の後で同一の場合、例外をスローします。
mergeSchema false 複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 Avro に対して mergeSchema を有効にしても、データ型は緩和されません。
mode FAILFAST 破損したレコードを処理するためのパーサー モード。 有効な値: FAILFAST (例外がスローされます)、 PERMISSIVE (形式が正しくないフィールドを null に設定)、 DROPMALFORMED (警告なく無効なレコードを削除します)。
readerCaseSensitive true rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。
recursiveFieldMaxDepth None 再帰 Avro フィールドの最大再帰深度。 すべての再帰フィールドを切り捨てる 1 に設定し、1 レベルの再帰を許可する 2 など、最大 15に設定します。 設定解除または 0場合、再帰フィールドは許可されません。 有効な値: 15する0
rescuedDataColumn None データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。
詳細については、「復旧されたデータ列とは」を参照してください。
stableIdentifierPrefixForUnionType member_ enableStableIdentifiersForUnionType=trueするときに、安定した共用体型のフィールド名に使用するプレフィックス。

Csv

Key デフォルト Description
badRecordsPath None 不正な CSV レコードに関する情報を記録するためのファイルを格納するパス。
charToEscapeQuoteEscaping \0 引用符のエスケープに使用する文字をエスケープするために使用する文字。 たとえば、レコードが [ " a\\", b ] の場合は次のようになります。
  • '\'をエスケープする文字が未定義の場合、レコードは解析されません。 パーサーは文字 ([a],[\],["],[,],[ ],[b]) を読み取りますが、終了引用符が見つからないため、エラーをスローします。
  • '\' をエスケープする文字が '\'として定義されている場合、[a\][b]の 2 つの値でレコードが読み取られます。
columnNameOfCorruptRecord _corrupt_record 自動ローダーがサポートされています。 COPY INTO (レガシ) ではサポートされていません。
形式に誤りがあり、解析できないレコードを格納するための列。 解析の modeDROPMALFORMED に設定する場合、この列は空になります。
comment \0 テキスト行の先頭に配置した場合に行コメントを表す文字を定義します。 コメントのスキップを無効にするには、'\0' を使用します。
dateFormat yyyy-MM-dd 日付文字列を解析するための形式。
emptyValue 空の文字列 空の値の文字列表現。
enableDateTimeParsingFallback false 指定した形式で値を解析できない場合に、従来の日付とタイムスタンプの解析動作にフォールバックするかどうかを指定します。 falseすると、解析エラーが発生するか、modeに応じて null が生成されます。
encoding または charset UTF-8 CSV ファイルのエンコードの名前。 オプションの一覧については、java.nio.charset.Charset を参照してください。 UTF-16UTF-32 の場合、multilinetrue を使用することはできません。
enforceSchema true 指定または推論されたスキーマを CSV ファイルに強制的に適用するかどうか。 このオプションを有効にすると、CSV ファイルのヘッダーは無視されます。 自動ローダーを使用してデータをレスキューし、スキーマの展開を許可する場合、このオプションは既定では無視されます。
escape \ データの解析時に使用するエスケープ文字。
extension csv 予期されるファイル名拡張子。 この拡張子のないファイルは、読み取り中に除外されます。
failOnUnknownFields false CSV レコードにスキーマに存在しない列が含まれている場合に失敗するかどうか。 falseすると、認識されない列は、rescuedDataColumnに応じて自動的に削除または救助されます。
failOnWidenedFields false フィールド値を、拡大せずに宣言されたスキーマ型として解析できない場合に失敗するかどうか。 falseすると、rescuedDataColumnに応じて、型が拡大された値が自動的に復旧されます。 failOnUnknownFields=true設定すると、このオプションの効果をマスクできます。
header false CSV ファイルにヘッダーが含まれているかどうか。 自動ローダーによって、スキーマの推論時にファイルにヘッダーが含まれているものと見なされます。
ignoreLeadingWhiteSpace false 解析対象の各値の先頭の空白文字を無視するかどうか。
ignoreTrailingWhiteSpace false 解析対象の各値の末尾の空白文字を無視するかどうか。
inferSchema false 解析対象の CSV レコードのデータ型を推論するか、すべての列が StringType であると見なすか。 true に設定した場合は、追加でデータを渡す必要があります。 自動ローダーの場合は、代わりに cloudFiles.inferColumnTypes を使います。
inputBufferSize 1048576 (1 MB) CSV パーサーのバッファー サイズ (バイト単位)。 大きな CSV ファイルを解析するときにメモリ使用量を調整する場合に便利です。 有効な値: 正の整数。
lineSep なし。 \r\r\n、および \n 連続する 2 つの CSV レコードの間の文字列。
locale US java.util.Locale 識別子。 CSV 内の既定の日付、タイムスタンプ、および 10 進数の解析に影響します。
maxCharsPerColumn -1 解析する値の予想最大文字数。 メモリ エラーを回避するために使用できます。 既定値は -1 で、無制限を意味します。 有効な値: 正の整数、または無制限の -1
maxColumns 20480 レコードに含めることができる列数のハード制限。 有効な値: 正の整数。
mergeSchema false 複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 スキーマの推論時に、自動ローダーに対して既定で有効になります。
mode PERMISSIVE 形式に誤りがあるレコードの処理に関するパーサーのモード。 有効な値: PERMISSIVEDROPMALFORMEDFAILFAST
multiLine false CSV レコードが複数の行にまたがるかどうか。
nanValue NaN FloatType および DoubleType 列を解析する際の非数値の文字列表現。
negativeInf -Inf FloatType または DoubleType 列を解析する際の負の無限大の文字列表現。
nullValue 空の文字列 null 値の文字列表現。
parserCaseSensitive (非推奨) false ファイルの読み取り中に、ヘッダーに宣言されている列をスキーマの大文字と小文字の区別に合わせるかどうか。 自動ローダーについては、これは既定で true となります。 有効にすると、大文字と小文字が異なる列は rescuedDataColumn に救出されます。 readerCaseSensitive が優先されるため、このオプションは非推奨となりました。
positiveInf Inf FloatType または DoubleType 列を解析する際の正の無限大の文字列表現。
preferDate true 可能な場合、タイムスタンプではなく日付として文字列を推論しようとします。 また、 inferSchema を有効にするか、自動ローダーで cloudFiles.inferColumnTypes を使用して、スキーマ推論を使用する必要があります。
quote " フィールド区切り記号が値に含まれる場合に、値のエスケープに使用する文字。
readerCaseSensitive true rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。
rescuedDataColumn None データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。
sep または delimiter , 列の間の区切り文字列。
singleVariantColumn None 列名に設定すると、各フィールドを独自の列に解析するのではなく、CSV レコード全体をその名前の単一の VariantType 列に読み取ります。 header=true が必要です。
skipRows 0 無視する必要がある CSV ファイルの先頭からの行数 (コメント化された行や空の行を含みます)。 header が true の場合、ヘッダーは最初にスキップされていない行とコメントされていない行になります。 有効な値: 正の整数または 0。
timeFormat HH:mm:ss 列の値 TimeType 解析するための形式。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ文字列を解析するための形式。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイム ゾーン (TimestampNTZType) 文字列を含まないタイムスタンプを解析するための形式。
timeZone None タイムスタンプと日付を解析するときに使用する java.time.ZoneId
unescapedQuoteHandling STOP_AT_DELIMITER エスケープされていない引用符を処理するための方策。 使用可能なオプション:
  • STOP_AT_CLOSING_QUOTE: 入力にエスケープされていない引用符が見つかった場合は、引用符文字を蓄積し、終了引用符が見つかるまで値を引用符で囲まれた値として解析します。
  • BACK_TO_DELIMITER: 入力にエスケープされていない引用符が見つかった場合は、その値を引用符で囲まれていない値と見なします。 これにより、sep によって定義された区切り記号が見つかるまで、パーサーは現在解析対象となっている値のすべての文字を蓄積します。 値に区切り記号が見つからない場合は、区切り記号または行末が見つかるまで、入力の文字がパーサーによって蓄積され続けます。
  • STOP_AT_DELIMITER: 入力にエスケープされていない引用符が見つかった場合は、その値を引用符で囲まれていない値と見なします。 これにより、sep に定義した区切り記号または行末が入力内で見つかるまで、すべての文字がパーサーによって蓄積されます。
  • SKIP_VALUE: 入力にエスケープされていない引用符が見つかった場合、指定された値に対して解析されたコンテンツはスキップされ (次の区切り記号が見つかるまで)、nullValue で設定された値が代わりに生成されます。
  • RAISE_ERROR: 入力にエスケープされていない引用符が見つかった場合は、 TextParsingException がスローされます。

Excel

Key デフォルト Description
dataAddress None Excel構文で読み取るセル範囲。 省略した場合は、最初のシートからすべての有効なセルを読み取ります。 "SheetName!C5:H10"を使用して、名前付きシートから範囲を読み取るか、最初のシートから範囲を読み取"C5:H10"するか、特定のシートからすべてのデータを読み取"SheetName"します。
headerRows 0 列名ヘッダーとして使用する初期行の数。 dataAddressを指定すると、セル範囲内に適用されます。 0すると、列名は_c1_c2_c3などとして自動生成されます。有効な値: 01
ignoreMissingSheet false dataAddressで指定されたシートを含まないファイルをサイレント スキップするかどうかを指定します。 falseすると、ファイルに要求されたシートがない場合にエラーがスローされます。 シート名が dataAddressで指定されている場合にのみ適用されます。 有効な値: truefalse
includePhoneticRuns false XLSX ファイルを読み取るときに、セル文字列値に連結されたふりがな (pinyin やふりがななど) 発音注釈を含めるかどうか。 有効な値: truefalse
operation readSheet Excel ブックに対して実行する操作。 有効な値: readSheet (シートからデータを読み取る)、 listSheets (各シートの sheetIndex: long フィールドと sheetName: String を持つ構造体を返します)。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Excelに文字列として格納されるタイムスタンプなしのタイムゾーン値のカスタム書式指定文字列。 カスタム日付形式は Datetime パターンの形式に従います。
dateFormat yyyy-MM-dd Dateとして読み取られた文字列値のカスタム書式指定文字列。 カスタム日付形式は Datetime パターンの形式に従います。

Json

Key デフォルト Description
allowBackslashEscapingAnyCharacter false バックスラッシュを使用して、後続の任意の 1 文字をエスケープすることを許可するかどうか。 有効にしない場合は、JSON の仕様に明示されている文字のみをエスケープできます。
allowComments false 解析対象のコンテンツ内で Java、C、および C++ スタイルのコメント ('/''*'、および '//' の種類) の使用を許可するかどうか。
allowNonNumericNumbers true 非数値 (NaN) トークンのセットを有効な浮動小数点数値として許可するかどうか。
allowNumericLeadingZeros false 追加の (無視できる) ゼロで始まる整数値を許可するかどうか (例: 000001)。
allowSingleQuotes true 単一引用符 (アポストロフィ、'\' 文字) を使用して、文字列 (名前と文字列値) を囲むことを許可するかどうか。
allowUnquotedControlChars false JSON 文字列に、エスケープされていない制御文字 (タブや改行文字など、値が 32 未満の ASCII 文字) を含めることを許可するかどうか。
allowUnquotedFieldNames false 引用符で囲まれていないフィールド名の使用を許可するかどうか。これは JavaScript では許可されますが、JSON 仕様では許可されません。
alternateVariantEncoding None ソース JSON の Variant 値に使用されるエンコード。 インライン JSON として格納されるのではなく、Base85 でエンコードされた Variant 値をデコードするには、 Z85 に設定します。
badRecordsPath None 不正な JSON レコードに関する情報を記録するためのファイルを格納するパス。
ファイル ベースのデータ ソースで badRecordsPath オプションを使用する場合、次の制限があります。
  • これは非トランザクションであり、一貫性のない結果につながる可能性があります。
  • 一時的なエラーはエラーとして扱われます。
columnNameOfCorruptRecord _corrupt_record 形式に誤りがあり、解析できないレコードを格納するための列。 解析の modeDROPMALFORMED に設定する場合、この列は空になります。
dateFormat yyyy-MM-dd 日付文字列を解析するための形式。
dropFieldIfAllNull false スキーマの推論中に、すべて null 値の列または空の配列および構造体を無視するかどうか。
encoding または charset UTF-8 JSON ファイルのエンコードの名前。 オプションの一覧については、java.nio.charset.Charset を参照してください。 UTF-16UTF-32 の場合、multilinetrue を使用することはできません。
inferTimestamp false タイムスタンプ文字列を TimestampType として推論を試みるかどうか。 trueに設定すると、スキーマの推論に著しく長い時間がかかる場合があります。 自動ローダーで使うには cloudFiles.inferColumnTypes を有効にする必要があります。
lineSep なし。 \r\r\n、および \n 連続する 2 つの JSON レコードの間の文字列。
locale US java.util.Locale 識別子。 JSON 内の既定の日付、タイムスタンプ、および 10 進数の解析に影響します。
maxNestingDepth 500 JSON オブジェクトと配列で許容される入れ子の深さの最大値。 深く入れ子になったドキュメントの場合は、この値を大きくします。 有効な値: 正の整数。
maxNumLen 1000 JSON 入力内のトークン数の最大長。 大きな数値リテラルを含む JSON の場合は、この値を増やします。 有効な値: 正の整数。
maxStringLen 無制限 JSON 入力内の文字列値の最大長。 大きな文字列で JSON を解析するときのメモリ使用量を制限するように設定します。 有効な値: 正の整数。
mode PERMISSIVE 形式に誤りがあるレコードの処理に関するパーサーのモード。 有効な値: PERMISSIVEDROPMALFORMEDFAILFAST
multiLine false JSON レコードが複数の行にまたがるかどうか。
prefersDecimal false 可能な場合は float 型や double 型の代わりに DecimalType として文字列を推論しようとします。 また、 inferSchema を有効にするか、自動ローダーで cloudFiles.inferColumnTypes を使用して、スキーマ推論を使用する必要があります。
primitivesAsString false 数値やブール値などのプリミティブ型を StringType として推論するかどうか。
readerCaseSensitive true rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 Databricks Runtime 13.3 以降で使用できます。
rescuedDataColumn None データ型の不一致またはスキーマの不一致 (列の大文字と小文字の区別を含む) によって解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。
singleVariantColumn None JSON ドキュメント全体を取り込むかどうか。列の名前として指定された文字列を持つ単一の Variant 列に解析されます。 設定されていない場合、JSON フィールドは独自の列に取り込まれます。 有効な値: 任意の文字列。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ文字列を解析するための形式。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイム ゾーン (TimestampNTZType) 文字列を含まないタイムスタンプを解析するための形式。
timeZone None タイムスタンプと日付を解析するときに使用する java.time.ZoneId
upgradeExceptionAsBadRecord false 型アップグレード例外 (たとえば、宣言された列型に値を拡大できない場合) を、例外をスローするのではなく、無効なレコードとして扱うかどうか。

カフカ

Kafka リーダー オプションの完全な一覧については、「 DataStreamReader Kafka オプション」を参照してください。 次のオプションは、 spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されます。

Key デフォルト Description
endingOffsets latest 読み取りを停止する場所。 有効な値: latest、または各パーティションのオフセットの JSON 文字列 ( {"topicA":{"0":50,"1":-1}}など)。
JSON 文字列では、 -1 は最新のオフセットです。 -2(最も早いオフセット) は、終了オフセットとして使用できません。
endingOffsetsByTimestamp None タイムスタンプとして指定されたパーティションごとの終了オフセット (ミリ秒単位)。 有効な値: {"topicA":{"0":2000,"1":3000}}など、各パーティションのタイムスタンプの JSON 文字列。
endingTimestamp None すべてのパーティションに適用されるグローバル終了タイムスタンプ (ミリ秒単位)。 有効な値: 負以外の整数。

オーク

Key デフォルト Description
mergeSchema false 複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。

寄木細工

Key デフォルト Description
datetimeRebaseMode LEGACY ユリウス暦と予期的グレゴリオ暦の間の日付値とタイムスタンプ値のリベースを制御します。 有効な値: EXCEPTIONLEGACYCORRECTED
int96RebaseMode LEGACY ユリウス暦と予期的グレゴリオ暦の間の INT96 タイムスタンプ値のリベースを制御します。 有効な値: EXCEPTIONLEGACYCORRECTED
mergeSchema false 複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。
readerCaseSensitive true rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。
rescuedDataColumn None データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。

状態ストア

構造化ストリーミング状態データを読み取るために、 spark.read.format("statestore") またはテーブル値関数 read_statestore これらのオプションを使用します。 「構造化ストリーミング状態情報の読み取り」をご覧ください。

Key デフォルト Description
batchId 最新のバッチ ID 読み取り対象のバッチ。 クエリの以前の状態を照会するために使用します。 バッチはコミットされるべきですが、まだクリーンアップされていません。 有効な値: 負以外の整数。
operatorId 0 読み取り対象の演算子。 クエリに複数のステートフル演算子がある場合に使用します。 有効な値: 負以外の整数。
storeName DEFAULT 読み取り対象の状態ストア名。 ステートフル 演算子に複数の状態ストア インスタンスがある場合に使用します。 ストリーム ストリーム結合には storeName または joinSide を指定する必要がありますが、両方を指定する必要はありません。 有効な値: 任意の文字列。
joinSide None ストリーム ストリーム結合の読み取り先となるターゲット側。 ストリーム ストリーム結合には storeName または joinSide を指定する必要がありますが、両方を指定する必要はありません。 有効な値: leftright
snapshotStartBatchId None 状態を読み取るときに開始点として使用するスナップショットのバッチ ID。 リーダーは、このスナップショットからの変更を batchIdまで再生することで、状態を再構築します。 スナップショットが破損している場合に便利です。 snapshotPartitionIdと共に指定する必要があります。 readChangeFeedでは使用できません。 変更ログのチェックポイント処理が有効になっている HDFS ベースの状態ストアと RocksDB 状態ストアをサポートします。 Databricks Runtime 15.4 LTS 以降で使用できます。 有効な値: 負以外の整数。
snapshotPartitionId None 指定した場合、クエリはこのパーティションのみを読み取ります。 snapshotStartBatchIdと共に指定する必要があります。 readChangeFeedでは使用できません。 Databricks Runtime 15.4 LTS 以降で使用できます。 有効な値: 負以外の整数。
readChangeFeed false trueすると、changeStartBatchIdchangeEndBatchIdの間のバッチの指定された範囲にわたって状態の変化を返します。 changeStartBatchId が必要です。 joinSidebatchIdsnapshotStartBatchId、またはsnapshotPartitionIdでは使用できません。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: truefalse
詳細については、「 構造化ストリーミング状態の変更の読み取り」を参照してください。
changeStartBatchId None 変更フィード範囲の開始バッチ ID。 readChangeFeedtrue の場合に必要です。 readChangeFeedtrue に設定されている場合にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: 負以外の整数。
changeEndBatchId 最新のバッチ ID 変更フィード範囲の終了バッチ ID。 changeStartBatchId以上である必要があります。 readChangeFeedtrue に設定されている場合にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: 負以外の整数。
stateVarName None 読み取る状態変数名。 状態変数名は、transformWithState 演算子によって使用されるStatefulProcessorinit関数内の各変数の一意の名前です。 transformWithState演算子を使用する場合は必須です。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: 任意の文字列。
readRegisteredTimers false trueすると、transformWithState演算子によって使用される登録済みタイマーを読み取ります。 transformWithState演算子にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: truefalse
flattenCollectionTypes true trueすると、マップとリストの状態変数に対して返されるレコードがフラット化されます。 falseすると、Spark SQL ArrayまたはMapとしてレコードが返されます。 transformWithState演算子にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 有効な値: truefalse

テキスト

Key デフォルト Description
encoding UTF-8 テキスト ファイルの行区切り記号のエンコードの名前。 オプションの一覧については、「 java.nio.charset.Charset」を参照してください。 ファイルの内容はこのオプションの影響を受けず、as-is読み取られます。
lineSep なし。 \r\r\n 、および \n 連続する 2 つのテキスト レコード間の文字列。
wholeText false ファイルを単一レコードとして読み取るかどうか。

Xml

Key デフォルト Description
rowTag None 行として扱う XML ファイルの行タグ。 XML <books> <book><book>...<books> の例では、適切な値は book です。 これは必須オプションです。
samplingRatio 1.0 スキーマ推論に使用される行の割合を定義します。 XML 組み込み関数はこのオプションを無視します。 有効な値: 1.0する0.0
excludeAttribute false 要素内の属性を除外するかどうか。
mode None 解析中に破損したレコードを処理するモードを許可します。 PERMISSIVE: 破損したレコードの場合は、columnNameOfCorruptRecord によって構成されたフィールドに形式に誤りがある文字列を格納し、形式に誤りがあるフィールドを null に設定します。 破損したレコードを保持するには、ユーザー定義スキーマで string という名前の columnNameOfCorruptRecord 型フィールドを設定できます。 スキーマにこのフィールドがない場合、破損したレコードは解析中に削除されます。 スキーマを推論すると、パーサーは出力スキーマに columnNameOfCorruptRecord フィールドを暗黙的に追加します。 DROPMALFORMED: 破損したレコードを無視します。 このモードは XML 組み込み関数ではサポートされていません。 FAILFAST: パーサーが破損したレコードを検出すると、例外をスローします。
inferSchema true true の場合は、結果として得られる各データフレーム列に対して適切な型を推論しようとします。 false の場合、結果の列はすべて string 型です。 XML 組み込み関数はこのオプションを無視します。
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord PERMISSIVE モードで作成された形式の正しくない文字列を含む新しいフィールドの名前を変更できます。
attributePrefix None 属性と要素を区別するための属性のプレフィックス。 これはフィールド名のプレフィックスになります。 既定値は _ です。 XML の読み取り時は空にすることができますが、書き込み時は空にすることはできません。 DataFrameWriter XML オプションにも適用されます。
valueTag _VALUE 属性または子要素の要素も持つ要素内の文字データに使用されるタグ。 ユーザーがスキーマで valueTag フィールドを指定することもできますが、文字データが他の要素や属性と一緒に要素に存在する場合、スキーマ推論中に自動的に追加されます。 DataFrameWriter XML オプションにも適用されます。
encoding UTF-8 読み取りの場合は、指定されたエンコードの種類で XML ファイルをデコードします。 書き込みの場合は、保存される XML ファイルのエンコード (文字セット) を指定します。 XML 組み込み関数はこのオプションを無視します。 DataFrameWriter XML オプションにも適用されます。
ignoreSurroundingSpaces true 値を囲む空白をスキップする必要があるかどうか。 空白のみの文字データは無視されます。
rowValidationXSDPath None 各行の省略可能な XML を個別に検証するために使用される XSD ファイルへのパス。 検証に失敗した行は、解析エラーのように扱われます。 XSD は、指定または推論されるかどうかにかかわらず、スキーマには影響しません。
ignoreNamespace false true場合、XML 要素と属性に対する名前空間のプレフィックスは無視されます。 たとえば、タグ <abc:author><def:author> は、どちらも単なる <author> として扱われます。 rowTag 要素では名前空間を無視することはできないため、その子要素の読み込みのみを取り扱うことが可能です。 false の場合でも、XML 解析は名前空間を認識しません。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] カスタムタイムスタンプの形式文字列であり、datetime パターンに従います。 これは timestamp 型に適用されます。 DataFrameWriter XML オプションにも適用されます。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムゾーンを含まないタイムスタンプのカスタム形式の文字列で、datetime パターン形式に従っています。 これは TimestampNTZType 型に適用されます。 DataFrameWriter XML オプションにも適用されます。
dateFormat yyyy-MM-dd カスタム日付形式の文字列であり、datetime パターン形式に従います。 これは、日付型に適用されます。 DataFrameWriter XML オプションにも適用されます。
locale en-US IETF BCP 47 形式の言語タグとしてロケールを設定します。 たとえば、locale は日付とタイムスタンプの解析中に使用されます。
nullValue null null 値の文字列表記を設定します。 これが null である場合、パーサーはフィールドの属性と要素を書き込みません。 DataFrameWriter XML オプションにも適用されます。
readerCaseSensitive true rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。
rescuedDataColumn None データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) のために解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。 COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。
singleVariantColumn none 1 つのバリアント列の名前を指定します。 このオプションが読み取り用に指定されている場合は、指定されたオプション文字列値を列の名前として使用して、XML レコード全体を 1 つの Variant 列に解析します。 このオプションが書き込み用に指定されている場合は、単一の Variant 列の値を XML ファイルに書き込みます。 DataFrameWriter XML オプションにも適用されます。
useLegacyXMLParser true レガシ XML パーサーを使用するかどうか。 レガシ パーサーでは、形式が正しくないコンテンツに対する検証の厳格さが低くなりますが、メモリ効率は低くなります。 より厳密な既定のパーサーをオプトインするには、 false に設定します。
wildcardColName xs_any ワイルドカード (xs:any) スキーマ要素に一致する XML 要素をキャプチャするために使用される列名。 rescuedDataColumnと一緒に使用することはできません。

DataStreamReader オプション

これらのオプションを DataStreamReader.option() と共に使用して、Delta Lake テーブルやその他のファイル ベースのソースからのストリーミング読み取りを構成します。

ファイル形式のオプション (JSON、CSV、Parquet など) については、「 DataFrameReader オプション」を参照してください。

自動ローダー (cloudFiles.*) オプションについては、「 自動ローダー」を参照してください。

Example

次の例では、Delta Lake テーブル ストリームの10maxFilesPerTriggerを設定します。

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")

共通

Delta Lake テーブルとその他のファイル ベースのストリーミング ソースには、次のオプションが適用されます。

Key デフォルト Description
cleanSource off ソース ファイルがストリームによって処理された後で処理する方法。 有効な値: off (アクションなし)、 delete (ソース ファイルを完全に削除)、 archive ( sourceArchiveDir に移動)。 archiveに設定する場合は、sourceArchiveDirも設定する必要があります。 Delta Lake テーブル ストリーミングには適用されません。
fileNameOnly false 既に処理されているファイルを、完全なパスではなくファイル名のみで識別するかどうか。 trueすると、同じファイル名を持つ異なるパスにあるファイルは同じファイルとして扱われ、再処理されません。 Delta Lake テーブル ストリーミングには適用されません。
latestFirst false 各マイクロバッチ内で、最後に変更されたファイルを最初に処理するかどうか。 可能な限り迅速に最新のデータを処理する場合に便利です。 truemaxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合、maxFileAgeは無視されます。 Delta Lake テーブル ストリーミングには適用されません。
maxBytesPerTrigger None マイクロバッチごとに処理されるデータ量のソフト最大値。 最小の入力ユニットが上限を超えると、バッチが制限を超えて処理される場合があります。 maxFilesPerTriggerと共に使用すると、マイクロバッチは、いずれかの制限に最初に達するまでデータを処理します。 有効な値: 正の整数。
自動ローダーの場合は、代わりに cloudFiles.maxBytesPerTrigger を使います。 「共通」を参照してください。
maxCachedFiles 10000 後続のマイクロバッチ用にキャッシュする未処理のファイルの最大数。 キャッシュをオフにするには、 0 に設定します。 ソース ディレクトリにトリガーごとに多数の新しいファイルが含まれている場合は、この値を大きくします。 Delta Lake テーブル ストリーミングには適用されません。 有効な値: 正の整数または 0
maxFileAge 7d 現在のシステム時刻ではなく、最近変更されたファイルのタイムスタンプを基準とした、処理対象と見なされるファイルの最大有効期間。 このしきい値より古いファイルは無視されます。 7d4hなどの期間文字列を受け入れます。 latestFirsttrueされ、maxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合は無視されます。 Delta Lake テーブル ストリーミングには適用されません。
maxFilesPerTrigger 1000 Delta Lake と自動ローダー用。 その他のファイル ベースのソースに対する最大値はありません。 各マイクロバッチで処理される新しいファイルの数の上限。 maxBytesPerTriggerと共に使用すると、マイクロバッチは、いずれかの制限に最初に達するまでデータを処理します。 有効な値: 正の整数。
自動ローダーの場合は、代わりに cloudFiles.maxFilesPerTrigger を使います。 「共通」を参照してください。
sourceArchiveDir None cleanSourcearchive に設定されている場合のアーカイブ ディレクトリへのパス。 ソース ファイルは、処理後にこのパスに移動され、相対ディレクトリ構造が維持されます。 Delta Lake テーブル ストリーミングには適用されません。

自動ローダー

これらのオプションを cloudFiles ソースと共に使用して、クラウド ストレージからのインジェストをストリーミングするように 自動ローダー を構成します。 cloudFiles ソースに固有のオプションには、他の構造化ストリーミング ソース オプションとは別の名前空間に保持するために、cloudFilesが付いています。

共通

Key デフォルト Description
cloudFiles.allowOverwrites false 入力ディレクトリ ファイルの変更による既存のデータの上書きを許可するかどうか。
構成に関する注意事項については、「 ファイルが追加または上書きされたときに、自動ローダーによってファイルが再び処理されますか?」を参照してください。
cloudFiles.backfillInterval None 自動ローダーは、特定の間隔で非同期バックフィルをトリガーできます。 たとえば、毎日バックフィルする 1 day や、毎週バックフィルする 1 week などです。 詳細については、「 cloudFiles.backfillInterval を使用して通常のバックフィルをトリガーする」を参照してください。
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。
cloudFiles.cleanSource OFF 入力ディレクトリから処理されたファイルを自動的に削除するかどうかを指定します。 OFF (既定値) に設定すると、ファイルは削除されません。
DELETEに設定すると、自動ローダーは処理されてから 30 日後に自動的にファイルを削除します。 これを行うには、自動ローダーにソース ディレクトリへの書き込みアクセス許可が必要です。
MOVEに設定すると、自動ローダーは、ファイルが処理されてから 30 日後cloudFiles.cleanSource.moveDestination指定した場所に自動的に移動します。 これを行うには、自動ローダーには、ソース ディレクトリと移動場所に対する書き込みアクセス許可が必要です。
ファイルは、commit_timeテーブル値関数の結果でcloud_files_stateに null 以外の値がある場合に処理されたと見なされます。 テーブル値関数cloud_files_state参照してください。 処理後の 30 日間の追加待機は、 cloudFiles.cleanSource.retentionDurationを使用して構成できます。
cloudFiles.cleanSourceを有効にする前に、次の考慮事項を確認してください。
  • 最も高速なコンシューマーがファイルを削除し、低速のソースに取り込まれないため、ソースの場所からデータを消費するストリームが複数ある場合、Azure Databricksはこのオプションを使用しないことをお勧めします。
  • この機能を有効にするには、自動ローダーがそのチェックポイントで追加の状態を維持する必要があります。これにより、パフォーマンスのオーバーヘッドが発生しますが、テーブル値関数 cloud_files_state による可観測性が向上します。 テーブル値関数cloud_files_state参照してください。
  • cleanSource は、現在の設定を使用して、特定のファイルを MOVE するか DELETE するかを決定します。 たとえば、ファイルが最初に処理されたときに設定が MOVE されたが、30 日後にファイルがクリーンアップの候補になったときに DELETE に変更されたとします。 この場合、cleanSource によってファイルが削除されます。
  • ファイルは、 retentionDuration の有効期限が切れるとすぐに消去される保証はありません。 コストを抑えるために、自動ローダーはストリーム処理と同時にファイルを削除し、ストリーム処理が完了するか終了するとすぐに終了します。 クリーンアップの候補であったが、ストリーム処理中にクリーンアップできなかったファイルは、次回の自動ローダーの実行時に取得されます。

Databricks Runtime 16.4 以降で使用できます。
cloudFiles.cleanSource.retentionDuration 30 days 処理されたファイルが cleanSourceを使用したアーカイブの候補になるまでの待機時間。 DELETEの場合は 7 日を超える必要があります。 MOVEの最小制限はありません。
値は CalendarInterval 文字列です 。 たとえば、"14 days""30 days""2 weeks"、または "1 month" です。
Databricks Runtime 16.4 以降で使用できます。
cloudFiles.cleanSource.moveDestination None cloudFiles.cleanSourceMOVEに設定されている場合に処理されたファイルをアーカイブするパス。 クラウド ストレージ パスまたは Unity カタログ ボリューム パス ( /Volumes/my_catalog/my_schema/my_volume/archive/ など) を指定できます。
移動場所は次の条件を満たす必要があります。
  • ソース ディレクトリの子ではありません。 移動先をソース ディレクトリ内に配置すると、アーカイブされたファイルが再び取り込まれます。
  • ソースと同じ外部の場所、ボリューム、または DBFS マウント内にある。 クロスバケットとクロスコンテナーの移動はサポートされていないため、エラーが発生します。

自動ローダーには、このディレクトリへの書き込みアクセス許可が必要です。
Databricks Runtime 16.4 以降で使用できます。
cloudFiles.format なし (必須オプション) ソース パスのデータ ファイル形式。 有効な値は次のとおりです。
cloudFiles.includeExistingFiles true ストリーム処理入力パスに既存のファイルを含めるか、初期セットアップ後に到着した新しいファイルのみを処理するか。 このオプションは、初めてストリームを開始するときにのみ評価されます。 ストリームの再起動後にこのオプションを変更した場合、効果はありません。
cloudFiles.inferColumnTypes false スキーマの推論を利用するときに、正確な列型を推論するかどうか。 既定では、列は JSON および CSV データセットを推論するときに文字列として推論されます。 詳細については、スキーマの推論に関する説明を参照してください。
cloudFiles.maxBytesPerTrigger None 各トリガーで処理される新しいバイトの最大数。 10g などのバイト文字列を指定して、各マイクロバッチを 10 GB のデータに制限できます。 これはソフト最大値です。 それぞれ 3 GB のファイルがある場合、Azure Databricks は 1 マイクロバッチで 12 GB を処理します。 cloudFiles.maxFilesPerTrigger と使用すると、Azure Databricks では、cloudFiles.maxFilesPerTrigger または cloudFiles.maxBytesPerTrigger の下限のうち、最初に到達した方までを消費します。 このオプションは、Trigger.Once() (Trigger.Once() は非推奨) と共に使用しても無効です。
Databricks Runtime 18.0 以降では、このオプションは動的に構成され、手動で設定する必要はありません。
cloudFiles.maxFileAge None 重複排除を目的としてファイル イベントを追跡する期間。 Databricks では、1 時間に数百万のファイルの順序でデータを取り込む場合でない限り、このパラメーターのチューニングは推奨しません。 詳細については、 ファイル イベントの追跡 に関するセクションを参照してください。
cloudFiles.maxFileAge のチューニングがアグレッシブすぎると、重複取り込みやファイル欠如など、データ品質の問題を引き起こすことがあります。 そのため、Databricks は cloudFiles.maxFileAge に 90 日間などの控えめな設定を推奨しています。同等のデータ インジェスト ソリューションもこのくらいを推奨しています。
cloudFiles.maxFilesPerTrigger 1000 各トリガーで処理される新しいファイルの最大数。 cloudFiles.maxBytesPerTrigger と使用すると、Azure Databricks では、cloudFiles.maxFilesPerTrigger または cloudFiles.maxBytesPerTrigger の下限のうち、最初に到達した方までを消費します。 Trigger.Once() (非推奨) と一緒に使用すると、このオプションは無効です。
Databricks Runtime 18.0 以降では、このオプションは動的に構成され、手動で設定する必要はありません。
cloudFiles.partitionColumns None ファイルのディレクトリ構造から推論する Hive スタイルのパーティション列のコンマ区切りの一覧。 Hive スタイルのパーティション列は、 <base-path>/a=x/b=1/c=y/file.formatなどの等値記号で結合されたキーと値のペアです。 この例では、パーティション列は abc です。 既定では、スキーマ推論を使用し、データを読み込む <base-path> を指定している場合、これらの列は自動的にスキーマに追加されます。 スキーマを指定すると、自動ローダーにより、これらの列がこのスキーマに含まれると想定されます。 これらの列をスキーマの一部に含めない場合は、"" を指定して、これらの列を無視することができます。 さらに、下の例のような複雑なディレクトリ構造のファイル パスから列を推論するときに、このオプションを使用できます。
<base-path>/year=2022/week=1/file1.csv
<base-path>/year=2022/month=2/day=3/file2.csv
<base-path>/year=2022/month=2/day=4/file3.csv
cloudFiles.partitionColumnsとしてyear,month,dayを指定すると、year=2022file1.csvが返されますが、month列とday列はnull
monthday は、 file2.csvfile3.csvに対して正しく解析されます。
cloudFiles.schemaEvolutionMode addNewColumns スキーマが指定されていない場合は none それ以外の場合 新しい列がデータで検出された場合にスキーマを展開するモード。 既定では、列は JSON データセットを推論するときに文字列として推論されます。 詳細については、スキーマの展開に関する説明を参照してください。
cloudFiles.schemaHints None スキーマの推論中に自動ローダーに提供するスキーマ情報。 詳細については、スキーマ ヒントに関するページを参照してください。
cloudFiles.schemaLocation なし (スキーマを推論するために必要) 推論されたスキーマとそれ以降の変更を保存する場所。 詳細については、スキーマの推論に関する説明を参照してください。
cloudFiles.useStrictGlobber false Apache Spark の他のファイル ソースの既定のグロビング動作に一致する厳密な globber を使用するかどうか。 詳細については、「一般的なデータ読み込みパターン」を参照してください。 Databricks Runtime 12.2 LTS 以降で使用できます。
cloudFiles.validateOptions true 自動ローダー オプションを検証し、不明なオプションまたは一貫性のないオプションに対してエラーを返すかどうか。

ディレクトリの一覧

Key デフォルト Description
cloudFiles.useIncrementalListing (非推奨) autoDatabricks Runtime 17.2 以降では、Databricks Runtime 17.3 以降でfalse この機能は廃止されました。 Databricks では、の代わりにcloudFiles.useIncrementalListingを使用することをお勧めします。
ディレクトリ一覧モードで、完全な一覧ではなく、増分一覧を使用するかどうか。 既定では、自動ローダーは、特定のディレクトリが増分一覧に該当する場合に、ベスト エフォートで自動検出を行います。 これを true または false に設定することで、増分一覧または完全なディレクトリ一覧を明示的に使用できます。
構文指定されていないディレクトリでインクリメンタル リストを誤って有効にすると、自動ローダーが新しいファイルを検出できなくなります。
Azure Data Lake Storage (abfss://)、S3 (s3://)、GCS (gs://) で動作します。
Databricks Runtime 9.1 LTS 以降で使用できます。
使用できる値: autotruefalse

ファイル通知

必要なクラウドのアクセス許可、セットアップ手順、認証方法など、ファイル通知モードの構成については、「 ファイル通知モードでの自動ローダー ストリームの構成」を参照してください。

Key デフォルト Description
cloudFiles.fetchParallelism 1 キュー サービスからメッセージを取得するときに使用するスレッドの数。
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。
cloudFiles.pathRewrites None 複数の S3 バケットからファイル通知を受信する queueUrl を指定し、これらのコンテナー内のデータにアクセスするように構成されたマウント ポイントを使用する場合にのみ必要です。 bucket/key パスのプレフィックスをマウント ポイントで書き換える場合は、このオプションを使用します。 プレフィックスのみ書き換え可能です。 たとえば、構成 {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}の場合、パス s3://<databricks-mounted-bucket>/path/2017/08/fileA.jsondbfs:/mnt/data-warehouse/2017/08/fileA.jsonに書き換えられます。
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。
cloudFiles.resourceTag None 関連するリソースの関連付けと識別に役立つ一連のキーと値のタグ ペア。次に例を示します。
cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")
AWS の詳細については、Amazon SQS コスト割り当てタグAmazon SNS のタグの構成に関するページを参照してください。 (1)
Azure の詳細については、キューとメタデータの名前付けと、properties.labelsイベント サブスクリプションに関する情報をご覧ください。 自動ローダーは、これらのキーと値のタグ ペアを JSON にラベルとして保存します。 (1)
GCP の詳細については、「ラベル付き使用状況の報告」を参照してください。 (1)
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。 代わりに、クラウド プロバイダー コンソールを使用してリソース タグを設定します。
cloudFiles.useManagedFileEvents false trueに設定すると、自動ローダーはファイル イベント サービスを使用して、外部の場所にあるファイルを検出します。 このオプションは、読み込みパスがファイル イベントが有効になっている外部の場所にある場合にのみ使用できます。 ファイル イベントでファイル通知モードを使用するを参照してください。
自動ローダーは前回の実行後に新しいファイルを検出できるため、ファイル イベントはファイル検出で通知レベルのパフォーマンスを提供します。 ディレクトリ一覧とは異なり、このプロセスではディレクトリ内のすべてのファイルを一覧表示する必要はありません。
ファイル イベント オプションが有効になっている場合でも、自動ローダーでディレクトリ 一覧が使用される場合があります。
  • 初期読み込み中に、 includeExistingFilestrue に設定されている場合、完全なディレクトリ一覧が実行され、自動ローダーが開始される前にディレクトリに存在していたすべてのファイルが検出されます。
  • ファイル イベント サービスは、最近作成されたファイルをキャッシュすることで、ファイルの検出を最適化します。 自動ローダーの実行頻度が低い場合、このキャッシュの有効期限が切れる可能性があり、自動ローダーはディレクトリ一覧にフォールバックしてファイルを検出し、キャッシュを更新します。 このシナリオを回避するには、少なくとも 7 日に 1 回は自動ローダーを呼び出します。

自動ローダーでこのオプション を指定してディレクトリ一覧を使用 する場合の状況の包括的な一覧については、「ファイル イベントを含む自動ローダーでディレクトリ一覧を使用するタイミング」を参照してください。
Databricks Runtime 14.3 LTS 以降で使用できます。
cloudFiles.listOnStart false trueに設定すると、自動ローダーは、チェックポイントの継続トークンで始まるのではなく、ストリームの開始時に完全なディレクトリ一覧を実行します。 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKENなどのエラーから回復するには、このオプションを使用します。 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN エラーから回復する方法」を参照してください。
cloudFiles.useNotifications false ファイル通知モードを使用して、新しいファイルがあるときを判断するかどうか。 false の場合は、ディレクトリ一覧モードを使用します。 「自動ローダー ファイル検出モードを比較する」を参照してください。
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。

(1) 自動ローダーでは、ベスト エフォート ベースで、次のキーと値のタグ ペアが既定で追加されます。

  • vendor: Databricks
  • path: データが読み込まれる場所。 ラベル付けの制限のため、GCP では使用できません。
  • checkpointLocation: ストリームのチェックポイントの場所。 ラベル付けの制限のため、GCP では使用できません。
  • streamId: ストリームのグローバルに一意な識別子。

Databricks はこれらのキー名を予約し、それらの値を上書きすることはできません。

クラウド固有

自動ローダーには、ファイル通知モード用にクラウド インフラストラクチャを構成するためのオプションが用意されています。 必要なクラウドのアクセス許可とセットアップ手順については、「 ファイル通知モードで自動ローダー ストリームを構成する」を参照してください。

Aws

[ cloudFiles.useNotifications = true ] を選択し、自動ローダーで通知サービスを設定する場合にのみ、次のオプションを指定します。

Key デフォルト Description
cloudFiles.region EC2 インスタンスのリージョン ソース S3 バケットが存在し、AWS SNS および SQS サービスを作成するリージョン。
Key デフォルト Description
cloudFiles.restrictNotificationSetupToSameAWSAccountId false SNS トピックと同じアカウント内の AWS S3 バケットからのイベント通知のみを許可します。 true の場合、自動ローダーは、SNS トピックと同じアカウント内の AWS S3 バケットからのイベント通知のみを受け入れます。
false場合、アクセス ポリシーはクロスアカウント バケットと SNS トピックの設定を制限しません。 これは、SNS トピックとバケットパスが異なるアカウントに関連付けられている場合に便利です。
Databricks Runtime 17.2 以降で使用できます。

cloudFiles.useNotifications = true を選択し、既に設定したキューを自動ローダーで使用する場合にのみ、次のオプションを指定します。

Key デフォルト Description
cloudFiles.queueUrl None SQS キューの URL。 指定がある場合、自動ローダーは独自の AWS SNS および SQS サービスを設定せずに、このキューから直接イベントを消費します。

AWS 認証オプション

Databricks サービス資格情報を使用するには、次の認証オプションを指定します。

Key デフォルト Description
databricks.serviceCredential None Databricks のサービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。

Databricks サービスの資格情報または IAM ロールを使用できない場合は、代わりに次の認証オプションを指定できます。

Key デフォルト Description
cloudFiles.awsAccessKey None ユーザーの AWS アクセス キー ID。 cloudFiles.awsSecretKeyを指定する必要があります。
cloudFiles.awsSecretKey None ユーザーの AWS シークレット アクセス キー。 cloudFiles.awsAccessKeyを指定する必要があります。
cloudFiles.roleArn None 引き受ける IAM ロールの ARN (必要な場合)。 ロールは、クラスターのインスタンス プロファイルから、または cloudFiles.awsAccessKeycloudFiles.awsSecretKeyで資格情報を指定することによって引き受けることができます。
cloudFiles.roleExternalId None cloudFiles.roleArn を使用してロールを引き受ける際に指定する識別子。
cloudFiles.roleSessionName None cloudFiles.roleArnを使用してロールを想定するときに使用するオプションのセッション名。
cloudFiles.stsEndpoint None cloudFiles.roleArn を使用してロールを引き受ける際に AWS STS にアクセスするために指定するオプションのエンドポイント。
Azure

cloudFiles.useNotifications = true を指定し、自動ローダーで通知サービスを設定する場合は、次のすべてのオプションの値を指定する必要があります。

Key デフォルト Description
cloudFiles.resourceGroup None ストレージ アカウントが作成されるAzure リソース グループ。
cloudFiles.subscriptionId None リソース グループが作成されるAzure サブスクリプション ID。
databricks.serviceCredential None Databricks のサービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。

Databricks サービスの資格情報を使用できない場合は、代わりに次の認証オプションを指定できます。

Key デフォルト Description
cloudFiles.clientId None サービス プリンシパルのクライアント ID またはアプリケーション ID。
cloudFiles.clientSecret None サービス プリンシパルのクライアント シークレット。
cloudFiles.connectionString None アカウント アクセス キーまたは Shared Access Signature (SAS) に基づくストレージ アカウントの接続文字列。
cloudFiles.tenantId None サービス プリンシパルが作成されるAzure テナント ID。

cloudFiles.useNotifications = trueを設定し、自動ローダーで既存のキューを使用する場合にのみ、次のオプションを指定します。

Key デフォルト Description
cloudFiles.queueName None Azure キューの名前。 指定した場合、クラウド ファイル ソースは、独自の Azure Event Grid サービスと Queue Storage サービスを設定する代わりに、このキューのイベントを直接消費します。 その場合、databricks.serviceCredential または cloudFiles.connectionString では、キューに対する読み取りアクセス許可のみが必要です。
Gcp

自動ローダーは、Databricks サービス資格情報を利用して通知サービスを自動的に設定できます。 Databricks サービス資格情報を使用して作成されたサービス アカウントには、 ファイル通知モードでの自動ローダー ストリームの構成で指定されたアクセス許可が必要です。

Key デフォルト Description
cloudFiles.projectId None GCS バケットが存在するプロジェクトの ID。 Google Cloud Pub/Sub サブスクリプションもこのプロジェクト内に作成されます。
databricks.serviceCredential None Databricks のサービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。

Databricks サービスの資格情報を使用できない場合は、Google サービス アカウントを直接使用できます。 Google サービスのセットアップ に従って、サービス アカウント 想定するようにクラスターを構成するか、次の認証オプションを直接指定できます。

Key デフォルト Description
cloudFiles.client None Google Service Account のクライアント ID。
cloudFiles.clientEmail None Google Service Account のメール アドレス。
cloudFiles.privateKey None Google サービス アカウント用に生成される秘密キー。
cloudFiles.privateKeyId None Google サービス アカウント用に生成された秘密キーの ID。

cloudFiles.useNotifications = true を選択し、既に設定したキューを自動ローダーで使用する場合にのみ、次のオプションを指定します。

Key デフォルト Description
cloudFiles.subscription None Google Cloud Pub/Sub サブスクリプションの名前。 指定されている場合、クラウド ファイル ソースは、独自の GCS 通知と Google Cloud Pub/Sub サービスを設定する代わりに、このキューからのイベントを消費します。

Delta Lake

次のオプションは、 spark.readStreamを使用して Delta Lake テーブルから読み取るときに適用されます。

Key デフォルト Description
allowSourceColumnDrop None 差分テーブルのバージョン番号または "always" に設定すると、ソース テーブル スキーマから列が削除された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。 schemaTrackingLocation が必要です。 「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。
allowSourceColumnRename None 差分テーブルのバージョン番号または "always" に設定すると、ソース テーブル内の列の名前が変更された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。 schemaTrackingLocation が必要です。 「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。
allowSourceColumnTypeChange None 差分テーブルのバージョン番号または "always" に設定すると、ソース テーブルで列の種類が変更された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。 schemaTrackingLocation が必要です。 「タイプ拡張」を参照してください。
excludeRegex None 正規表現パターン。 パスがパターンと一致するファイルは、ストリーミング読み取りから除外されます。 予期される名前付け規則に準拠していないファイルをフィルターで除外する場合に便利です。
failOnDataLoss true ログの保持 (logRetentionDuration) のためにソース データが削除された場合にストリーミング クエリを失敗させるかどうか。 不足しているデータをスキップして処理を続行するには、 false に設定します。 「タイム トラベル クエリのデータ保持を構成する」を参照してください。
ignoreChanges (非推奨) false Databricks Runtime 11.3 LTS 以前で使用できます。 UPDATEMERGE INTODELETEOVERWRITEなどの変更操作後に、書き換えられたデータ ファイルを再出力します。 変更されていない行は新しい行と共に出力される可能性があるため、ダウンストリーム コンシューマーは重複を処理する必要があります。 削除はダウンストリームには反映されません。 Databricks Runtime 12.2 LTS 以降の skipChangeCommits に置き換えられました。
ignoreDeletes (非推奨) false パーティション境界でデータを削除するトランザクションを無視します (パーティションの完全な削除のみ)。 パーティション以外の削除、更新、またはその他の変更は処理しません。 skipChangeCommits を代わりに使用します。
readChangeFeed または readChangeData false ストリーミング クエリの変更データ フィードの読み取りを有効にするかどうかを指定します。 有効にすると、ストリームは、追加のメタデータ列を含む行レベルの変更 (挿入、更新、および削除) を出力します。 Azure Databricks で Delta Lake 変更データ フィードを使用する を参照してください。
schemaTrackingLocation None Delta Lake がストリーミング読み取りのスキーマ変更を追跡するディレクトリへのパス。 列マッピングが有効になっているテーブルからストリーミングし、スキーマの進化を処理するために allowSourceColumn* オプションを使用する場合に必要です。 ストリーミング クエリの checkpointLocation 内にある必要があります。 「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。
skipChangeCommits false 既存のレコードを削除または変更するトランザクションを無視し、追加のみを処理します。 Databricks では、変更データ フィードを使用しないほとんどのワークロードに対して、このオプションをお勧めします。 Databricks Runtime 12.2 LTS 以降で使用できます。 skipChangeCommitsを使用したアップストリーム変更コミットのスキップを参照してください。
startingTimestamp 利用可能な最新 読み取りを開始するタイムスタンプ。 ストリームは、指定されたタイムスタンプ以降にコミットされたすべてのテーブル変更を読み取ります。 使用可能なすべてのテーブルコミットの前にタイムスタンプがある場合、ストリームは使用可能な最も早いコミットから開始されます。 startingVersionと一緒に使用することはできません。 ストリーミング チェックポイントが既に存在する場合は無視されます。
有効な値: "2019-01-01T00:00:00.000Z" などのタイムスタンプ文字列、または "2019-01-01"などの日付文字列。
startingVersion 利用可能な最新 読み取りを開始する差分テーブルのバージョン。 ストリームは、指定したバージョン以降にコミットされたすべての変更を読み取ります。 最新の変更からのみ開始する "latest" を指定します。 startingTimestampと一緒に使用することはできません。 ストリーミング チェックポイントが既に存在する場合は無視されます。 テーブル履歴の操作を参照してください。
withEventTimeOrder false 初期テーブル スナップショットをイベント時間バケットに分割して、レコードが誤って遅延イベントとしてマークされ、ウォーターマーク付きのステートフル クエリで削除されないようにします。 チェックポイントを削除しないと、初期スナップショット処理が開始された後は変更できません。 Databricks Runtime 11.3 LTS 以降で使用できます。 「データを削除せずに初期スナップショットを処理する」を参照してください。

カフカ

spark.readStream.format("kafka")またはspark.read.format("kafka")で次のオプションを使用します。

Key デフォルト Description
assign None 使用する特定のパーティション。 subscribesubscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 有効な値: {"topicA":[0,1],"topicB":[2,4]}などの JSON 文字列。
failOnDataLoss true 削除されたトピックやオフセットの切り捨てなどによってデータが失われた可能性がある場合にクエリを失敗させるかどうか。 不足しているデータをスキップして続行するには、 false に設定します。 有効な値: truefalse
Databricks は、データが失われた可能性があるかどうかを保守的に見積もります。 ただし、誤ったアラームが発生する可能性があります。
fetchoffset.numretries 3 Kafka オフセットのフェッチが失敗したときの再試行回数。 有効な値: 負以外の整数。
fetchoffset.retryintervalms 1000 オフセット フェッチの再試行間隔 (ミリ秒単位)。 有効な値: 負以外の整数。
groupIdPrefix spark-kafka-source (ストリーミング)、 spark-kafka-relation (バッチ) 自動生成された Kafka コンシューマー グループ ID に使用するカスタマイズされたプレフィックス。 kafka.group.idが明示的に設定されている場合、コネクタはこのオプションを無視します。 有効な値: 任意の文字列。
includeHeaders false Kafka メッセージ ヘッダーを列として出力に含めるかどうか。 有効な値: truefalse
kafkaconsumer.polltimeoutms None Kafka コンシューマー poll() 呼び出しのタイムアウト (ミリ秒)。 有効な値: 正の整数。
kafka.bootstrap.servers None Kafka ブローカーの host:port アドレスのコンマ区切りのリスト。 Kafka クライアントの bootstrap.servers プロパティを設定します。
Kafka からのデータがない場合は、このブローカーのアドレス一覧で正しくないアドレスを確認してください。 ブローカーのアドレス一覧が正しくない場合は、エラーがない可能性があります。 Kafka クライアントは、ブローカーが最終的に使用可能になると想定し、ネットワーク エラーを受信したときに永久に再試行します。
maxRecordsPerPartition None 各 Spark パーティションのレコードの最大数。 設定すると、コネクタは Kafka パーティションを分割して、各 Spark パーティションが最大でこの多くのレコードを読み取るようにします。 有効な値: 正の整数。
このオプションは、 minPartitionsで使用することもできます。 両方のオプションが設定されている場合、Spark はどちらのオプションを使用してもパーティションが増えます。
minPartitions None Kafka から読み取る Spark パーティションの最小数。 設定すると、コネクタは大きな Kafka パーティションを分割して並列処理を向上させます。 設定しない場合、Spark は Kafka トピック パーティションごとに 1 つのパーティションを作成します。 データ スキューやピーク時の負荷を処理する場合に便利です。 有効な値: 正の整数。
このオプションは、トリガーごとに Kafka コンシューマーを再初期化します。これは、SSL のパフォーマンスに影響する可能性があります。
startingOffsets latest (ストリーミング)、 earliest (バッチ) クエリが読み取りを開始するオフセット。 有効な値: earliestlatest、または各パーティションのオフセットの JSON 文字列 ( {"topicA":{"0":23,"1":-2}}など)。 JSON 文字列では、 -1 は最新のオフセットです。 -2 は最も早いオフセットです。
ストリーミング クエリの場合、このオプションは新しいクエリの開始時にのみ適用されます。 再開されたクエリでは、常にチェックポイントが使用されます。 クエリ中、新しいパーティションは最も早いオフセットで読み取りを開始します。
バッチ クエリの場合、 latest は許可されません。
startingOffsetsByTimestamp None 各パーティションの開始オフセットのリスト。タイムスタンプとしてミリ秒単位で指定されます。 タイムスタンプのオフセットが存在しない場合、クエリの動作は startingOffsetsByTimestampStrategyによって決定されます。 有効な値: {"topicA":{"0":1000,"1":2000}}など、各パーティションのタイムスタンプの JSON 文字列。
ストリーミング クエリの場合、このオプションは新しいクエリの開始時にのみ適用されます。 再開されたクエリでは、常にチェックポイントが使用されます。 クエリ中、新しいパーティションは最も早いオフセットで読み取りを開始します。
startingOffsetsByTimestampStrategy error startingOffsetsByTimestampまたはstartingTimestampで指定されたタイムスタンプのオフセットが見つからない場合に使用する戦略。 有効な値: error (例外が発生します)、 latest (使用可能な最新のオフセットが使用されます)。
startingTimestamp None すべてのパーティションに適用されるグローバル開始タイムスタンプ (ミリ秒単位)。 タイムスタンプのオフセットが存在しない場合、動作は startingOffsetsByTimestampStrategyによって制御されます。 有効な値: 負以外の整数。
subscribe None サブスクライブするトピック。 subscribesubscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 有効な値: トピック名のコンマ区切りのリスト。
subscribePattern None トピックのサブスクライブに使用されるパターン。 subscribesubscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 たとえば、「 topic.* 」のように入力します。 有効な値: 任意のJava正規表現文字列。

次のオプションは、 spark.readStream.format("kafka")を使用したストリーミング読み取りにのみ適用されます。

Key デフォルト Description
bytesEstimateWindowLength 300s estimatedTotalBytesBehindLatest メトリックの残りのバイト数を見積もるために使用される時間枠。 有効な値: 10m600sなどの期間文字列。 詳しくは、Kafka メトリックを取得するをご覧ください。
maxOffsetsPerTrigger None トリガー間隔ごとに処理するオフセットの最大数。 オフセットは、トピック パーティション間で比例して分散されます。 有効な値: 正の整数。
maxTriggerDelay 15m トリガーする前に minOffsetsPerTrigger が蓄積されるまで待機する最大時間。 有効な値: 10m600sなどの期間文字列。
minOffsetsPerTrigger None マイクロバッチをトリガーする前に累積するオフセットの最小数。 maxTriggerDelayに達すると、マイクロバッチは関係なく実行されます。 有効な値: 正の整数。

spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されるオフセット オプションについては、「DataFrameReader Kafka オプション」を参照してください。

Kafka クライアント (kafka.*) と認証オプションについては、「 オプション」を参照してください。

DataFrameWriter オプション

これらのオプションを DataFrameWriter.option() および DataFrameWriterV2.option() と共に使用して、データAzure Databricks書き込む方法を制御します。

Example

次の例では、Delta Lake テーブルを書き込むためのTruemergeSchemaを設定します。

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")

アブロ

Key デフォルト Description
avroSchema None JSON 文字列としての完全な Avro スキーマ。 Spark SQL 型を特定の Avro 型に変換するには、このオプションを使用します。 Avro ファイルに適用されます。
avroSchemaUrl None Avro スキーマ ファイルを指す URL。 スキーマが外部に格納されている場合は、 avroSchema の代わりに使用します。 avroSchemaと相互に排他的です。 Avro ファイルに適用されます。
compression snappy 書き込み時に使用する圧縮コーデック。 有効な値: uncompresseddeflatesnappybzip2xzzstandardAvro ファイルに適用されます。
recordName topLevelRecord 出力 Avro スキーマの最上位レベルのレコード名。 Avro ファイルに適用されます。
positionalFieldMatching false Spark スキーマと Avro スキーマの間の列を名前ではなくフィールドの位置で照合するかどうか。 Avro ファイルに適用されます。
recordNamespace 空の文字列 出力 Avro スキーマの最上位レコードの名前空間。 Avro ファイルに適用されます。

Delta Lake と Apache Iceberg

Key デフォルト Description
clusterByAuto false 自動液体クラスタリングを有効にするかどうか。Azure Databricksクエリ パターンに基づいてクラスタリング列が選択されます。 mode("overwrite")でのみ有効です。 append モードでは使用できません。 Databricks Runtime 16.4 以降で使用できます。 [テーブルに液体クラスタリングを使用する] に適用されます。
mergeSchema None 書き込み操作でスキーマの進化を有効にするかどうかを指定します。 ソース DataFrame の新しい列がターゲット テーブル スキーマに追加されます。 バッチおよびストリーミングの追加に適用されます。 テーブル スキーマの更新に適用されます。
overwriteSchema None 上書き時にテーブル スキーマとパーティション分割を置き換えるかどうか。 mode("overwrite")なしでreplaceWhereが必要です。 partitionOverwriteModeでは使用できません。 テーブル スキーマの更新に適用されます。
partitionOverwriteMode None パーティション上書きモード。 これを dynamic に設定すると、新しいデータを含むパーティションのみが上書きされ、他のすべてのパーティションは変更されません。 レガシ モード。サーバーレス コンピューティングまたは Databricks SQL ではサポートされていません。 有効な値: staticdynamic。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
replaceOn None ソース クエリの行に置き換えるターゲット テーブル内の行と一致するブール式。 ターゲット テーブルとソース クエリの両方の列を参照できます。 ソース行と一致するターゲット内の行は削除され、置き換えられます。 ソースが空の場合、削除は行われません。 列参照のあいまいさを解消するには、 targetAlias を使用します。 Databricks Runtime 17.1 以降で使用できます。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
replaceUsing None ターゲット テーブルとソース クエリの間の行を照合するために使用される列名のコンマ区切りのリスト。 ターゲットとソースの両方に、一覧表示されているすべての列が含まれている必要があります。 等値比較でソース行と一致するターゲット内の行は削除され、置き換えられます。 NULL 値は等しくないとして扱われ、一致しません。 Databricks Runtime 16.3 以降で使用できます。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
replaceWhere None 述語式。 述語に一致するレコードのみをアトミックに上書きします。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
targetAlias None ターゲット テーブルの文字列エイリアス。 条件がターゲット テーブルとソース クエリの両方の列を参照する場合に、 replaceOn または replaceWhere を使用して列参照を明確にします。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
txnAppId None foreachBatch操作でのべき等書き込みのアプリケーションを識別する一意の文字列。 複数の Delta Lake テーブルへの正確な 1 回の書き込みを保証するには、 txnVersion と共に使用します。 べき等テーブル書き込みにforeachBatchを使用するために適用されます。
txnVersion None foreachBatch操作でのべき等書き込みのトランザクション バージョンとして使用される単調に増加する数。 複数の Delta Lake テーブルへの正確な 1 回の書き込みを保証するには、 txnAppId と共に使用します。 べき等テーブル書き込みにforeachBatchを使用するために適用されます。
optimizeWrite None この書き込み操作で自動最適化書き込みを有効にするかどうかを指定します。 spark.databricks.delta.optimizeWrite.enabled構成をオーバーライドします。 Azure Databricks?の Delta Lake に適用されます。
userMetadata None 書き込み操作のコミット メタデータに追加されたユーザー定義文字列。 DESCRIBE HISTORYの出力に表示されます。 カスタム メタデータを使用したテーブルのエンリッチに適用されます。

Csv

Key デフォルト Description
charToEscapeQuoteEscaping \0 (無効) エスケープ文字が引用符文字と異なる場合にエスケープするために使用される文字。 csv (DataFrameWriter) に適用されます。
compression none 書き込み時に使用する圧縮コーデック。 有効な値: nonebzip2gziplz4snappydeflatezstdcsv (DataFrameWriter) に適用されます。
dateFormat yyyy-MM-dd 日付列の値の書式指定文字列。 csv (DataFrameWriter) に適用されます。
emptyValue 空の文字列 空の (null 以外の) 値に対して書き込まれた文字列。 csv (DataFrameWriter) に適用されます。
encoding UTF-8 出力ファイルの文字エンコード。 csv (DataFrameWriter) に適用されます。
escape \ 引用符で囲まれた値をエスケープするために使用される文字。 csv (DataFrameWriter) に適用されます。
escapeQuotes true 引用符で囲まれたフィールド値内の引用符文字をエスケープするかどうか。 csv (DataFrameWriter) に適用されます。
header false 出力の最初の行として列名を書き込むかどうか。 csv (DataFrameWriter) に適用されます。
ignoreLeadingWhiteSpace false 書き込み時に先頭の空白を値からトリミングするかどうか。 csv (DataFrameWriter) に適用されます。
ignoreTrailingWhiteSpace false 書き込み時に値から末尾の空白をトリミングするかどうかを指定します。 csv (DataFrameWriter) に適用されます。
lineSep \n レコード間で使用される行区切り文字列。 csv (DataFrameWriter) に適用されます。
locale en-US java.util.Locale 識別子。 書き込み時の日付とタイムスタンプの値の書式設定に影響します。
nullValue 空の文字列 null 値に対して書き込まれた文字列。 csv (DataFrameWriter) に適用されます。
quote " 区切り記号を含むフィールド値を引用符で囲む文字。 csv (DataFrameWriter) に適用されます。
quoteAll false 内容に関係なく、すべてのフィールド値を引用符で囲むかどうか。 csv (DataFrameWriter) に適用されます。
sep , フィールド区切り文字。 csv (DataFrameWriter) に適用されます。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ列の値の書式指定文字列。 csv (DataFrameWriter) に適用されます。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイム ゾーン (TimestampNTZType) 列の値を含まないタイムスタンプの書式指定文字列。

Excel

Key デフォルト Description
dataAddress None 書き込みのシート名または開始セル。 省略した場合、セル A1から始まるSheet1という名前のシートに書き込みます。 シート名 ("SheetName") または単一セル参照 ("SheetName!A1") を受け入れます。 セル範囲は書き込みではサポートされていません。
dateFormatInWrite yyyy-mm-dd Excel c0 /< > 列に適用されるセル書式指定文字列です。 Excel書式構文を使用します。
headerRows 0 列名を最初の行として書き込むかどうか。 有効な値: 01
timestampNTZFormat yyyy-mm-dd hh:mm:ss Excel TimestampNTZ および Timestamp 列に適用されるセル書式指定文字列。 Excel書式構文を使用します。
version xlsx 書き込むExcelファイル形式のバージョン。 有効な値: xlsxxls

Json

Key デフォルト Description
compression none 書き込み時に使用する圧縮コーデック。 有効な値: nonebzip2gziplz4snappydeflatezstdjson (DataFrameWriter) に適用されます。
dateFormat yyyy-MM-dd 日付列の値の書式指定文字列。 json (DataFrameWriter) に適用されます。
encoding UTF-8 出力ファイルの文字エンコード。 json (DataFrameWriter) に適用されます。
ignoreNullFields の値 spark.sql.jsonGenerator.ignoreNullFields JSON 出力から null 値を持つフィールドを省略するかどうか。 json (DataFrameWriter) に適用されます。
lineSep \n レコード間で使用される行区切り文字列。 json (DataFrameWriter) に適用されます。
locale en-US java.util.Locale 識別子。 書き込み時の日付とタイムスタンプの値の書式設定に影響します。
pretty false 美しい (インデントされた複数行の) JSON 出力を有効にするかどうか。
sortKeys false 出力で JSON オブジェクトのキーをアルファベット順に並べ替えるかどうか。 確定的な出力を生成する場合に便利です。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ列の値の書式指定文字列。 json (DataFrameWriter) に適用されます。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイム ゾーン (TimestampNTZType) 列の値を含まないタイムスタンプの書式指定文字列。
writeNonAsciiCharacterAsCodePoint false 出力内のリテラル UTF-8 文字ではなく、非 ASCII 文字 \uXXXX Unicode エスケープ シーケンスとしてエンコードするかどうか。

オーク

Key デフォルト Description
compression zstd 書き込み時に使用する圧縮コーデック。 有効な値: noneuncompressedsnappyzliblzozstdlz4brotliorc (DataFrameWriter) に適用されます。

寄木細工

Key デフォルト Description
compression snappy 書き込み時に使用する圧縮コーデック。 有効な値: noneuncompressedsnappygziplzobrotlilz4lz4_rawzstdParquet (DataFrameWriter) に適用されます。
spark.sql.parquet.outputTimestampType INT96 タイムスタンプ列のエンコードに使用される物理型。 有効な値: INT96TIMESTAMP_MICROSTIMESTAMP_MILLIS。 標準のタイムスタンプ型をサポートしていない従来の Parquet リーダーとの互換性を保つには、 INT96 を使用します。

テキスト

Key デフォルト Description
compression none 書き込み時に使用する圧縮コーデック。 有効な値: nonebzip2gziplz4snappydeflatezstd。 テキスト ( DataFrameWriter) に適用されます。
encoding UTF-8 出力ファイルの文字エンコード。
lineSep \n レコード間で使用される行区切り文字列。 テキスト ( DataFrameWriter) に適用されます。

Xml

Key デフォルト Description
arrayElementName item 明示的な名前を持たない配列要素の要素名。 xml (DataFrameWriter) に適用されます。
attributePrefix _ XML 属性に対応するフィールド名の前に付加されるプレフィックス。 xml (DataFrameWriter) に適用されます。
compression none 書き込み時に使用する圧縮コーデック。 有効な値: nonebzip2gziplz4snappydeflatezstdxml (DataFrameWriter) に適用されます。
dateFormat yyyy-MM-dd 日付列の値の書式指定文字列。 xml (DataFrameWriter) に適用されます。
declaration version="1.0" encoding="UTF-8" standalone="yes" 各出力ファイルの先頭に書き込まれた XML 宣言文字列。 宣言を抑制する空の文字列に設定します。 xml (DataFrameWriter) に適用されます。
encoding UTF-8 出力ファイルの文字エンコード。 xml (DataFrameWriter) に適用されます。
indent 4 スペース 出力内の子要素のインデントに使用される文字列。 インデントをオフにし、各行を 1 行に書き込むには、空の文字列に設定します。
locale en-US java.util.Locale 識別子。 書き込み時の日付とタイムスタンプの値の書式設定に影響します。
nullValue null null 値に対して書き込まれた文字列。 nullに設定すると、null フィールドの属性と子要素は省略されます。 xml (DataFrameWriter) に適用されます。
rootTag ROWS 出力内のすべての行要素をラップするルート要素タグ。 xml (DataFrameWriter) に適用されます。
rowTag ROW 出力内の行を表す要素タグ。 xml (DataFrameWriter) に適用されます。
singleVariantColumn None XML ファイルに書き込む 1 つの Variant 列の名前。 xml (DataFrameWriter) に適用されます。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ列の値の書式指定文字列。 xml (DataFrameWriter) に適用されます。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイム ゾーン列の値を含まないタイムスタンプの書式指定文字列。 xml (DataFrameWriter) に適用されます。
validateName true 列名が有効な XML 要素識別子でない場合に例外をスローするかどうか。 xml (DataFrameWriter) に適用されます。
valueTag _VALUE 属性または子要素を持つ XML 要素の文字データに使用されるフィールド名。 xml (DataFrameWriter) に適用されます。

DataStreamWriter オプション

ストリーミング書き込みを構成するには、これらのオプションと DataStreamWriter.option() を使用します。

Example

次の例では、ストリームのチェックポイントの場所を設定します。

Python
(df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table"))
Scala
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table")

共通

Key デフォルト Description
checkpointLocation なし (必須) ストリーミング クエリのチェックポイント ディレクトリへのパス。 フォールト トレランスと正確に 1 回の処理の保証に必要です。 各ストリーミング クエリでは、一意のチェックポイントの場所を使用する必要があります。 Databricks では、Unity カタログ ボリュームまたはクラウド ストレージ パスにチェックポイントを格納することをお勧めします。 「構造化ストリーミング チェックポイント」を参照してください。
path None Parquet などのファイル ベースのストリーミング シンクの出力パス。 ファイル ベースの形式にのみ適用されます。

コンソール シンク

Key デフォルト Description
numRows 20 コンソール シンクに書き込むときにマイクロバッチごとに表示する行数。
truncate true 行を表示するときに長い文字列を切り捨てるかどうかを指定します。 完全な文字列値を表示するには、 false に設定します。

Delta Lake

format("delta")を使用して Delta Lake テーブルにストリームを書き込む場合は、次のオプションが適用されます。 overwriteSchemareplaceWherepartitionOverwriteModeなどの上書き専用オプションは、ストリーミング書き込みではサポートされていません。

Key デフォルト Description
mergeSchema false ストリーミング DataFrame に新しい列が含まれている場合に Delta Lake テーブル スキーマを進化させるかどうか。 追加出力モードにのみ適用されます。 テーブル スキーマの更新に適用されます。
userMetadata None 書き込み操作のコミット メタデータに追加されたユーザー定義文字列。 DESCRIBE HISTORYの出力に表示されます。 カスタム メタデータを使用したテーブルのエンリッチに適用されます。

ファイル シンク

次のオプションは、ストリームをファイル ベースの形式 (Parquet、JSON、CSV、ORC、text) に書き込む場合に適用されます。 形式固有のオプションについては、「 DataFrameWriter オプション」を参照してください。

Key デフォルト Description
retention None フォールト トレランスと圧縮に使用されるシンク メタデータ ファイルを保持する期間。 7 days24 hoursなどの時刻文字列を受け入れます。 設定しない場合、メタデータ ファイルは無期限に保持されます。

Kafka シンク

Kafka にストリームを書き込むためのオプションの完全な一覧については、「 オプション」を参照してください。

Key デフォルト Description
kafka.bootstrap.servers None 必須。 Kafka ブローカー host:port アドレスのコンマ区切りのリスト。
topic None すべての行のターゲット Kafka トピック。 DataFrame に topic 列が含まれていない場合は必須です。
kafka.* None kafka.プレフィックスが付いた Kafka プロデューサー構成。 たとえば、「 kafka.compression.type 」のように入力します。

メモリ シンク

Key デフォルト Description
queryName なし (必須) クエリが書き込むメモリ内テーブルの名前。 メモリ シンクに必要です。 .queryName()を介して構成することもできます。
mode exactlyonce メモリ シンクの配信保証。 exactlyonce は、厳密に 1 回のセマンティクスを持つマイクロバッチ モードを使用します。 atleastonce は、少なくとも 1 回のセマンティクスを持つ連続モードを使用します。 有効な値: exactlyonceatleastonce

Spark 関数のオプション

一部の Spark SQL 組み込み関数は、解析またはシリアル化の動作を制御する options マップを受け入れます。 オプションを Python dict または Scala Map[String, String] として渡します。

Example

次の例では、形式が正しくないレコードを削除しながら JSON 列を解析します。

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))

アブロ

Avro 関数は、対応する DataFrame オプションと同じオプションを受け入れます。

Example

次の例では、スキーマの進化が有効になっている Avro 列をデコードします。

Python
from pyspark.sql.functions import from_avro

df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro

val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))

さらに、 from_avroto_avro のスキーマ レジストリバリアントでは、次のオプションを受け入れます。

Key デフォルト Description
schemaId None jsonFormatSchemaと互換性のないスキーマでエンコードされた Avro データをデコードするときに使用する Confluent スキーマ レジストリのスキーマ ID。 from_avroにのみ適用されます。
confluent.schema.registry.* None Confluent Schema Registry クライアント構成プロパティ。 基本認証資格情報の confluent.schema.registry.basic.auth.user.info など、このプレフィックスを使用して Confluent SR クライアント プロパティを渡します。 from_avroto_avroのスキーマ レジストリバリアントに必要です。

Csv

CSV 関数は、対応する DataFrame オプションと同じオプションを受け入れます。

Example

次の例では、カスタム区切り記号と NULL 値を含む CSV を読み取ります。

Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))

Json

JSON 関数は、対応する DataFrame オプションと同じオプションを受け入れます。

Example

次の例では、 NULL フィールドが無視され、整形書式設定が有効になっている JSON を書き込みます。

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json

val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))

Protobuf

from_protobufto_protobuf はファイルベースのデータソースを使用しません。 Protobuf データは、これらの関数を使用して常にバイナリ列として読み書きされます。 オプションは Map[String, String] として渡され、大文字と小文字が区別されます。

Example

次の例では、PERMISSIVE モードを使用して Protobuf 列をデコードします。

Python
from pyspark.sql.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
    {"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf

val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
    Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))

Protobuf 関数では、次のオプションを使用します。

Key デフォルト Description
mode FAILFAST 破損したレコードを処理する方法。 FAILFAST で例外がスローされました。 PERMISSIVE は、形式が正しくないフィールドを null に設定します。 有効な値: FAILFASTPERMISSIVEfrom_protobufに適用されます。
recursive.fields.max.depth -1 (無効) 再帰 Protobuf フィールドの最大再帰深度。 再帰フィールドのサポートをオフにするには、 0 に設定します。 有効な値: 10する0from_protobufに適用されます。
convert.any.fields.to.json false STRUCTではなく、Protobuf Anyフィールドを JSON 文字列に変換するかどうかを指定します。 from_protobufに適用されます。
emit.default.values false ゼロまたは既定値 (proto3 セマンティクス) を持つフィールドを出力するかどうか。 falseすると、既定値のフィールドは出力から省略されます。 from_protobufに適用されます。
enums.as.ints false 列挙型フィールドを文字列の代わりに整数値としてレンダリングするかどうかを指定します。 from_protobufに適用されます。
upcast.unsigned.ints false 整数オーバーフローを防ぐためにuint32Longにアップキャストし、Decimal(20,0)uint64するかどうか。 from_protobufに適用されます。
unwrap.primitive.wrapper.types false google.protobufラッパー型 (Int32ValueStringValueなど) を対応するプリミティブ Spark 型にラップ解除するかどうか。 from_protobufに適用されます。
retain.empty.message.types false ダミー列を挿入して、空の Protobuf メッセージ型を出力スキーマに保持するかどうかを指定します。 from_protobufに適用されます。
schema.registry.subject None スキーマ レジストリのサブジェクト名。 from_protobufto_protobufのスキーマ レジストリバリアントを使用する場合に必要です。
schema.registry.address None スキーマ レジストリ アドレス (ホストとポート)。 from_protobufto_protobufのスキーマ レジストリバリアントを使用する場合に必要です。
schema.registry.protobuf.name None スキーマ レジストリのサブジェクトに複数のメッセージが含まれている場合に使用する Protobuf メッセージを指定します。 オプション。

Xml

XML 関数は、対応する DataFrame オプションと同じオプションを受け入れます。

Example

次の例では、カスタム ルートタグと行タグを使用して XML を書き込みます。

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml

val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))