Manipulasi data interaktif dengan Apache Spark

Manipulasi data adalah aspek penting dari proyek pembelajaran mesin. Dalam artikel ini, Anda mempelajari cara melakukan pengolahan data interaktif dengan menjalankan notebook Azure Machine Learning pada pemrosesan Apache Spark tanpa server yang dijalankan oleh Azure Synapse.

Artikel ini menjelaskan cara melampirkan dan mengonfigurasi komputasi Spark tanpa server. Artikel ini kemudian menunjukkan cara menggunakan Spark tanpa server untuk mengakses dan mengolah data dari beberapa sumber.

Prasyarat

Untuk informasi selengkapnya, lihat:

Menggunakan komputasi Spark tanpa server dalam sesi buku catatan

Menggunakan komputasi Spark tanpa server adalah cara termampu untuk mengakses kluster Spark untuk manipulasi data interaktif. Komputasi Spark tanpa server yang dikelola sepenuhnya yang dilampirkan ke kumpulan Synapse Spark tersedia langsung di notebook Azure Machine Learning.

Untuk menggunakan salah satu sumber dan metode akses data dan manipulasi berikut, lampirkan komputasi tanpa server Spark dengan memilih Azure Machine Learning Serverless SparkServerless Spark >Compute - Tersedia di samping Komputasi di bagian atas halaman file atau buku catatan. Dibutuhkan satu atau dua menit agar komputasi melekat pada sesi.

Mengonfigurasi sesi Spark tanpa server

Setelah melampirkan komputasi Spark tanpa server, Anda dapat mengonfigurasi sesi Spark dengan mengatur atau mengubah beberapa nilai. Untuk mengonfigurasi sesi Spark:

  1. Pilih Konfigurasikan sesi di kiri atas pada halaman file atau buku catatan.
  2. Pada layar Konfigurasi sesi , ubah salah satu pengaturan berikut:
    • Di panel Komputasi :

      • Ubah ukuran komputer dengan memilih ukuran yang berbeda dari menu dropdown di bawah Ukuran simpul.
      • Pilih apakah akan mengalokasikan pelaksana secara dinamis atau tidak.
      • Pilih jumlah Pelaksana untuk sesi Spark.
      • Pilih ukuran Pelaksana yang berbeda jika tersedia dari menu dropdown.
    • Di panel Pengaturan :

      • Ubah versi Apache Spark ke versi yang berbeda dari 3.5 jika tersedia.

        Penting

        Azure Synapse Runtime untuk Apache Spark 3.4 mencapai akhir dukungan pada 31 Maret 2026. Migrasikan ke Apache Spark 3.5 untuk dukungan berkelanjutan. Untuk informasi selengkapnya, lihat Azure Synapse Runtime.

      • Ubah nilai Batas waktu sesi dalam menit ke angka yang lebih tinggi untuk membantu mencegah batas waktu sesi.

      • Di bawah Pengaturan konfigurasi, tambahkan Pengaturan nama/nilai properti untuk mengonfigurasi sesi sesuai kebutuhan.

        Petunjuk / Saran

        Jika Anda menggunakan paket Conda tingkat sesi, menambahkan spark.hadoop.aml.enable_cache properti konfigurasi dengan nilai true dapat meningkatkan waktu mulai dingin sesi Spark. Sesi cold start dengan paket Conda tingkat sesi biasanya membutuhkan waktu 10 hingga 15 menit untuk pertama kalinya. Sesi berikutnya dimulai dengan variabel konfigurasi yang diatur ke true biasanya memakan waktu tiga hingga lima menit.

    • pada panel paket Python :

      • Untuk menggunakan file Conda untuk mengonfigurasi sesi Anda, pilih Unggah file conda. Di samping Pilih file conda, pilih Telusuri, lalu telusuri dan buka file Conda YAML yang sesuai di komputer Anda untuk mengunggahnya.
      • Untuk menggunakan lingkungan kustom, pilih Lingkungan kustom dan pilih lingkungan kustom di bawah Jenis lingkungan. Untuk informasi selengkapnya, lihat Mengelola lingkungan perangkat lunak.
  3. Pilih Terapkan untuk menerapkan semua konfigurasi.

Perubahan konfigurasi sesi tetap ada dan tersedia untuk sesi notebook lain yang menggunakan komputasi Spark tanpa server terlampir.

Mengimpor dan mengolah data dari Azure Data Lake Storage

Untuk mengakses dan mengolah data yang disimpan di akun Azure Data Lake Storage, gunakan URI protokol dengan pengalihan identitas pengguna atau akses berbasis prinsipal layanan . Passthrough identitas pengguna tidak memerlukan konfigurasi tambahan.

Untuk menggunakan salah satu metode, identitas pengguna atau prinsipal layanan harus memiliki penetapan peran Kontributor dan Kontributor Data Blob Penyimpanan di akun Azure Data Lake Storage.

Untuk passthrough identitas pengguna, jalankan sampel kode manipulasi data berikut untuk menggunakan URI data dalam format abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> dengan pyspark.pandas. Ganti placeholder <STORAGE_ACCOUNT_NAME> dengan nama akun Azure Data Lake Storage Anda dan <FILE_SYSTEM_NAME> dengan nama kontainer data.

import pyspark.pandas as pd

df = pd.read_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
    index_col="PassengerId",
)
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
    index_col="PassengerId",
)

Menggunakan perwakilan layanan

Untuk menggunakan perwakilan layanan untuk mengakses dan mengolah data dari Azure Data Lake Storage, pertama-tama siapkan perwakilan layanan sebagai berikut:

  1. Buat perwakilan layanan dan tetapkan peran Kontributor Data Blob Penyimpanan dan Pengguna Rahasia Key Vault yang diperlukan.

  2. Dapatkan ID penyewa prinsipal layanan, ID klien, dan nilai rahasia klien dari pendaftaran aplikasi serta buat rahasia di Azure Key Vault untuk nilai-nilai tersebut.

  3. Atur ID penyewa perwakilan layanan, ID klien, dan rahasia klien dengan menambahkan pasangan nama/nilai properti berikut dalam konfigurasi sesi. Ganti <STORAGE_ACCOUNT_NAME> dengan nama akun penyimpanan Anda dan <TENANT_ID> dengan ID penyewa perwakilan layanan.

    Nama properti Nilai
    fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net Nilai ID aplikasi (klien)
    fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
    fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net Nilai rahasia klien
  4. Jalankan kode berikut. Panggilan get_secret() dalam kode bergantung pada nama Key Vault dan nama-nama rahasia Key Vault yang dibuat untuk ID penyewa perwakilan layanan, ID klien, dan rahasia klien.

    from pyspark.sql import SparkSession
    
    sc = SparkSession.builder.getOrCreate()
    token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
    
    # Set up service principal tenant ID, client ID, and secret from Azure Key Vault
    client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
    tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
    client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
    
    # Set up a service principal that has access to the data
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        client_id,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        client_secret,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
    )
    
  5. Impor dan manipulasi data titanic.csv menggunakan URI data dalam abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> format, seperti yang ditunjukkan dalam sampel kode. Ganti placeholder <STORAGE_ACCOUNT_NAME> dengan nama akun Azure Data Lake Storage Anda dan <FILE_SYSTEM_NAME> dengan nama kontainer data.

    import pyspark.pandas as pd
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

Mengimpor dan mengolah data dari penyimpanan Azure Blob

Anda dapat mengakses data penyimpanan Azure Blob dengan kunci akses akun penyimpanan atau token tanda tangan akses bersama (SAS). Simpan kredensial di Azure Key Vault sebagai rahasia, dan atur sebagai properti dalam konfigurasi sesi Spark.

  1. Jalankan salah satu cuplikan kode berikut. Panggilan get_secret() dalam cuplikan kode memerlukan nama brankas kunci dan nama rahasia yang dibuat untuk kunci akses akun penyimpanan Azure Blob atau token SAS.

    • Untuk mengonfigurasi kunci akses akun penyimpanan, atur properti seperti yang fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net ditunjukkan dalam cuplikan kode berikut:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Untuk mengonfigurasi token SAS, atur properti seperti yang fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net ditunjukkan dalam cuplikan kode berikut:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      
  2. Jalankan kode manipulasi data berikut dengan URI data yang diformat sebagai wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>.

    import pyspark.pandas as pd
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

Mengimpor dan mengolah data dari Azure Machine Learning datastore

Untuk mengakses data dari datastore Azure Machine Learning, Anda menentukan jalur ke data di datastore dengan formatazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA> URI.

Jalankan sampel kode berikut untuk membaca dan mengolah data titanic.csv dari datastore Azure Machine Learning menggunakan azureml:// URI datastore dan pyspark.pandas.

import pyspark.pandas as pd

df = pd.read_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/titanic.csv",
    index_col="PassengerId",
)
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/wrangled",
    index_col="PassengerId",
)

Datastore Azure Machine Learning dapat mengakses data menggunakan kunci akses akun penyimpanan Azure, token SAS, kredensial perwakilan layanan, atau akses data tanpa kredensial. Pilih mekanisme autentikasi yang sesuai tergantung pada jenis penyimpanan data dan jenis akun penyimpanan Azure yang mendasar.

Tabel berikut ini meringkas mekanisme autentikasi untuk mengakses data di penyimpanan data Azure Machine Learning:

Jenis akun penyimpanan Akses data tanpa kredensial Mekanisme akses data Penetapan peran
Azure Blob Tidak. Kunci akses atau token SAS Tidak diperlukan penetapan peran.
Azure Blob Ya Passthrough identitas pengguna* Identitas pengguna harus memiliki penetapan peran yang sesuai di akun penyimpanan Azure Blob.
Azure Data Lake Storage Tidak. Perwakilan layanan Perwakilan layanan harus memiliki penetapan peran yang sesuai di akun penyimpanan Azure Data Lake Storage.
Azure Data Lake Storage Ya Passthrough identitas pengguna Identitas pengguna harus memiliki penetapan peran yang sesuai di akun penyimpanan Azure Data Lake Storage.

* Passthrough identitas pengguna berfungsi untuk penyimpanan data tanpa kredensial yang menunjuk ke akun penyimpanan Azure Blob hanya jika penghapusan sementara tidak diaktifkan.

Mengakses data pada file share default

Di studio Azure Machine Learning, berbagi file ruang kerja default Anda adalah pohon direktori di bawah tab File di Notebooks. Kode notebook dapat langsung mengakses file yang disimpan dalam berbagi file ini dengan file:// protokol, menggunakan jalur absolut file tanpa konfigurasi lain. Berbagi file default dipasang ke komputasi Spark tanpa server dan kumpulan Synapse Spark yang terpasang.

Cuplikan layar memperlihatkan penggunaan berbagi file.

Cuplikan kode berikut mengakses dan mengolah data dari file titanic.csv yang disimpan dalam folder data dalam berbagi file default di bawah nama pengguna. Ganti <USER> dengan nama pengguna Anda.

import os
import pyspark.pandas as pd

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")