Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of mappen te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen om mappen te wijzigen.
Microsoft Spark Utilities (MSSparkUtils) is een ingebouwd pakket om u te helpen eenvoudig algemene taken uit te voeren. U kunt MSSparkUtils gebruiken om te werken met bestandssystemen, om omgevingsvariabelen op te halen, notebooks te koppelen en met geheimen te werken. Het MSSparkUtils-pakket is beschikbaar in PySpark (Python) Scala, SparkR-notebooks en Fabric-pijplijnen.
Notitie
- MsSparkUtils is officieel hernoemd naar NotebookUtils. De bestaande code blijft achterwaarts compatibel en veroorzaakt geen wijzigingen die fouten veroorzaken. Het wordt ten zeerste aangeraden een upgrade uit te voeren naar notebookutils om te zorgen voor continue ondersteuning en toegang tot nieuwe functies. De mssparkutils-naamruimte wordt in de toekomst buiten gebruik gesteld.
- NotebookUtils is ontworpen voor gebruik met Spark 3.4(Runtime v1.2) en hoger. Alle nieuwe functies en updates zullen voortaan exclusief ondersteund worden door de 'notebookutils' naamruimte.
Hulpprogramma's voor bestandssysteem
mssparkutils.fs biedt hulpprogramma's voor het werken met verschillende bestandssystemen, waaronder Azure Data Lake Storage (ADLS) Gen2 en Azure Blob Storage. Zorg ervoor dat u de toegang tot Azure Data Lake Storage Gen2 en Azure Blob Storage op de juiste manier configureert.
Voer de volgende opdrachten uit voor een overzicht van de beschikbare methoden:
from notebookutils import mssparkutils
mssparkutils.fs.help()
Uitvoer
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use mssparkutils.fs.help("methodName") for more info about a method.
MSSparkUtils werkt op dezelfde manier met het bestandssysteem als Spark-API's. Neem mssparkuitls.fs.mkdirs() en Fabric lakehouse-gebruik, bijvoorbeeld:
| Gebruik | Relatief pad vanaf de HDFS-root | Absoluut pad voor ABFS-bestandssysteem | Absoluut pad voor lokaal bestandssysteem in stuurprogrammaknooppunt |
|---|---|---|---|
| Niet-standaard lakehouse | Niet ondersteund | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
| Standaard lakehouse | Map onder "Bestanden" of "Tabellen": mssparkutils.fs.mkdirs("Files/<new_dir>") | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Bestanden in een lijst weergeven
Als u de inhoud van een map wilt weergeven, gebruikt u mssparkutils.fs.ls('Uw mappad'). Voorbeeld:
mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
mssparkutils.fs.ls("file:/tmp") # based on local file system of driver node
Bestandseigenschappen weergeven
Deze methode retourneert bestandseigenschappen, waaronder bestandsnaam, bestandspad, bestandsgrootte en of het een map en een bestand is.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
Nieuwe map maken
Met deze methode wordt de opgegeven map gemaakt als deze niet bestaat, en worden eventuele benodigde bovenliggende mappen aangemaakt.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs. mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
mssparkutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
Bestand kopiëren
Met deze methode wordt een bestand of map gekopieerd en worden kopieeractiviteiten in bestandssystemen ondersteund.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Efficiënt bestand kopiëren
Deze methode biedt een snellere manier om bestanden te kopiëren of te verplaatsen, met name grote hoeveelheden gegevens.
mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Voorbeeld van bestandsinhoud
Deze methode retourneert maximaal de eerste 'maxBytes' bytes van het opgegeven bestand als een tekenreeks die is gecodeerd in UTF-8.
# Set the second parameter as an integer for the maxBytes to read
mssparkutils.fs.head('file path', <maxBytes>)
Bestand verplaatsen
Met deze methode wordt een bestand of map verplaatst en worden verplaatsingen tussen bestandssystemen ondersteund.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.
Bestand schrijven
Met deze methode wordt de opgegeven tekenreeks weggeschreven naar een bestand, gecodeerd in UTF-8.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Inhoud toevoegen aan een bestand
Met deze methode wordt de opgegeven tekenreeks toegevoegd aan een bestand, gecodeerd in UTF-8.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Notitie
Wanneer u de mssparkutils.fs.append API in een for lus gebruikt om naar hetzelfde bestand te schrijven, wordt u aangeraden een sleep instructie rond 0,5s~1s toe te voegen tussen de terugkerende schrijfbewerkingen. Dit komt doordat de interne mssparkutils.fs.append bewerking van de flush API asynchroon is, dus een korte vertraging zorgt voor gegevensintegriteit.
Bestand of map verwijderen
Met deze methode wordt een bestand of map verwijderd.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Map koppelen/ontkoppelen
Meer informatie over gedetailleerd gebruik in bestandskoppeling en ontkoppelen.
Hulpprogramma's voor notebooks
Gebruik de MSSparkUtils-notebookhulpprogramma's om een notebook uit te voeren of een notebook met een waarde af te sluiten. Voer de volgende opdracht uit om een overzicht te krijgen van de beschikbare methoden:
mssparkutils.notebook.help()
Uitvoer:
exit(value: String): Raises NotebookExit Exception -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Notitie
Hulpprogramma's voor notebooks zijn niet van toepassing op Apache Spark-taakdefinities (SJD).
Verwijzen naar een notitieblok
Deze methode verwijst naar een notebook en retourneert de uitvoerwaarde. U kunt geneste functie-aanroepen in een notebook interactief of in een pijplijn uitvoeren. Het notebook waarnaar wordt verwezen draait op de Spark-pool van het notebook dat deze functie aanroept.
mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
Voorbeeld:
mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
Fabric Notebook ondersteunt ook verwijzingen naar notebooks in meerdere werkruimten door de werkruimte-id op te geven.
mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
U kunt de koppeling naar de momentopname van de referentierun openen binnen de celuitvoer. Met de momentopname worden de resultaten van de codeuitvoering vastgelegd en kunt u eenvoudig fouten opsporen in een verwijzingsuitvoering.
Notitie
- Het referentienotebook voor meerdere werkruimten wordt ondersteund door runtimeversie 1.2 en hoger.
- Als u de bestanden onder Notebook Resource gebruikt, gebruikt u
mssparkutils.nbResPathin het notitieblok waarnaar wordt verwezen om ervoor te zorgen dat deze naar dezelfde map wijst als de interactieve uitvoering.
Naslaginformatie over het parallel uitvoeren van meerdere notebooks
Belangrijk
Deze functie is beschikbaar als preview-versie.
Met de methode mssparkutils.notebook.runMultiple() kunt u meerdere notebooks parallel of met een vooraf gedefinieerde topologische structuur uitvoeren. De API maakt gebruik van een multi-threaded implementatie voor het indienen, in de wachtrij plaatsen en monitoren van kind-notebooks die uitgevoerd worden op geïsoleerde REPL-instanties (read-eval-print-loop) binnen de bestaande Spark-sessie. De rekenresources voor de sessies worden gedeeld door de verwezen kindnotebooks.
Met mssparkutils.notebook.runMultiple()kunt u het volgende doen:
Voer meerdere notebooks tegelijk uit, zonder te wachten tot elke notebook is voltooid.
Geef de afhankelijkheden en de volgorde van uitvoering voor uw notebooks op met behulp van een eenvoudige JSON-indeling.
Optimaliseer het gebruik van Spark-rekenresources en verlaag de kosten van uw Fabric-projecten.
Bekijk de momentopnamen van elke noteboekuitvoering in de uitvoer en debug en monitor uw notebooktaken gemakkelijk.
Haal de afsluitwaarde van elke uitvoerende activiteit op en gebruik deze in downstream taken.
U kunt ook proberen de mssparkutils.notebook.help("runMultiple") uit te voeren om het voorbeeld en het gedetailleerde gebruik te vinden.
Hier volgt een eenvoudig voorbeeld van het parallel uitvoeren van een lijst met notebooks met behulp van deze methode:
mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
Het uitvoerresultaat van het hoofdnotebook is als volgt:
Hier volgt een voorbeeld van het uitvoeren van notebooks met topologische structuur met behulp van mssparkutils.notebook.runMultiple(). Gebruik deze methode om eenvoudig notebooks te organiseren via een code-ervaring.
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, defaults to 50 but ultimately constrained by the number of driver cores
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
Het uitvoerresultaat van het hoofdnotebook is als volgt:
Notitie
- De bovengrens voor notebookactiviteiten of gelijktijdige notebooks wordt beperkt door het aantal stuurprogrammakernen. Een gemiddeld knooppuntstuurprogramma met 8 kernen kan bijvoorbeeld maximaal 8 notebooks gelijktijdig uitvoeren. Dit komt doordat elk notebook dat wordt verzonden, wordt uitgevoerd op een eigen REPL-exemplaar (read-eval-print-loop) en daarbij één besturingskern verbruikt.
- De standaardparameter gelijktijdigheid is ingesteld op 50 ter ondersteuning van het automatisch schalen van de maximale gelijktijdigheid wanneer gebruikers Spark-pools configureren met grotere knooppunten en dus meer stuurprogrammakernen. Hoewel u dit kunt instellen op een hogere waarde wanneer u een groter stuurprogrammaknooppunt gebruikt, wordt het aantal gelijktijdige processen dat wordt uitgevoerd op één stuurprogrammaknooppunt doorgaans niet lineair geschaald. Het verhogen van gelijktijdigheid kan leiden tot verminderde efficiëntie vanwege conflicten tussen stuurprogramma's en uitvoerdersresources. Elk uitgevoerd notebook wordt uitgevoerd op een toegewezen REPL-exemplaar dat CPU en geheugen op de driver verbruikt, en onder hoge concurrentie kan dit het risico op instabiliteit van de driver of out-of-memory-fouten vergroten, met name voor langlopende workloads.
- U kunt merken dat elke individuele taak langer duurt vanwege de overhead van het initialiseren van REPL-exemplaren en het coördineren van veel notebooks. Als er problemen optreden, kunt u overwegen de notebooks op te splitsen in meerdere
runMultipleaanroepen of de gelijktijdigheid te verlagen door het veld concurrency in de DAG-parameter bij te stellen. - Bij het uitvoeren van notebooks met een korte levensduur (bijvoorbeeld code die binnen 5 seconden wordt uitgevoerd), wordt de initialisatie-overhead dominant en kan de variabiliteit in de voorbereidingstijd de kans op gelijktijdig draaiende notebooks verkleinen, wat resulteert in een lagere daadwerkelijke gelijktijdigheid. In deze scenario's is het beter om kleine bewerkingen te combineren in een of meerdere notebooks.
- Hoewel multithreading wordt gebruikt voor verzending, wachtrijen en bewaking, moet u er rekening mee houden dat de code die in elk notebook wordt uitgevoerd, niet meerdere threads op elke uitvoerder heeft. Er is geen resourcedeling tussen notebookprocessen, omdat elk notebookproces een deel van de totale uitvoerdersresources krijgt toegewezen. Dit kan ertoe leiden dat kortere taken minder efficiënt worden uitgevoerd en dat langere taken moeten concurreren om resources.
- De standaardtime-out voor de hele DAG is 12 uur en de standaardtime-out voor elke cel in het onderliggende notitieblok is 90 seconden. U kunt de timeout wijzigen door de timeoutInSeconds en timeoutPerCellInSeconds in te stellen in de DAG-parameter. Als u de gelijktijdigheid verhoogt, moet u mogelijk de time-outPerCellInSeconds verhogen om te voorkomen dat mogelijke resourcecontentie onnodige time-outs veroorzaakt.
Een notitieblok afsluiten
Met deze methode verlaat je een notebook met een waarde. U kunt geneste functie-aanroepen in een notebook interactief of in een pijplijn uitvoeren.
Wanneer u een exit()-functie vanuit een notebook interactief aanroept, gooit het Fabric-notebook een uitzondering, slaat het het uitvoeren van de volgende cellen over en blijft de Spark-sessie actief.
Wanneer u een notebook indeelt in een pijplijn die een exit() -functie aanroept, wordt de notebookactiviteit geretourneerd met een afsluitwaarde, wordt de pijplijnuitvoering voltooid en wordt de Spark-sessie gestopt. Plaats de exit() -functie niet rond een try/catch omdat deze NotebookExit-uitzondering moet worden doorgegeven voor de pijplijn om de retourwaarde op te halen.
Wanneer u een exit() -functie aanroept in een notebook waarnaar wordt verwezen, stopt Fabric Spark de verdere uitvoering van het notitieblok waarnaar wordt verwezen en gaat u door met het uitvoeren van de volgende cellen in het hoofdnotitieblok waarmee de run() -functie wordt aangeroepen. Bijvoorbeeld: Notebook1 heeft drie cellen en roept een exit() -functie aan in de tweede cel. Notebook2 heeft vijf cellen en roept run(notebook1) aan in de derde cel. Wanneer u Notebook2 uitvoert, stopt Notebook1 in de tweede cel wanneer u de exit() -functie bereikt. Notebook2 blijft zijn vierde en vijfde cellen uitvoeren.
mssparkutils.notebook.exit("value string")
Voorbeeld:
Voorbeeld1 notebook met de volgende twee cellen:
Cel 1 definieert een invoerparameter met de standaardwaarde ingesteld op 10.
Cel 2 verlaat het notitieblok met invoer als uitvoerwaarde.
U kunt het voorbeeld1 uitvoeren in een ander notebook met standaardwaarden:
exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)
Uitvoer:
Notebook executed successfully with exit value 10
U kunt het voorbeeld1 uitvoeren in een ander notebook en de invoerwaarde instellen als 20:
exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Uitvoer:
Notebook executed successfully with exit value 20
Hulpprogramma's voor inloggegevens
U kunt de MSSparkUtils Credentials Utilities gebruiken om toegangstokens op te halen en sleutels te beheren in een Azure Key Vault.
Voer de volgende opdracht uit om een overzicht te krijgen van de beschikbare methoden:
mssparkutils.credentials.help()
Uitvoer:
getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name
Token ophalen
getToken retourneert een Microsoft Entra-token voor een bepaalde doelgroep en naam (optioneel). In de volgende lijst ziet u de momenteel beschikbare doelgroepsleutels:
- Opslagbron voor gebruikersgroepen: "opslag"
- Power BI-resource: 'pbi'
- Azure Key Vault-resource: 'keyvault'
- Synapse RTA KQL DB Resource: 'kusto'
Voer de volgende opdracht uit om het token op te halen:
mssparkutils.credentials.getToken('audience Key')
Geheim ophalen met gebruikersreferenties
getSecret retourneert een Azure Key Vault-geheim voor een bepaald Azure Key Vault-eindpunt en een geheime naam met behulp van gebruikersreferenties.
mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
Bestandskoppeling en ontkoppelen
Fabric ondersteunt de volgende koppelscenario's in het Microsoft Spark Utilities-pakket. U kunt de API's voor koppelen, ontkoppelen, getMountPath()en mounts() gebruiken om externe opslag (ADLS Gen2) te koppelen aan alle werkende knooppunten (stuurprogrammaknooppunt en werkknooppunten). Nadat het opslagkoppelingspunt is ingesteld, gebruikt u de API voor het lokale bestand om toegang te krijgen tot gegevens alsof deze zijn opgeslagen in het lokale bestandssysteem.
Hoe een ADLS Gen2-account te koppelen
In het volgende voorbeeld ziet u hoe u Azure Data Lake Storage Gen2 koppelt. Het koppelen van Blob Storage werkt op dezelfde manier.
In dit voorbeeld wordt ervan uitgegaan dat u één Data Lake Storage Gen2-account met de naam storegen2 hebt en dat het account één container heeft met de naam mycontainer die u wilt koppelen aan /test in uw Spark-notebooksessie.
Als u de container met de naam mycontainer wilt koppelen, moet mssparkutils eerst controleren of u gemachtigd bent om toegang te krijgen tot de container. Momenteel ondersteunt Fabric twee verificatiemethoden voor de triggerkoppelingsbewerking: accountKey en sastoken.
Koppelen via token voor gedeelde toegangsrechten of accountsleutel
MSSparkUtils ondersteunt het expliciet doorgeven van een accountsleutel of SAS-token (Shared Access Signature) als parameter om de doellocatie te koppelen.
Om veiligheidsredenen raden we u aan accountsleutels of SAS-tokens op te slaan in Azure Key Vault (zoals in de volgende schermopname wordt weergegeven). U kunt ze vervolgens ophalen met behulp van de mssparkutils.credentials.getSecret-API . Voor meer informatie over Azure Key Vault, zie Over door Azure Key Vault beheerde opslagaccountsleutels.
Voorbeeldcode voor de accountKey-methode :
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
Voorbeeldcode voor sastoken:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
Notitie
Mogelijk moet u importeren mssparkutils als deze niet beschikbaar is:
from notebookutils import mssparkutils
Parameters koppelen:
- fileCacheTimeout: Blobs worden standaard gedurende 120 seconden in de lokale tijdelijke map in de cache opgeslagen. Gedurende deze tijd controleert blobfuse niet of het bestand up-to-date is of niet. De parameter kan worden ingesteld om de standaardtime-outtijd te wijzigen. Wanneer meerdere clients tegelijkertijd bestanden wijzigen, om inconsistenties tussen lokale en externe bestanden te voorkomen, raden we u aan de cachetijd te verkorten of zelfs te wijzigen in 0 en altijd de meest recente bestanden van de server op te halen.
- time-out: de time-out voor de koppelbewerking is standaard 120 seconden. De parameter kan worden ingesteld om de standaardtime-outtijd te wijzigen. Als er te veel uitvoerders zijn of wanneer het koppelingsproces een time-out heeft, raden we u aan de waarde te verhogen.
U kunt deze parameters als volgt gebruiken:
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
Notitie
Om veiligheidsredenen raden we u aan om referenties niet in code op te slaan. Om uw referenties verder te beveiligen, zullen we uw geheim in notebookuitvoer verbergen. Zie Geheime redaction voor meer informatie.
Hoe een lakehouse te monteren
Voorbeeldcode voor het koppelen van een lakehouse aan /test:
from notebookutils import mssparkutils
mssparkutils.fs.mount(
"abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>",
"/test"
)
Notitie
Het koppelen van een regionaal eindpunt wordt niet ondersteund. Fabric biedt alleen ondersteuning voor het koppelen van het globale eindpunt. onelake.dfs.fabric.microsoft.com
Toegang tot bestanden onder het koppelpunt met behulp van de mssparktuils fs-API
Het belangrijkste doel van de koppelingsbewerking is om klanten toegang te geven tot de gegevens die zijn opgeslagen in een extern opslagaccount met een API voor het lokale bestandssysteem. U kunt de gegevens ook openen met behulp van de mssparkutils fs-API met een gekoppeld pad als parameter. Deze padindeling is iets anders.
Stel dat u de Data Lake Storage Gen2-container mycontainer hebt gekoppeld aan /test met behulp van de mount API. Wanneer u toegang krijgt tot de gegevens met een API voor een lokaal bestandssysteem, ziet de padindeling er als volgt uit:
/synfs/notebook/{sessionId}/test/{filename}
Als u toegang wilt krijgen tot de gegevens met behulp van de mssparkutils fs-API, raden we u aan om getMountPath() te gebruiken om het nauwkeurige pad op te halen:
path = mssparkutils.fs.getMountPath("/test")
Directorieën weergeven:
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")Bestandsinhoud lezen:
mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")Maak een map aan:
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
Toegang tot bestanden onder het mount point via het lokale pad
U kunt de bestanden eenvoudig lezen en schrijven op het aankoppelpunt met behulp van het standaard bestandssysteem. Hier volgt een Python-voorbeeld:
#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
Bestaande koppelpunten controleren
U kunt mssparkutils.fs.mounts() API gebruiken om alle bestaande koppelingspuntgegevens te controleren:
mssparkutils.fs.mounts()
Hoe je het koppelpunt kunt ontkoppelen
Gebruik de volgende code om het koppelpunt los te koppelen (/test in dit voorbeeld):
mssparkutils.fs.unmount("/test")
Bekende beperkingen
De huidige koppeling is een configuratie op taakniveau; We raden u aan de API voor koppelen te gebruiken om te controleren of er een koppelpunt bestaat of niet beschikbaar is.
Het ontkoppelingsmechanisme is niet automatisch. Wanneer de uitvoering van de toepassing is voltooid, moet u expliciet een ontkoppel-API in uw code oproepen om de koppeling op te heffen en de schijfruimte vrij te maken. Anders bestaat het koppelpunt nog steeds in het knooppunt nadat de uitvoering van de toepassing is voltooid.
Het koppelen van een ADLS Gen1-opslagaccount wordt niet ondersteund.
Lakehouse-hulpprogramma's
mssparkutils.lakehouse biedt hulpprogramma's die speciaal zijn afgestemd op het beheren van Lakehouse-artefacten. Met deze hulpprogramma's kunnen gebruikers moeiteloos Lakehouse-artefacten maken, ophalen, bijwerken en verwijderen.
Notitie
Lakehouse-API's worden alleen ondersteund in Runtime versie 1.2+.
Overzicht van methoden
Hieronder vindt u een overzicht van de beschikbare methoden die worden geboden door mssparkutils.lakehouse:
# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]
Voorbeelden van gebruik
Bekijk de volgende gebruiksvoorbeelden om deze methoden effectief te gebruiken:
Een Lakehouse-artefact maken
artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
Een Lakehouse-artefact ophalen
artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")
Een Lakehouse-artefact bijwerken
updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
Een Lakehouse-artefact verwijderen
is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")
Lakehouse-artefacten vermelden
artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")
Aanvullende informatie
Gebruik de mssparkutils.lakehouse.help("methodName") functie voor meer gedetailleerde informatie over elke methode en de bijbehorende parameters.
Met de lakehouse-hulpprogramma's van MSSparkUtils wordt het beheer van uw Lakehouse-artefacten efficiënter en geïntegreerd in uw Fabric-pijplijnen, waardoor uw algehele ervaring voor gegevensbeheer wordt verbeterd.
U kunt deze hulpprogramma's verkennen en opnemen in uw Fabric-werkstromen voor naadloos Beheer van Lakehouse-artefacten.
Runtimehulpprogramma's
De sessiecontextgegevens weergeven
Met mssparkutils.runtime.context kunt u de contextinformatie van de huidige livesessie ophalen, waaronder de naam van het notitieblok, het standaard-lakehouse, informatie over de werkruimte, of het een pijplijnuitvoering betreft, en meer.
mssparkutils.runtime.context
Notitie
mssparkutils.env wordt niet officieel ondersteund in Fabric, gebruik notebookutils.runtime.context als alternatief.
Bekend probleem
Wanneer u runtimeversie boven 1.2 gebruikt en mssparkutils.help() uitvoert, worden de vermelde fabricClient-, warehouse- en werkruimte-API's voorlopig niet ondersteund en zullen in de toekomst beschikbaar zijn.