チュートリアル: Apache Spark DataFrame を使用してデータを読み込んで変換する

このチュートリアルでは、Azure Databricks で Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API、SparkR SparkDataFrame API を使ってデータを読み込んで変換する方法について説明します。

このチュートリアルを最後まで進めると、DataFrame とは何であるかを理解し、以下のタスクを快適に実行できます。

Python

Apache Spark PySpark API リファレンスも参照してください。

Scala

Apache Spark Scala API リファレンスも参照してください。

R

Apache SparkR API リファレンスも参照してください。

DataFrame とは

DataFrame は、型が異なる可能性のある列を持つ、2次元のラベルの付いたデータ構造です。 DataFrame は、スプレッドシート、SQL テーブル、または一連のオブジェクトのディクショナリのようなものと考えることができます。 Apache Spark DataFrames には、一般的なデータ分析の問題を効率的に解決できるようにする豊富な機能セット (列の選択、フィルター、結合、集計) が用意されています。

Apache Spark DataFrames は、Resilient Distributed Datasets (RDD) に基づいて構築された抽象化です。 Spark DataFrames と Spark SQL では、統合された計画と最適化エンジンが使用されるため、Azure Databricks でサポートされているすべての言語 (Python、SQL、Scala、R) でほぼ同じパフォーマンスを得ることができます。

要件

次のチュートリアルを完了するには、次の要件を満たす必要があります。

  • このチュートリアルの例を使うには、ワークスペースで Unity Catalog が有効になっている必要があります。

  • このチュートリアルの例では、Unity Catalog ボリュームを使ってサンプル データを格納します。 これらの例を使うには、ボリュームを作成し、そのボリュームのカタログ、スキーマ、ボリュームの名前を使って、例で使われるボリューム パスを設定します。

  • Unity Catalog での次のアクセス許可が必要です。

    • このチュートリアルで使うボリュームに対する READ VOLUMEWRITE VOLUME、または ALL PRIVILEGES
    • このチュートリアルで使うスキーマに対する USE SCHEMA または ALL PRIVILEGES
    • このチュートリアルで使うカタログに対する USE CATALOG または ALL PRIVILEGES

    これらのアクセス許可を設定するには、Databricks 管理者に確認するか、「Unity Catalog の権限とセキュリティ保護可能なオブジェクト」を参照してください。

ステップ 1: 変数を定義して CSV ファイルを読み込む

このステップでは、このチュートリアルで使う変数を定義してから、赤ちゃんの名前のデータを含む CSV ファイルを health.data.ny.gov から Unity Catalog ボリュームに読み込みます。

  1. 新規アイコン アイコンをクリックして、新しいノートブックを開きます。 Azure Databricks ノートブック内を移動する方法を学習するには、「Databricks ノートブックのインターフェイスとコントロール」を参照してください。

  2. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 <catalog-name><schema-name><volume-name> を、Unity Catalog ボリュームのカタログ、スキーマ、ボリュームの名前に置き換えます。 <table_name> を、任意のテーブル名に置き換えます。 このチュートリアルで後ほど、このテーブルに赤ちゃんの名前のデータを読み込みます。

  3. Shift+Enter キーを押してセルを実行し、新しい空のセルを作成します。

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、Databricks dbutils コマンドを使って、health.data.ny.gov から Unity Catalog ボリュームに rows.csv ファイルをコピーします。

  5. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

ステップ 2: DataFrame を作成する

このステップでは、テスト データを含む df1 という名前の DataFrame を作成してから、その内容を表示します。

  1. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、テスト データを含む DataFrame を作成してから、DataFrame の内容とスキーマを表示します。

  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

ステップ 3: CSV ファイルから DataFrame にデータを読み込む

このステップでは、前に Unity Catalog ボリュームに読み込んだ CSV ファイルから、df_csv という名前の DataFrame を作成します。 spark.read.csv を参照してください。

  1. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、赤ちゃんの名前のデータを CSV ファイルから DataFrame df_csv に読み込み、DataFrame の内容を表示します。

  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

サポートされている多くのファイル形式からデータを読み込むことができます。

ステップ 4: DataFrame を表示して操作する

次の方法を使って、赤ちゃんの名前の DataFrame を表示して操作します。

Apache Spark DataFrame のスキーマを表示する方法を説明します。 Apache Spark で使う "スキーマ" という用語は、DataFrame 内の列の名前とデータ型を意味します。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、2 つの DataFrame を結合する準備として、.printSchema() メソッドを使って 2 つの DataFrame のスキーマを表示します。

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Note

Azure Databricks では、カタログに登録されているテーブルのコレクションも "スキーマ" という用語を使用して説明します。

DataFrame の列の名前を変更する

DataFrame の列の名前を変更する方法を説明します。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、df1_csv DataFrame の列の名前を、df1 DataFrame のそれぞれの列と一致するように変更します。 このコードでは、Apache Spark withColumnRenamed() メソッドを使います。

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

DataFrame を結合する

1 つの DataFrame の行を別のものに追加する新しい DataFrame を作成する方法を説明します。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark union() メソッドを使って、最初の DataFrame df の内容と、CSV ファイルから読み込まれた赤ちゃんの名前のデータを含む DataFrame df_csv を結合します。

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

DataFrame で行をフィルター処理する

Apache Spark .filter() または .where() メソッドを使って行をフィルター処理し、データ セット内で最も一般的な赤ちゃんの名前を検出します。 フィルター処理を使用して、DataFrame で返す行または変更する行のサブセットを選択します。 以下の例に示すように、パフォーマンスや構文に違いはありません。

.filter() メソッドの使用

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark .filter() メソッドを使って、DataFrame 内で数が 50 より多い行を表示します。

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

.where() メソッドの使用

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark .where() メソッドを使って、DataFrame 内で数が 50 より多い行を表示します。

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

DataFrame から列を選択して頻度順に並べ替える

select() メソッドで赤ちゃんの名前の頻度を使って、返す DataFrame の列を指定する方法を説明します。 Apache Spark orderbydesc 関数を使って結果を並べ替えます。

Apache Spark 用の pyspark.sql モジュールでは、SQL 関数がサポートされています。 このチュートリアルで使うこれらの関数の中に、Apache Spark orderBy()desc()expr() 関数があります。 必要に応じてこれらの関数をセッションにインポートし、それらを使用できるようにします。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、desc() 関数をインポートしてから、Apache Spark select() メソッドおよび Apache Spark orderBy()desc() 関数を使って、最も一般的な名前とその数を降順に表示します。

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

サブセットの DataFrame を作成する

既存の DataFrame からサブセット DataFrame を作成する方法を説明します。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark filter メソッドを使い、年、数、性別でデータを制限して新しい DataFrame を作成します。 Apache Spark select() メソッドを使って列を制限します。 また、Apache Spark orderBy()desc() 関数を使って、数の順で新しい DataFrame を並べ替えます。

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

ステップ 5: DataFrameを保存する

DataFrame を保存する方法を説明します。 DataFrame をテーブルに保存することも、DataFrame を 1 つのファイルまたは複数のファイルに書き込むこともできます。

DataFrame をテーブルに保存する

Azure Databricks では、既定ですべてのテーブルに Delta Lake 形式が使用されます。 DataFrame を保存するには、カタログとスキーマに対する CREATE テーブル権限が必要です。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、このチュートリアルの先頭で定義した変数を使って、DataFrame の内容をテーブルに保存します。

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

ほとんどの Apache Spark アプリケーションは、大規模なデータ セットに対して分散方式で動作します。 Apache Spark は、1 つのファイルではなく、ファイルのディレクトリを書き出します。 Delta Lake では Parquet フォルダーとファイルを分割します。 多くのデータ システムでは、これらのファイルのディレクトリを読み取ることができます。 Azure Databricks では、ほとんどのアプリケーションでファイル パスよりもテーブルを使用することをお勧めします。

DataFrame を JSON ファイルに保存する

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、DataFrame を JSON ファイルのディレクトリに保存します。

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

DataFrame を JSON ファイルから読み取る

Apache Spark spark.read.format() メソッドを使って、ディレクトリから DataFrame に JSON データを読み取る方法を説明します。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、前の例で保存した JSON ファイルが表示されます。

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

追加タスク: PySpark、Scala、R で SQL クエリを実行する

Apache Spark の DataFrame には、SQL と PySpark、Scala、R を組み合わせる次のオプションがあります。このチュートリアル用に作成したのと同じノートブックで、次のコードを実行できます。

列を SQL クエリとして指定する

Apache Spark の selectExpr() メソッドの使用方法を説明します。 これは、SQL 式を受け取って更新された DataFrame を返す select() メソッドのバリエーションです。 このメソッドでは、upper などの SQL 式を使用できます。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark の selectExpr() メソッドと SQL の upper 式を使って、文字列の列を大文字に変換します (また、列の名前を変更します)。

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

expr() を使用して列に SQL 構文を使用する

Apache Spark の expr() 関数をインポートして使用し、列が指定されている任意の場所で SQL 構文を使う方法を説明します。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、expr() 関数をインポートしてから、Apache Spark の expr() 関数と SQL の lower 式を使って、文字列の列を小文字に変換します (また、列の名前を変更します)。

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

spark.sql() 関数を使用して任意の SQL クエリを実行する

Apache Spark の spark.sql() 関数を使って任意の SQL クエリを実行する方法を説明します。

次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark の spark.sql() 関数を使い、SQL 構文を使って SQL テーブルのクエリを実行します。

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

DataFrame チュートリアルのノートブック

次のノートブックには、このチュートリアルのクエリ例が含まれています。

Python

DataFrame チュートリアルのノートブック

ノートブックを入手

Scala

DataFrame チュートリアルのノートブック

ノートブックを入手

R

DataFrame チュートリアルのノートブック

ノートブックを入手

その他のリソース