Tutorial: Memuat dan mengubah data menggunakan Apache Spark DataFrames
Tutorial ini menunjukkan kepada Anda cara memuat dan mengubah data menggunakan API DataFrame Apache Spark Python (PySpark), API DataFrame Apache Spark Scala, dan API SparkR SparkDataFrame di Azure Databricks.
Pada akhir tutorial ini, Anda akan memahami apa itu DataFrame dan terbiasa dengan tugas-tugas berikut:
Python
- Menentukan variabel dan menyalin data publik ke dalam volume Katalog Unity
- Membuat DataFrame dengan Python
- Memuat data ke dalam DataFrame dari file CSV
- Menampilkan dan berinteraksi dengan DataFrame
- Simpan DataFrame
- Menjalankan kueri SQL di PySpark
Lihat juga referensi API Apache Spark PySpark.
Scala
- Menentukan variabel dan menyalin data publik ke dalam volume Katalog Unity
- Membuat DataFrame dengan Scala
- Memuat data ke dalam DataFrame dari file CSV
- Menampilkan dan berinteraksi dengan DataFrame
- Simpan DataFrame
- Menjalankan kueri SQL di Apache Spark
Lihat juga referensi API Apache Spark Scala.
R
- Menentukan variabel dan menyalin data publik ke dalam volume Katalog Unity
- Membuat SparkR SparkDataFrames
- Memuat data ke dalam DataFrame dari file CSV
- Menampilkan dan berinteraksi dengan DataFrame
- Simpan DataFrame
- Menjalankan kueri SQL di SparkR
Lihat juga referensi Apache SparkR API.
Apa itu DataFrame?
DataFrame adalah struktur data berlabel dua dimensi dengan kolom dari jenis yang berpotensi berbeda. Anda dapat memikirkan DataFrame seperti spreadsheet, tabel SQL, atau kamus objek seri. Apache Spark DataFrames menyediakan serangkaian fungsi yang kaya (pilih kolom, filter, gabung, agregat) yang memungkinkan Anda menyelesaikan masalah analisis data umum secara efisien.
Apache Spark DataFrames adalah abstraksi yang dibangun di atas Himpunan Data Terdistribusi Tangguh (RDD). Spark DataFrames dan Spark SQL menggunakan mesin perencanaan dan pengoptimalan terpadu, memungkinkan Anda mendapatkan performa yang hampir identik di semua bahasa yang didukung di Azure Databricks (Python, SQL, Scala, dan R).
Persyaratan
Untuk menyelesaikan tutorial berikut, Anda harus memenuhi persyaratan berikut:
Untuk menggunakan contoh dalam tutorial ini, ruang kerja Anda harus mengaktifkan Katalog Unity.
Contoh dalam tutorial ini menggunakan volume Katalog Unity untuk menyimpan data sampel. Untuk menggunakan contoh ini, buat volume dan gunakan katalog, skema, dan nama volume volume tersebut untuk mengatur jalur volume yang digunakan oleh contoh.
Anda harus memiliki izin berikut di Katalog Unity:
READ VOLUME
danWRITE VOLUME
, atauALL PRIVILEGES
untuk volume yang digunakan untuk tutorial ini.USE SCHEMA
atauALL PRIVILEGES
untuk skema yang digunakan untuk tutorial ini.USE CATALOG
atauALL PRIVILEGES
untuk katalog yang digunakan untuk tutorial ini.
Untuk mengatur izin ini, lihat administrator Databricks atau hak istimewa Unity Catalog dan objek yang dapat diamankan.
Tip
Untuk buku catatan lengkap untuk artikel ini, lihat Buku catatan tutorial DataFrame.
Langkah 1: Tentukan variabel dan muat file CSV
Langkah ini menentukan variabel untuk digunakan dalam tutorial ini dan kemudian memuat file CSV yang berisi data nama bayi dari health.data.ny.gov ke dalam volume Katalog Unity Anda.
Buka buku catatan baru dengan mengklik ikon. Untuk mempelajari cara menavigasi buku catatan Azure Databricks, lihat Antarmuka dan kontrol buku catatan Databricks.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Ganti
<catalog-name>
,<schema-name>
, dan<volume-name>
dengan katalog, skema, dan nama volume untuk volume Katalog Unity. Ganti<table_name>
dengan nama tabel pilihan Anda. Anda akan memuat data nama bayi ke dalam tabel ini nanti dalam tutorial ini.Tekan
Shift+Enter
untuk menjalankan sel dan buat sel kosong baru.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_tables = catalog + "." + schema print(path_tables) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val file_name = "rows.csv" val table_name = "<table_name>" val path_volume = s"/Volumes/$catalog/$schema/$volume" val path_tables = s"$catalog.$schema.$table_name" print(path_volume) // Show the complete path print(path_tables) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_tables <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_tables) # Show the complete path
Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini menyalin
rows.csv
file dari health.data.ny.gov ke volume Unity Catalog Anda menggunakan perintah Databricks dbutuils .Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
Scala
dbutils.fs.cp(download_url, s"$path_volume/$file_name")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Langkah 2: Membuat DataFrame
Langkah ini membuat DataFrame bernama df1
dengan data pengujian lalu menampilkan kontennya.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini membuat Dataframe dengan data pengujian, lalu menampilkan konten dan skema DataFrame.
Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = c(2021), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = c(42) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Langkah 3: Memuat data ke dalam DataFrame dari file CSV
Langkah ini membuat DataFrame bernama df_csv
dari file CSV yang sebelumnya Anda muat ke dalam volume Katalog Unity Anda. Lihat spark.read.csv.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini memuat data nama bayi ke dalam DataFrame
df_csv
dari file CSV lalu menampilkan konten DataFrame.Tekan
Shift+Enter
untuk menjalankan sel lalu berpindah ke sel berikutnya.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val df_csv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$path_volume/$file_name") display(df_csv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Anda dapat memuat data dari banyak format file yang didukung.
Langkah 4: Melihat dan berinteraksi dengan DataFrame Anda
Lihat dan berinteraksi dengan nama bayi Anda DataFrames menggunakan metode berikut.
Mencetak skema DataFrame
Pelajari cara menampilkan skema Apache Spark DataFrame. Apache Spark menggunakan skema istilah untuk merujuk ke nama dan jenis data kolom di DataFrame.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menunjukkan skema DataFrames Anda dengan .printSchema()
metode untuk melihat skema dua DataFrames - untuk bersiap menyatukan dua DataFrames.
Python
df_csv.printSchema()
df1.printSchema()
Scala
df_csv.printSchema()
df1.printSchema()
R
printSchema(df_csv)
printSchema(df1)
Catatan
Azure Databricks juga menggunakan istilah skema untuk menjelaskan kumpulan tabel yang terdaftar ke katalog.
Ganti nama kolom di DataFrame
Pelajari cara mengganti nama kolom di DataFrame.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengganti nama kolom di df1_csv
DataFrame agar sesuai dengan kolom masing-masing dalam df1
DataFrame. Kode ini menggunakan metode Apache Spark withColumnRenamed()
.
Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
Scala
val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
Menggabungkan DataFrame
Pelajari cara membuat DataFrame baru yang menambahkan baris satu DataFrame ke DataFrame lainnya.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark union()
untuk menggabungkan konten DataFrame df
pertama Anda dengan DataFrame df_csv
yang berisi data nama bayi yang dimuat dari file CSV.
Python
df = df1.union(df_csv)
display(df)
Scala
val df = df1.union(df_csv_renamed)
display(df)
R
display(df <- union(df1, df_csv))
Memfilter baris dalam DataFrame
Temukan nama bayi paling populer dalam himpunan data Anda dengan memfilter baris, menggunakan Apache Spark .filter()
atau .where()
metode. Gunakan pemfilteran untuk memilih subset baris untuk dikembalikan atau diubah dalam DataFrame. Tidak ada perbedaan dalam performa atau sintaks, seperti yang terlihat dalam contoh berikut.
Menggunakan metode .filter()
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark .filter()
untuk menampilkan baris tersebut di DataFrame dengan jumlah lebih dari 50.
Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Menggunakan metode .where()
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark .where()
untuk menampilkan baris tersebut di DataFrame dengan jumlah lebih dari 50.
Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Memilih kolom dari DataFrame dan mengurutkan menurut frekuensi
Pelajari tentang frekuensi nama bayi mana dengan select()
metode untuk menentukan kolom dari DataFrame yang akan dikembalikan. Gunakan Apache Spark orderby
dan desc
fungsi untuk mengurutkan hasilnya.
Modul pyspark.sql untuk Apache Spark menyediakan dukungan untuk fungsi SQL. Di antara fungsi-fungsi ini yang kita gunakan dalam tutorial ini adalah fungsi Apache Spark orderBy()
, desc()
, dan expr()
. Anda mengaktifkan penggunaan fungsi-fungsi ini dengan mengimpornya ke sesi Anda sesuai kebutuhan.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengimpor desc()
fungsi dan kemudian menggunakan metode Apache Spark select()
dan Apache Spark orderBy()
dan desc()
fungsi untuk menampilkan nama yang paling umum dan jumlahnya dalam urutan menurut.
Python
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Membuat DataFrame subset
Pelajari cara membuat DataFrame subset dari DataFrame yang sudah ada.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark filter
untuk membuat DataFrame baru yang membatasi data menurut tahun, jumlah, dan jenis kelamin. Ini menggunakan metode Apache Spark select()
untuk membatasi kolom. Ini juga menggunakan Apache Spark orderBy()
dan desc()
fungsi untuk mengurutkan DataFrame baru menurut hitungan.
Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
Langkah 5: Simpan DataFrame
Pelajari cara menyimpan DataFrame,. Anda dapat menyimpan DataFrame Anda ke tabel atau menulis DataFrame ke file atau beberapa file.
Menyimpan DataFrame ke tabel
Azure Databricks menggunakan format Delta Lake untuk semua tabel secara default. Untuk menyimpan DataFrame, Anda harus memiliki CREATE
hak istimewa tabel pada katalog dan skema.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menyimpan konten DataFrame ke tabel menggunakan variabel yang Anda tentukan di awal tutorial ini.
Python
df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")
# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")
Scala
df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")
// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")
R
saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")
Sebagian besar aplikasi Apache Spark bekerja pada himpunan data besar dan secara terdistribusi. Apache Spark menulis direktori file daripada satu file. Delta Lake membagi folder dan file Parquet. Banyak sistem data dapat membaca direktori file ini. Azure Databricks merekomendasikan penggunaan tabel melalui jalur file untuk sebagian besar aplikasi.
Menyimpan DataFrame ke file JSON
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menyimpan DataFrame ke direktori file JSON.
Python
df.write.format("json").save("/tmp/json_data")
# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").save("/tmp/json_data")
// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Membaca DataFrame dari file JSON
Pelajari cara menggunakan metode Apache Spark spark.read.format()
untuk membaca data JSON dari direktori ke dalam DataFrame.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menampilkan file JSON yang Anda simpan dalam contoh sebelumnya.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Tugas tambahan: Menjalankan kueri SQL di PySpark, Scala, dan R
Apache Spark DataFrames menyediakan opsi berikut untuk menggabungkan SQL dengan PySpark, Scala, dan R. Anda bisa menjalankan kode berikut di buku catatan yang sama dengan yang Anda buat untuk tutorial ini.
Menentukan kolom sebagai kueri SQL
Pelajari cara menggunakan metode Apache Spark selectExpr()
. Ini adalah varian select()
metode yang menerima ekspresi SQL dan mengembalikan DataFrame yang diperbarui. Metode ini memungkinkan Anda menggunakan ekspresi SQL, seperti upper
.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark selectExpr()
dan ekspresi SQL upper
untuk mengonversi kolom string menjadi huruf besar (dan mengganti nama kolom).
Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Gunakan expr()
untuk menggunakan sintaks SQL untuk kolom
Pelajari cara mengimpor dan menggunakan fungsi Apache Spark expr()
untuk menggunakan sintaks SQL di mana saja kolom akan ditentukan.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengimpor expr()
fungsi lalu menggunakan fungsi Apache Spark expr()
dan ekspresi SQL lower
untuk mengonversi kolom string menjadi huruf kecil (dan mengganti nama kolom).
Python
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Menjalankan kueri SQL arbitrer menggunakan fungsi spark.sql()
Pelajari cara menggunakan fungsi Apache Spark spark.sql()
untuk menjalankan kueri SQL arbitrer.
Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan fungsi Apache Spark untuk mengkueri spark.sql()
tabel SQL menggunakan sintaks SQL.
Python
display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))
R
display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))
Buku catatan tutorial DataFrame
Buku catatan berikut menyertakan contoh kueri dari tutorial ini.
Python
Tutorial DataFrames menggunakan notebook Python
Scala
Tutorial DataFrames menggunakan notebook Scala
R
Tutorial DataFrames menggunakan buku catatan R
Sumber Daya Tambahan:
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk