Bagikan melalui


Tutorial: Azure Data Lake Storage, Azure Databricks & Spark

Tutorial ini menunjukkan cara menyambungkan kluster Azure Databricks Anda ke data yang disimpan di akun penyimpanan Azure yang mengaktifkan Azure Data Lake Storage. Koneksi ini memungkinkan Anda menjalankan kueri dan analitik secara asli dari kluster Anda pada data Anda.

Dalam tutorial ini, Anda akan:

  • Menyerap data yang tidak terstruktur ke akun penyimpanan
  • Jalankan analitik pada data Anda di penyimpanan Blob

Jika Anda tidak memiliki langganan Azure, buatlah akun gratis sebelum Anda memulai.

Prerequisites

Membuat ruang kerja dan buku catatan Azure Databricks

  1. Membuat ruang kerja Azure Databricks. Lihat Membuat ruang kerja Azure Databricks.

  2. Buat buku catatan. Lihat Membuat buku catatan. Pilih Python sebagai bahasa default buku catatan.

Tetap buka buku catatan Anda. Anda menggunakannya di bagian berikut ini.

Unduh data penerbangan

Tutorial ini menggunakan data kinerja penerbangan tepat waktu untuk Januari 2016 dari Biro Statistik Transportasi untuk memperlihatkan cara melakukan operasi ETL. Anda harus mengunduh data ini untuk menyelesaikan tutorial.

  1. Unduh file On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. File ini berisi data penerbangan.

  2. Buka zip isi file zip dan buat catatan nama file dan jalur file. Anda akan memerlukan informasi ini di langkah-langkah selanjutnya.

Jika Anda ingin mempelajari tentang informasi yang diambil dalam data performa pelaporan tepat waktu, Anda dapat melihat deskripsi bidang di situs web Statistik Biro Transportasi.

Menyerap data

Di bagian ini, Anda mengunggah data penerbangan .csv ke akun Azure Data Lake Storage Anda lalu memasang akun penyimpanan ke kluster Databricks Anda. Terakhir, Anda menggunakan Databricks untuk membaca data penerbangan .csv dan menulisnya kembali dalam format Apache Parquet ke penyimpanan.

Mengunggah data penerbangan ke akun penyimpanan Anda

Gunakan AzCopy untuk menyalin file .csv Anda ke akun Azure Data Lake Storage Anda. Anda menggunakan azcopy make perintah untuk membuat kontainer di akun penyimpanan Anda. Kemudian Anda menggunakan azcopy copy perintah untuk menyalin data csv yang baru saja Anda unduh ke direktori dalam kontainer tersebut.

Dalam langkah-langkah berikut, Anda perlu memasukkan nama untuk kontainer yang ingin Anda buat, dan direktori dan blob yang ingin Anda unggah data penerbangan ke dalam kontainer. Anda dapat menggunakan nama yang disarankan di setiap langkah atau menentukan nama anda sendiri dengan memperhatikan konvensi penamaan untuk kontainer, direktori, dan blob.

  1. Buka jendela prompt perintah, dan masukkan perintah berikut untuk masuk ke Azure Active Directory untuk mengakses akun penyimpanan Anda.

    azcopy login
    

    Ikuti instruksi yang muncul di jendela prompt perintah untuk mengautentikasi akun pengguna Anda.

  2. Untuk membuat kontainer di akun penyimpanan Anda untuk menyimpan data penerbangan, masukkan perintah berikut:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Ganti nilai placeholder <storage-account-name> dengan nama akun penyimpanan.

    • <container-name> Ganti tempat penampung dengan nama untuk kontainer yang ingin Anda buat untuk menyimpan data csv; misalnya, flight-data-container.

  3. Untuk mengunggah (menyalin) data csv ke akun penyimpanan Anda, masukkan perintah berikut.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Ganti nilai <csv-folder-path> dengan jalur ke file .csv.

    • Ganti nilai placeholder <storage-account-name> dengan nama akun penyimpanan.

    • Ganti placeholder <container-name> dengan nama kontainer di akun penyimpanan Anda.

    • <directory-name> Ganti tempat penampung dengan nama direktori untuk menyimpan data Anda dalam kontainer; misalnya, jan2016.

Memasang akun penyimpanan Anda ke kluster Databricks Anda

Di bagian ini, Anda memasang penyimpanan objek cloud Azure Data Lake Storage ke Databricks File System (DBFS). Anda menggunakan prinsip layanan Azure AD yang Anda buat sebelumnya untuk autentikasi dengan akun penyimpanan. Untuk informasi selengkapnya, lihat Memasang penyimpanan objek cloud di Azure Databricks.

  1. Lampirkan buku catatan Anda ke kluster Anda.

    1. Di buku catatan yang Anda buat sebelumnya, pilih tombol Sambungkan di sudut kanan atas toolbar buku catatan. Tombol ini membuka pemilih komputasi. (Jika Anda telah menyambungkan buku catatan Anda ke kluster, nama kluster tersebut ditampilkan dalam teks tombol alih-alih Sambungkan).

    2. Di menu dropdown kluster, pilih kluster apa pun yang telah Anda buat sebelumnya.

    3. Perhatikan bahwa teks dalam pemilih kluster berubah menjadi dimulai. Tunggu hingga kluster selesai dimulai dan nama kluster muncul di tombol sebelum melanjutkan.

  2. Salin dan tempel blok kode berikut ke dalam sel pertama, tetapi jangan menjalankan kode ini dulu.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. Dalam blok kode ini:

    • Di configs, ganti nilai tempat penampung <appId>, <clientSecret>, dan <tenantId> dengan ID aplikasi, rahasia klien, dan ID penyewa yang Anda salin saat Anda membuat perwakilan layanan dalam langkah prasyarat.

    • Dalam URI source, ganti nilai placeholder <storage-account-name>, <container-name>, dan <directory-name> dengan nama akun penyimpanan Azure Data Lake Storage Anda dan nama wadah serta direktori yang Anda tentukan saat mengunggah data penerbangan ke akun penyimpanan.

      Note

      Pengidentifikasi skema di URI, abfss, memberi tahu Databricks untuk menggunakan driver Azure Blob File System dengan Keamanan Lapisan Transportasi (TLS). Untuk mempelajari selengkapnya tentang URI, lihat Menggunakan URI Azure Data Lake Storage.

  4. Pastikan kluster Anda telah selesai memulai sebelum melanjutkan.

  5. Tekan kunci SHIFT + ENTER untuk menjalankan kode di blok ini.

Kontainer dan direktori tempat Anda mengunggah data penerbangan di akun penyimpanan Anda sekarang dapat diakses di notebook Anda melalui titik pemasangan, /mnt/flightdata.

Menggunakan Databricks Notebook untuk mengonversi CSV ke Parquet

Sekarang setelah data penerbangan csv dapat diakses melalui titik pemasangan DBFS, Anda dapat menggunakan Apache Spark DataFrame untuk memindahkannya ke ruang kerja Anda dan menulisnya kembali dalam format Apache Parquet ke penyimpanan objek Azure Data Lake Storage milik Anda.

  • Spark DataFrame adalah struktur data berlabel dua dimensi dengan kolom dari jenis yang berpotensi berbeda. Anda dapat menggunakan DataFrame untuk membaca dan menulis data dengan mudah dalam berbagai format yang didukung. Dengan DataFrame, Anda dapat memuat data dari penyimpanan objek cloud dan melakukan analisis dan transformasi di dalamnya di dalam kluster komputasi Anda tanpa memengaruhi data yang mendasar di penyimpanan objek cloud. Untuk mempelajari selengkapnya, lihat Bekerja dengan PySpark DataFrames di Azure Databricks.

  • Apache parquet adalah format file kolom dengan pengoptimalan yang mempercepat kueri. Ini adalah format file yang lebih efisien daripada CSV atau JSON. Untuk mempelajari lebih lanjut, lihat File Parquet.

Di buku catatan, tambahkan sel baru, dan tempelkan kode berikut ke dalamnya.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Tekan kunci SHIFT + ENTER untuk menjalankan kode di blok ini.

Sebelum melanjutkan ke bagian berikutnya, pastikan bahwa semua data parquet telah ditulis, dan "Selesai" muncul di output.

Jelajahi data

Di bagian ini, Anda menggunakan utilitas sistem file Databricks untuk menjelajahi penyimpanan objek Azure Data Lake Storage Menggunakan titik pemasangan DBFS yang Anda buat di bagian sebelumnya.

Di sel baru, tempelkan kode berikut untuk mendapatkan daftar file di titik pemasangan. Perintah pertama menghasilkan daftar file dan direktori. Perintah kedua menampilkan output dalam format tabular untuk pembacaan yang lebih mudah.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Tekan kunci SHIFT + ENTER untuk menjalankan kode di blok ini.

Perhatikan bahwa direktori parquet muncul dalam daftar. Anda menyimpan data penerbangan .csv dalam format parquet ke direktori parquet/flights di bagian sebelumnya. Untuk mencantumkan file di direktori parket/penerbangan , tempelkan kode berikut ke dalam sel baru dan jalankan:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Untuk membuat file baru dan mencantumkannya, tempelkan kode berikut ke dalam sel baru dan jalankan:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Karena Anda tidak memerlukan file 1.txt dalam tutorial ini, Anda dapat menempelkan kode berikut ke dalam sel dan menjalankannya untuk menghapus mydirectory secara rekursif. Parameter True menunjukkan penghapusan rekursif.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Sebagai kenyamanan, Anda dapat menggunakan perintah bantuan untuk mempelajari detail tentang perintah lain.

dbutils.fs.help("rm")

Dengan sampel kode ini, Anda telah menjelajahi sifat hierarki HDFS menggunakan data yang disimpan di akun penyimpanan dengan Azure Data Lake Storage diaktifkan.

Mengkueri data

Selanjutnya, Anda dapat mulai mengkueri data yang Anda unggah ke akun penyimpanan Anda. Masukkan setiap blok kode berikut ke dalam sel baru dan tekan SHIFT + ENTER untuk menjalankan skrip Python.

DataFrame menyediakan serangkaian fungsi yang kaya (pilih kolom, filter, gabung, agregat) yang memungkinkan Anda menyelesaikan masalah analisis data umum secara efisien.

Untuk memuat DataFrame dari data penerbangan parquet yang disimpan sebelumnya dan menjelajahi beberapa fungsionalitas yang didukung, masukkan skrip ini ke sel baru dan jalankan.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected columns for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Masukkan skrip ini di sel baru untuk menjalankan beberapa kueri analisis dasar terhadap data. Anda dapat memilih untuk menjalankan seluruh skrip (SHIFT + ENTER), menyoroti setiap kueri dan menjalankannya secara terpisah dengan CTRL + SHIFT + ENTER, atau memasukkan setiap kueri ke dalam sel terpisah dan menjalankannya di sana.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Summary

Di tutorial ini, Anda akan:

  • Telah membuat sumber daya Azure, termasuk akun penyimpanan Azure Data Lake Storage dan prinsipal layanan Azure AD, dan menetapkan izin untuk mengakses akun penyimpanan.

  • Membuat ruang kerja dan buku catatan Azure Databricks.

  • Menggunakan AzCopy untuk mengunggah data penerbangan .csv yang tidak terstruktur ke akun penyimpanan Azure Data Lake Storage.

  • Menggunakan fungsi utilitas Sistem File Databricks untuk memasang akun penyimpanan Azure Data Lake Storage Anda dan menjelajahi sistem file hierarkisnya.

  • Menggunakan Apache Spark DataFrames untuk mengubah data penerbangan .csv Anda ke format parket Apache dan menyimpannya kembali ke akun penyimpanan Azure Data Lake Storage Anda.

  • DataFrame yang digunakan untuk menjelajahi data penerbangan dan melakukan kueri sederhana.

  • Menggunakan Apache Spark SQL untuk mengkueri data penerbangan untuk jumlah total penerbangan untuk setiap maskapai pada Januari 2016, bandara di Texas, maskapai yang terbang dari Texas, rata-rata penundaan kedatangan dalam hitungan menit untuk setiap maskapai secara nasional, dan persentase penerbangan setiap maskapai yang mengalami keterlambatan keberangkatan atau kedatangan.

Membersihkan sumber daya

Jika Anda ingin menyimpan notebook dan kembali mengaksesnya nanti, ada baiknya untuk mematikan (mengakhiri) kluster Anda untuk menghindari pengenaan biaya. Untuk mengakhiri kluster Anda, pilih di pemilih komputasi yang terletak di kanan atas toolbar buku catatan, pilih Hentikan dari menu, dan konfirmasi pilihan Anda. (Secara default, kluster akan secara otomatis berakhir setelah 120 menit tidak aktif.)

Jika Anda ingin menghapus sumber daya ruang kerja individual seperti buku catatan dan kluster, Anda bisa melakukannya dari bilah sisi kiri ruang kerja. Untuk instruksi mendetail, lihat Menghapus kluster atau Menghapus buku catatan.

Saat tidak perlu lagi, hapus grup sumber daya dan semua sumber daya terkait. Untuk melakukannya di portal Microsoft Azure, pilih grup sumber daya untuk akun penyimpanan dan ruang kerja dan pilih Hapus.

Langkah selanjutnya