Поделиться через


Общие сведения о Microsoft Spark Utilities

Microsoft Spark Utilities (MSSparkUtils) — встроенный пакет, который упрощает выполнение стандартных задач. С помощью MSSparkUtils можно работать с файловыми системами и секретами, получать переменные среды и связывать записные книжки. MSSparkUtils доступны в PySpark (Python), Scala.NET Spark (C#)и записных книжках и R (Preview) конвейерах Synapse.

Предварительные требования

Настройка доступа к Azure Data Lake Storage 2-го поколения

Записные книжки Synapse используют сквозную передачу Microsoft Entra для доступа к учетным записям ADLS 2-го поколения. Для доступа к учетной записи (или папке) ADLS 2-го поколения необходимо быть участником для данных Blob-объектов хранилища.

Конвейеры Synapse используют удостоверение управляемой службы (MSI) среды для доступа к учетным записям хранения. Чтобы использовать пакеты MSSparkUtils в действиях конвейера, ваше удостоверение рабочей области должно быть участником для данных BLOB-объектов хранилища для доступа к учетной записи (или папке) ADLS 2-го поколения.

Выполните следующие действия, чтобы убедиться, что идентификатор Microsoft Entra и MSI рабочей области имеют доступ к учетной записи ADLS 2-го поколения:

  1. Откройте портал Azure и учетную запись хранения, к которой вы хотите получить доступ. Вы можете перейти к конкретному контейнеру, к которому необходим доступ.

  2. На левой панели выберите Управление доступом (IAM).

  3. Выберите Добавить>Добавить назначение ролей, чтобы открыть страницу "Добавление назначения ролей".

  4. Назначьте следующую роль. Подробные инструкции см. в статье Назначение ролей Azure с помощью портала Microsoft Azure.

    Параметр Значение
    Роль Участник данных хранилища BLOB-объектов
    Назначить доступ для USER и MANAGEDIDENTITY
    Участники учетная запись Microsoft Entra и удостоверение рабочей области

    Примечание.

    Имя управляемого удостоверения также является именем рабочей области.

    Страница

  5. Выберите Сохранить.

Вы можете получить доступ к данным в ADLS 2-го поколения с помощью Synapse Spark по следующему URL-адресу:

abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>

Настройка доступа к Хранилищу BLOB-объектов Azure

Synapse использует подписанный URL-адрес (SAS) для доступа к Хранилище BLOB-объектов Azure. Чтобы избежать предоставления ключей SAS в коде, рекомендуется создать новую связанную службу в рабочей области Synapse для учетной записи Хранилища BLOB-объектов Azure, к которой нужно получить доступ.

Чтобы добавить новую связанную службу для учетной записи Хранилища BLOB-объектов Azure, выполните эти действия:

  1. Откройте Azure Synapse Studio.
  2. В левой панели выберите Управление и выберите связанные службы в списке Внешние подключения.
  3. Найдите Хранилище BLOB-объектов Azure в панели Новая связанная служба справа.
  4. Выберите Продолжить.
  5. Выберите учетную запись Хранилища BLOB-объектов Azure для доступа и настройте имя связанной службы. Рекомендуется использовать ключ учетной записи в качестве способа проверки подлинности.
  6. Выберите Тестирование подключения для проверки настроек.
  7. Сначала выберите Создать, а затем щелкните Опубликовать все, чтобы сохранить изменения.

Вы можете получить доступ к данным в Хранилище BLOB-объектов Azure с помощью Synapse Spark по следующему URL-адресу:

wasb[s]://<container_name>@<storage_account_name>.blob.core.windows.net/<path>

Пример кода:

from pyspark.sql import SparkSession

# Azure storage access info
blob_account_name = 'Your account name' # replace with your blob name
blob_container_name = 'Your container name' # replace with your container name
blob_relative_path = 'Your path' # replace with your relative folder path
linked_service_name = 'Your linked service name' # replace with your linked service name

blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely

wasb_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)

spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasb_path)
val blob_account_name = "" // replace with your blob name
val blob_container_name = "" //replace with your container name
val blob_relative_path = "/" //replace with your relative folder path
val linked_service_name = "" //replace with your linked service name


val blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

val wasbs_path = f"wasbs://$blob_container_name@$blob_account_name.blob.core.windows.net/$blob_relative_path"
spark.conf.set(f"fs.azure.sas.$blob_container_name.$blob_account_name.blob.core.windows.net",blob_sas_token)

var blob_account_name = "";  // replace with your blob name
var blob_container_name = "";     // replace with your container name
var blob_relative_path = "";  // replace with your relative folder path
var linked_service_name = "";    // replace with your linked service name
var blob_sas_token = Credentials.GetConnectionStringOrCreds(linked_service_name);

spark.Conf().Set($"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net", blob_sas_token);

var wasbs_path = $"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}";

Console.WriteLine(wasbs_path);

# Azure storage access info
blob_account_name <- 'Your account name' # replace with your blob name
blob_container_name <- 'Your container name' # replace with your container name
blob_relative_path <- 'Your path' # replace with your relative folder path
linked_service_name <- 'Your linked service name' # replace with your linked service name

blob_sas_token <- mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely
sparkR.session()
wasb_path <- sprintf('wasbs://%s@%s.blob.core.windows.net/%s',blob_container_name, blob_account_name, blob_relative_path)
sparkR.session(sprintf('fs.azure.sas.%s.%s.blob.core.windows.net',blob_container_name, blob_account_name), blob_sas_token)

print( paste('Remote blob path: ',wasb_path))

Настройка доступа к Azure Key Vault

Вы можете добавить Azure Key Vault в качестве связанной службы для управления учетными данными в Synapse. Чтобы добавить Azure Key Vault в качестве связанной службы Synapse, выполните следующие действия:

  1. Откройте Azure Synapse Studio.

  2. В левой панели выберите Управление и выберите связанные службы в списке Внешние подключения.

  3. Найдите Azure Key Vault в панели Новая связанная служба справа.

  4. Выберите учетную запись Azure Key Vault для доступа и настройте имя связанной службы.

  5. Выберите Тестирование подключения для проверки настроек.

  6. Сначала выберите Создать, а затем щелкните Опубликовать все, чтобы сохранить изменения.

Записные книжки Synapse используют сквозную передачу Microsoft Entra для доступа к Azure Key Vault. Для доступа к Azure Key Vault в конвейерах Synapse используется удостоверение рабочей области (MSI). Чтобы убедиться, что код работает как в записной книжке, так и в конвейере Synapse, рекомендуется предоставить разрешение на секретный доступ как для учетной записи Microsoft Entra, так и для удостоверения рабочей области.

Чтобы предоставить удостоверению рабочей области доступ к секретам, выполните следующие действия:

  1. Откройте портал Azure и Azure Key Vault, к которым вы хотите получить доступ.
  2. На левой панели выберите Политики доступа.
  3. Выберите " Добавить политику доступа":
    • Выберите Управление ключами, секретами и сертификатами в качестве шаблона конфигурации.
    • Выберите учетную запись Microsoft Entra и удостоверение рабочей области (то же, что и имя рабочей области) в выбранном субъекте или убедитесь, что он уже назначен.
  4. Нажмите Выбрать и Добавить.
  5. Нажмите кнопку Сохранить, чтобы зафиксировать изменения.

Служебные программы файловой системы

mssparkutils.fs предоставляет служебные программы для работы с различными файловыми системами, включая Azure Data Lake Storage 2-го поколения (ADLS 2-го поколения) и Хранилище BLOB-объектов Azure. Убедитесь, что доступ к Azure Data Lake Storage 2-го поколения и Хранилищу BLOB-объектов Azure настроен правильно.

Чтобы получить общие сведения о доступных методах, выполните следующие команды:

from notebookutils import mssparkutils
mssparkutils.fs.help()
mssparkutils.fs.help()
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Notebook.MSSparkUtils;
FS.Help()
library(notebookutils)
mssparkutils.fs.help()

Результат будет иметь такой вид:


mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(src: String, dest: String, create_path: Boolean = False, overwrite: Boolean = False): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory

Use mssparkutils.fs.help("methodName") for more info about a method.

Перечень файлов

Список содержимое каталога.

mssparkutils.fs.ls('Your directory path')
mssparkutils.fs.ls("Your directory path")
FS.Ls("Your directory path")
mssparkutils.fs.ls("Your directory path")

Просмотр свойств файла.

Возвращает свойства файла, включая имя файла, путь к файлу, размер файла, время изменения файла и наличие каталога и файла.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size, file.modifyTime)
val files = mssparkutils.fs.ls("/")
files.foreach{
    file => println(file.name,file.isDir,file.isFile,file.size,file.modifyTime)
}
var Files = FS.Ls("/");
foreach(var File in Files) {
    Console.WriteLine(File.Name+" "+File.IsDir+" "+File.IsFile+" "+File.Size);
}
files <- mssparkutils.fs.ls("/")
for (file in files) {
    writeLines(paste(file$name, file$isDir, file$isFile, file$size, file$modifyTime))
}

Создать новый каталог

Создает указанный каталог, если он не существует, и все необходимые родительские каталоги.

mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs.mkdirs("new directory name")
FS.Mkdirs("new directory name")
mssparkutils.fs.mkdirs("new directory name")

Копировать файл

Копирует файл или каталог. Поддерживает копирование в различных файловых системах.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
FS.Cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)

Файл выполнения копирования

Этот метод обеспечивает более быстрый способ копирования или перемещения файлов, особенно больших объемов данных.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively

Примечание.

Этот метод поддерживается только в среде выполнения Azure Synapse для Apache Spark 3.3 и Azure Synapse Runtime для Apache Spark 3.4.

Предварительный просмотр содержимого файла

Возвращает первые "maxBytes" байтов заданного файла в виде строки в кодировке UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)
mssparkutils.fs.head("file path", maxBytes to read)
FS.Head("file path", maxBytes to read)
mssparkutils.fs.head('file path', maxBytes to read)

Переместить файл

Перемещает файл или каталог. Поддерживает перемещение в рамках различных файловых систем.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv("source file or directory", "destination directory", true) // Set the last parameter as True to firstly create the parent directory if it does not exist
FS.Mv("source file or directory", "destination directory", true)
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Записать в файл

Записывает заданную строку в файл в кодировке UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
FS.Put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Добавить содержимое в файл

Добавляет заданную строку в файл в кодировке UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path","content to append",true) // Set the last parameter as True to create the file if it does not exist
FS.Append("file path", "content to append", true) // Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Удалить файл или каталог

Удаляет файл или каталог.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
FS.Rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Служебные программы записных книжек

Не поддерживается.

Служебные программы записных книжек MSSparkUtils можно использовать для запуска записной книжки или выхода из нее со значением. Чтобы получить общие сведения о доступных методах, используйте следующую команду:

mssparkutils.notebook.help()

Результат:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Примечание.

Служебные программы записной книжки не применимы для определений заданий Apache Spark (SJD).

Ссылка на записную книжку

Ссылается записную книжку и возвращает значение выхода. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере. Записная книжка, на которую выполняется ссылка, будет выполняться в пуле Spark, в котором она вызывает эту функцию.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Например:

mssparkutils.notebook.run("folder/Sample1", 90, {"input": 20 })

Когда выполнение завершиться, вы увидите ссылку на моментальный снимок с именем View notebook run: Notebook Name (Просмотр выполнения записной книжки: "имя_записной_книжки") в выходных данных ячейки. Щелкните ссылку, чтобы просмотреть моментальный снимок для этого конкретного выполнения.

Снимок экрана: ссылка на моментальный снимок (Python)

Ссылка на параллельное выполнение нескольких записных книжек

Этот метод mssparkutils.notebook.runMultiple() позволяет выполнять несколько записных книжек параллельно или с предопределенной топологической структурой. API использует механизм реализации нескольких потоков в сеансе Spark, что означает, что вычислительные ресурсы совместно используются эталонной записной книжкой.

С помощью mssparkutils.notebook.runMultiple():

  • Одновременно выполняйте несколько записных книжек, не ожидая завершения каждой из них.

  • Укажите зависимости и порядок выполнения записных книжек с помощью простого формата JSON.

  • Оптимизируйте использование вычислительных ресурсов Spark и сократите затраты на проекты Synapse.

  • Просмотр моментальных снимков каждой записи запуска записной книжки в выходных данных и отладка и мониторинг задач записной книжки удобно.

  • Получите значение выхода для каждого исполнительного действия и используйте их в подчиненных задачах.

Вы также можете попытаться запустить mssparkutils.notebook.help("runMultiple"), чтобы найти пример и подробное использование.

Ниже приведен простой пример запуска списка записных книжек параллельно с помощью этого метода:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Результат выполнения корневой записной книжки выглядит следующим образом:

Снимок экрана: ссылка на список записных книжек.

Ниже приведен пример запуска записных книжек с топологической структурой с помощью mssparkutils.notebook.runMultiple(). Используйте этот метод для легкой оркестрации записных книжек с помощью интерфейса кода.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG)

Примечание.

Выход из записной книжки

Выход из записной книжки со значением. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере.

  • При вызове функции exit() из записной книжки в интерактивном режиме Azure Synapse вызовет исключение, пропустите ячейки subsequence и сохраните сеанс Spark в живых.

  • При управлении записной книжкой, которая вызывает функцию exit() в конвейере Synapse, Azure Synapse возвращает выходное значение, завершает выполнение конвейера и останавливает сеанс Spark.

  • При вызове функции exit() в записной книжке, на которую указывает ссылка, Azure Synapse прекращает работу в ней и продолжает выполнять следующие ячейки в записной книжке, которая вызывает функцию run(). Пример. Записная книжка Notebook1 содержит три ячейки и вызывает функцию exit() во второй ячейке. Записная книжка Notebook2 содержит пять ячеек и вызывает run(notebook1) в третьей ячейке. При запуске Notebook2 Notebook1 остановится на второй ячейке при вызове функции exit(). Notebook2 продолжит выполнять четвертую и пятую ячейки.

mssparkutils.notebook.exit("value string")

Например:

Записная книжка Sample1 находится в folder/ и содержит две ячейки:

  • Ячейка 1 определяет параметр ввода. Значение по умолчанию равно 10.
  • Ввод ячейки 2 имеет выходное значение.

Снимок экрана с примером записной книжки

Вы можете запустить Sample1 в другой записной книжке со значениями по умолчанию:


exitVal = mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

Результат будет иметь такой вид:

Sample1 run success with input is 10

Вы можете запустить Sample1 в другой записной книжке и указать входное значение 20:

exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print (exitVal)

Результат будет иметь такой вид:

Sample1 run success with input is 20

Служебные программы записных книжек MSSparkUtils можно использовать для запуска записной книжки или выхода из нее со значением. Чтобы получить общие сведения о доступных методах, используйте следующую команду:

mssparkutils.notebook.help()

Результат:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Ссылка на записную книжку

Ссылается записную книжку и возвращает значение выхода. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере. Записная книжка, на которую выполняется ссылка, будет выполняться в пуле Spark, в котором она вызывает эту функцию.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Например:

mssparkutils.notebook.run("folder/Sample1", 90, Map("input" -> 20))

Когда выполнение завершиться, вы увидите ссылку на моментальный снимок с именем View notebook run: Notebook Name (Просмотр выполнения записной книжки: "имя_записной_книжки") в выходных данных ячейки. Щелкните ссылку, чтобы просмотреть моментальный снимок для этого конкретного выполнения.

Снимок экрана: ссылка на моментальный снимок (Scala)

Выход из записной книжки

Выход из записной книжки со значением. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере.

  • Когда вы вызываете функцию записной книжки exit() в интерактивном режиме, Azure Synapse выдает исключение, пропускает выполнение ячеек подпоследовательности и продолжает работу сеанса Spark.

  • При управлении записной книжкой, которая вызывает функцию exit() в конвейере Synapse, Azure Synapse возвращает выходное значение, завершает выполнение конвейера и останавливает сеанс Spark.

  • При вызове функции exit() в записной книжке, на которую указывает ссылка, Azure Synapse прекращает работу в ней и продолжает выполнять следующие ячейки в записной книжке, которая вызывает функцию run(). Пример. Записная книжка Notebook1 содержит три ячейки и вызывает функцию exit() во второй ячейке. Записная книжка Notebook2 содержит пять ячеек и вызывает run(notebook1) в третьей ячейке. При запуске Notebook2 Notebook1 остановится на второй ячейке при вызове функции exit(). Notebook2 продолжит выполнять четвертую и пятую ячейки.

mssparkutils.notebook.exit("value string")

Например:

Записная книжка Sample1 находится в mssparkutils/folder/ и содержит две ячейки:

  • Ячейка 1 определяет параметр ввода. Значение по умолчанию равно 10.
  • Ввод ячейки 2 имеет выходное значение.

Снимок экрана с примером записной книжки

Вы можете запустить Sample1 в другой записной книжке со значениями по умолчанию:


val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1")
print(exitVal)

Результат будет иметь такой вид:

exitVal: String = Sample1 run success with input is 10
Sample1 run success with input is 10

Вы можете запустить Sample1 в другой записной книжке и указать входное значение 20:

val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print(exitVal)

Результат будет иметь такой вид:

exitVal: String = Sample1 run success with input is 20
Sample1 run success with input is 20

Служебные программы записных книжек MSSparkUtils можно использовать для запуска записной книжки или выхода из нее со значением. Чтобы получить общие сведения о доступных методах, используйте следующую команду:

mssparkutils.notebook.help()

Результат:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Ссылка на записную книжку

Ссылается записную книжку и возвращает значение выхода. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере. Записная книжка, на которую выполняется ссылка, будет выполняться в пуле Spark, в котором она вызывает эту функцию.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Например:

mssparkutils.notebook.run("folder/Sample1", 90, list("input": 20))

Когда выполнение завершиться, вы увидите ссылку на моментальный снимок с именем View notebook run: Notebook Name (Просмотр выполнения записной книжки: "имя_записной_книжки") в выходных данных ячейки. Щелкните ссылку, чтобы просмотреть моментальный снимок для этого конкретного выполнения.

Выход из записной книжки

Выход из записной книжки со значением. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере.

  • Когда вы вызываете функцию записной книжки exit() в интерактивном режиме, Azure Synapse выдает исключение, пропускает выполнение ячеек подпоследовательности и продолжает работу сеанса Spark.

  • При управлении записной книжкой, которая вызывает функцию exit() в конвейере Synapse, Azure Synapse возвращает выходное значение, завершает выполнение конвейера и останавливает сеанс Spark.

  • При вызове функции exit() в записной книжке, на которую указывает ссылка, Azure Synapse прекращает работу в ней и продолжает выполнять следующие ячейки в записной книжке, которая вызывает функцию run(). Пример. Записная книжка Notebook1 содержит три ячейки и вызывает функцию exit() во второй ячейке. Записная книжка Notebook2 содержит пять ячеек и вызывает run(notebook1) в третьей ячейке. При запуске Notebook2 Notebook1 остановится на второй ячейке при вызове функции exit(). Notebook2 продолжит выполнять четвертую и пятую ячейки.

mssparkutils.notebook.exit("value string")

Например:

Записная книжка Sample1 находится в folder/ и содержит две ячейки:

  • Ячейка 1 определяет параметр ввода. Значение по умолчанию равно 10.
  • Ввод ячейки 2 имеет выходное значение.

Снимок экрана с примером записной книжки

Вы можете запустить Sample1 в другой записной книжке со значениями по умолчанию:


exitVal <- mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

Результат будет иметь такой вид:

Sample1 run success with input is 10

Вы можете запустить Sample1 в другой записной книжке и указать входное значение 20:

exitVal <- mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, list("input": 20))
print (exitVal)

Результат будет иметь такой вид:

Sample1 run success with input is 20

Служебные программы для учетных данных

Вы можете использовать служебные программы для учетных данных MSSparkUtils для получения маркеров доступа к связанным службам и управления секретами в Azure Key Vault.

Чтобы получить общие сведения о доступных методах, используйте следующую команду:

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Not supported.
mssparkutils.credentials.help()

Полученный результат:

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

Примечание.

В настоящее время getSecretWithLS(linkedService, secret) не поддерживается в C#.

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

Получение токена

Возвращает токен Microsoft Entra для заданной аудитории, имя (необязательно). В следующей таблице перечислены все доступные типы аудиторий.

Тип аудитории Строковый литерал, используемый в вызове API
Хранилище Azure Storage
Azure Key Vault Vault
Управление и безопасность AzureManagement
Хранилище данных SQL Azure (выделенное и бессерверное) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Data Explorer AzureDataExplorer
База данных Azure для MySQL AzureOSSDB
База данных Azure для MariaDB AzureOSSDB
База данных Azure для PostgreSQL AzureOSSDB
mssparkutils.credentials.getToken('audience Key')
mssparkutils.credentials.getToken("audience Key")
Credentials.GetToken("audience Key")
mssparkutils.credentials.getToken('audience Key')

Проверить маркер

Возвращает значение true, если срок действия маркера не истек.

mssparkutils.credentials.isValidToken('your token')
mssparkutils.credentials.isValidToken("your token")
Credentials.IsValidToken("your token")
mssparkutils.credentials.isValidToken('your token')

Получить строку подключения или учетные данные для связанной службы

Возвращает строку подключения или учетные данные для связанной службы.

mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
mssparkutils.credentials.getConnectionStringOrCreds("linked service name")
Credentials.GetConnectionStringOrCreds("linked service name")
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')

Получить секрет с помощью удостоверения рабочей области

Возвращает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью удостоверения рабочей области. Убедитесь, что доступ к Azure Key Vault правильно настроен.

mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
mssparkutils.credentials.getSecret("azure key vault name","secret name","linked service name")
Credentials.GetSecret("azure key vault name","secret name","linked service name")
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')

Получить секрет с помощью учетных данных пользователя

Возвращает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью учетных данных пользователя.

mssparkutils.credentials.getSecret('azure key vault name','secret name')
mssparkutils.credentials.getSecret("azure key vault name","secret name")
Credentials.GetSecret("azure key vault name","secret name")
mssparkutils.credentials.getSecret('azure key vault name','secret name')

Разместить секрет с помощью удостоверения рабочей области

Размещает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью удостоверения рабочей области. Убедитесь, что доступ к Azure Key Vault настроен правильно.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

Разместить секрет с помощью удостоверения рабочей области

Размещает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью удостоверения рабочей области. Убедитесь, что доступ к Azure Key Vault настроен правильно.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value","linked service name")

Разместить секрет с помощью удостоверения рабочей области

Размещает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью удостоверения рабочей области. Убедитесь, что доступ к Azure Key Vault настроен правильно.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

Разместить секрет с помощью учетных данных пользователя

Размещает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью учетных данных пользователя.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

Разместить секрет с помощью учетных данных пользователя

Размещает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью учетных данных пользователя.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

Разместить секрет с помощью учетных данных пользователя

Размещает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью учетных данных пользователя.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value")

Служебные программы среды

Чтобы получить общие сведения о доступных методах, используйте следующие команды:

mssparkutils.env.help()
mssparkutils.env.help()
mssparkutils.env.help()
Env.Help()

Полученный результат:

getUserName(): returns user name
getUserId(): returns unique user id
getJobId(): returns job id
getWorkspaceName(): returns workspace name
getPoolName(): returns Spark pool name
getClusterId(): returns cluster id

Получить имя пользователя

Возвращает имя текущего пользователя.

mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
Env.GetUserName()

Получить идентификатор пользователя

Возвращает идентификатор текущего пользователя.

mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
Env.GetUserId()

Получить идентификатор задания

Возвращает идентификатор задания.

mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
Env.GetJobId()

Получить имя рабочей области

Возвращает имя рабочей области.

mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
Env.GetWorkspaceName()

Получить имя пула

Возвращает имя пула Spark.

mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
Env.GetPoolName()

Получить идентификатор кластера

Возвращает идентификатор текущего кластера.

mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
Env.GetClusterId()

Контекст среды выполнения

Служебные программы Utils среды выполнения Mssparkutils предоставляют 3 свойства среды выполнения, и вы можете использовать контекст среды выполнения mssparkutils для получения перечисленных ниже свойств:

  • Notebookname — имя текущей записной книжки; всегда возвращает значение как в интерактивном режиме, так и в режиме конвейера.
  • Pipelinejobid — идентификатор выполнения конвейера; возвращает значение в режиме конвейера и пустую строку в интерактивном режиме.
  • Activityrunid — идентификатор запуска действия записной книжки; возвращает значение в режиме конвейера и пустую строку в интерактивном режиме.

В настоящее время контекст среды выполнения поддерживает Python и Scala.

mssparkutils.runtime.context
ctx <- mssparkutils.runtime.context()
for (key in ls(ctx)) {
    writeLines(paste(key, ctx[[key]], sep = "\t"))
}
%%spark
mssparkutils.runtime.context

Управление сеансом

Остановка интерактивного сеанса

Вместо того, чтобы вручную нажимать кнопку "Стоп", иногда удобнее остановить интерактивный сеанс, вызвав API в коде. Для таких случаев мы предоставляем API mssparkutils.session.stop() для остановки интерактивного сеанса с помощью кода, который доступен для Scala и Python.

mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()

API mssparkutils.session.stop() асинхронно остановит текущий интерактивный сеанс в фоновом режиме. Он останавливает сеанс Spark и освободит ресурсы, занятые сеансом, чтобы они были доступны для других сеансов в том же пуле.

Примечание.

Мы не рекомендуем вызывать встроенные API языка, такие как sys.exit в Scala или sys.exit() в Python, в вашем коде, так как такие API просто завершают процесс интерпретатора, оставляя сеанс Spark активным, а ресурсы не высвобождаемыми.

Зависимости пакетов

Если вы хотите локально разрабатывать записные книжки или задания и ссылаться на соответствующие пакеты для подсказок компиляции или интегрированной среды разработки, можно использовать следующие пакеты.

Следующие шаги