Manipulasi data dengan kumpulan Apache Spark (tidak digunakan lagi)
BERLAKU UNTUK: Python SDK azureml v1
Peringatan
Integrasi Azure Synapse Analytics dengan Azure Pembelajaran Mesin, tersedia di Python SDK v1, tidak digunakan lagi. Pengguna masih dapat menggunakan ruang kerja Synapse, yang terdaftar di Azure Pembelajaran Mesin, sebagai layanan tertaut. Namun, ruang kerja Synapse baru tidak dapat lagi terdaftar di Azure Machine Learning sebagai layanan tertaut. Sebaiknya gunakan komputasi Spark tanpa server dan kumpulan Synapse Spark yang terpasang, tersedia di CLI v2 dan Python SDK v2. Untuk informasi selengkapnya, kunjungi https://aka.ms/aml-spark.
Dalam artikel ini, Anda mempelajari cara melakukan tugas perselisihan data secara interaktif dalam sesi Synapse khusus, yang didukung oleh Azure Synapse Analytics, dalam notebook Jupyter. Tugas-tugas ini mengandalkan Azure Pembelajaran Mesin Python SDK. Untuk informasi selengkapnya tentang alur Azure Pembelajaran Mesin, kunjungi Cara menggunakan Apache Spark (didukung oleh Azure Synapse Analytics) di alur pembelajaran mesin Anda (pratinjau). Untuk informasi selengkapnya tentang cara menggunakan Azure Synapse Analytics dengan ruang kerja Synapse, kunjungi seri memulai Azure Synapse Analytics.
Azure Machine Learning dan integrasi Azure Synapse Analytics
Dengan integrasi Azure Synapse Analytics dengan Azure Pembelajaran Mesin (pratinjau), Anda dapat melampirkan kumpulan Apache Spark, yang didukung oleh Azure Synapse, untuk eksplorasi dan persiapan data interaktif. Dengan integrasi ini, Anda dapat memiliki sumber daya komputasi khusus untuk perselisihan data dalam skala besar, semuanya dalam notebook Python yang sama dengan yang Anda gunakan untuk melatih model pembelajaran mesin Anda.
Prasyarat
Konfigurasikan lingkungan pengembangan Anda untuk memasang Azure Machine Learning SDK, atau gunakan instans komputasi Azure Machine Learning dengan SDK yang sudah dipasang
Membuat kumpulan Apache Spark menggunakan portal Azure, alat web, atau Synapse Studio
azureml-synapse
Instal paket (pratinjau) dengan kode ini:pip install azureml-synapse
Menautkan ruang kerja Azure Pembelajaran Mesin dan ruang kerja Azure Synapse Analytics Anda dengan Azure Pembelajaran Mesin Python SDK atau dengan studio Azure Pembelajaran Mesin
Melampirkan kumpulan Synapse Spark sebagai target komputasi
Meluncurkan kumpulan Synapse Spark untuk tugas perselisihan data
Untuk memulai persiapan data dengan kumpulan Apache Spark, tentukan nama komputasi Spark Synapse yang terlampir. Anda dapat menemukan nama ini dengan studio Azure Pembelajaran Mesin di bawah tab Komputasi terlampir.
Penting
Untuk terus menggunakan kumpulan Apache Spark, Anda harus menunjukkan sumber daya komputasi mana yang akan digunakan di seluruh tugas perselisihan data Anda. Gunakan %synapse
untuk satu baris kode, dan %%synapse
untuk beberapa baris:
%synapse start -c SynapseSparkPoolAlias
Setelah sesi dimulai, Anda dapat memeriksa metadata sesi:
%synapse meta
Anda dapat menentukan lingkungan Azure Machine Learning yang akan digunakan selama sesi Apache Spark Anda. Hanya dependensi Conda yang ditentukan dalam lingkungan yang akan berlaku. Gambar Docker tidak didukung.
Peringatan
Dependensi Python yang ditentukan dalam dependensi Conda lingkungan tidak didukung di kumpulan Apache Spark. Saat ini, hanya versi Python tetap yang didukung Sertakan sys.version_info
dalam skrip Anda untuk memeriksa versi Python Anda
Kode ini membuatmyenv
variabel lingkungan, untuk menginstal azureml-core
versi 1.20.0 dan numpy
versi 1.17.0 sebelum sesi dimulai. Anda kemudian dapat menyertakan lingkungan ini dalam pernyataan start
sesi Apache Spark Anda.
from azureml.core import Workspace, Environment
# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)
Untuk memulai persiapan data dengan kumpulan Apache Spark di lingkungan kustom Anda, tentukan nama kumpulan Apache Spark dan lingkungan yang akan digunakan selama sesi Apache Spark. Anda dapat memberikan ID langganan, grup sumber daya ruang kerja pembelajaran mesin, dan nama ruang kerja pembelajaran mesin.
Penting
Pastikan untuk mengaktifkan Izinkan paket tingkat sesi di ruang kerja Synapse yang ditautkan.
%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName
Memuat data dari penyimpanan
Setelah sesi Apache Spark dimulai, baca dalam data yang ingin Anda siapkan. Pemuatan data didukung untuk penyimpanan Azure Blob dan Azure Data Lake Storage Generasi 1 dan 2.
Anda memiliki dua opsi untuk memuat data dari layanan penyimpanan ini:
Langsung memuat data dari penyimpanan dengan jalur Hadoop Distributed Files System (HDFS)
Membaca data dari himpunan data Azure Pembelajaran Mesin yang sudah ada
Untuk mengakses layanan penyimpanan ini, Anda memerlukan izin Storage Blob Data Reader. Untuk menulis data kembali ke layanan penyimpanan ini, Anda memerlukan izin Kontributor Data Blob Penyimpanan. Pelajari lebih lanjut tentang izin dan peran penyimpanan.
Memuat data dengan jalur Hadoop Distributed Files System (HDFS)
Untuk memuat dan membaca data dari penyimpanan dengan jalur HDFS yang sesuai, Anda memerlukan kredensial autentikasi akses data yang tersedia. Info masuk ini berbeda tergantung pada jenis penyimpanan Anda. Sampel kode ini menunjukkan cara membaca data dari penyimpanan Azure Blob ke dalam kerangka data Spark dengan token tanda tangan akses bersama (SAS) atau kunci akses Anda:
%%synapse
# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")
# read from blob
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")
Sampel kode ini menunjukkan cara membaca data dari Azure Data Lake Storage Generation 1 (ADLS Gen 1) dengan kredensial perwakilan layanan Anda:
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")
Sampel kode ini menunjukkan cara membaca data dari Azure Data Lake Storage Generation 2 (ADLS Gen 2) dengan kredensial perwakilan layanan Anda:
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")
Membaca data dari himpunan data yang terdaftar
Anda juga dapat menempatkan himpunan data terdaftar yang ada di ruang kerja Anda, dan melakukan persiapan data di dalamnya, jika Anda mengonversinya menjadi kerangka data spark. Contoh ini mengautentikasi ke ruang kerja, mendapatkan TabularDataset terdaftar - -blob_dset
yang mereferensikan file dalam penyimpanan blob, dan mengonversi TabularDataset tersebut ke dataframe Spark. Saat mengonversi himpunan data ke dataframeS Spark, Anda dapat menggunakan pyspark
pustaka eksplorasi dan persiapan data.
%%synapse
from azureml.core import Workspace, Dataset
subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"
ws = Workspace(workspace_name = workspace_name,
subscription_id = subscription_id,
resource_group = resource_group)
dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()
Melakukan tugas perselisihan data
Setelah mengambil dan menjelajahi data, Anda dapat melakukan tugas manipulasi data. Sampel kode ini diperluas pada contoh HDFS di bagian sebelumnya. Berdasarkan kolom Survivor , kolom memfilter data dalam kerangka data df
spark dan grup yang mencantumkan menurut Usia:
%%synapse
from pyspark.sql.functions import col, desc
df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)
df.show()
Menyimpan data ke penyimpanan dan menghentikan sesi spark
Setelah eksplorasi dan penyiapan data selesai, simpan data yang disiapkan untuk digunakan nanti di akun penyimpanan Anda di Azure. Dalam sampel kode ini, data yang disiapkan ditulis kembali ke penyimpanan Azure Blob, menimpa file asli Titanic.csv
di training_data
direktori. Untuk menulis kembali ke penyimpanan, Anda memerlukan izin Storage Blob Data Contributor. Untuk informasi selengkapnya, kunjungi Menetapkan peran Azure untuk akses ke data blob.
%% synapse
df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")
Setelah Anda menyelesaikan persiapan data, dan Anda menyimpan data yang disiapkan ke penyimpanan, akhiri penggunaan kumpulan Apache Spark Anda dengan perintah ini:
%synapse stop
Membuat himpunan data, untuk mewakili data yang disiapkan
Saat Anda siap menggunakan data yang disiapkan untuk pelatihan model, sambungkan ke penyimpanan Anda dengan datastore Azure Pembelajaran Mesin, dan tentukan file atau file yang ingin Anda gunakan dengan himpunan data Azure Pembelajaran Mesin.
Contoh kode ini
- Mengasumsikan Anda sudah membuat datastore yang tersambung ke layanan penyimpanan tempat Anda menyimpan data yang disiapkan
- Mengambil datastore yang ada - -
mydatastore
dari ruangws
kerja dengan metode get(). - Membuat FileDataset,
train_ds
, untuk mereferensikan file data yang disiapkan yang terletak dimydatastore
training_data
direktori - Membuat variabel
input1
. Di lain waktu, variabel ini dapat membuat file data himpunantrain_ds
data tersedia untuk target komputasi untuk tugas pelatihan Anda.
from azureml.core import Datastore, Dataset
datastore = Datastore.get(ws, datastore_name='mydatastore')
datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()
Menggunakan ScriptRunConfig
untuk mengirimkan eksekusi eksperimen ke kumpulan Synapse Spark
Jika Anda siap untuk mengotomatisasi dan membuat tugas perselisihan data, Anda dapat mengirimkan eksekusi eksperimen ke kumpulan Synapse Spark terlampir dengan objek ScriptRunConfig. Dengan cara yang sama, jika Anda memiliki alur Azure Pembelajaran Mesin, Anda dapat menggunakan SynapseSparkStep untuk menentukan kumpulan Synapse Spark Anda sebagai target komputasi untuk langkah persiapan data di alur Anda. Ketersediaan data Anda ke kumpulan Synapse Spark bergantung pada jenis himpunan data Anda.
- Untuk FileDataset, Anda dapat menggunakan metode
as_hdfs()
ini. Ketika eksekusi dikirimkan, himpunan data tersedia untuk kumpulan Synapse Spark sebagai sistem file terdistribusi Hadoop (HFDS) - Untuk TabularDataset, Anda dapat menggunakan
as_named_input()
metode
Sampel kode berikut
- Membuat variabel
input2
dari FileDatasettrain_ds
, yang dibuat sendiri dalam contoh kode sebelumnya - Membuat variabel
output
denganHDFSOutputDatasetConfiguration
kelas . Setelah eksekusi selesai, kelas ini memungkinkan kami menyimpan output eksekusi sebagai himpunan data,test
di datastoremydatastore
. Di ruang kerja Azure Pembelajaran Mesin, himpunantest
data terdaftar dengan namaregistered_dataset
- Mengonfigurasi pengaturan yang harus digunakan untuk melakukan pada kumpulan Synapse Spark
- Menentukan parameter ScriptRunConfig ke
dataprep.py
Menggunakan skrip untuk eksekusi- Tentukan data yang akan digunakan sebagai input, dan cara membuat data tersebut tersedia untuk kumpulan Synapse Spark
- Tentukan tempat menyimpan
output
data output
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig
from azureml.core import Experiment
input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")
run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name
run_config.spark.configuration["spark.driver.memory"] = "1g"
run_config.spark.configuration["spark.driver.cores"] = 2
run_config.spark.configuration["spark.executor.memory"] = "1g"
run_config.spark.configuration["spark.executor.cores"] = 1
run_config.spark.configuration["spark.executor.instances"] = 1
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")
run_config.environment.python.conda_dependencies = conda_dep
script_run_config = ScriptRunConfig(source_directory = './code',
script= 'dataprep.py',
arguments = ["--file_input", input2,
"--output_dir", output],
run_config = run_config)
Untuk informasi selengkapnya tentang run_config.spark.configuration
dan konfigurasi Spark umum, kunjungi SparkConfiguration Class dan dokumentasi konfigurasi Apache Spark.
Setelah menyiapkan ScriptRunConfig
objek, Anda dapat mengirimkan eksekusi.
from azureml.core import Experiment
exp = Experiment(workspace=ws, name="synapse-spark")
run = exp.submit(config=script_run_config)
run
Untuk informasi selengkapnya, termasuk informasi tentang skrip yang dataprep.py
digunakan dalam contoh ini, lihat contoh buku catatan.
Setelah menyiapkan data, Anda dapat menggunakannya sebagai input untuk pekerjaan pelatihan Anda. Dalam contoh kode di atas, Anda akan menentukan registered_dataset
sebagai data input Anda untuk pekerjaan pelatihan.
Contoh buku catatan
Tinjau contoh notebook ini untuk konsep dan demonstrasi lainnya dari kemampuan integrasi Azure Synapse Analytics dan Azure Pembelajaran Mesin:
- Jalankan sesi Spark interaktif dari buku catatan di ruang kerja Azure Machine Learning Anda.
- Kirim eksekusi eksperimen Azure Machine Learning dengan kumpulan Synapse Spark sebagai target komputasi Anda.