Bagikan melalui


Tutorial: Memuat dan mengubah data menggunakan Apache Spark DataFrames

Tutorial ini menunjukkan kepada Anda cara memuat dan mengubah data menggunakan API DataFrame Apache Spark Python (PySpark), API DataFrame Apache Spark Scala, dan API SparkR SparkDataFrame di Azure Databricks.

Pada akhir tutorial ini, Anda akan memahami apa itu DataFrame dan terbiasa dengan tugas-tugas berikut:

Python

Lihat juga referensi API Apache Spark PySpark.

Scala

Lihat juga referensi API Apache Spark Scala.

R

Lihat juga referensi Apache SparkR API.

Apa itu DataFrame?

DataFrame adalah struktur data berlabel dua dimensi dengan kolom dari jenis yang berpotensi berbeda. Anda dapat memikirkan DataFrame seperti spreadsheet, tabel SQL, atau kamus objek seri. Apache Spark DataFrames menyediakan serangkaian fungsi yang kaya (pilih kolom, filter, gabung, agregat) yang memungkinkan Anda menyelesaikan masalah analisis data umum secara efisien.

Apache Spark DataFrames adalah abstraksi yang dibangun di atas Himpunan Data Terdistribusi Tangguh (RDD). Spark DataFrames dan Spark SQL menggunakan mesin perencanaan dan pengoptimalan terpadu, memungkinkan Anda mendapatkan performa yang hampir identik di semua bahasa yang didukung di Azure Databricks (Python, SQL, Scala, dan R).

Persyaratan

Untuk menyelesaikan tutorial berikut, Anda harus memenuhi persyaratan berikut:

  • Untuk menggunakan contoh dalam tutorial ini, ruang kerja Anda harus mengaktifkan Katalog Unity.

  • Contoh dalam tutorial ini menggunakan volume Katalog Unity untuk menyimpan data sampel. Untuk menggunakan contoh ini, buat volume dan gunakan katalog, skema, dan nama volume volume tersebut untuk mengatur jalur volume yang digunakan oleh contoh.

  • Anda harus memiliki izin berikut di Katalog Unity:

    • READ VOLUME dan WRITE VOLUME, atau ALL PRIVILEGES untuk volume yang digunakan untuk tutorial ini.
    • USE SCHEMA atau ALL PRIVILEGES untuk skema yang digunakan untuk tutorial ini.
    • USE CATALOG atau ALL PRIVILEGES untuk katalog yang digunakan untuk tutorial ini.

    Untuk mengatur izin ini, lihat administrator Databricks atau hak istimewa Unity Catalog dan objek yang dapat diamankan.

Tip

Untuk buku catatan lengkap untuk artikel ini, lihat Buku catatan tutorial DataFrame.

Langkah 1: Tentukan variabel dan muat file CSV

Langkah ini menentukan variabel untuk digunakan dalam tutorial ini dan kemudian memuat file CSV yang berisi data nama bayi dari health.data.ny.gov ke dalam volume Katalog Unity Anda.

  1. Buka buku catatan baru dengan mengklik Ikon Baru ikon. Untuk mempelajari cara menavigasi buku catatan Azure Databricks, lihat Antarmuka dan kontrol buku catatan Databricks.

  2. Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Ganti <catalog-name>, <schema-name>, dan <volume-name> dengan katalog, skema, dan nama volume untuk volume Katalog Unity. Ganti <table_name> dengan nama tabel pilihan Anda. Anda akan memuat data nama bayi ke dalam tabel ini nanti dalam tutorial ini.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Tekan Shift+Enter untuk menjalankan sel dan buat sel kosong baru.

  4. Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini menyalin rows.csv file dari health.data.ny.gov ke volume Unity Catalog Anda menggunakan perintah Databricks dbutuils .

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Langkah 2: Membuat DataFrame

Langkah ini membuat DataFrame bernama df1 dengan data pengujian lalu menampilkan kontennya.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini membuat DataFrame dengan data pengujian, lalu menampilkan konten dan skema DataFrame.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Langkah 3: Memuat data ke dalam DataFrame dari file CSV

Langkah ini membuat DataFrame bernama df_csv dari file CSV yang sebelumnya Anda muat ke dalam volume Katalog Unity Anda. Lihat spark.read.csv.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong baru. Kode ini memuat data nama bayi ke dalam DataFrame df_csv dari file CSV lalu menampilkan konten DataFrame.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Anda dapat memuat data dari banyak format file yang didukung.

Langkah 4: Melihat dan berinteraksi dengan DataFrame Anda

Lihat dan berinteraksi dengan nama bayi Anda DataFrames menggunakan metode berikut.

Pelajari cara menampilkan skema Apache Spark DataFrame. Apache Spark menggunakan skema istilah untuk merujuk ke nama dan jenis data kolom di DataFrame.

Catatan

Azure Databricks juga menggunakan istilah skema untuk menjelaskan kumpulan tabel yang terdaftar ke katalog.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menunjukkan skema DataFrames Anda dengan .printSchema() metode untuk melihat skema dua DataFrames - untuk bersiap menyatukan dua DataFrames.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Ganti nama kolom di DataFrame

Pelajari cara mengganti nama kolom di DataFrame.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengganti nama kolom di df1_csv DataFrame agar sesuai dengan kolom masing-masing dalam df1 DataFrame. Kode ini menggunakan metode Apache Spark withColumnRenamed() .

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Menggabungkan DataFrame

Pelajari cara membuat DataFrame baru yang menambahkan baris satu DataFrame ke DataFrame lainnya.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark union() untuk menggabungkan konten DataFrame df pertama Anda dengan DataFrame df_csv yang berisi data nama bayi yang dimuat dari file CSV.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Memfilter baris dalam DataFrame

Temukan nama bayi paling populer dalam himpunan data Anda dengan memfilter baris, menggunakan Apache Spark .filter() atau .where() metode. Gunakan pemfilteran untuk memilih subset baris untuk dikembalikan atau diubah dalam DataFrame. Tidak ada perbedaan dalam performa atau sintaks, seperti yang terlihat dalam contoh berikut.

Menggunakan metode .filter()

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark .filter() untuk menampilkan baris tersebut di DataFrame dengan jumlah lebih dari 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Menggunakan metode .where()

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark .where() untuk menampilkan baris tersebut di DataFrame dengan jumlah lebih dari 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Memilih kolom dari DataFrame dan mengurutkan menurut frekuensi

Pelajari tentang frekuensi nama bayi mana dengan select() metode untuk menentukan kolom dari DataFrame yang akan dikembalikan. Gunakan Apache Spark orderby dan desc fungsi untuk mengurutkan hasilnya.

Modul pyspark.sql untuk Apache Spark menyediakan dukungan untuk fungsi SQL. Di antara fungsi-fungsi ini yang kita gunakan dalam tutorial ini adalah fungsi Apache Spark orderBy(), desc(), dan expr() . Anda mengaktifkan penggunaan fungsi-fungsi ini dengan mengimpornya ke sesi Anda sesuai kebutuhan.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengimpor desc() fungsi dan kemudian menggunakan metode Apache Spark select() dan Apache Spark orderBy() dan desc() fungsi untuk menampilkan nama yang paling umum dan jumlahnya dalam urutan menurut.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Membuat DataFrame subset

Pelajari cara membuat DataFrame subset dari DataFrame yang sudah ada.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark filter untuk membuat DataFrame baru yang membatasi data menurut tahun, jumlah, dan jenis kelamin. Ini menggunakan metode Apache Spark select() untuk membatasi kolom. Ini juga menggunakan Apache Spark orderBy() dan desc() fungsi untuk mengurutkan DataFrame baru menurut hitungan.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Langkah 5: Simpan DataFrame

Pelajari cara menyimpan DataFrame,. Anda dapat menyimpan DataFrame Anda ke tabel atau menulis DataFrame ke file atau beberapa file.

Menyimpan DataFrame ke tabel

Azure Databricks menggunakan format Delta Lake untuk semua tabel secara default. Untuk menyimpan DataFrame, Anda harus memiliki CREATE hak istimewa tabel pada katalog dan skema.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menyimpan konten DataFrame ke tabel menggunakan variabel yang Anda tentukan di awal tutorial ini.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Sebagian besar aplikasi Apache Spark bekerja pada himpunan data besar dan secara terdistribusi. Apache Spark menulis direktori file daripada satu file. Delta Lake membagi folder dan file Parquet. Banyak sistem data dapat membaca direktori file ini. Azure Databricks merekomendasikan penggunaan tabel melalui jalur file untuk sebagian besar aplikasi.

Menyimpan DataFrame ke file JSON

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menyimpan DataFrame ke direktori file JSON.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Membaca DataFrame dari file JSON

Pelajari cara menggunakan metode Apache Spark spark.read.format() untuk membaca data JSON dari direktori ke dalam DataFrame.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menampilkan file JSON yang Anda simpan dalam contoh sebelumnya.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Tugas tambahan: Menjalankan kueri SQL di PySpark, Scala, dan R

Apache Spark DataFrames menyediakan opsi berikut untuk menggabungkan SQL dengan PySpark, Scala, dan R. Anda bisa menjalankan kode berikut di buku catatan yang sama dengan yang Anda buat untuk tutorial ini.

Menentukan kolom sebagai kueri SQL

Pelajari cara menggunakan metode Apache Spark selectExpr() . Ini adalah varian select() metode yang menerima ekspresi SQL dan mengembalikan DataFrame yang diperbarui. Metode ini memungkinkan Anda menggunakan ekspresi SQL, seperti upper.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan metode Apache Spark selectExpr() dan ekspresi SQL upper untuk mengonversi kolom string menjadi huruf besar (dan mengganti nama kolom).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Gunakan expr() untuk menggunakan sintaks SQL untuk kolom

Pelajari cara mengimpor dan menggunakan fungsi Apache Spark expr() untuk menggunakan sintaks SQL di mana saja kolom akan ditentukan.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini mengimpor expr() fungsi lalu menggunakan fungsi Apache Spark expr() dan ekspresi SQL lower untuk mengonversi kolom string menjadi huruf kecil (dan mengganti nama kolom).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Menjalankan kueri SQL arbitrer menggunakan fungsi spark.sql()

Pelajari cara menggunakan fungsi Apache Spark spark.sql() untuk menjalankan kueri SQL arbitrer.

  1. Salin dan tempel kode berikut ke dalam sel buku catatan kosong. Kode ini menggunakan fungsi Apache Spark untuk mengkueri spark.sql() tabel SQL menggunakan sintaks SQL.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Tekan Shift+Enter untuk menjalankan sel lalu berpindah ke sel berikutnya.

Notebook tutorial DataFrame

Notebook berikut menyertakan contoh kueri dari tutorial ini.

Python

Tutorial DataFrames menggunakan Python

Dapatkan buku catatan

Scala

Tutorial DataFrames menggunakan Scala

Dapatkan buku catatan

R

Tutorial DataFrames menggunakan R

Dapatkan buku catatan

Sumber Daya Tambahan: