Limpieza y transformación de datos interactivos con Apache Spark en Azure Machine Learning
La limpieza y transformación de datos se convierte en uno de los pasos más importantes de los proyectos de aprendizaje automático. La integración de Azure Machine Learning con Azure Synapse Analytics proporciona acceso a un grupo de Apache Spark, respaldado por Azure Synapse, para la limpieza y transformación de datos interactivos mediante cuadernos de Azure Machine Learning.
En este artículo, aprenderá a realizar la limpieza y transformación de datos mediante
- Proceso de Spark sin servidor
- Grupo de Spark de Synapse asociado
Requisitos previos
- Una suscripción a Azure: si aún no tiene ninguna, cree una cuenta gratuita antes de empezar.
- Un área de trabajo de Azure Machine Learning. Consulte Creación de recursos del área de trabajo.
- Una cuenta de almacenamiento de Azure Data Lake Storage (ADLS) Gen 2. Vea Creación de una cuenta de almacenamiento para su uso con Azure Data Lake Storage Gen2 habilitado.
- (Opcional): una instancia de Azure Key Vault. Vea Crear una instancia de Azure Key Vault.
- (Opcional): una entidad de servicio. Vea Crear una entidad de servicio.
- (Opcional): Un grupo de Synapse Spark asociado en el área de trabajo de Azure Machine Learning.
Antes de iniciar las tareas de limpieza y transformación de datos, deberá estar familiarizado con el proceso de almacenamiento de secretos.
- Clave de acceso de la cuenta de Azure Blob Storage
- Token de firma de acceso compartido (SAS)
- Información de la entidad de servicio de Azure Data Lake Storage (ADLS) Gen 2
en Azure Key Vault. También deberá saber cómo controlar las asignaciones de roles en las cuentas de almacenamiento de Azure. Estos conceptos se tratan en las secciones siguientes. A continuación, exploraremos los detalles de la limpieza y transformación de datos interactivos mediante los grupos de Spark en cuadernos de Azure Machine Learning.
Sugerencia
Para obtener más información sobre la configuración de la asignación de roles de la cuenta de almacenamiento de Azure, o si accede a los datos de las cuentas de almacenamiento mediante el paso directo de identidad de usuario, puede ir directamente a Agregar asignaciones de roles en cuentas de almacenamiento de Azure.
Limpieza y transformación de datos interactivos con Apache Spark
Azure Machine Learning ofrece el proceso de Spark sin servidor y un grupo de Synapse Spark adjunto para la limpieza y transformación de datos interactivos con Apache Spark en cuadernos de Azure Machine Learning. El proceso de Spark sin servidor no requiere la creación de recursos en el área de trabajo de Azure Synapse. En su lugar, un proceso de Spark sin servidor totalmente administrado está disponible directamente en los cuadernos de Azure Machine Learning. El uso de un proceso de Spark sin servidor es el enfoque más sencillo para acceder a un clúster de Spark en Azure Machine Learning.
Proceso de Spark sin servidor en cuadernos de Azure Machine Learning
Un proceso de Spark sin servidor está disponible en cuadernos de Azure Machine Learning de manera predeterminada. Para acceder a él en un cuaderno, seleccione Proceso de Spark sin servidor en Spark sin servidor de Azure Machine Learning, en el menú de selección Proceso.
La interfaz de usuario de Notebooks también proporciona opciones para la configuración de sesión de Spark, para el proceso de Spark sin servidor. Para configurar una sesión de Spark:
- Seleccione Configurar sesión en la parte superior de la pantalla.
- Seleccione una versión de Apache Spark en el menú desplegable.
Importante
El anuncio de fin de vida útil (EOLA) de Azure Synapse Runtime para Apache Spark 3.1 se realizó el 26 de enero de 2023. Como consecuencia, Apache Spark 3.1 no será compatible después del 31 de julio de 2023. Se recomienda usar Apache Spark 3.2.
- Seleccione Tipo de instancia en el menú desplegable. Actualmente se admiten los siguientes tipos de instancias:
Standard_E4s_v3
Standard_E8s_v3
Standard_E16s_v3
Standard_E32s_v3
Standard_E64s_v3
- Escriba un valor de Tiempo de espera de sesión de Spark, en minutos.
- Seleccione si va a asignar ejecutores dinámicamente.
- Seleccione el número de Ejecutores para la sesión de Spark.
- Seleccione Tamaño del ejecutor en el menú desplegable.
- Seleccione Tamaño del controlador en el menú desplegable.
- Para usar un archivo de Conda para configurar una sesión de Spark, active la casilla Cargar archivo de Conda. A continuación, seleccione Examinar y elija el archivo de Conda con la configuración de sesión de Spark que desee.
- Agregue propiedades de Opciones de configuración, valores de entrada en los cuadros de texto Propiedad y Valor y seleccione Agregar.
- Seleccione Aplicar.
- Seleccione Detener sesión en el menú emergente ¿Configurar nueva sesión?.
Los cambios de configuración de sesión se conservan y están disponibles para otra sesión de cuaderno que se inicia mediante el proceso de Spark sin servidor.
Importación y limpieza y transformación de datos de Azure Data Lake Storage (ADLS) Gen 2
Puede acceder a los datos almacenados en cuentas de almacenamiento de Azure Data Lake Storage (ADLS) Gen 2 y limpiarlos y transformarlos con los URI de datos abfss://
siguiendo uno de los dos mecanismos de acceso a datos:
- Paso a través de la identidad del usuario
- Acceso a datos basado en la entidad de servicio
Sugerencia
La limpieza y transformación de datos con un proceso de Spark sin servidor y el paso a través de la identidad del usuario para acceder a los datos en la cuenta de almacenamiento de Azure Data Lake Storage (ADLS) Gen 2 requieren un número mínimo de pasos de configuración.
Para iniciar la limpieza y transformación de datos interactivos con el paso a través de la identidad del usuario:
Compruebe que la identidad del usuario tiene las asignaciones de rolesColaborador y Colaborador de datos de Storage Blob en la cuenta de almacenamiento de Azure Data Lake Storage (ADLS) Gen 2.
Para usar el proceso de Spark sin servidor, seleccione Proceso de Spark sin servidor en Spark sin servidor de Azure Machine Learning, en el menú de selección Proceso.
Para usar un grupo de Synapse Spark conectado, seleccione un grupo de Synapse Spark conectado en Grupos de Synapse Spark, en el menú de selección Proceso.
Este ejemplo de código de limpieza y transformación de datos de Titanic muestra el uso de un URI de datos en formato
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
conpyspark.pandas
ypyspark.ml.feature.Imputer
.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled", index_col="PassengerId", )
Nota
En este ejemplo de código de Python se usa
pyspark.pandas
, que solo es compatible con la versión 3.2 del entorno de ejecución de Spark.
Para limpiar y transformar los datos mediante el acceso a través de una entidad de servicio:
Compruebe que la entidad de servicio tiene las asignaciones de rolesColaborador y Colaborador de datos de Storage Blob en la cuenta de almacenamiento de Azure Data Lake Storage (ADLS) Gen 2.
Cree secretos de Azure Key Vault para el identificador de inquilino de la entidad de servicio, el identificador de cliente y los valores de secretos de cliente.
Seleccione Proceso de Spark sin servidor en Spark sin servidor de Azure Machine Learning en el menú de selección Proceso, o seleccione un grupo de Synapse Spark adjunto en Grupos de Synapse Spark en el menú de selección Proceso.
Para establecer el identificador de inquilino de la entidad de servicio, el identificador de cliente y el secreto de cliente en la configuración, ejecute el ejemplo de código siguiente.
La llamada
get_secret()
en el código depende del nombre de la instancia de Azure Key Vault y de los nombres de los secretos de Azure Key Vault creados para el id. de inquilino de la entidad de servicio, el id. de cliente y el secreto de cliente. El nombre o los valores de propiedad correspondientes que se van a establecer en la configuración son los siguientes:- Propiedad de id. de cliente:
fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Propiedad de secreto de cliente:
fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Propiedad de id. de inquilino:
fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Valor de identificador de inquilino:
https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary # Set up service principal tenant ID, client ID and secret from Azure Key Vault client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>") tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>") client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>") # Set up service principal which has access of the data sc._jsc.hadoopConfiguration().set( "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth" ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_id, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_secret, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", )
- Propiedad de id. de cliente:
Importe y limpie y transforme los datos mediante el URI de datos con formato
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
, como se muestra en el ejemplo de código, mediante los datos de Titanic.
Importación y limpieza y transformación de datos de Azure Blob Storage
Puede acceder a los datos de Azure Blob Storage con la clave de acceso de la cuenta de almacenamiento o un token de firma de acceso compartido (SAS). Debe almacenar estas credenciales en Azure Key Vault como un secreto y establecerlas como propiedades en la configuración de la sesión.
Para iniciar la limpieza y transformación de datos interactivos:
En el panel izquierdo de Estudio de Azure Machine Learning, seleccione Notebooks.
Seleccione Proceso de Spark sin servidor en Spark sin servidor de Azure Machine Learning en el menú de selección Proceso, o seleccione un grupo de Synapse Spark adjunto en Grupos de Synapse Spark en el menú de selección Proceso.
Para configurar la clave de acceso de la cuenta de almacenamiento o un token de firma de acceso compartido (SAS) para el acceso a datos en los cuadernos de Azure Machine Learning:
Para la clave de acceso, establezca la propiedad
fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
como se muestra en este fragmento de código:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key )
Para el token de SAS, establezca la propiedad
fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
como se muestra en este fragmento de código:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", sas_token, )
Nota:
Las llamadas
get_secret()
en los fragmentos de código anteriores requieren el nombre de la instancia de Azure Key Vault y los nombres de los secretos creados para la clave de acceso de la cuenta de Azure Blob Storage o el token de SAS.
Ejecute el código de limpieza y transformación de datos en el mismo cuaderno. Aplique un formato al URI de datos tipo
wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>
que sea similar a este fragmento de código.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled", index_col="PassengerId", )
Nota
En este ejemplo de código de Python se usa
pyspark.pandas
, que solo es compatible con la versión 3.2 del entorno de ejecución de Spark.
Importación y limpieza y transformación de datos desde el almacén de datos de Azure Machine Learning
Para acceder a los datos desde el almacén de datos de Azure Machine Learning, defina una ruta de acceso a los datos en el almacén de datos con formato de URIazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>
. Para limpiar y transformar datos de un almacén de datos de Azure Machine Learning en una sesión de Notebooks de forma interactiva:
Seleccione Proceso de Spark sin servidor en Spark sin servidor de Azure Machine Learning en el menú de selección Proceso, o seleccione un grupo de Synapse Spark adjunto en Grupos de Synapse Spark en el menú de selección Proceso.
En este ejemplo de código se muestra cómo leer y limpiar y transformar datos de Titanic desde un almacén de datos de Azure Machine Learning, mediante el URI del almacén de datos
azureml://
,pyspark.pandas
ypyspark.ml.feature.Imputer
.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "azureml://datastores/workspaceblobstore/paths/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "azureml://datastores/workspaceblobstore/paths/data/wrangled", index_col="PassengerId", )
Nota
En este ejemplo de código de Python se usa
pyspark.pandas
, que solo es compatible con la versión 3.2 del entorno de ejecución de Spark.
Los almacenes de datos de Azure Machine Learning pueden acceder a los datos mediante las credenciales de la cuenta de almacenamiento de Azure
- clave de acceso
- Token de SAS
- entidad de servicio
o proporcionar acceso a datos sin credenciales. Según el tipo de almacén de datos y el tipo de cuenta de almacenamiento de Azure subyacente, adopte un mecanismo de autenticación adecuado para garantizar el acceso a los datos. En esta tabla se resumen los mecanismos de autenticación para acceder a los datos de los almacenes de datos de Azure Machine Learning:
Tipo de cuenta de almacenamiento | Acceso a datos sin credenciales | Mecanismo de acceso a datos | Asignaciones de roles |
---|---|---|---|
Blob de Azure | No | Clave de acceso o token de SAS | No se necesitan asignaciones de roles |
Blob de Azure | Sí | Paso a través de la identidad del usuario* | La identidad de usuario debe tener las asignaciones de roles adecuadas en la cuenta de Azure Blob Storage |
Azure Data Lake Storage (ADLS) Gen 2 | No | Entidad de servicio | La entidad de servicio debe tener las asignaciones de roles adecuadas en la cuenta de almacenamiento de Azure Data Lake Storage (ADLS) Gen 2 |
Azure Data Lake Storage (ADLS) Gen 2 | Sí | Paso a través de la identidad del usuario | La identidad de usuario debe tener las asignaciones de roles adecuadas en la cuenta de almacenamiento de Azure Data Lake Storage (ADLS) Gen 2 |
El paso a través de identidad de usuario * funciona para almacenes de datos sin credenciales que apuntan a cuentas de Azure Blob Storage, solo si la eliminación temporal no está habilitada.
Acceso a datos en el recurso compartido de archivos predeterminado
El recurso compartido de archivos predeterminado se monta en el proceso de Spark sin servidor y en los grupos de Synapse Spark conectados.
En Estudio de Azure Machine Learning, los archivos del recurso compartido de archivos predeterminado se muestran en el árbol de directorios de la pestaña Archivos. El código del cuaderno puede acceder directamente a los archivos almacenados en este recurso compartido de archivos con el protocolo file://
, junto con la ruta de acceso absoluta del archivo, sin configuraciones adicionales. Este fragmento de código muestra cómo acceder a un archivo almacenado en el recurso compartido de archivos predeterminado:
import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer
abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
inputCols=["Age"],
outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")
Nota
En este ejemplo de código de Python se usa pyspark.pandas
, que solo es compatible con la versión 3.2 del entorno de ejecución de Spark.