Bagikan melalui


Tutorial: Azure Data Lake Storage Gen2, Azure Databricks & Spark

Tutorial ini menunjukkan kepada Anda cara menyambungkan kluster Azure Databricks Anda ke data yang disimpan di akun penyimpanan Azure yang mengaktifkan Azure Data Lake Storage Gen2. Koneksi ini memungkinkan Anda menjalankan kueri dan analitik secara native dari kluster Anda pada data Anda.

Dalam tutorial ini, Anda akan:

  • Menyerap data yang tidak terstruktur ke dalam akun penyimpanan
  • Menjalankan analitik pada data Anda di penyimpanan Blob

Jika Anda tidak memiliki langganan Azure, buat akun gratis sebelum Anda memulai.

Prasyarat

Membuat ruang kerja, kluster, dan notebook Azure Databricks

  1. Membuat ruang kerja Azure Databricks. Lihat Membuat ruang kerja Azure Databricks.

  2. Membuat cluster. Lihat Buat kluster.

  3. Buat buku catatan. Lihat Membuat buku catatan. Pilih Python sebagai bahasa default buku catatan.

Tetap buka buku catatan Anda. Anda menggunakannya di bagian berikut.

Unduh data penerbangan

Tutorial ini menggunakan data penerbangan performa tepat waktu untuk Januari 2016 dari Biro Statistik Transportasi untuk menunjukkan cara melakukan operasi ETL. Anda harus mengunduh data ini untuk menyelesaikan tutorial.

  1. Unduh file On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip file. File ini berisi data penerbangan.

  2. Buka zip isi file zip dan buat catatan nama file dan jalur file. Anda akan memerlukan informasi ini di langkah-langkah selanjutnya.

Jika Anda ingin mempelajari tentang informasi yang diambil dalam data performa pelaporan tepat waktu, Anda dapat melihat deskripsi bidang di situs web Statistik Biro Transportasi.

Menyerap data

Di bagian ini, Anda mengunggah data penerbangan .csv ke akun Azure Data Lake Storage Gen2 Anda lalu memasang akun penyimpanan ke kluster Databricks Anda. Terakhir, Anda menggunakan Databricks untuk membaca data penerbangan .csv dan menulisnya kembali ke penyimpanan dalam format parket Apache.

Mengunggah data penerbangan ke akun penyimpanan Anda

Gunakan AzCopy untuk menyalin file .csv Anda ke akun Azure Data Lake Storage Gen2 Anda. Anda menggunakan azcopy make perintah untuk membuat kontainer di akun penyimpanan Anda. Kemudian Anda menggunakan azcopy copy perintah untuk menyalin data csv yang baru saja Anda unduh ke direktori dalam kontainer tersebut.

Dalam langkah-langkah berikut, Anda perlu memasukkan nama untuk kontainer yang ingin Anda buat, dan direktori dan blob yang ingin Anda unggah data penerbangan ke dalam kontainer. Anda dapat menggunakan nama yang disarankan di setiap langkah atau menentukan konvensi penamaan anda sendiri untuk kontainer, direktori, dan blob.

  1. Buka jendela prompt perintah, dan masukkan perintah berikut untuk masuk ke Azure Active Directory untuk mengakses akun penyimpanan Anda.

    azcopy login
    

    Ikuti instruksi yang muncul di jendela prompt perintah untuk mengautentikasi akun pengguna Anda.

  2. Untuk membuat kontainer di akun penyimpanan Anda untuk menyimpan data penerbangan, masukkan perintah berikut:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Ganti nilai placeholder <storage-account-name> dengan nama akun penyimpanan.

    • <container-name> Ganti tempat penampung dengan nama untuk kontainer yang ingin Anda buat untuk menyimpan data csv; misalnya, flight-data-container.

  3. Untuk mengunggah (menyalin) data csv ke akun penyimpanan Anda, masukkan perintah berikut.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Ganti <csv-folder-path> nilai placeholder dengan jalur ke file .csv.

    • Ganti nilai placeholder <storage-account-name> dengan nama akun penyimpanan.

    • <container-name> Ganti tempat penampung dengan nama kontainer di akun penyimpanan Anda.

    • <directory-name> Ganti tempat penampung dengan nama direktori untuk menyimpan data Anda dalam kontainer; misalnya, jan2016.

Memasang akun penyimpanan Anda ke kluster Databricks Anda

Di bagian ini, Anda memasang penyimpanan objek cloud Azure Data Lake Storage Gen2 ke Databricks File System (DBFS). Anda menggunakan prinsip layanan Azure AD yang Anda buat sebelumnya untuk autentikasi dengan akun penyimpanan. Untuk informasi selengkapnya, lihat Memasang penyimpanan objek cloud di Azure Databricks.

  1. Lampirkan buku catatan Anda ke kluster Anda.

    1. Di buku catatan yang Anda buat sebelumnya, pilih tombol Koneksi di sudut kanan atas toolbar buku catatan. Tombol ini membuka pemilih komputasi. (Jika Anda telah menyambungkan buku catatan Anda ke kluster, nama kluster tersebut diperlihatkan dalam teks tombol, bukan Koneksi).

    2. Di menu dropdown kluster, pilih kluster yang sebelumnya Anda buat.

    3. Perhatikan bahwa teks dalam pemilih kluster berubah menjadi dimulai. Tunggu hingga kluster selesai dimulai dan nama kluster muncul di tombol sebelum melanjutkan.

  2. Salin dan tempel blok kode berikut ke dalam sel pertama, tetapi jangan menjalankan kode ini dulu.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. Dalam blok kode ini:

    • Di configs, ganti <appId>nilai tempat penampung , <clientSecret>, dan <tenantId> dengan ID aplikasi, rahasia klien, dan ID penyewa yang Anda salin saat Membuat perwakilan layanan dalam prasyarat.

    • source Di URI, ganti <storage-account-name>nilai tempat penampung , <container-name>, dan <directory-name> dengan nama akun penyimpanan Azure Data Lake Storage Gen2 Anda dan nama kontainer dan direktori yang Anda tentukan saat mengunggah data penerbangan ke akun penyimpanan.

      Catatan

      Pengidentifikasi skema di URI, abfss, memberi tahu Databricks untuk menggunakan driver Azure Blob File System dengan Keamanan Lapisan Transportasi (TLS). Untuk mempelajari selengkapnya tentang URI, lihat Menggunakan URI Azure Data Lake Storage Gen2.

  4. Pastikan kluster Anda telah selesai memulai sebelum melanjutkan.

  5. Tekan kunci SHIFT + ENTER untuk menjalankan kode di blok ini.

Kontainer dan direktori tempat Anda mengunggah data penerbangan di akun penyimpanan Anda sekarang dapat diakses di notebook Anda melalui titik pemasangan, /mnt/flightdata.

Menggunakan Databricks Notebook untuk mengonversi CSV ke Parquet

Sekarang setelah data penerbangan csv dapat diakses melalui titik pemasangan DBFS, Anda dapat menggunakan Apache Spark DataFrame untuk memuatnya ke ruang kerja Anda dan menulisnya kembali dalam format parquet Apache ke penyimpanan objek Azure Data Lake Storage Gen2 Anda.

  • Spark DataFrame adalah struktur data berlabel dua dimensi dengan kolom dari jenis yang berpotensi berbeda. Anda dapat menggunakan DataFrame untuk membaca dan menulis data dengan mudah dalam berbagai format yang didukung. Dengan DataFrame, Anda dapat memuat data dari penyimpanan objek cloud dan melakukan analisis dan transformasi di dalamnya di dalam kluster komputasi Anda tanpa memengaruhi data yang mendasar di penyimpanan objek cloud. Untuk mempelajari selengkapnya, lihat Bekerja dengan PySpark DataFrames di Azure Databricks.

  • Apache parquet adalah format file kolom dengan pengoptimalan yang mempercepat kueri. Ini adalah format file yang lebih efisien daripada CSV atau JSON. Untuk mempelajari lebih lanjut, lihat File Parquet.

Di buku catatan, tambahkan sel baru, dan tempelkan kode berikut ke dalamnya.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Tekan kunci SHIFT + ENTER untuk menjalankan kode di blok ini.

Sebelum melanjutkan ke bagian berikutnya, pastikan bahwa semua data parquet telah ditulis, dan "Selesai" muncul di output.

Jelajahi data

Di bagian ini, Anda menggunakan utilitas sistem file Databricks untuk menjelajahi penyimpanan objek Azure Data Lake Storage Gen2 Anda menggunakan titik pemasangan DBFS yang Anda buat di bagian sebelumnya.

Di sel baru, tempelkan kode berikut untuk mendapatkan daftar file di titik pemasangan. Perintah pertama menghasilkan daftar file dan direktori. Perintah kedua menampilkan output dalam format tabular untuk pembacaan yang lebih mudah.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Tekan kunci SHIFT + ENTER untuk menjalankan kode di blok ini.

Perhatikan bahwa direktori parquet muncul dalam daftar. Anda menyimpan data penerbangan .csv dalam format parquet ke direktori parquet/flights di bagian sebelumnya. Untuk mencantumkan file di direktori parket/penerbangan , tempelkan kode berikut ke dalam sel baru dan jalankan:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Untuk membuat file baru dan mencantumkannya, tempelkan kode berikut ke dalam sel baru dan jalankan:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Karena Anda tidak memerlukan file 1.txt dalam tutorial ini, Anda dapat menempelkan kode berikut ke dalam sel dan menjalankannya untuk menghapus mydirectory secara rekursif. Parameter True menunjukkan penghapusan rekursif.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Sebagai kenyamanan, Anda dapat menggunakan perintah bantuan untuk mempelajari detail tentang perintah lain.

dbutils.fs.help("rm")

Dengan sampel kode ini, Anda telah menjelajahi sifat hierarki HDFS menggunakan data yang disimpan di akun penyimpanan dengan Azure Data Lake Storage Gen2 diaktifkan.

Mengueri data

Selanjutnya, Anda dapat mulai mengkueri data yang Anda unggah ke akun penyimpanan Anda. Masukkan setiap blok kode berikut ke dalam sel baru dan tekan SHIFT + ENTER untuk menjalankan skrip Python.

DataFrame menyediakan serangkaian fungsi yang kaya (pilih kolom, filter, gabung, agregat) yang memungkinkan Anda menyelesaikan masalah analisis data umum secara efisien.

Untuk memuat DataFrame dari data penerbangan parquet yang disimpan sebelumnya dan menjelajahi beberapa fungsionalitas yang didukung, masukkan skrip ini ke sel baru dan jalankan.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Masukkan skrip ini di sel baru untuk menjalankan beberapa kueri analisis dasar terhadap data. Anda dapat memilih untuk menjalankan seluruh skrip (SHIFT + ENTER), menyoroti setiap kueri dan menjalankannya secara terpisah dengan CTRL + SHIFT + ENTER, atau memasukkan setiap kueri ke dalam sel terpisah dan menjalankannya di sana.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Ringkasan

Di tutorial ini, Anda akan:

  • Sumber daya Azure yang dibuat, termasuk akun penyimpanan Azure Data Lake Storage Gen2 dan perwakilan layanan Azure AD, dan izin yang ditetapkan untuk mengakses akun penyimpanan.

  • Membuat ruang kerja, notebook, dan kluster komputasi Azure Databricks.

  • Menggunakan AzCopy untuk mengunggah data penerbangan .csv yang tidak terstruktur ke akun penyimpanan Azure Data Lake Storage Gen2.

  • Menggunakan fungsi utilitas Sistem File Databricks untuk memasang akun penyimpanan Azure Data Lake Storage Gen2 Anda dan menjelajahi sistem file hierarkisnya.

  • Menggunakan Apache Spark DataFrames untuk mengubah data penerbangan .csv Anda ke format parket Apache dan menyimpannya kembali ke akun penyimpanan Azure Data Lake Storage Gen2 Anda.

  • DataFrame yang digunakan untuk menjelajahi data penerbangan dan melakukan kueri sederhana.

  • Menggunakan Apache Spark SQL untuk mengkueri data penerbangan untuk jumlah total penerbangan untuk setiap maskapai pada Januari 2016, bandara di Texas, maskapai yang terbang dari Texas, rata-rata penundaan kedatangan dalam hitungan menit untuk setiap maskapai secara nasional, dan persentase penerbangan setiap maskapai yang mengalami keterlambatan keberangkatan atau kedatangan.

Membersihkan sumber daya

Jika Anda ingin mempertahankan notebook dan kembali ke sana nanti, ada baiknya untuk mematikan (mengakhiri) kluster Anda untuk menghindari biaya. Untuk mengakhiri kluster Anda, pilih di pemilih komputasi yang terletak di kanan atas toolbar buku catatan, pilih Hentikan dari menu, dan konfirmasi pilihan Anda. (Secara default, kluster akan secara otomatis berakhir setelah 120 menit tidak aktif.)

Jika Anda ingin menghapus sumber daya ruang kerja individual seperti buku catatan dan kluster, Anda bisa melakukannya dari bilah sisi kiri ruang kerja. Untuk instruksi mendetail, lihat Menghapus kluster atau Menghapus buku catatan.

Saat tidak perlu lagi, hapus grup sumber daya dan semua sumber daya terkait. Untuk melakukannya di portal Azure, pilih grup sumber daya untuk akun penyimpanan dan ruang kerja dan pilih Hapus.

Langkah berikutnya