Bagikan melalui


Tutorial: Memuat dan mengubah data menggunakan Apache Spark DataFrames

Tutorial ini menunjukkan kepada Anda cara memuat dan mengubah data menggunakan API DataFrame Apache Spark Python (PySpark), API DataFrame Apache Spark Scala, dan API SparkR SparkDataFrame di Azure Databricks.

Catatan

Jika Anda menggunakan Databricks Edisi Gratis, pilih tab Python untuk semua contoh kode dalam tutorial ini. Edisi Gratis tidak mendukung R atau Scala. Selain itu, Edisi Gratis membatasi akses internet keluar, jadi Anda harus mengunggah file CSV menggunakan UI ruang kerja alih-alih mengunduhnya dengan kode. Lihat Langkah 1 untuk instruksi terperinci.

Pada akhir tutorial ini, Anda akan memahami apa itu DataFrame dan terbiasa dengan tugas-tugas berikut:

Phyton

Lihat juga referensi API Apache Spark PySpark .

Scala

Lihat juga referensi API Apache Spark Scala.

R

Lihat juga referensi Apache SparkR API.

Apa itu DataFrame?

DataFrame adalah struktur data berlabel dua dimensi dengan kolom jenis yang berpotensi berbeda. Anda dapat memikirkan DataFrame seperti spreadsheet, tabel SQL, atau kamus objek seri. Apache Spark DataFrames menyediakan serangkaian fungsi yang kaya (pilih kolom, filter, gabung, agregat) yang memungkinkan Anda menyelesaikan masalah analisis data umum secara efisien.

Apache Spark DataFrames adalah abstraksi yang dibangun di atas Himpunan Data Terdistribusi Tangguh (RDD). Spark DataFrames dan Spark SQL menggunakan mesin perencanaan dan pengoptimalan terpadu, memungkinkan Anda mendapatkan performa yang hampir identik di semua bahasa yang didukung di Azure Databricks (Python, SQL, Scala, dan R).

Persyaratan

Untuk menyelesaikan tutorial berikut, Anda harus memenuhi persyaratan berikut:

  • Untuk menggunakan contoh dalam tutorial ini, ruang kerja Anda harus memiliki Katalog Unity diaktifkan. Azure Databricks Edisi Gratis dan ruang kerja uji coba gratis memiliki Unity Catalog yang diaktifkan secara default.

  • Contoh dalam tutorial ini menggunakan volume Katalog Unity untuk menyimpan data sampel. Untuk menggunakan contoh ini, buat volume dan gunakan katalog, skema, serta nama volume tersebut untuk menetapkan jalur volume yang digunakan oleh contoh-contoh ini. Pengguna Edisi Gratis memiliki akses ke katalog ruang kerja dan default skema secara default.

  • Anda harus memiliki izin berikut di Katalog Unity:

    • READ VOLUME dan WRITE VOLUME untuk volume yang digunakan untuk tutorial ini
    • USE SCHEMA untuk skema yang digunakan untuk tutorial ini
    • USE CATALOG untuk katalog yang digunakan untuk tutorial ini

    Untuk mengatur izin ini, hubungi administrator Azure Databricks Anda atau lihat hak istimewa Unity Catalog dan objek yang dapat diamankan. Pengguna Edisi Gratis memiliki hak istimewa ini pada katalog dan default skema ruang kerja secara default.

Petunjuk / Saran

Untuk buku catatan lengkap untuk artikel ini, lihat Buku catatan tutorial DataFrame.

Langkah 1: Tentukan variabel dan muat file CSV

Langkah ini mendefinisikan variabel untuk digunakan dalam tutorial ini dan kemudian memuat file CSV yang berisi data nama bayi dari health.data.ny.gov ke dalam volume Katalog Unity Anda. Anda memerlukan nama katalog, skema, dan volume dari Unity Catalog.

Petunjuk / Saran

Jika Anda tidak mengetahui nama katalog dan skema Anda, klik Ikon data.Katalog di bilah samping. Katalog ruang kerja memiliki nama yang sama dengan ruang kerja Anda dan dicantumkan di panel katalog. Perluas untuk melihat skema yang tersedia. Pengguna Edisi Gratis dan uji coba gratis dapat menggunakan katalog ruang kerja dan skema default.

Jika Anda tidak memiliki volume, buat dengan menjalankan perintah berikut ini di sel buku catatan (ganti <catalog_name> dan <schema_name> dengan nilai Anda):

CREATE VOLUME IF NOT EXISTS <catalog_name>.<schema_name>.my_volume
  1. Buka buku catatan baru dengan mengklik Ikon Baru ikon. Untuk mempelajari cara menavigasi buku catatan Azure Databricks, lihat Mengkustomisasi tampilan buku catatan.

  2. Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Ganti <catalog-name>, <schema-name>, dan <volume-name> dengan katalog, skema, dan nama volume untuk volume Katalog Unity. Ganti <table_name> dengan nama tabel pilihan Anda. Anda memuat data nama bayi ke dalam tabel ini nanti dalam tutorial ini.

    Phyton

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Tekan Shift+Enter untuk menjalankan sel dan buat sel kosong baru.

  4. Unggah file CSV ke dalam volume Anda. Pilih salah satu metode berikut:

    • Unggah menggunakan UI ruang kerja — Gunakan metode ini jika Anda berada di Databricks Free Edition, atau jika pengunduhan kode dalam opsi B gagal dengan kesalahan jaringan. Edisi Gratis dan lingkungan komputasi tanpa server lainnya membatasi akses internet keluar, jadi Anda harus mengunggah file dari komputer lokal Anda.
    • Unduh menggunakan kode — Gunakan metode ini jika lingkungan komputasi Anda memiliki akses internet keluar.

    Opsi A: Unggah menggunakan UI ruang kerja

    1. Di komputer lokal Anda, buka health.data.ny.gov/api/views/jxy9-yhdk/rows.csv di browser Anda. File diunduh ke komputer Anda sebagai rows.csv, yang cocok dengan variabel file_name yang telah ditentukan sebelumnya.
    2. Kembali ke ruang kerja Azure Databricks Anda. Di bar samping, klik Ikon Baru> Tambahkan atau unggah data Baru.
    3. Klik Unggah file ke volume.
    4. Klik telusuri dan pilih rows.csv file, atau seret dan letakkan ke area unggahan.
    5. Di bawah Volume tujuan, pilih volume yang Anda tentukan di atas.
    6. Setelah unggahan selesai, kembali ke buku catatan Anda dan lanjutkan dengan Langkah 2.

    Untuk detail selengkapnya tentang mengunggah file, lihat Mengunggah file ke volume Katalog Unity.

    Opsi B: Unduh menggunakan kode

    Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini menyalin rows.csv file dari health.data.ny.gov ke volume Unity Catalog Anda menggunakan perintah Databricks dbutils. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

    Phyton

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Langkah 2: Membuat DataFrame

Langkah ini membuat DataFrame bernama df1 dengan data pengujian lalu menampilkan kontennya.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini membuat DataFrame dengan data pengujian, lalu menampilkan konten dan skema DataFrame.

    Phyton

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    # highlight-next-line
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    // highlight-next-line
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    # highlight-next-line
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Langkah 3: Memuat data ke dalam DataFrame dari file CSV

Langkah ini membuat DataFrame bernama df_csv dari file CSV yang sebelumnya Anda muat ke dalam volume Unity Catalog Anda. Lihat spark.read.csv.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini memuat data nama bayi ke dalam DataFrame df_csv dari file CSV lalu menampilkan konten DataFrame.

    Phyton

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Anda dapat memuat data dari banyak format file yang didukung.

Langkah 4: Melihat dan berinteraksi dengan DataFrame Anda

Lihat dan berinteraksi dengan nama bayi Anda DataFrames menggunakan metode berikut.

Pelajari cara menampilkan skema Apache Spark DataFrame. Apache Spark menggunakan istilah skema untuk merujuk ke nama dan jenis data kolom di DataFrame.

Catatan

Azure Databricks juga menggunakan istilah skema untuk menjelaskan kumpulan tabel yang terdaftar ke katalog.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menampilkan skema DataFrame Anda dengan metode .printSchema() untuk melihat skema dua DataFrame - guna mempersiapkan penyatuan kedua DataFrame tersebut.

    Phyton

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Ganti nama kolom di DataFrame

Pelajari cara mengganti nama kolom di DataFrame.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengganti nama kolom di df1_csv DataFrame agar sesuai dengan kolom masing-masing di df1 DataFrame. Kode ini menggunakan metode Apache Spark withColumnRenamed() .

    Phyton

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema()
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Menggabungkan DataFrames

Pelajari cara membuat DataFrame baru yang menambahkan baris satu DataFrame ke DataFrame lainnya.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark union() untuk menggabungkan konten DataFrame df pertama Anda dengan DataFrame df_csv yang berisi data nama bayi yang dimuat dari file CSV.

    Phyton

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Memfilter baris dalam DataFrame

Temukan nama bayi paling populer dalam himpunan data Anda dengan memfilter baris, menggunakan metode apache Spark .filter() atau .where(). Gunakan pemfilteran untuk memilih subset baris untuk dikembalikan atau diubah dalam DataFrame. Tidak ada perbedaan dalam performa atau sintaks, seperti yang terlihat dalam contoh berikut.

Menggunakan metode .filter()

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark .filter() untuk menampilkan baris tersebut di DataFrame dengan jumlah lebih dari 50.

    Phyton
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Menggunakan metode .where()

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark .where() untuk menampilkan baris tersebut di DataFrame dengan jumlah lebih dari 50.

    Phyton
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Memilih kolom dari DataFrame dan mengurutkan menurut frekuensi

Pelajari tentang frekuensi nama bayi mana dengan metode select() untuk menentukan kolom dari DataFrame yang akan dikembalikan. Gunakan Apache Spark orderby dan desc fungsi untuk mengurutkan hasilnya.

Modul pyspark.sql untuk Apache Spark menyediakan dukungan untuk fungsi SQL. Di antara fungsi-fungsi ini yang kita gunakan dalam tutorial ini adalah fungsi Apache Spark orderBy(), desc(), dan expr() . Anda mengaktifkan penggunaan fungsi-fungsi ini dengan mengimpornya ke sesi Anda sesuai kebutuhan.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengimpor fungsi desc() dan kemudian menggunakan metode Apache Spark select() dan fungsi Apache Spark orderBy() serta desc() untuk menampilkan nama yang paling umum dan jumlahnya dalam urutan menurun.

    Phyton

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Membuat subset DataFrame

Pelajari cara membuat DataFrame subset dari DataFrame yang sudah ada.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark filter untuk membuat DataFrame baru yang membatasi data menurut tahun, jumlah, dan jenis kelamin. Ini menggunakan metode apache Spark select() untuk membatasi kolom. Ini juga menggunakan Apache Spark orderBy() dan desc() fungsi untuk mengurutkan DataFrame baru menurut hitungan.

    Phyton

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Langkah 5: Simpan DataFrame

Pelajari cara menyimpan DataFrame,. Anda dapat menyimpan DataFrame Anda ke tabel atau menulis DataFrame ke file atau beberapa file.

Menyimpan DataFrame ke tabel

Azure Databricks menggunakan format Delta Lake untuk semua tabel secara default. Untuk menyimpan DataFrame, Anda harus memiliki hak istimewa tabel CREATE pada katalog dan skema.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menyimpan konten DataFrame ke tabel menggunakan variabel yang Anda tentukan di awal tutorial ini.

    Phyton

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Sebagian besar aplikasi Apache Spark bekerja pada himpunan data besar dan secara terdistribusi. Apache Spark menuliskan kumpulan file dalam sebuah direktori, bukan satu file tunggal. Delta Lake membagi folder dan file Parquet. Banyak sistem data dapat membaca direktori file ini. Azure Databricks merekomendasikan penggunaan tabel melalui jalur file untuk sebagian besar aplikasi.

Menyimpan DataFrame ke file JSON

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menyimpan DataFrame ke direktori file JSON.

    Phyton

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Membaca DataFrame dari file JSON

Pelajari cara menggunakan metode Apache Spark spark.read.format() untuk membaca data JSON dari direktori ke dalam DataFrame.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menampilkan file JSON yang Anda simpan dalam contoh sebelumnya.

    Phyton

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Tugas tambahan: Menjalankan kueri SQL di PySpark, Scala, dan R

Apache Spark DataFrames menyediakan opsi berikut untuk menggabungkan SQL dengan PySpark, Scala, dan R. Anda bisa menjalankan kode berikut di buku catatan yang sama dengan yang Anda buat untuk tutorial ini.

Menentukan kolom sebagai kueri SQL

Pelajari cara menggunakan metode Apache Spark selectExpr() . Ini adalah varian select() metode yang menerima ekspresi SQL dan mengembalikan DataFrame yang diperbarui. Metode ini memungkinkan Anda menggunakan ekspresi SQL, seperti upper.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode apache Spark selectExpr() dan ekspresi upper SQL untuk mengonversi kolom string menjadi huruf besar (dan mengganti nama kolom).

    Phyton

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Menggunakan expr() untuk menggunakan sintaks SQL untuk kolom

Pelajari cara mengimpor dan menggunakan fungsi apache Spark expr() untuk menggunakan sintaks SQL di mana saja kolom akan ditentukan.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengimpor fungsi expr() lalu menggunakan fungsi apache Spark expr() dan ekspresi lower SQL untuk mengonversi kolom string menjadi huruf kecil (dan mengganti nama kolom).

    Phyton

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Menjalankan kueri SQL arbitrer menggunakan fungsi spark.sql()

Pelajari cara menggunakan fungsi Apache Spark spark.sql() untuk menjalankan kueri SQL arbitrer.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan fungsi Apache Spark spark.sql() untuk mengkueri tabel SQL menggunakan sintaks SQL.

    Phyton

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Notebook tutorial DataFrame

Notebook berikut menyertakan contoh kueri dari tutorial ini.

Phyton

Tutorial DataFrames menggunakan Python

Dapatkan buku catatan

Scala

Tutorial DataFrames menggunakan Scala

Dapatkan buku catatan

R

Tutorial DataFrames menggunakan R

Dapatkan buku catatan

Sumber Daya Tambahan: