Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Заметка
В этой статье рассматривается Databricks Connect для Databricks Runtime 13.3 и более поздних версий.
Databricks Connect для Python поддерживает определяемые пользователем функции (UDF). При выполнении операции DataFrame, включающей определяемые пользователем функции (UDF), они сериализуются Databricks Connect и отправляются на сервер в рамках запроса.
Сведения об определяемых пользователем функциях в Databricks Connect для Scala см. раздел "Определяемые пользователем функции" в Databricks Connect для Scala.
Заметка
Так как определяемая пользователем функция сериализуется и десериализирована, версия Python клиента должна соответствовать версии Python в вычислительной среде Azure Databricks. Сведения о поддерживаемых версиях см. в матрице поддержки версий.
Определение UDF
Чтобы создать UDF в Databricks Connect для Python, используйте одну из следующих поддерживаемых функций:
- Определяемые пользователем функции PySpark
- Функции потоковой передачи PySpark
Например, следующий код на Python создаёт простую пользовательскую функцию (UDF), которая возводит значения в столбце в квадрат.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Управление зависимостями UDF
Это важно
Эта функция доступна в общедоступной предварительной версии и требуется Databricks Connect для Python 16.4 или более поздней версии, а кластер под управлением Databricks Runtime 16.4 или более поздней версии. Чтобы использовать эту функцию, включите предварительный просмотр расширенных пользовательских функций Python в каталоге Unity в рабочей области.
Databricks Connect поддерживает указание зависимостей Python, необходимых для определяемых пользователем функций. Эти зависимости устанавливаются на вычислительных ресурсах Databricks в рамках среды Python UDF.
Эта функция позволяет пользователям указывать зависимости, необходимые UDF в дополнение к пакетам, предоставленным в базовой среде. Его также можно использовать для установки другой версии пакета, отличной от того, что предоставляется в базовой среде.
Зависимости можно установить из следующих источников:
- Пакеты PyPI
- Пакеты PyPI можно указать в соответствии с PEP 508, например,
dice,pyjokes<1илиsimplejson==3.19.*.
- Пакеты PyPI можно указать в соответствии с PEP 508, например,
- Пакеты, хранящиеся в томах каталога Unity
- Поддерживаются как собранные дистрибутивы (
.whl), так и исходные дистрибутивы (.tar.gz). - Пакеты томов каталога Unity можно указать как
dbfs:<path>, например,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whlилиdbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz. - Пользователю необходимо предоставить
READ_FILEразрешение на файл в разделе re:[UC]. Предоставление этого разрешения всем пользователям учетной записи автоматически активирует его для новых пользователей.
- Поддерживаются как собранные дистрибутивы (
- Локальные пакеты, папки и файлы Python
- Локальные встроенные дистрибутивы (), исходные дистрибутивы (
.whl), папки и файлы Python можно указать как.tar.gz, напримерlocal:<path>, ,local:/path/to/my_private_dep-3.20.2-py3-none-any.whl,local:/path/to/my_private_dep-4.0.0.tar.gzlocal:/path/to/my_folder.local:/path/to/my_file.py - Поддерживаются абсолютные и относительные пути, например:
local:/path/to/my_file.pyилиlocal:./path/to/my_file.py.
- Локальные встроенные дистрибутивы (), исходные дистрибутивы (
Чтобы включить пользовательские зависимости в UDF, укажите их в среде с использованием withDependencies, а затем используйте эту среду для создания Spark-сеанса. Зависимости устанавливаются на вычислительных кластерах Databricks и будут доступны во всех определяемых пользователем функциях, использующих этот сеанс Spark.
Следующий код объявляет пакет dice PyPI как зависимость:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Или, чтобы указать зависимость колеса в томе:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Поведение в тетрадях и заданиях Databricks
В записных книжках и заданиях необходимо установить зависимости UDF непосредственно в REPL. Databricks Connect проверяет среду REPL Python, убедившись, что все указанные зависимости уже установлены и вызывают исключение, если они не установлены. Проверка окружения записной книжки выполняется для зависимостей томов PyPI и Unity Catalog, но не для локальных зависимостей.
Ограничения
- Для поддержки зависимостей UDF в
pyspark.sql.streaming.DataStreamWriter.foreachтребуется Databricks Connect для Python версии не ниже 18.0, а кластер, использующий Databricks Runtime версии не ниже 18.0. - Для поддержки зависимостей UDF для
pyspark.sql.streaming.DataStreamWriter.foreachBatchнеобходимо использование Databricks Connect версии 18.0 или выше для Python, а также кластер, работающий на Databricks Runtime версии 18.0 или выше. Эта функция не поддерживается на бессерверном сервере. - Для поддержки зависимостей UDF для локальных пакетов, папок и файлов Python требуется Databricks Connect для Python 18.1 или более поздней версии, а кластер под управлением Databricks Runtime 18.1 или более поздней версии.
- Зависимости UDF не поддерживаются для пользовательских функций агрегирования pandas через функции окна.
- Пакеты томов каталога Unity и локальные пакеты необходимо упаковывать в соответствии со стандартными спецификациями упаковки Python из PEP-427 или более поздней версии для встроенных дистрибутивов колес и PEP-241 или более поздней версии для дистрибутивов источников tar. Дополнительные сведения о стандартах упаковки Python см. в документации по PyPA.
Примеры
Следующий пример определяет зависимости PyPI и зависимости томов в среде, создает сеанс в этой среде, а затем определяет и вызывает функции, определяемые пользователем, которые используют эти зависимости.
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Автоматическое управление зависимостями UDF
Это важно
Эта функция доступна в общедоступной предварительной версии и требуется Databricks Connect для Python 18.1 или более поздней версии, Python 3.12 на локальном компьютере и кластер под управлением Databricks Runtime 18.1 или более поздней версии. Чтобы использовать эту функцию, включите предварительный просмотр расширенных пользовательских функций Python в каталоге Unity в рабочей области.
API Databricks Connect withAutoDependencies() включает автоматическое обнаружение и отправку локальных модулей и общедоступных зависимостей PyPI, используемых в выражениях import в ваших UDF. Он удаляет необходимость вручную указывать зависимости.
Следующий код включает автоматическое управление зависимостями:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Метод withAutoDependencies() принимает следующие параметры:
-
upload_local: если задано значениеTrue, локальные модули, импортированные в ваши пользовательские определяемые функции (UDF), автоматически обнаруживаются, упаковываются и передаются в песочницу UDF. -
use_index: если задано значениеTrue, общедоступные зависимости PyPI, используемые в ваших UDF, автоматически обнаруживаются и устанавливаются в вычислительных ресурсах Azure Databricks. Процесс обнаружения использует установленные пакеты на локальном компьютере для определения версий, обеспечения согласованности между локальной средой и удаленной средой выполнения.
Ограничения
- Динамические импорты (например,
importlib.import_module("foo")не поддерживаются). - Пакеты пространства имен (например,
azure.eventhubиgoogle.cloud.aiplatform) не поддерживаются. - Зависимости, установленные через прямые URL-ссылки, не поддерживаются. К ним относятся установленные из локальных файлов wheel.
- Зависимости, установленные из индексов частных пакетов, не поддерживаются. Пакеты, установленные таким образом, не могут отличаться от пакетов, установленных от общедоступного PyPI.
- Обнаружение зависимостей не работает в оболочке Python. Поддерживаются только скрипты Python, оболочка IPython и Jupyter Notebook.
Примеры
В следующем примере показано автоматическое управление зависимостями как с локальными модулями, так и с пакетами PyPI. В этом примере требуется, чтобы вы установили simplejson и dice (используете pip install simplejson dice).
Сначала создайте локальные вспомогательные модули:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
Затем в основном скрипте импортируйте эти модули и используйте их в определяемых пользователем файлах:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Logging
Чтобы вывести обнаруженные зависимости, задайте переменной среды значение SPARK_CONNECT_LOG_LEVEL, info или debug. Кроме того, настройте платформу ведения журнала Python:
import logging
logging.basicConfig(level=logging.INFO)
Соответствующие журналы создаются databricks.connect.auto_dependencies модулем, например:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Базовая среда Python
Функции пользователя выполняются на вычислительных мощностях Databricks, а не на клиенте. Базовая среда Python, в которой выполняются UDF, зависит от вычислительных ресурсов Databricks.
Для кластеров базовая среда Python — это среда Python версии среды выполнения Databricks, запущенная в кластере. Версия Python и список пакетов в этой базовой среде находятся в разделе "Системная среда " и " Установленные библиотеки Python " заметок о выпуске Databricks Runtime.
Для бессерверных вычислений базовая среда Python соответствует версии бессерверной среды в соответствии со следующей таблицей. Версии Databricks Connect, не перечисленные в этой таблице, пока не поддерживают бессерверные или достигли конца поддержки. См. матрицу поддержки версий и конечные версии Databricks Connect.
| Версия Databricks Connect | Бессерверная среда UDF |
|---|---|
| 18.0, Python 3.12 | Версия 5 |
| 17.2–17.3, Python 3.12 | Версия 4 |
| 16.4.1 до ниже 17, Python 3.12 | Версия 3 |
| От 15.4.10 до ниже 16, Python 3.12 | Версия 3 |
| 15.4.10 до версии ниже 16, Python 3.11 | Версия 2 |