Введение в Структуру MSSparkUtils

Служебные программы Microsoft Spark (MSSparkUtils) — это встроенный пакет, помогающий легко выполнять распространенные задачи. MSSparkUtils можно использовать для работы с файловыми системами, получения переменных среды, объединения записных книжек и работы с секретами. MSSparkUtils доступны в PySpark (Python) Scala, записных книжках SparkR и конвейерах Microsoft Fabric.

Важно!

Microsoft Fabric в настоящее время находится на этапе предварительной версии. Эти сведения относятся к предварительной версии продукта, который может быть существенно изменен перед выпуском. Корпорация Майкрософт не дает никаких гарантий, явных или подразумеваемых, в отношении предоставленной здесь информации.

Служебные программы файловой системы

Mssparkutils.fs предоставляет служебные программы для работы с различными файловыми системами, включая Azure Data Lake Storage 2-го поколения (ADLS 2-го поколения) и Хранилище BLOB-объектов Azure. Убедитесь, что доступ к Azure Data Lake Storage 2-го поколения и Хранилищу BLOB-объектов Azure настроен правильно.

Чтобы получить общие сведения о доступных методах, выполните следующие команды:

from notebookutils import mssparkutils
mssparkutils.fs.help()

Выходные данные

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

Mssparkutils работает с файловой системой так же, как и API Spark. Пример использования mssparkuitls.fs.mkdirs() и Fabric Lakehouse:

Использование Относительный путь из корневого каталога HDFS Абсолютный путь к файловой системе ABFS Абсолютный путь для локальной файловой системы в узле драйвера
Nondefault lakehouse Не поддерживается mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")
Lakehouse по умолчанию Каталог в разделе "Файлы" или "Таблицы": mssparkutils.fs.mkdirs("Files/<new_dir>") mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")

Перечень файлов

Выведите список содержимого каталога, используйте mssparkutils.fs.ls("Путь к каталогу"), например:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

Просмотр свойств файла.

Возвращает свойства файла, включая имя файла, путь к файлу, размер файла, а также каталог и файл.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Создать новый каталог

Создает указанный каталог, если он не существует, и все необходимые родительские каталоги.

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Копировать файл

Копирует файл или каталог. Поддерживает копирование в различных файловых системах.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Предварительный просмотр содержимого файла

Возвращает первые "maxBytes" байтов заданного файла в виде строки в кодировке UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)

Переместить файл

Перемещает файл или каталог. Поддерживает перемещение в рамках различных файловых систем.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Записать в файл

Записывает заданную строку в файл в кодировке UTF-8. Записывает заданную строку в файл в кодировке UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Добавить содержимое в файл

Добавляет заданную строку в файл в кодировке UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Удалить файл или каталог

Перемещает файл или каталог.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Подключение и отключение каталога

Подробные сведения об использовании см. в разделе Подключение и отключение файлов.

Служебные программы записных книжек

Используйте служебные программы записной книжки MSSparkUtils для запуска записной книжки или выхода из записной книжки со значением. Чтобы получить общие сведения о доступных методах, используйте следующую команду:

mssparkutils.notebook.help()

Выходные данные:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Ссылка на записную книжку

Ссылается записную книжку и возвращает значение выхода. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере. Записная книжка, на которую ссылается ссылка, выполняется в пуле Spark, из которого записная книжка вызывает эту функцию.

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)

Пример:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Вы можете открыть ссылку snapshot ссылочного выполнения в выходных данных ячейки, snapshot фиксирует результаты выполнения кода и позволяет легко отлаживать выполнение ссылки.

Снимок экрана: ссылочный результат выполнения.

Снимок экрана: пример snapshot.

Примечание

В настоящее время записная книжка Fabric поддерживает ссылки только на записные книжки в рабочей области.

Выход из записной книжки

Выход из записной книжки со значением. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере.

  • При интерактивном вызове функции exit() из записной книжки записная книжка Fabric создает исключение, пропускает выполняемые ячейки подчиненной последовательности и сохраняет сеанс Spark в активном режиме.
  • При оркестрации записной книжки в конвейере, который вызывает функцию exit(), действие Notebook возвращается со значением выхода, завершает выполнение конвейера и останавливает сеанс Spark.
  • При вызове функции exit() в записной книжке, на которую ссылается ссылка, Fabric Spark остановит дальнейшее выполнение указанной записной книжки и продолжит выполнять следующие ячейки в записной книжке main, которая вызывает функцию run(). Например: Notebook1 содержит три ячейки и вызывает функцию exit() во второй ячейке. Notebook2 содержит пять ячеек и вызывает run(notebook1) в третьей ячейке. При запуске Notebook2 Записная книжка Notebook1 останавливается на второй ячейке при нажатии функции exit(). Notebook2 продолжает выполнять свою четвертую и пятую ячейки.
mssparkutils.notebook.exit("value string")

Пример:

Записная книжка Sample1 со следующими двумя ячейками:

  • Ячейка 1 определяет входной параметр со значением по умолчанию 10.

  • Ячейка 2 выходит из записной книжки с входными данными в качестве значения выхода.

Снимок экрана: пример записной книжки функции выхода.

Вы можете запустить Sample1 в другой записной книжке со значениями по умолчанию:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

Выходные данные:

Notebook executed successfully with exit value 10

Вы можете запустить Sample1 в другой записной книжке и указать входное значение 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Выходные данные:

Notebook executed successfully with exit value 20

Управление сеансом

Остановка интерактивного сеанса

Вместо того чтобы вручную нажать кнопку остановки, иногда удобнее остановить интерактивный сеанс, вызвав API в коде. В таких случаях мы предоставляем API mssparkutils.session.stop() для поддержки остановки интерактивного сеанса с помощью кода. Он доступен для Scala и Python.

mssparkutils.session.stop()

mssparkutils.session.stop() API асинхронно останавливает текущий интерактивный сеанс в фоновом режиме, останавливает сеанс Spark и освобождает ресурсы, занятые сеансом, чтобы они были доступны другим сеансам в том же пуле.

Примечание

Мы не рекомендуем вызывать встроенные api языка, такие как sys.exit в Scala или sys.exit() в Python в коде, так как такие API просто завершают процесс интерпретатора, оставляя сеанс Spark активным, а ресурсы не освобождены.

Служебные программы для учетных данных

Вы можете использовать служебные программы учетных данных MSSparkUtils для получения маркеров доступа и управления секретами в Azure Key Vault.

Чтобы получить общие сведения о доступных методах, используйте следующую команду:

mssparkutils.credentials.help()

Выходные данные:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key

Получение токена

Возвращает Azure AD маркер для заданной аудитории, имя (необязательно). В списке ниже показаны доступные в настоящее время ключи аудитории:

  • Ресурс аудитории хранилища: "storage"
  • Ресурс Power BI: "pbi"
  • Ресурс azure Key Vault: "хранилище ключей"
  • Ресурс базы данных KQL для Synapse RTA: "kusto"

Выполните следующую команду, чтобы получить маркер:

mssparkutils.credentials.getToken('audience Key')

Получить секрет с помощью учетных данных пользователя

Возвращает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью учетных данных пользователя.

mssparkutils.credentials.getSecret('azure key vault name','secret name')

Подключение и отключение файлов

Microsoft Fabric поддерживает сценарии подключения в пакете служебных программ Microsoft Spark. Api-интерфейсы подключения, отключения, getMountPath() и mounts() можно использовать для подключения удаленного хранилища (Azure Data Lake Storage 2-го поколения) ко всем рабочим узлам (узлам драйвера и рабочим узлам). После того как точка подключения хранилища будет настроена, используйте API локального файла для доступа к данным, как если бы они хранились в локальной файловой системе.

Как подключить учетную запись ADLS 2-го поколения

В этом разделе показано, как подключить Azure Data Lake Storage 2-го поколения пошагово. Подключение Хранилища BLOB-объектов работает аналогично.

В примере предполагается, что у вас есть одна учетная запись Data Lake Storage 2-го поколения с именем storegen2. У учетной записи есть один контейнер с именем mycontainer , который вы хотите подключить к /test в сеансе Spark записной книжки.

Снимок экрана: выбор контейнера для подключения.

Чтобы подключить контейнер с именем mycontainer, mssparkutils сначала необходимо проверка, есть ли у вас разрешение на доступ к контейнеру. В настоящее время Microsoft Fabric поддерживает два метода проверки подлинности для операции подключения триггера: accountKey и sastoken.

Подключение с помощью маркера подписанного URL-адреса или ключа учетной записи

Mssparkutils поддерживает явную передачу ключа учетной записи или маркера подписанного URL-адреса (SAS) в качестве параметра для подключения целевого объекта.

По соображениям безопасности рекомендуется хранить ключи учетной записи или маркеры SAS в Azure Key Vault (как показано в следующем примере на снимке экрана). Затем их можно получить с помощью API mssparkutils.credentials.getSecret . Сведения об использовании azure Key Vault см. в статье Сведения об управляемых ключах учетной записи хранения Azure Key Vault.

Снимок экрана, показывающий, где хранятся секреты в Key Vault Azure.

Ниже приведен пример кода использования метода accountKey:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Для sastoken см. следующий пример кода:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Примечание

Из соображений безопасности не рекомендуется хранить учетные данные в коде. Чтобы дополнительно защитить ваши учетные данные, мы отредактируем ваш секрет в выходных данных записной книжки. Дополнительные сведения проверка Исправление секрета.

Как смонтировать дом на озере

Ниже приведен пример кода подключения lakehouse к /test.

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

Доступ к файлам в точке подключения с помощью API fs mssparktuils

Основная цель операции подключения — предоставить клиентам доступ к данным, хранящимся в удаленной учетной записи хранения, с помощью API локальной файловой системы. Вы также можете получить доступ к данным с помощью API mssparkutils fs с подключенным путем в качестве параметра. Используемый здесь формат пути немного отличается.

Предположим, что вы подключили Data Lake Storage 2-го поколения контейнер mycontainer к /test с помощью API подключения. Когда вы получаете доступ к данным с помощью API локальной файловой системы, формат пути выглядит следующим образом:

/synfs/notebook/{sessionId}/test/{filename}

Если вы хотите получить доступ к данным с помощью API mssparkutils fs, рекомендуется использовать getMountPath(), чтобы получить точный путь:

path = mssparkutils.fs.getMountPath("/test")
  • Список каталогов:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • Чтение содержимого файла:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Создание каталога:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

Доступ к файлам в точке подключения по локальному пути

Вы можете легко читать и записывать файлы в точке подключения, используя стандартный способ файловой системы, используйте в качестве примера Python:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Как проверка существующих точек подключения

Api mssparkutils.fs.mounts() можно использовать для проверка всех существующих сведений о точках подключения:

mssparkutils.fs.mounts()

Отключение точки подключения

Используйте следующий код, чтобы отключить точку подключения (/test в этом примере):

mssparkutils.fs.unmount("/test")

Известные ограничения

  • Текущее подключение является конфигурацией уровня задания. Мы повторно рекомендуем использовать API подключений, чтобы проверка, существует ли точка подключения или недоступна.
  • Механизм отключения не является автоматическим. По завершении выполнения приложения, чтобы отключить точку подключения и освободить место на диске, вам нужно явно вызвать API отключения в своем коде. В противном случае точка подключения по-прежнему будет существовать в узле после завершения работы приложения.
  • Подключение учетной записи хранения ADLS 1-го поколения не поддерживается.

Дальнейшие действия