Manipulasi data dengan kumpulan Apache Spark (tidak digunakan lagi)

BERLAKU UNTUK:Python SDK azureml v1

Peringatan

Integrasi Azure Synapse Analytics dengan Azure Machine Learning yang tersedia di Python SDK v1 tidak digunakan lagi. Pengguna dapat terus menggunakan ruang kerja Synapse yang terdaftar di Azure Machine Learning sebagai layanan tertaut. Namun, ruang kerja Synapse baru tidak dapat lagi terdaftar di Azure Machine Learning sebagai layanan tertaut. Sebaiknya gunakan komputasi Synapse Terkelola (Otomatis) dan kumpulan Synapse Spark terpasang yang tersedia di CLI v2 dan Python SDK v2. Silakan lihat https://aka.ms/aml-spark untuk detail lebih lanjut.

Dalam artikel ini, Anda mempelajari cara melakukan tugas perselisihan data secara interaktif dalam sesi Synapse khusus, yang didukung oleh Azure Synapse Analytics, di buku catatan Jupyter menggunakan SDK Python Azure Machine Learning.

Jika Anda lebih suka menggunakan alur Azure Machine Learning, lihat Cara menggunakan Apache Spark (didukung oleh Azure Synapse Analytics) di alur pembelajaran mesin (pratinjau) Anda.

Untuk panduan tentang cara menggunakan Azure Synapse Analytics dengan ruang kerja Synapse, lihat rangkaian memulai Azure Synapse Analytics.

Azure Machine Learning dan integrasi Azure Synapse Analytics

Integrasi Azure Synapse Analytics dengan Azure Machine Learning (pratinjau) memungkinkan Anda melampirkan kumpulan Apache Spark yang didukung oleh Azure Synapse untuk eksplorasi dan persiapan data interaktif. Dengan integrasi ini, Anda dapat memiliki komputasi khusus untuk perselisihan data dalam skala besar, semuanya dalam buku catatan Python yang sama yang Anda gunakan untuk melatih model pembelajaran mesin Anda.

Prasyarat

Meluncurkan kumpulan Synapse Spark untuk tugas perselisihan data

Untuk memulai penyiapan data dengan kumpulan Apache Spark, tentukan nama komputasi Spark Synapse terlampir. Nama ini dapat ditemukan melalui studio Azure Machine Learning di bawah tab Komputasi terlampir.

dapatkan nama komputasi terlampir

Penting

Untuk terus menggunakan kumpulan Apache Spark, Anda harus menunjukkan sumber daya komputasi mana yang akan digunakan di seluruh tugas perselisihan data dengan %synapse untuk satu baris kode dan %%synapse untuk beberapa baris. Pelajari selengkapnya tentang perintah ajaib %synapse.

%synapse start -c SynapseSparkPoolAlias

Setelah sesi dimulai, Anda dapat memeriksa metadata sesi.

%synapse meta

Anda dapat menentukan lingkungan Azure Machine Learning yang akan digunakan selama sesi Apache Spark Anda. Hanya dependensi Conda yang ditentukan dalam lingkungan yang akan berlaku. Gambar Docker tidak didukung.

Peringatan

Dependensi Python yang ditentukan dalam dependensi Conda lingkungan tidak didukung di kumpulan Apache Spark. Saat ini, hanya versi Python tetap yang didukung. Periksa versi Python Anda dengan menyertakan sys.version_info dalam skrip Anda.

Kode berikut, membuat lingkungan, myenv, yang menginstal azureml-core versi 1.20.0 dan numpy versi 1.17.0 sebelum sesi dimulai. Anda kemudian dapat menyertakan lingkungan ini dalam pernyataan start sesi Apache Spark Anda.


from azureml.core import Workspace, Environment

# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)

Untuk memulai penyiapan data dengan kumpulan Apache Spark dan lingkungan kustom Anda, tentukan nama kumpulan Apache Spark dan lingkungan mana yang akan digunakan selama sesi Apache Spark. Selain itu, Anda dapat memberikan ID langganan Anda, grup sumber daya ruang kerja pembelajaran mesin, dan nama ruang kerja pembelajaran mesin.

Penting

Pastikan untuk Mengizinkan paket tingkat sesi diaktifkan di ruang kerja Synapse tertaut.

aktifkan paket tingkat sesi

%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName

Memuat data dari penyimpanan

Setelah sesi Apache Spark Anda dimulai, baca di data yang ingin Anda siapkan. Pemuatan data didukung untuk penyimpanan Azure Blob dan Azure Data Lake Storage Generasi 1 dan 2.

Ada dua cara untuk memuat data dari layanan penyimpanan ini:

Untuk mengakses layanan penyimpanan ini, Anda memerlukan izin Storage Blob Data Reader. Jika Anda berencana menulis data kembali ke layanan penyimpanan ini, Anda memerlukan izin Storage Blob Data Contributor. Pelajari lebih lanjut tentang izin dan peran penyimpanan.

Memuat data dengan jalur Hadoop Distributed Files System (HDFS)

Untuk memuat dan membaca data dari penyimpanan dengan jalur HDFS yang sesuai, Anda harus memiliki info masuk autentikasi akses data Anda yang tersedia. Info masuk ini berbeda tergantung pada jenis penyimpanan Anda.

Kode berikut menunjukkan cara membaca data dari penyimpanan Azure Blob ke dalam dataframe Spark dengan token tanda tangan akses bersama (SAS) atau kunci akses Anda.

%%synapse

# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")

# read from blob 
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")

Kode berikut menunjukkan cara membaca data dari Azure Data Lake Storage Generasi 1 (ADLS Gen 1) dengan info masuk utama layanan Anda.

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")

Kode berikut menunjukkan cara membaca data dari Azure Data Lake Storage Generasi 2 (ADLS Gen 2) dengan info masuk utama layanan Anda.

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")

Membaca data dari himpunan data yang terdaftar

Anda juga bisa mendapatkan himpunan data terdaftar yang sudah ada di ruang kerja Anda dan melakukan penyiapan data di atasnya dengan mengonversinya menjadi dataframe spark.

Contoh berikut mengautentikasi ke ruang kerja, mendapatkan TabularDataset terdaftar, blob_dset, yang mereferensikan file dalam penyimpanan blob, dan mengonversinya menjadi dataframe spark. Saat mengonversi himpunan data menjadi dataframe spark, Anda dapat menggunakan eksplorasi data pyspark dan pustaka persiapan.

%%synapse

from azureml.core import Workspace, Dataset

subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()

Melakukan tugas perselisihan data

Setelah mengambil dan menjelajahi data, Anda dapat melakukan tugas perselisihan data.

Kode berikut, diperluas berdasarkan contoh HDFS di bagian sebelumnya dan memfilter data dalam dataframe spark, df, berdasarkan kolom Survivor dan grup yang mencantumkan menurut Usia

%%synapse

from pyspark.sql.functions import col, desc

df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

df.show()

Menyimpan data ke penyimpanan dan menghentikan sesi spark

Setelah eksplorasi dan penyiapan data selesai, simpan data yang disiapkan untuk digunakan nanti di akun penyimpanan Anda di Azure.

Dalam contoh berikut, data yang disiapkan ditulis kembali ke penyimpanan Azure Blob dan menimpa file Titanic.csv asli di direktori training_data. Untuk menulis kembali ke penyimpanan, Anda memerlukan izin Storage Blob Data Contributor. Pelajari lebih lanjut tentang izin dan peran penyimpanan.

%% synapse

df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")

Setelah menyelesaikan penyiapan data dan menyimpan data yang disiapkan ke penyimpanan, berhenti menggunakan kumpulan Apache Spark dengan perintah berikut.

%synapse stop

Membuat himpunan data untuk mewakili data yang disiapkan

Saat Anda siap untuk menggunakan data yang disiapkan untuk pelatihan model, sambungkan ke penyimpanan Anda dengan datastore Azure Machine Learning, dan tentukan file mana yang ingin Anda gunakan dengan himpunan data Azure Machine Learning.

Contoh kode berikut,

  • Mengasumsikan Anda sudah membuat datastore yang tersambung ke layanan penyimpanan tempat Anda menyimpan data yang disiapkan.
  • Mendapatkan datastore yang ada, mydatastore, dari ruang kerja, ws dengan metode get().
  • Membuat FileDataset, train_ds, yang mereferensikan file data yang disiapkan yang terletak di direktori training_data di mydatastore.
  • Membuat variabel input1, yang dapat digunakan di lain waktu untuk membuat file data himpunan data train_ds yang tersedia untuk target komputasi untuk tugas pelatihan Anda.
from azureml.core import Datastore, Dataset

datastore = Datastore.get(ws, datastore_name='mydatastore')

datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()

Menggunakan ScriptRunConfig untuk mengirimkan eksekusi eksperimen ke kumpulan Synapse Spark

Jika Anda siap untuk mengotomatisasi dan membuat tugas perselisihan data, Anda dapat mengirimkan eksekusi eksperimen ke kumpulan Synapse Spark terlampir dengan objek ScriptRunConfig.

Demikian pula, jika Anda memiliki alur Azure Machine Learning, Anda dapat menggunakan SynapseSparkStep untuk menentukan kumpulan Synapse Spark Anda sebagai target komputasi untuk langkah penyiapan data di alur Anda.

Membuat data Anda tersedia untuk kumpulan Synapse Spark bergantung pada jenis himpunan data Anda.

  • Untuk FileDataset, Anda dapat menggunakan metode as_hdfs() ini. Saat eksekusi dikirim, himpunan data tersedia untuk kumpulan Synapse Spark sebagai sistem file terdistribusi Hadoop (HFDS).
  • Untuk TabularDataset, Anda dapat menggunakan metode as_named_input() ini.

Kode berikut,

  • Membuat variabel input2 dari FileDataset train_ds yang dibuat dalam contoh kode sebelumnya.
  • Membuat variabel output dengan kelas HDFSOutputDatasetConfiguration. Setelah eksekusi selesai, kelas ini memungkinkan kami menyimpan output eksekusi sebagai himpunan data, test di datastore, mydatastore. Di ruang kerja Azure Machine Learning, himpunan data test terdaftar dengan nama registered_dataset.
  • Mengonfigurasi pengaturan yang harus digunakan eksekusi untuk melakukan pada kumpulan Synapse Spark.
  • Menentukan parameter ScriptRunConfig untuk,
    • Gunakan dataprep.py, untuk eksekusi.
    • Tentukan data mana yang akan digunakan sebagai input dan cara membuatnya tersedia untuk kumpulan Synapse Spark.
    • Tentukan tempat untuk menyimpan data output, output.
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig 
from azureml.core import Experiment

input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")

run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

run_config.spark.configuration["spark.driver.memory"] = "1g" 
run_config.spark.configuration["spark.driver.cores"] = 2 
run_config.spark.configuration["spark.executor.memory"] = "1g" 
run_config.spark.configuration["spark.executor.cores"] = 1 
run_config.spark.configuration["spark.executor.instances"] = 1 

conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

run_config.environment.python.conda_dependencies = conda_dep

script_run_config = ScriptRunConfig(source_directory = './code',
                                    script= 'dataprep.py',
                                    arguments = ["--file_input", input2,
                                                 "--output_dir", output],
                                    run_config = run_config)

Untuk informasi selengkapnya tentang run_config.spark.configuration dan konfigurasi Spark umum, lihat Kelas SparkConfiguration dan dokumentasi konfigurasi Apache Spark.

Setelah objek ScriptRunConfig Anda disiapkan, Anda dapat mengirimkan eksekusi.

from azureml.core import Experiment 

exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
run

Untuk detail lebih lanjut, seperti skrip dataprep.py yang digunakan dalam contoh ini, lihat contoh notebook.

Setelah data Anda disiapkan, Anda kemudian dapat menggunakannya sebagai input untuk pekerjaan pelatihan Anda. Dalam contoh kode yang disebutkan di atas, registered_dataset adalah yang akan Anda tentukan sebagai data input untuk pekerjaan pelatihan.

Contoh buku catatan

Lihat contoh buku catatan untuk konsep dan demonstrasi selengkapnya tentang kemampuan integrasi Azure Synapse Analytics dan Azure Machine Learning.

Langkah berikutnya