Microsoft Spark Utilities (MSSparkUtils) for Fabric

Microsoft Spark Utilities (MSSparkUtils) er en innebygd pakke som hjelper deg med å utføre vanlige oppgaver på en enkel måte. Du kan bruke MSSparkUtils til å arbeide med filsystemer, få miljøvariabler, kjede notatblokker sammen og til å arbeide med hemmeligheter. MSSparkUtils-pakken er tilgjengelig i PySpark (Python) Scala, SparkR-notatblokker og Fabric-rørledninger.

Filsystemverktøy

mssparkutils.fs tilbyr verktøy for å arbeide med ulike filsystemer, inkludert Azure Data Lake Storage (ADLS) Gen2 og Azure Blob Storage. Kontroller at du konfigurerer tilgang til Azure Data Lake Storage Gen2 og Azure Blob Storage på riktig måte.

Kjør følgende kommandoer for en oversikt over de tilgjengelige metodene:

from notebookutils import mssparkutils
mssparkutils.fs.help()

Utdata

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils fungerer med filsystemet på samme måte som Spark API-er. Ta mssparkuitls.fs.mkdirs() og Fabric lakehouse bruk for eksempel:

Bruk Relativ bane fra HDFS-rot Absolutt bane for ABFS-filsystem Absolutt bane for lokalt filsystem i drivernode
Nondefault lakehouse Støttes ikke mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("fil:/<new_dir>")
Standard lakehouse Katalog under «Filer» eller «Tabeller»: mssparkutils.fs.mkdirs(«Filer/<new_dir>») mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("fil:/<new_dir>")

Listefiler

Hvis du vil vise innholdet i en katalog, bruker du mssparkutils.fs.ls('Katalogbanen'). Eksempel:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

Vis filegenskaper

Denne metoden returnerer filegenskaper, inkludert filnavn, filbane, filstørrelse og om det er en katalog og en fil.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Opprett ny katalog

Denne metoden oppretter den angitte katalogen hvis den ikke finnes, og oppretter eventuelle nødvendige overordnede mapper.

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Kopier fil

Denne metoden kopierer en fil eller katalog, og støtter kopieringsaktivitet på tvers av filsystemer.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Performant kopieringsfil

Denne metoden gir en raskere måte å kopiere eller flytte filer på, spesielt store mengder data.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Forhåndsvis filinnhold

Denne metoden returnerer opptil de første maxBytes-byte for den angitte filen som en strengkodet i UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)

Flytt fil

Denne metoden flytter en fil eller katalog, og støtter flyttinger på tvers av filsystemer.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Skrive fil

Denne metoden skriver den angitte strengen ut til en fil, kodet i UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Tilføy innhold til en fil

Denne metoden tilføyer den angitte strengen til en fil, kodet i UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Slette fil eller katalog

Denne metoden fjerner en fil eller katalog.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Monter/demonter katalog

Finn mer informasjon om detaljert bruk i Filmontering og demontering.

Notatblokkverktøy

Bruk MSSparkUtils Notebook Utilities til å kjøre en notatblokk eller avslutte en notatblokk med en verdi. Kjør følgende kommando for å få en oversikt over de tilgjengelige metodene:

mssparkutils.notebook.help()

Utgang:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Merk

Notatblokkverktøy gjelder ikke for Apache Spark-jobbdefinisjoner (SJD).

Referere til en notatblokk

Denne metoden refererer til en notatblokk og returnerer avslutningsverdien. Du kan kjøre nestefunksjonskall i en notatblokk interaktivt eller i et datasamlebånd. Notatblokken som det refereres til, kjøres i Spark-utvalget i notatblokken som kaller denne funksjonen.

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)

Eksempel:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Du kan åpne øyeblikksbildekoblingen for referansekjøringen i celleutdataene. Øyeblikksbildet registrerer resultatene for kjøring av kode og lar deg enkelt feilsøke en referansekjøring.

Skjermbilde av referansekjøringsresultat.

Skjermbilde av et eksempel på øyeblikksbilde.

Merk

  • For øyeblikket støtter Fabric-notatblokken bare referanse til notatblokker i et arbeidsområde.
  • Hvis du bruker filene under Notatblokkressurs, kan du bruke mssparkutils.nbResPath den i notatblokken det refereres til, for å sikre at den peker til den samme mappen som den interaktive kjøringen.

Referanse kjøre flere notatblokker parallelt

Med metoden mssparkutils.notebook.runMultiple() kan du kjøre flere notatblokker parallelt eller med en forhåndsdefinert topologisk struktur. API-en bruker en implementeringsmekanisme med flere tråder i en spark-økt, noe som betyr at databehandlingsressursene deles av referansenotatblokkkjøringene.

Med mssparkutils.notebook.runMultiple()kan du:

  • Utfør flere notatblokker samtidig, uten å vente på at hver av dem skal fullføres.

  • Angi avhengighetene og rekkefølgen på kjøringen for notatblokkene, ved hjelp av et enkelt JSON-format.

  • Optimaliser bruken av Spark-databehandlingsressurser og reduser kostnadene for Fabric-prosjektene dine.

  • Vis øyeblikksbilder av hver notatblokkkjøringspost i utdataene, og feilsøk/overvåk notatblokkoppgavene på en enkel måte.

  • Hent avslutningsverdien for hver lederaktivitet, og bruk dem i nedstrømsoppgaver.

Du kan også prøve å kjøre mssparkutils.notebook.help("runMultiple") for å finne eksemplet og detaljert bruk.

Her er et enkelt eksempel på hvordan du kjører en liste over notatblokker parallelt ved hjelp av denne metoden:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Utføringsresultatet fra rotnotatblokken er som følger:

Skjermbilde av referanse til en liste over notatblokker.

Følgende er et eksempel på hvordan du kjører notatblokker med topologisk struktur ved hjelp av mssparkutils.notebook.runMultiple(). Bruk denne metoden til enkelt å organisere notatblokker gjennom en kodeopplevelse.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Utføringsresultatet fra rotnotatblokken er som følger:

Skjermbilde av referanse til en liste over notatblokker med parametere.

Merk

Parallellitetsgraden for kjøring av flere notatblokker er begrenset til den totale tilgjengelige databehandlingsressursen for en Spark-økt.

Avslutte en notatblokk

Denne metoden avslutter en notatblokk med en verdi. Du kan kjøre nestefunksjonskall i en notatblokk interaktivt eller i et datasamlebånd.

  • Når du kaller en exit() -funksjon fra en notatblokk interaktivt, kaster Fabric-notatblokken et unntak, hopper over etterfølgende celler og holder Spark-økten i live.

  • Når du orkestrerer en notatblokk i et datasamlebånd som kaller en exit() -funksjon, returnerer notatblokkaktiviteten med en avslutningsverdi, fullfører datasamlebåndkjøringen og stopper Spark-økten.

  • Når du kaller en exit() -funksjon i en notatblokk som det refereres til, stopper Fabric Spark den videre kjøringen av den refererte notatblokken, og fortsetter å kjøre de neste cellene i hovednotatblokken som kaller run() -funksjonen. Eksempel: Notatblokk1 har tre celler og kaller en exit()-funksjon i den andre cellen. Notatblokk2 har fem celler og kaller kjøring (notatblokk1) i den tredje cellen. Når du kjører Notatblokk2, stopper Notatblokk1 i den andre cellen når du trykker på exit() -funksjonen. Notatblokk2 fortsetter å kjøre sin fjerde celle og femte celle.

mssparkutils.notebook.exit("value string")

Eksempel:

Eksempel på 1 notatblokk med følgende to celler:

  • Celle 1 definerer en inndataparameter med standardverdien satt til 10.

  • Celle 2 avslutter notatblokken med inndata som avslutningsverdi.

Skjermbilde som viser en eksempelnotatblokk for exit-funksjonen.

Du kan kjøre Eksempel1 i en annen notatblokk med standardverdier:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

Utgang:

Notebook executed successfully with exit value 10

Du kan kjøre eksempel1 i en annen notatblokk og angi inndataverdien som 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Utgang:

Notebook executed successfully with exit value 20

Legitimasjonsverktøy

Du kan bruke MSSparkUtils Credentials Utilities til å få tilgangstokener og administrere hemmeligheter i et Azure Key Vault.

Kjør følgende kommando for å få en oversikt over de tilgjengelige metodene:

mssparkutils.credentials.help()

Utgang:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

Hent token

getToken returnerer et Microsoft Entra-token for et gitt målgruppe og navn (valgfritt). Listen nedenfor viser de tilgjengelige målgruppenøklene:

  • Lagringsgrupperessurs: «lagring»
  • Power BI-ressurs: «pbi»
  • Azure Key Vault Resource: "keyvault"
  • Synapse RTA KQL DB Resource: "kusto"

Kjør følgende kommando for å hente tokenet:

mssparkutils.credentials.getToken('audience Key')

Bli hemmelig ved hjelp av brukerlegitimasjon

getSecret returnerer en Azure Key Vault-hemmelighet for et gitt Azure Key Vault-endepunkt og hemmelig navn ved hjelp av brukerlegitimasjon.

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Filmontering og demontering

Fabric støtter følgende monteringsscenarioer i Microsoft Spark Utilities-pakken. Du kan bruke monterings-, demonterings-, getMountPath()- og mounts()-API-ene til å knytte ekstern lagring (ADLS Gen2) til alle fungerende noder (drivernoder og arbeidernoder). Når lagringsmonteringspunktet er på plass, kan du bruke den lokale fil-API-en til å få tilgang til data som om den er lagret i det lokale filsystemet.

Slik monterer du en ADLS Gen2-konto

Følgende eksempel illustrerer hvordan du monterer Azure Data Lake Storage Gen2. Montering av Blob Storage fungerer på samme måte.

Dette eksemplet forutsetter at du har én Data Lake Storage Gen2-konto med navnet Storegen2, og kontoen har én beholder med navnet mycontainer som du vil montere til /test i spark-økten for notatblokken.

Skjermbilde som viser hvor du velger en beholder som skal monteres.

Hvis du vil montere beholderen kalt mycontainer, må mssparkutils først kontrollere om du har tillatelse til å få tilgang til beholderen. For øyeblikket støtter Fabric to godkjenningsmetoder for utløsermonteringsoperasjonen: accountKey og sastoken.

Montere via signaturtoken for delt tilgang eller kontonøkkel

MSSparkUtils støtter eksplisitt å sende en kontonøkkel eller SAS-token (Shared Access Signature) som en parameter for å montere målet.

Av sikkerhetsgrunner anbefaler vi at du lagrer kontonøkler eller SAS-tokener i Azure Key Vault (som følgende skjermbilde viser). Deretter kan du hente dem ved hjelp av mssparkutils.credentials.getSecret API. Hvis du vil ha mer informasjon om Azure Key Vault, kan du se Om administrerte lagringskontonøkler for Azure Key Vault.

Skjermbilde som viser hvor hemmeligheter lagres i et Azure Key Vault.

Eksempelkode for accountKey-metoden :

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Eksempelkode for sastoken:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Merk

Du må kanskje importere mssparkutils hvis den ikke er tilgjengelig:

from notebookutils import mssparkutils

Monteringsparametere:

  • fileCacheTimeout: Blober bufres i den lokale temp-mappen i 120 sekunder som standard. I løpet av denne tiden vil ikke blobfuse kontrollere om filen er oppdatert eller ikke. Parameteren kan angis for å endre standard tidsavbruddstid. Når flere klienter endrer filer samtidig, for å unngå inkonsekvenser mellom lokale og eksterne filer, anbefaler vi at du forkorter hurtigbuffertiden, eller til og med endrer den til 0, og alltid får de nyeste filene fra serveren.
  • tidsavbrudd: Tidsavbruddet for monteringsoperasjonen er 120 sekunder som standard. Parameteren kan angis for å endre standard tidsavbruddstid. Når det er for mange eksekutorer eller når monteringen blir tidsavbrutt, anbefaler vi at du øker verdien.

Du kan bruke disse parameterne slik:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Merk

Av sikkerhetsgrunner anbefalte vi at du ikke lagrer legitimasjon i kode. Hvis du vil beskytte legitimasjonen ytterligere, endrer vi hemmeligheten din i notatblokkutdata. Hvis du vil ha mer informasjon, kan du se Hemmelig skjuling.

Slik monterer du et lakehouse

Eksempelkode for montering av et lakehouse til /test:

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

Få tilgang til filer under monteringspunktet ved hjelp av mssparktuils fs API

Hovedformålet med monteringsoperasjonen er å gi kundene tilgang til dataene som er lagret i en ekstern lagringskonto med en lokal filsystem-API. Du kan også få tilgang til dataene ved hjelp av mssparkutils fs API med en montert bane som parameter. Dette baneformatet er litt annerledes.

Anta at du monterte Data Lake Storage Gen2-beholderen mycontainer til /test ved hjelp av API-en for montering. Når du får tilgang til dataene med en lokal filsystem-API, er baneformatet slik:

/synfs/notebook/{sessionId}/test/{filename}

Når du vil ha tilgang til dataene ved hjelp av mssparkutils fs API, anbefaler vi at du bruker getMountPath() for å få den nøyaktige banen:

path = mssparkutils.fs.getMountPath("/test")
  • Listekataloger:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • Les filinnhold:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Opprette en katalog:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

Få tilgang til filer under monteringspunktet via lokal bane

Du kan enkelt lese og skrive filene i monteringspunktet ved hjelp av standard filsystem. Her er et Python-eksempel:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Slik kontrollerer du eksisterende monteringspunkter

Du kan bruke mssparkutils.fs.mounts() API til å kontrollere all eksisterende informasjon om monteringspunkt:

mssparkutils.fs.mounts()

Slik fjerner du monteringspunktet

Bruk følgende kode til å demontere monteringspunktet (/test i dette eksemplet):

mssparkutils.fs.unmount("/test")

Kjente begrensninger

  • Gjeldende montering er en jobbnivåkonfigurasjon. Vi anbefaler at du bruker monterings-API-en til å kontrollere om det finnes et monteringspunkt eller ikke er tilgjengelig.

  • Demonteringsmekanismen er ikke automatisk. Når programmet kjøres, må du eksplisitt kalle opp en umontert API i koden for å demontere monteringspunktet og frigjøre diskplassen. Ellers vil monteringspunktet fremdeles finnes i noden etter at programkjøringen er fullført.

  • Montering av en ADLS Gen1-lagringskonto støttes ikke.

Lakehouse verktøy

mssparkutils.lakehouse gir verktøy spesielt skreddersydd for administrasjon av Lakehouse artefakter. Disse verktøyene gjør det mulig for brukere å opprette, hente, oppdatere og slette Lakehouse-artefakter enkelt.

Merk

Lakehouse API-er støttes bare på Runtime versjon 1.2+.

Oversikt over metoder

Nedenfor finner du en oversikt over de tilgjengelige metodene som leveres av mssparkutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

Eksempler på bruk

Hvis du vil bruke disse metodene effektivt, kan du vurdere følgende brukseksempler:

Opprette en Lakehouse-artefakt

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Hente en Lakehouse Artefakt

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

Oppdatere en Lakehouse-artefakt

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Slette en Lakehouse-artefakt

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Liste over Lakehouse-artefakter

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

Tilleggsinformasjon

Hvis du vil ha mer detaljert informasjon om hver metode og dens parametere, kan du bruke mssparkutils.lakehouse.help("methodName") funksjonen.

Med MSSparkUtils' Lakehouse-verktøy blir administrasjon av Lakehouse-artefaktene mer effektive og integrert i Fabric-rørledninger, noe som forbedrer den generelle databehandlingsopplevelsen.

Du kan gjerne utforske disse verktøyene og innlemme dem i Fabric-arbeidsflytene for sømløs Lakehouse-artefaktbehandling.