Введение в Структуру MSSparkUtils
Служебные программы Microsoft Spark (MSSparkUtils) — это встроенный пакет, помогающий легко выполнять распространенные задачи. MSSparkUtils можно использовать для работы с файловыми системами, получения переменных среды, объединения записных книжек и работы с секретами. MSSparkUtils доступны в PySpark (Python) Scala, записных книжках SparkR и конвейерах Microsoft Fabric.
Важно!
Microsoft Fabric в настоящее время находится на этапе предварительной версии. Эти сведения относятся к предварительной версии продукта, который может быть существенно изменен перед выпуском. Корпорация Майкрософт не дает никаких гарантий, явных или подразумеваемых, в отношении предоставленной здесь информации.
Служебные программы файловой системы
Mssparkutils.fs предоставляет служебные программы для работы с различными файловыми системами, включая Azure Data Lake Storage 2-го поколения (ADLS 2-го поколения) и Хранилище BLOB-объектов Azure. Убедитесь, что доступ к Azure Data Lake Storage 2-го поколения и Хранилищу BLOB-объектов Azure настроен правильно.
Чтобы получить общие сведения о доступных методах, выполните следующие команды:
from notebookutils import mssparkutils
mssparkutils.fs.help()
Выходные данные
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use mssparkutils.fs.help("methodName") for more info about a method.
Mssparkutils работает с файловой системой так же, как и API Spark. Пример использования mssparkuitls.fs.mkdirs() и Fabric Lakehouse:
Использование | Относительный путь из корневого каталога HDFS | Абсолютный путь к файловой системе ABFS | Абсолютный путь для локальной файловой системы в узле драйвера |
---|---|---|---|
Nondefault lakehouse | Не поддерживается | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Lakehouse по умолчанию | Каталог в разделе "Файлы" или "Таблицы": mssparkutils.fs.mkdirs("Files/<new_dir>") | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Перечень файлов
Выведите список содержимого каталога, используйте mssparkutils.fs.ls("Путь к каталогу"), например:
mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
mssparkutils.fs.ls("file:/tmp") # based on local file system of driver node
Просмотр свойств файла.
Возвращает свойства файла, включая имя файла, путь к файлу, размер файла, а также каталог и файл.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
Создать новый каталог
Создает указанный каталог, если он не существует, и все необходимые родительские каталоги.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs. mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
mssparkutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
Копировать файл
Копирует файл или каталог. Поддерживает копирование в различных файловых системах.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Предварительный просмотр содержимого файла
Возвращает первые "maxBytes" байтов заданного файла в виде строки в кодировке UTF-8.
mssparkutils.fs.head('file path', maxBytes to read)
Переместить файл
Перемещает файл или каталог. Поддерживает перемещение в рамках различных файловых систем.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
Записать в файл
Записывает заданную строку в файл в кодировке UTF-8. Записывает заданную строку в файл в кодировке UTF-8.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Добавить содержимое в файл
Добавляет заданную строку в файл в кодировке UTF-8.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Удалить файл или каталог
Перемещает файл или каталог.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Подключение и отключение каталога
Подробные сведения об использовании см. в разделе Подключение и отключение файлов.
Служебные программы записных книжек
Используйте служебные программы записной книжки MSSparkUtils для запуска записной книжки или выхода из записной книжки со значением. Чтобы получить общие сведения о доступных методах, используйте следующую команду:
mssparkutils.notebook.help()
Выходные данные:
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Ссылка на записную книжку
Ссылается записную книжку и возвращает значение выхода. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере. Записная книжка, на которую ссылается ссылка, выполняется в пуле Spark, из которого записная книжка вызывает эту функцию.
mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)
Пример:
mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
Вы можете открыть ссылку snapshot ссылочного выполнения в выходных данных ячейки, snapshot фиксирует результаты выполнения кода и позволяет легко отлаживать выполнение ссылки.
Примечание
В настоящее время записная книжка Fabric поддерживает ссылки только на записные книжки в рабочей области.
Выход из записной книжки
Выход из записной книжки со значением. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере.
- При интерактивном вызове функции exit() из записной книжки записная книжка Fabric создает исключение, пропускает выполняемые ячейки подчиненной последовательности и сохраняет сеанс Spark в активном режиме.
- При оркестрации записной книжки в конвейере, который вызывает функцию exit(), действие Notebook возвращается со значением выхода, завершает выполнение конвейера и останавливает сеанс Spark.
- При вызове функции exit() в записной книжке, на которую ссылается ссылка, Fabric Spark остановит дальнейшее выполнение указанной записной книжки и продолжит выполнять следующие ячейки в записной книжке main, которая вызывает функцию run(). Например: Notebook1 содержит три ячейки и вызывает функцию exit() во второй ячейке. Notebook2 содержит пять ячеек и вызывает run(notebook1) в третьей ячейке. При запуске Notebook2 Записная книжка Notebook1 останавливается на второй ячейке при нажатии функции exit(). Notebook2 продолжает выполнять свою четвертую и пятую ячейки.
mssparkutils.notebook.exit("value string")
Пример:
Записная книжка Sample1 со следующими двумя ячейками:
Ячейка 1 определяет входной параметр со значением по умолчанию 10.
Ячейка 2 выходит из записной книжки с входными данными в качестве значения выхода.
Вы можете запустить Sample1 в другой записной книжке со значениями по умолчанию:
exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)
Выходные данные:
Notebook executed successfully with exit value 10
Вы можете запустить Sample1 в другой записной книжке и указать входное значение 20:
exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Выходные данные:
Notebook executed successfully with exit value 20
Управление сеансом
Остановка интерактивного сеанса
Вместо того чтобы вручную нажать кнопку остановки, иногда удобнее остановить интерактивный сеанс, вызвав API в коде. В таких случаях мы предоставляем API mssparkutils.session.stop() для поддержки остановки интерактивного сеанса с помощью кода. Он доступен для Scala и Python.
mssparkutils.session.stop()
mssparkutils.session.stop() API асинхронно останавливает текущий интерактивный сеанс в фоновом режиме, останавливает сеанс Spark и освобождает ресурсы, занятые сеансом, чтобы они были доступны другим сеансам в том же пуле.
Примечание
Мы не рекомендуем вызывать встроенные api языка, такие как sys.exit в Scala или sys.exit() в Python в коде, так как такие API просто завершают процесс интерпретатора, оставляя сеанс Spark активным, а ресурсы не освобождены.
Служебные программы для учетных данных
Вы можете использовать служебные программы учетных данных MSSparkUtils для получения маркеров доступа и управления секретами в Azure Key Vault.
Чтобы получить общие сведения о доступных методах, используйте следующую команду:
mssparkutils.credentials.help()
Выходные данные:
getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
Получение токена
Возвращает Azure AD маркер для заданной аудитории, имя (необязательно). В списке ниже показаны доступные в настоящее время ключи аудитории:
- Ресурс аудитории хранилища: "storage"
- Ресурс Power BI: "pbi"
- Ресурс azure Key Vault: "хранилище ключей"
- Ресурс базы данных KQL для Synapse RTA: "kusto"
Выполните следующую команду, чтобы получить маркер:
mssparkutils.credentials.getToken('audience Key')
Получить секрет с помощью учетных данных пользователя
Возвращает секрет Azure Key Vault для указанного имени Azure Key Vault, имени секрета и имени связанной службы с помощью учетных данных пользователя.
mssparkutils.credentials.getSecret('azure key vault name','secret name')
Подключение и отключение файлов
Microsoft Fabric поддерживает сценарии подключения в пакете служебных программ Microsoft Spark. Api-интерфейсы подключения, отключения, getMountPath() и mounts() можно использовать для подключения удаленного хранилища (Azure Data Lake Storage 2-го поколения) ко всем рабочим узлам (узлам драйвера и рабочим узлам). После того как точка подключения хранилища будет настроена, используйте API локального файла для доступа к данным, как если бы они хранились в локальной файловой системе.
Как подключить учетную запись ADLS 2-го поколения
В этом разделе показано, как подключить Azure Data Lake Storage 2-го поколения пошагово. Подключение Хранилища BLOB-объектов работает аналогично.
В примере предполагается, что у вас есть одна учетная запись Data Lake Storage 2-го поколения с именем storegen2. У учетной записи есть один контейнер с именем mycontainer , который вы хотите подключить к /test в сеансе Spark записной книжки.
Чтобы подключить контейнер с именем mycontainer, mssparkutils сначала необходимо проверка, есть ли у вас разрешение на доступ к контейнеру. В настоящее время Microsoft Fabric поддерживает два метода проверки подлинности для операции подключения триггера: accountKey и sastoken.
Подключение с помощью маркера подписанного URL-адреса или ключа учетной записи
Mssparkutils поддерживает явную передачу ключа учетной записи или маркера подписанного URL-адреса (SAS) в качестве параметра для подключения целевого объекта.
По соображениям безопасности рекомендуется хранить ключи учетной записи или маркеры SAS в Azure Key Vault (как показано в следующем примере на снимке экрана). Затем их можно получить с помощью API mssparkutils.credentials.getSecret . Сведения об использовании azure Key Vault см. в статье Сведения об управляемых ключах учетной записи хранения Azure Key Vault.
Ниже приведен пример кода использования метода accountKey:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
Для sastoken см. следующий пример кода:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
Примечание
Из соображений безопасности не рекомендуется хранить учетные данные в коде. Чтобы дополнительно защитить ваши учетные данные, мы отредактируем ваш секрет в выходных данных записной книжки. Дополнительные сведения проверка Исправление секрета.
Как смонтировать дом на озере
Ниже приведен пример кода подключения lakehouse к /test.
from notebookutils import mssparkutils
mssparkutils.fs.mount(
"abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>",
"/test"
)
Доступ к файлам в точке подключения с помощью API fs mssparktuils
Основная цель операции подключения — предоставить клиентам доступ к данным, хранящимся в удаленной учетной записи хранения, с помощью API локальной файловой системы. Вы также можете получить доступ к данным с помощью API mssparkutils fs с подключенным путем в качестве параметра. Используемый здесь формат пути немного отличается.
Предположим, что вы подключили Data Lake Storage 2-го поколения контейнер mycontainer к /test с помощью API подключения. Когда вы получаете доступ к данным с помощью API локальной файловой системы, формат пути выглядит следующим образом:
/synfs/notebook/{sessionId}/test/{filename}
Если вы хотите получить доступ к данным с помощью API mssparkutils fs, рекомендуется использовать getMountPath(), чтобы получить точный путь:
path = mssparkutils.fs.getMountPath("/test")
Список каталогов:
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
Чтение содержимого файла:
mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
Создание каталога:
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
Доступ к файлам в точке подключения по локальному пути
Вы можете легко читать и записывать файлы в точке подключения, используя стандартный способ файловой системы, используйте в качестве примера Python:
#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
Как проверка существующих точек подключения
Api mssparkutils.fs.mounts() можно использовать для проверка всех существующих сведений о точках подключения:
mssparkutils.fs.mounts()
Отключение точки подключения
Используйте следующий код, чтобы отключить точку подключения (/test в этом примере):
mssparkutils.fs.unmount("/test")
Известные ограничения
- Текущее подключение является конфигурацией уровня задания. Мы повторно рекомендуем использовать API подключений, чтобы проверка, существует ли точка подключения или недоступна.
- Механизм отключения не является автоматическим. По завершении выполнения приложения, чтобы отключить точку подключения и освободить место на диске, вам нужно явно вызвать API отключения в своем коде. В противном случае точка подключения по-прежнему будет существовать в узле после завершения работы приложения.
- Подключение учетной записи хранения ADLS 1-го поколения не поддерживается.