Estruturação de Dados Interativa com Apache Spark no Azure Machine Learning

A estruturação de dados se torna uma das etapas mais importantes em projetos de machine learning. A integração do Azure Machine Learning com o Azure Synapse Analytics fornece acesso a um pool do Apache Spark — com o apoio do Azure Synapse — para a estruturação de dados interativa usando Notebooks do Azure Machine Learning.

Neste artigo, você aprenderá a fazer a estruturação de dados usando

  • Computação do Spark sem servidor
  • Pool do Spark do Synapse anexado

Pré-requisitos

Antes de iniciar suas tarefas de estruturação de dados, saiba mais sobre o processo de armazenamento de segredos

  • Chave de acesso da conta de armazenamento de blobs do Azure
  • Um token SAS (Assinatura de Acesso Compartilhado)
  • Informações da entidade de serviço do ADLS (Azure Data Lake Storage) Gen 2

no Azure Key Vault. Você também precisa saber como lidar com atribuições de função nas contas de armazenamento do Azure. As seções a seguir analisam esses conceitos. Depois, veremos os detalhes da estruturação de dados interativa usando os pools do Spark nos Notebooks do Azure Machine Learning.

Dica

Para saber mais sobre a configuração de atribuição de função da conta de armazenamento do Azure ou se você acessa dados em suas contas de armazenamento usando a passagem de identidade de usuário, confira Adicionar atribuições de função em contas de armazenamento do Azure.

Estruturação de dados interativa com o Apache Spark

O Azure Machine Learning oferece computação do Spark sem servidor e um pool do Spark do Synapse anexado para uma estruturação de dados interativa com o Apache Spark nos Notebooks do Azure Machine Learning. A computação do Spark sem servidor não requer a criação de recursos no espaço de trabalho do Azure Synapse. Em vez disso, uma computação do Spark sem servidor totalmente gerenciada se torna disponível diretamente nos Notebooks do Azure Machine Learning. O uso de uma computação do Spark sem servidor é a abordagem mais fácil possível para acessar um cluster do Spark no Azure Machine Learning.

Computação do Spark sem servidor nos Notebooks do Azure Machine Learning

Uma computação do Spark sem servidor está disponível por padrão nos Notebooks de Machine Learning do Azure. Para acessá-la em um notebook, selecioneComputação do Spark Sem Servidor na guia Spark Sem Servidor do Azure Machine Learning do menu de seleção de Computação.

A interface de usuário do Notebooks também fornece opções de configuração de sessão do Spark para a computação do Spark sem servidor. Para configurar uma sessão do Spark:

  1. Selecione Configurar sessão na parte superior da tela.
  2. Selecione uma versão do Apache Spark no menu suspenso.

    Importante

    Runtime do Azure Synapse para o Apache Spark: Comunicados

    • Runtime do Azure Synapse para Apache Spark 3.2:
      • Data do Comunicado EOLA: 8 de julho de 2023
      • Data do Término do Suporte: 8 de julho de 2024. Após essa data, o runtime será desabilitado.
    • Para obter suporte contínuo e desempenho ideal, recomendamos a migração para o Apache Spark 3.3.
  3. Selecione Tipo de instância no menu suspenso. No momento, há suporte para os seguintes tipos de instância:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Insira um valor de Tempo limite da Sessão do Spark, em minutos.
  5. Selecione se deseja Alocar executores dinamicamente
  6. Selecione o número de Executores para a sessão do Spark.
  7. Selecione o Tamanho do executor no menu suspenso.
  8. Selecione o Tamanho do driver no menu suspenso.
  9. Para usar um arquivo Conda para configurar uma sessão do Spark, marque a caixa de seleção Carregar arquivo Conda. Em seguida, selecione Procurar e escolha o arquivo Conda com a configuração da sessão do Spark desejada.
  10. Adicione as propriedades de Definições de configuração, valores de entrada nas caixas de texto Propriedade e Valor e, em seguida, selecione Adicionar.
  11. Escolha Aplicar.
  12. Selecione Parar sessão no pop-up Configurar nova sessão?.

As alterações de configuração da sessão persistem e se tornam disponíveis para uma outra sessão de notebook que for iniciada usando a computação do Spark sem servidor.

Dica

Se você usar pacotes Conda no nível da sessão, poderá melhorar o tempo de inicialização a frio na sessão do Spark se definir a variável de configuração spark.hadoop.aml.enable_cache como true. Uma inicialização a frio da sessão com pacotes Conda de nível de sessão normalmente leva de 10 a 15 minutos quando a sessão é iniciada pela primeira vez. No entanto, a inicialização a frio da sessão subsequente com a variável de configuração definida como true normalmente leva de três a cinco minutos.

Importar e reorganizar os dados do ADLS (Azure Data Lake Storage) Gen 2

Você pode acessar e estruturar dados armazenados em contas de armazenamento do ADLS (Azure Data Lake Storage) Gen 2 com abfss:// URIs de dados seguindo um dos dois mecanismos de acesso a dados:

  • Passagem de identidade do usuário
  • Acesso aos dados com base na entidade de serviço

Dica

A organização dos dados com uma computação do Spark sem servidor e a passagem da identidade do usuário para acessar os dados em uma conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2 requerem o menor número de etapas de configuração.

Para iniciar a estruturação interativa de dados com a passagem de identidade do usuário:

  • Verifique se a identidade do usuário tem atribuições de função de Colaborador e Colaborador de Dados do Blob de Armazenamento na conta de armazenamento do ADLS (Azure Data Lake Storage) Gen 2.

  • Para usar a computação do Spark sem servidor, selecione Computação do Spark Sem Servidor na guia Spark Sem Servidor do Azure Machine Learning do menu de seleção Computação.

  • Para usar um pool do Spark do Synapse anexado, selecione um pool do Spark do Synapse anexado na guia Pools do Spark do Synapse do menu de seleção de Computação.

  • Este exemplo de código de estruturação de dados Titanic mostra o uso de um URI de dados no formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> com pyspark.pandas e pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Observação

    Este exemplo de código Python usa pyspark.pandas. Somente a versão 3.2 do runtime do Spark ou posterior dá suporte a isso.

Para estruturar os dados por meio de acesso por uma entidade de serviço:

  1. Verifique se a entidade de serviço tem atribuições de função de Colaborador e Colaborador de Dados do Blob de Armazenamento na conta de armazenamento do ADLS (Azure Data Lake Storage) Gen 2.

  2. Crie segredos do Azure Key Vault para a ID do locatário da entidade de serviço, a ID do cliente e os valores do segredo do cliente.

  3. Selecione Computação do Spark Sem Servidor na guia Spark Sem Servidor do Azure Machine Learning do menu de seleção Computação, ou selecione um pool do Spark do Synapse anexado na guia Pools do Spark do Synapse do menu de seleção Computação.

  4. Para definir a ID do locatário da entidade de serviço, a ID do cliente e o segredo do cliente na configuração, execute a amostra de código a seguir.

    • A chamada get_secret() no código depende do nome do Azure Key Vault e dos nomes dos segredos do Azure Key Vault criados para a ID do locatário da entidade de serviço, a ID do cliente e o segredo do cliente. Defina estes valores/nome da propriedade correspondente na configuração:

      • Propriedade da ID do cliente: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Propriedade do segredo do cliente: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Propriedade da ID do locatário: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Valor da ID do locatário: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Importe e estruture os dados usando o URI de dados no formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>, conforme mostrado na amostra de código usando os dados do Titanic.

Importar e estruturar os dados do Armazenamento de Blobs do Azure

Você pode acessar os dados do Armazenamento de Blobs do Azure com a chave de acesso da conta de armazenamento ou um token SAS (assinatura de acesso compartilhado). Você deve armazenar essas credenciais no Azure Key Vault como um segredo e defini-las como propriedades na configuração da sessão.

Para iniciar a estruturação interativa de dados:

  1. No painel esquerdo do Estúdio do Azure Machine Learning, selecione Notebooks.

  2. Selecione Computação do Spark Sem Servidor na guia Spark Sem Servidor do Azure Machine Learning do menu de seleção Computação, ou selecione um pool do Spark do Synapse anexado na guia Pools do Spark do Synapse do menu de seleção Computação.

  3. Para configurar a chave de acesso da conta de armazenamento ou um token SAS (assinatura de acesso compartilhado) para acesso a dados no Azure Machine Learning Notebooks:

    • Para a chave de acesso, defina a propriedade fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net conforme mostrado neste snippet de código:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Para o token SAS, defina a propriedade fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net conforme mostrado neste snippet de código:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Observação

      As chamadas get_secret() nos snippets de código acima exigem o nome do Azure Key Vault e os nomes dos segredos criados para a chave de acesso ou o token SAS da conta de Armazenamento de Blobs do Azure

  4. Execute o código de estruturação de dados no mesmo notebook. Formatar o URI de dados como wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>, semelhante ao que este snippet de código exibe:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Observação

    Este exemplo de código Python usa pyspark.pandas. Somente a versão 3.2 do runtime do Spark ou posterior dá suporte a isso.

Importar e estruturar os dados do armazenamento de dados do Azure Machine Learning

Para acessar dados do Armazenamento de dados do Azure Machine Learning, defina um caminho para os dados no armazenamento de dados com o formato URIazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>. Para estruturar os dados de um Armazenamento de dados do Azure Machine Learning em uma sessão do Notebooks interativamente:

  1. Selecione Computação do Spark Sem Servidor na guia Spark Sem Servidor do Azure Machine Learning do menu de seleção Computação, ou selecione um pool do Spark do Synapse anexado na guia Pools do Spark do Synapse do menu de seleção Computação.

  2. Este exemplo de código mostra como ler e estruturar os dados Titanic de um armazenamento de dados do Azure Machine Learning, usando o URI do armazenamento de dados azureml://, pyspark.pandas e pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Observação

    Este exemplo de código Python usa pyspark.pandas. Somente a versão 3.2 do runtime do Spark ou posterior dá suporte a isso.

Os armazenamentos de dados do Azure Machine Learning podem acessar os dados usando as credenciais da conta de armazenamento do Azure

  • chave de acesso
  • Token SAS
  • entidade de serviço

ou forneça acesso a dados sem credencial. Dependendo do tipo de armazenamento de dados e do tipo de conta de armazenamento subjacente do Azure, selecione um mecanismo de autenticação apropriado para garantir o acesso aos dados. Esta tabela resume os mecanismos de autenticação para acessar dados nos armazenamentos de dados do Azure Machine Learning:

Tipo de conta de armazenamento Acesso a dados sem credencial Mecanismo de acesso a dados Atribuições de função
Blob do Azure Não Chave de acesso ou token SAS Não é necessária uma atribuição de função
Blob do Azure Sim Passagem de identidade do usuário* A identidade do usuário deve ter atribuições de função apropriadas na conta de armazenamento de Blobs do Azure
ADLS (Azure Data Lake Storage) Gen 2 Não Entidade de serviço A entidade de serviço deve ter atribuições de função apropriadas na conta de armazenamento do ADLS (Azure Data Lake Storage) Gen 2
ADLS (Azure Data Lake Storage) Gen 2 Sim Passagem de identidade do usuário A identidade do usuário deve ter atribuições de função apropriadas na conta de armazenamento do ADLS (Azure Data Lake Storage) Gen 2

* A passagem de identidade do usuário funciona nos armazenamentos de dados sem credenciais que apontam para as contas de armazenamento de blobs do Azure somente se a exclusão temporária não está habilitada.

Acesso aos dados no compartilhamento de arquivo padrão

O compartilhamento de arquivos padrão é montado tanto para a computação do Spark sem servidor quanto para os Pools do Spark do Synapse anexados.

Screenshot showing use of a file share.

No Estúdio do Azure Machine Learning, os arquivos no compartilhamento de arquivo padrão são mostrados na árvore de diretórios na guia Arquivos. O código do notebook pode acessar diretamente os arquivos armazenados nesse compartilhamento de arquivo com o protocolo file://, juntamente com o caminho absoluto do arquivo, sem configuração adicional. Este snippet de código mostra como acessar um arquivo armazenado no compartilhamento de arquivo padrão:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Observação

Este exemplo de código Python usa pyspark.pandas. Somente a versão 3.2 do runtime do Spark ou posterior dá suporte a isso.

Próximas etapas