Představení prostředků infrastruktury MSSparkUtils

Microsoft Spark Utilities (MSSparkUtils) je integrovaný balíček, který vám pomůže snadno provádět běžné úlohy. Pomocí nástrojeMSSparkUtils můžete pracovat se systémy souborů, získávat proměnné prostředí, řetězit poznámkové bloky a pracovat s tajnými kódy. MSSparkUtils jsou k dispozici v PySpark (Python) Scala, poznámkových blocích SparkR a kanálech Microsoft Fabric.

Důležité

Microsoft Fabric je v současné době ve verzi PREVIEW. Tyto informace se týkají předběžné verze produktu, který může být před vydáním podstatně změněn. Společnost Microsoft neposkytuje na zde uvedené informace žádné záruky, vyjádřené ani předpokládané.

Nástroje systému souborů

mssparkutils.fs poskytuje nástroje pro práci s různými systémy souborů, včetně Azure Data Lake Storage Gen2 (ADLS Gen2) a Azure Blob Storage. Ujistěte se, že jste správně nakonfigurovali přístup pro Azure Data Lake Storage Gen2 a Azure Blob Storage.

Spuštěním následujících příkazů získáte přehled dostupných metod:

from notebookutils import mssparkutils
mssparkutils.fs.help()

Výstup

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

mssparkutils funguje se systémem souborů stejným způsobem jako rozhraní API Sparku. Vezměte například použití mssparkuitls.fs.mkdirs() a Fabric Lakehouse:

Použití Relativní cesta z kořenového adresáře HDFS Absolutní cesta pro systém souborů ABFS Absolutní cesta pro místní systém souborů v uzlu ovladače
Nevýchozí lakehouse Nepodporováno mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")
Výchozí lakehouse Adresář ve složce Files nebo Tables: mssparkutils.fs.mkdirs("Files/<new_dir>") mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")

Zobrazení souborů

Vypište obsah adresáře, použijte mssparkutils.fs.ls('Cesta k vašemu adresáři'), například:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

Zobrazení vlastností souboru

Vrátí vlastnosti souboru, včetně názvu souboru, cesty k souboru, velikosti souboru a toho, jestli se jedná o adresář a soubor.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Vytvořit nový adresář

Vytvoří daný adresář, pokud neexistuje, a všechny potřebné nadřazené adresáře.

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Kopírovat soubor

Zkopíruje soubor nebo adresář. Podporuje kopírování mezi systémy souborů.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Náhled obsahu souboru

Vrátí až první "maxBytes" bajtů daného souboru jako řetězec kódovaný v UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)

Přesunout soubor

Přesune soubor nebo adresář. Podporuje přesun mezi systémy souborů.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Zápis souboru

Zapíše daný řetězec do souboru zakódovaného v UTF-8. Zapíše daný řetězec do souboru zakódovaného v UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Připojení obsahu k souboru

Připojí daný řetězec k souboru zakódovanému v UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Odstranění souboru nebo adresáře

Odebere soubor nebo adresář.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Připojit nebo odpojit adresář

Podrobné informace o využití najdete v tématu Připojení a odpojení souborů.

Nástroje poznámkových bloků

Pomocí nástroje MSSparkUtils Notebook Utilities můžete spustit poznámkový blok nebo ukončit poznámkový blok s hodnotou. Spuštěním následujícího příkazu získáte přehled o dostupných metodách:

mssparkutils.notebook.help()

Výstup:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Odkaz na poznámkový blok

Odkaz na poznámkový blok a vrátí jeho výstupní hodnotu. Volání funkcí vnořování můžete v poznámkovém bloku spouštět interaktivně nebo v kanálu. Odkazovaný poznámkový blok běží ve fondu Sparku, ve kterém poznámkový blok volá tuto funkci.

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)

Například:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Odkaz na snímek referenčního spuštění můžete otevřít ve výstupu buňky, snímek zachytí výsledky spuštění kódu a umožní snadné ladění referenčního spuštění.

Snímek obrazovky s výsledkem referenčního spuštění

Snímek obrazovky s příkladem snímku

Poznámka

Poznámkový blok Prostředků infrastruktury v současné době podporuje odkazování pouze na poznámkové bloky v rámci pracovního prostoru.

Ukončení poznámkového bloku

Ukončí poznámkový blok s hodnotou. Volání funkcí vnořování můžete v poznámkovém bloku spouštět interaktivně nebo v kanálu.

  • Při interaktivním volání funkce exit() z poznámkového bloku vyvolá poznámkový blok Prostředků infrastruktury výjimku, přeskočí spouštění dílčích buněk a udržuje relaci Sparku aktivní.
  • Když orchestrujete poznámkový blok v kanálu, který volá funkci exit(), aktivita poznámkového bloku se vrátí s hodnotou exit, dokončí spuštění kanálu a zastaví relaci Sparku.
  • Při volání funkce exit() v poznámkovém bloku, na který se odkazuje, Zastaví Spark prostředků infrastruktury další provádění odkazovaného poznámkového bloku a bude pokračovat ve spouštění dalších buněk v hlavním poznámkovém bloku, který volá funkci run(). Například: Poznámkový blok1 má tři buňky a v druhé buňce volá funkci exit(). Poznámkový blok2 obsahuje pět buněk a volání run(poznámkový blok1) ve třetí buňce. Když spustíte Poznámkový blok 2, zastaví se Poznámkový blok1 v druhé buňce při stisknutí funkce exit(). Poznámkový blok 2 dál spouští svou čtvrtou a pátou buňku.
mssparkutils.notebook.exit("value string")

Například:

Poznámkový blok Sample1 s následujícími dvěma buňkami:

  • Buňka 1 definuje vstupní parametr s výchozí hodnotou nastavenou na 10.

  • Buňka 2 opouští poznámkový blok se vstupem jako výstupní hodnotou.

Snímek obrazovky s ukázkovým poznámkovým blokem funkce exit

Ukázku 1 můžete spustit v jiném poznámkovém bloku s výchozími hodnotami:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

Výstup:

Notebook executed successfully with exit value 10

Ukázku 1 můžete spustit v jiném poznámkovém bloku a nastavit vstupní hodnotu na 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Výstup:

Notebook executed successfully with exit value 20

Nástroje pro přihlašovací údaje

Pomocí nástrojů MSSparkUtils Credentials můžete získat přístupové tokeny a spravovat tajné kódy v Azure Key Vault.

Spuštěním následujícího příkazu získáte přehled o dostupných metodách:

mssparkutils.credentials.help()

Výstup:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key

Získání tokenu

Vrátí Azure AD token pro danou cílovou skupinu, název (volitelné). Následující seznam obsahuje aktuálně dostupné klíče cílové skupiny:

  • Prostředek cílové skupiny úložiště: "úložiště"
  • Prostředek Power BI: pbi
  • Prostředek Azure Key Vault: keyvault
  • Prostředek synapse RTA KQL DB: kusto

Token získáte spuštěním následujícího příkazu:

mssparkutils.credentials.getToken('audience Key')

Získání tajného kódu pomocí přihlašovacích údajů uživatele

Vrátí tajný klíč Azure Key Vault pro daný název azure Key Vault, název tajného klíče a název propojené služby pomocí přihlašovacích údajů uživatele.

mssparkutils.credentials.getSecret('azure key vault name','secret name')

Připojení a odpojení souboru

Scénáře připojení podpory Microsoft Fabric v balíčku Microsoft Spark Utilities Pomocí rozhraní API mount, unmount, getMountPath() a mounts() můžete připojit vzdálené úložiště (Azure Data Lake Storage Gen2) ke všem pracovním uzlům (uzel ovladače a pracovní uzly). Po vytvoření přípojného bodu úložiště použijte rozhraní API pro místní soubory pro přístup k datům, jako by byla uložená v místním systému souborů.

Připojení účtu ADLS Gen2

Tato část ukazuje, jak připojit Azure Data Lake Storage Gen2 jako příklad. Připojení služby Blob Storage funguje podobně.

Příklad předpokládá, že máte jeden účet Data Lake Storage Gen2 s názvem storegen2. Účet má jeden kontejner s názvem mycontainer , který chcete připojit k /test do relace Spark poznámkového bloku.

Snímek obrazovky znázorňující, kde vybrat kontejner, který chcete připojit

Pokud chcete připojit kontejner s názvem mycontainer, musí nástroj mssparkutils nejprve zkontrolovat, jestli máte oprávnění pro přístup ke kontejneru. Microsoft Fabric v současné době podporuje dvě metody ověřování pro operaci připojení triggeru: accountKey a sastoken.

Připojení pomocí tokenu sdíleného přístupových podpisů nebo klíče účtu

Mssparkutils podporuje explicitní předání klíče účtu nebo tokenu sdíleného přístupového podpisu (SAS) jako parametru pro připojení cíle.

Z bezpečnostních důvodů doporučujeme ukládat klíče účtu nebo tokeny SAS v Azure Key Vault (jak ukazuje následující příklad snímku obrazovky). Pak je můžete načíst pomocí rozhraní MSsparkutils.credentials.getSecret API. Informace o využití azure Key Vault najdete v tématu Informace o klíčích spravovaného účtu úložiště Key Vault Azure.

Snímek obrazovky znázorňující, kde jsou tajné kódy uložené v azure Key Vault

Tady je ukázkový kód použití metody accountKey:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

V případě sastokenu se odkažte na následující ukázkový kód:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Poznámka

Z bezpečnostních důvodů se nedoporučuje ukládat přihlašovací údaje v kódu. Abychom dále chránili vaše přihlašovací údaje, budeme váš tajný kód redigovat ve výstupu poznámkového bloku. Další podrobnosti najdete v tématu Redakce tajných kódů.

Jak připojit lakehouse

Tady je ukázkový kód připojení lakehouse k /test.

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

Přístup k souborům pod přípojným bodem pomocí rozhraní mssparktuils fs API

Hlavním účelem operace připojení je umožnit zákazníkům přístup k datům uloženým ve vzdáleném účtu úložiště pomocí místního rozhraní API systému souborů. K datům můžete přistupovat také pomocí rozhraní mssparkutils fs API s připojenou cestou jako parametrem. Tady použitý formát cesty se trochu liší.

Předpokládejme, že jste připojili Data Lake Storage Gen2 kontejner mycontainer k /test pomocí rozhraní API pro připojení. Při přístupu k datům pomocí rozhraní API místního systému souborů je formát cesty podobný tomuto:

/synfs/notebook/{sessionId}/test/{filename}

Pokud chcete získat přístup k datům pomocí rozhraní mssparkutils fs API, doporučujeme použít getMountPath(), abyste získali přesnou cestu:

path = mssparkutils.fs.getMountPath("/test")
  • Seznamy adresářů:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • Čtení obsahu souboru:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Vytvořte adresář:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

Přístup k souborům pod přípojným bodem prostřednictvím místní cesty

Soubory v přípojné bodě můžete snadno číst a zapisovat pomocí standardního systému souborů, jako příklad použijte Python:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Postup kontroly existujících přípojných bodů

Ke kontrole všech existujících informací o přípojných bodech můžete použít rozhraní API mssparkutils.fs.mounts():

mssparkutils.fs.mounts()

Jak odpojit přípojný bod

K odpojení přípojného bodu použijte následující kód (v tomto příkladu otestujte ):

mssparkutils.fs.unmount("/test")

Známá omezení

  • Aktuální připojení je konfigurace na úrovni úlohy. Doporučujeme použít rozhraní API pro připojení ke kontrole, jestli přípojný bod existuje, nebo není k dispozici.
  • Mechanismus odpojení není automatický. Po dokončení spuštění aplikace je nutné v kódu explicitně volat rozhraní API pro odpojení přípojného bodu a uvolnit tak místo na disku. Jinak bude přípojný bod v uzlu existovat i po dokončení spuštění aplikace.
  • Připojení účtu úložiště ADLS Gen1 se nepodporuje.

Další kroky