A Microsoft Spark segédprogramok bemutatása
A Microsoft Spark Utilities (MSSparkUtils) egy beépített csomag, amely megkönnyíti a gyakori feladatok elvégzését. Az MSSparkUtils használatával fájlrendszerekkel dolgozhat, környezeti változókat kérhet le, jegyzetfüzeteket láncba rendezhet és titkos kódokkal dolgozhat. Az MSSparkUtils a , Scala
, .NET Spark (C#)
és jegyzetfüzetekben és R (Preview)
Synapse-folyamatokban PySpark (Python)
érhető el.
Előfeltételek
Az Azure Data Lake Storage Gen2-hez való hozzáférés konfigurálása
A Synapse-jegyzetfüzetek a Microsoft Entra-átengedéssel férnek hozzá az ADLS Gen2-fiókokhoz. Az ADLS Gen2-fiók (vagy mappa) eléréséhez storage blobadat-közreműködőnek kell lennie.
A Synapse-folyamatok a munkaterület felügyeltszolgáltatás-identitását (MSI) használják a tárfiókok eléréséhez. Az MSSparkUtils folyamattevékenységekben való használatához a munkaterületi identitásnak tárolóblobadat-közreműködőnek kell lennie az ADLS Gen2-fiók (vagy mappa) eléréséhez.
Az alábbi lépéseket követve győződjön meg arról, hogy a Microsoft Entra-azonosító és a munkaterület MSI-jének hozzáférése van az ADLS Gen2-fiókhoz:
Nyissa meg az Azure Portalt és a elérni kívánt tárfiókot. Keresse meg azt a tárolót, amelyhez hozzá szeretne férni.
Válassza ki a hozzáférés-vezérlést (IAM) a bal oldali panelen.
Kattintson a Hozzáadás>Szerepkör-hozzárendelés hozzáadása lehetőségre a Szerepkör-hozzárendelés hozzáadása oldal megnyitásához.
Rendelje hozzá a következő szerepkört. A részletes lépésekért tekintse meg az Azure-szerepköröknek az Azure Portalon történő hozzárendelését ismertető cikket.
Beállítás Érték Szerepkör Storage blobadat-közreműködő Hozzáférés hozzárendelése a következőhöz: FELHASZNÁLÓ és MANAGEDIDENTITY Tagok Microsoft Entra-fiókja és munkaterületi identitása Feljegyzés
A felügyelt identitás neve a munkaterület neve is.
Válassza a Mentés lehetőséget.
Az ADLS Gen2-ben a Synapse Spark segítségével az alábbi URL-címen érheti el az adatokat:
abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>
Az Azure Blob Storage-hoz való hozzáférés konfigurálása
A Synapse közös hozzáférésű jogosultságkód (SAS) használatával éri el az Azure Blob Storage-t. Az SAS-kulcsok kódban való felfedésének elkerülése érdekében javasoljuk, hogy hozzon létre egy új társított szolgáltatást a Synapse-munkaterületen a elérni kívánt Azure Blob Storage-fiókhoz.
Az alábbi lépésekkel új társított szolgáltatást vehet fel egy Azure Blob Storage-fiókhoz:
- Nyissa meg az Azure Synapse Studiót.
- A bal oldali panelen válassza a Kezelés lehetőséget, majd a Külső kapcsolatok területen válassza a Csatolt szolgáltatások lehetőséget.
- Keressen az Azure Blob Storage-ban a jobb oldali Új társított szolgáltatás panelen.
- Válassza a Folytatás lehetőséget.
- Válassza ki az Azure Blob Storage-fiókot a társított szolgáltatásnév eléréséhez és konfigurálásához. Javasoljuk, hogy használja az Account key for the Authentication metódust.
- Válassza a Kapcsolat tesztelése lehetőséget a beállítások helyességének ellenőrzéséhez.
- A módosítások mentéséhez válassza a Létrehozás elemet, majd kattintson az Összes közzététele gombra.
Az Azure Blob Storage-on tárolt adatokat a Synapse Spark használatával a következő URL-címen érheti el:
wasb[s]://<container_name>@<storage_account_name>.blob.core.windows.net/<path>
Íme egy példa kódra:
from pyspark.sql import SparkSession
# Azure storage access info
blob_account_name = 'Your account name' # replace with your blob name
blob_container_name = 'Your container name' # replace with your container name
blob_relative_path = 'Your path' # replace with your relative folder path
linked_service_name = 'Your linked service name' # replace with your linked service name
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
# Allow SPARK to access from Blob remotely
wasb_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasb_path)
val blob_account_name = "" // replace with your blob name
val blob_container_name = "" //replace with your container name
val blob_relative_path = "/" //replace with your relative folder path
val linked_service_name = "" //replace with your linked service name
val blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
val wasbs_path = f"wasbs://$blob_container_name@$blob_account_name.blob.core.windows.net/$blob_relative_path"
spark.conf.set(f"fs.azure.sas.$blob_container_name.$blob_account_name.blob.core.windows.net",blob_sas_token)
var blob_account_name = ""; // replace with your blob name
var blob_container_name = ""; // replace with your container name
var blob_relative_path = ""; // replace with your relative folder path
var linked_service_name = ""; // replace with your linked service name
var blob_sas_token = Credentials.GetConnectionStringOrCreds(linked_service_name);
spark.Conf().Set($"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net", blob_sas_token);
var wasbs_path = $"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}";
Console.WriteLine(wasbs_path);
# Azure storage access info
blob_account_name <- 'Your account name' # replace with your blob name
blob_container_name <- 'Your container name' # replace with your container name
blob_relative_path <- 'Your path' # replace with your relative folder path
linked_service_name <- 'Your linked service name' # replace with your linked service name
blob_sas_token <- mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
# Allow SPARK to access from Blob remotely
sparkR.session()
wasb_path <- sprintf('wasbs://%s@%s.blob.core.windows.net/%s',blob_container_name, blob_account_name, blob_relative_path)
sparkR.session(sprintf('fs.azure.sas.%s.%s.blob.core.windows.net',blob_container_name, blob_account_name), blob_sas_token)
print( paste('Remote blob path: ',wasb_path))
Az Azure Key Vaulthoz való hozzáférés konfigurálása
Az Azure Key Vaultot csatolt szolgáltatásként is hozzáadhatja, hogy a Synapse-ban kezelje a hitelesítő adatait. Kövesse az alábbi lépéseket egy Azure Key Vault Synapse-társított szolgáltatásként való hozzáadásához:
Nyissa meg az Azure Synapse Studiót.
A bal oldali panelen válassza a Kezelés lehetőséget, majd a Külső kapcsolatok területen válassza a Csatolt szolgáltatások lehetőséget.
Keressen az Azure Key Vaultban a jobb oldali Új társított szolgáltatás panelen.
Válassza ki az Azure Key Vault-fiókot a társított szolgáltatásnév eléréséhez és konfigurálásához.
Válassza a Kapcsolat tesztelése lehetőséget a beállítások helyességének ellenőrzéséhez.
Válassza az Első létrehozás lehetőséget, majd kattintson az Összes közzététele gombra a módosítás mentéséhez.
A Synapse-jegyzetfüzetek a Microsoft Entra-átengedéssel érik el az Azure Key Vaultot. A Synapse-folyamatok munkaterületi identitást (MSI) használnak az Azure Key Vault eléréséhez. Annak érdekében, hogy a kód a jegyzetfüzetben és a Synapse-folyamatban is működjön, javasoljuk, hogy titkos hozzáférési engedélyt adjon a Microsoft Entra-fiókhoz és a munkaterület identitásához is.
Kövesse az alábbi lépéseket a munkaterület identitásához való titkos hozzáférés biztosításához:
- Nyissa meg az Azure Portalt és a elérni kívánt Azure Key Vaultot.
- Válassza ki az Access-szabályzatokat a bal oldali panelen.
- Válassza a Hozzáférési szabályzat hozzáadása lehetőséget:
- Válassza a Kulcs, titkos kulcs és tanúsítványkezelés konfigurációs sablonként lehetőséget.
- Válassza ki a Microsoft Entra-fiókját és a munkaterület identitását (ugyanaz, mint a munkaterület neve) az egyszerű kijelölésben, vagy győződjön meg arról, hogy már ki van rendelve.
- Válassza a Kijelölés és a Hozzáadás lehetőséget.
- A módosítások véglegesítéséhez kattintson a Mentés gombra.
Fájlrendszer-segédprogramok
mssparkutils.fs
Segédprogramokat biztosít a különböző fájlrendszerek, például az Azure Data Lake Storage Gen2 (ADLS Gen2) és az Azure Blob Storage használatához. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Data Lake Storage Gen2 és az Azure Blob Storage elérését.
Futtassa az alábbi parancsokat az elérhető módszerek áttekintéséhez:
from notebookutils import mssparkutils
mssparkutils.fs.help()
mssparkutils.fs.help()
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Notebook.MSSparkUtils;
FS.Help()
library(notebookutils)
mssparkutils.fs.help()
Találatok:
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(src: String, dest: String, create_path: Boolean = False, overwrite: Boolean = False): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
Use mssparkutils.fs.help("methodName") for more info about a method.
Fájlok listázása
Címtár tartalmának listázása.
mssparkutils.fs.ls('Your directory path')
mssparkutils.fs.ls("Your directory path")
FS.Ls("Your directory path")
mssparkutils.fs.ls("Your directory path")
Fájltulajdonságok megtekintése
Visszaadja a fájltulajdonságokat, beleértve a fájlnevet, a fájl elérési útját, a fájlméretet, a fájlmódosítás idejét, valamint azt, hogy könyvtárról és fájlról van-e szó.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size, file.modifyTime)
val files = mssparkutils.fs.ls("/")
files.foreach{
file => println(file.name,file.isDir,file.isFile,file.size,file.modifyTime)
}
var Files = FS.Ls("/");
foreach(var File in Files) {
Console.WriteLine(File.Name+" "+File.IsDir+" "+File.IsFile+" "+File.Size);
}
files <- mssparkutils.fs.ls("/")
for (file in files) {
writeLines(paste(file$name, file$isDir, file$isFile, file$size, file$modifyTime))
}
Új címtár létrehozása
Létrehozza a megadott könyvtárat, ha az nem létezik, és minden szükséges szülőkönyvtárat.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs.mkdirs("new directory name")
FS.Mkdirs("new directory name")
mssparkutils.fs.mkdirs("new directory name")
Fájl másolása
Fájl vagy könyvtár másolása. Támogatja a fájlrendszerek közötti másolást.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
FS.Cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)
Performant copy file
Ezzel a módszerrel gyorsabban másolhat vagy áthelyezhet fájlokat, különösen nagy mennyiségű adatot.
mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively
Feljegyzés
A módszer csak az Apache Spark 3.3-hoz készült Azure Synapse Runtime-ban és az Apache Spark 3.4-hez készült Azure Synapse Runtime-ban támogatott.
Fájltartalom előnézete
Az UTF-8-ban kódolt sztringként visszaadja az adott fájl első "maxBytes" bájtját.
mssparkutils.fs.head('file path', maxBytes to read)
mssparkutils.fs.head("file path", maxBytes to read)
FS.Head("file path", maxBytes to read)
mssparkutils.fs.head('file path', maxBytes to read)
Fájl áthelyezése
Fájl vagy könyvtár áthelyezése. Támogatja a fájlrendszerek közötti áthelyezést.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv("source file or directory", "destination directory", true) // Set the last parameter as True to firstly create the parent directory if it does not exist
FS.Mv("source file or directory", "destination directory", true)
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
Fájl írása
Az adott sztringet egy UTF-8 kóddal kódolt fájlba írja ki.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
FS.Put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Tartalom hozzáfűzése fájlhoz
Hozzáfűzi a megadott sztringet egy UTF-8 kódolt fájlhoz.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path","content to append",true) // Set the last parameter as True to create the file if it does not exist
FS.Append("file path", "content to append", true) // Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Fájl vagy könyvtár törlése
Fájl vagy könyvtár eltávolítása.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
FS.Rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Jegyzetfüzet-segédprogramok
Nem támogatott.
Az MSSparkUtils Notebook Segédprogrammal futtathat egy jegyzetfüzetet, vagy kiléphet egy értékekkel rendelkező jegyzetfüzetből. Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:
mssparkutils.notebook.help()
Eredmények lekérése:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Feljegyzés
A jegyzetfüzet-segédprogramok nem alkalmazhatók az Apache Spark-feladatdefiníciókra (SJD).
Jegyzetfüzet hivatkozása
Hivatkozzon egy jegyzetfüzetre, és adja vissza a kilépési értékét. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben. A hivatkozott jegyzetfüzet azon a Spark-készleten fut, amelynek a jegyzetfüzete meghívja ezt a függvényt.
mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)
Példa:
mssparkutils.notebook.run("folder/Sample1", 90, {"input": 20 })
A futtatás befejezése után megjelenik egy "Jegyzetfüzet futtatása megtekintése: Jegyzetfüzet neve" nevű pillanatkép-hivatkozás a cellakimenetben, a hivatkozásra kattintva megtekintheti az adott futtatás pillanatképét.
Hivatkozás több jegyzetfüzet párhuzamos futtatására
A módszer mssparkutils.notebook.runMultiple()
lehetővé teszi több jegyzetfüzet párhuzamos vagy előre definiált topológiai szerkezettel történő futtatását. Az API egy Spark-munkameneten belül többszálas implementációs mechanizmust használ, ami azt jelenti, hogy a számítási erőforrásokat a referenciajegyzetfüzet futtatja.
A következőkkel mssparkutils.notebook.runMultiple()
:
Egyszerre több jegyzetfüzet végrehajtása anélkül, hogy mindegyik befejeződik.
Egyszerű JSON-formátum használatával adja meg a jegyzetfüzetek függőségeit és végrehajtási sorrendjét.
Optimalizálhatja a Spark számítási erőforrásainak használatát, és csökkentheti a Synapse-projektek költségeit.
A kimenetben megtekintheti az egyes jegyzetfüzet-futtatási rekordok pillanatképeit, és kényelmesen hibakeresést/monitorozást végezhet a jegyzetfüzet-feladatokban.
Szerezze be az egyes vezetői tevékenységek kilépési értékét, és használja őket az alsóbb rétegbeli feladatokban.
Az mssparkutils.notebook.help("runMultiple") futtatásával is megkeresheti a példát és a részletes használatot.
Íme egy egyszerű példa a jegyzetfüzetek listájának párhuzamos futtatására ezzel a módszerrel:
mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
A gyökérjegyzetfüzet végrehajtási eredménye a következő:
Az alábbi példa a topológiai struktúrával rendelkező jegyzetfüzetek futtatására mutat mssparkutils.notebook.runMultiple()
be. Ezzel a módszerrel könnyedén vezényelheti a jegyzetfüzeteket egy kódélményen keresztül.
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
]
}
mssparkutils.notebook.runMultiple(DAG)
Feljegyzés
- A módszer csak az Apache Spark 3.3-hoz készült Azure Synapse Runtime-ban és az Apache Spark 3.4-hez készült Azure Synapse Runtime-ban támogatott.
- A több jegyzetfüzet-futtatás párhuzamossági foka a Spark-munkamenet teljes rendelkezésre álló számítási erőforrására korlátozódik.
Jegyzetfüzet kilépése
Kilép egy értékekkel rendelkező jegyzetfüzetből. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben.
Amikor interaktívan meghív egy exit() függvényt egy jegyzetfüzetből, az Azure Synapse kivételt jelez, kihagyja az alhálózati cellák futtatását, és életben tartja a Spark-munkamenetet.
Amikor egy Synapse-folyamatban függvényt
exit()
meghívó jegyzetfüzetet vezényel, az Azure Synapse visszaad egy kilépési értéket, befejezi a folyamatfuttatást, és leállítja a Spark-munkamenetet.Amikor meghív egy függvényt egy
exit()
hivatkozott jegyzetfüzetben, az Azure Synapse leállítja a hivatkozott jegyzetfüzet további végrehajtását, és folytatja a függvényt hívórun()
jegyzetfüzet következő celláinak futtatását. Például: A Jegyzetfüzet1 három cellával rendelkezik, és meghív egy függvénytexit()
a második cellában. A Jegyzetfüzet2 öt cellával és hívásokkalrun(notebook1)
rendelkezik a harmadik cellában. A Notebook2 futtatásakor a Jegyzetfüzet1 a második cellánál lesz leállítva, amikor eléri a függvénytexit()
. A Notebook2 továbbra is futtatja a negyedik és az ötödik celláját.
mssparkutils.notebook.exit("value string")
Példa:
Az 1 . mintajegyzetfüzet a mappa alatt található, és a következő két cellát tartalmazza:
- az 1. cella egy bemeneti paramétert határoz meg, amelynek alapértelmezett értéke 10.
- a 2. cella kilépési értékkel lép ki a jegyzetfüzetből.
A Minta1 egy másik, alapértelmezett értékekkel rendelkező jegyzetfüzetben is futtatható:
exitVal = mssparkutils.notebook.run("folder/Sample1")
print (exitVal)
Találatok:
Sample1 run success with input is 10
A Minta1 egy másik jegyzetfüzetben is futtatható, és a bemeneti érték 20 lehet:
exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print (exitVal)
Találatok:
Sample1 run success with input is 20
Az MSSparkUtils Notebook Segédprogrammal futtathat egy jegyzetfüzetet, vagy kiléphet egy értékekkel rendelkező jegyzetfüzetből. Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:
mssparkutils.notebook.help()
Eredmények lekérése:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Jegyzetfüzet hivatkozása
Hivatkozzon egy jegyzetfüzetre, és adja vissza a kilépési értékét. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben. A hivatkozott jegyzetfüzet azon a Spark-készleten fut, amelynek a jegyzetfüzete meghívja ezt a függvényt.
mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)
Példa:
mssparkutils.notebook.run("folder/Sample1", 90, Map("input" -> 20))
A futtatás befejezése után megjelenik egy "Jegyzetfüzet futtatása megtekintése: Jegyzetfüzet neve" nevű pillanatkép-hivatkozás a cellakimenetben, a hivatkozásra kattintva megtekintheti az adott futtatás pillanatképét.
Jegyzetfüzet kilépése
Kilép egy értékekkel rendelkező jegyzetfüzetből. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben.
Amikor interaktívan meghív egy
exit()
függvényt egy jegyzetfüzetnek, az Azure Synapse kivételt fog kivenni, kihagyja az alhálózati cellák futtatását, és életben tartja a Spark-munkamenetet.Amikor egy Synapse-folyamatban függvényt
exit()
meghívó jegyzetfüzetet vezényel, az Azure Synapse visszaad egy kilépési értéket, befejezi a folyamatfuttatást, és leállítja a Spark-munkamenetet.Amikor meghív egy függvényt egy
exit()
hivatkozott jegyzetfüzetben, az Azure Synapse leállítja a hivatkozott jegyzetfüzet további végrehajtását, és folytatja a függvényt hívórun()
jegyzetfüzet következő celláinak futtatását. Például: A Jegyzetfüzet1 három cellával rendelkezik, és meghív egy függvénytexit()
a második cellában. A Jegyzetfüzet2 öt cellával és hívásokkalrun(notebook1)
rendelkezik a harmadik cellában. A Notebook2 futtatásakor a Jegyzetfüzet1 a második cellánál lesz leállítva, amikor eléri a függvénytexit()
. A Notebook2 továbbra is futtatja a negyedik és az ötödik celláját.
mssparkutils.notebook.exit("value string")
Példa:
Az 1. mintajegyzetfüzet az mssparkutils/folder/ alatt található, a következő két cellával:
- az 1. cella egy bemeneti paramétert határoz meg, amelynek alapértelmezett értéke 10.
- a 2. cella kilépési értékkel lép ki a jegyzetfüzetből.
A Minta1 egy másik, alapértelmezett értékekkel rendelkező jegyzetfüzetben is futtatható:
val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1")
print(exitVal)
Találatok:
exitVal: String = Sample1 run success with input is 10
Sample1 run success with input is 10
A Minta1 egy másik jegyzetfüzetben is futtatható, és a bemeneti érték 20 lehet:
val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print(exitVal)
Találatok:
exitVal: String = Sample1 run success with input is 20
Sample1 run success with input is 20
Az MSSparkUtils Notebook Segédprogrammal futtathat egy jegyzetfüzetet, vagy kiléphet egy értékekkel rendelkező jegyzetfüzetből. Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:
mssparkutils.notebook.help()
Eredmények lekérése:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Jegyzetfüzet hivatkozása
Hivatkozzon egy jegyzetfüzetre, és adja vissza a kilépési értékét. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben. A hivatkozott jegyzetfüzet azon a Spark-készleten fut, amelynek a jegyzetfüzete meghívja ezt a függvényt.
mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)
Példa:
mssparkutils.notebook.run("folder/Sample1", 90, list("input": 20))
A futtatás befejezése után megjelenik egy "Jegyzetfüzet futtatása megtekintése: Jegyzetfüzet neve" nevű pillanatkép-hivatkozás a cellakimenetben, a hivatkozásra kattintva megtekintheti az adott futtatás pillanatképét.
Jegyzetfüzet kilépése
Kilép egy értékekkel rendelkező jegyzetfüzetből. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben.
Amikor interaktívan meghív egy
exit()
függvényt egy jegyzetfüzetnek, az Azure Synapse kivételt fog kivenni, kihagyja az alhálózati cellák futtatását, és életben tartja a Spark-munkamenetet.Amikor egy Synapse-folyamatban függvényt
exit()
meghívó jegyzetfüzetet vezényel, az Azure Synapse visszaad egy kilépési értéket, befejezi a folyamatfuttatást, és leállítja a Spark-munkamenetet.Amikor meghív egy függvényt egy
exit()
hivatkozott jegyzetfüzetben, az Azure Synapse leállítja a hivatkozott jegyzetfüzet további végrehajtását, és folytatja a függvényt hívórun()
jegyzetfüzet következő celláinak futtatását. Például: A Jegyzetfüzet1 három cellával rendelkezik, és meghív egy függvénytexit()
a második cellában. A Jegyzetfüzet2 öt cellával és hívásokkalrun(notebook1)
rendelkezik a harmadik cellában. A Notebook2 futtatásakor a Jegyzetfüzet1 a második cellánál lesz leállítva, amikor eléri a függvénytexit()
. A Notebook2 továbbra is futtatja a negyedik és az ötödik celláját.
mssparkutils.notebook.exit("value string")
Példa:
Az 1 . mintajegyzetfüzet a mappa alatt található, és a következő két cellát tartalmazza:
- az 1. cella egy bemeneti paramétert határoz meg, amelynek alapértelmezett értéke 10.
- a 2. cella kilépési értékkel lép ki a jegyzetfüzetből.
A Minta1 egy másik, alapértelmezett értékekkel rendelkező jegyzetfüzetben is futtatható:
exitVal <- mssparkutils.notebook.run("folder/Sample1")
print (exitVal)
Találatok:
Sample1 run success with input is 10
A Minta1 egy másik jegyzetfüzetben is futtatható, és a bemeneti érték 20 lehet:
exitVal <- mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, list("input": 20))
print (exitVal)
Találatok:
Sample1 run success with input is 20
Hitelesítő adatok segédprogramok
Az MSSparkUtils hitelesítő adatok segédprogramjaival lekérheti a társított szolgáltatások hozzáférési jogkivonatait, és kezelheti a titkos kulcsokat az Azure Key Vaultban.
Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:
mssparkutils.credentials.help()
mssparkutils.credentials.help()
Not supported.
mssparkutils.credentials.help()
Eredmény lekérése:
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
Feljegyzés
A getSecretWithLS(linkedService, secret) jelenleg nem támogatott a C#-ban.
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
Jogkivonat lekérése
Microsoft Entra-jogkivonatot ad vissza egy adott célközönség számára, név (nem kötelező). Az alábbi táblázat felsorolja az összes elérhető célközönségtípust:
Célközönség típusa | AZ API-hívásban használandó sztringkonstans |
---|---|
Azure Storage | Storage |
Azure Key Vault | Vault |
Azure-felügyelet | AzureManagement |
Azure SQL Data Warehouse (dedikált és kiszolgáló nélküli) | DW |
Azure Synapse | Synapse |
Azure Data Lake Store | DataLakeStore |
Azure Data Factory | ADF |
Azure Adatkezelő | AzureDataExplorer |
Azure Database for MySQL | AzureOSSDB |
Azure Database for MariaDB | AzureOSSDB |
Azure Database for PostgreSQL | AzureOSSDB |
mssparkutils.credentials.getToken('audience Key')
mssparkutils.credentials.getToken("audience Key")
Credentials.GetToken("audience Key")
mssparkutils.credentials.getToken('audience Key')
Jogkivonat érvényesítése
Igaz értéket ad vissza, ha a jogkivonat nem járt le.
mssparkutils.credentials.isValidToken('your token')
mssparkutils.credentials.isValidToken("your token")
Credentials.IsValidToken("your token")
mssparkutils.credentials.isValidToken('your token')
Csatolt szolgáltatás kapcsolati sztring vagy hitelesítő adatainak lekérése
A társított szolgáltatáshoz tartozó kapcsolati sztring vagy hitelesítő adatokat adja vissza.
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
mssparkutils.credentials.getConnectionStringOrCreds("linked service name")
Credentials.GetConnectionStringOrCreds("linked service name")
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
Titkos kulcs lekérése munkaterületi identitással
Egy adott Azure Key Vault-névhez, titkos kódnévhez és társított szolgáltatásnévhez tartozó Azure Key Vault-titkos kulcsot ad vissza munkaterületi identitás használatával. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Key Vaulthoz való hozzáférést.
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
mssparkutils.credentials.getSecret("azure key vault name","secret name","linked service name")
Credentials.GetSecret("azure key vault name","secret name","linked service name")
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
Titkos kulcs lekérése felhasználói hitelesítő adatokkal
Egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez tartozó Azure Key Vault-titkos kulcsot ad vissza felhasználói hitelesítő adatok használatával.
mssparkutils.credentials.getSecret('azure key vault name','secret name')
mssparkutils.credentials.getSecret("azure key vault name","secret name")
Credentials.GetSecret("azure key vault name","secret name")
mssparkutils.credentials.getSecret('azure key vault name','secret name')
Titkos kód elhelyezése munkaterületi identitással
Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsát a munkaterületi identitás használatával. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Key Vaulthoz való hozzáférést.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')
Titkos kód elhelyezése munkaterületi identitással
Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsát a munkaterületi identitás használatával. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Key Vaulthoz való hozzáférést.
mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value","linked service name")
Titkos kód elhelyezése munkaterületi identitással
Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsát a munkaterületi identitás használatával. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Key Vaulthoz való hozzáférést.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')
Titkos kód elhelyezése felhasználói hitelesítő adatokkal
Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsait a felhasználói hitelesítő adatok használatával.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')
Titkos kód elhelyezése felhasználói hitelesítő adatokkal
Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsait a felhasználói hitelesítő adatok használatával.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')
Titkos kód elhelyezése felhasználói hitelesítő adatokkal
Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsait a felhasználói hitelesítő adatok használatával.
mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value")
Környezeti segédprogramok
Futtassa az alábbi parancsokat az elérhető módszerek áttekintéséhez:
mssparkutils.env.help()
mssparkutils.env.help()
mssparkutils.env.help()
Env.Help()
Eredmény lekérése:
getUserName(): returns user name
getUserId(): returns unique user id
getJobId(): returns job id
getWorkspaceName(): returns workspace name
getPoolName(): returns Spark pool name
getClusterId(): returns cluster id
Felhasználónév lekérése
Az aktuális felhasználónevet adja vissza.
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
Env.GetUserName()
Felhasználói azonosító lekérése
Az aktuális felhasználói azonosítót adja vissza.
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
Env.GetUserId()
Feladatazonosító lekérése
Feladatazonosítót ad vissza.
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
Env.GetJobId()
Munkaterület nevének lekérése
A munkaterület nevét adja vissza.
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
Env.GetWorkspaceName()
Készletnév lekérése
A Spark-készlet nevét adja vissza.
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
Env.GetPoolName()
Fürtazonosító lekérése
Az aktuális fürtazonosítót adja vissza.
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
Env.GetClusterId()
Futtatókörnyezet környezete
Az Mssparkutils runtime utils 3 futtatókörnyezeti tulajdonságot adott meg, az mssparkutils futtatókörnyezettel lekérheti az alábbi tulajdonságokat:
- Jegyzetfüzetnév – Az aktuális jegyzetfüzet neve mindig az interaktív mód és a folyamat mód értékét adja vissza.
- Pipelinejobid – A folyamatfuttatás azonosítója folyamat módban adja vissza az értéket, és interaktív módban üres sztringet ad vissza.
- Activityrunid – A jegyzetfüzet tevékenységfuttatási azonosítója folyamat módban adja vissza az értéket, interaktív módban pedig üres sztringet ad vissza.
A futtatókörnyezet jelenleg a Pythont és a Scalát is támogatja.
mssparkutils.runtime.context
ctx <- mssparkutils.runtime.context()
for (key in ls(ctx)) {
writeLines(paste(key, ctx[[key]], sep = "\t"))
}
%%spark
mssparkutils.runtime.context
Munkamenet-kezelés
Interaktív munkamenet leállítása
Ahelyett, hogy manuálisan kattintanál a Leállítás gombra, néha kényelmesebb leállítani egy interaktív munkamenetet egy API meghívásával a kódban. Ilyen esetekben biztosítunk egy API-t mssparkutils.session.stop()
, amely támogatja az interaktív munkamenet kódon keresztüli leállítását, amely elérhető a Scala és a Python számára.
mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()
Az API a háttérben aszinkron módon állítja le az aktuális interaktív munkamenetet, leállítja a Spark-munkamenetet, és felszabadítja a munkamenet által foglalt erőforrásokat, hogy azok elérhetők legyenek az ugyanabban a készletben lévő többi munkamenet számára.
Feljegyzés
Nem javasoljuk a beépített API-k hívását, például sys.exit
a Scalában vagy sys.exit()
a Pythonban a kódban, mert az ilyen API-k egyszerűen megölik az értelmező folyamatot, így a Spark-munkamenet életben marad, és az erőforrások nem szabadulnak fel.
Csomagfüggőségek
Ha helyileg szeretne jegyzetfüzeteket vagy feladatokat fejleszteni, és az összeállításhoz/IDE-tippekhez szükséges megfelelő csomagokra kell hivatkoznia, az alábbi csomagokat használhatja.