Share via


A Microsoft Spark segédprogramok bemutatása

A Microsoft Spark Utilities (MSSparkUtils) egy beépített csomag, amely megkönnyíti a gyakori feladatok elvégzését. Az MSSparkUtils használatával fájlrendszerekkel dolgozhat, környezeti változókat kérhet le, jegyzetfüzeteket láncba rendezhet és titkos kódokkal dolgozhat. Az MSSparkUtils a , Scala, .NET Spark (C#)és jegyzetfüzetekben és R (Preview) Synapse-folyamatokban PySpark (Python)érhető el.

Előfeltételek

Az Azure Data Lake Storage Gen2-hez való hozzáférés konfigurálása

A Synapse-jegyzetfüzetek a Microsoft Entra-átengedéssel férnek hozzá az ADLS Gen2-fiókokhoz. Az ADLS Gen2-fiók (vagy mappa) eléréséhez storage blobadat-közreműködőnek kell lennie.

A Synapse-folyamatok a munkaterület felügyeltszolgáltatás-identitását (MSI) használják a tárfiókok eléréséhez. Az MSSparkUtils folyamattevékenységekben való használatához a munkaterületi identitásnak tárolóblobadat-közreműködőnek kell lennie az ADLS Gen2-fiók (vagy mappa) eléréséhez.

Az alábbi lépéseket követve győződjön meg arról, hogy a Microsoft Entra-azonosító és a munkaterület MSI-jének hozzáférése van az ADLS Gen2-fiókhoz:

  1. Nyissa meg az Azure Portalt és a elérni kívánt tárfiókot. Keresse meg azt a tárolót, amelyhez hozzá szeretne férni.

  2. Válassza ki a hozzáférés-vezérlést (IAM) a bal oldali panelen.

  3. Kattintson a Hozzáadás>Szerepkör-hozzárendelés hozzáadása lehetőségre a Szerepkör-hozzárendelés hozzáadása oldal megnyitásához.

  4. Rendelje hozzá a következő szerepkört. A részletes lépésekért tekintse meg az Azure-szerepköröknek az Azure Portalon történő hozzárendelését ismertető cikket.

    Beállítás Érték
    Szerepkör Storage blobadat-közreműködő
    Hozzáférés hozzárendelése a következőhöz: U Standard kiadás R és MANAGEDIDENTITY
    Tagok Microsoft Entra-fiókja és munkaterületi identitása

    Feljegyzés

    A felügyelt identitás neve a munkaterület neve is.

    Szerepkör-hozzárendelési oldal hozzáadása az Azure Portalon.

  5. Válassza a Mentés lehetőséget.

Az ADLS Gen2-ben a Synapse Spark segítségével az alábbi URL-címen érheti el az adatokat:

abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>

Az Azure Blob Storage-hoz való hozzáférés konfigurálása

A Synapse közös hozzáférésű jogosultságkód (SAS) használatával éri el az Azure Blob Storage-t. Az SAS-kulcsok kódban való felfedésének elkerülése érdekében javasoljuk, hogy hozzon létre egy új társított szolgáltatást a Synapse-munkaterületen a elérni kívánt Azure Blob Storage-fiókhoz.

Az alábbi lépésekkel új társított szolgáltatást vehet fel egy Azure Blob Storage-fiókhoz:

  1. Nyissa meg az Azure Synapse Studiót.
  2. A bal oldali panelen válassza a Kezelés lehetőséget, majd a Külső kapcsolatok területen válassza a Csatolt szolgáltatások lehetőséget.
  3. Keressen az Azure Blob Storage-ban a jobb oldali Új társított szolgáltatás panelen.
  4. Válassza a Folytatás lehetőséget.
  5. Válassza ki az Azure Blob Storage-fiókot a társított szolgáltatásnév eléréséhez és konfigurálásához. Javasoljuk, hogy használja az Account key for the Authentication metódust.
  6. Válassza a Kapcsolat tesztelése lehetőséget a beállítások helyességének ellenőrzéséhez.
  7. A módosítások mentéséhez válassza a Létrehozás elemet, majd kattintson az Összes közzététele gombra.

Az Azure Blob Storage-on tárolt adatokat a Synapse Spark használatával a következő URL-címen érheti el:

wasb[s]://<container_name>@<storage_account_name>.blob.core.windows.net/<path>

Íme egy példa kódra:

from pyspark.sql import SparkSession

# Azure storage access info
blob_account_name = 'Your account name' # replace with your blob name
blob_container_name = 'Your container name' # replace with your container name
blob_relative_path = 'Your path' # replace with your relative folder path
linked_service_name = 'Your linked service name' # replace with your linked service name

blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely

wasb_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)

spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasb_path)
val blob_account_name = "" // replace with your blob name
val blob_container_name = "" //replace with your container name
val blob_relative_path = "/" //replace with your relative folder path
val linked_service_name = "" //replace with your linked service name


val blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

val wasbs_path = f"wasbs://$blob_container_name@$blob_account_name.blob.core.windows.net/$blob_relative_path"
spark.conf.set(f"fs.azure.sas.$blob_container_name.$blob_account_name.blob.core.windows.net",blob_sas_token)

var blob_account_name = "";  // replace with your blob name
var blob_container_name = "";     // replace with your container name
var blob_relative_path = "";  // replace with your relative folder path
var linked_service_name = "";    // replace with your linked service name
var blob_sas_token = Credentials.GetConnectionStringOrCreds(linked_service_name);

spark.Conf().Set($"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net", blob_sas_token);

var wasbs_path = $"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}";

Console.WriteLine(wasbs_path);

# Azure storage access info
blob_account_name <- 'Your account name' # replace with your blob name
blob_container_name <- 'Your container name' # replace with your container name
blob_relative_path <- 'Your path' # replace with your relative folder path
linked_service_name <- 'Your linked service name' # replace with your linked service name

blob_sas_token <- mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely
sparkR.session()
wasb_path <- sprintf('wasbs://%s@%s.blob.core.windows.net/%s',blob_container_name, blob_account_name, blob_relative_path)
sparkR.session(sprintf('fs.azure.sas.%s.%s.blob.core.windows.net',blob_container_name, blob_account_name), blob_sas_token)

print( paste('Remote blob path: ',wasb_path))

Az Azure Key Vaulthoz való hozzáférés konfigurálása

Az Azure Key Vaultot csatolt szolgáltatásként is hozzáadhatja, hogy a Synapse-ban kezelje a hitelesítő adatait. Kövesse az alábbi lépéseket egy Azure Key Vault Synapse-társított szolgáltatásként való hozzáadásához:

  1. Nyissa meg az Azure Synapse Studiót.

  2. A bal oldali panelen válassza a Kezelés lehetőséget, majd a Külső kapcsolatok területen válassza a Csatolt szolgáltatások lehetőséget.

  3. Keressen az Azure Key Vaultban a jobb oldali Új társított szolgáltatás panelen.

  4. Válassza ki az Azure Key Vault-fiókot a társított szolgáltatásnév eléréséhez és konfigurálásához.

  5. Válassza a Kapcsolat tesztelése lehetőséget a beállítások helyességének ellenőrzéséhez.

  6. Válassza az Első létrehozás lehetőséget, majd kattintson az Összes közzététele gombra a módosítás mentéséhez.

A Synapse-jegyzetfüzetek a Microsoft Entra-átengedéssel érik el az Azure Key Vaultot. A Synapse-folyamatok munkaterületi identitást (MSI) használnak az Azure Key Vault eléréséhez. Annak érdekében, hogy a kód a jegyzetfüzetben és a Synapse-folyamatban is működjön, javasoljuk, hogy titkos hozzáférési engedélyt adjon a Microsoft Entra-fiókhoz és a munkaterület identitásához is.

Kövesse az alábbi lépéseket a munkaterület identitásához való titkos hozzáférés biztosításához:

  1. Nyissa meg az Azure Portalt és a elérni kívánt Azure Key Vaultot.
  2. Válassza ki az Access-szabályzatokat a bal oldali panelen.
  3. Válassza a Hozzáférési szabályzat hozzáadása lehetőséget:
    • Válassza a Kulcs, titkos kulcs és tanúsítványkezelés konfigurációs sablonként lehetőséget.
    • Válassza ki a Microsoft Entra-fiókját és a munkaterület identitását (ugyanaz, mint a munkaterület neve) az egyszerű kijelölésben, vagy győződjön meg arról, hogy már ki van rendelve.
  4. Válassza a Kijelölés és a Hozzáadás lehetőséget.
  5. A módosítások véglegesítéséhez kattintson a Mentés gombra.

Fájlrendszer-segédprogramok

mssparkutils.fs Segédprogramokat biztosít a különböző fájlrendszerek, például az Azure Data Lake Storage Gen2 (ADLS Gen2) és az Azure Blob Storage használatához. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Data Lake Storage Gen2 és az Azure Blob Storage elérését.

Futtassa az alábbi parancsokat az elérhető módszerek áttekintéséhez:

from notebookutils import mssparkutils
mssparkutils.fs.help()
mssparkutils.fs.help()
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Notebook.MSSparkUtils;
FS.Help()
library(notebookutils)
mssparkutils.fs.help()

Találatok:


mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(src: String, dest: String, create_path: Boolean = False, overwrite: Boolean = False): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory

Use mssparkutils.fs.help("methodName") for more info about a method.

Fájlok listázása

Címtár tartalmának listázása.

mssparkutils.fs.ls('Your directory path')
mssparkutils.fs.ls("Your directory path")
FS.Ls("Your directory path")
mssparkutils.fs.ls("Your directory path")

Fájltulajdonságok megtekintése

Visszaadja a fájltulajdonságokat, beleértve a fájlnevet, a fájl elérési útját, a fájlméretet, a fájlmódosítás idejét, valamint azt, hogy könyvtárról és fájlról van-e szó.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size, file.modifyTime)
val files = mssparkutils.fs.ls("/")
files.foreach{
    file => println(file.name,file.isDir,file.isFile,file.size,file.modifyTime)
}
var Files = FS.Ls("/");
foreach(var File in Files) {
    Console.WriteLine(File.Name+" "+File.IsDir+" "+File.IsFile+" "+File.Size);
}
files <- mssparkutils.fs.ls("/")
for (file in files) {
    writeLines(paste(file$name, file$isDir, file$isFile, file$size, file$modifyTime))
}

Új címtár létrehozása

Létrehozza a megadott könyvtárat, ha az nem létezik, és minden szükséges szülőkönyvtárat.

mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs.mkdirs("new directory name")
FS.Mkdirs("new directory name")
mssparkutils.fs.mkdirs("new directory name")

Fájl másolása

Fájl vagy könyvtár másolása. Támogatja a fájlrendszerek közötti másolást.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
FS.Cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)

Performant copy file

Ezzel a módszerrel gyorsabban másolhat vagy áthelyezhet fájlokat, különösen nagy mennyiségű adatot.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively

Feljegyzés

A módszer csak az Apache Spark 3.3-hoz készült Azure Synapse Runtime-ban és az Apache Spark 3.4-hez készült Azure Synapse Runtime-ban támogatott.

Fájltartalom előnézete

Az UTF-8-ban kódolt sztringként visszaadja az adott fájl első "maxBytes" bájtját.

mssparkutils.fs.head('file path', maxBytes to read)
mssparkutils.fs.head("file path", maxBytes to read)
FS.Head("file path", maxBytes to read)
mssparkutils.fs.head('file path', maxBytes to read)

Fájl áthelyezése

Fájl vagy könyvtár áthelyezése. Támogatja a fájlrendszerek közötti áthelyezést.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv("source file or directory", "destination directory", true) // Set the last parameter as True to firstly create the parent directory if it does not exist
FS.Mv("source file or directory", "destination directory", true)
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Fájl írása

Az adott sztringet egy UTF-8 kóddal kódolt fájlba írja ki.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
FS.Put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Tartalom hozzáfűzése fájlhoz

Hozzáfűzi a megadott sztringet egy UTF-8 kódolt fájlhoz.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path","content to append",true) // Set the last parameter as True to create the file if it does not exist
FS.Append("file path", "content to append", true) // Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Fájl vagy könyvtár törlése

Fájl vagy könyvtár eltávolítása.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
FS.Rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Jegyzetfüzet-segédprogramok

Nem támogatott.

Az MSSparkUtils Notebook Segédprogrammal futtathat egy jegyzetfüzetet, vagy kiléphet egy értékekkel rendelkező jegyzetfüzetből. Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:

mssparkutils.notebook.help()

Eredmények lekérése:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Feljegyzés

A jegyzetfüzet-segédprogramok nem alkalmazhatók az Apache Spark-feladatdefiníciókra (SJD).

Jegyzetfüzet hivatkozása

Hivatkozzon egy jegyzetfüzetre, és adja vissza a kilépési értékét. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben. A hivatkozott jegyzetfüzet azon a Spark-készleten fut, amelynek a jegyzetfüzete meghívja ezt a függvényt.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Példa:

mssparkutils.notebook.run("folder/Sample1", 90, {"input": 20 })

A futtatás befejezése után megjelenik egy "Jegyzetfüzet futtatása megtekintése: Jegyzetfüzet neve" nevű pillanatkép-hivatkozás a cellakimenetben, a hivatkozásra kattintva megtekintheti az adott futtatás pillanatképét.

Képernyőkép a snap link pythonról

Hivatkozás több jegyzetfüzet párhuzamos futtatására

A módszer mssparkutils.notebook.runMultiple() lehetővé teszi több jegyzetfüzet párhuzamos vagy előre definiált topológiai szerkezettel történő futtatását. Az API egy Spark-munkameneten belül többszálas implementációs mechanizmust használ, ami azt jelenti, hogy a számítási erőforrásokat a referenciajegyzetfüzet futtatja.

A következőkkel mssparkutils.notebook.runMultiple():

  • Egyszerre több jegyzetfüzet végrehajtása anélkül, hogy mindegyik befejeződik.

  • Egyszerű JSON-formátum használatával adja meg a jegyzetfüzetek függőségeit és végrehajtási sorrendjét.

  • Optimalizálhatja a Spark számítási erőforrásainak használatát, és csökkentheti a Synapse-projektek költségeit.

  • A kimenetben megtekintheti az egyes jegyzetfüzet-futtatási rekordok pillanatképeit, és kényelmesen hibakeresést/monitorozást végezhet a jegyzetfüzet-feladatokban.

  • Szerezze be az egyes vezetői tevékenységek kilépési értékét, és használja őket az alsóbb rétegbeli feladatokban.

Az mssparkutils.notebook.help("runMultiple") futtatásával is megkeresheti a példát és a részletes használatot.

Íme egy egyszerű példa a jegyzetfüzetek listájának párhuzamos futtatására ezzel a módszerrel:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

A gyökérjegyzetfüzet végrehajtási eredménye a következő:

Képernyőkép a jegyzetfüzetek listájáról.

Az alábbi példa a topológiai struktúrával rendelkező jegyzetfüzetek futtatására mutat mssparkutils.notebook.runMultiple()be. Ezzel a módszerrel könnyedén vezényelheti a jegyzetfüzeteket egy kódélményen keresztül.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG)

Feljegyzés

  • A módszer csak az Apache Spark 3.3-hoz készült Azure Synapse Runtime-ban és az Apache Spark 3.4-hez készült Azure Synapse Runtime-ban támogatott.
  • A több jegyzetfüzet-futtatás párhuzamossági foka a Spark-munkamenet teljes rendelkezésre álló számítási erőforrására korlátozódik.

Jegyzetfüzet kilépése

Kilép egy értékekkel rendelkező jegyzetfüzetből. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben.

  • Amikor interaktívan meghív egy exit() függvényt egy jegyzetfüzetből, az Azure Synapse kivételt jelez, kihagyja az alhálózati cellák futtatását, és életben tartja a Spark-munkamenetet.

  • Amikor egy Synapse-folyamatban függvényt exit() meghívó jegyzetfüzetet vezényel, az Azure Synapse visszaad egy kilépési értéket, befejezi a folyamatfuttatást, és leállítja a Spark-munkamenetet.

  • Amikor meghív egy függvényt egy exit() hivatkozott jegyzetfüzetben, az Azure Synapse leállítja a hivatkozott jegyzetfüzet további végrehajtását, és folytatja a függvényt hívó run() jegyzetfüzet következő celláinak futtatását. Például: A Jegyzetfüzet1 három cellával rendelkezik, és meghív egy függvényt exit() a második cellában. A Jegyzetfüzet2 öt cellával és hívásokkal run(notebook1) rendelkezik a harmadik cellában. A Notebook2 futtatásakor a Jegyzetfüzet1 a második cellánál lesz leállítva, amikor eléri a függvényt exit() . A Notebook2 továbbra is futtatja a negyedik és az ötödik celláját.

mssparkutils.notebook.exit("value string")

Példa:

Az 1 . mintajegyzetfüzet a mappa alatt található, és a következő két cellát tartalmazza:

  • az 1. cella egy bemeneti paramétert határoz meg, amelynek alapértelmezett értéke 10.
  • a 2. cella kilépési értékkel lép ki a jegyzetfüzetből.

Képernyőkép egy mintajegyzetfüzetről

A Minta1 egy másik, alapértelmezett értékekkel rendelkező jegyzetfüzetben is futtatható:


exitVal = mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

Találatok:

Sample1 run success with input is 10

A Minta1 egy másik jegyzetfüzetben is futtatható, és a bemeneti érték 20 lehet:

exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print (exitVal)

Találatok:

Sample1 run success with input is 20

Az MSSparkUtils Notebook Segédprogrammal futtathat egy jegyzetfüzetet, vagy kiléphet egy értékekkel rendelkező jegyzetfüzetből. Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:

mssparkutils.notebook.help()

Eredmények lekérése:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Jegyzetfüzet hivatkozása

Hivatkozzon egy jegyzetfüzetre, és adja vissza a kilépési értékét. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben. A hivatkozott jegyzetfüzet azon a Spark-készleten fut, amelynek a jegyzetfüzete meghívja ezt a függvényt.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Példa:

mssparkutils.notebook.run("folder/Sample1", 90, Map("input" -> 20))

A futtatás befejezése után megjelenik egy "Jegyzetfüzet futtatása megtekintése: Jegyzetfüzet neve" nevű pillanatkép-hivatkozás a cellakimenetben, a hivatkozásra kattintva megtekintheti az adott futtatás pillanatképét.

Képernyőkép a snap link scala-ról

Jegyzetfüzet kilépése

Kilép egy értékekkel rendelkező jegyzetfüzetből. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben.

  • Amikor interaktívan meghív egy exit() függvényt egy jegyzetfüzetnek, az Azure Synapse kivételt fog kivenni, kihagyja az alhálózati cellák futtatását, és életben tartja a Spark-munkamenetet.

  • Amikor egy Synapse-folyamatban függvényt exit() meghívó jegyzetfüzetet vezényel, az Azure Synapse visszaad egy kilépési értéket, befejezi a folyamatfuttatást, és leállítja a Spark-munkamenetet.

  • Amikor meghív egy függvényt egy exit() hivatkozott jegyzetfüzetben, az Azure Synapse leállítja a hivatkozott jegyzetfüzet további végrehajtását, és folytatja a függvényt hívó run() jegyzetfüzet következő celláinak futtatását. Például: A Jegyzetfüzet1 három cellával rendelkezik, és meghív egy függvényt exit() a második cellában. A Jegyzetfüzet2 öt cellával és hívásokkal run(notebook1) rendelkezik a harmadik cellában. A Notebook2 futtatásakor a Jegyzetfüzet1 a második cellánál lesz leállítva, amikor eléri a függvényt exit() . A Notebook2 továbbra is futtatja a negyedik és az ötödik celláját.

mssparkutils.notebook.exit("value string")

Példa:

Az 1. mintajegyzetfüzet az mssparkutils/folder/ alatt található, a következő két cellával:

  • az 1. cella egy bemeneti paramétert határoz meg, amelynek alapértelmezett értéke 10.
  • a 2. cella kilépési értékkel lép ki a jegyzetfüzetből.

Képernyőkép egy mintajegyzetfüzetről

A Minta1 egy másik, alapértelmezett értékekkel rendelkező jegyzetfüzetben is futtatható:


val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1")
print(exitVal)

Találatok:

exitVal: String = Sample1 run success with input is 10
Sample1 run success with input is 10

A Minta1 egy másik jegyzetfüzetben is futtatható, és a bemeneti érték 20 lehet:

val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print(exitVal)

Találatok:

exitVal: String = Sample1 run success with input is 20
Sample1 run success with input is 20

Az MSSparkUtils Notebook Segédprogrammal futtathat egy jegyzetfüzetet, vagy kiléphet egy értékekkel rendelkező jegyzetfüzetből. Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:

mssparkutils.notebook.help()

Eredmények lekérése:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Jegyzetfüzet hivatkozása

Hivatkozzon egy jegyzetfüzetre, és adja vissza a kilépési értékét. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben. A hivatkozott jegyzetfüzet azon a Spark-készleten fut, amelynek a jegyzetfüzete meghívja ezt a függvényt.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Példa:

mssparkutils.notebook.run("folder/Sample1", 90, list("input": 20))

A futtatás befejezése után megjelenik egy "Jegyzetfüzet futtatása megtekintése: Jegyzetfüzet neve" nevű pillanatkép-hivatkozás a cellakimenetben, a hivatkozásra kattintva megtekintheti az adott futtatás pillanatképét.

Jegyzetfüzet kilépése

Kilép egy értékekkel rendelkező jegyzetfüzetből. A beágyazott függvényhívásokat interaktívan vagy folyamatként is futtathatja egy jegyzetfüzetben.

  • Amikor interaktívan meghív egy exit() függvényt egy jegyzetfüzetnek, az Azure Synapse kivételt fog kivenni, kihagyja az alhálózati cellák futtatását, és életben tartja a Spark-munkamenetet.

  • Amikor egy Synapse-folyamatban függvényt exit() meghívó jegyzetfüzetet vezényel, az Azure Synapse visszaad egy kilépési értéket, befejezi a folyamatfuttatást, és leállítja a Spark-munkamenetet.

  • Amikor meghív egy függvényt egy exit() hivatkozott jegyzetfüzetben, az Azure Synapse leállítja a hivatkozott jegyzetfüzet további végrehajtását, és folytatja a függvényt hívó run() jegyzetfüzet következő celláinak futtatását. Például: A Jegyzetfüzet1 három cellával rendelkezik, és meghív egy függvényt exit() a második cellában. A Jegyzetfüzet2 öt cellával és hívásokkal run(notebook1) rendelkezik a harmadik cellában. A Notebook2 futtatásakor a Jegyzetfüzet1 a második cellánál lesz leállítva, amikor eléri a függvényt exit() . A Notebook2 továbbra is futtatja a negyedik és az ötödik celláját.

mssparkutils.notebook.exit("value string")

Példa:

Az 1 . mintajegyzetfüzet a mappa alatt található, és a következő két cellát tartalmazza:

  • az 1. cella egy bemeneti paramétert határoz meg, amelynek alapértelmezett értéke 10.
  • a 2. cella kilépési értékkel lép ki a jegyzetfüzetből.

Képernyőkép egy mintajegyzetfüzetről

A Minta1 egy másik, alapértelmezett értékekkel rendelkező jegyzetfüzetben is futtatható:


exitVal <- mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

Találatok:

Sample1 run success with input is 10

A Minta1 egy másik jegyzetfüzetben is futtatható, és a bemeneti érték 20 lehet:

exitVal <- mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, list("input": 20))
print (exitVal)

Találatok:

Sample1 run success with input is 20

Hitelesítő adatok segédprogramok

Az MSSparkUtils hitelesítő adatok segédprogramjaival lekérheti a társított szolgáltatások hozzáférési jogkivonatait, és kezelheti a titkos kulcsokat az Azure Key Vaultban.

Futtassa a következő parancsot az elérhető módszerek áttekintéséhez:

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Not supported.
mssparkutils.credentials.help()

Eredmény lekérése:

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

Feljegyzés

A getSecretWithLS(linkedService, secret) jelenleg nem támogatott a C#-ban.

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

Jogkivonat lekérése

Microsoft Entra-jogkivonatot ad vissza egy adott célközönség számára, név (nem kötelező). Az alábbi táblázat felsorolja az összes elérhető célközönségtípust:

Célközönség típusa AZ API-hívásban használandó sztringkonstans
Azure Storage Storage
Azure Key Vault Vault
Azure-felügyelet AzureManagement
Azure SQL Data Warehouse (dedikált és kiszolgáló nélküli) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Data Explorer AzureDataExplorer
Azure Database for MySQL AzureOSSDB
Azure Database for MariaDB AzureOSSDB
Azure Database for PostgreSQL AzureOSSDB
mssparkutils.credentials.getToken('audience Key')
mssparkutils.credentials.getToken("audience Key")
Credentials.GetToken("audience Key")
mssparkutils.credentials.getToken('audience Key')

Jogkivonat érvényesítése

Igaz értéket ad vissza, ha a jogkivonat nem járt le.

mssparkutils.credentials.isValidToken('your token')
mssparkutils.credentials.isValidToken("your token")
Credentials.IsValidToken("your token")
mssparkutils.credentials.isValidToken('your token')

Csatolt szolgáltatás kapcsolati sztring vagy hitelesítő adatainak lekérése

A társított szolgáltatáshoz tartozó kapcsolati sztring vagy hitelesítő adatokat adja vissza.

mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
mssparkutils.credentials.getConnectionStringOrCreds("linked service name")
Credentials.GetConnectionStringOrCreds("linked service name")
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')

Titkos kulcs lekérése munkaterületi identitással

Egy adott Azure Key Vault-névhez, titkos kódnévhez és társított szolgáltatásnévhez tartozó Azure Key Vault-titkos kulcsot ad vissza munkaterületi identitás használatával. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Key Vaulthoz való hozzáférést.

mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
mssparkutils.credentials.getSecret("azure key vault name","secret name","linked service name")
Credentials.GetSecret("azure key vault name","secret name","linked service name")
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')

Titkos kulcs lekérése felhasználói hitelesítő adatokkal

Egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez tartozó Azure Key Vault-titkos kulcsot ad vissza felhasználói hitelesítő adatok használatával.

mssparkutils.credentials.getSecret('azure key vault name','secret name')
mssparkutils.credentials.getSecret("azure key vault name","secret name")
Credentials.GetSecret("azure key vault name","secret name")
mssparkutils.credentials.getSecret('azure key vault name','secret name')

Titkos kód elhelyezése munkaterületi identitással

Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsát a munkaterületi identitás használatával. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Key Vaulthoz való hozzáférést.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

Titkos kód elhelyezése munkaterületi identitással

Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsát a munkaterületi identitás használatával. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Key Vaulthoz való hozzáférést.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value","linked service name")

Titkos kód elhelyezése munkaterületi identitással

Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsát a munkaterületi identitás használatával. Győződjön meg arról, hogy megfelelően konfigurálja az Azure Key Vaulthoz való hozzáférést.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

Titkos kód elhelyezése felhasználói hitelesítő adatokkal

Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsait a felhasználói hitelesítő adatok használatával.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

Titkos kód elhelyezése felhasználói hitelesítő adatokkal

Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsait a felhasználói hitelesítő adatok használatával.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

Titkos kód elhelyezése felhasználói hitelesítő adatokkal

Az Azure Key Vault egy adott Azure Key Vault-névhez, titkos névhez és társított szolgáltatásnévhez helyezi el az Azure Key Vault titkos kulcsait a felhasználói hitelesítő adatok használatával.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value")

Környezeti segédprogramok

Futtassa az alábbi parancsokat az elérhető módszerek áttekintéséhez:

mssparkutils.env.help()
mssparkutils.env.help()
mssparkutils.env.help()
Env.Help()

Eredmény lekérése:

getUserName(): returns user name
getUserId(): returns unique user id
getJobId(): returns job id
getWorkspaceName(): returns workspace name
getPoolName(): returns Spark pool name
getClusterId(): returns cluster id

Felhasználónév lekérése

Az aktuális felhasználónevet adja vissza.

mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
Env.GetUserName()

Felhasználói azonosító lekérése

Az aktuális felhasználói azonosítót adja vissza.

mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
Env.GetUserId()

Feladatazonosító lekérése

Feladatazonosítót ad vissza.

mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
Env.GetJobId()

Munkaterület nevének lekérése

A munkaterület nevét adja vissza.

mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
Env.GetWorkspaceName()

Készletnév lekérése

A Spark-készlet nevét adja vissza.

mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
Env.GetPoolName()

Fürtazonosító lekérése

Az aktuális fürtazonosítót adja vissza.

mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
Env.GetClusterId()

Futtatókörnyezet környezete

Az Mssparkutils runtime utils 3 futtatókörnyezeti tulajdonságot adott meg, az mssparkutils futtatókörnyezettel lekérheti az alábbi tulajdonságokat:

  • Jegyzetfüzetnév – Az aktuális jegyzetfüzet neve mindig az interaktív mód és a folyamat mód értékét adja vissza.
  • Pipelinejobid – A folyamatfuttatás azonosítója folyamat módban adja vissza az értéket, és interaktív módban üres sztringet ad vissza.
  • Activityrunid – A jegyzetfüzet tevékenységfuttatási azonosítója folyamat módban adja vissza az értéket, interaktív módban pedig üres sztringet ad vissza.

A futtatókörnyezet jelenleg a Pythont és a Scalát is támogatja.

mssparkutils.runtime.context
ctx <- mssparkutils.runtime.context()
for (key in ls(ctx)) {
    writeLines(paste(key, ctx[[key]], sep = "\t"))
}
%%spark
mssparkutils.runtime.context

Munkamenet-kezelés

Interaktív munkamenet leállítása

Ahelyett, hogy manuálisan kattintanál a Leállítás gombra, néha kényelmesebb leállítani egy interaktív munkamenetet egy API meghívásával a kódban. Ilyen esetekben biztosítunk egy API-t mssparkutils.session.stop() , amely támogatja az interaktív munkamenet kódon keresztüli leállítását, amely elérhető a Scala és a Python számára.

mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()

mssparkutils.session.stop() Az API a háttérben aszinkron módon állítja le az aktuális interaktív munkamenetet, leállítja a Spark-munkamenetet, és felszabadítja a munkamenet által foglalt erőforrásokat, hogy azok elérhetők legyenek az ugyanabban a készletben lévő többi munkamenet számára.

Feljegyzés

Nem javasoljuk a beépített API-k hívását, például sys.exit a Scalában vagy sys.exit() a Pythonban a kódban, mert az ilyen API-k egyszerűen megölik az értelmező folyamatot, így a Spark-munkamenet életben marad, és az erőforrások nem szabadulnak fel.

Csomagfüggőségek

Ha helyileg szeretne jegyzetfüzeteket vagy feladatokat fejleszteni, és az összeállításhoz/IDE-tippekhez szükséges megfelelő csomagokra kell hivatkoznia, az alábbi csomagokat használhatja.

Következő lépések