Share via


バイナリ ファイル

Databricks Runtime ではバイナリ ファイル データ ソースがサポートされます。これはバイナリ ファイルを読み取り、各ファイルを、ファイルの生のコンテンツとメタデータを含む 1 つのレコードに変換します。 バイナリ ファイル データ ソースでは、次の列と場合によってはパーティション列を含む DataFrame が生成されます。

  • path (StringType): ファイルのパス。
  • modificationTime (TimestampType): ファイルの変更時刻。 一部の Hadoop FileSystem 実装では、このパラメーターが使用できず、値が既定値に設定される場合があります。
  • length (LongType): ファイルの長さ (バイト単位)。
  • content (BinaryType): ファイルの内容。

バイナリ ファイルを読み取る場合は、データ ソース formatbinaryFile として指定します。

イメージ

Databricks では、バイナリ ファイル データ ソースを使用して、画像データを読み込むことが推奨されています。

Databricks display 関数では、バイナリ データ ソースを使用して読み込まれた画像データの表示がサポートされています。

読み込まれたすべてのファイルに、画像の拡張子を持つファイル名がある場合、画像プレビューは自動的に有効になります。

df = spark.read.format("binaryFile").load("<path-to-image-dir>")
display(df)    # image thumbnails are rendered in the "content" column

image preview

または、mimeType オプションと文字列値 "image/*" を使用してバイナリ列に注釈を付け、画像のプレビュー機能を強制することもできます。 画像は、バイナリ コンテンツ内のフォーマット情報に基づいてデコードされます。 サポートされている画像の種類は bmpgifjpeg、および png です。 サポートされていないファイルは、壊れた画像アイコンとして表示されます。

df = spark.read.format("binaryFile").option("mimeType", "image/*").load("<path-to-dir>")
display(df)    # unsupported files are displayed as a broken image icon

image preview with unsupported file type

イメージ データを処理するために推奨されるワークフローについては、「イメージ アプリケーションのリファレンス ソリューション」を参照してください。

オプション

パーティション検出の動作を維持しながら、特定の glob パターンに一致するパスを使用してファイルを読み込むには、pathGlobFilter オプションを使用できます。 次のコードでは、パーティション検出を使用して入力ディレクトリからすべての JPG ファイルを読み取ります。

df = spark.read.format("binaryFile").option("pathGlobFilter", "*.jpg").load("<path-to-dir>")

パーティション検出を無視し、入力ディレクトリの下にあるファイルを再帰的に検索する場合は、recursiveFileLookup オプションを使用します。 このオプションでは、名前が date=2019-07-01 のようなパーティション名前付けスキームに従って "いなくても"、入れ子になったディレクトリを検索します。 次のコードでは、すべての JPG ファイルを入力ディレクトリから再帰的に読み取り、パーティション検出を無視します。

df = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load("<path-to-dir>")

Scala、Java、R にも同様の API が存在します。

注意

データを再び読み込む際の読み取りパフォーマンスを向上させるために、Azure Databricks では、バイナリ ファイルから読み込まれたデータを保存するときに圧縮をオフに設定することが推奨されています。

spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
df.write.format("delta").save("<path-to-table>")