Bagikan melalui


NotebookUtils (mantan MSSparkUtils) untuk Fabric

Utilitas Notebook (NotebookUtils) adalah paket bawaan untuk membantu Anda melakukan tugas umum dengan mudah di Fabric Notebook. Anda dapat menggunakan NotebookUtils untuk bekerja dengan sistem file, mendapatkan variabel lingkungan, menghubungkan buku catatan secara berantai, dan mengelola rahasia. Paket NotebookUtils tersedia di PySpark (Python), Scala, notebook SparkR, dan alur Fabric.

Catatan

  • MsSparkUtils secara resmi diganti namanya menjadi NotebookUtils. Kode yang ada tetap kompatibel ke belakang dan tidak menyebabkan perubahan yang merusak. Sangat disarankan untuk meningkatkan ke notebookutils untuk memastikan dukungan dan akses berkelanjutan ke fitur baru. Namespace mssparkutils akan dihentikan nantinya.
  • NotebookUtils dirancang untuk bekerja dengan Spark 3.4(Runtime v1.2) dan yang lebih baru. Semua fitur dan pembaruan terbaru hanya didukung oleh namespace notebookutils mulai sekarang.

Utilitas sistem file

notebookutils.fs menyediakan utilitas untuk bekerja dengan berbagai sistem file, termasuk Azure Data Lake Storage (ADLS) Gen2 dan Azure Blob Storage. Pastikan Anda mengonfigurasi akses ke Azure Data Lake Storage Gen2 dan Azure Blob Storage dengan tepat.

Jalankan perintah berikut untuk mendapatkan gambaran umum metode yang tersedia:

notebookutils.fs.help()

Hasil

notebookutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use notebookutils.fs.help("methodName") for more info about a method.

NotebookUtils bekerja dengan sistem file dengan cara yang sama seperti API Spark. Ambil sebagai contoh notebookutils.fs.mkdirs() dan penggunaan Fabric lakehouse misalnya:

Penggunaan Jalur relatif dari akar HDFS Jalur absolut untuk sistem file ABFS Jalur absolut untuk sistem file lokal dalam simpul driver
Lakehouse tidak standar Tidak didukung notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
Lakehouse bawaan Direktori di bawah 'File' atau 'Tabel': notebookutils.fs.mkdirs("Files/<new_dir>") notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")

Catatan

  • Untuk Lakehouse default, jalur file dipasang di Notebook Anda dengan batas waktu cache file default 120 detik. Ini berarti bahwa file di-cache di folder sementara lokal Notebook selama 120 detik, bahkan jika file dihapus dari Lakehouse. Jika Anda ingin mengubah aturan batas waktu, Anda dapat melepas jalur file Lakehouse default dan memasangnya lagi dengan nilai fileCacheTimeout yang berbeda.
  • Untuk konfigurasi Lakehouse non-default, Anda dapat mengatur parameter fileCacheTimeout yang sesuai selama proses pemasangan jalur Lakehouse. Mengatur batas waktu ke 0 memastikan bahwa file terbaru diambil dari server Lakehouse.

Daftar file

Untuk mencantumkan konten direktori, gunakan notebookutils.fs.ls('Jalur direktori Anda'). Contohnya:

notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp")  # The full path of the local file system of driver node

API notebookutils.fs.ls() berakibat berbeda saat menggunakan jalur relatif, tergantung pada jenis notebook.

  • Di notebook Spark: Jalur relatif adalah relatif terhadap jalur default ABFSS Lakehouse. Misalnya, notebookutils.fs.ls("Files") menunjuk ke direktori Files di Lakehouse default.

    Contohnya:

    notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
    
  • Dalam buku catatan Python: Jalur relatif di sini mengacu pada direktori kerja dari sistem berkas lokal, yang secara default adalah /home/trusted-service-user/work. Oleh karena itu, Anda harus menggunakan jalur lengkap alih-alih jalur relatif notebookutils.fs.ls("/lakehouse/default/Files") untuk mengakses direktori Files di Lakehouse default.

    Contohnya:

    notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
    

Lihat properti file

Metode ini mengembalikan properti file termasuk nama file, jalur file, ukuran file, dan apakah itu direktori dan file.

files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Buat direktori baru

Metode ini akan membuat direktori yang diberikan jika direktori tersebut belum ada, serta membuat semua direktori induk yang diperlukan.

notebookutils.fs.mkdirs('new directory name')  
notebookutils.fs.mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
notebookutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Salin file

Metode ini menyalin file atau direktori, dan mendukung aktivitas salin di seluruh sistem file.

notebookutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Catatan

Karena keterbatasan pintasan OneLake, ketika Anda perlu menggunakan notebookutils.fs.cp() untuk menyalin data dari pintasan jenis S3/GCS, disarankan untuk menggunakan jalur yang dipasang alih-alih jalur abfss.

Salinan file yang berkinerja tinggi

Metode ini menawarkan pendekatan yang lebih efisien untuk menyalin atau memindahkan file, terutama saat berhadapan dengan volume data besar. Untuk peningkatan performa pada Fabric, disarankan untuk menggunakan fastcp sebagai pengganti metode tradisional cp .

Catatan

  • notebookutils.fs.fastcp() tidak mendukung penyalinan file di OneLake di seluruh wilayah. Dalam hal ini, Anda dapat menggunakan notebookutils.fs.cp() sebagai gantinya.
  • Karena keterbatasan pintasan OneLake, ketika Anda perlu menggunakan notebookutils.fs.fastcp() untuk menyalin data dari pintasan jenis S3/GCS, disarankan untuk menggunakan jalur yang dipasang alih-alih jalur abfss.
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Pratinjau isi file

Metode ini mengembalikan hingga 'maxBytes' byte pertama dari file yang diberikan sebagai String yang dikodekan dalam UTF-8.

notebookutils.fs.head('file path', maxBytes to read)

Pindahkan file

Metode ini memindahkan file atau direktori, dan mendukung perpindahan di seluruh sistem file.

notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Menyimpan file

Metode ini menulis string yang diberikan ke file, dikodekan dalam UTF-8.

notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Tambahkan konten ke file

Metode ini menambahkan string yang diberikan ke file, yang dikodekan dalam UTF-8.

notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Catatan

  • notebookutils.fs.append() dan notebookutils.fs.put() tidak mendukung penulisan bersamaan ke file yang sama karena kurangnya jaminan atomitas.
  • Saat menggunakan API notebookutils.fs.append dalam perulangan for untuk menulis ke file yang sama, sebaiknya menambahkan pernyataan sleep dengan jeda sekitar 0,5 s ~ 1 s antara penulisan yang berulang. Rekomendasi ini karena operasi internal notebookutils.fs.append dari API flush bersifat asinkron, sehingga penundaan singkat membantu memastikan integritas data.

Hapus file atau direktori

Metode ini menghapus file atau direktori.

notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Memasang/melepas direktori

Temukan informasi selengkapnya tentang penggunaan terperinci di Pemasangan dan pembongkaran file.

Utilitas laptop

Gunakan Utilitas Notebook untuk menjalankan buku catatan atau keluar dari buku catatan sambil menyertakan nilai. Jalankan perintah berikut untuk mendapatkan gambaran umum tentang metode yang tersedia:

notebookutils.notebook.help()

Hasil:


The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.

[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.

Use notebookutils.notebook.help("methodName") for more info about a method.

Catatan

Utilitas notebook tidak berlaku untuk definisi kerja Apache Spark (SJD).

Rujuk buku catatan

Metode ini mereferensikan buku catatan dan mengembalikan nilai keluarnya. Anda bisa menjalankan panggilan fungsi nesting di notebook secara interaktif atau dalam alur. Notebook yang dirujuk berjalan pada kumpulan Spark dari notebook yang memanggil fungsi ini.

notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Contohnya:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Fabric notebook juga mendukung referensi notebook di beberapa ruang kerja dengan menentukan ID ruang kerja.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Anda dapat membuka tautan rekam jepret dari referensi yang dijalankan dalam output sel. Rekam jepret mengambil hasil eksekusi kode dan memungkinkan Anda untuk dengan mudah men-debug eksekusi referensi.

Cuplikan layar hasil eksekusi referensi.

Cuplikan layar contoh rekam jepret.

Catatan

  • Notebook referensi lintas ruang kerja didukung oleh runtime versi 1.2 ke atas.
  • Jika Anda menggunakan file di bawah Sumber Daya Buku Catatan, gunakan notebookutils.nbResPath di buku catatan yang dirujuk untuk memastikannya menunjuk ke folder yang sama dengan eksekusi interaktif.

Referensi menjalankan beberapa buku catatan secara paralel

Penting

Fitur ini sedang dalam tahap pratinjau.

Metode notebookutils.notebook.runMultiple() ini memungkinkan Anda menjalankan beberapa notebook secara paralel atau dengan struktur topologi yang telah ditentukan sebelumnya. API menggunakan mekanisme implementasi multi-thread dalam sesi Spark, yang berarti buku catatan referensi yang berjalan akan berbagi sumber daya komputasi.

Dengan notebookutils.notebook.runMultiple(), Anda dapat:

  • Jalankan beberapa notebook secara bersamaan, tanpa menunggu masing-masing selesai.

  • Tentukan dependensi dan urutan eksekusi untuk notebook Anda, menggunakan format JSON sederhana.

  • Optimalkan penggunaan sumber daya komputasi Spark dan kurangi biaya proyek Fabric Anda.

  • Tampilkan Cuplikan dari setiap catatan eksekusi notebook dalam output, dan debug dan pantau tugas notebook Anda dengan mudah.

  • Dapatkan nilai keluar dari setiap aktivitas eksekutif dan gunakan dalam tugas hilir.

Anda juga dapat mencoba menjalankan notebookutils.notebook.help("runMultiple") untuk menemukan contoh dan penggunaan terperinci.

Berikut adalah contoh sederhana menjalankan daftar buku catatan secara paralel menggunakan metode ini:


notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Hasil eksekusi dari buku catatan akar adalah sebagai berikut:

Cuplikan layar referensi daftar buku catatan.

Berikut adalah contoh menjalankan notebook dengan struktur topologis menggunakan notebookutils.notebook.runMultiple(). Gunakan metode ini untuk mengatur notebook dengan mudah melalui pengalaman kode.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Hasil eksekusi dari buku catatan akar adalah sebagai berikut:

Cuplikan layar yang menunjukkan daftar notebook dengan parameter.

Kami juga menyediakan metode untuk memeriksa apakah DAG ditentukan dengan benar.

notebookutils.notebook.validateDAG(DAG)

Catatan

  • Tingkat paralelisme dari penjalankan beberapa notebook dibatasi oleh total sumber daya komputasi yang ada dari sesi Spark.
  • Batas atas untuk aktivitas notebook atau notebook bersamaan adalah 50. Melebihi batas ini dapat menyebabkan masalah stabilitas dan performa karena penggunaan sumber daya komputasi yang tinggi. Jika masalah muncul, pertimbangkan untuk memisahkan notebook menjadi beberapa runMultiple panggilan atau mengurangi keserempakan dengan menyesuaikan bidang keserempakan dalam parameter DAG.
  • Batas waktu default untuk seluruh DAG adalah 12 jam, dan batas waktu default untuk setiap sel dalam buku catatan anak adalah 90 detik. Anda dapat mengubah batas waktu dengan mengatur timeoutInSeconds dan timeoutPerCellInSeconds bidang pada parameter DAG.

Keluar dari notebook

Metode ini mengakhiri sesi notebook dengan memberikan nilai. Anda bisa menjalankan panggilan fungsi nesting di notebook secara interaktif atau dalam alur.

  • Saat Anda memanggil fungsi exit() dari notebook secara interaktif, notebook Fabric melempar pengecualian, melewatkan eksekusi sel berikutnya, dan menjaga sesi Spark tetap hidup.

  • Saat Anda mengatur buku catatan dalam alur yang memanggil fungsi exit(), aktivitas notebook akan kembali dengan nilai keluar. Ini menyelesaikan proses alur dan menghentikan sesi Spark.

  • Ketika Anda memanggil fungsi exit() di notebook yang sedang dirujuk, Fabric Spark akan menghentikan eksekusi lebih lanjut dari notebook yang dirujuk, dan terus menjalankan sel berikutnya di notebook utama yang memanggil fungsi run(). Misalnya: Notebook1 memiliki tiga sel dan memanggil fungsi exit() di sel kedua. Notebook2 memiliki lima sel dan memanggil run(notebook1) di sel ketiga. Saat Anda menjalankan Notebook2, Notebook1 berhenti di sel kedua saat menekan fungsi exit(). Notebook2 terus menjalankan sel keempat dan sel kelimanya.

notebookutils.notebook.exit("value string")

Catatan

Fungsi exit() menimpa output sel saat ini. Untuk menghindari kehilangan output pernyataan kode lain, panggil notebookutils.notebook.exit() di sel terpisah.

Contohnya:

Buku catatan Sample1 dengan dua sel berikut:

  • Sel 1 menentukan parameter input dengan nilai default yang diatur ke 10.

  • Sel 2 keluar dari notebook dengan input sebagai nilai keluaran.

Cuplikan layar memperlihatkan contoh buku catatan fungsi keluar.

Anda bisa menjalankan Sample1 di notebook lain dengan nilai default:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Hasil:

Notebook is executed successfully with exit value 10

Anda bisa menjalankan Sample1 di notebook lainnya dan mengatur nilai input sebagai 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Hasil:

Notebook is executed successfully with exit value 20

Mengelola item buku catatan

notebookutils.notebook menyediakan utilitas khusus untuk mengelola item Notebook secara terprogram. API ini dapat membantu Anda membuat, mendapatkan, memperbarui, dan menghapus item Notebook dengan mudah.

Untuk menggunakan metode ini secara efektif, pertimbangkan contoh penggunaan berikut:

Membuat Buku Catatan

with open("/path/to/notebook.ipynb", "r") as f:
    content = f.read()

artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")

Mendapatkan konten Buku Catatan

artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")

Memperbarui Buku Catatan

updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name",  "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")

Menghapus Buku Catatan

is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")

Mencantumkan Buku Catatan di ruang kerja

artifacts_list = notebookutils.notebook.list("optional_workspace_id")

Utilitas Fungsi Data Pengguna (UDF)

notebookutils.udf menyediakan utilitas yang dirancang untuk mengintegrasikan kode Notebook dengan User Data Functions (UDF). Utilitas ini memungkinkan Anda mengakses fungsi dari item UDF dalam ruang kerja yang sama atau di berbagai ruang kerja. Anda kemudian dapat memanggil fungsi dalam item UDF sesuai kebutuhan.

Berikut adalah gambaran umum metode yang tersedia:

# Get functions
myFunctions = notebookutils.udf.getFunctions('UDFItemName') # Get functions from UDF within the same workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId') # Get functions from UDF across different workspace

# Additional helper method to return all functions, their respective parameters, and types.
display(myFunctions.functionDetails)
display(myFunctions.itemDetails)

# Invoke the function
myFunctions.functionName('value1', 'value2')
myFunctions.functionName(parameter1='value1', parameter2='value2'...) # Another way to invoke the function

Mengambil fungsi dari UDF

myFunctions = notebookutils.udf.getFunctions('UDFItemName')
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
var myFunctions = notebookutils.udf.getFunctions("UDFItemName")
var myFunctions = notebookutils.udf.getFunctions("UDFItemName", "workspaceId")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName", "workspaceId")

Memanggil fungsi

myFunctions.functionName('value1', 'value2'...)
val res = myFunctions.functionName('value1', 'value2'...)
myFunctions$functionName('value1', 'value2'...)

Menampilkan detail untuk item UDF

display([myFunctions.itemDetails])
display(Array(myFunctions.itemDetails))
myFunctions$itemDetails()

Menampilkan detail fungsi untuk UDF

display(myFunctions.functionDetails)
display(myFunctions.functionDetails)
myFunctions$functionDetails()

Utilitas identitas

Anda dapat menggunakan Utilitas Kredensial untuk mendapatkan token akses dan mengelola rahasia di Azure Key Vault.

Jalankan perintah berikut untuk mendapatkan gambaran umum tentang metode yang tersedia:

notebookutils.credentials.help()

Hasil:

Help on module notebookutils.credentials in notebookutils:

NAME
    notebookutils.credentials - Utility for credentials operations in Fabric

FUNCTIONS
    getSecret(akvName, secret) -> str
        Gets a secret from the given Azure Key Vault.
        :param akvName: The name of the Azure Key Vault.
        :param secret: The name of the secret.
        :return: The secret value.
    
    getToken(audience) -> str
        Gets a token for the given audience.
        :param audience: The audience for the token.
        :return: The token.
    
    help(method_name=None)
        Provides help for the notebookutils.credentials module or the specified method.
        
        Examples:
        notebookutils.credentials.help()
        notebookutils.credentials.help("getToken")
        :param method_name: The name of the method to get help with.

DATA
    creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...

FILE
    /home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py

Dapatkan token

getToken mengembalikan token Microsoft Entra untuk audiens dan nama tertentu (opsional). Daftar berikut ini memperlihatkan kunci audiens yang tersedia saat ini:

  • Sumber Daya Audiens Penyimpanan: "penyimpanan"
  • Sumber Daya Power BI: "pbi"
  • Azure Key Vault Sumber Daya: "keyvault"
  • Synapse RTA KQL DB Resource: "kusto"

Jalankan perintah berikut untuk mendapatkan token:

notebookutils.credentials.getToken('audience Key')

Dapatkan rahasia dengan menggunakan kredensial pengguna

getSecret mengembalikan rahasia Azure Key Vault untuk titik akhir Azure Key Vault tertentu dan nama rahasia menggunakan kredensial pengguna.

notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Pemasangan dan pembongkaran file

Fabric mendukung skenario pemasangan berikut dalam paket Utilitas Microsoft Spark. Anda dapat menggunakan API mount, unmount, getMountPath(), dan mounts() untuk melampirkan penyimpanan jarak jauh (ADLS Gen2) ke semua node yang berfungsi (node driver dan node pekerja). Setelah titik pemasangan penyimpanan ada, gunakan API file lokal untuk mengakses data seolah-olah disimpan dalam sistem file lokal.

Cara memasang akun ADLS Gen2

Contoh berikut mengilustrasikan cara memasang Azure Data Lake Storage Gen2. Pemasangan Blob Storage juga berfungsi serupa.

Contoh ini mengasumsikan bahwa Anda memiliki satu akun Data Lake Storage Gen2 bernama storegen2, yang memiliki kontainer bernama mycontainer yang ingin Anda pasang ke /test di sesi Spark notebook Anda.

Cuplikan layar memperlihatkan tempat memilih kontainer untuk dipasang.

Untuk memasang kontainer yang disebut mycontainer, notebookutils terlebih dahulu perlu memeriksa apakah Anda memiliki izin untuk mengakses kontainer. Saat ini, Fabric mendukung dua metode autentikasi untuk operasi pemasangan pemicu: accountKey dan sastoken.

Memasang melalui token tanda tangan akses bersama atau kunci akun

NotebookUtils mendukung secara eksplisit meneruskan kunci akun atau token Tanda tangan akses bersama (SAS) sebagai parameter untuk memasang target.

Untuk alasan keamanan, kami sarankan Anda menyimpan kunci akun atau token SAS di Azure Key Vault (seperti yang ditunjukkan cuplikan layar berikut). Anda kemudian dapat mengambilnya dengan menggunakan notebookutils.credentials.getSecret API. Untuk informasi selengkapnya tentang Azure Key Vault, lihat Tentang kunci akun penyimpanan terkelola Azure Key Vault.

Cuplikan layar memperlihatkan tempat rahasia disimpan di Azure Key Vault.

Kode sampel untuk metode accountKey :

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Contoh kode untuk sastoken:

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Parameter pemasangan:

  • fileCacheTimeout: Blob di-cache di folder sementara lokal selama 120 detik secara default. Selama waktu ini, blobfuse tidak memeriksa apakah file sudah diperbarui atau tidak. Parameter dapat diatur untuk mengubah waktu tunggu default. Ketika beberapa klien memodifikasi file secara bersamaan, untuk menghindari inkonsistensi antara file lokal dan jarak jauh, sebaiknya persingkat waktu cache, atau bahkan mengubahnya menjadi 0, dan selalu mendapatkan file terbaru dari server.
  • batas waktu: Batas waktu operasi pemasangan adalah 120 detik secara default. Parameter dapat diatur untuk mengubah waktu tunggu default. Ketika ada terlalu banyak pelaksana atau ketika batas waktu pemasangan tercapai, disarankan untuk meningkatkan nilainya.

Anda dapat menggunakan parameter ini seperti ini:

notebookutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Catatan

Untuk tujuan keamanan, disarankan untuk menghindari penyematan kredensial langsung dalam kode. Untuk lebih melindungi kredensial Anda, rahasia apa pun yang ditampilkan dalam output notebook diredaksi. Untuk informasi selengkapnya, lihat Redaksi rahasia.

Cara membangun rumah danau

Kode sampel untuk memasang lakehouse ke /<mount_name>:

notebookutils.fs.mount( 
 "abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse", 
 "/<mount_name>"
)

Mengakses file di bawah titik pemasangan dengan menggunakan notebookutils fs API

Tujuan utama operasi pemasangan adalah untuk memungkinkan pelanggan mengakses data yang disimpan di akun penyimpanan jarak jauh dengan API sistem file lokal. Anda juga dapat mengakses data dengan menggunakan notebookutils fs API dengan jalur yang dipasang sebagai parameter. Format jalur ini sedikit berbeda.

Asumsikan bahwa Anda telah memasang kontainer Data Lake Storage Gen2 mycontainer ke /test dengan menggunakan API pemasangan. Saat Anda mengakses data dengan API sistem file lokal, format jalurnya seperti ini:

/synfs/notebook/{sessionId}/test/{filename}

Saat Anda ingin mengakses data dengan menggunakan notebookutils fs API, sebaiknya gunakan getMountPath() untuk mendapatkan jalur yang akurat:

path = notebookutils.fs.getMountPath("/test")
  • Daftar direktori:

    notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
    
  • Membaca isi file:

    notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Membuat direktori:

    notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
    

Mengakses file di bawah titik pemasangan melalui jalur lokal

Anda dapat dengan mudah membaca dan menulis file di titik pemasangan menggunakan sistem file standar. Berikut adalah contoh Python:

#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Cara memeriksa titik pemasangan yang ada

Anda dapat menggunakan API notebookutils.fs.mounts() untuk memeriksa semua info titik pemasangan yang ada:

notebookutils.fs.mounts()

Cara melepaskan poin pemasangan

Gunakan kode berikut untuk melepas titik pemasangan Anda (/test dalam contoh ini):

notebookutils.fs.unmount("/test")

Pembatasan yang diketahui

  • Pemasangan saat ini adalah konfigurasi tingkat pekerjaan; kami sarankan Anda menggunakan API pemasangan untuk memeriksa apakah ada titik pemasangan atau tidak tersedia.

  • Mekanisme pelepasan tidak diterapkan secara otomatis. Ketika aplikasi selesai dijalankan, untuk melepas titik pemasangan dan melepaskan ruang disk, Anda perlu memanggil secara eksplisit API unmount dalam kode Anda. Jika tidak, titik pemasangan masih akan ada di node setelah eksekusi aplikasi selesai.

  • Memasang akun penyimpanan ADLS Gen1 tidak didukung.

Fasilitas Lakehouse

notebookutils.lakehouse menyediakan utilitas yang disesuaikan untuk mengelola item Lakehouse. Utilitas ini memberdayakan Anda untuk membuat, mendapatkan, memperbarui, dan menghapus artefak Lakehouse dengan mudah.

Gambaran umum metode

Berikut adalah gambaran umum metode yang tersedia yang disediakan oleh notebookutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean 

# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]

# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table] 

# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table] 

Contoh penggunaan

Untuk menggunakan metode ini secara efektif, pertimbangkan contoh penggunaan berikut:

Membuat Rumah Tepi Danau

artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Mendapatkan Rumah Danau

artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")

Memperbarui Lakehouse

updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Menghapus Rumah Danau

is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Mencantumkan Lakehouse di ruang kerja

artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")

Menampilkan semua tabel pada Lakehouse

artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")

Memulai operasi tabel beban di Lakehouse

notebookutils.lakehouse.loadTable(
    {
        "relativePath": "Files/myFile.csv",
        "pathType": "File",
        "mode": "Overwrite",
        "recursive": False,
        "formatOptions": {
            "format": "Csv",
            "header": True,
            "delimiter": ","
        }
    }, "table_name", "artifact_name", "optional_workspace_id")

Informasi Tambahan

Untuk informasi lebih rinci tentang setiap metode dan parameternya, gunakan notebookutils.lakehouse.help("methodName") fungsi .

Utilitas waktu jalan

Perlihatkan info konteks sesi

Dengan notebookutils.runtime.context Anda bisa mendapatkan informasi konteks sesi langsung yang sedang berlangsung, termasuk nama notebook, lakehouse yang ditetapkan sebagai default, info ruang kerja, jika itu adalah pemrosesan pipeline, dan lain-lain.

notebookutils.runtime.context

Tabel di bawah ini menguraikan properti.

Parameter Penjelasan
currentNotebookName Nama buku catatan saat ini
currentNotebookId ID unik buku catatan saat ini
currentWorkspaceName Nama ruang kerja saat ini
currentWorkspaceId ID ruang kerja saat ini
defaultLakehouseName Nama tampilan untuk lakehouse bawaan, jika sudah ditentukan
defaultLakehouseId ID lakehouse bawaan, jika ditentukan
defaultLakehouseWorkspaceName Nama lakehouse bawaan pada ruang kerja, jika ditentukan
defaultLakehouseWorkspaceId ID ruang kerja bawaan lakehouse, jika ditentukan
currentRunId Dalam proses acuan, ID proses saat ini
parentRunId Dalam eksekusi referensi dengan eksekusi berlapis, ini adalah ID eksekusi induk
rootRunId Dalam eksekusi referensi dengan eksekusi berlapis, ini adalah ID eksekusi root
isForPipeline Apakah proses berjalan untuk jalur
isReferenceRun Apakah pengoperasian saat ini adalah pengoperasian referensi
referenceTreePath Struktur pohon dari referensi bersarang, hanya digunakan untuk hierarki tangkapan layar di halaman L2 pemantauan
rootNotebookId (Hanya dalam eksekusi referensi) ID buku catatan akar dalam eksekusi referensi.
rootNotebookName (Hanya dalam eksekusi referensi) Nama buku catatan akar dalam eksekusi referensi.
rootWorkspaceId (Hanya dalam eksekusi referensi) ID ruang kerja buku catatan root dalam eksekusi referensi.
rootWorkspaceName (Hanya dalam eksekusi referensi) Nama ruang kerja dari buku catatan utama saat eksekusi referensi.
activityId ID pekerjaan Livy untuk aktivitas saat ini
hcRepId ID REPL dalam Mode Tingkat Penggunaan Tinggi
clusterId Identitas kluster Synapse Spark
poolName Nama kumpulan Spark yang digunakan
environmentId ID lingkungan tempat pekerjaan berjalan
environmentWorkspaceId ID lingkungan kerja
userId ID pengguna saat ini
userName Nama pengguna saat ini

Manajemen sesi

Menghentikan sesi interaktif

Alih-alih mengklik tombol berhenti secara manual, terkadang lebih nyaman untuk menghentikan sesi interaktif dengan memanggil API dalam kode. Untuk kasus seperti itu, kami menyediakan api notebookutils.session.stop() untuk mendukung penghentian sesi interaktif melalui kode, tersedia untuk Scala dan PySpark.

notebookutils.session.stop()

notebookutils.session.stop() API menghentikan sesi interaktif saat ini secara asinkron di latar belakang. Ini juga menghentikan sesi Spark dan melepaskan sumber daya yang digunakan oleh sesi tersebut, sehingga tersedia untuk sesi lain di kumpulan yang sama.

Mulai ulang penerjemah Python

utilitas notebookutils.session menyediakan cara untuk memulai ulang penerjemah Python.

notebookutils.session.restartPython()

Catatan

  • Dalam kasus eksekusi referensi buku catatan, restartPython() hanya memulai ulang penerjemah Python dari buku catatan saat ini yang sedang dirujuk.
  • Dalam kasus yang jarang terjadi, perintah mungkin gagal karena mekanisme refleksi Spark, menambahkan coba lagi dapat mengurangi masalah.

Masalah yang diketahui

  • Saat menggunakan versi runtime di atas 1.2 dan menjalankan notebookutils.help(), API fabricClient dan PBIClient yang tercantum tidak didukung untuk saat ini, namun akan tersedia di masa mendatang. Selain itu, API Kredensial tidak didukung di notebook Scala untuk saat ini.

  • Notebook Python tidak mendukung perintah menghentikan eksekusi, perintah restartPython API saat menggunakan utilitas notebookutils.session untuk manajemen sesi.