Поделиться через


Служебные программы Microsoft Spark (MSSparkUtils) для Fabric

Служебные программы Microsoft Spark (MSSparkUtils) — это встроенный пакет, помогающий легко выполнять распространенные задачи. С помощью MSSparkUtils можно работать с файловыми системами и секретами, получать переменные среды и связывать записные книжки. Пакет MSSparkUtils доступен в конвейерах PySpark (Python), Scala, SparkR notebooks и Fabric.

Примечание.

  • MsSparkUtils официально переименована в NotebookUtils. Существующий код будет оставаться обратно совместимым и не приведет к критическим изменениям. Настоятельно рекомендуется обновить записные книжки до записных книжек, чтобы обеспечить постоянную поддержку и доступ к новым функциям. Пространство имен mssparkutils будет прекращено в будущем.
  • NotebookUtils предназначен для работы с Spark 3.4(Runtime версии 1.2) и более поздних версий. Все новые функции и обновления будут поддерживаться исключительно с пространством имен notebookutils.

Служебные программы файловой системы

mssparkutils.fs предоставляет служебные программы для работы с различными файловыми системами, включая Azure Data Lake Storage (ADLS) 2-го поколения и Хранилище BLOB-объектов Azure. Убедитесь, что доступ к Azure Data Lake Storage 2-го поколения и Хранилищу BLOB-объектов Azure настроен правильно.

Чтобы получить общие сведения о доступных методах, выполните следующие команды:

from notebookutils import mssparkutils
mssparkutils.fs.help()

Выходные данные

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils работает с файловой системой так же, как и с API Spark. Пример использования mssparkuitls.fs.mkdirs() и Fabric lakehouse:

Использование Относительный путь из корневого каталога HDFS Абсолютный путь для файловой системы ABFS Абсолютный путь к локальной файловой системе на узле драйвера
Nondefault lakehouse Не поддерживается mssparkutils.fs.mkdirs("abfss:// container_name>@storage_account_name.dfs.core.windows.net/<>< new_dir")<> mssparkutils.fs.mkdirs("file:/<new_dir>")
Озеро по умолчанию Каталог в разделе "Файлы" или "Таблицы": mssparkutils.fs.mkdirs("Files/<new_dir>") mssparkutils.fs.mkdirs("abfss:// container_name>@storage_account_name.dfs.core.windows.net/<>< new_dir")<> mssparkutils.fs.mkdirs("file:/<new_dir>")

Перечень файлов

Чтобы получить список содержимого каталога, используйте mssparkutils.fs.ls("Путь к каталогу") Например:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

Просмотр свойств файла.

Этот метод возвращает свойства файла, включая имя файла, путь к файлу, размер файла и то, является ли он каталогом и файлом.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Создать новый каталог

Этот метод создает указанный каталог, если он не существует, и создает все необходимые родительские каталоги.

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Копировать файл

Этот метод копирует файл или каталог и поддерживает действие копирования в файловых системах.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Файл выполнения копирования

Этот метод обеспечивает более быстрый способ копирования или перемещения файлов, особенно больших объемов данных.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Предварительный просмотр содержимого файла

Этот метод возвращает до первых байтов maxBytes заданного файла в виде строки, закодированной в UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)

Переместить файл

Этот метод перемещает файл или каталог и поддерживает перемещение между файловыми системами.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Записать в файл

Этот метод записывает указанную строку в файл, закодированный в UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Добавить содержимое в файл

Этот метод добавляет заданную строку в файл, закодированный в UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Удалить файл или каталог

Этот метод удаляет файл или каталог.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Каталог mount/unmount

Дополнительные сведения об использовании в подключении файлов и отключении.

Служебные программы записных книжек

Используйте служебные программы записной книжки MSSparkUtils для запуска записной книжки или выхода из записной книжки со значением. Чтобы получить общие сведения о доступных методах, используйте следующую команду:

mssparkutils.notebook.help()

Выходные данные:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Примечание.

Служебные программы записной книжки не применимы для определений заданий Apache Spark (SJD).

Ссылка на записную книжку

Этот метод ссылается на записную книжку и возвращает значение выхода. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере. На записную книжку, на которую ссылается ссылка, выполняется в пуле Spark записной книжки, которая вызывает эту функцию.

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Например:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Записная книжка Fabric также поддерживает ссылки на записные книжки в нескольких рабочих областях, указав идентификатор рабочей области.

mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Ссылку моментального снимка ссылки можно открыть в выходных данных ячейки. Моментальный снимок записывает результаты выполнения кода и позволяет легко отлаживать эталонный запуск.

Снимок экрана: результат выполнения ссылки.

Снимок экрана: пример моментального снимка.

Примечание.

  • Справочная записная книжка между рабочими областями поддерживается средой выполнения версии 1.2 и выше.
  • Если вы используете файлы в разделе "Ресурс записной книжки", используйте mssparkutils.nbResPath в записной книжке, на которой ссылается ссылка, чтобы убедиться, что он указывает на ту же папку, что и в интерактивном запуске.

Ссылка на параллельное выполнение нескольких записных книжек

Внимание

Эта функция доступна в предварительной версии.

Этот метод mssparkutils.notebook.runMultiple() позволяет выполнять несколько записных книжек параллельно или с предопределенной топологической структурой. API использует механизм реализации нескольких потоков в сеансе Spark, что означает, что вычислительные ресурсы совместно используются эталонной записной книжкой.

С помощью mssparkutils.notebook.runMultiple():

  • Одновременно выполняйте несколько записных книжек, не ожидая завершения каждой из них.

  • Укажите зависимости и порядок выполнения записных книжек с помощью простого формата JSON.

  • Оптимизируйте использование вычислительных ресурсов Spark и уменьшите затраты на проекты Fabric.

  • Просмотр моментальных снимков каждой записи запуска записной книжки в выходных данных и отладка и мониторинг задач записной книжки удобно.

  • Получите значение выхода для каждого исполнительного действия и используйте их в подчиненных задачах.

Вы также можете попытаться запустить mssparkutils.notebook.help("runMultiple"), чтобы найти пример и подробное использование.

Ниже приведен простой пример запуска списка записных книжек параллельно с помощью этого метода:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Результат выполнения корневой записной книжки выглядит следующим образом:

Снимок экрана: ссылка на список записных книжек.

Ниже приведен пример запуска записных книжек с топологической структурой с помощью mssparkutils.notebook.runMultiple(). Используйте этот метод для легкой оркестрации записных книжек с помощью интерфейса кода.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Результат выполнения корневой записной книжки выглядит следующим образом:

Снимок экрана: ссылка на список записных книжек с параметрами.

Примечание.

  • Степень параллелизма выполнения нескольких записных книжек ограничена общим доступным вычислительным ресурсом сеанса Spark.
  • Верхний предел для действий записной книжки или параллельных записных книжек составляет 50. Превышение этого предела может привести к проблемам стабильности и производительности из-за высокого использования вычислительных ресурсов. Если возникают проблемы, рассмотрите возможность разделения записных книжек на несколько runMultiple вызовов или уменьшения параллелизма, изменив поле параллелизма в параметре DAG.
  • Время ожидания по умолчанию для всего DAG составляет 12 часов, а время ожидания по умолчанию для каждой ячейки в дочерней записной книжке составляет 90 секунд. Вы можете изменить время ожидания, задав поля timeoutInSeconds и timeoutPerCellInSeconds в параметре DAG.

Выход из записной книжки

Этот метод завершает записную книжку со значением. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере.

  • При вызове функции exit() из записной книжки в интерактивном режиме записная книжка Fabric создает исключение, пропускает последующие ячейки и сохраняет сеанс Spark в живых.

  • При оркестрации записной книжки в конвейере, который вызывает функцию exit(), действие записной книжки возвращается со значением выхода, завершает выполнение конвейера и останавливает сеанс Spark.

  • При вызове функции exit() в записной книжке, на которую ссылается ссылка, Fabric Spark остановит дальнейшее выполнение записной книжки, на которую ссылается ссылка, и продолжит выполнять следующие ячейки в главной записной книжке, которая вызывает функцию run(). Например: Notebook1 имеет три ячейки и вызывает функцию exit() во второй ячейке. Notebook2 содержит пять ячеек и вызывает run(notebook1) в третьей ячейке. При запуске Notebook2 Записная книжка1 останавливается во второй ячейке при нажатии функции exit(). Notebook2 продолжает выполнять свою четвертую и пятую ячейку.

mssparkutils.notebook.exit("value string")

Например:

Пример1 записной книжки со следующими двумя ячейками:

  • Ячейка 1 определяет входной параметр со значением по умолчанию, равным 10.

  • Ячейка 2 выходит из записной книжки с входным значением.

Снимок экрана: пример записной книжки выхода.

Вы можете запустить Sample1 в другой записной книжке со значениями по умолчанию:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

Выходные данные:

Notebook executed successfully with exit value 10

Вы можете запустить Sample1 в другой записной книжке и указать входное значение 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Выходные данные:

Notebook executed successfully with exit value 20

Служебные программы для учетных данных

Служебные программы учетных данных MSSparkUtils можно использовать для получения маркеров доступа и управления секретами в Azure Key Vault.

Чтобы получить общие сведения о доступных методах, используйте следующую команду:

mssparkutils.credentials.help()

Выходные данные:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

Получение токена

getToken возвращает маркер Microsoft Entra для заданной аудитории и имени (необязательно). В следующем списке показаны доступные в настоящее время ключи аудитории:

  • Ресурс аудитории хранилища: "хранилище"
  • Ресурс Power BI: "pbi"
  • Ресурс Azure Key Vault: keyvault
  • Ресурс RTA KQL DB Synapse: kusto

Выполните следующую команду, чтобы получить маркер:

mssparkutils.credentials.getToken('audience Key')

Получить секрет с помощью учетных данных пользователя

getSecret возвращает секрет Azure Key Vault для заданной конечной точки Azure Key Vault и имени секрета с использованием учетных данных пользователя.

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Подключение файлов и отключение

Fabric поддерживает следующие сценарии подключения в пакете служебных программ Microsoft Spark. Вы можете использовать интерфейсы API подключения, отключения, getMountPath()и mounts() для подключения удаленного хранилища (ADLS 2-го поколения) ко всем рабочим узлам (узлам драйвера и рабочим узлам). После установки точки подключения хранилища используйте локальный API файлов для доступа к данным, как будто он хранится в локальной файловой системе.

Подключение учетной записи ADLS 2-го поколения

В следующем примере показано, как подключить Azure Data Lake Storage 2-го поколения. Подключение Хранилища BLOB-объектов работает аналогично.

В этом примере предполагается, что у вас есть одна учетная запись Data Lake Storage 2-го поколения с именем storegen2, а у учетной записи есть один контейнер с именем mycontainer, который требуется подключить к /test в сеанс Spark записной книжки.

Снимок экрана, показывающий, где выбрать контейнер для подключения.

Чтобы подключить контейнер с именем mycontainer, mssparkutils сначала необходимо проверить, есть ли у вас разрешение на доступ к контейнеру. В настоящее время Fabric поддерживает два метода проверки подлинности для операции подключения триггера: accountKey и sastoken.

Подключение с помощью маркера подписанного URL-адреса или ключа учетной записи

MSSparkUtils поддерживает явное передача ключа учетной записи или маркера подписанного URL-адреса ( SAS) в качестве параметра для подключения целевого объекта.

По соображениям безопасности рекомендуется хранить ключи учетной записи или маркеры SAS в Azure Key Vault (как показано на следующем снимке экрана). Затем их можно получить с помощью API mssparkutils.credentials.getSecret . Дополнительные сведения об Azure Key Vault см. в статье "Сведения о ключах управляемой учетной записи хранения Azure Key Vault".

Снимок экрана, показывающий, где хранятся секреты в Azure Key Vault.

Пример кода для метода accountKey :

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Пример кода для sastoken:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Примечание.

Может потребоваться импортировать пакет mssparkutils, если он недоступен.

from notebookutils import mssparkutils

Параметры подключения:

  • fileCacheTimeout: большие двоичные объекты будут кэшироваться в локальной папке temp в течение 120 секунд по умолчанию. В течение этого времени blobfuse не будет проверять, обновлен ли файл. Параметр может быть задан для изменения времени ожидания по умолчанию. При одновременном изменении нескольких клиентов файлов, чтобы избежать несоответствий между локальными и удаленными файлами, рекомендуется сократить время кэша или даже изменить его на 0 и всегда получать последние файлы с сервера.
  • время ожидания: время ожидания операции подключения составляет 120 секунд по умолчанию. Параметр может быть задан для изменения времени ожидания по умолчанию. Если количество исполнителей или время ожидания подключения слишком много, рекомендуется увеличить значение.

Эти параметры можно использовать следующим образом:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Примечание.

По соображениям безопасности рекомендуется не хранить учетные данные в коде. Чтобы защитить свои учетные данные, мы отредактируем секрет в выходных данных записной книжки. Дополнительные сведения см. в статье Скрытие секретов.

Как подключить озеро

Пример кода для подключения lakehouse к /test:

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

Примечание.

Подключение региональной конечной точки не поддерживается. Fabric поддерживает подключение только глобальной конечной точки onelake.dfs.fabric.microsoft.com.

Доступ к файлам в точке подключения с помощью API mssparktuils fs

Основной целью операции подключения является предоставление клиентам доступа к данным, хранящимся в удаленной учетной записи хранения, с помощью API локальной файловой системы. Вы также можете получить доступ к данным с помощью API mssparkutils fs с подключенным путем в качестве параметра. Этот формат пути немного отличается.

Предположим, что вы подключили контейнер Data Lake Storage 2-го поколения mycontainer к /test с помощью API подключения. При доступе к данным с помощью локального API файловой системы формат пути выглядит следующим образом:

/synfs/notebook/{sessionId}/test/{filename}

Если вы хотите получить доступ к данным с помощью API mssparkutils fs, рекомендуется использовать getMountPath(), чтобы получить точный путь:

path = mssparkutils.fs.getMountPath("/test")
  • Список каталогов:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • Чтение содержимого файла:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Создание каталога:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

Доступ к файлам под точкой подключения через локальный путь

Вы можете легко считывать и записывать файлы в точке подключения с помощью стандартной файловой системы. Ниже приведен пример Python:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Как проверить существующие точки подключения

Api mssparkutils.fs.mounts() можно использовать для проверки всех существующих сведений о точке подключения:

mssparkutils.fs.mounts()

Отключение точки подключения

Используйте следующий код, чтобы отключить точку подключения (/test в этом примере):

mssparkutils.fs.unmount("/test")

Известные ограничения

  • Текущее подключение — это конфигурация уровня задания; Рекомендуется использовать API подключений, чтобы проверить, существует ли точка подключения или недоступна.

  • Механизм отключения не является автоматическим. Когда приложение завершит работу, чтобы отключить точку подключения и освободить место на диске, необходимо явно вызвать API отключения в коде. В противном случае точка подключения по-прежнему будет существовать в узле после завершения работы приложения.

  • Подключение учетной записи хранения ADLS 1-го поколения не поддерживается.

Служебные программы Lakehouse

mssparkutils.lakehouse предоставляет служебные программы специально для управления артефактами Lakehouse. Эти служебные программы позволяют пользователям создавать, извлекать, обновлять и удалять артефакты Lakehouse легко.

Примечание.

API Lakehouse поддерживаются только в среде выполнения версии 1.2+.

Обзор методов

Ниже приведен обзор доступных методов, предоставляемых mssparkutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

Примеры использования

Чтобы эффективно использовать эти методы, рассмотрим следующие примеры использования:

Создание артефакта Lakehouse

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Получение артефакта Lakehouse

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

Обновление артефакта Lakehouse

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Удаление артефакта Lakehouse

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Перечисление артефактов Lakehouse

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

Дополнительная информация:

Для получения дополнительных сведений о каждом методе и его параметрах используйте функцию mssparkutils.lakehouse.help("methodName") .

С помощью служебных программ MSSparkUtils'Lakehouse управление артефактами Lakehouse становится более эффективным и интегрированным в конвейеры Fabric, повышая общий опыт управления данными.

Вы можете изучить эти служебные программы и включить их в рабочие процессы Fabric для простого управления артефактами Lakehouse.

Служебные программы среды выполнения

Отображение сведений о контексте сеанса

С mssparkutils.runtime.context помощью контекста текущего динамического сеанса можно получить сведения о контексте, включая имя записной книжки, озеро по умолчанию, сведения о рабочей области, если это запуск конвейера и т. д.

mssparkutils.runtime.context

Известная проблема

При использовании версии среды выполнения выше 1.2 и запуска mssparkutils.help()перечисленные API fabricClient, хранилища и рабочей области пока не поддерживаются, будут доступны в дальнейшем.