Microsoft Spark Utilities (MSSparkUtils) för Fabric

Microsoft Spark Utilities (MSSparkUtils) är ett inbyggt paket som hjälper dig att enkelt utföra vanliga uppgifter. Du kan använda MSSparkUtils för att arbeta med filsystem, hämta miljövariabler, länka ihop notebook-filer och arbeta med hemligheter. MSSparkUtils-paketet är tillgängligt i PySpark (Python) Scala, SparkR Notebooks och Fabric-pipelines.

Filsystemverktyg

mssparkutils.fs tillhandahåller verktyg för att arbeta med olika filsystem, inklusive Azure Data Lake Storage (ADLS) Gen2 och Azure Blob Storage. Se till att du konfigurerar åtkomsten till Azure Data Lake Storage Gen2 och Azure Blob Storage på rätt sätt.

Kör följande kommandon för en översikt över tillgängliga metoder:

from notebookutils import mssparkutils
mssparkutils.fs.help()

Output

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils fungerar med filsystemet på samma sätt som Spark-API:er. Ta mssparkuitls.fs.mkdirs() och Fabric lakehouse-användning till exempel:

Användning Relativ sökväg från HDFS-rot Absolut sökväg för ABFS-filsystem Absolut sökväg för det lokala filsystemet i drivrutinsnoden
Nondefault lakehouse Stöds inte mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")
Standard lakehouse Katalog under "Files" eller "Tables": mssparkutils.fs.mkdirs("Files/<new_dir>") mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")

Lista filer

Om du vill visa en lista över innehållet i en katalog använder du mssparkutils.fs.ls('Din katalogsökväg'). Till exempel:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

Visa filegenskaper

Den här metoden returnerar filegenskaper som filnamn, filsökväg, filstorlek och om det är en katalog och en fil.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Skapa ny katalog

Den här metoden skapar den angivna katalogen om den inte finns och skapar alla nödvändiga överordnade kataloger.

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Kopiera fil

Den här metoden kopierar en fil eller katalog och stöder kopieringsaktivitet mellan filsystem.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Performant copy-fil

Den här metoden ger ett snabbare sätt att kopiera eller flytta filer, särskilt stora mängder data.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Förhandsgranska filinnehåll

Den här metoden returnerar upp till de första "maxBytes"-byteen för den angivna filen som en strängkodad i UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)

Flytta fil

Den här metoden flyttar en fil eller katalog och stöder flyttningar mellan filsystem.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Skrivfil

Den här metoden skriver ut den angivna strängen till en fil, kodad i UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Lägga till innehåll i en fil

Den här metoden lägger till den angivna strängen i en fil, kodad i UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Ta bort fil eller katalog

Den här metoden tar bort en fil eller katalog.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Montera/demontera katalog

Mer information om detaljerad användning finns i Montera och demontera filer.

Verktyg för notebook-filer

Använd MSSparkUtils Notebook Utilities för att köra en notebook-fil eller avsluta en notebook-fil med ett värde. Kör följande kommando för att få en översikt över tillgängliga metoder:

mssparkutils.notebook.help()

Produktionen:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Kommentar

Notebook-verktyg gäller inte för Apache Spark-jobbdefinitioner (SJD).

Referera till en notebook-fil

Den här metoden refererar till en notebook-fil och returnerar dess slutvärde. Du kan köra kapslingsfunktionsanrop i en notebook-fil interaktivt eller i en pipeline. Notebook-filen som refereras körs i Spark-poolen i notebook-filen som anropar den här funktionen.

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)

Till exempel:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Du kan öppna länken för ögonblicksbilden av referenskörningen i cellutdata. Ögonblicksbilden fångar kodkörningsresultatet och gör att du enkelt kan felsöka en referenskörning.

Skärmbild av resultatet av referenskörningen.

Skärmbild av ett exempel på en ögonblicksbild.

Kommentar

  • För närvarande har Fabric Notebook endast stöd för att referera till notebook-filer på en arbetsyta.
  • Om du använder filerna under Notebook-resurs använder du mssparkutils.nbResPath i den refererade notebook-filen för att se till att den pekar på samma mapp som den interaktiva körningen.

Referenskörning av flera notebook-filer parallellt

Med metoden mssparkutils.notebook.runMultiple() kan du köra flera notebook-filer parallellt eller med en fördefinierad topologisk struktur. API:et använder en mekanism för implementering av flera trådar i en Spark-session, vilket innebär att beräkningsresurserna delas av referensanteckningsbokens körningar.

Med mssparkutils.notebook.runMultiple()kan du:

  • Kör flera notebook-filer samtidigt, utan att vänta på att var och en ska slutföras.

  • Ange beroenden och körningsordning för dina notebook-filer med hjälp av ett enkelt JSON-format.

  • Optimera användningen av Spark-beräkningsresurser och minska kostnaden för dina Fabric-projekt.

  • Visa ögonblicksbilder av varje notebook-körningspost i utdata och felsöka/övervaka dina notebook-uppgifter på ett bekvämt sätt.

  • Hämta slutvärdet för varje verkställande aktivitet och använd dem i underordnade uppgifter.

Du kan också försöka köra mssparkutils.notebook.help("runMultiple") för att hitta exemplet och den detaljerade användningen.

Här är ett enkelt exempel på hur du kör en lista över notebook-filer parallellt med den här metoden:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Körningsresultatet från rotanteckningsboken är följande:

Skärmbild av referens till en lista med notebook-filer.

Följande är ett exempel på hur du kör notebook-filer med topologisk struktur med hjälp av mssparkutils.notebook.runMultiple(). Använd den här metoden för att enkelt orkestrera notebook-filer via en kodupplevelse.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Körningsresultatet från rotanteckningsboken är följande:

Skärmbild av referens till en lista med notebook-filer med parametrar.

Kommentar

Parallellitetsgraden för flera notebook-körningar är begränsad till den totala tillgängliga beräkningsresursen för en Spark-session.

Avsluta en notebook-fil

Den här metoden avslutar en notebook-fil med ett värde. Du kan köra kapslingsfunktionsanrop i en notebook-fil interaktivt eller i en pipeline.

  • När du anropar en exit() -funktion från en notebook-fil interaktivt genererar Fabric-notebook-filen ett undantag, hoppar över efterföljande celler och håller Spark-sessionen vid liv.

  • När du dirigerar en notebook-fil i en pipeline som anropar en exit() -funktion returneras notebook-aktiviteten med ett slutvärde, slutför pipelinekörningen och stoppar Spark-sessionen.

  • När du anropar en exit()-funktion i en notebook-fil som refereras stoppar Fabric Spark ytterligare körning av den refererade notebook-filen och fortsätter att köra nästa celler i huvudanteckningsboken som anropar funktionen run(). Till exempel: Notebook1 har tre celler och anropar en exit() -funktion i den andra cellen. Notebook2 har fem celler och anrop som körs (notebook1) i den tredje cellen. När du kör Notebook2 stoppas Notebook1 vid den andra cellen när du trycker på funktionen exit(). Notebook2 fortsätter att köra sin fjärde cell och femte cell.

mssparkutils.notebook.exit("value string")

Till exempel:

Sample1 Notebook med följande två celler:

  • Cell 1 definierar en indataparameter med standardvärdet inställt på 10.

  • Cell 2 avslutar notebook-filen med indata som utgångsvärde.

Skärmbild som visar en exempelanteckningsbok med avsluta-funktionen.

Du kan köra Sample1 i en annan notebook-fil med standardvärden:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

Produktionen:

Notebook executed successfully with exit value 10

Du kan köra Sample1 i en annan notebook-fil och ange indatavärdet som 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Produktionen:

Notebook executed successfully with exit value 20

Verktyg för autentiseringsuppgifter

Du kan använda verktygen för MSSparkUtils-autentiseringsuppgifter för att hämta åtkomsttoken och hantera hemligheter i ett Azure Key Vault.

Kör följande kommando för att få en översikt över tillgängliga metoder:

mssparkutils.credentials.help()

Produktionen:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

Hämta token

getToken returnerar en Microsoft Entra-token för en viss målgrupp och ett visst namn (valfritt). I följande lista visas tillgängliga målgruppsnycklar:

  • Resurs för lagringspublik: "lagring"
  • Power BI-resurs: "pbi"
  • Azure Key Vault-resurs: "keyvault"
  • Synapse RTA KQL DB-resurs: "kusto"

Kör följande kommando för att hämta token:

mssparkutils.credentials.getToken('audience Key')

Hämta hemlighet med användarautentiseringsuppgifter

getSecret returnerar en Azure Key Vault-hemlighet för en viss Azure Key Vault-slutpunkt och ett hemligt namn med hjälp av autentiseringsuppgifter.

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Montera och demontera filer

Fabric stöder följande monteringsscenarier i Microsoft Spark Utilities-paketet. Du kan använda API:erna mount, unmount, getMountPath()och mounts() för att ansluta fjärrlagring (ADLS Gen2) till alla arbetsnoder (drivrutinsnoder och arbetsnoder). När lagringsmonteringspunkten är på plats använder du det lokala fil-API:et för att komma åt data som om de lagras i det lokala filsystemet.

Montera ett ADLS Gen2-konto

I följande exempel visas hur du monterar Azure Data Lake Storage Gen2. Montering av Blob Storage fungerar på liknande sätt.

Det här exemplet förutsätter att du har ett Data Lake Storage Gen2-konto med namnet storegen2, och kontot har en container med namnet mycontainer som du vill montera till /test i din Notebook Spark-session.

Skärmbild som visar var du väljer en container att montera.

För att montera containern mycontainer måste mssparkutils först kontrollera om du har behörighet att komma åt containern. För närvarande stöder Fabric två autentiseringsmetoder för utlösarmonteringsåtgärden: accountKey och sastoken.

Montera via signaturtoken för delad åtkomst eller kontonyckel

MSSparkUtils stöder explicit överföring av en kontonyckel eller SAS-token (Signatur för delad åtkomst) som en parameter för att montera målet.

Av säkerhetsskäl rekommenderar vi att du lagrar kontonycklar eller SAS-token i Azure Key Vault (som följande skärmbild visar). Du kan sedan hämta dem med hjälp av API:et mssparkutils.credentials.getSecret . Mer information om Azure Key Vault finns i Om Azure Key Vault-hanterade lagringskontonycklar.

Skärmbild som visar var hemligheter lagras i ett Azure Key Vault.

Exempelkod för metoden accountKey :

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Exempelkod för sastoken:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Kommentar

Du kan behöva importera mssparkutils om den inte är tillgänglig:

from notebookutils import mssparkutils

Monteringsparametrar:

  • fileCacheTimeout: Blobar cachelagras i den lokala temp-mappen i 120 sekunder som standard. Under den här tiden kontrollerar blobfuse inte om filen är uppdaterad eller inte. Parametern kan ställas in för att ändra standardtidsgränsen för timeout. När flera klienter ändrar filer samtidigt, för att undvika inkonsekvenser mellan lokala filer och fjärrfiler, rekommenderar vi att du förkortar cachetiden eller till och med ändrar den till 0 och alltid får de senaste filerna från servern.
  • timeout: Tidsgränsen för monteringsåtgärden är som standard 120 sekunder. Parametern kan ställas in för att ändra standardtidsgränsen för timeout. När det finns för många exekutorer eller när monteringen överskrider tidsgränsen rekommenderar vi att du ökar värdet.

Du kan använda följande parametrar:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Kommentar

Av säkerhetsskäl rekommenderar vi att du inte lagrar autentiseringsuppgifter i kod. För att ytterligare skydda dina autentiseringsuppgifter redigerar vi din hemlighet i notebook-utdata. Mer information finns i Hemlig redigering.

Hur man monterar ett sjöhus

Exempelkod för montering av ett sjöhus till /test:

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

Komma åt filer under monteringspunkten med hjälp av mssparktuils fs API

Huvudsyftet med monteringsåtgärden är att låta kunder komma åt data som lagras i ett fjärrlagringskonto med ett lokalt filsystem-API. Du kan också komma åt data med hjälp av mssparkutils fs API med en monterad sökväg som parameter. Det här sökvägsformatet är lite annorlunda.

Anta att du monterade Data Lake Storage Gen2-containern mycontainer till /test med hjälp av monterings-API:et. När du kommer åt data med ett lokalt filsystems-API är sökvägsformatet så här:

/synfs/notebook/{sessionId}/test/{filename}

När du vill komma åt data med hjälp av mssparkutils fs-API:et rekommenderar vi att du använder getMountPath() för att få rätt sökväg:

path = mssparkutils.fs.getMountPath("/test")
  • Lista kataloger:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • Läsa filinnehåll:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Skapa en katalog:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

Komma åt filer under monteringspunkten via lokal sökväg

Du kan enkelt läsa och skriva filerna i monteringspunkten med hjälp av standardfilsystemet. Här är ett Python-exempel:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Så här kontrollerar du befintliga monteringspunkter

Du kan använda API:et mssparkutils.fs.mounts() för att kontrollera all befintlig monteringspunktsinformation:

mssparkutils.fs.mounts()

Demontera monteringspunkten

Använd följande kod för att demontera monteringspunkten (/testa i det här exemplet):

mssparkutils.fs.unmount("/test")

Kända begränsningar

  • Den aktuella monteringen är en konfiguration på jobbnivå. Vi rekommenderar att du använder monterings-API:et för att kontrollera om det finns en monteringspunkt eller inte är tillgänglig.

  • Avmonteringsmekanismen är inte automatisk. När programkörningen är klar måste du uttryckligen anropa ett avmonterings-API i koden för att demontera monteringspunkten och frigöra diskutrymmet. Annars finns monteringspunkten fortfarande i noden när programkörningen har slutförts.

  • Det går inte att montera ett ADLS Gen1-lagringskonto.

Lakehouse-verktyg

mssparkutils.lakehouse tillhandahåller verktyg som är särskilt skräddarsydda för att hantera Lakehouse-artefakter. De här verktygen gör det enkelt för användarna att skapa, hämta, uppdatera och ta bort Lakehouse-artefakter.

Kommentar

Lakehouse-API:er stöds endast på Runtime version 1.2+.

Översikt över metoder

Nedan visas en översikt över tillgängliga metoder som tillhandahålls av mssparkutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

Exempel på användning

Tänk på följande användningsexempel för att använda dessa metoder effektivt:

Skapa en Lakehouse-artefakt

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Hämtar en Lakehouse-artefakt

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

Uppdatera en Lakehouse-artefakt

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Ta bort en Lakehouse-artefakt

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Lista Lakehouse-artefakter

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

Ytterligare information

Mer detaljerad information om varje metod och dess parametrar finns i mssparkutils.lakehouse.help("methodName") funktionen.

Med MSSparkUtils Lakehouse-verktyg blir hanteringen av Lakehouse-artefakter effektivare och integreras i dina Fabric-pipelines, vilket förbättrar din övergripande datahanteringsupplevelse.

Utforska dessa verktyg och införliva dem i dina Fabric-arbetsflöden för sömlös hantering av Lakehouse-artefakter.