Compartilhar via


Como usar o Apache Spark (da plataforma do Azure Synapse Analytics) no pipeline de aprendizado de máquina (preterido)

APLICA-SE A:SDK do Python do Azure MLv1

Aviso

A integração do Azure Synapse Analytics com o Azure Machine Learning, disponível no SDK do Python v1, foi preterida. Os usuários ainda podem usar o workspace do Synapse, registrado no Azure Machine Learning, como um serviço vinculado. No entanto, um novo workspace do Synapse não pode mais ser registrado no Azure Machine Learning como um serviço vinculado. Recomendamos o uso de computação do Spark sem servidor e Pools do Spark do Synapse anexados, disponíveis na CLI v2 e no SDK do Python v2. Para obter mais informações, visite https://aka.ms/aml-spark.

Neste artigo, você aprenderá a usar pools do Apache Spark com tecnologia do Azure Synapse Analytics como o destino de computação para uma etapa de preparação de dados em um pipeline do Azure Machine Learning. Você aprenderá como um único pipeline pode usar recursos de computação adequados para a etapa específica - por exemplo, preparação de dados ou treinamento. Você também aprenderá como os dados são preparados para a etapa do Spark e como eles passam para a próxima etapa.

Pré-requisitos

Você pode criar e administrar os pools do Apache Spark em um espaço de trabalho do Azure Synapse Analytics. Para integrar um pool do Apache Spark com um espaço de trabalho do Azure Machine Learning, você deve vincular ao espaço de trabalho do Azure Synapse Analytics. Depois de vincular seu espaço de trabalho do Azure Machine Learning e seus espaços de trabalho do Azure Synapse Analytics, você pode anexar um pool do Apache Spark com

  • Estúdio do Azure Machine Learning

  • SKD do Python, conforme explicado posteriormente

  • Modelo do Azure Resource Manager (ARM). Para obter mais informações, visite Modelo ARM de exemplo

    • Você pode usar a linha de comando para seguir o modelo ARM, adicionar o serviço vinculado e anexar o pool do Apache Spark com este exemplo de código:
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

Importante

Para vincular com êxito ao workspace do Synapse, você deve receber a função Proprietário do workspace do Synapse. Verifique seu acesso no portal do Azure.

O serviço vinculado obterá uma identidade gerenciada pelo sistema (SAI) no momento da criação. Você deve atribuir a esse SAI de serviço vinculado a função "Administrador do Synapse Apache Spark" no Synapse Studio, para que ele possa enviar o trabalho do Spark (confira Como gerenciar atribuições de funções RBAC do Synapse no Synapse Studio).

Você também deve atribuir ao usuário do espaço de trabalho do Azure Machine Learning a função "Colaborador", no portal do Azure de gerenciamento de recursos.

Este código mostra como recuperar serviços vinculados em seu espaço de trabalho:

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

Primeiro, Workspace.from_config() acessa seu espaço de trabalho do Azure Machine Learning com a configuração no arquivo config.json. (Para obter mais informações, visite Criar um arquivo de configuração de espaço de trabalho). Em seguida, o código imprime todos os serviços vinculados disponíveis no espaço de trabalho. Por fim, o LinkedService.get() recupera um serviço vinculado chamado 'synapselink1'.

Anexar o pool do Apache Spark como um destino de computação para o Azure Machine Learning

Para usar o pool de faíscas do Apache para alimentar uma etapa no pipeline de aprendizado de máquina, você deve anexá-lo como um ComputeTarget para a etapa de pipeline, conforme mostrado neste exemplo de código:

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

O código primeiro configura o SynapseCompute. O argumento linked_service é o objeto LinkedService que você criou ou recuperou na etapa anterior. O argumento type deve ser SynapseSpark. O argumento pool_name em SynapseCompute.attach_configuration() deve corresponder ao de um pool existente no espaço de trabalho do Azure Synapse Analytics. Para obter mais informações sobre a criação de um pool de faíscas Apache no espaço de trabalho do Azure Synapse Analytics, visite Guia de início rápido: criar um pool Apache Spark sem servidor usando o Synapse Studio. O tipo ComputeTargetAttachConfiguration é attach_config.

Após a criação da configuração, crie um ComputeTarget de aprendizado de máquina passando os valores Workspace e ComputeTargetAttachConfiguration e o nome pelo qual você gostaria de se referir à computação dentro do espaço de trabalho de aprendizado de máquina. A chamada para ComputeTarget.attach() é assíncrona, portanto a amostra é bloqueada até que a chamada seja concluída.

Criar um SynapseSparkStep que usa o pool vinculado do Apache Spark

O trabalho do Spark no pool do Apache Spark do notebook de exemplo define um pipeline de machine learning simples. Primeiro, o notebook define uma etapa de preparação de dados da plataforma synapse_compute definida na etapa anterior. Em seguida, o notebook define uma etapa de treinamento fornecida por um destino de computação mais apropriado para treinamento. O bloco de anotações de exemplo usa o banco de dados de sobrevivência do Titanic para mostrar a entrada e a saída de dados. Na verdade, ele não limpa os dados ou faz um modelo preditivo. Como essa amostra não envolve realmente treinamento, a etapa de treinamento usa um recurso de computação barato baseado em CPU.

Os dados fluem para um pipeline de aprendizado de máquina por meio de objetos DatasetConsumptionConfig, que podem conter dados tabulares ou conjuntos de arquivos. Os dados geralmente são provenientes de arquivos no armazenamento de blobs em um armazenamento de dados de espaço de trabalho. Este exemplo de código mostra o código típico que cria entrada para um pipeline de aprendizado de máquina:

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

O exemplo de código pressupõe que o arquivo Titanic.csv está no armazenamento de blob. O código mostra como ler o arquivo como um TabularDataset e como um FileDataset. Esse código é apenas para fins de demonstração, porque se tornaria confuso duplicar entradas ou interpretar uma única fonte de dados como um recurso contendo tabela e estritamente como um arquivo.

Importante

Para usar um FileDataset como entrada, você precisa de uma versão azureml-core de pelo menos 1.20.0. Você pode especificar isso com a classe Environment, conforme mencionado posteriormente. Quando uma etapa é concluída, você pode armazenar os dados de saída, conforme mostrado neste exemplo de código:

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

Neste exemplo de código, o datastore armazenaria os dados em um arquivo chamado test. Os dados estariam disponíveis no espaço de trabalho de aprendizado de máquina como um Dataset, com o nome registered_dataset.

Além dos dados, uma etapa do pipeline pode ter dependências Python por etapa. Além disso, objetos SynapseSparkStep individuais podem especificar sua configuração precisa do Azure Synapse Apache Spark. Para mostrar isso, o exemplo de código a seguir especifica que a versão do pacote azureml-core deve ser, no mínimo, 1.20.0. Como mencionado anteriormente, esse requisito para o pacote azureml-core é necessário para usar um FileDataset como entrada.

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

Esse código especifica uma única etapa no pipeline do Azure Machine Learning. O valor environment desse código define uma versão azureml-core específica, e o código pode adicionar outras dependências do conda ou do pip conforme necessário.

O SynapseSparkStep compacta e faz upload do subdiretório ./code a partir do computador local. Esse diretório é recriado no servidor de computação e a etapa executa o script dataprep.py desse diretório. Os inputs e outputs dessa etapa são os objetos step1_input1, step1_input2 e step1_output mencionados anteriormente. A maneira mais fácil de acessar esses valores no script do dataprep.py é associá-los aos arguments nomeados.

O próximo conjunto de argumentos para o construtor SynapseSparkStep controla o Apache Spark. O compute_target é o 'link1-spark01' que anexamos antes como destino de computação. Os outros parâmetros especificam a memória e os núcleos que gostaríamos de usar.

O notebook de amostra usa esse código para dataprep.py:

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

Esse script de "preparação de dados" não faz nenhuma transformação real de dados, mas mostra como recuperar dados, convertê-los em um dataframe do Spark e como fazer algumas manipulações básicas do Apache Spark. Para encontrar a saída do Estúdio do Azure Machine Learning, abra o trabalho filho, escolha a guia Saídas + logs e abra o arquivo logs/azureml/driver/stdout, conforme mostrado nesta captura de tela:

Captura de tela do Studio mostrando a guia stdout do trabalho filho

Usar o SynapseSparkStep em um pipeline

O próximo exemplo usa a saída do SynapseSparkStep criado na seção anterior. Outras etapas no pipeline podem ter seus próprios ambientes exclusivos e podem ser executadas em diferentes recursos de computação apropriados para a tarefa em questão. O notebook de exemplo executa a “etapa de treinamento” em um pequeno cluster de CPU:

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

Esse código cria o novo recurso de computação, se necessário. Em seguida, ele converte o resultado step1_output em entrada para a etapa de treinamento. A opção as_download() significa que os dados são movidos para o recurso de computação, resultando em acesso mais rápido. Se os dados forem tão grandes que não caberiam no disco rígido de computação local, você deverá usar a opção as_mount() para transmitir os dados com o sistema de arquivos FUSE. O compute_target dessa segunda etapa é 'cpucluster', não o recurso 'link1-spark01' usado na etapa de preparação de dados. Esta etapa usa um script de train.py simples em vez do script dataprep.py usado na etapa anterior. O bloco de anotações de exemplo tem detalhes do script train.py.

Depois de definir todas as etapas, você pode criar e executar o pipeline.

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

Esse código cria um pipeline que consiste na etapa de preparação de dados em pools do Apache Spark, da plataforma Azure Synapse Analytics (step_1) e a etapa de treinamento (step_2). O Azure examina as dependências de dados entre as etapas para calcular o gráfico de execução. Nesse caso, há apenas uma dependência direta. Aqui, step2_input requer necessariamente step1_output.

A chamada pipeline.submit cria, se necessário, um Experimento chamado synapse-pipeline e inicia de forma assíncrona um trabalho dentro dele. As etapas individuais dentro do pipeline são executadas como trabalhos filhos desse trabalho principal, e a página Experimentos do Estúdio pode monitorar e revisar essas etapas.

Próximas etapas