Bagikan melalui


Manipulasi data dengan kumpulan Apache Spark (tidak digunakan lagi)

BERLAKU UNTUK:SDK Python azureml v1

Peringatan

Integrasi Azure Synapse Analytics dengan Azure Pembelajaran Mesin, tersedia di Python SDK v1, tidak digunakan lagi. Pengguna masih dapat menggunakan ruang kerja Synapse, yang terdaftar di Azure Pembelajaran Mesin, sebagai layanan tertaut. Namun, ruang kerja Synapse baru tidak dapat lagi terdaftar di Azure Machine Learning sebagai layanan tertaut. Sebaiknya gunakan komputasi Spark tanpa server dan kumpulan Synapse Spark yang terpasang, tersedia di CLI v2 dan Python SDK v2. Untuk informasi selengkapnya, kunjungi https://aka.ms/aml-spark.

Dalam artikel ini, Anda mempelajari cara melakukan tugas perselisihan data secara interaktif dalam sesi Synapse khusus, yang didukung oleh Azure Synapse Analytics, dalam notebook Jupyter. Tugas-tugas ini mengandalkan Azure Pembelajaran Mesin Python SDK. Untuk informasi selengkapnya tentang alur Azure Pembelajaran Mesin, kunjungi Cara menggunakan Apache Spark (didukung oleh Azure Synapse Analytics) di alur pembelajaran mesin Anda (pratinjau). Untuk informasi selengkapnya tentang cara menggunakan Azure Synapse Analytics dengan ruang kerja Synapse, kunjungi seri memulai Azure Synapse Analytics.

Azure Machine Learning dan integrasi Azure Synapse Analytics

Dengan integrasi Azure Synapse Analytics dengan Azure Pembelajaran Mesin (pratinjau), Anda dapat melampirkan kumpulan Apache Spark, yang didukung oleh Azure Synapse, untuk eksplorasi dan persiapan data interaktif. Dengan integrasi ini, Anda dapat memiliki sumber daya komputasi khusus untuk perselisihan data dalam skala besar, semuanya dalam notebook Python yang sama dengan yang Anda gunakan untuk melatih model pembelajaran mesin Anda.

Prasyarat

Meluncurkan kumpulan Synapse Spark untuk tugas perselisihan data

Untuk memulai persiapan data dengan kumpulan Apache Spark, tentukan nama komputasi Spark Synapse yang terlampir. Anda dapat menemukan nama ini dengan studio Azure Pembelajaran Mesin di bawah tab Komputasi terlampir.

dapatkan nama komputasi terlampir

Penting

Untuk terus menggunakan kumpulan Apache Spark, Anda harus menunjukkan sumber daya komputasi mana yang akan digunakan di seluruh tugas perselisihan data Anda. Gunakan %synapse untuk satu baris kode, dan %%synapse untuk beberapa baris:

%synapse start -c SynapseSparkPoolAlias

Setelah sesi dimulai, Anda dapat memeriksa metadata sesi:

%synapse meta

Anda dapat menentukan lingkungan Azure Machine Learning yang akan digunakan selama sesi Apache Spark Anda. Hanya dependensi Conda yang ditentukan dalam lingkungan yang akan berlaku. Gambar Docker tidak didukung.

Peringatan

Dependensi Python yang ditentukan dalam dependensi Conda lingkungan tidak didukung di kumpulan Apache Spark. Saat ini, hanya versi Python tetap yang didukung Sertakan sys.version_info dalam skrip Anda untuk memeriksa versi Python Anda

Kode ini membuatmyenv variabel lingkungan, untuk menginstal azureml-core versi 1.20.0 dan numpy versi 1.17.0 sebelum sesi dimulai. Anda kemudian dapat menyertakan lingkungan ini dalam pernyataan start sesi Apache Spark Anda.


from azureml.core import Workspace, Environment

# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)

Untuk memulai persiapan data dengan kumpulan Apache Spark di lingkungan kustom Anda, tentukan nama kumpulan Apache Spark dan lingkungan yang akan digunakan selama sesi Apache Spark. Anda dapat memberikan ID langganan, grup sumber daya ruang kerja pembelajaran mesin, dan nama ruang kerja pembelajaran mesin.

Penting

Pastikan untuk mengaktifkan Izinkan paket tingkat sesi di ruang kerja Synapse yang ditautkan.

aktifkan paket tingkat sesi

%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName

Memuat data dari penyimpanan

Setelah sesi Apache Spark dimulai, baca dalam data yang ingin Anda siapkan. Pemuatan data didukung untuk penyimpanan Azure Blob dan Azure Data Lake Storage Generasi 1 dan 2.

Anda memiliki dua opsi untuk memuat data dari layanan penyimpanan ini:

Untuk mengakses layanan penyimpanan ini, Anda memerlukan izin Storage Blob Data Reader. Untuk menulis data kembali ke layanan penyimpanan ini, Anda memerlukan izin Kontributor Data Blob Penyimpanan. Pelajari lebih lanjut tentang izin dan peran penyimpanan.

Memuat data dengan jalur Hadoop Distributed Files System (HDFS)

Untuk memuat dan membaca data dari penyimpanan dengan jalur HDFS yang sesuai, Anda memerlukan kredensial autentikasi akses data yang tersedia. Info masuk ini berbeda tergantung pada jenis penyimpanan Anda. Sampel kode ini menunjukkan cara membaca data dari penyimpanan Azure Blob ke dalam kerangka data Spark dengan token tanda tangan akses bersama (SAS) atau kunci akses Anda:

%%synapse

# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")

# read from blob 
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")

Sampel kode ini menunjukkan cara membaca data dari Azure Data Lake Storage Generation 1 (ADLS Gen 1) dengan kredensial perwakilan layanan Anda:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")

Sampel kode ini menunjukkan cara membaca data dari Azure Data Lake Storage Generation 2 (ADLS Gen 2) dengan kredensial perwakilan layanan Anda:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")

Membaca data dari himpunan data yang terdaftar

Anda juga dapat menempatkan himpunan data terdaftar yang ada di ruang kerja Anda, dan melakukan persiapan data di dalamnya, jika Anda mengonversinya menjadi kerangka data spark. Contoh ini mengautentikasi ke ruang kerja, mendapatkan TabularDataset terdaftar - -blob_dset yang mereferensikan file dalam penyimpanan blob, dan mengonversi TabularDataset tersebut ke dataframe Spark. Saat mengonversi himpunan data ke dataframeS Spark, Anda dapat menggunakan pyspark pustaka eksplorasi dan persiapan data.

%%synapse

from azureml.core import Workspace, Dataset

subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()

Melakukan tugas perselisihan data

Setelah mengambil dan menjelajahi data, Anda dapat melakukan tugas manipulasi data. Sampel kode ini diperluas pada contoh HDFS di bagian sebelumnya. Berdasarkan kolom Survivor , kolom memfilter data dalam kerangka data df spark dan grup yang mencantumkan menurut Usia:

%%synapse

from pyspark.sql.functions import col, desc

df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

df.show()

Menyimpan data ke penyimpanan dan menghentikan sesi spark

Setelah eksplorasi dan penyiapan data selesai, simpan data yang disiapkan untuk digunakan nanti di akun penyimpanan Anda di Azure. Dalam sampel kode ini, data yang disiapkan ditulis kembali ke penyimpanan Azure Blob, menimpa file asli Titanic.csv di training_data direktori. Untuk menulis kembali ke penyimpanan, Anda memerlukan izin Storage Blob Data Contributor. Untuk informasi selengkapnya, kunjungi Menetapkan peran Azure untuk akses ke data blob.

%% synapse

df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")

Setelah Anda menyelesaikan persiapan data, dan Anda menyimpan data yang disiapkan ke penyimpanan, akhiri penggunaan kumpulan Apache Spark Anda dengan perintah ini:

%synapse stop

Membuat himpunan data, untuk mewakili data yang disiapkan

Saat Anda siap menggunakan data yang disiapkan untuk pelatihan model, sambungkan ke penyimpanan Anda dengan datastore Azure Pembelajaran Mesin, dan tentukan file atau file yang ingin Anda gunakan dengan himpunan data Azure Pembelajaran Mesin.

Contoh kode ini

  • Mengasumsikan Anda sudah membuat datastore yang tersambung ke layanan penyimpanan tempat Anda menyimpan data yang disiapkan
  • Mengambil datastore yang ada - - mydatastore dari ruang ws kerja dengan metode get().
  • Membuat FileDataset, train_ds, untuk mereferensikan file data yang disiapkan yang terletak di mydatastoretraining_data direktori
  • Membuat variabel input1. Di lain waktu, variabel ini dapat membuat file data himpunan train_ds data tersedia untuk target komputasi untuk tugas pelatihan Anda.
from azureml.core import Datastore, Dataset

datastore = Datastore.get(ws, datastore_name='mydatastore')

datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()

Menggunakan ScriptRunConfig untuk mengirimkan eksekusi eksperimen ke kumpulan Synapse Spark

Jika Anda siap untuk mengotomatisasi dan membuat tugas perselisihan data, Anda dapat mengirimkan eksekusi eksperimen ke kumpulan Synapse Spark terlampir dengan objek ScriptRunConfig. Dengan cara yang sama, jika Anda memiliki alur Azure Pembelajaran Mesin, Anda dapat menggunakan SynapseSparkStep untuk menentukan kumpulan Synapse Spark Anda sebagai target komputasi untuk langkah persiapan data di alur Anda. Ketersediaan data Anda ke kumpulan Synapse Spark bergantung pada jenis himpunan data Anda.

  • Untuk FileDataset, Anda dapat menggunakan metode as_hdfs() ini. Ketika eksekusi dikirimkan, himpunan data tersedia untuk kumpulan Synapse Spark sebagai sistem file terdistribusi Hadoop (HFDS)
  • Untuk TabularDataset, Anda dapat menggunakan as_named_input() metode

Sampel kode berikut

  • Membuat variabel input2 dari FileDataset train_ds, yang dibuat sendiri dalam contoh kode sebelumnya
  • Membuat variabel output dengan HDFSOutputDatasetConfiguration kelas . Setelah eksekusi selesai, kelas ini memungkinkan kami menyimpan output eksekusi sebagai himpunan data, test di datastore mydatastore . Di ruang kerja Azure Pembelajaran Mesin, himpunan test data terdaftar dengan namaregistered_dataset
  • Mengonfigurasi pengaturan yang harus digunakan untuk melakukan pada kumpulan Synapse Spark
  • Menentukan parameter ScriptRunConfig ke
    • dataprep.py Menggunakan skrip untuk eksekusi
    • Tentukan data yang akan digunakan sebagai input, dan cara membuat data tersebut tersedia untuk kumpulan Synapse Spark
    • Tentukan tempat menyimpan output data output
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig 
from azureml.core import Experiment

input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")

run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

run_config.spark.configuration["spark.driver.memory"] = "1g" 
run_config.spark.configuration["spark.driver.cores"] = 2 
run_config.spark.configuration["spark.executor.memory"] = "1g" 
run_config.spark.configuration["spark.executor.cores"] = 1 
run_config.spark.configuration["spark.executor.instances"] = 1 

conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

run_config.environment.python.conda_dependencies = conda_dep

script_run_config = ScriptRunConfig(source_directory = './code',
                                    script= 'dataprep.py',
                                    arguments = ["--file_input", input2,
                                                 "--output_dir", output],
                                    run_config = run_config)

Untuk informasi selengkapnya tentang run_config.spark.configuration dan konfigurasi Spark umum, kunjungi SparkConfiguration Class dan dokumentasi konfigurasi Apache Spark.

Setelah menyiapkan ScriptRunConfig objek, Anda dapat mengirimkan eksekusi.

from azureml.core import Experiment 

exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
run

Untuk informasi selengkapnya, termasuk informasi tentang skrip yang dataprep.py digunakan dalam contoh ini, lihat contoh buku catatan.

Setelah menyiapkan data, Anda dapat menggunakannya sebagai input untuk pekerjaan pelatihan Anda. Dalam contoh kode di atas, Anda akan menentukan registered_dataset sebagai data input Anda untuk pekerjaan pelatihan.

Contoh buku catatan

Tinjau contoh notebook ini untuk konsep dan demonstrasi lainnya dari kemampuan integrasi Azure Synapse Analytics dan Azure Pembelajaran Mesin:

Langkah berikutnya