Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Заметка
В этой статье рассматривается Databricks Connect для Databricks Runtime 13.3 и более поздних версий.
Databricks Connect для Python поддерживает определяемые пользователем функции (UDF). При выполнении операции DataFrame, включающей определяемые пользователем функции (UDF), они сериализуются Databricks Connect и отправляются на сервер в рамках запроса.
Сведения об определяемых пользователем функциях в Databricks Connect для Scala см. раздел "Определяемые пользователем функции" в Databricks Connect для Scala.
Заметка
Так как определяемая пользователем функция сериализуется и десериализирована, версия Python клиента должна соответствовать версии Python в вычислительной среде Azure Databricks. Сведения о поддерживаемых версиях см. в матрице поддержки версий.
Определение UDF
Чтобы создать UDF в Databricks Connect для Python, используйте одну из следующих поддерживаемых функций:
- Определяемые пользователем функции PySpark
- Функции потоковой передачи PySpark
Например, следующий код на Python создаёт простую пользовательскую функцию (UDF), которая возводит значения в столбце в квадрат.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Определяемые пользователем функции с зависимостями
Это важно
Эта функция доступна в общедоступной предварительной версии и требуется Databricks Connect для Python 16.4 или более поздней версии, а кластер под управлением Databricks Runtime 16.4 или более поздней версии. Чтобы использовать эту функцию, включите предварительный просмотр расширенных пользовательских функций Python в каталоге Unity в рабочей области.
Databricks Connect поддерживает указание зависимостей Python, необходимых для определяемых пользователем функций. Эти зависимости устанавливаются на вычислительных ресурсах Databricks в рамках среды Python UDF.
Эта функция позволяет пользователям указывать зависимости, необходимые UDF в дополнение к пакетам, предоставленным в базовой среде. Его также можно использовать для установки другой версии пакета, отличной от того, что предоставляется в базовой среде.
Зависимости можно установить из следующих источников:
- Пакеты PyPI
- Пакеты PyPI можно указать в соответствии с PEP 508, например,
dice,pyjokes<1илиsimplejson==3.19.*.
- Пакеты PyPI можно указать в соответствии с PEP 508, например,
- Файлы, хранящиеся в томах каталога Unity
- Поддерживаются wheel-пакеты (
.whl) и gzipped tar-файлы (.tar.gz). Пользователю необходимо предоставитьREAD_FILEразрешение на файл в разделе re:[UC]. - При установке пакетов из томов каталога Unity для вызова определяемых пользователем функций пользователям необходимо
READ VOLUMEразрешение на исходный том. Предоставление этого разрешения всем пользователям учетной записи автоматически активирует его для новых пользователей. - Файлы томов каталога Unity следует указать как
dbfs:<path>, например,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whlилиdbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.
- Поддерживаются wheel-пакеты (
Чтобы включить пользовательские зависимости в UDF, укажите их в среде с использованием withDependencies, а затем используйте эту среду для создания Spark-сеанса. Зависимости устанавливаются на вычислительных кластерах Databricks и будут доступны во всех определяемых пользователем функциях, использующих этот сеанс Spark.
Следующий код объявляет пакет dice PyPI как зависимость:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Или, чтобы указать зависимость колеса в томе:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Поведение в тетрадях и заданиях Databricks
В записных книжках и заданиях необходимо установить зависимости UDF непосредственно в REPL. Databricks Connect проверяет среду REPL Python, убедившись, что все указанные зависимости уже установлены и вызывают исключение, если они не установлены.
Проверка среды ноутбука выполняется как для зависимостей томов PyPI, так и для зависимостей томов каталога Unity. Зависимости томов необходимо упаковать в соответствии со стандартами упаковки Python из PEP-427 или более поздними для wheel-файлов и PEP-241 или более поздними для исходных файлов распространения. Дополнительные сведения о стандартах упаковки Python см. в документации по PyPA.
Ограничения
- Файлы, такие как колесо Python или исходное распределение на локальном компьютере разработки, нельзя указать непосредственно в качестве зависимости. Сначала их необходимо загрузить в тома Unity Catalog.
- Поддержка зависимостей UDF для
pyspark.sql.streaming.DataStreamWriter.foreachиpyspark.sql.streaming.DataStreamWriter.foreachBatchтребует Databricks Connect для Python версии 18.0 или выше, а также кластер с Databricks Runtime версии 18.0 или выше. - Зависимости UDF не поддерживаются для пользовательских функций агрегирования pandas через функции окна.
Примеры
Следующий пример определяет зависимости PyPI и зависимости томов в среде, создает сеанс в этой среде, а затем определяет и вызывает функции, определяемые пользователем, которые используют эти зависимости.
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Базовая среда Python
Функции пользователя выполняются на вычислительных мощностях Databricks, а не на клиенте. Базовая среда Python, в которой выполняются UDF, зависит от вычислительных ресурсов Databricks.
Для кластеров базовая среда Python — это среда Python версии среды выполнения Databricks, запущенная в кластере. Версия Python и список пакетов в этой базовой среде находятся в разделе "Системная среда " и " Установленные библиотеки Python " заметок о выпуске Databricks Runtime.
Для бессерверных вычислений базовая среда Python соответствует версии бессерверной среды в соответствии со следующей таблицей.
| Версия Databricks Connect | Бессерверная среда UDF |
|---|---|
| 17.0 до 17.3, Python 3.12 | Версия 4 |
| 16.4.1 до 17, Python 3.12 | Версия 3 |
| 15.4.10 до ниже 16, Python 3.12 | Версия 3 |
| 15.4.10 до ниже 16, Python 3.11 | Версия 2 |
| 15.4.0 до 15.4.9 и 16.0 до 16.3 | Последние бессерверные вычисления. Перейдите на 15.4.10 LTS и выше или 16.4.1 LTS и выше для стабильной среды Python. |