Tutorial: Memuat dan mengubah data menggunakan Apache Spark DataFrames
Tutorial ini menunjukkan kepada Anda cara memuat dan mengubah data menggunakan API DataFrame Apache Spark Python (PySpark), API DataFrame Apache Spark Scala, dan API SparkR SparkDataFrame di Azure Databricks.
Pada akhir tutorial ini, Anda akan memahami apa itu DataFrame dan terbiasa dengan tugas-tugas berikut:
Python
- Menentukan variabel dan menyalin data publik ke dalam volume Katalog Unity
- Membuat DataFrame dengan Python
- Memuat data ke dalam DataFrame dari file CSV
- Menampilkan dan berinteraksi dengan DataFrame
- Simpan DataFrame
- Menjalankan kueri SQL di PySpark
Lihat juga referensi API Apache Spark PySpark.
Scala
- Menentukan variabel dan menyalin data publik ke dalam volume Katalog Unity
- Membuat DataFrame dengan Scala
- Memuat data ke dalam DataFrame dari file CSV
- Menampilkan dan berinteraksi dengan DataFrame
- Simpan DataFrame
- Menjalankan kueri SQL di Apache Spark
Lihat juga referensi API Apache Spark Scala.
R
- Menentukan variabel dan menyalin data publik ke dalam volume Katalog Unity
- Membuat SparkR SparkDataFrames
- Memuat data ke dalam DataFrame dari file CSV
- Menampilkan dan berinteraksi dengan DataFrame
- Simpan DataFrame
- Menjalankan kueri SQL di SparkR
Lihat juga referensi Apache SparkR API.
Apa itu DataFrame?
DataFrame adalah struktur data berlabel dua dimensi dengan kolom dari jenis yang berpotensi berbeda. Anda dapat memikirkan DataFrame seperti spreadsheet, tabel SQL, atau kamus objek seri. Apache Spark DataFrames menyediakan serangkaian fungsi yang kaya (pilih kolom, filter, gabung, agregat) yang memungkinkan Anda menyelesaikan masalah analisis data umum secara efisien.
Apache Spark DataFrames adalah abstraksi yang dibangun di atas Himpunan Data Terdistribusi Tangguh (RDD). Spark DataFrames dan Spark SQL menggunakan mesin perencanaan dan pengoptimalan terpadu, memungkinkan Anda mendapatkan performa yang hampir identik di semua bahasa yang didukung di Azure Databricks (Python, SQL, Scala, dan R).
Persyaratan
Untuk menyelesaikan tutorial berikut, Anda harus memenuhi persyaratan berikut:
Untuk menggunakan contoh dalam tutorial ini, ruang kerja Anda harus mengaktifkan Katalog Unity.
Contoh dalam tutorial ini menggunakan volume Katalog Unity untuk menyimpan data sampel. Untuk menggunakan contoh ini, buat volume dan gunakan katalog, skema, dan nama volume volume tersebut untuk mengatur jalur volume yang digunakan oleh contoh.
Anda harus memiliki izin berikut di Katalog Unity:
READ VOLUME
danWRITE VOLUME
, atauALL PRIVILEGES
untuk volume yang digunakan untuk tutorial ini.USE SCHEMA
atauALL PRIVILEGES
untuk skema yang digunakan untuk tutorial ini.USE CATALOG
atauALL PRIVILEGES
untuk katalog yang digunakan untuk tutorial ini.
Untuk mengatur izin ini, lihat administrator Databricks atau hak istimewa Unity Catalog dan objek yang dapat diamankan.
Tip
Untuk buku catatan lengkap untuk artikel ini, lihat Buku catatan tutorial DataFrame.
Langkah 1: Tentukan variabel dan muat file CSV
Langkah ini menentukan variabel untuk digunakan dalam tutorial ini dan kemudian memuat file CSV yang berisi data nama bayi dari health.data.ny.gov ke dalam volume Katalog Unity Anda.
Buka buku catatan baru dengan mengklik ikon. Untuk mempelajari cara menavigasi buku catatan Azure Databricks, lihat Antarmuka dan kontrol buku catatan Databricks.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Ganti
<catalog-name>
,<schema-name>
, dan<volume-name>
dengan katalog, skema, dan nama volume untuk volume Katalog Unity. Ganti<table_name>
dengan nama tabel pilihan Anda. Anda akan memuat data nama bayi ke dalam tabel ini nanti dalam tutorial ini.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Tekan
Shift+Enter
untuk menjalankan sel dan buat sel kosong baru.Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini menyalin
rows.csv
file dari health.data.ny.gov ke volume Unity Catalog Anda menggunakan perintah Databricks dbutuils .Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Langkah 2: Membuat DataFrame
Langkah ini membuat DataFrame bernama df1
dengan data pengujian lalu menampilkan kontennya.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini membuat DataFrame dengan data pengujian, lalu menampilkan konten dan skema DataFrame.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Langkah 3: Memuat data ke dalam DataFrame dari file CSV
Langkah ini membuat DataFrame bernama df_csv
dari file CSV yang sebelumnya Anda muat ke dalam volume Katalog Unity Anda. Lihat spark.read.csv.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini memuat data nama bayi ke dalam DataFrame
df_csv
dari file CSV lalu menampilkan konten DataFrame.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Anda dapat memuat data dari banyak format file yang didukung.
Langkah 4: Melihat dan berinteraksi dengan DataFrame Anda
Lihat dan berinteraksi dengan nama bayi Anda DataFrames menggunakan metode berikut.
Mencetak skema DataFrame
Pelajari cara menampilkan skema Apache Spark DataFrame. Apache Spark menggunakan skema istilah untuk merujuk ke nama dan jenis data kolom di DataFrame.
Catatan
Azure Databricks juga menggunakan istilah skema untuk menjelaskan kumpulan tabel yang terdaftar ke katalog.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menunjukkan skema DataFrames Anda dengan
.printSchema()
metode untuk melihat skema dua DataFrames - untuk bersiap menyatukan dua DataFrames.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Ganti nama kolom di DataFrame
Pelajari cara mengganti nama kolom di DataFrame.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengganti nama kolom di
df1_csv
DataFrame agar sesuai dengan kolom masing-masing dalamdf1
DataFrame. Kode ini menggunakan metode Apache SparkwithColumnRenamed()
.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Menggabungkan DataFrame
Pelajari cara membuat DataFrame baru yang menambahkan baris satu DataFrame ke DataFrame lainnya.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark
union()
untuk menggabungkan konten DataFramedf
pertama Anda dengan DataFramedf_csv
yang berisi data nama bayi yang dimuat dari file CSV.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Memfilter baris dalam DataFrame
Temukan nama bayi paling populer dalam himpunan data Anda dengan memfilter baris, menggunakan Apache Spark .filter()
atau .where()
metode. Gunakan pemfilteran untuk memilih subset baris untuk dikembalikan atau diubah dalam DataFrame. Tidak ada perbedaan dalam performa atau sintaks, seperti yang terlihat dalam contoh berikut.
Menggunakan metode .filter()
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark
.filter()
untuk menampilkan baris tersebut di DataFrame dengan jumlah lebih dari 50.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Menggunakan metode .where()
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark
.where()
untuk menampilkan baris tersebut di DataFrame dengan jumlah lebih dari 50.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Memilih kolom dari DataFrame dan mengurutkan menurut frekuensi
Pelajari tentang frekuensi nama bayi mana dengan select()
metode untuk menentukan kolom dari DataFrame yang akan dikembalikan. Gunakan Apache Spark orderby
dan desc
fungsi untuk mengurutkan hasilnya.
Modul pyspark.sql untuk Apache Spark menyediakan dukungan untuk fungsi SQL. Di antara fungsi-fungsi ini yang kita gunakan dalam tutorial ini adalah fungsi Apache Spark orderBy()
, desc()
, dan expr()
. Anda mengaktifkan penggunaan fungsi-fungsi ini dengan mengimpornya ke sesi Anda sesuai kebutuhan.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengimpor
desc()
fungsi dan kemudian menggunakan metode Apache Sparkselect()
dan Apache SparkorderBy()
dandesc()
fungsi untuk menampilkan nama yang paling umum dan jumlahnya dalam urutan menurut.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Membuat DataFrame subset
Pelajari cara membuat DataFrame subset dari DataFrame yang sudah ada.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark
filter
untuk membuat DataFrame baru yang membatasi data menurut tahun, jumlah, dan jenis kelamin. Ini menggunakan metode Apache Sparkselect()
untuk membatasi kolom. Ini juga menggunakan Apache SparkorderBy()
dandesc()
fungsi untuk mengurutkan DataFrame baru menurut hitungan.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Langkah 5: Simpan DataFrame
Pelajari cara menyimpan DataFrame,. Anda dapat menyimpan DataFrame Anda ke tabel atau menulis DataFrame ke file atau beberapa file.
Menyimpan DataFrame ke tabel
Azure Databricks menggunakan format Delta Lake untuk semua tabel secara default. Untuk menyimpan DataFrame, Anda harus memiliki CREATE
hak istimewa tabel pada katalog dan skema.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menyimpan konten DataFrame ke tabel menggunakan variabel yang Anda tentukan di awal tutorial ini.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Sebagian besar aplikasi Apache Spark bekerja pada himpunan data besar dan secara terdistribusi. Apache Spark menulis direktori file daripada satu file. Delta Lake membagi folder dan file Parquet. Banyak sistem data dapat membaca direktori file ini. Azure Databricks merekomendasikan penggunaan tabel melalui jalur file untuk sebagian besar aplikasi.
Menyimpan DataFrame ke file JSON
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menyimpan DataFrame ke direktori file JSON.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Membaca DataFrame dari file JSON
Pelajari cara menggunakan metode Apache Spark spark.read.format()
untuk membaca data JSON dari direktori ke dalam DataFrame.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menampilkan file JSON yang Anda simpan dalam contoh sebelumnya.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Tugas tambahan: Menjalankan kueri SQL di PySpark, Scala, dan R
Apache Spark DataFrames menyediakan opsi berikut untuk menggabungkan SQL dengan PySpark, Scala, dan R. Anda bisa menjalankan kode berikut di buku catatan yang sama dengan yang Anda buat untuk tutorial ini.
Menentukan kolom sebagai kueri SQL
Pelajari cara menggunakan metode Apache Spark selectExpr()
. Ini adalah varian select()
metode yang menerima ekspresi SQL dan mengembalikan DataFrame yang diperbarui. Metode ini memungkinkan Anda menggunakan ekspresi SQL, seperti upper
.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark
selectExpr()
dan ekspresi SQLupper
untuk mengonversi kolom string menjadi huruf besar (dan mengganti nama kolom).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Gunakan expr()
untuk menggunakan sintaks SQL untuk kolom
Pelajari cara mengimpor dan menggunakan fungsi Apache Spark expr()
untuk menggunakan sintaks SQL di mana saja kolom akan ditentukan.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengimpor
expr()
fungsi lalu menggunakan fungsi Apache Sparkexpr()
dan ekspresi SQLlower
untuk mengonversi kolom string menjadi huruf kecil (dan mengganti nama kolom).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Menjalankan kueri SQL arbitrer menggunakan fungsi spark.sql()
Pelajari cara menggunakan fungsi Apache Spark spark.sql()
untuk menjalankan kueri SQL arbitrer.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan fungsi Apache Spark untuk mengkueri
spark.sql()
tabel SQL menggunakan sintaks SQL.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.
Notebook tutorial DataFrame
Notebook berikut menyertakan contoh kueri dari tutorial ini.