Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
A manipulação de dados é um aspeto importante dos projetos de aprendizagem automática. Neste artigo, irá aprender a realizar manipulação interativa de dados ao executar notebooks do Azure Machine Learning num ambiente de cálculo serverless Apache Spark apoiado pelo Azure Synapse.
Este artigo explica como ligar e configurar um computador Spark serverless. O artigo mostra então como usar o cálculo serverless Spark para aceder e gerir dados de várias fontes.
Pré-requisitos
- Permissões de proprietário ou atribuição de funções numa subscrição do Azure. Pode criar uma conta Azure gratuita.
- Uma área de trabalho do Azure Machine Learning. Para obter mais informações, consulte Criar recursos de espaço de trabalho.
- O conjunto de dados titanic.csv foi carregado para a partilha de ficheiros padrão no seu espaço de trabalho.
- Uma conta de armazenamento Azure Data Lake Gen 2. Para mais informações, consulte Criar uma conta de armazenamento Azure Data Lake Gen 2.
- As seguintes atribuições de funções atribuídas:
- Para acesso à conta Azure Storage, são necessárias as funções Contributor e Storage Blob Data Contributor.
- Para acesso secreto ao Azure Key Vault, papel de utilizador do Key Vault Secrets no cofre de chaves.
Para obter mais informações, consulte:
- Crie um Azure Key Vault
- Criar um service principal
- Anexe um pool Synapse Spark no espaço de trabalho Azure Machine Learning
Use computação serverless do Spark em sessões de notebook
Usar um cálculo Spark serverless é a maneira mais fácil de obter acesso a um cluster Spark para a manipulação interativa de dados. Uma computação Spark totalmente gerida e sem servidor, ligada a um pool Synapse Spark , está disponível diretamente nos cadernos Azure Machine Learning.
Para usar qualquer uma das seguintes fontes e métodos de acesso e gestão de dados, anexe o Spark serverless compute selecionando Azure Machine Learning Serverless Spark>Serverless Spark Compute - Disponível ao lado de Compute no topo da página do ficheiro ou caderno. Pode demorar um ou dois minutos para a computação se ligar à sessão.
Configurar uma sessão serverless Spark
Depois de ligares a computação Spark serverless, podes opcionalmente configurar a sessão do Spark, definindo ou alterando vários valores. Para configurar a sessão do Spark:
- Selecione Configurar sessão no canto superior esquerdo na página do ficheiro ou caderno.
- No ecrã Configurar sessão , altere qualquer uma das seguintes definições:
No painel de Computação :
- Altere o tamanho da máquina selecionando um tamanho diferente no menu suspenso em Node size.
- Selecione se quer ou não alocar executores dinamicamente.
- Selecione o número de Executores para a sessão do Spark.
- Selecione um tamanho diferente do Executor , se disponível, no menu suspenso.
No painel de Definições :
- Muda a versão do Apache Spark para uma versão diferente da 3.4, se estiver disponível.
- Mude o valor de timeout da sessão em minutos para um valor mais alto para ajudar a evitar tempos de espera da sessão.
-
Em Definições de Configuração, adicione definições de nome/valor de propriedade para configurar a sessão conforme necessário.
Gorjeta
Se usar pacotes Conda ao nível da sessão, ao adicionar a propriedade de configuração
spark.hadoop.aml.enable_cachecom o valortrue, pode melhorar o tempo de arranque a frio da sessão do Spark. Um início a frio de sessão com pacotes Conda de nível de sessão normalmente demora entre 10 a 15 minutos na primeira vez. Os arranques a frio das sessões seguintes com a variável de configuração definida para true normalmente demoram entre três a cinco minutos.
No painel de pacotes em Python :
- Para usar um ficheiro Conda para configurar a sua sessão, selecione Carregar ficheiro Conda. A seguir, selecione Selecionar ficheiro Conda, selecione Navegar e depois procure e abra o ficheiro YAML Conda apropriado na sua máquina para carregar.
- Para usar um ambiente personalizado, selecione Ambiente personalizado e selecione um ambiente personalizado em Tipo de Ambiente. Para mais informações, consulte Gerir ambientes de software.
- Para aplicar todas as configurações, selecione Aplicar.
As alterações na configuração da sessão persistem e estão disponíveis para outras sessões de notebook que utilizam o computador serverless Spark ligado.
Importar e preparar dados do Azure Data Lake Storage
Para aceder e manipular os dados armazenados nas contas de armazenamento Azure Data Lake, utiliza-se um abfss:// URI de protocolo com passagem da identidade de utilizador ou acesso baseado em entidade de serviço. A passagem de identidade do utilizador não requer configuração adicional.
Para usar qualquer um dos métodos, a identidade do utilizador ou o principal do serviço deve ter atribuições de papéisContribuidor e Contribuidor de Dados de Blob de Armazenamento na conta Azure Data Lake Storage.
Para a passagem de identidade do utilizador, execute o seguinte exemplo de código de processamento de dados para utilizar um URI de dados no formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> com pyspark.pandas e pyspark.ml.feature.Imputer. Substitua o <STORAGE_ACCOUNT_NAME> marcador pelo nome da sua conta Azure Data Lake Storage e <FILE_SYSTEM_NAME> pelo nome do contentor de dados.
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer
df = pd.read_csv(
"abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
index_col="PassengerId",
)
imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
"mean"
) # Replace missing values in Age column with the mean value
df.fillna(
value={"Cabin": "None"}, inplace=True
) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
df.to_csv(
"abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
index_col="PassengerId",
)
Usar uma entidade de serviço
Para usar um principal de serviço para aceder e gerir dados do Azure Data Lake Storage, configure primeiro o principal de serviço da seguinte forma:
Crie um principal de serviço e atribua-lhe os papéis necessários de Contribuidor de Dados de Blob de Armazenamento e Utilizador de Segredos do Cofre de Chaves.
Obtenha o ID do tenant principal do serviço, ID do cliente e valores secretos do cliente a partir do registo da aplicação e crie segredos Azure Key Vault para esses valores.
Defina o ID do inquilino principal do serviço, o ID do cliente e o segredo do cliente, adicionando os seguintes pares nome/valor na configuração da sessão. Substitua
<STORAGE_ACCOUNT_NAME>pelo nome da sua conta de armazenamento e<TENANT_ID>pelo ID de inquilino principal do serviço.Nome da propriedade Valor fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.netValor do ID da aplicação (cliente) fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.nethttps://login.microsoftonline.com/<TENANT_ID>/oauth2/tokenfs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.netValor secreto do cliente Execute o código a seguir. A
get_secret()chamada no código depende do nome do Key Vault e dos nomes dos segredos do Key Vault criados para o ID do tenant principal de serviço, ID do cliente e segredo do cliente.from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary # Set up service principal tenant ID, client ID, and secret from Azure Key Vault client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>") tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>") client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>") # Set up a service principal that has access to the data sc._jsc.hadoopConfiguration().set( "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth" ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_id, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_secret, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", )Importa e manipula os dados titanic.csv usando um URI de dados no
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>formato, como mostrado no exemplo de código. Substitua o<STORAGE_ACCOUNT_NAME>marcador pelo nome da sua conta Azure Data Lake Storage e<FILE_SYSTEM_NAME>pelo nome do contentor de dados.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled", index_col="PassengerId", )
Importar e processar dados do armazenamento de Blob do Azure
Pode aceder aos dados de armazenamento do Azure Blob com a chave de acesso da conta de armazenamento ou com um token de assinatura de acesso partilhada (SAS). Armazene a credencial no Azure Key Vault como um segredo e defina-a como propriedade na configuração da sessão Spark.
Executa um dos seguintes excertos de código. As
get_secret()chamadas nos excertos de código requerem o nome do cofre de chaves e os nomes dos segredos criados para a chave de acesso à conta de armazenamento Azure Blob ou token SAS.Para configurar uma chave de acesso à conta de armazenamento, defina a
fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.netpropriedade conforme mostrado no seguinte excerto de código:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key )Para configurar um token SAS, defina a
fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.netpropriedade conforme mostrado no seguinte excerto de código:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", sas_token, )
Corra o seguinte código de manipulação de dados com o URI de dados formatado como
wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled", index_col="PassengerId", )
Importar e manipular dados de um datastore do Azure Machine Learning
Para aceder a dados a partir de um armazenamento de dados Azure Machine Learning, defines um caminho para os dados no armazenamento com o formatoazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA> URI.
Execute o seguinte exemplo de código para ler e gerir dados titanic.csv de um datastore Azure Machine Learning usando azureml:// datastore URI, pyspark.pandas, e pyspark.ml.feature.Imputer.
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer
df = pd.read_csv(
"azureml://datastores/<DATASTORE_NAME>/paths/data/titanic.csv",
index_col="PassengerId",
)
imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
"mean"
) # Replace missing values in Age column with the mean value
df.fillna(
value={"Cabin": "None"}, inplace=True
) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
df.to_csv(
"azureml://datastores/<DATASTORE_NAME>/paths/data/wrangled",
index_col="PassengerId",
)
Os armazenamentos de dados Azure Machine Learning podem aceder a dados usando uma chave de acesso à conta de armazenamento Azure, token SAS, credenciais do principal de serviço ou acesso a dados sem credenciais. Selecione o mecanismo de autenticação apropriado consoante o tipo de datastore e o tipo de conta de armazenamento Azure subjacente.
A tabela seguinte resume os mecanismos de autenticação para aceder a dados nos datastores Azure Machine Learning:
| Tipo de conta de armazenamento | Acesso a dados sem credenciais | Mecanismo de acesso aos dados | Atribuições de funções |
|---|---|---|---|
| Blob do Azure | Não | Chave de acesso ou token SAS | Não são necessárias atribuições de funções. |
| Blob do Azure | Sim | Passagem de identidade do utilizador* | A identidade do utilizador deve ter atribuições de funções apropriadas na conta de armazenamento Azure Blob. |
| Azure Data Lake Storage | Não | Service principal (Principal de serviço) | O principal de serviço deve ter atribuições de funções apropriadas na conta de armazenamento Azure Data Lake. |
| Azure Data Lake Storage | Sim | Passagem de identidade do usuário | A identidade do utilizador deve ter atribuições de funções apropriadas na conta de armazenamento Azure Data Lake. |
* A passagem de identidade do utilizador funciona para datastores sem credenciais que apontam para contas de armazenamento Azure Blob apenas se a eliminação suave não estiver ativada.
Aceder aos dados na partilha de ficheiros predefinida
No Azure Machine Learning Studio, a partilha de ficheiros do teu espaço de trabalho predefinido é a árvore de diretórios sob o separador Ficheiros em Cadernos. O código do notebook pode aceder diretamente a ficheiros armazenados nesta partilha de ficheiros com o file:// protocolo, usando o caminho absoluto do ficheiro sem outra configuração. O compartilhamento de arquivos padrão é montado na computação do Spark sem servidor e nos pools Synapse Spark anexados.
O seguinte excerto de código acede e manipula dados do ficheirotitanic.csv armazenado numa pasta de dados diretamente sob o nome de utilizador na partilha de ficheiros predefinida. Substitui o <USER> marcador pelo teu nome de utilizador.
import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer
abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
inputCols=["Age"],
outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")