このチュートリアルでは、Azure Databricks で Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API、SparkR SparkDataFrame API を使ってデータを読み込んで変換する方法について説明します。
このチュートリアルを最後まで進めると、DataFrame とは何であるかを理解し、以下のタスクを快適に実行できます。
Python
- 変数を定義し、パブリック データを Unity カタログ ボリュームにコピーする
- Python を使用して DataFrame を作成する
- CSV ファイルから DataFrame にデータを読み込む
- DataFrame を表示して操作する
- DataFrame を保存する
- PySpark で SQL クエリを実行する
Apache Spark PySpark API リファレンスも参照してください。
スカラ (プログラミング言語)
- 変数を定義し、パブリック データを Unity カタログ ボリュームにコピーする
- Scala を使用して DataFrame を作成する
- CSV ファイルから DataFrame にデータを読み込む
- DataFrame を表示して操作する
- DataFrame を保存する
- Apache Spark で SQL クエリを実行する
Apache Spark Scala API リファレンスも参照してください。
R
- 変数を定義し、パブリック データを Unity カタログ ボリュームにコピーする
- SparkR SparkDataFrames を作成する
- CSV ファイルから DataFrame にデータを読み込む
- DataFrame を表示して操作する
- DataFrame を保存する
- SparkR で SQL クエリを実行する
Apache SparkR API リファレンスも参照してください。
DataFrame とは
DataFrame は、潜在的に異なる型の列を持つ 2 次元のラベル付きデータ構造です。 DataFrame は、スプレッドシート、SQL テーブル、または一連のオブジェクトのディクショナリのようなものと考えることができます。 Apache Spark DataFrames には、一般的なデータ分析の問題を効率的に解決できるようにする豊富な機能セット (列の選択、フィルター、結合、集計) が用意されています。
Apache Spark DataFrames は、Resilient Distributed Datasets (RDD) に基づいて構築された抽象化です。 Spark DataFrames と Spark SQL では、統合された計画と最適化エンジンが使用されるため、Azure Databricks でサポートされているすべての言語 (Python、SQL、Scala、R) でほぼ同じパフォーマンスを得ることができます。
要件
次のチュートリアルを完了するには、次の要件を満たす必要があります。
このチュートリアルの例を使用するには、ワークスペースで Unity カタログ が有効になっている必要があります。
このチュートリアルの例では、Unity カタログ ボリューム を使用してサンプル データを格納します。 これらの例を使用するには、ボリュームを作成し、そのボリュームのカタログ、スキーマ、およびボリューム名を使用して、例で使用されるボリューム パスを設定します。
Unity カタログには、次のアクセス許可が必要です。
- このチュートリアルで使うボリュームに対する
READ VOLUMEとWRITE VOLUME、またはALL PRIVILEGES。 -
USE SCHEMAまたはALL PRIVILEGESは、このチュートリアルで使用されるスキーマです。 - このチュートリアルで使用するカタログとして
USE CATALOGまたはALL PRIVILEGESを使用します。
これらのアクセス許可を設定するには、Databricks 管理者または Unity カタログの権限とセキュリティ保護可能なオブジェクトを参照してください。
- このチュートリアルで使うボリュームに対する
ヒント
この記事の完成したノートブックについては、「DataFrame チュートリアルのノートブック」を参照してください。
ステップ 1: 変数を定義して CSV ファイルを読み込む
この手順では、このチュートリアルで使用する変数を定義し、赤ちゃんの名前データを含む CSV ファイルを health.data.ny.gov から Unity カタログ ボリュームに読み込みます。
アイコンをクリックして、新しいノートブックを開きます。 Azure Databricks ノートブックを操作する方法については、「ノートブックの外観をカスタマイズする」を参照してください。次のコードをコピーして、新しい空のノートブック セルに貼り付けます。
<catalog-name>、<schema-name>、<volume-name>を、Unity Catalog ボリュームのカタログ、スキーマ、ボリュームの名前に置き換えます。<table_name>を、選択したテーブル名に置き換えます。 このチュートリアルの後半で、このテーブルに赤ちゃんの名前データを読み込みます。Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete pathスカラ (プログラミング言語)
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete pathR
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete pathShift+Enterキーを押してセルを実行し、新しい空のセルを作成します。次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、
rows.csvコマンドを使って、health.data.ny.gov から Unity Catalog ボリュームに ファイルをコピーします。Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")スカラ (プログラミング言語)
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
ステップ 2: DataFrame を作成する
このステップでは、テスト データを含む df1 という名前の DataFrame を作成してから、その内容を表示します。
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、テスト データを含む DataFrame を作成し、DataFrame の内容とスキーマを表示します。
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] # highlight-next-line df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.スカラ (プログラミング言語)
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") // highlight-next-line val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) # highlight-next-line df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
ステップ 3: CSV ファイルから DataFrame にデータを読み込む
この手順では、以前に Unity カタログ ボリュームに読み込んだ CSV ファイルから、 df_csv という名前の DataFrame を作成します。
spark.read.csv を参照してください。
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、赤ちゃんの名前のデータを CSV ファイルから DataFrame
df_csvに読み込み、DataFrame の内容を表示します。Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)スカラ (プログラミング言語)
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
サポートされている多くのファイル形式からデータを読み込むことができます。
ステップ 4: DataFrame を表示して操作する
次の方法を使って、赤ちゃんの名前の DataFrame を表示して操作します。
DataFrame スキーマを印刷する
Apache Spark DataFrame のスキーマを表示する方法について説明します。 Apache Spark では、 スキーマ という用語を使用して、DataFrame 内の列の名前とデータ型を参照します。
注
また、Azure Databricks では、スキーマという用語を使用して、カタログに登録されているテーブルのコレクションを記述します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、2 つの DataFrame のスキーマを表示する
.printSchema()メソッドを使用して DataFrames のスキーマを示します。2 つの DataFrame を結合するための準備をします。Python
df_csv.printSchema() df1.printSchema()スカラ (プログラミング言語)
dfCsv.printSchema() df1.printSchema()R
printSchema(df_csv) printSchema(df1)Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
DataFrame の列の名前を変更する
DataFrame の列の名前を変更する方法について説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、
df1_csvDataFrame 内の列の名前を、df1DataFrame 内のそれぞれの列と一致するように変更します。 このコードでは、Apache SparkwithColumnRenamed()メソッドを使います。Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchemaスカラ (プログラミング言語)
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
DataFrame を結合する
1 つの DataFrame の行を別のものに追加する新しい DataFrame を作成する方法を説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
union()メソッドを使って、最初の DataFramedfの内容と、CSV ファイルから読み込まれた赤ちゃんの名前のデータを含む DataFramedf_csvを結合します。Python
df = df1.union(df_csv) display(df)スカラ (プログラミング言語)
val df = df1.union(dfCsvRenamed) display(df)R
display(df <- union(df1, df_csv))Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
DataFrame で行をフィルター処理する
Apache Spark .filter() または .where() メソッドを使用して行をフィルター処理することで、データ セット内で最も一般的な赤ちゃんの名前を見つけられます。 フィルター処理を使用して、DataFrame で返す行または変更する行のサブセットを選択します。 以下の例に示すように、パフォーマンスや構文に違いはありません。
.filter() メソッドの使用
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
.filter()メソッドを使用して、数が 50 を超えるデータフレーム内の行を表示します。Python
display(df.filter(df["Count"] > 50))スカラ (プログラミング言語)
display(df.filter(df("Count") > 50))R
display(filteredDF <- filter(df, df$Count > 50))Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
.where() メソッドの使用
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
.where()メソッドを使用して、数が 50 を超えるデータフレーム内の行を表示します。Python
display(df.where(df["Count"] > 50))スカラ (プログラミング言語)
display(df.where(df("Count") > 50))R
display(filtered_df <- where(df, df$Count > 50))Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
データフレームから列を選択し、頻度で並べ替えます
赤ちゃんの名前の頻度について学ぶために、返す DataFrame の列を指定するselect()方法を使用します。 Apache Spark orderby と desc 関数を使って結果を並べ替えます。
Apache Spark 用の pyspark.sql モジュールでは、SQL 関数がサポートされています。 このチュートリアルで使用するこれらの関数の中には、Apache Spark orderBy()、 desc()、 expr() 関数があります。 必要に応じてこれらの関数をセッションにインポートし、それらを使用できるようにします。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、
desc()関数をインポートしてから、Apache Sparkselect()メソッドおよび Apache SparkorderBy()とdesc()関数を使って、最も一般的な名前とその数を降順に表示します。Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))スカラ (プログラミング言語)
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
サブセットの DataFrame を作成する
既存の DataFrame からサブセット DataFrame を作成する方法を説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
filterメソッドを使い、年、数、性別でデータを制限して新しい DataFrame を作成します。 Apache Sparkselect()メソッドを使用して列を制限します。 また、Apache SparkorderBy()とdesc()関数を使って、数の順で新しい DataFrame を並べ替えます。Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)スカラ (プログラミング言語)
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
ステップ 5: DataFrameを保存する
DataFrame を保存する方法を説明します。 DataFrame をテーブルに保存するか、データフレームをファイルまたは複数のファイルに書き込むことができます。
DataFrame をテーブルに保存する
Azure Databricks では、既定ですべてのテーブルに Delta Lake 形式が使用されます。 DataFrame を保存するには、カタログとスキーマに対する CREATE テーブル権限が必要です。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、このチュートリアルの開始時に定義した変数を使用して、DataFrame の内容をテーブルに保存します。
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")スカラ (プログラミング言語)
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
ほとんどの Apache Spark アプリケーションは、大規模なデータ セットに対して分散方式で動作します。 Apache Spark は、1 つのファイルではなく、ファイルのディレクトリを書き出します。 Delta Lake では Parquet フォルダーとファイルを分割します。 多くのデータ システムでは、これらのファイルのディレクトリを読み取ることができます。 Azure Databricks では、ほとんどのアプリケーションでファイル パスに対してテーブルを使用することをお勧めします。
DataFrame を JSON ファイルに保存する
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、DataFrame を JSON ファイルのディレクトリに保存します。
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")スカラ (プログラミング言語)
df.write.format("json").mode("overwrite").save("/tmp/json_data")R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
DataFrame を JSON ファイルから読み取る
Apache Spark spark.read.format() メソッドを使って、ディレクトリから DataFrame に JSON データを読み取る方法を説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、前の例で保存した JSON ファイルが表示されます。
Python
display(spark.read.format("json").json("/tmp/json_data"))スカラ (プログラミング言語)
display(spark.read.format("json").json("/tmp/json_data"))R
display(read.json("/tmp/json_data"))Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
追加タスク: PySpark、Scala、R で SQL クエリを実行する
Apache Spark の DataFrame には、SQL と PySpark、Scala、R を組み合わせる次のオプションがあります。このチュートリアル用に作成したのと同じノートブックで、次のコードを実行できます。
SQL クエリとして列を指定する
Apache Spark の selectExpr() メソッドの使用方法を説明します。 これは、SQL 式を受け取って更新された DataFrame を返す select() メソッドのバリエーションです。 このメソッドでは、upper などの SQL 式を使用できます。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
selectExpr()メソッドと SQLupper式を使用して、文字列列を大文字に変換します (列の名前を変更します)。Python
display(df.selectExpr("Count", "upper(County) as big_name"))スカラ (プログラミング言語)
display(df.selectExpr("Count", "upper(County) as big_name"))R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
expr()を使用して列に SQL 構文を使用する
Apache Spark expr() 関数をインポートして使用して、列を指定する任意の場所で SQL 構文を使用する方法について説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、
expr()関数をインポートし、Apache Sparkexpr()関数と SQLlower式を使用して、文字列列を小文字に変換します (列の名前を変更します)。Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))スカラ (プログラミング言語)
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionalityShift+Enterキーを押してセルを実行してから、次のセルに移動します。
spark.sql() 関数を使用して任意の SQL クエリを実行する
Apache Spark の spark.sql() 関数を使って任意の SQL クエリを実行する方法を説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
spark.sql()関数を使用して、SQL 構文を使用して SQL テーブルに対してクエリを実行します。Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))スカラ (プログラミング言語)
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))Shift+Enterキーを押してセルを実行してから、次のセルに移動します。
DataFrame チュートリアルのノートブック
以下のノートブックには、このチュートリアルのクエリ例が含まれています。