Uzdatnianie danych za pomocą pul platformy Apache Spark (przestarzałe)

DOTYCZY:Zestaw SDK języka Python w wersji 1

Ostrzeżenie

Integracja usługi Azure Synapse Analytics z usługą Azure Machine Learning dostępna w zestawie SDK języka Python w wersji 1 jest przestarzała. Użytkownicy mogą nadal korzystać z obszaru roboczego usługi Synapse zarejestrowanego w usłudze Azure Machine Learning jako połączonej usługi. Nowy obszar roboczy usługi Synapse nie może być już jednak zarejestrowany w usłudze Azure Machine Learning jako połączona usługa. Zalecamy używanie zarządzanych (automatycznych) zasobów obliczeniowych usługi Synapse i dołączonych pul platformy Spark synapse dostępnych w interfejsie wiersza polecenia w wersji 2 i zestawu Python SDK w wersji 2. Aby uzyskać więcej informacji, zobacz https://aka.ms/aml-spark .

Z tego artykułu dowiesz się, jak interaktywnie wykonywać zadania uzdatniania danych w ramach dedykowanej sesji usługi Synapse obsługiwanej przez usługę Azure Synapse Analytics w notesie Jupyter przy użyciu zestawu SDK języka Python usługi Azure Machine Learning.

Jeśli wolisz używać potoków usługi Azure Machine Learning, zobacz Jak używać platformy Apache Spark (obsługiwanej przez usługę Azure Synapse Analytics) w potoku uczenia maszynowego (wersja zapoznawcza).

Aby uzyskać wskazówki dotyczące korzystania z usługi Azure Synapse Analytics w obszarze roboczym usługi Synapse, zobacz serię wprowadzenie do usługi Azure Synapse Analytics.

Integracja usług Azure Machine Learning i Azure Synapse Analytics

Integracja usługi Azure Synapse Analytics z usługą Azure Machine Learning (wersja zapoznawcza) umożliwia dołączenie puli platformy Apache Spark wspieranej przez Azure Synapse na potrzeby interaktywnej eksploracji i przygotowywania danych. Dzięki tej integracji możesz mieć dedykowane zasoby obliczeniowe na potrzeby uzdatniania danych na dużą skalę, a wszystko to w tym samym notesie języka Python używanym do trenowania modeli uczenia maszynowego.

Wymagania wstępne

Uruchamianie puli usługi Synapse Spark na potrzeby zadań uzdatniania danych

Aby rozpocząć przygotowywanie danych za pomocą puli apache Spark, określ dołączoną nazwę obliczeniową spark Synapse. Tę nazwę można znaleźć za pośrednictwem Azure Machine Learning studio na karcie Dołączone obliczenia.

pobierz dołączoną nazwę obliczeniową

Ważne

Aby nadal korzystać z puli platformy Apache Spark, musisz wskazać, który zasób obliczeniowy ma być używany w ramach zadań uzdatniania danych dla %synapse pojedynczych wierszy kodu i %%synapse dla wielu wierszy. Dowiedz się więcej o poleceniu magic aplikacji %synapse.

%synapse start -c SynapseSparkPoolAlias

Po rozpoczęciu sesji można sprawdzić metadane sesji.

%synapse meta

Możesz określić środowisko usługi Azure Machine Learning do użycia podczas sesji platformy Apache Spark. Zostaną zastosowane tylko zależności Conda określone w środowisku. Obraz platformy Docker nie jest obsługiwany.

Ostrzeżenie

Zależności języka Python określone w zależnościach środowiska Conda nie są obsługiwane w pulach platformy Apache Spark. Obecnie obsługiwane są tylko stałe wersje języka Python. Sprawdź wersję języka Python, dołączając sys.version_info go do skryptu.

Poniższy kod tworzy środowisko , myenvktóre instaluje azureml-core wersję 1.20.0 i numpy 1.17.0 przed rozpoczęciem sesji. Następnie możesz uwzględnić to środowisko w instrukcji sesji platformy start Apache Spark.


from azureml.core import Workspace, Environment

# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)

Aby rozpocząć przygotowywanie danych za pomocą puli Apache Spark i środowiska niestandardowego, określ nazwę puli platformy Apache Spark i środowisko, które ma być używane podczas sesji platformy Apache Spark. Ponadto możesz podać identyfikator subskrypcji, grupę zasobów obszaru roboczego uczenia maszynowego i nazwę obszaru roboczego uczenia maszynowego.

Ważne

Upewnij się, że pozycja Zezwalaj na pakiety na poziomie sesji jest włączona w połączonym obszarze roboczym usługi Synapse.

włączanie pakietów na poziomie sesji

%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName

Ładowanie danych z magazynu

Po rozpoczęciu sesji platformy Apache Spark przeczytaj dane, które chcesz przygotować. Ładowanie danych jest obsługiwane w przypadku usługi Azure Blob Storage i Azure Data Lake Storage generacji 1 i 2.

Istnieją dwa sposoby ładowania danych z tych usług magazynu:

Aby uzyskać dostęp do tych usług magazynu, musisz mieć uprawnienia Czytelnik danych obiektów blob usługi Storage . Jeśli planujesz zapisywanie danych z powrotem do tych usług magazynu, potrzebujesz uprawnień Współautor danych obiektu blob usługi Storage . Dowiedz się więcej o uprawnieniach i rolach magazynu.

Ładowanie danych za pomocą ścieżki rozproszonego systemu plików Hadoop (HDFS)

Aby załadować i odczytać dane z magazynu przy użyciu odpowiedniej ścieżki systemu plików HDFS, musisz mieć łatwo dostępne poświadczenia uwierzytelniania dostępu do danych. Te poświadczenia różnią się w zależności od typu magazynu.

Poniższy kod pokazuje, jak odczytywać dane z usługi Azure Blob Storage do ramki danych platformy Spark przy użyciu tokenu sygnatury dostępu współdzielonego (SAS) lub klucza dostępu.

%%synapse

# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")

# read from blob 
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")

Poniższy kod pokazuje, jak odczytywać dane z Azure Data Lake Storage generacji 1 (ADLS Gen 1) przy użyciu poświadczeń jednostki usługi.

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")

Poniższy kod pokazuje, jak odczytywać dane z Azure Data Lake Storage generacji 2 (ADLS Gen 2) przy użyciu poświadczeń jednostki usługi.

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")

Odczytywanie danych z zarejestrowanych zestawów danych

Możesz również pobrać istniejący zarejestrowany zestaw danych w obszarze roboczym i wykonać na nim przygotowywanie danych, konwertując go na ramkę danych platformy Spark.

Poniższy przykład uwierzytelnia się w obszarze roboczym, pobiera zarejestrowany element TabularDataset, blob_dset, który odwołuje się do plików w magazynie obiektów blob i konwertuje go na ramkę danych platformy Spark. Podczas konwertowania zestawów danych na ramkę danych platformy Spark możesz użyć pyspark bibliotek eksploracji i przygotowywania danych.

%%synapse

from azureml.core import Workspace, Dataset

subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()

Wykonywanie zadań uzdatniania danych

Po pobraniu i eksplorowaniu danych możesz wykonywać zadania uzdatniania danych.

Poniższy kod rozwija przykład systemu plików HDFS w poprzedniej sekcji i filtruje dane w ramce danych platformy Spark, dfna podstawie kolumny Survivor i grup, które są wyświetlane według wieku

%%synapse

from pyspark.sql.functions import col, desc

df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

df.show()

Zapisywanie danych w magazynie i zatrzymywanie sesji platformy Spark

Po zakończeniu eksploracji i przygotowania danych zapisz przygotowane dane do późniejszego użycia na koncie magazynu na platformie Azure.

W poniższym przykładzie przygotowane dane są zapisywane z powrotem w usłudze Azure Blob Storage i zastępują oryginalny Titanic.csv plik w training_data katalogu. Aby zapisywać z powrotem do magazynu, musisz mieć uprawnienia współautora danych obiektu blob usługi Storage . Dowiedz się więcej o uprawnieniach i rolach magazynu.

%% synapse

df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")

Po zakończeniu przygotowywania danych i zapisaniu przygotowanych danych do magazynu zatrzymaj korzystanie z puli platformy Apache Spark za pomocą następującego polecenia.

%synapse stop

Tworzenie zestawu danych do reprezentowania przygotowanych danych

Gdy wszystko będzie gotowe do użycia przygotowanych danych do trenowania modelu, połącz się z magazynem danych usługi Azure Machine Learning i określ pliki, których chcesz używać z zestawem danych usługi Azure Machine Learning.

Poniższy przykładowy kod

  • Założono, że utworzono już magazyn danych, który łączy się z usługą magazynu, w której zapisano przygotowane dane.
  • Pobiera ten istniejący magazyn danych z mydatastoreobszaru roboczego ws za pomocą metody get().
  • Tworzy zestaw plików FileDataset, train_dsktóry odwołuje się do przygotowanych plików danych znajdujących się w katalogu w mydatastorekatalogu training_data .
  • Tworzy zmienną input1, która może być używana później, aby pliki train_ds danych zestawu danych były dostępne dla docelowego obiektu obliczeniowego dla zadań szkoleniowych.
from azureml.core import Datastore, Dataset

datastore = Datastore.get(ws, datastore_name='mydatastore')

datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()

Używanie elementu do ScriptRunConfig przesyłania przebiegu eksperymentu do puli usługi Synapse Spark

Jeśli wszystko jest gotowe do automatyzacji i produkcji zadań uzdatniania danych, możesz przesłać przebieg eksperymentu do dołączonej puli usługi Synapse Spark za pomocą obiektu ScriptRunConfig .

Podobnie, jeśli masz potok usługi Azure Machine Learning, możesz użyć elementu SynapseSparkStep, aby określić pulę usługi Synapse Spark jako docelowy obiekt obliczeniowy dla kroku przygotowywania danych w potoku.

Udostępnianie danych w puli Usługi Synapse Spark zależy od typu zestawu danych.

  • W przypadku zestawu FileDataset można użyć as_hdfs() metody . Po przesłaniu przebiegu zestaw danych zostanie udostępniony puli Synapse Spark jako rozproszony system plików Hadoop (HFDS).
  • W przypadku elementu TabularDataset można użyć as_named_input() metody .

Poniższy kod,

  • Tworzy zmienną input2 na podstawie zestawu FileDataset train_ds , który został utworzony w poprzednim przykładzie kodu.
  • Tworzy zmienną output za pomocą klasy HDFSOutputDatasetConfiguration. Po zakończeniu przebiegu ta klasa umożliwia zapisanie danych wyjściowych przebiegu jako zestawu danych test w magazynie mydatastoredanych . W obszarze roboczym test usługi Azure Machine Learning zestaw danych jest zarejestrowany pod nazwą registered_dataset.
  • Konfiguruje ustawienia, których uruchomienie powinno być używane do wykonywania w puli usługi Synapse Spark.
  • Definiuje parametry ScriptRunConfig do,
    • Użyj elementu dataprep.py, dla przebiegu.
    • Określ dane, które mają być używane jako dane wejściowe i jak udostępnić je w puli usługi Synapse Spark.
    • Określ miejsce przechowywania danych wyjściowych, output.
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig 
from azureml.core import Experiment

input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")

run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

run_config.spark.configuration["spark.driver.memory"] = "1g" 
run_config.spark.configuration["spark.driver.cores"] = 2 
run_config.spark.configuration["spark.executor.memory"] = "1g" 
run_config.spark.configuration["spark.executor.cores"] = 1 
run_config.spark.configuration["spark.executor.instances"] = 1 

conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

run_config.environment.python.conda_dependencies = conda_dep

script_run_config = ScriptRunConfig(source_directory = './code',
                                    script= 'dataprep.py',
                                    arguments = ["--file_input", input2,
                                                 "--output_dir", output],
                                    run_config = run_config)

Aby uzyskać więcej informacji na temat run_config.spark.configuration i ogólnej konfiguracji platformy Spark, zobacz dokumentację dotyczącą klasy SparkConfiguration i konfiguracji platformy Apache Spark.

Po skonfigurowaniu ScriptRunConfig obiektu można przesłać przebieg.

from azureml.core import Experiment 

exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
run

Aby uzyskać więcej informacji, takich jak dataprep.py skrypt używany w tym przykładzie, zobacz przykładowy notes.

Po przygotowaniu danych możesz użyć ich jako danych wejściowych dla zadań szkoleniowych. W powyższym przykładzie kodu jest to, registered_dataset co należy określić jako dane wejściowe dla zadań szkoleniowych.

Przykładowe notesy

Zobacz przykładowe notesy, aby uzyskać więcej pojęć i pokazów możliwości integracji Azure Synapse Analytics i Azure Machine Learning.

Następne kroki