Sdílet prostřednictvím


NotebookUtils (dříve MSSparkUtils) pro Fabric

Notebook Utilities (NotebookUtils) je integrovaný balíček, který vám pomůže snadno provádět běžné úlohy v poznámkovém bloku Fabric. NotebookUtils můžete použít k práci se systémy souborů, k získání proměnných prostředí, ke zřetězení poznámkových bloků a práci s tajnými kódy. Balíček NotebookUtils je k dispozici v noteboocích PySpark (Python), Scala, SparkR a ve Fabric pipelines.

Note

  • MsSparkUtils se oficiálně přejmenuje na NotebookUtils. Stávající kód zůstane zpětně kompatibilní a nezpůsobí žádné zásadní změny. Důrazně doporučujeme upgradovat na nástroje poznámkových bloků, abyste zajistili nepřetržitou podporu a přístup k novým funkcím. Obor názvů mssparkutils bude v budoucnu vyřazen.
  • NotebookUtils je navržený tak, aby fungoval se Sparkem 3.4(Runtime v1.2) a novějším. Všechny nové funkce a aktualizace jsou nadále výhradně podporovány s namespace notebookutils.

Nástroje systému souborů

notebookutils.fs poskytuje nástroje pro práci s různými systémy souborů, včetně Azure Data Lake Storage (ADLS) Gen2 a Azure Blob Storage. Ujistěte se, že správně nakonfigurujete přístup ke službě Azure Data Lake Storage Gen2 a Azure Blob Storage .

Pro přehled dostupných metod spusťte následující příkazy:

notebookutils.fs.help()

Output

notebookutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use notebookutils.fs.help("methodName") for more info about a method.

NotebookUtils pracuje se systémem souborů stejným způsobem jako rozhraní API Sparku. Použití notebookutils.fs.mkdirs() a Fabric Lakehouse, například:

Usage Relativní cesta od kořenového adresáře HDFS Absolutní cesta pro systém souborů ABFS Absolutní cesta k místnímu systému souborů v uzlu ovladače
Nevýchozí lakehouse Není podporováno notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
Výchozí jezero Adresář v části Soubory nebo Tabulky: notebookutils.fs.mkdirs("Soubory/<new_dir>") notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
  • Pro výchozí Lakehouse jsou cesty k souborům připojeny do poznámkového bloku s výchozí dobou vypršení mezipaměti souborů 120 sekund. To znamená, že soubory se ukládají do mezipaměti v místní dočasné složce poznámkového bloku po dobu 120 sekund, i když se odeberou z Lakehouse. Pokud chcete změnit pravidlo časového limitu, můžete odpojit výchozí cesty k souborům Lakehouse a znovu je připojit s jinou hodnotou fileCacheTimeout .

  • U jiných než výchozích konfigurací Lakehouse můžete během připojování cest Lakehouse nastavit příslušný parametr fileCacheTimeout . Nastavením časového limitu na 0 zajistíte, že se nejnovější soubor načte ze serveru Lakehouse.

Výpis souborů

Pokud chcete zobrazit seznam obsahu adresáře, použijte notebookutils.fs.ls(Cesta k vašemu adresáři). Například:

notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp")  # The full path of the local file system of driver node

Rozhraní API notebookutils.fs.ls() se při použití relativní cesty chová odlišně podle typu poznámkového bloku.

  • v poznámkovém bloku Sparku: Relativní cesta se odvíjí od výchozí cesty Lakehouse ABFSS. Například notebookutils.fs.ls("Files") odkazuje na adresář Files ve výchozím Lakehouse.

    Například:

    notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
    
  • V poznámkovém bloku Pythonu: Relativní cesta je vztažena k pracovnímu adresáři místního systému souborů, který je ve výchozím nastavení /home/trusted-service-user/work. Proto byste měli použít úplnou cestu místo relativní cesty notebookutils.fs.ls("/lakehouse/default/Files") pro přístup k adresáři Files ve výchozím Lakehouse.

    Například:

    notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
    

Zobrazení vlastností souboru

Tato metoda vrátí vlastnosti souboru, včetně názvu souboru, cesty k souboru, velikosti souboru a toho, jestli se jedná o adresář a soubor.

files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Vytvoření nového adresáře

Tato metoda vytvoří daný adresář, pokud neexistuje, a vytvoří všechny nezbytné nadřazené adresáře.

notebookutils.fs.mkdirs('new directory name')  
notebookutils.fs.mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
notebookutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Kopírovat soubor

Tato metoda zkopíruje soubor nebo adresář a podporuje aktivitu kopírování napříč systémy souborů. Nastavili jsme recurse=True tak, aby kopírovalo všechny soubory a adresáře rekurzivně.

notebookutils.fs.cp('source file or directory', 'destination file or directory', recurse=True)

Note

Vzhledem k omezením zástupce OneLake , pokud potřebujete použít notebookutils.fs.cp() ke kopírování dat ze zástupce typu S3/GCS, doporučujeme místo cesty abfss použít připojenou (mountovanou) cestu.

Efektivní kopírování souborů

Tato metoda nabízí efektivnější přístup ke kopírování nebo přesouvání souborů, zejména při práci s velkými objemy dat. Pro zvýšení výkonu na platformě Fabric je vhodné použít fastcp jako náhradu za tradiční metodu cp.

notebookutils.fs.fastcp('source file or directory', 'destination file or directory', recurse=True)

Considerations:

  • notebookutils.fs.fastcp() nepodporuje kopírování souborů ve OneLake napříč oblastmi. V takovém případě můžete místo toho použít notebookutils.fs.cp() .
  • Vzhledem k omezením zástupce OneLake , pokud potřebujete použít notebookutils.fs.fastcp() ke kopírování dat ze zástupce typu S3/GCS, doporučujeme místo cesty abfss použít připojenou (mountovanou) cestu.

Náhled obsahu souboru

Tato metoda vrátí až první bajty maxBytes daného souboru jako String kódovaný v UTF-8.

notebookutils.fs.head('file path', maxBytes to read)

Přesunout soubor

Tato metoda přesune soubor nebo adresář a podporuje přesuny mezi systémy souborů.

notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Zápis souboru

Tato metoda zapíše daný řetězec do souboru zakódovaného v UTF-8.

notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Připojení obsahu k souboru

Tato metoda připojí daný řetězec k souboru zakódovanému v UTF-8.

notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Considerations:

  • notebookutils.fs.append() a notebookutils.fs.put() nepodporuje souběžné zápisy do stejného souboru kvůli nedostatku záruk atomicity.
  • Při použití rozhraní API notebookutils.fs.append ve smyčce for k zápisu do stejného souboru doporučujeme přidat příkaz sleep přibližně 0,5 s ~ 1 s mezi opakovanými zápisy. Toto doporučení je proto, že interní operace notebookutils.fs.append rozhraní API flush je asynchronní, takže krátká prodleva pomáhá zajistit integritu dat.

Odstranění souboru nebo adresáře

Tato metoda odebere soubor nebo adresář. Nastavili jsme recurse=True k rekurzivnímu odstranění všech souborů a adresářů.

notebookutils.fs.rm('file path', recurse=True) 

Připojení nebo odpojení adresáře

Další informace o podrobném využití najdete v připojení a odpojení souboru.

Nástroje poznámkového bloku

Pomocí nástrojů poznámkového bloku spusťte poznámkový blok nebo ukončete poznámkový blok s hodnotou. Spuštěním následujícího příkazu získejte přehled dostupných metod:

notebookutils.notebook.help()

Output:


The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> This method check if the DAG is correctly defined.

Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.

Use notebookutils.notebook.help("methodName") for more info about a method.

Note

Nástroje poznámkového bloku se nevztahují na definice úloh Apache Sparku (SJD).

Odkaz na poznámkový blok

Tato metoda odkazuje na poznámkový blok a vrátí jeho výstupní hodnotu. Vnořená volání funkcí můžete v poznámkovém bloku spouštět interaktivně nebo v pipeline. Odkazovaný poznámkový blok běží ve fondu Sparku toho poznámkového bloku, který tuto funkci volá.

notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Například:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Poznámkový blok Prostředků infrastruktury také podporuje odkazování na poznámkové bloky napříč několika pracovními prostory zadáním ID pracovního prostoru.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Odkaz na snímek referenčního běhu můžete otevřít ve výstupu buňky. Snímek zachycuje výsledky spuštění kódu a umožňuje snadné ladění referenčního spuštění.

Snímek obrazovky s výsledkem referenčního spuštění

Snímek obrazovky ukázkového snímku.

Considerations:

  • Poznámkový blok pro referencování mezi pracovními prostory je podporován verzí runtime 1.2 a vyšší.
  • Pokud používáte soubory v části Prostředek poznámkového bloku, použijte notebookutils.nbResPath v odkazovaném poznámkovém bloku, abyste měli jistotu, že odkazuje na stejnou složku jako interaktivní spuštění.
  • Spuštění odkazu umožňuje spouštění podřízených poznámkových bloků pouze v případě, že používají stejný objekt lakehouse jako nadřazený objekt, dědí nadřazený objekt lakehouse nebo ani jedno nedefinuje. Spuštění se zablokuje, pokud podřízená položka určuje jiný objekt lakehouse pro nadřazený poznámkový blok. Pokud chcete tuto kontrolu obejít, nastavte useRootDefaultLakehouse: True.

Paralelní spouštění více referenčních poznámkových bloků

Tato metoda notebookutils.notebook.runMultiple() umožňuje paralelně spouštět více poznámkových bloků nebo s předdefinovanou topologickou strukturou. Rozhraní API používá mechanismus vícevláknové implementace v rámci relace Sparku, což znamená, že referenční poznámkové sešity sdílejí výpočetní prostředky.

Pomocí notebookutils.notebook.runMultiple():

  • Spusťte několik poznámkových bloků současně, aniž byste museli čekat na dokončení každého z nich.

  • Pomocí jednoduchého formátu JSON určete závislosti a pořadí provádění poznámkových bloků.

  • Optimalizujte využití výpočetních prostředků Sparku a snižte náklady na vaše projekty Fabric.

  • Prohlédněte si snímky z každého záznamu běhu poznámkového bloku ve výstupu a pohodlně laďte a monitorujte úlohy poznámkového bloku.

  • Získejte výstupní hodnotu jednotlivých aktivit vedení a použijte je v podřízených úkolech.

Můžete se také pokusit spustit notebookutils.notebook.help("runMultiple") a vyhledat příklad a podrobné využití.

Tady je jednoduchý příklad paralelního spuštění seznamu poznámkových bloků pomocí této metody:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Výsledek spuštění z kořenového poznámkového bloku je následující:

Snímek obrazovky zachycující seznam poznámkových bloků

Tady je příklad spouštění poznámkových bloků s topologickou strukturou pomocí notebookutils.notebook.runMultiple(). Pomocí této metody můžete snadno orchestrovat poznámkové bloky prostřednictvím prostředí kódu.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Výsledek spuštění z kořenového poznámkového bloku je následující:

Snímek obrazovky se zobrazením seznamu poznámkových bloků s parametry

Poskytujeme také metodu, která zkontroluje, jestli je daG správně definovaná.

notebookutils.notebook.validateDAG(DAG)

Considerations:

  • Stupeň paralelismu při spuštění více poznámkových bloků je omezen celkovými dostupnými výpočetními prostředky relace Sparku.
  • Výchozí počet souběžných poznámkových bloků je 50 pro poznámkový blok Spark, zatímco výchozí hodnota je 25 pro poznámkový blok Pythonu. Tuto hodnotu můžete přizpůsobit, ale nadměrný paralelismus může vést k problémům se stabilitou a výkonem kvůli vysokému využití výpočetních prostředků. Pokud dojde k problémům, zvažte oddělení poznámkových bloků do několika runMultiple volání nebo snížení souběžnosti úpravou pole souběžnosti v parametru DAG.
  • Výchozí časový limit pro celý DAG je 12 hodin a výchozí časový limit pro každou buňku v podřízeném poznámkovém bloku je 90 sekund. Časový limit můžete změnit nastavením polí timeoutInSeconds a timeoutPerCellInSeconds v parametru DAG.

Opustit poznámkový blok

Tato metoda ukončí poznámkový blok s hodnotou. Vnořená volání funkcí můžete v poznámkovém bloku spouštět interaktivně nebo v pipeline.

  • Když z poznámkového bloku interaktivně zavoláte funkci exit(), poznámkový blok Fabric vyvolá výjimku, přeskočí následné buňky a udržuje relaci Sparku naživu.

  • Když orchestrujete poznámkový blok v pipeline, která volá funkci exit(), aktivita poznámkového bloku se vrátí s hodnotou ukončení. Tím se dokončuje spuštění kanálu a zastavuje se relace Sparku.

  • Při volání funkce exit() v poznámkovém bloku, na který se odkazuje, zastaví Spark další spuštění odkazovaného poznámkového bloku a bude dál spouštět další buňky v hlavním poznámkovém bloku, které volají funkci run(). Například: Notebook1 obsahuje tři buňky a volá funkci exit() ve druhé buňce. Poznámkový blok2 obsahuje pět buněk a volání run(notebook1) ve třetí buňce. Při spuštění poznámkového bloku 2 se poznámkový blok 1 zastaví na druhé buňce při stisknutí funkce exit(). Notebook2 nadále spouští čtvrtou a pátou buňku.

notebookutils.notebook.exit("value string")

Note

Funkce exit() přepíše aktuální výstup buňky. Pokud se chcete vyhnout ztrátě výstupu jiných příkazů kódu, zavolejte notebookutils.notebook.exit() v samostatné buňce.

Například:

Ukázkový 1 poznámkový blok s následujícími dvěma buňkami:

  • Buňka 1 definuje vstupní parametr s výchozí hodnotou nastavenou na 10.

  • Buňka 2 ukončí poznámkový blok se vstupem jako výstupní hodnotou.

Snímek obrazovky s ukázkovým poznámkovým blokem výstupní funkce

Ukázku 1 můžete spustit v jiném poznámkovém bloku s výchozími hodnotami:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Output:

Notebook is executed successfully with exit value 10

Ukázku 1 můžete spustit v jiném poznámkovém bloku a nastavit vstupní hodnotu na 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Output:

Notebook is executed successfully with exit value 20

Správa artefaktů poznámkového bloku

notebookutils.notebook poskytuje specializované nástroje pro správu položek poznámkového bloku prostřednictvím kódu programu. Tato rozhraní API vám můžou pomoct snadno vytvářet, získávat, aktualizovat a odstraňovat položky poznámkového bloku.

Pokud chcete tyto metody efektivně využít, zvažte následující příklady použití:

Vytvoření poznámkového bloku

with open("/path/to/notebook.ipynb", "r") as f:
    content = f.read()

artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")

Získání obsahu poznámkového bloku

artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")

Aktualizace poznámkového bloku

updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name",  "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")

Odstranění poznámkového bloku

is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")

Výpis poznámkových bloků v pracovním prostoru

artifacts_list = notebookutils.notebook.list("optional_workspace_id")

Nástroje pro funkci uživatelských dat (UDF)

notebookutils.udf poskytuje nástroje navržené pro integraci kódu poznámkového bloku s funkcemi uživatelských dat (UDF). Tyto nástroje umožňují přístup k funkcím z UDF položky v rámci stejného pracovního prostoru nebo v různých pracovních prostorech. Pak můžete podle potřeby vyvolat funkce v rámci uživatelsky definované funkce (UDF).

Tady je několik příkladů, jak používat UDF utility:

# Get functions from a UDF item
myFunctions = notebookutils.udf.getFunctions('UDFItemName')
# Or from another workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')

# Display function and item details
display(myFunctions.functionDetails)
display(myFunctions.itemDetails)

# Invoke a function
myFunctions.functionName('value1', 'value2')
# Or with named parameters
myFunctions.functionName(parameter1='value1', parameter2='value2')

Načtení funkcí z UDF

myFunctions = notebookutils.udf.getFunctions('UDFItemName')
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
var myFunctions = notebookutils.udf.getFunctions("UDFItemName")
var myFunctions = notebookutils.udf.getFunctions("UDFItemName", "workspaceId")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName", "workspaceId")

Vyvolat funkci

myFunctions.functionName('value1', 'value2'...)
val res = myFunctions.functionName('value1', 'value2'...)
myFunctions$functionName('value1', 'value2'...)

Podrobnosti položky definované uživatelem

display([myFunctions.itemDetails])
display(Array(myFunctions.itemDetails))
myFunctions$itemDetails()

Zobrazit podrobnosti funkce pro UDF

display(myFunctions.functionDetails)
display(myFunctions.functionDetails)
myFunctions$functionDetails()

Nástroje pro přihlašovací údaje

Pomocí nástrojů pro přihlašovací údaje můžete získat přístupové tokeny a spravovat tajné kódy ve službě Azure Key Vault.

Spuštěním následujícího příkazu získejte přehled dostupných metod:

notebookutils.credentials.help()

Output:

Help on module notebookutils.credentials in notebookutils:

NAME
    notebookutils.credentials - Utility for credentials operations in Fabric

FUNCTIONS
    getSecret(akvName, secret) -> str
        Gets a secret from the given Azure Key Vault.
        :param akvName: The name of the Azure Key Vault.
        :param secret: The name of the secret.
        :return: The secret value.
    
    getToken(audience) -> str
        Gets a token for the given audience.
        :param audience: The audience for the token.
        :return: The token.
    
    help(method_name=None)
        Provides help for the notebookutils.credentials module or the specified method.
        
        Examples:
        notebookutils.credentials.help()
        notebookutils.credentials.help("getToken")
        :param method_name: The name of the method to get help with.

DATA
    creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...

FILE
    /home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py

Získání tokenu

getToken vrátí token Microsoft Entra pro danou cílovou skupinu a název (volitelné). V následujícím seznamu jsou uvedeny aktuálně dostupné klíče cílové skupiny:

  • Úložiště zdroje publika: "úložiště"
  • Prostředek Power BI: "pbi"
  • Azure Key Vault - prostředek: "keyvault"
  • Synapse RTA KQL DB Prostředek: "Kusto"

Spuštěním následujícího příkazu získejte token:

notebookutils.credentials.getToken('audience Key')

Considerations:

  • Obory tokenů s pbi se můžou v průběhu času měnit. V současné době se podporují následující obory.

  • Při volání notebookutils.credentials.getToken("pbi") má vrácený token omezený rozsah, pokud je poznámkový blok spuštěný pod instančním objektem. Token nemá úplný obor služby Fabric. Pokud je poznámkový blok spuštěný pod uživatelskou identitou, token má stále plný obor služby Fabric, ale to se může změnit s příchodem vylepšení zabezpečení. Pokud chcete zajistit, aby token měla úplný obor služby Fabric, použijte ověřování MSAL místo rozhraní notebookutils.credentials.getToken API. Další informace naleznete v tématu Ověřování pomocí Microsoft Entra ID.

  • Následuje seznam oborů, které má token při volání notebookutils.credentials.getToken s klíčem cílové skupiny pbi pod identitou instančního objektu:

    • Lakehouse.ReadWrite.All
    • MLExperiment.ReadWrite.All
    • MLModel.ReadWrite.All
    • Notebook.ReadWrite.All
    • SparkJobDefinition.ReadWrite.All
    • Workspace.ReadWrite.All
    • Dataset.ReadWrite.All

Získání tajného kódu

getSecret vrátí tajný klíč služby Azure Key Vault pro daný koncový bod služby Azure Key Vault a název tajného kódu pomocí přihlašovacích údajů uživatele.

notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Připojení a odpojení disku

Podpora Fabric zahrnuje následující scénáře připojení v balíčku Microsoft Spark Utilities. K připojení vzdáleného úložiště (ADLS Gen2) můžete použít rozhraní API pro připojení připojení, odpojení, getMountPath() a připojení (ADLS Gen2) ke všem pracovním uzlům (uzel ovladače a pracovní uzly). Po umístění přípojného bodu úložiště použijte místní souborové rozhraní API pro přístup k datům, jako by byla uložena v místním systému souborů.

Připojení účtu ADLS Gen2

Následující příklad ukazuje, jak připojit Azure Data Lake Storage Gen2. Připojení služby Blob Storage a služby Azure File Share funguje podobně.

Tento příklad předpokládá, že máte jeden účet Data Lake Storage Gen2 s názvem storegen2, který má kontejner s názvem mycontainer , který chcete připojit k /test v relaci Sparku poznámkového bloku.

Snímek obrazovky znázorňující, kde vybrat kontejner, který se má připojit

Pokud chcete připojit kontejner s názvem mycontainer, musí nástroje notebookutils nejprve zkontrolovat, jestli máte oprávnění pro přístup k kontejneru. Prostředky infrastruktury v současné době podporují dvě metody ověřování pro operaci připojení triggeru: accountKey a sastoken.

Připojení prostřednictvím tokenu sdíleného přístupového podpisu nebo klíče účtu

NotebookUtils podporuje explicitní předání klíče účtu nebo tokenu sdíleného přístupového podpisu (SAS) jako parametru pro připojení cíle.

Z bezpečnostních důvodů doporučujeme ukládat klíče účtu nebo tokeny SAS ve službě Azure Key Vault (jak ukazuje následující snímek obrazovky). Pak je můžete načíst pomocí rozhraní notebookutils.credentials.getSecret API. Další informace o službě Azure Key Vault najdete v tématu o klíčích účtu spravovaného úložiště služby Azure Key Vault.

Snímek obrazovky znázorňující, kde jsou tajné kódy uložené ve službě Azure Key Vault

Ukázkový kód pro metodu accountKey :

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Ukázkový kód pro sastoken:

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Parametry připojení:

  • fileCacheTimeout: Objekty blob se ve výchozím nastavení ukládají do místní dočasné složky po dobu 120 sekund. Během této doby blobfuse nekontroluje, jestli je soubor aktuální nebo ne. Parametr může být nastaven tak, aby změnil výchozí časový limit. Pokud několik klientů současně upravuje soubory, aby nedocházelo k nekonzistence mezi místními a vzdálenými soubory, doporučujeme zkrátit dobu mezipaměti nebo dokonce změnit na 0 a vždy získat nejnovější soubory ze serveru.
  • časový limit: Časový limit operace připojení je ve výchozím nastavení 120 sekund. Parametr může být nastaven tak, aby změnil výchozí časový limit. Pokud je příliš mnoho úloh nebo pokud vyprší časový limit pro připojení disku, doporučujeme zvýšit hodnotu.

Můžete použít následující parametry:

notebookutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Note

Pro účely zabezpečení se doporučuje vyhnout vkládání přihlašovacích údajů přímo do kódu. Pro ochranu vašich přihlašovacích údajů jsou všechny tajnosti zobrazené ve výstupech poznámkového bloku redigovány. Další informace najdete v tématu Redaction tajného kódu.

Jak namontovat lakehouse

Vzorový kód pro připojení jezera k /<mount_name>:

notebookutils.fs.mount( 
 "abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse", 
 "/<mount_name>"
)

Přístup k souborům pod přípojným bodem pomocí rozhraní API notebookutils fs

Hlavním účelem operace připojení je umožnit zákazníkům přístup k datům uloženým ve vzdáleném účtu úložiště pomocí místního rozhraní API systému souborů. K datům můžete přistupovat také pomocí rozhraní API služby notebookutils fs s připojenou cestou jako parametrem. Tento formát cesty je trochu odlišný.

Předpokládejme, že jste kontejner Data Lake Storage Gen2 připojili mycontainer k /test pomocí rozhraní API pro připojení. Při přístupu k datům pomocí místního rozhraní API systému souborů je formát cesty podobný tomuto:

/synfs/notebook/{sessionId}/test/{filename}

Pokud chcete získat přístup k datům pomocí rozhraní API služby notebookutils fs, doporučujeme použít getMountPath(), abyste získali přesnou cestu:

path = notebookutils.fs.getMountPath("/test")
  • Seznam adresářů:

    notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
    
  • Přečti obsah souboru:

    notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Vytvořte adresář:

    notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
    

Přístup k souborům pod přípojným bodem prostřednictvím místní cesty

Soubory můžete snadno číst a zapisovat do přípojného bodu pomocí standardního systému souborů. Tady je příklad Pythonu:

#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Postup kontroly stávajících přípojných bodů

Ke kontrole všech existujících informací o přípojných bodech můžete použít rozhraní API notebookutils.fs.mounts( ):

notebookutils.fs.mounts()

Jak odpojit přípojný bod

Pomocí následujícího kódu odpojte přípojný bod (/otestujte v tomto příkladu):

notebookutils.fs.unmount("/test")

Známá omezení

  • Aktuální připojení je konfigurace na úrovni úlohy; Doporučujeme použít rozhraní API pro připojení ke kontrole, jestli přípojný bod existuje nebo není k dispozici.

  • Mechanismus odpojování se nepoužije automaticky. Po dokončení spuštění aplikace je potřeba odpojit přípojný bod a uvolnit místo na disku, musíte v kódu explicitně volat rozhraní API pro odpojení. V opačném případě bude přípojný bod stále existovat v uzlu po dokončení spuštění aplikace.

  • Připojení účtu úložiště ADLS Gen1 se nepodporuje.

Nástroje lakehouse

notebookutils.lakehouse poskytuje nástroje přizpůsobené pro správu položek Lakehouse. Tyto nástroje umožňují snadno vytvářet, získávat, aktualizovat a odstraňovat artefakty Lakehouse.

Přehled metod

Tady je přehled dostupných metod poskytovaných notebookutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact

# Create Lakehouse with Schema Support
create(name: String, description: String = "", definition: {"enableSchemas": True}): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean 

# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]

# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table] 

# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table] 

Příklady použití

Pokud chcete tyto metody efektivně využít, zvažte následující příklady použití:

Vytvoření datového lakehouse

artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
# Create Lakehouse with Schema Support
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", {"enableSchemas": True})

Získání Lakehouse

artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")

Aktualizace lakehouse

updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Odstranění Lakehouse

is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Seznam lakehouses v pracovním prostoru

artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")

Výpis všech tabulek v datovém skladu Lakehouse

artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")

Spuštění operace načtení tabulky v prostředí Lakehouse

notebookutils.lakehouse.loadTable(
    {
        "relativePath": "Files/myFile.csv",
        "pathType": "File",
        "mode": "Overwrite",
        "recursive": False,
        "formatOptions": {
            "format": "Csv",
            "header": True,
            "delimiter": ","
        }
    }, "table_name", "artifact_name", "optional_workspace_id")

Další informace

Podrobnější informace o jednotlivých metodách a jejích parametrech najdete v této notebookutils.lakehouse.help("methodName") funkci.

Nástroje modulu runtime

Zobrazení informací o kontextu relace

Díky notebookutils.runtime.context můžete získat kontextové informace o aktuálním živém sezení, včetně názvu poznámkového bloku, výchozího lakehouse, informací o pracovním prostoru, zda jde o spuštění pipeline, atd.

notebookutils.runtime.context

Následující tabulka popisuje vlastnosti.

Parameter Explanation
currentNotebookName Název aktuálního poznámkového bloku
currentNotebookId Jedinečné ID aktuálního poznámkového bloku
currentWorkspaceName Název aktuálního pracovního prostoru
currentWorkspaceId ID aktuálního pracovního prostoru
defaultLakehouseName Zobrazovaný název výchozího jezera, pokud je definován
defaultLakehouseId ID výchozího jezera, pokud je definováno
defaultLakehouseWorkspaceName Název pracovního prostoru výchozího lakehouse, pokud je definován
defaultLakehouseWorkspaceId ID pracovního prostoru výchozího Lakehouse, pokud je definováno
currentRunId V referenčním spuštění aktuální ID spuštění
parentRunId V referenčním spuštění s vnořenými spuštěními je toto ID nadřazeného spuštění.
rootRunId V referenčním spuštění s vnořenými spuštěními je toto ID kořenového spuštění.
isForPipeline Jestli je spuštění pro potrubí
isReferenceRun Zda je aktuální spuštění referenčním spuštěním
referenceTreePath Stromová struktura vnořených referenčních běhů, používaná pouze pro hierarchii snímků na monitorovací stránce L2
rootNotebookId (Pouze v referenčním spuštění) ID kořenového poznámkového bloku v referenčním spuštění
rootNotebookName (Pouze v referenčním spuštění) Název kořenového poznámkového bloku v referenčním spuštění
rootWorkspaceId (Pouze v referenčním spuštění) ID pracovního prostoru kořenového poznámkového bloku v referenčním spuštění
rootWorkspaceName (Pouze v referenčním spuštění) Název pracovního prostoru kořenového poznámkového bloku v referenčním spuštění
activityId ID úlohy Livy pro aktuální aktivitu
hcRepId ID REPL v režimu vysoké souběžnosti
clusterId Identita clusteru Synapse Spark
poolName Název fondu Spark, který se používá
environmentId ID prostředí, ve kterém je úloha spuštěná.
environmentWorkspaceId ID pracovního prostoru prostředí
userId ID aktuálního uživatele
userName Uživatelské jméno aktuálního uživatele

Správa relací

Zastavení interaktivní relace

Místo ručního kliknutí na tlačítko Zastavit je někdy vhodnější zastavit interaktivní relaci voláním rozhraní API v kódu. V takových případech poskytujeme rozhraní API notebookutils.session.stop(), které podporuje zastavení interaktivní relace prostřednictvím kódu, je k dispozici pro Scala a PySpark.

notebookutils.session.stop()

notebookutils.session.stop() API asynchronně zastaví aktuální interaktivní relaci na pozadí. Zastaví také Spark relaci a uvolní prostředky, které relace obsazuje, takže jsou k dispozici pro jiné relace ve stejném fondu.

Restartování interpreta Pythonu

Nástroj notebookutils.session poskytuje způsob, jak restartovat interpret Pythonu.

notebookutils.session.restartPython()

Considerations:

  • V případě spuštění odkazu na poznámkový blok restartPython() restartuje interpret Pythonu pouze aktuálního poznámkového bloku.
  • Ve výjimečných případech může příkaz selhat kvůli reflexnímu mechanismu Sparku, proto může přidání opakování problém řešit.

Proměnné knihovní nástroje

Note

Nástroje knihovny proměnných v poznámkových blocích jsou ve verzi Preview.

Knihovny proměnných dávají možnost vyhnout se hardkodování hodnot ve vašem notebookovém kódu. Hodnoty v knihovně můžete aktualizovat místo úprav kódu. Poznámkový sešit odkazuje na knihovnu proměnných k získání těchto hodnot. Tento přístup zjednodušuje opakované použití kódu napříč týmy a projekty tím, že využívá centrálně spravovanou knihovnu.

Pro přehled dostupných metod spusťte následující příkazy:

notebookutils.variableLibrary.help()

Output

[Preview] notebookutils.variableLibrary is a utility to Variable Library.

Below is overview about the available methods:

get(variableReference: String): String
-> Run the variable value with type.
getLibrary(variableLibraryName: String): VariableLibrary
-> Get the variable library.
Use notebookutils.variableLibrary.help("methodName") for more info about a method.

Definování proměnné v knihovně proměnných

Před použitím notebookutils.variableLibrarydefinujte proměnné jako první .

Snímek obrazovky se seznamem proměnných v knihovně proměnných

Načti knihovnu proměnných z Notebooku

samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")

samplevl.test_int
samplevl.test_str
val samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")

samplevl.test_int
samplevl.test_str
samplevl <- notebookutils.variableLibrary.getLibrary("sampleVL")

samplevl.test_int
samplevl.test_str

Příklad dynamického používání proměnné.

samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")

file_path = f"abfss://{samplevl.Workspace_name}@onelake.dfs.fabric.microsoft.com/{samplevl.Lakehouse_name}.Lakehouse/Files/<FileName>.csv" 
df = spark.read.format("csv").option("header","true").load(file_path) 

display(df) 

Přístup k jedné proměnné pomocí odkazu

notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")

Note

  • Rozhraní notebookutils.variableLibrary API podporuje přístup pouze k knihovnám proměnných v rámci stejného pracovního prostoru.
  • Načítání knihoven proměnných mezi pracovními prostory není během referenčního spuštění podporováno v podřízených notebookách.
  • Kód poznámkového bloku odkazuje na proměnné definované v aktivní sadě hodnot knihovny proměnných.

Známé problémy

  • Pokud používáte verzi modulu runtime vyšší než 1.2 a spustíte notebookutils.help(), uvedená rozhraní fabricClient, rozhraní API PBIClient se prozatím nepodporují, budou v další části k dispozici. V poznámkových blocích Scala se navíc zatím nepodporuje rozhraní API pro přihlašovací údaje .

  • Poznámkový blok Pythonu nepodporuje rozhraní API pro zastavení a restartováníPythonu při použití nástroje notebookutils.session pro správu relací.

  • V současné době se hlavní název služby (SPN) nepodporuje pro nástroje knihovny proměnných.