Uzdatnianie danych za pomocą pul platformy Apache Spark (przestarzałe)
DOTYCZY:Zestaw SDK języka Python w wersji 1
Ostrzeżenie
Integracja usługi Azure Synapse Analytics z usługą Azure Machine Learning dostępna w zestawie SDK języka Python w wersji 1 jest przestarzała. Użytkownicy mogą nadal korzystać z obszaru roboczego usługi Synapse zarejestrowanego w usłudze Azure Machine Learning jako połączonej usługi. Nowy obszar roboczy usługi Synapse nie może być już jednak zarejestrowany w usłudze Azure Machine Learning jako połączona usługa. Zalecamy używanie zarządzanych (automatycznych) zasobów obliczeniowych usługi Synapse i dołączonych pul platformy Spark synapse dostępnych w interfejsie wiersza polecenia w wersji 2 i zestawu Python SDK w wersji 2. Aby uzyskać więcej informacji, zobacz https://aka.ms/aml-spark .
Z tego artykułu dowiesz się, jak interaktywnie wykonywać zadania uzdatniania danych w ramach dedykowanej sesji usługi Synapse obsługiwanej przez usługę Azure Synapse Analytics w notesie Jupyter przy użyciu zestawu SDK języka Python usługi Azure Machine Learning.
Jeśli wolisz używać potoków usługi Azure Machine Learning, zobacz Jak używać platformy Apache Spark (obsługiwanej przez usługę Azure Synapse Analytics) w potoku uczenia maszynowego (wersja zapoznawcza).
Aby uzyskać wskazówki dotyczące korzystania z usługi Azure Synapse Analytics w obszarze roboczym usługi Synapse, zobacz serię wprowadzenie do usługi Azure Synapse Analytics.
Integracja usług Azure Machine Learning i Azure Synapse Analytics
Integracja usługi Azure Synapse Analytics z usługą Azure Machine Learning (wersja zapoznawcza) umożliwia dołączenie puli platformy Apache Spark wspieranej przez Azure Synapse na potrzeby interaktywnej eksploracji i przygotowywania danych. Dzięki tej integracji możesz mieć dedykowane zasoby obliczeniowe na potrzeby uzdatniania danych na dużą skalę, a wszystko to w tym samym notesie języka Python używanym do trenowania modeli uczenia maszynowego.
Wymagania wstępne
Zainstalowany zestaw SDK języka Python usługi Azure Machine Learning.
Utwórz obszar roboczy usługi Azure Synapse Analytics w Azure Portal.
Skonfiguruj środowisko projektowe , aby zainstalować zestaw SDK usługi Azure Machine Learning lub użyć wystąpienia obliczeniowego usługi Azure Machine Learning z już zainstalowanym zestawem SDK.
azureml-synapse
Zainstaluj pakiet (wersja zapoznawcza) przy użyciu następującego kodu:pip install azureml-synapse
Łączenie obszaru roboczego usługi Azure Machine Learning i obszaru roboczego usługi Azure Synapse Analytics przy użyciu zestawu SDK języka Python usługi Azure Machine Learning lub za pośrednictwem Azure Machine Learning studio
Dołącz pulę usługi Synapse Spark jako docelową wartość obliczeniową.
Uruchamianie puli usługi Synapse Spark na potrzeby zadań uzdatniania danych
Aby rozpocząć przygotowywanie danych za pomocą puli apache Spark, określ dołączoną nazwę obliczeniową spark Synapse. Tę nazwę można znaleźć za pośrednictwem Azure Machine Learning studio na karcie Dołączone obliczenia.
Ważne
Aby nadal korzystać z puli platformy Apache Spark, musisz wskazać, który zasób obliczeniowy ma być używany w ramach zadań uzdatniania danych dla %synapse
pojedynczych wierszy kodu i %%synapse
dla wielu wierszy. Dowiedz się więcej o poleceniu magic aplikacji %synapse.
%synapse start -c SynapseSparkPoolAlias
Po rozpoczęciu sesji można sprawdzić metadane sesji.
%synapse meta
Możesz określić środowisko usługi Azure Machine Learning do użycia podczas sesji platformy Apache Spark. Zostaną zastosowane tylko zależności Conda określone w środowisku. Obraz platformy Docker nie jest obsługiwany.
Ostrzeżenie
Zależności języka Python określone w zależnościach środowiska Conda nie są obsługiwane w pulach platformy Apache Spark. Obecnie obsługiwane są tylko stałe wersje języka Python.
Sprawdź wersję języka Python, dołączając sys.version_info
go do skryptu.
Poniższy kod tworzy środowisko , myenv
które instaluje azureml-core
wersję 1.20.0 i numpy
1.17.0 przed rozpoczęciem sesji. Następnie możesz uwzględnić to środowisko w instrukcji sesji platformy start
Apache Spark.
from azureml.core import Workspace, Environment
# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)
Aby rozpocząć przygotowywanie danych za pomocą puli Apache Spark i środowiska niestandardowego, określ nazwę puli platformy Apache Spark i środowisko, które ma być używane podczas sesji platformy Apache Spark. Ponadto możesz podać identyfikator subskrypcji, grupę zasobów obszaru roboczego uczenia maszynowego i nazwę obszaru roboczego uczenia maszynowego.
Ważne
Upewnij się, że pozycja Zezwalaj na pakiety na poziomie sesji jest włączona w połączonym obszarze roboczym usługi Synapse.
%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName
Ładowanie danych z magazynu
Po rozpoczęciu sesji platformy Apache Spark przeczytaj dane, które chcesz przygotować. Ładowanie danych jest obsługiwane w przypadku usługi Azure Blob Storage i Azure Data Lake Storage generacji 1 i 2.
Istnieją dwa sposoby ładowania danych z tych usług magazynu:
Bezpośrednie ładowanie danych z magazynu przy użyciu ścieżki rozproszonego systemu plików Hadoop (HDFS).
Odczytywanie danych z istniejącego zestawu danych usługi Azure Machine Learning.
Aby uzyskać dostęp do tych usług magazynu, musisz mieć uprawnienia Czytelnik danych obiektów blob usługi Storage . Jeśli planujesz zapisywanie danych z powrotem do tych usług magazynu, potrzebujesz uprawnień Współautor danych obiektu blob usługi Storage . Dowiedz się więcej o uprawnieniach i rolach magazynu.
Ładowanie danych za pomocą ścieżki rozproszonego systemu plików Hadoop (HDFS)
Aby załadować i odczytać dane z magazynu przy użyciu odpowiedniej ścieżki systemu plików HDFS, musisz mieć łatwo dostępne poświadczenia uwierzytelniania dostępu do danych. Te poświadczenia różnią się w zależności od typu magazynu.
Poniższy kod pokazuje, jak odczytywać dane z usługi Azure Blob Storage do ramki danych platformy Spark przy użyciu tokenu sygnatury dostępu współdzielonego (SAS) lub klucza dostępu.
%%synapse
# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")
# read from blob
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")
Poniższy kod pokazuje, jak odczytywać dane z Azure Data Lake Storage generacji 1 (ADLS Gen 1) przy użyciu poświadczeń jednostki usługi.
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")
Poniższy kod pokazuje, jak odczytywać dane z Azure Data Lake Storage generacji 2 (ADLS Gen 2) przy użyciu poświadczeń jednostki usługi.
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")
Odczytywanie danych z zarejestrowanych zestawów danych
Możesz również pobrać istniejący zarejestrowany zestaw danych w obszarze roboczym i wykonać na nim przygotowywanie danych, konwertując go na ramkę danych platformy Spark.
Poniższy przykład uwierzytelnia się w obszarze roboczym, pobiera zarejestrowany element TabularDataset, blob_dset
, który odwołuje się do plików w magazynie obiektów blob i konwertuje go na ramkę danych platformy Spark. Podczas konwertowania zestawów danych na ramkę danych platformy Spark możesz użyć pyspark
bibliotek eksploracji i przygotowywania danych.
%%synapse
from azureml.core import Workspace, Dataset
subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"
ws = Workspace(workspace_name = workspace_name,
subscription_id = subscription_id,
resource_group = resource_group)
dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()
Wykonywanie zadań uzdatniania danych
Po pobraniu i eksplorowaniu danych możesz wykonywać zadania uzdatniania danych.
Poniższy kod rozwija przykład systemu plików HDFS w poprzedniej sekcji i filtruje dane w ramce danych platformy Spark, df
na podstawie kolumny Survivor i grup, które są wyświetlane według wieku
%%synapse
from pyspark.sql.functions import col, desc
df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)
df.show()
Zapisywanie danych w magazynie i zatrzymywanie sesji platformy Spark
Po zakończeniu eksploracji i przygotowania danych zapisz przygotowane dane do późniejszego użycia na koncie magazynu na platformie Azure.
W poniższym przykładzie przygotowane dane są zapisywane z powrotem w usłudze Azure Blob Storage i zastępują oryginalny Titanic.csv
plik w training_data
katalogu. Aby zapisywać z powrotem do magazynu, musisz mieć uprawnienia współautora danych obiektu blob usługi Storage . Dowiedz się więcej o uprawnieniach i rolach magazynu.
%% synapse
df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")
Po zakończeniu przygotowywania danych i zapisaniu przygotowanych danych do magazynu zatrzymaj korzystanie z puli platformy Apache Spark za pomocą następującego polecenia.
%synapse stop
Tworzenie zestawu danych do reprezentowania przygotowanych danych
Gdy wszystko będzie gotowe do użycia przygotowanych danych do trenowania modelu, połącz się z magazynem danych usługi Azure Machine Learning i określ pliki, których chcesz używać z zestawem danych usługi Azure Machine Learning.
Poniższy przykładowy kod
- Założono, że utworzono już magazyn danych, który łączy się z usługą magazynu, w której zapisano przygotowane dane.
- Pobiera ten istniejący magazyn danych z
mydatastore
obszaru roboczegows
za pomocą metody get(). - Tworzy zestaw plików FileDataset,
train_ds
który odwołuje się do przygotowanych plików danych znajdujących się w katalogu wmydatastore
katalogutraining_data
. - Tworzy zmienną
input1
, która może być używana później, aby plikitrain_ds
danych zestawu danych były dostępne dla docelowego obiektu obliczeniowego dla zadań szkoleniowych.
from azureml.core import Datastore, Dataset
datastore = Datastore.get(ws, datastore_name='mydatastore')
datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()
Używanie elementu do ScriptRunConfig
przesyłania przebiegu eksperymentu do puli usługi Synapse Spark
Jeśli wszystko jest gotowe do automatyzacji i produkcji zadań uzdatniania danych, możesz przesłać przebieg eksperymentu do dołączonej puli usługi Synapse Spark za pomocą obiektu ScriptRunConfig .
Podobnie, jeśli masz potok usługi Azure Machine Learning, możesz użyć elementu SynapseSparkStep, aby określić pulę usługi Synapse Spark jako docelowy obiekt obliczeniowy dla kroku przygotowywania danych w potoku.
Udostępnianie danych w puli Usługi Synapse Spark zależy od typu zestawu danych.
- W przypadku zestawu FileDataset można użyć
as_hdfs()
metody . Po przesłaniu przebiegu zestaw danych zostanie udostępniony puli Synapse Spark jako rozproszony system plików Hadoop (HFDS). - W przypadku elementu TabularDataset można użyć
as_named_input()
metody .
Poniższy kod,
- Tworzy zmienną
input2
na podstawie zestawu FileDatasettrain_ds
, który został utworzony w poprzednim przykładzie kodu. - Tworzy zmienną
output
za pomocą klasy HDFSOutputDatasetConfiguration. Po zakończeniu przebiegu ta klasa umożliwia zapisanie danych wyjściowych przebiegu jako zestawu danychtest
w magazyniemydatastore
danych . W obszarze roboczymtest
usługi Azure Machine Learning zestaw danych jest zarejestrowany pod nazwąregistered_dataset
. - Konfiguruje ustawienia, których uruchomienie powinno być używane do wykonywania w puli usługi Synapse Spark.
- Definiuje parametry ScriptRunConfig do,
- Użyj elementu
dataprep.py
, dla przebiegu. - Określ dane, które mają być używane jako dane wejściowe i jak udostępnić je w puli usługi Synapse Spark.
- Określ miejsce przechowywania danych wyjściowych,
output
.
- Użyj elementu
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig
from azureml.core import Experiment
input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")
run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name
run_config.spark.configuration["spark.driver.memory"] = "1g"
run_config.spark.configuration["spark.driver.cores"] = 2
run_config.spark.configuration["spark.executor.memory"] = "1g"
run_config.spark.configuration["spark.executor.cores"] = 1
run_config.spark.configuration["spark.executor.instances"] = 1
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")
run_config.environment.python.conda_dependencies = conda_dep
script_run_config = ScriptRunConfig(source_directory = './code',
script= 'dataprep.py',
arguments = ["--file_input", input2,
"--output_dir", output],
run_config = run_config)
Aby uzyskać więcej informacji na temat run_config.spark.configuration
i ogólnej konfiguracji platformy Spark, zobacz dokumentację dotyczącą klasy SparkConfiguration i konfiguracji platformy Apache Spark.
Po skonfigurowaniu ScriptRunConfig
obiektu można przesłać przebieg.
from azureml.core import Experiment
exp = Experiment(workspace=ws, name="synapse-spark")
run = exp.submit(config=script_run_config)
run
Aby uzyskać więcej informacji, takich jak dataprep.py
skrypt używany w tym przykładzie, zobacz przykładowy notes.
Po przygotowaniu danych możesz użyć ich jako danych wejściowych dla zadań szkoleniowych. W powyższym przykładzie kodu jest to, registered_dataset
co należy określić jako dane wejściowe dla zadań szkoleniowych.
Przykładowe notesy
Zobacz przykładowe notesy, aby uzyskać więcej pojęć i pokazów możliwości integracji Azure Synapse Analytics i Azure Machine Learning.
- Uruchom interaktywną sesję platformy Spark z notesu w obszarze roboczym usługi Azure Machine Learning.
- Prześlij przebieg eksperymentu usługi Azure Machine Learning z pulą platformy Synapse Spark jako obiektem docelowym obliczeń.