Introduction de l’infrastructure MSSparkUtils

Microsoft Spark Utilities (MSSparkUtils) est un package intégré qui vous permet d’effectuer aisément des tâches courantes. Vous pouvez utiliserMSSparkUtils pour utiliser des systèmes de fichiers, obtenir des variables d’environnement, chaîner des notebooks ensemble et utiliser des secrets. MSSparkUtils est disponible dans PySpark (Python) Scala, les notebooks SparkR et les pipelines Microsoft Fabric.

Important

Microsoft Fabric est actuellement en préversion. Certaines informations portent sur un produit en préversion susceptible d’être substantiellement modifié avant sa publication. Microsoft ne donne aucune garantie, expresse ou implicite, concernant les informations fournies ici.

Utilitaires du système de fichiers

mssparkutils.fs fournit des utilitaires permettant d’utiliser différents systèmes de fichiers, notamment Azure Data Lake Storage Gen2 (ADLS Gen2) et Stockage Blob Azure. Veillez à configurer l’accès à Azure Data Lake Storage Gen2 et Stockage Blob Azure de manière appropriée.

Exécutez les commandes suivantes pour obtenir une vue d’ensemble des méthodes disponibles :

from notebookutils import mssparkutils
mssparkutils.fs.help()

Sortie

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

mssparkutils fonctionne avec le système de fichiers de la même manière que les API Spark. Prenez mssparkuitls.fs.mkdirs() et l’utilisation de Fabric Lakehouse par exemple :

Utilisation Chemin relatif à partir de la racine HDFS Chemin d’accès absolu pour le système de fichiers ABFS Chemin d’accès absolu pour le système de fichiers local dans le nœud du pilote
Étang non par défaut Non prise en charge mssparkutils.fs.mkdirs(« abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir> ») mssparkutils.fs.mkdirs(« file:/<new_dir> »)
Lakehouse par défaut Répertoire sous « Files » ou « Tables » : mssparkutils.fs.mkdirs(« Files/<new_dir> ») mssparkutils.fs.mkdirs(« abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir> ») mssparkutils.fs.mkdirs(« file:/<new_dir> »)

Énumérer des fichiers

Répertoriez le contenu d’un répertoire, utilisez mssparkutils.fs.ls('Chemin de votre répertoire'), par exemple :

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

Affichez les propriétés de fichier

Retourne les propriétés de fichier, notamment le nom du fichier, le chemin d’accès du fichier, la taille du fichier et s’il s’agit d’un répertoire et d’un fichier.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Créer un répertoire

Crée le répertoire donné s’il n’existe pas et les répertoires parents nécessaires.

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Copier le fichier

Copie un fichier ou un répertoire. Prend en charge la copie sur plusieurs systèmes de fichiers.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Afficher un aperçu du contenu du fichier

Renvoie jusqu’aux premiers octets « maxBytes » du fichier donné sous la forme d’une chaîne encodée au format UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)

Déplacer le fichier

Déplace un fichier ou un répertoire. Prend en charge le déplacement sur plusieurs systèmes de fichiers.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Écrire dans un fichier

Écrit la chaîne donnée dans un fichier encodé au format UTF-8. Écrit la chaîne donnée dans un fichier encodé au format UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Ajouter du contenu à un fichier

Ajoute la chaîne donnée à un fichier, encodé au format UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Supprimer un fichier ou un répertoire

Déplace un fichier ou un répertoire.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Monter/démonter le répertoire

Vous trouverez l’utilisation détaillée dans Montage et démontage de fichiers.

Utilitaires de notebook

Utilisez les utilitaires MSSparkUtils Notebook pour exécuter un notebook ou quitter un bloc-notes avec une valeur. Exécutez la commande suivante pour obtenir une vue d’ensemble des méthodes disponibles :

mssparkutils.notebook.help()

Output:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Référencer un notebook

Référence un notebook et renvoie sa valeur de sortie. Vous pouvez exécuter des appels de fonction d’imbrication dans un notebook de manière interactive ou dans un pipeline. Le notebook référencé s’exécute sur le pool Spark dont le notebook appelle cette fonction.

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)

Par exemple :

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Vous pouvez ouvrir le lien instantané de l’exécution de référence dans la sortie de cellule, le instantané capture les résultats de l’exécution du code et vous permet de déboguer facilement une exécution de référence.

Capture d’écran du résultat de l’exécution de référence.

Capture d’écran d’un exemple de instantané.

Notes

Actuellement, le notebook Fabric prend uniquement en charge le référencement des notebooks au sein d’un espace de travail.

Quitter un notebook

Quitte un notebook avec une valeur. Vous pouvez exécuter des appels de fonction d’imbrication dans un notebook de manière interactive ou dans un pipeline.

  • Lorsque vous appelez une fonction exit() à partir d’un notebook de manière interactive, le bloc-notes Fabric lève une exception, ignore l’exécution des cellules de sous-séquence et maintient la session Spark en vie.
  • Lorsque vous orchestrez un notebook dans le pipeline qui appelle une fonction exit(), l’activité Notebook retourne avec une valeur de sortie, termine l’exécution du pipeline et arrête la session Spark.
  • Lorsque vous appelez une fonction exit() dans un bloc-notes référencé, Fabric Spark arrête l’exécution ultérieure du bloc-notes référencé et continue à exécuter les cellules suivantes dans le main notebook qui appelle la fonction run(). Par exemple : Notebook1 a trois cellules et appelle une fonction exit() dans la deuxième cellule. Notebook2 a cinq cellules et appelle run(notebook1) dans la troisième cellule. Lorsque vous exécutez Notebook2, Notebook1 s’arrête à la deuxième cellule lorsque vous appuyez sur la fonction exit(). Notebook2 continue d’exécuter sa quatrième cellule et sa cinquième cellule.
mssparkutils.notebook.exit("value string")

Par exemple :

Exemple1 de notebook avec les deux cellules suivantes :

  • La cellule 1 définit un paramètre d’entrée dont la valeur par défaut est définie sur 10.

  • La cellule 2 quitte le bloc-notes avec une entrée comme valeur de sortie.

Capture d’écran montrant un exemple de notebook de la fonction de sortie.

Vous pouvez exécuter Sample1 dans un autre bloc-notes avec les valeurs par défaut :

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

Output:

Notebook executed successfully with exit value 10

Vous pouvez exécuter Sample1 dans un autre notebook et définir la valeur d’entrée sur 20 :

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Output:

Notebook executed successfully with exit value 20

Gestion des sessions

Arrêter une session interactive

Au lieu de sélectionner manuellement le bouton Arrêter, il est parfois plus pratique d’arrêter une session interactive en appelant une API dans le code. Dans ce cas, nous fournissons une API mssparkutils.session.stop() pour prendre en charge l’arrêt de la session interactive via du code. Elle est disponible pour Scala et Python.

mssparkutils.session.stop()

mssparkutils.session.stop() L’API arrête la session interactive actuelle de manière asynchrone en arrière-plan, elle arrête la session Spark et libère les ressources occupées par la session afin qu’elles soient disponibles pour d’autres sessions du même pool.

Notes

Nous vous déconseillons d’appeler des API intégrées au langage comme sys.exit dans Scala ou sys.exit() dans Python dans votre code, car ces API tuent simplement le processus d’interpréteur, laissant la session Spark active et les ressources non publiées.

Utilitaires d’informations d’identification

Vous pouvez utiliser les utilitaires MSSparkUtils Credentials pour obtenir les jetons d’accès et gérer les secrets dans Azure Key Vault.

Exécutez la commande suivante pour obtenir une vue d’ensemble des méthodes disponibles :

mssparkutils.credentials.help()

Output:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key

Obtenir un jeton

Retourne le jeton Azure AD pour un public donné, nom (facultatif). La liste ci-dessous montre les clés d’audience actuellement disponibles :

  • Ressource d’audience de stockage : « stockage »
  • Ressource Power BI : « pbi »
  • Ressource Azure Key Vault : « coffre de clés »
  • Ressource de base de données KQL Synapse RTA : « kusto »

Exécutez la commande suivante pour obtenir le jeton :

mssparkutils.credentials.getToken('audience Key')

Obtenir le secret à l’aide des informations d’identification de l’utilisateur

Renvoie le secret Azure Key Vault pour un nom Azure Key Vault donné, un nom de secret et un nom de service lié à l’aide des informations d’identification de l’utilisateur.

mssparkutils.credentials.getSecret('azure key vault name','secret name')

Montage et démontage de fichiers

Microsoft Fabric prend en charge les scénarios de montage dans le package Utilitaires Microsoft Spark. Vous pouvez utiliser les API mount, unmount, getMountPath() et mounts() pour attacher le stockage distant (Azure Data Lake Storage Gen2) à tous les nœuds de travail (nœud de pilote et nœuds Worker). Une fois le point de montage de stockage en place, utilisez l’API de fichier local pour accéder aux données comme si elles sont stockées dans le système de fichiers local.

Comment monter un compte ADLS Gen2

Cette section montre comment monter Azure Data Lake Storage Gen2 étape par étape à titre d’exemple. Le montage de Stockage Blob fonctionne de la même façon.

L’exemple suppose que vous disposez d’un compte Data Lake Storage Gen2 nommé storegen2. Le compte a un conteneur nommé mycontainer que vous souhaitez monter sur /test dans votre session Spark de notebook.

Capture d’écran montrant où sélectionner un conteneur à monter.

Pour monter le conteneur appelé mycontainer, mssparkutils doit d’abord case activée si vous avez l’autorisation d’accéder au conteneur. Actuellement, Microsoft Fabric prend en charge deux méthodes d’authentification pour l’opération de montage du déclencheur : accountKey et sastoken.

Montage via un jeton de signature d’accès partagé ou une clé de compte

Mssparkutils prend en charge le passage explicite d’une clé de compte ou d’un jeton de signature d’accès partagé (SAS) en tant que paramètre pour monter la cible.

Pour des raisons de sécurité, nous vous recommandons de stocker des clés de compte ou des jetons SAP dans Azure Key Vault (comme l’illustre l’exemple de capture d’écran suivant). Vous pouvez ensuite les récupérer à l’aide de l’API mssparkutils.credentials.getSecret . Pour l’utilisation d’Azure Key Vault, consultez À propos d’Azure Key Vault clés de compte de stockage managées.

Capture d’écran montrant où les secrets stockés dans un Key Vault Azure.

Voici l’exemple de code de l’utilisation de la méthode accountKey :

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Pour sastoken, référencez l’exemple de code suivant :

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Notes

Pour des raisons de sécurité, il n’est pas recommandé de stocker les informations d’identification dans le code. Pour mieux protéger vos informations d’identification, nous allons éditer votre secret dans la sortie du notebook. Pour plus d’informations, case activée rédaction du secret.

Comment monter un lakehouse

Voici l’exemple de code de montage d’un lakehouse sur /test.

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

Accéder aux fichiers sous le point de montage à l’aide de l’API mssparktuils fs

L'objectif principal de l'opération de montage est de permettre aux clients d'accéder aux données stockées dans un compte de stockage distant en utilisant une API de système de fichiers local. Vous pouvez également accéder aux données à l’aide de l’API mssparkutils fs avec un chemin monté en tant que paramètre. Le format de chemin utilisé ici est un peu différent.

Supposons que vous avez monté le conteneur Data Lake Storage Gen2 mycontainer sur /test à l’aide de l’API de montage. Lorsque vous accédez aux données à l’aide de l’API de système de fichiers local, le format du chemin d’accès est le suivant :

/synfs/notebook/{sessionId}/test/{filename}

Lorsque vous souhaitez accéder aux données à l’aide de l’API mssparkutils fs, nous vous recommandons d’utiliser un getMountPath() pour obtenir le chemin d’accès précis :

path = mssparkutils.fs.getMountPath("/test")
  • Lister des répertoires :

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • Lisez le contenu du fichier :

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Créer un répertoire :

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

Accéder aux fichiers sous le point de montage via le chemin d’accès local

Vous pouvez facilement lire et écrire les fichiers dans le point de montage à l’aide du système de fichiers standard. Utilisez Python comme exemple :

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Guide pratique pour case activée points de montage existants

Vous pouvez utiliser l’API mssparkutils.fs.mounts() pour case activée toutes les informations de point de montage existantes :

mssparkutils.fs.mounts()

Comment démonter le point de montage

Utilisez le code suivant pour démonter votre point de montage (/test dans cet exemple) :

mssparkutils.fs.unmount("/test")

Limitations connues

  • Le montage actuel est une configuration au niveau du travail. Nous vous recommandons d’utiliser l’API mounts pour case activée si un point de montage existe ou n’est pas disponible.
  • Le mécanisme de démontage n’est pas automatique. Une fois l’exécution de l’application terminée, pour démonter le point de montage afin de libérer l’espace disque, vous devez appeler explicitement une API de démontage dans votre code. Sinon, le point de montage existera toujours dans le nœud une fois l’exécution de l’application terminée.
  • Le montage d’un compte de stockage ADLS Gen1 n’est pas pris en charge.

Étapes suivantes