ORC ファイルを操作する

Apache ORC は、クエリを高速化するための最適化を提供する列形式のファイル形式です。 CSVJSON よりも効率的です。 Azure Databricks では、Apache Spark での読み取りと書き込みの両方に対して ORC がサポートされています。 詳細については、 ORC Files に関する Apache Spark のドキュメントを参照してください。

前提条件

Azure Databricks では、ORC ファイルを使用するために追加の構成は必要ありません。 ただし、ORC ファイルをストリーミングするには、自動ローダーが必要です。

DataFrame API で ORC を構成して使用する

スキーマ、パーティション分割、または書き込みの動作を完全に制御する必要がある場合は、Apache Spark DataFrame API を使用して ORC ファイルの読み取りと書き込みを行います。

読み取りと書き込みのオプション

サポートされている DataFrame API の読み取りおよび書き込みオプションについては、次の Apache Spark リファレンス記事を参照してください。

ORC ファイルの読み取りと書き込み

たとえば、dataFrame data.orcdfを読み取り、orc_outputに書き込みます。

Python

# Read an ORC file into a DataFrame
df = spark.read.format("orc").load("/tmp/data.orc")
df.show()

# Write a DataFrame to ORC format
df.write.format("orc").save("/tmp/orc_output")

# Write with overwrite mode
df.write.format("orc").mode("overwrite").save("/tmp/orc_output")

Scala

// Read an ORC file into a DataFrame
val df = spark.read.format("orc").load("/tmp/data.orc")
df.show()

// Write a DataFrame to ORC format
df.write.format("orc").save("/tmp/orc_output")

// Write with overwrite mode
df.write.format("orc").mode("overwrite").save("/tmp/orc_output")

SQL

-- Query ORC files directly
SELECT * FROM orc.`/tmp/data.orc`;

-- Create a table from ORC files
CREATE TABLE orc_table
USING ORC
OPTIONS (path "/tmp/data.orc");

SELECT * FROM orc_table;

スキーマ仕様で ORC ファイルを読み取る

スキーマ推論のオーバーヘッドを回避するために、ORC ファイルを読み取るときにスキーマを指定します。 たとえば、 nameage、および city フィールドを使用してスキーマを定義し、 data.orc を DataFrame dfに読み取ります。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

df = spark.read.format("orc").schema(schema).load("/tmp/data.orc")
df.printSchema()
df.show()

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val schema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("city", StringType, nullable = true)
))

val df = spark.read.format("orc").schema(schema).load("/tmp/data.orc")
df.printSchema()
df.show()

SQL

-- Create a table with an explicit schema from ORC files
CREATE TABLE orc_table (
  name STRING,
  age INT,
  city STRING
)
USING ORC
OPTIONS (path "/tmp/data.orc");

SELECT * FROM orc_table;

パーティション分割された ORC ファイルを書き込む

パーティション分割された ORC ファイルを書き込み、大規模なデータセットのクエリ パフォーマンスを最適化します。 たとえば、dfyearmonth、およびname列を使用して DataFrame amountを作成し、partitioned_orcyearでパーティション分割されたmonthに書き込みます。

Python

df = spark.createDataFrame(
    [
        (2023, 1, "Alice", 100),
        (2023, 1, "Bob", 200),
        (2023, 2, "Alice", 150),
        (2024, 1, "Alice", 300),
    ],
    ["year", "month", "name", "amount"]
)

# Write partitioned by year and month
df.write.format("orc").partitionBy("year", "month").save("/tmp/partitioned_orc")

Scala

val df = Seq(
  (2023, 1, "Alice", 100),
  (2023, 1, "Bob", 200),
  (2023, 2, "Alice", 150),
  (2024, 1, "Alice", 300)
).toDF("year", "month", "name", "amount")

// Write partitioned by year and month
df.write.format("orc").partitionBy("year", "month").save("/tmp/partitioned_orc")

SQL

-- Create a partitioned ORC table
CREATE TABLE partitioned_orc_table (
  name STRING,
  amount INT
)
USING ORC
PARTITIONED BY (year INT, month INT);

SQL を使用して ORC ファイルを読み取る

テーブルを作成せずに SQL を使用してクラウド ストレージから直接 ORC ファイルに対してクエリを実行するには、 read_files を使用します。 たとえば、ファイルへのパスと orc 書式指定子を使用して、クラウド ストレージに格納されている ORC ファイルに対してクエリを実行します。

SELECT * FROM read_files(
  's3://<bucket>/<path>/<file>.orc',
  format => 'orc'
)

ORC 圧縮を設定する

compression オプションを使用して ORC 圧縮を構成します。 サポートされているコーデックには、 nonesnappyzliblzoが含まれます。 たとえば、df圧縮を使用してcompressed_orcからzlibに、またはsnappy_orc圧縮を使用してsnappyに変換を書きます。

Python

# Write with zlib compression
df.write.format("orc").option("compression", "zlib").save("/tmp/compressed_orc")

# Write with snappy compression (default)
df.write.format("orc").option("compression", "snappy").save("/tmp/snappy_orc")

Scala

// Write with zlib compression
df.write.format("orc").option("compression", "zlib").save("/tmp/compressed_orc")

// Write with snappy compression (default)
df.write.format("orc").option("compression", "snappy").save("/tmp/snappy_orc")

SQL

-- Create an ORC table with zlib compression
CREATE TABLE compressed_orc_table (
  name STRING,
  age INT,
  city STRING
)
USING ORC
TBLPROPERTIES ('orc.compress' = 'ZLIB');

-- Create an ORC table with snappy compression
CREATE TABLE snappy_orc_table (
  name STRING,
  age INT,
  city STRING
)
USING ORC
TBLPROPERTIES ('orc.compress' = 'SNAPPY');