Introducción a Fabric MSSparkUtils
Las utilidades de Spark para Microsoft (MSSparkUtils) son un paquete integrado que le ayuda a realizar las tareas más comunes con mayor facilidad. Puede usarMSSparkUtils para trabajar con sistemas de archivos, para obtener variables de entorno, encadenar cuadernos juntos y trabajar con secretos. MSSparkUtils están disponibles en PySpark (Python) Scala, cuadernos de SparkR y canalizaciones de Microsoft Fabric.
Importante
Microsoft Fabric está en versión preliminar.
Utilidades del sistema de archivos
mssparkutils.fs proporciona utilidades para trabajar con varios sistemas de archivos, incluidos Azure Data Lake Storage Gen2 (ADLS Gen2) y Azure Blob Storage. Asegúrese de configurar el acceso a Azure Data Lake Storage Gen2 y Azure Blob Storage adecuadamente.
Ejecute los siguientes comandos para obtener información general sobre los métodos disponibles:
from notebookutils import mssparkutils
mssparkutils.fs.help()
Salida
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use mssparkutils.fs.help("methodName") for more info about a method.
mssparkutils funciona con el sistema de archivos de la misma manera que las API de Spark. Use mssparkuitls.fs.mkdirs() y el uso de Fabric Lakehouse, por ejemplo:
Uso | Ruta de acceso relativa desde la raíz de HDFS | Ruta de acceso absoluta para el sistema de archivos ABFS | Ruta de acceso absoluta para el sistema de archivos local en el nodo del controlador |
---|---|---|---|
Lakehouse no predeterminado | No compatible | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Lakehouse predeterminado | Directorio en "Files" o "Tables": mssparkutils.fs.mkdirs("Files/<new_dir>") | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Enumerar archivos
Enumere el contenido de un directorio, use mssparkutils.fs.ls("Ruta de acceso del directorio"), por ejemplo:
mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
mssparkutils.fs.ls("file:/tmp") # based on local file system of driver node
Vea las propiedades del archivo.
Devuelve propiedades de archivo, como el nombre de archivo, la ruta de acceso del archivo, el tamaño del archivo y si es un directorio y un archivo.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
Creación de un directorio
Crea el directorio especificado si no existe y los directorios primarios necesarios.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs. mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
mssparkutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
Copiar archivo
Copia un archivo o un directorio. Admite la copia entre sistemas de archivos.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Vista previa del contenido del archivo
Devuelve hasta los primeros "maxBytes" bytes del archivo especificado como una cadena codificada en UTF-8.
mssparkutils.fs.head('file path', maxBytes to read)
Mover archivo
Mueve un archivo o un directorio. Permite el movimiento entre sistemas de archivos.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
Escritura de archivos
Escribe la cadena especificada en un archivo, codificada en formato UTF-8. Escribe la cadena especificada en un archivo, codificada en formato UTF-8.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Adición de contenido a un archivo
Anexa la cadena especificada a un archivo, codificada en formato UTF-8.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Eliminación de un archivo o directorio
Quita un archivo o un directorio.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Montaje o desmontaje del directorio
Puede encontrar el uso detallado en Montaje y desmontaje de archivos.
Utilidades de cuaderno
Use las utilidades del cuaderno MSSparkUtils para ejecutar un cuaderno o salir de un cuaderno con un valor. Ejecute el siguiente comando para obtener información general sobre los métodos disponibles:
mssparkutils.notebook.help()
Salida:
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Referencia a un cuaderno
Hace referencia un cuaderno y devuelve su valor de salida. Puede ejecutar llamadas de función anidadas en un cuaderno de manera interactiva o en una canalización. El cuaderno al que se hace referencia se ejecuta en el grupo de Spark del que el cuaderno llama a esta función.
mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)
Por ejemplo:
mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
Puede abrir el vínculo de instantánea de la ejecución de referencia en la salida de la celda, la instantánea captura los resultados de la ejecución del código y le permite depurar fácilmente una ejecución de referencia.
Nota
Actualmente, el cuaderno de Fabric solo admite la referencia a cuadernos dentro de un área de trabajo.
Salida de un cuaderno
Sale de un cuaderno con un valor. Puede ejecutar llamadas de función anidadas en un cuaderno de manera interactiva o en una canalización.
- Cuando se llama a una función exit() desde un cuaderno de forma interactiva, fabric notebook produce una excepción, omite la ejecución de celdas de subsecuencia y mantiene activa la sesión de Spark.
- Al orquestar un cuaderno en una canalización que llama a una función exit(), la actividad notebook devolverá con un valor de salida, completará la ejecución de la canalización y detendrá la sesión de Spark.
- Cuando se llama a una función exit() en un cuaderno al que se hace referencia, Fabric Spark detendrá la ejecución posterior del cuaderno al que se hace referencia y seguirá ejecutando las celdas siguientes en el cuaderno principal que llama a la función run(). Por ejemplo: Notebook1 tiene tres celdas y llama a una función exit() en la segunda celda. Notebook2 tiene cinco celdas y llama a run(notebook1) en la tercera celda. Al ejecutar Notebook2, Notebook1 se detiene en la segunda celda al presionar la función exit(). Notebook2 sigue ejecutando su cuarta celda y quinta celda.
mssparkutils.notebook.exit("value string")
Por ejemplo:
Cuaderno sample1 con dos celdas siguientes:
La celda 1 define un parámetro de entrada con el valor predeterminado establecido en 10.
La celda 2 sale del cuaderno con entrada como valor de salida.
Puede ejecutar Sample1 en otro cuaderno con los valores predeterminados:
exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)
Salida:
Notebook executed successfully with exit value 10
Puede ejecutar Sample1 en otro cuaderno y establecer el valor de input en 20:
exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Salida:
Notebook executed successfully with exit value 20
Utilidades de credenciales
Puede usar las utilidades de credenciales de MSSparkUtils para obtener los tokens de acceso y administrar secretos en Azure Key Vault.
Ejecute el siguiente comando para obtener información general sobre los métodos disponibles:
mssparkutils.credentials.help()
Salida:
getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
Obtener el token
Devuelve el token de Azure AD para una audiencia determinada, el nombre (opcional), en la lista siguiente se muestran las claves de audiencia disponibles actualmente:
- Recurso de audiencia de almacenamiento: "almacenamiento"
- Recurso de Power BI: "pbi"
- Recurso de Azure Key Vault: "keyvault"
- Recurso de base de datos KQL de Synapse RTA: "kusto"
Ejecute el siguiente comando para obtener el token:
mssparkutils.credentials.getToken('audience Key')
Obtención del secreto con las credenciales de usuario
Devuelve el secreto de Azure Key Vault para un nombre de instancia de Azure Key Vault, un nombre de secreto y un nombre de servicio vinculado concretos mediante las credenciales del usuario.
mssparkutils.credentials.getSecret('azure key vault name','secret name')
Montaje y desmontaje de archivos
Microsoft Fabric admite escenarios de montaje en el paquete de utilidades de Microsoft Spark. Puede usar las API de montaje, desmontaje, getMountPath() y mounts() para adjuntar almacenamiento remoto (Azure Data Lake Storage Gen2) a todos los nodos de trabajo (nodo de controlador y nodos de trabajo). Una vez implementado el punto de montaje de almacenamiento, use la API de archivo local para acceder a los datos como si estuviera almacenados en el sistema de archivos local.
Montaje de una cuenta de ADLS Gen2
En esta sección se muestra cómo montar Azure Data Lake Storage Gen2 paso a paso como ejemplo. El montaje de Blob Storage funciona de forma similar.
En el ejemplo se supone que tiene una cuenta Data Lake Storage Gen2 denominada storegen2. La cuenta tiene un contenedor denominado mycontainer que quiere montar en /test en la sesión de Spark del cuaderno.
Para montar el contenedor denominado mycontainer, mssparkutils primero debe comprobar si tiene permiso para acceder al contenedor. Actualmente, Microsoft Fabric admite dos métodos de autenticación para la operación de montaje del desencadenador: accountKey y sastoken.
Montaje con un token de firma de acceso compartido o una clave de cuenta
Mssparkutils admite el paso explícito de una clave de cuenta o un token de firma de acceso compartido (SAS) como parámetro para montar el destino.
Por motivos de seguridad, se recomienda almacenar las claves de cuenta o los tokens de SAS en Azure Key Vault (como se muestra en la siguiente captura de pantalla de ejemplo). Después, puede recuperarlos mediante la API mssparkutils.credentials.getSecret . Para conocer el uso de Azure Key Vault, consulte Acerca de Azure Key Vault claves de cuenta de almacenamiento administradas.
Este es el código de ejemplo del uso del método accountKey:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
Para sastoken, haga referencia al código de ejemplo siguiente:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
Nota
Por motivos de seguridad, no se recomienda almacenar las credenciales en el código. Para proteger aún más las credenciales, censuraremos el secreto en la salida del cuaderno. Para obtener más detalles, consulte Redacción secreta.
Cómo montar una casa de lago
Este es el código de ejemplo de montaje de una instancia de LakeHouse en /test.
from notebookutils import mssparkutils
mssparkutils.fs.mount(
"abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>",
"/test"
)
Acceso a archivos en el punto de montaje mediante mssparktuils fs API
El objetivo principal de la operación de montaje es permitir a los clientes acceder a los datos almacenados en una cuenta de almacenamiento remoto usando una API del sistema de archivos local. También puede acceder a los datos mediante mssparkutils fs API con una ruta de acceso montada como parámetro. El formato de ruta de acceso que se usa aquí es un poco diferente.
Supongamos que ha montado el contenedor de Data Lake Storage Gen2 mycontainer en /test mediante la API de montaje. Cuando accede a los datos usando la API del sistema de archivos local, el formato de la ruta de acceso es como el siguiente:
/synfs/notebook/{sessionId}/test/{filename}
Cuando quiera acceder a los datos mediante mssparkutils fs API, se recomienda usar getMountPath() para obtener la ruta de acceso precisa:
path = mssparkutils.fs.getMountPath("/test")
Enumerar directorios:
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
Leer el contenido de archivos:
mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
Crear un directorio:
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
Acceso a los archivos en el punto de montaje a través de la ruta de acceso local
Puede leer y escribir fácilmente los archivos en el punto de montaje mediante el sistema de archivos estándar, use Python como ejemplo:
#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
Cómo comprobar los puntos de montaje existentes
Puede usar mssparkutils.fs.mounts() API para comprobar toda la información del punto de montaje existente:
mssparkutils.fs.mounts()
Desmontaje del punto de montaje.
Use el código siguiente para desmontar el punto de montaje (/test en este ejemplo):
mssparkutils.fs.unmount("/test")
Restricciones conocidas
- El montaje actual es una configuración de nivel de trabajo, se recomienda usar la API de montajes para comprobar si el punto de montaje existe o no está disponible.
- El mecanismo de desmontaje no es automático. Cuando termine la ejecución de la aplicación, para desmontar el punto de montaje y liberar el espacio en disco, debe llamar explícitamente a una API de desmontaje en el código. De lo contrario, el punto de montaje seguirá existiendo en el nodo cuando termine la ejecución de la aplicación.
- No se admite el montaje de una cuenta de almacenamiento de ADLS Gen1.