Поделиться через


Определяемые пользователем функции в Databricks Connect для Python

Заметка

В этой статье рассматривается Databricks Connect для Databricks Runtime 13.3 и более поздних версий.

Databricks Connect для Python поддерживает определяемые пользователем функции (UDF). При выполнении операции DataFrame, включающей определяемые пользователем функции (UDF), они сериализуются Databricks Connect и отправляются на сервер в рамках запроса.

Сведения об определяемых пользователем функциях в Databricks Connect для Scala см. раздел "Определяемые пользователем функции" в Databricks Connect для Scala.

Заметка

Так как определяемая пользователем функция сериализуется и десериализирована, версия Python клиента должна соответствовать версии Python в вычислительной среде Azure Databricks. Сведения о поддерживаемых версиях см. в матрице поддержки версий.

Определение UDF

Чтобы создать UDF в Databricks Connect для Python, используйте одну из следующих поддерживаемых функций:

Например, следующий код на Python создаёт простую пользовательскую функцию (UDF), которая возводит значения в столбце в квадрат.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Определяемые пользователем функции с зависимостями

Это важно

Эта функция доступна в общедоступной предварительной версии и требуется Databricks Connect для Python 16.4 или более поздней версии, а кластер под управлением Databricks Runtime 16.4 или более поздней версии. Чтобы использовать эту функцию, включите предварительный просмотр расширенных пользовательских функций Python в каталоге Unity в рабочей области.

Databricks Connect поддерживает указание зависимостей Python, необходимых для определяемых пользователем функций. Эти зависимости устанавливаются на вычислительных ресурсах Databricks в рамках среды Python UDF.

Эта функция позволяет пользователям указывать зависимости, необходимые UDF в дополнение к пакетам, предоставленным в базовой среде. Его также можно использовать для установки другой версии пакета, отличной от того, что предоставляется в базовой среде.

Зависимости можно установить из следующих источников:

  • Пакеты PyPI
    • Пакеты PyPI можно указать в соответствии с PEP 508, например, dice, pyjokes<1 или simplejson==3.19.*.
  • Файлы, хранящиеся в томах каталога Unity
    • Поддерживаются wheel-пакеты (.whl) и gzipped tar-файлы (.tar.gz). Пользователю необходимо предоставить READ_FILE разрешение на файл в разделе re:[UC].
    • При установке пакетов из томов каталога Unity для вызова определяемых пользователем функций пользователям необходимо READ VOLUME разрешение на исходный том. Предоставление этого разрешения всем пользователям учетной записи автоматически активирует его для новых пользователей.
    • Файлы томов каталога Unity следует указать как dbfs:<path>, например, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl или dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

Чтобы включить пользовательские зависимости в UDF, укажите их в среде с использованием withDependencies, а затем используйте эту среду для создания Spark-сеанса. Зависимости устанавливаются на вычислительных кластерах Databricks и будут доступны во всех определяемых пользователем функциях, использующих этот сеанс Spark.

Следующий код объявляет пакет dice PyPI как зависимость:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Или, чтобы указать зависимость колеса в томе:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Поведение в тетрадях и заданиях Databricks

В записных книжках и заданиях необходимо установить зависимости UDF непосредственно в REPL. Databricks Connect проверяет среду REPL Python, убедившись, что все указанные зависимости уже установлены и вызывают исключение, если они не установлены.

Проверка среды ноутбука выполняется как для зависимостей томов PyPI, так и для зависимостей томов каталога Unity. Зависимости томов необходимо упаковать в соответствии со стандартами упаковки Python из PEP-427 или более поздними для wheel-файлов и PEP-241 или более поздними для исходных файлов распространения. Дополнительные сведения о стандартах упаковки Python см. в документации по PyPA.

Ограничения

  • Файлы, такие как колесо Python или исходное распределение на локальном компьютере разработки, нельзя указать непосредственно в качестве зависимости. Сначала их необходимо загрузить в тома Unity Catalog.
  • Поддержка зависимостей UDF для pyspark.sql.streaming.DataStreamWriter.foreach и pyspark.sql.streaming.DataStreamWriter.foreachBatch требует Databricks Connect для Python версии 18.0 или выше, а также кластер с Databricks Runtime версии 18.0 или выше.
  • Зависимости UDF не поддерживаются для пользовательских функций агрегирования pandas через функции окна.

Примеры

Следующий пример определяет зависимости PyPI и зависимости томов в среде, создает сеанс в этой среде, а затем определяет и вызывает функции, определяемые пользователем, которые используют эти зависимости.

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Базовая среда Python

Функции пользователя выполняются на вычислительных мощностях Databricks, а не на клиенте. Базовая среда Python, в которой выполняются UDF, зависит от вычислительных ресурсов Databricks.

Для кластеров базовая среда Python — это среда Python версии среды выполнения Databricks, запущенная в кластере. Версия Python и список пакетов в этой базовой среде находятся в разделе "Системная среда " и " Установленные библиотеки Python " заметок о выпуске Databricks Runtime.

Для бессерверных вычислений базовая среда Python соответствует версии бессерверной среды в соответствии со следующей таблицей.

Версия Databricks Connect Бессерверная среда UDF
17.0 до 17.3, Python 3.12 Версия 4
16.4.1 до 17, Python 3.12 Версия 3
15.4.10 до ниже 16, Python 3.12 Версия 3
15.4.10 до ниже 16, Python 3.11 Версия 2
15.4.0 до 15.4.9 и 16.0 до 16.3 Последние бессерверные вычисления. Перейдите на 15.4.10 LTS и выше или 16.4.1 LTS и выше для стабильной среды Python.