Como utilizar o Apache Spark (com tecnologia Azure Synapse Analytics) no seu pipeline de machine learning (preterido)

APLICA-SE A:Python SDK azureml v1

Aviso

A integração do Azure Synapse Analytics com o Azure Machine Learning disponível no SDK Python v1 foi preterida. Os utilizadores podem continuar a utilizar a área de trabalho do Synapse registada no Azure Machine Learning como um serviço associado. No entanto, já não pode ser registada uma nova área de trabalho do Synapse no Azure Machine Learning como um serviço associado. Recomendamos que utilize a computação Synapse Gerida (Automática) e os conjuntos do Synapse Spark anexados disponíveis na CLI v2 e no SDK Python v2. https://aka.ms/aml-spark Consulte para obter mais detalhes.

Neste artigo, irá aprender a utilizar conjuntos do Apache Spark com tecnologia Azure Synapse Analytics como destino de computação para um passo de preparação de dados num pipeline do Azure Machine Learning. Irá aprender como um único pipeline pode utilizar recursos de computação adequados para o passo específico, como preparação ou preparação de dados. Verá como os dados são preparados para o passo do Spark e como são transmitidos para o passo seguinte.

Pré-requisitos

Pode criar e administrar os conjuntos do Apache Spark numa área de trabalho do Azure Synapse Analytics. Para integrar um conjunto do Apache Spark numa área de trabalho do Azure Machine Learning, tem de ligar à área de trabalho do Azure Synapse Analytics.

Assim que a área de trabalho do Azure Machine Learning e as áreas de trabalho do Azure Synapse Analytics estiverem ligadas, pode anexar um conjunto do Apache Spark através de

  • estúdio do Azure Machine Learning

  • SDK python (conforme elaborado abaixo)

  • Modelo de Resource Manager do Azure (ARM) (veja este modelo arm de exemplo).

    • Pode utilizar a linha de comandos para seguir o modelo do ARM, adicionar o serviço ligado e anexar o conjunto do Apache Spark com o seguinte código:
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

Importante

Para ligar à área de trabalho do Azure Synapse Analytics com êxito, tem de ter a função Proprietário no recurso da área de trabalho do Azure Synapse Analytics. Verifique o acesso no portal do Azure.

O serviço ligado obterá uma identidade gerida atribuída pelo sistema (SAI) quando o criar. Tem de atribuir a este serviço de ligação a função "Administrador do Apache Spark do Synapse" a partir de Synapse Studio para que possa submeter a tarefa do Spark (veja Como gerir atribuições de funções RBAC do Synapse no Synapse Studio).

Também tem de atribuir ao utilizador da área de trabalho do Azure Machine Learning a função "Contribuidor" de portal do Azure de gestão de recursos.

Pode obter serviços ligados na área de trabalho com código como:

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

Primeiro, Workspace.from_config() acede à área de trabalho do Azure Machine Learning com a configuração no config.json (veja Criar um ficheiro de configuração da área de trabalho). Em seguida, o código imprime todos os serviços ligados disponíveis na Área de Trabalho. Por fim, LinkedService.get() obtém um serviço ligado com o nome 'synapselink1'.

Anexar o conjunto do Apache Spark como destino de computação para o Azure Machine Learning

Para utilizar o conjunto do Apache Spark para alimentar um passo no seu pipeline de machine learning, tem de anexá-lo como um ComputeTarget para o passo do pipeline, conforme mostrado no código seguinte.

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

O primeiro passo é configurar o SynapseCompute. O linked_service argumento é o LinkedService objeto que criou ou obteve no passo anterior. O type argumento tem de ser SynapseSpark. O pool_name argumento em SynapseCompute.attach_configuration() tem de corresponder ao de um conjunto existente na área de trabalho do Azure Synapse Analytics. Para obter mais informações sobre como criar um conjunto do Apache Spark na área de trabalho do Azure Synapse Analytics, veja Início Rápido: Criar um conjunto do Apache Spark sem servidor com Synapse Studio. O tipo de attach_config é ComputeTargetAttachConfiguration.

Assim que a configuração for criada, irá criar um machine learning ComputeTarget ao transmitir o Workspace, ComputeTargetAttachConfiguratione o nome pelo qual gostaria de fazer referência à computação na área de trabalho de machine learning. A chamada para ComputeTarget.attach() é assíncrona, pelo que o exemplo bloqueia até a chamada ser concluída.

Criar um SynapseSparkStep que utiliza o conjunto do Apache Spark ligado

O trabalho de exemplo do Apache Spark no conjunto do Apache Spark define um pipeline de machine learning simples. Primeiro, o bloco de notas define um passo de preparação de dados com tecnologia do synapse_compute definido no passo anterior. Em seguida, o bloco de notas define um passo de preparação com tecnologia de um destino de computação mais adequado para a preparação. O bloco de notas de exemplo utiliza a base de dados de sobrevivência do Titanic para demonstrar a entrada e saída de dados; Na verdade, não limpa os dados nem cria um modelo preditivo. Uma vez que não existe formação real neste exemplo, o passo de preparação utiliza um recurso de computação barato baseado na CPU.

Os dados fluem para um pipeline de machine learning através de DatasetConsumptionConfig objetos, que podem conter dados tabulares ou conjuntos de ficheiros. Os dados provêm frequentemente de ficheiros no armazenamento de blobs no arquivo de dados de uma área de trabalho. O código seguinte mostra alguns códigos típicos para criar entradas para um pipeline de machine learning:

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

O código acima pressupõe que o ficheiro Titanic.csv está no armazenamento de blobs. O código mostra como ler o ficheiro como e TabularDataset como um FileDataset. Este código destina-se apenas a fins de demonstração, uma vez que seria confuso duplicar entradas ou interpretar uma única origem de dados como um recurso que contém tabelas e como um ficheiro.

Importante

Para utilizar uma FileDataset como entrada, a sua azureml-core versão tem de ser, pelo menos 1.20.0, . Como especificar esta opção com a Environment classe é abordada abaixo.

Quando um passo for concluído, pode optar por armazenar dados de saída com código semelhante a:

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

Neste caso, os dados seriam armazenados num datastore ficheiro chamado test e estariam disponíveis na área de trabalho de machine learning como um Dataset com o nome registered_dataset.

Além dos dados, um passo de pipeline pode ter dependências python por passo. Os objetos individuais SynapseSparkStep também podem especificar a respetiva Azure Synapse configuração do Apache Spark. Isto é apresentado no seguinte código, que especifica que a versão do azureml-core pacote tem de ser, pelo menos 1.20.0, . (Conforme mencionado anteriormente, este requisito para azureml-core é necessário para utilizar um FileDataset como entrada.)

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

O código acima especifica um único passo no pipeline de machine learning do Azure. Este passo environment especifica uma versão específica azureml-core e pode adicionar outras dependências conda ou pip conforme necessário.

O SynapseSparkStep irá zipar e carregar a partir do computador local o subdiretório ./code. Esse diretório será recriado no servidor de computação e o passo irá executar o ficheiro dataprep.py a partir desse diretório. Os inputs e outputs desse passo são os step1_input1objetos , step1_input2e step1_output anteriormente discutidos. A forma mais fácil de aceder a esses valores no dataprep.py script é associá-los ao nome arguments.

O próximo conjunto de argumentos para o controlo construtor do SynapseSparkStep Apache Spark. É compute_target o 'link1-spark01' que anexamos anteriormente como um destino de computação. Os outros parâmetros especificam a memória e os núcleos que gostaríamos de utilizar.

O bloco de notas de exemplo utiliza o seguinte código para dataprep.py:

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

Este script de "preparação de dados" não faz nenhuma transformação de dados real, mas ilustra como obter dados, convertê-lo num dataframe do Spark e como fazer alguma manipulação básica do Apache Spark. Pode encontrar a saída no Azure Machine Learning Studio ao abrir a tarefa subordinada, ao selecionar o separador Saídas + registos e ao abrir o logs/azureml/driver/stdout ficheiro, conforme mostrado na figura seguinte.

Captura de ecrã do Studio a mostrar o separador stdout da tarefa subordinada

Utilizar o SynapseSparkStep num pipeline

O exemplo seguinte utiliza a saída da SynapseSparkStep criada na secção anterior. Outros passos no pipeline podem ter os seus próprios ambientes exclusivos e executar em diferentes recursos de computação adequados à tarefa em questão. O bloco de notas de exemplo executa o "passo de preparação" num pequeno cluster da CPU:

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

O código acima cria o novo recurso de computação, se necessário. Em seguida, o step1_output resultado é convertido em entrada para o passo de preparação. A as_download() opção significa que os dados serão movidos para o recurso de computação, resultando num acesso mais rápido. Se os dados fossem tão grandes que não caberiam no disco rígido de computação local, utilizaria a opção as_mount() para transmitir os dados em fluxo através do sistema de ficheiros FUSE. O compute_target segundo passo é 'cpucluster', não o 'link1-spark01' recurso que utilizou no passo de preparação de dados. Este passo utiliza um programa train.py simples em vez do dataprep.py que utilizou no passo anterior. Pode ver os detalhes do no bloco de notas de train.py exemplo.

Depois de definir todos os passos, pode criar e executar o pipeline.

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

O código acima cria um pipeline que consiste no passo de preparação de dados em conjuntos do Apache Spark com tecnologia Azure Synapse Analytics (step_1) e o passo de preparação (step_2). O Azure calcula o gráfico de execução ao examinar as dependências de dados entre os passos. Neste caso, existe apenas uma dependência simples que step2_input requer step1_outputnecessariamente .

A chamada para pipeline.submit criar, se necessário, uma Experimentação chamada synapse-pipeline e inicia assíncronamente uma Tarefa no mesmo. Os passos individuais no pipeline são executados como Tarefas Subordinadas desta tarefa principal e podem ser monitorizados e revistos na página Experimentações do Studio.

Passos seguintes