Partilhar via


Gestão interativa de dados com Apache Spark

A manipulação de dados é um aspeto importante dos projetos de aprendizagem automática. Neste artigo, irá aprender a realizar manipulação interativa de dados ao executar notebooks do Azure Machine Learning num ambiente de cálculo serverless Apache Spark apoiado pelo Azure Synapse.

Este artigo explica como ligar e configurar um computador Spark serverless. O artigo mostra então como usar o cálculo serverless Spark para aceder e gerir dados de várias fontes.

Pré-requisitos

  • Permissões de proprietário ou atribuição de funções numa subscrição do Azure. Pode criar uma conta Azure gratuita.
  • Uma área de trabalho do Azure Machine Learning. Para obter mais informações, consulte Criar recursos de espaço de trabalho.
  • O conjunto de dados titanic.csv foi carregado para a partilha de ficheiros padrão no seu espaço de trabalho.
  • Uma conta de armazenamento Azure Data Lake Gen 2. Para mais informações, consulte Criar uma conta de armazenamento Azure Data Lake Gen 2.
  • As seguintes atribuições de funções atribuídas:
    • Para acesso à conta Azure Storage, são necessárias as funções Contributor e Storage Blob Data Contributor.
    • Para acesso secreto ao Azure Key Vault, papel de utilizador do Key Vault Secrets no cofre de chaves.

Para obter mais informações, consulte:

Use computação serverless do Spark em sessões de notebook

Usar um cálculo Spark serverless é a maneira mais fácil de obter acesso a um cluster Spark para a manipulação interativa de dados. Uma computação Spark totalmente gerida e sem servidor, ligada a um pool Synapse Spark , está disponível diretamente nos cadernos Azure Machine Learning.

Para usar qualquer uma das seguintes fontes e métodos de acesso e gestão de dados, anexe o Spark serverless compute selecionando Azure Machine Learning Serverless Spark>Serverless Spark Compute - Disponível ao lado de Compute no topo da página do ficheiro ou caderno. Pode demorar um ou dois minutos para a computação se ligar à sessão.

Configurar uma sessão serverless Spark

Depois de ligares a computação Spark serverless, podes opcionalmente configurar a sessão do Spark, definindo ou alterando vários valores. Para configurar a sessão do Spark:

  1. Selecione Configurar sessão no canto superior esquerdo na página do ficheiro ou caderno.
  2. No ecrã Configurar sessão , altere qualquer uma das seguintes definições:
    • No painel de Computação :

      • Altere o tamanho da máquina selecionando um tamanho diferente no menu suspenso em Node size.
      • Selecione se quer ou não alocar executores dinamicamente.
      • Selecione o número de Executores para a sessão do Spark.
      • Selecione um tamanho diferente do Executor , se disponível, no menu suspenso.
    • No painel de Definições :

      • Muda a versão do Apache Spark para uma versão diferente da 3.4, se estiver disponível.
      • Mude o valor de timeout da sessão em minutos para um valor mais alto para ajudar a evitar tempos de espera da sessão.
      • Em Definições de Configuração, adicione definições de nome/valor de propriedade para configurar a sessão conforme necessário.

        Gorjeta

        Se usar pacotes Conda ao nível da sessão, ao adicionar a propriedade de configuração spark.hadoop.aml.enable_cache com o valor true, pode melhorar o tempo de arranque a frio da sessão do Spark. Um início a frio de sessão com pacotes Conda de nível de sessão normalmente demora entre 10 a 15 minutos na primeira vez. Os arranques a frio das sessões seguintes com a variável de configuração definida para true normalmente demoram entre três a cinco minutos.

    • No painel de pacotes em Python :

      • Para usar um ficheiro Conda para configurar a sua sessão, selecione Carregar ficheiro Conda. A seguir, selecione Selecionar ficheiro Conda, selecione Navegar e depois procure e abra o ficheiro YAML Conda apropriado na sua máquina para carregar.
      • Para usar um ambiente personalizado, selecione Ambiente personalizado e selecione um ambiente personalizado em Tipo de Ambiente. Para mais informações, consulte Gerir ambientes de software.
  3. Para aplicar todas as configurações, selecione Aplicar.

As alterações na configuração da sessão persistem e estão disponíveis para outras sessões de notebook que utilizam o computador serverless Spark ligado.

Importar e preparar dados do Azure Data Lake Storage

Para aceder e manipular os dados armazenados nas contas de armazenamento Azure Data Lake, utiliza-se um abfss:// URI de protocolo com passagem da identidade de utilizador ou acesso baseado em entidade de serviço. A passagem de identidade do utilizador não requer configuração adicional.

Para usar qualquer um dos métodos, a identidade do utilizador ou o principal do serviço deve ter atribuições de papéisContribuidor e Contribuidor de Dados de Blob de Armazenamento na conta Azure Data Lake Storage.

Para a passagem de identidade do utilizador, execute o seguinte exemplo de código de processamento de dados para utilizar um URI de dados no formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> com pyspark.pandas e pyspark.ml.feature.Imputer. Substitua o <STORAGE_ACCOUNT_NAME> marcador pelo nome da sua conta Azure Data Lake Storage e <FILE_SYSTEM_NAME> pelo nome do contentor de dados.

import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

df = pd.read_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
    index_col="PassengerId",
)
imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
    "mean"
)  # Replace missing values in Age column with the mean value
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
    index_col="PassengerId",
)

Usar uma entidade de serviço

Para usar um principal de serviço para aceder e gerir dados do Azure Data Lake Storage, configure primeiro o principal de serviço da seguinte forma:

  1. Crie um principal de serviço e atribua-lhe os papéis necessários de Contribuidor de Dados de Blob de Armazenamento e Utilizador de Segredos do Cofre de Chaves.

  2. Obtenha o ID do tenant principal do serviço, ID do cliente e valores secretos do cliente a partir do registo da aplicação e crie segredos Azure Key Vault para esses valores.

  3. Defina o ID do inquilino principal do serviço, o ID do cliente e o segredo do cliente, adicionando os seguintes pares nome/valor na configuração da sessão. Substitua <STORAGE_ACCOUNT_NAME> pelo nome da sua conta de armazenamento e <TENANT_ID> pelo ID de inquilino principal do serviço.

    Nome da propriedade Valor
    fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net Valor do ID da aplicação (cliente)
    fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
    fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net Valor secreto do cliente
  4. Execute o código a seguir. A get_secret() chamada no código depende do nome do Key Vault e dos nomes dos segredos do Key Vault criados para o ID do tenant principal de serviço, ID do cliente e segredo do cliente.

    from pyspark.sql import SparkSession
    
    sc = SparkSession.builder.getOrCreate()
    token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
    
    # Set up service principal tenant ID, client ID, and secret from Azure Key Vault
    client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
    tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
    client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
    
    # Set up a service principal that has access to the data
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        client_id,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        client_secret,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
    )
    
  5. Importa e manipula os dados titanic.csv usando um URI de dados no abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> formato, como mostrado no exemplo de código. Substitua o <STORAGE_ACCOUNT_NAME> marcador pelo nome da sua conta Azure Data Lake Storage e <FILE_SYSTEM_NAME> pelo nome do contentor de dados.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

Importar e processar dados do armazenamento de Blob do Azure

Pode aceder aos dados de armazenamento do Azure Blob com a chave de acesso da conta de armazenamento ou com um token de assinatura de acesso partilhada (SAS). Armazene a credencial no Azure Key Vault como um segredo e defina-a como propriedade na configuração da sessão Spark.

  1. Executa um dos seguintes excertos de código. As get_secret() chamadas nos excertos de código requerem o nome do cofre de chaves e os nomes dos segredos criados para a chave de acesso à conta de armazenamento Azure Blob ou token SAS.

    • Para configurar uma chave de acesso à conta de armazenamento, defina a fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net propriedade conforme mostrado no seguinte excerto de código:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Para configurar um token SAS, defina a fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net propriedade conforme mostrado no seguinte excerto de código:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      
  2. Corra o seguinte código de manipulação de dados com o URI de dados formatado como wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

Importar e manipular dados de um datastore do Azure Machine Learning

Para aceder a dados a partir de um armazenamento de dados Azure Machine Learning, defines um caminho para os dados no armazenamento com o formatoazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA> URI.

Execute o seguinte exemplo de código para ler e gerir dados titanic.csv de um datastore Azure Machine Learning usando azureml:// datastore URI, pyspark.pandas, e pyspark.ml.feature.Imputer.

import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

df = pd.read_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/titanic.csv",
    index_col="PassengerId",
)
imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
    "mean"
)  # Replace missing values in Age column with the mean value
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/wrangled",
    index_col="PassengerId",
)

Os armazenamentos de dados Azure Machine Learning podem aceder a dados usando uma chave de acesso à conta de armazenamento Azure, token SAS, credenciais do principal de serviço ou acesso a dados sem credenciais. Selecione o mecanismo de autenticação apropriado consoante o tipo de datastore e o tipo de conta de armazenamento Azure subjacente.

A tabela seguinte resume os mecanismos de autenticação para aceder a dados nos datastores Azure Machine Learning:

Tipo de conta de armazenamento Acesso a dados sem credenciais Mecanismo de acesso aos dados Atribuições de funções
Blob do Azure Não Chave de acesso ou token SAS Não são necessárias atribuições de funções.
Blob do Azure Sim Passagem de identidade do utilizador* A identidade do utilizador deve ter atribuições de funções apropriadas na conta de armazenamento Azure Blob.
Azure Data Lake Storage Não Service principal (Principal de serviço) O principal de serviço deve ter atribuições de funções apropriadas na conta de armazenamento Azure Data Lake.
Azure Data Lake Storage Sim Passagem de identidade do usuário A identidade do utilizador deve ter atribuições de funções apropriadas na conta de armazenamento Azure Data Lake.

* A passagem de identidade do utilizador funciona para datastores sem credenciais que apontam para contas de armazenamento Azure Blob apenas se a eliminação suave não estiver ativada.

Aceder aos dados na partilha de ficheiros predefinida

No Azure Machine Learning Studio, a partilha de ficheiros do teu espaço de trabalho predefinido é a árvore de diretórios sob o separador Ficheiros em Cadernos. O código do notebook pode aceder diretamente a ficheiros armazenados nesta partilha de ficheiros com o file:// protocolo, usando o caminho absoluto do ficheiro sem outra configuração. O compartilhamento de arquivos padrão é montado na computação do Spark sem servidor e nos pools Synapse Spark anexados.

Captura de ecrã a mostrar o uso de uma partilha de ficheiros.

O seguinte excerto de código acede e manipula dados do ficheirotitanic.csv armazenado numa pasta de dados diretamente sob o nome de utilizador na partilha de ficheiros predefinida. Substitui o <USER> marcador pelo teu nome de utilizador.

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")