Sdílet prostřednictvím


Nástroje Microsoft Spark (MSSparkUtils) pro Fabric

Microsoft Spark Utilities (MSSparkUtils) je integrovaný balíček, který vám pomůže snadno provádět běžné úlohy. MsSparkUtils můžete použít k práci se systémy souborů, k získání proměnných prostředí, ke zřetězení poznámkových bloků a práci s tajnými kódy. Balíček MSSparkUtils je k dispozici v notebookách PySpark (Python), Scala, SparkR a v pipelinách Fabric.

Poznámka:

  • MsSparkUtils byl oficiálně přejmenován na NotebookUtils. Stávající kód zůstane zpětně kompatibilní a nezpůsobí žádné zásadní změny. Je důrazně doporučeno upgradovat na notebookutils, abyste zajistili nepřetržitou podporu a přístup k novým funkcím. Obor názvů mssparkutils bude v budoucnu vyřazen.
  • NotebookUtils je navržený tak, aby fungoval se Sparkem 3.4(Runtime v1.2) a novějším. Všechny nové funkce a aktualizace budou do budoucna podporovány výhradně pomocí oboru názvů notebookutils.

Nástroje systému souborů

mssparkutils.fs poskytuje nástroje pro práci s různými systémy souborů, včetně Azure Data Lake Storage (ADLS) Gen2 a Azure Blob Storage. Ujistěte se, že správně nakonfigurujete přístup ke službě Azure Data Lake Storage Gen2 a Azure Blob Storage .

Pro přehled dostupných metod spusťte následující příkazy:

from notebookutils import mssparkutils
mssparkutils.fs.help()

Výstup

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils pracuje se systémem souborů stejným způsobem jako rozhraní API Sparku. Vezměte si například použití mssparkuitls.fs.mkdirs() a také využití Fabric lakehouse:

Využití Relativní cesta od kořenového adresáře HDFS Absolutní cesta pro systém souborů ABFS Absolutní cesta k místnímu systému souborů v uzlu ovladače
Nestandardní lakehouse Nepodporováno mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")
Výchozí jezero Adresář v části Soubory nebo Tabulky: mssparkutils.fs.mkdirs("Soubory/<new_dir>") mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")

Zobrazení souborů

Pokud chcete zobrazit seznam obsahu adresáře, použijte mssparkutils.fs.ls(Cesta k vašemu adresáři). Příklad:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

Zobrazení vlastností souboru

Tato metoda vrátí vlastnosti souboru, včetně názvu souboru, cesty k souboru, velikosti souboru a toho, jestli se jedná o adresář a soubor.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Vytvoření nového adresáře

Tato metoda vytvoří daný adresář, pokud neexistuje, a vytvoří všechny nezbytné nadřazené adresáře.

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Kopírování souboru

Tato metoda zkopíruje soubor nebo adresář a podporuje aktivitu kopírování napříč systémy souborů.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Efektivní kopírování souborů

Tato metoda poskytuje rychlejší způsob kopírování nebo přesouvání souborů, zejména velkých objemů dat.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Náhled obsahu souboru

Tato metoda vrátí až první bajty maxBytes daného souboru jako String kódovaný v UTF-8.

# Set the second parameter as an integer for the maxBytes to read
mssparkutils.fs.head('file path', <maxBytes>)

Přesun souboru

Tato metoda přesune soubor nebo adresář a podporuje přesuny mezi systémy souborů.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Zápis souboru

Tato metoda zapíše daný řetězec do souboru zakódovaného v UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Připojení obsahu k souboru

Tato metoda připojí daný řetězec k souboru zakódovanému v UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Poznámka:

Při použití mssparkutils.fs.append rozhraní API ve for smyčce pro zápis do stejného souboru doporučujeme přidat sleep příkaz kolem 0,5s~1s mezi opakující se zápisy. Důvodem je to, že mssparkutils.fs.append interní flush operace rozhraní API je asynchronní, takže krátká prodleva pomáhá zajistit integritu dat.

Odstranění souboru nebo adresáře

Tato metoda odebere soubor nebo adresář.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Připojení nebo odpojení adresáře

Další informace o podrobném využití najdete v připojení a odpojení souboru.

Nástroje poznámkového bloku

Pomocí nástrojů MSSparkUtils Notebook spusťte poznámkový blok nebo ukončete poznámkový blok s hodnotou. Spuštěním následujícího příkazu získejte přehled dostupných metod:

mssparkutils.notebook.help()

Výstup:


exit(value: String): Raises NotebookExit Exception -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Poznámka:

Nástroje poznámkového bloku se nevztahují na definice úloh Apache Sparku (SJD).

Odkaz na poznámkový blok

Tato metoda odkazuje na poznámkový blok a vrátí jeho výstupní hodnotu. Volání vnořené funkce můžete v poznámkovém bloku spouštět interaktivně nebo v potrubí. Referencovaný poznámkový blok běží na Spark fondu poznámkového bloku, který tuto funkci volá.

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Příklad:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Notebook Fabric také podporuje odkazování na notebooky napříč několika pracovními prostory pomocí specifikace ID pracovního prostoru.

mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Odkaz na snímek referenčního běhu můžete otevřít ve výstupu buňky. Snímek zachycuje výsledky spuštění kódu a umožňuje snadné ladění referenčního spuštění.

Snímek obrazovky, který ukazuje výsledek referenčního spuštění.

Snímek obrazovky se snímkem s výsledky spuštění kódu

Poznámka:

  • Referenční poznámkový blok mezi pracovními prostory podporuje modul runtime verze 1.2 a vyšší.
  • Pokud používáte soubory v části Zdroj poznámkového bloku, použijte mssparkutils.nbResPath v odkazovaném poznámkovém bloku, aby odkazoval na stejnou složku jako interaktivní spuštění.

Paralelní spouštění více referenčních poznámkových bloků

Důležité

Tato funkce je ve verzi Preview.

Tato metoda mssparkutils.notebook.runMultiple() umožňuje paralelně spouštět více poznámkových bloků nebo s předdefinovanou topologickou strukturou. API používá vícevláknové implementace k odesílání, řazení do fronty a monitorování podřízených poznámkových bloků, které se spouští v izolovaných instancích REPL v rámci existující relace Spark. Výpočetní prostředky relace jsou sdíleny odkazovanými podřízenými poznámkovými bloky.

Pomocí mssparkutils.notebook.runMultiple():

  • Spusťte několik poznámkových bloků současně, aniž byste museli čekat na dokončení každého z nich.

  • Pomocí jednoduchého formátu JSON určete závislosti a pořadí provádění poznámkových bloků.

  • Optimalizujte využití výpočetních prostředků Sparku a snižte náklady na vaše projekty Fabric.

  • Prohlédněte si snímky každého záznamu spuštění poznámkového bloku v sekci výstupu a pohodlně laďte a monitorujte úlohy poznámkového bloku.

  • Získejte výstupní hodnotu jednotlivých aktivit vedení a použijte je v podřízených úkolech.

Můžete se také pokusit spustit mssparkutils.notebook.help("runMultiple") a vyhledat příklad a podrobné využití.

Tady je jednoduchý příklad paralelního spuštění seznamu poznámkových bloků pomocí této metody:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Výsledek spuštění z kořenového poznámkového bloku je následující:

Snímek obrazovky s odkazem na seznam poznámkových bloků

Následuje příklad spouštění poznámkových bloků s topologickou strukturou pomocí mssparkutils.notebook.runMultiple(). Pomocí této metody můžete snadno orchestrovat poznámkové bloky prostřednictvím prostředí kódu.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, defaults to 50 but ultimately constrained by the number of driver cores
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Výsledek spuštění z kořenového poznámkového bloku je následující:

Snímek obrazovky se zobrazením seznamu poznámkových bloků s parametry.

Poznámka:

  • Horní limit pro aktivity poznámkového bloku nebo souběžné poznámkové bloky je omezený počtem jader ovladačů. Například ovladač středního uzlu s 8 jádry by mohl souběžně spouštět až 8 poznámkových bloků. Důvodem je, že každý odeslaný poznámkový blok se spouští na vlastní instanci REPL (read-eval-print-loop), z nichž každý využívá jedno jádro ovladače.
  • Výchozí parametr souběžnosti je nastavený na 50 , aby podporoval automatické škálování maximální souběžnosti, protože uživatelé konfigurují fondy Sparku s většími uzly a tím více jader ovladačů. I když tuto hodnotu můžete nastavit na vyšší hodnotu při použití většího uzlu ovladače, zvýšení počtu souběžných procesů spuštěných na jednom uzlu ovladače obvykle nedochází k lineárnímu škálování. Zvýšení souběžnosti může vést ke snížení efektivity kvůli soutěži o prostředky mezi ovladačem a vykonavatelem. Každý spuštěný poznámkový blok běží na vyhrazené instanci REPL, která spotřebovává procesor a paměť na ovladači, a při vysoké souběžnosti to může zvýšit riziko nestability ovladačů nebo chyb kvůli nedostatku paměti, zejména u dlouhotrvajících úloh.
  • Může se stát, že jednotlivé úlohy budou trvat déle kvůli režii inicializace instancí REPL a koordinaci mnoha poznámkových bloků. Pokud dojde k problémům, zvažte oddělení poznámkových bloků do několika runMultiple volání nebo snížení souběžnosti úpravou pole souběžnosti v parametru DAG.
  • Při spouštění krátkodobých poznámkových bloků (např. 5sekundový čas provádění kódu) se režie inicializace stává dominantní a variabilita v čase přípravy může snížit pravděpodobnost překrývajících se poznámkových bloků, což vede ke snížení realizované souběžnosti. V těchto případech může být výhodnější kombinovat malé operace do jednoho nebo více sešitů.
  • Zatímco více vláken se používá k odesílání, řazení front a monitorování, mějte na paměti, že kód spuštěný v každém poznámkovém bloku není vícevláknový v každém exekutoru. Mezi jednotlivými procesy poznámkového bloku nedochází ke sdílení prostředků, protože každý proces má přidělenu část celkových prostředků exekutoru. To může vést k neefektivnímu běhu kratších úloh a k tomu, že delší úlohy budou soupeřit o prostředky.
  • Výchozí časový limit pro celý DAG je 12 hodin a výchozí časový limit pro každou buňku v podřízeném poznámkovém bloku je 90 sekund. Časový limit můžete změnit nastavením polí timeoutInSeconds a timeoutPerCellInSeconds v parametru DAG. Když zvýšíte souběžnost, možná budete muset zvýšit timeoutPerCellInSeconds, abyste zabránili možným kolizím prostředků, které by mohly způsobit zbytečné časové limity.

Ukončit poznámkový blok

Tato metoda ukončí poznámkový blok s hodnotou. Volání vnořené funkce můžete v poznámkovém bloku spouštět interaktivně nebo v potrubí.

  • Když z poznámkového bloku interaktivně zavoláte funkci exit(), poznámkový blok Fabric vyvolá výjimku, přeskočí následné buňky a udržuje relaci Sparku naživu.

  • Když orchestrujete poznámkový blok v kanálu, který volá funkci exit(), aktivita poznámkového bloku se vrátí s výstupní hodnotou, dokončí spuštění kanálu a zastaví relaci Sparku. Neuzavírajte funkci exit() kolem try/catch, protože tato výjimka NotebookExit se musí rozšířit, aby kanál získal návratovou hodnotu.

  • Při volání funkce exit() v poznámkovém bloku, na který se odkazuje, zastaví Spark další spuštění odkazovaného poznámkového bloku a bude dál spouštět další buňky v hlavním poznámkovém bloku, které volají funkci run(). Například: Notebook1 obsahuje tři buňky a volá funkci exit() ve druhé buňce. Poznámkový blok2 obsahuje pět buněk a volá run(notebook1) ve třetí buňce. Při spuštění poznámkového bloku 2 se poznámkový blok 1 zastaví na druhé buňce při stisknutí funkce exit(). Poznámkový blok 2 nadále spouští svou čtvrtou a pátou buňku.

mssparkutils.notebook.exit("value string")

Příklad:

Ukázkový 1 poznámkový blok s následujícími dvěma buňkami:

  • Buňka 1 definuje vstupní parametr s výchozí hodnotou nastavenou na 10.

  • Buňka 2 vyjede z poznámkového bloku s vstupem jako výstupní hodnotou.

Snímek obrazovky s ukázkovým poznámkovým blokem výstupní funkce

Ukázku 1 můžete spustit v jiném poznámkovém bloku s výchozími hodnotami:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

Výstup:

Notebook executed successfully with exit value 10

Ukázku 1 můžete spustit v jiném poznámkovém bloku a nastavit vstupní hodnotu na 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Výstup:

Notebook executed successfully with exit value 20

Nástroje pro přihlašovací údaje

Pomocí nástrojů MSSparkUtils Credentials Utilities můžete získat přístupové tokeny a spravovat tajné kódy ve službě Azure Key Vault.

Spuštěním následujícího příkazu získejte přehled dostupných metod:

mssparkutils.credentials.help()

Výstup:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

Získání tokenu

getToken vrátí token Microsoft Entra pro danou cílovou skupinu a název (volitelné). V následujícím seznamu jsou uvedeny aktuálně dostupné klíče cílové skupiny:

  • Úložiště zdroje publika: "úložiště"
  • Prostředek Power BI: "pbi"
  • Prostředek služby Azure Key Vault: "keyvault"
  • Synapse RTA KQL DB Prostředek: "Kusto"

Spuštěním následujícího příkazu získejte token:

mssparkutils.credentials.getToken('audience Key')

Získání tajného kódu pomocí přihlašovacích údajů uživatele

getSecret vrátí tajný klíč služby Azure Key Vault pro daný koncový bod služby Azure Key Vault a název tajného kódu pomocí přihlašovacích údajů uživatele.

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Připojení a odpojení souborů

Fabric podporuje následující připojovací scénáře v balíčku Microsoft Spark Utilities. K připojení vzdáleného úložiště (ADLS Gen2) ke všem pracovním uzlům (uzel ovladače a pracovní uzly) můžete použít rozhraní API mount, unmount, getMountPath() a mounts(). Po umístění přípojného bodu úložiště použijte místní souborové rozhraní API pro přístup k datům, jako by byla uložena v místním systému souborů.

Připojení účtu ADLS Gen2

Následující příklad ukazuje, jak připojit Azure Data Lake Storage Gen2. Připojení služby Blob Storage funguje podobně.

Tento příklad předpokládá, že máte účet Data Lake Storage Gen2 s názvem storegen2, který obsahuje kontejner nazvaný mycontainer, a tento kontejner chcete připojit na cestu /test ve své Spark relaci ve vývojovém prostředí poznámkového bloku.

Snímek obrazovky znázorňující, kde vybrat kontejner, který se má připojit

Pokud chcete připojit kontejner s názvem mycontainer, musí nástroj mssparkutils nejprve zkontrolovat, jestli máte oprávnění pro přístup k kontejneru. V současné době Fabric podporuje dvě metody ověřování pro operaci připojení spouštěče: accountKey a sastoken.

Připojení prostřednictvím tokenu sdíleného přístupového podpisu nebo klíče účtu

MSSparkUtils podporuje explicitní předání klíče účtu nebo tokenu sdíleného přístupového podpisu (SAS) jako parametru pro připojení cíle.

Z bezpečnostních důvodů doporučujeme ukládat klíče účtu nebo tokeny SAS ve službě Azure Key Vault (jak ukazuje následující snímek obrazovky). Pak je můžete načíst pomocí API mssparkutils.credentials.getSecret. Další informace o službě Azure Key Vault najdete v tématu o klíčích účtu spravovaného úložiště služby Azure Key Vault.

Snímek obrazovky znázorňující, kde jsou tajné kódy uložené ve službě Azure Key Vault

Ukázkový kód pro metodu accountKey :

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Ukázkový kód pro sastoken:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Poznámka:

Pokud není k dispozici, budete možná muset importovat mssparkutils :

from notebookutils import mssparkutils

Parametry připojení:

  • fileCacheTimeout: Objekty blob se ve výchozím nastavení ukládají do místní dočasné složky po dobu 120 sekund. Během této doby blobfuse nekontroluje, jestli je soubor aktuální nebo ne. Parametr může být nastavený tak, aby změnil výchozí časový limit. Pokud více klientů současně upravuje soubory, aby nedocházelo k nekonzistence mezi místními a vzdálenými soubory, doporučujeme zkrátit dobu mezipaměti nebo dokonce změnit na 0 a vždy získat nejnovější soubory ze serveru.
  • časový limit: Časový limit operace připojení je ve výchozím nastavení 120 sekund. Parametr může být nastavený tak, aby změnil výchozí časový limit. Pokud je příliš mnoho úloh nebo pokud vyprší časový limit pro připojení disku, doporučujeme zvýšit hodnotu.

Můžete použít následující parametry:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Poznámka:

Z bezpečnostních důvodů doporučujeme neukládejte přihlašovací údaje do kódu. Abychom mohli vaše přihlašovací údaje dále chránit, vypíšeme váš tajný kód ve výstupu poznámkového bloku. Další informace naleznete v tématu Tajná redakce.

Jak namontovat lakehouse

Vzorový kód pro připojení jezera k /test:

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

Poznámka:

Montáž regionálního koncového bodu není podporována. Fabric podporuje pouze připojení globálního koncového bodu onelake.dfs.fabric.microsoft.com.

Přistupujte k souborům pod přípojným bodem pomocí rozhraní mssparktuils fs API

Hlavním účelem operace připojení je umožnit zákazníkům přístup k datům uloženým ve vzdáleném účtu úložiště pomocí místního rozhraní API systému souborů. K datům můžete přistupovat také pomocí API mssparkutils fs s připojenou cestou jako parametrem. Tento formát cesty je trochu odlišný.

Předpokládejme, že jste kontejner Data Lake Storage Gen2 připojili mycontainer k /test pomocí rozhraní API pro připojení. Při přístupu k datům pomocí místního rozhraní API systému souborů je formát cesty podobný tomuto:

/synfs/notebook/{sessionId}/test/{filename}

Pokud chcete získat přístup k datům pomocí rozhraní API mssparkutils fs, doporučujeme použít getMountPath() a získat přesnou cestu:

path = mssparkutils.fs.getMountPath("/test")
  • Seznam adresářů:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • Přečíst obsah souboru:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Vytvořte adresář:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

Přístup k souborům pod přípojným bodem prostřednictvím místní cesty

Soubory můžete snadno číst a zapisovat do přípojného bodu pomocí standardního systému souborů. Tady je příklad Pythonu:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Postup kontroly stávajících přípojných bodů

Ke kontrole všech existujících informací o přípojných bodech můžete použít rozhraní API mssparkutils.fs.mounts( ):

mssparkutils.fs.mounts()

Jak odpojit přípojný bod

Pomocí následujícího kódu odpojte bod připojení (/test v tomto příkladu):

mssparkutils.fs.unmount("/test")

Známá omezení

  • Aktuální připojení je konfigurace na úrovni úlohy; Doporučujeme použít rozhraní API pro připojení ke kontrole, jestli přípojný bod existuje nebo není dostupný.

  • Mechanismus odpojování není automatický. Po dokončení spuštění aplikace je potřeba odpojit přípojný bod a uvolnit místo na disku, musíte v kódu explicitně volat rozhraní API pro odpojení. V opačném případě bude přípojný bod stále existovat v uzlu po dokončení spuštění aplikace.

  • Připojení účtu úložiště ADLS Gen1 se nepodporuje.

Nástroje lakehouse

mssparkutils.lakehouse poskytuje nástroje speciálně přizpůsobené pro správu artefaktů Lakehouse. Tyto nástroje uživatelům umožňují snadno vytvářet, načítat, aktualizovat a odstraňovat artefakty Lakehouse.

Poznámka:

Rozhraní API Lakehouse jsou podporována pouze na prostředí runtime ve verzi 1.2 a novější.

Přehled metod

Níže je uveden přehled dostupných metod poskytovaných mssparkutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

Příklady použití

Pokud chcete tyto metody efektivně využít, zvažte následující příklady použití:

Vytvoření artefaktu Lakehouse

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Načtení artefaktu z Lakehouse

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

Aktualizace artefaktu Lakehouse

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Odstranění artefaktu Lakehouse

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Výpis artefaktů Lakehouse

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

Další informace

Podrobnější informace o jednotlivých metodách a jejích parametrech najdete v této mssparkutils.lakehouse.help("methodName") funkci.

Díky nástrojům MSSparkUtils' Lakehouse se správa artefaktů Lakehouse stává efektivnější a integrovaná do kanálů Fabric, což zvyšuje celkové prostředí pro správu dat.

Můžete volně prozkoumávat tyto nástroje a začlenit je do svých pracovních postupů pro Fabric pro bezproblémovou správu artefaktů Lakehouse.

Nástroje běhového prostředí

Zobrazení informací o kontextu relace

Díky mssparkutils.runtime.context tomu můžete získat kontextové informace o aktuální živé relaci, včetně názvu poznámkového bloku, výchozího objektu lakehouse, informací o pracovním prostoru, pokud se jedná o spuštění kanálu atd.

mssparkutils.runtime.context

Poznámka:

mssparkutils.env není na Fabric oficiálně podporován, jako alternativu použijte notebookutils.runtime.context.

Známý problém

Při použití verze modulu runtime vyšší než 1.2 a spuštění mssparkutils.help(), uvedená rozhraní fabricClient, warehouse a workspace API nejsou prozatím podporována, ale budou k dispozici v budoucnu.