A Fabric MSSparkUtils bemutatása
A Microsoft Spark Utilities (MSSparkUtils) egy beépített csomag, amely segít a gyakori feladatok egyszerű végrehajtásában. AzMSSparkUtils használatával együttműködhet a fájlrendszerekkel, lekérheti a környezeti változókat, összekapcsolhatja a jegyzetfüzeteket, és titkos kódokkal dolgozhat. Az MSSparkUtils a PySpark (Python) Scala, a SparkR-jegyzetfüzetek és a Microsoft Fabric-folyamatokban érhető el.
Fontos
A Microsoft Fabric előzetes verzióban érhető el.
Fájlrendszer-segédprogramok
Az mssparkutils.fs segédprogramokat biztosít a különböző fájlrendszerek, például a Azure Data Lake Storage Gen2 (ADLS Gen2) és a Azure Blob Storage használatához. Győződjön meg arról, hogy megfelelően konfigurálja a hozzáférést Azure Data Lake Storage Gen2 és Azure Blob Storage.
Futtassa az alábbi parancsokat az elérhető módszerek áttekintéséhez:
from notebookutils import mssparkutils
mssparkutils.fs.help()
Kimenet
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use mssparkutils.fs.help("methodName") for more info about a method.
Az mssparkutils ugyanúgy működik a fájlrendszerrel, mint a Spark API-k. Vegyük például az mssparkuitls.fs.mkdirs() és a Fabric Lakehouse használatát:
Használat | Relatív elérési út a HDFS-gyökérből | Az ABFS fájlrendszer abszolút elérési útja | A helyi fájlrendszer abszolút elérési útja az illesztőprogram-csomópontban |
---|---|---|---|
Nemdefault-tóház | Nem támogatott | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("fájl:/<new_dir>") |
Alapértelmezett lakehouse | Könyvtár a "Files" vagy a "Tables" alatt: mssparkutils.fs.mkdirs("Files/<new_dir>") | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("fájl:/<new_dir>") |
Fájlok listázása
Listázhatja egy könyvtár tartalmát, használhatja a mssparkutils.fs.ls('Saját könyvtár elérési útja'), például:
mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
mssparkutils.fs.ls("file:/tmp") # based on local file system of driver node
Fájltulajdonságok megtekintése
Fájltulajdonságokat ad vissza, beleértve a fájlnevet, a fájl elérési útját, a fájlméretet, valamint azt, hogy könyvtárról és fájlról van-e szó.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
Új könyvtár létrehozása
Létrehozza az adott könyvtárat, ha nem létezik, és a szükséges szülőkönyvtárakat.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs. mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
mssparkutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
Fájl másolása
Fájl vagy könyvtár másolása. Támogatja a fájlrendszerek közötti másolást.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Fájltartalom előnézete
Az UTF-8-ban kódolt sztringként visszaadja az adott fájl első "maxBytes" bájtját.
mssparkutils.fs.head('file path', maxBytes to read)
Fájl áthelyezése
Fájl vagy könyvtár áthelyezése. Támogatja a fájlrendszerek közötti áthelyezést.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
Fájl írása
Az adott sztringet egy UTF-8 kóddal kódolt fájlba írja ki. Az adott sztringet egy UTF-8 kóddal kódolt fájlba írja ki.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Tartalom hozzáfűzése egy fájlhoz
Hozzáfűzi az adott sztringet egy UTF-8 kóddal kódolt fájlhoz.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Fájl vagy könyvtár törlése
Fájl vagy könyvtár eltávolítása.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Könyvtár csatlakoztatása/leválasztása
A részletes használati adatokat a Fájl csatlakoztatása és leválasztása című témakörben találja.
Jegyzetfüzet-segédprogramok
Az MSSparkUtils Notebook Utilities használatával futtathat egy jegyzetfüzetet, vagy kiléphet egy értékekkel rendelkező jegyzetfüzetből. Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:
mssparkutils.notebook.help()
Kimeneti:
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Jegyzetfüzet hivatkozása
Hivatkozzon egy jegyzetfüzetre, és adja vissza a kilépési értékét. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben. A hivatkozott jegyzetfüzet azon a Spark-készleten fut, amelynek jegyzetfüzete meghívja ezt a függvényt.
mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)
Például:
mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
A cellakimenetben megnyithatja a referenciafuttatás pillanatkép-hivatkozását, a pillanatkép rögzíti a kódfuttatás eredményeit, és lehetővé teszi a referenciafuttatások egyszerű hibakeresését.
Megjegyzés
A Hálójegyzetfüzet jelenleg csak a munkaterületen belüli hivatkozási jegyzetfüzeteket támogatja.
Jegyzetfüzetből való kilépés
Kilép egy értékekkel rendelkező jegyzetfüzetből. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben.
- Amikor interaktív módon meghív egy exit() függvényt egy jegyzetfüzetből, a Háló jegyzetfüzet kivételt jelez, kihagyja az alhálózati cellák futtatását, és életben tartja a Spark-munkamenetet.
- Amikor egy kilépési() függvényt meghívó folyamatban vezényl egy jegyzetfüzetet, a jegyzetfüzet-tevékenység egy kilépési értékkel tér vissza, befejezi a folyamat futtatását, és leállítja a Spark-munkamenetet.
- Amikor meghív egy exit() függvényt egy hivatkozott jegyzetfüzetben, a Fabric Spark leállítja a hivatkozott jegyzetfüzet további végrehajtását, és folytatja a run() függvényt meghívó fő jegyzetfüzet következő celláinak futtatását. Például: A Jegyzetfüzet1 három cellával rendelkezik, és meghív egy exit() függvényt a második cellában. A Notebook2 öt cellával rendelkezik, és meghívja a run(notebook1) elemet a harmadik cellában. A Notebook2 futtatásakor a Notebook1 a második cellánál áll meg, amikor eléri a exit() függvényt. A Notebook2 továbbra is futtatja a negyedik és az ötödik celláját.
mssparkutils.notebook.exit("value string")
Például:
1. mintajegyzetfüzet a következő két cellával:
Az 1. cella egy bemeneti paramétert határoz meg, amelynek alapértelmezett értéke 10.
A 2. cella kilépési értékként adja meg a bemenetet tartalmazó jegyzetfüzetet.
A Minta1 egy másik, alapértelmezett értékekkel rendelkező jegyzetfüzetben is futtatható:
exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)
Kimeneti:
Notebook executed successfully with exit value 10
A Minta1 egy másik jegyzetfüzetben is futtatható, és a bemeneti értéket 20-ra állíthatja:
exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Kimeneti:
Notebook executed successfully with exit value 20
Hitelesítő adatok segédprogramok
Az MSSparkUtils hitelesítő adatok segédprogramjaival lekérheti a hozzáférési jogkivonatokat, és kezelheti a titkos kódokat az Azure Key Vault.
Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:
mssparkutils.credentials.help()
Kimeneti:
getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
Jogkivonat lekérése
Visszaadja Azure AD jogkivonatot egy adott célközönséghez, név (nem kötelező), Az alábbi lista a jelenleg elérhető célközönségkulcsokat jeleníti meg:
- Tároló célközönségének erőforrása: "storage"
- Power BI-erőforrás: "pbi"
- Azure Key Vault-erőforrás: "keyvault"
- Synapse RTA KQL DB-erőforrás: "kusto"
Futtassa a következő parancsot a jogkivonat lekéréséhez:
mssparkutils.credentials.getToken('audience Key')
Titkos kód lekérése felhasználói hitelesítő adatokkal
Egy adott Azure-Key Vault nevét, titkos nevét és társított szolgáltatásnevét adja vissza az Azure Key Vault titkos kódjának visszaadása felhasználói hitelesítő adatokkal.
mssparkutils.credentials.getSecret('azure key vault name','secret name')
Fájl csatlakoztatása és leválasztása
A Microsoft Fabric támogatja a csatlakoztatási forgatókönyveket a Microsoft Spark Utilities csomagban. A csatlakoztatási, leválasztási, getMountPath() és mounts() API-k segítségével távoli tárolást (Azure Data Lake Storage Gen2) csatolhat az összes munkacsomóponthoz (illesztőprogram-csomópont és feldolgozó csomópont). Miután a tároló csatlakoztatási pontja elkészült, a helyi fájl API-val úgy érheti el az adatokat, mintha a helyi fájlrendszerben tárolva lenne.
ADLS Gen2-fiók csatlakoztatása
Ez a szakasz bemutatja, hogyan csatlakoztathat Azure Data Lake Storage Gen2 lépésről lépésre példaként. A Blob Storage csatlakoztatása hasonlóan működik.
A példa feltételezi, hogy van egy storegen2 nevű Data Lake Storage Gen2 fiókja. A fiókhoz tartozik egy mycontainer nevű tároló, amelyet csatlakoztatni szeretne a /test fájlhoz a jegyzetfüzet Spark-munkamenetében.
A mycontainer nevű tároló csatlakoztatásához az mssparkutilsnak először ellenőriznie kell, hogy rendelkezik-e engedéllyel a tároló eléréséhez. A Microsoft Fabric jelenleg két hitelesítési módszert támogat az eseményindító csatlakoztatási műveletéhez: accountKey és sastoken.
Csatlakoztatás közös hozzáférésű jogosultságkód-jogkivonattal vagy fiókkulccsal
Az Mssparkutils támogatja a fiókkulcs vagy a közös hozzáférésű jogosultságkód (SAS) token paraméterként történő átadását a cél csatlakoztatásához.
Biztonsági okokból javasoljuk, hogy a fiókkulcsokat vagy SAS-jogkivonatokat az Azure Key Vault tárolja (ahogy az alábbi képernyőképen látható). Ezután lekérheti őket az mssparkutils.credentials.getSecret API használatával. Az Azure Key Vault használatáról az Azure Key Vault felügyelt tárfiókkulcsok ismertetése című témakörben olvashat.
Íme az accountKey metódus használatának mintakódja:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
Sastoken esetén tekintse meg a következő mintakódot:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
Megjegyzés
Biztonsági okokból nem ajánlott a hitelesítő adatokat kódban tárolni. A hitelesítő adatok további védelme érdekében újra ki fogjuk alakítani a titkos kódot a jegyzetfüzet kimenetében. További részletekért tekintse meg a titkos kód újrakiválasztását ismertető cikket.
Tótárház csatlakoztatása
Az alábbi mintakód egy tóház /test csatlakoztatásához.
from notebookutils import mssparkutils
mssparkutils.fs.mount(
"abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>",
"/test"
)
A csatlakoztatási pont alatti fájlok elérése az mssparktuils fs API használatával
A csatlakoztatási művelet fő célja, hogy lehetővé tegye az ügyfelek számára a távoli tárfiókban tárolt adatok elérését egy helyi fájlrendszer API használatával. Az adatokhoz az mssparkutils fs API-val is hozzáférhet, paraméterként csatlakoztatott elérési úttal. Az itt használt elérésiút-formátum kissé eltérő.
Tegyük fel, hogy csatlakoztatta a Data Lake Storage Gen2 mycontainer tárolót a /test parancshoz a csatlakoztatási API használatával. Ha egy helyi fájlrendszer API-val fér hozzá az adatokhoz, az elérési út formátuma a következő:
/synfs/notebook/{sessionId}/test/{filename}
Ha az mssparkutils fs API-val szeretne hozzáférni az adatokhoz, javasoljuk, hogy a getMountPath() használatával kérje le a pontos elérési utat:
path = mssparkutils.fs.getMountPath("/test")
Címtárak listázása:
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
Fájltartalom olvasása:
mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
Hozzon létre egy könyvtárat:
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
A csatlakoztatási pont alatti fájlok elérése helyi elérési úton
A fájlokat egyszerűen olvashatja és írhatja a csatlakoztatási pontban szabványos fájlrendszeri módszerrel, példaként a Pythont használhatja:
#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
Meglévő csatlakoztatási pontok ellenőrzése
Az mssparkutils.fs.mounts() API használatával ellenőrizheti az összes meglévő csatlakoztatási pont adatait:
mssparkutils.fs.mounts()
A csatlakoztatási pont leválasztása
A csatlakoztatási pont leválasztásához használja az alábbi kódot (/ebben a példában a tesztelést):
mssparkutils.fs.unmount("/test")
Ismert korlátozások
- Az aktuális csatlakoztatás egy feladatszintű konfiguráció, ezért javasoljuk, hogy használja a mounts API-t annak ellenőrzéséhez, hogy létezik-e csatlakoztatási pont, vagy nem érhető el.
- A leválasztott mechanizmus nem automatikus. Ha az alkalmazás futása befejeződött, a csatlakoztatási pont leválasztásához a lemezterület felszabadításához explicit módon meg kell hívnia egy leválasztott API-t a kódban. Ellenkező esetben a csatlakoztatási pont az alkalmazás futásának befejeződése után is megmarad a csomópontban.
- Az ADLS Gen1-tárfiók csatlakoztatása nem támogatott.