Поделиться через


Определяемые пользователем функции в Databricks Connect для Python

Заметка

В этой статье рассматривается Databricks Connect для Databricks Runtime 13.3 и более поздних версий.

Databricks Connect для Python поддерживает определяемые пользователем функции (UDF). При выполнении операции DataFrame, включающей определяемые пользователем функции (UDF), они сериализуются Databricks Connect и отправляются на сервер в рамках запроса.

Сведения об определяемых пользователем функциях в Databricks Connect для Scala см. раздел "Определяемые пользователем функции" в Databricks Connect для Scala.

Заметка

Так как определяемая пользователем функция сериализуется и десериализирована, версия Python клиента должна соответствовать версии Python в вычислительной среде Azure Databricks. Сведения о поддерживаемых версиях см. в матрице поддержки версий.

Определение UDF

Чтобы создать UDF в Databricks Connect для Python, используйте одну из следующих поддерживаемых функций:

Например, следующий код на Python создаёт простую пользовательскую функцию (UDF), которая возводит значения в столбце в квадрат.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Управление зависимостями UDF

Это важно

Эта функция доступна в общедоступной предварительной версии и требуется Databricks Connect для Python 16.4 или более поздней версии, а кластер под управлением Databricks Runtime 16.4 или более поздней версии. Чтобы использовать эту функцию, включите предварительный просмотр расширенных пользовательских функций Python в каталоге Unity в рабочей области.

Databricks Connect поддерживает указание зависимостей Python, необходимых для определяемых пользователем функций. Эти зависимости устанавливаются на вычислительных ресурсах Databricks в рамках среды Python UDF.

Эта функция позволяет пользователям указывать зависимости, необходимые UDF в дополнение к пакетам, предоставленным в базовой среде. Его также можно использовать для установки другой версии пакета, отличной от того, что предоставляется в базовой среде.

Зависимости можно установить из следующих источников:

  • Пакеты PyPI
    • Пакеты PyPI можно указать в соответствии с PEP 508, например, dice, pyjokes<1 или simplejson==3.19.*.
  • Пакеты, хранящиеся в томах каталога Unity
    • Поддерживаются как собранные дистрибутивы (.whl), так и исходные дистрибутивы (.tar.gz).
    • Пакеты томов каталога Unity можно указать как dbfs:<path>, например, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl или dbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz.
    • Пользователю необходимо предоставить READ_FILE разрешение на файл в разделе re:[UC]. Предоставление этого разрешения всем пользователям учетной записи автоматически активирует его для новых пользователей.
  • Локальные пакеты, папки и файлы Python
    • Локальные встроенные дистрибутивы (), исходные дистрибутивы (.whl), папки и файлы Python можно указать как .tar.gz, напримерlocal:<path>, , local:/path/to/my_private_dep-3.20.2-py3-none-any.whl, local:/path/to/my_private_dep-4.0.0.tar.gzlocal:/path/to/my_folder.local:/path/to/my_file.py
    • Поддерживаются абсолютные и относительные пути, например: local:/path/to/my_file.py или local:./path/to/my_file.py.

Чтобы включить пользовательские зависимости в UDF, укажите их в среде с использованием withDependencies, а затем используйте эту среду для создания Spark-сеанса. Зависимости устанавливаются на вычислительных кластерах Databricks и будут доступны во всех определяемых пользователем функциях, использующих этот сеанс Spark.

Следующий код объявляет пакет dice PyPI как зависимость:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Или, чтобы указать зависимость колеса в томе:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Поведение в тетрадях и заданиях Databricks

В записных книжках и заданиях необходимо установить зависимости UDF непосредственно в REPL. Databricks Connect проверяет среду REPL Python, убедившись, что все указанные зависимости уже установлены и вызывают исключение, если они не установлены. Проверка окружения записной книжки выполняется для зависимостей томов PyPI и Unity Catalog, но не для локальных зависимостей.

Ограничения

  • Для поддержки зависимостей UDF в pyspark.sql.streaming.DataStreamWriter.foreach требуется Databricks Connect для Python версии не ниже 18.0, а кластер, использующий Databricks Runtime версии не ниже 18.0.
  • Для поддержки зависимостей UDF для pyspark.sql.streaming.DataStreamWriter.foreachBatch необходимо использование Databricks Connect версии 18.0 или выше для Python, а также кластер, работающий на Databricks Runtime версии 18.0 или выше. Эта функция не поддерживается на бессерверном сервере.
  • Для поддержки зависимостей UDF для локальных пакетов, папок и файлов Python требуется Databricks Connect для Python 18.1 или более поздней версии, а кластер под управлением Databricks Runtime 18.1 или более поздней версии.
  • Зависимости UDF не поддерживаются для пользовательских функций агрегирования pandas через функции окна.
  • Пакеты томов каталога Unity и локальные пакеты необходимо упаковывать в соответствии со стандартными спецификациями упаковки Python из PEP-427 или более поздней версии для встроенных дистрибутивов колес и PEP-241 или более поздней версии для дистрибутивов источников tar. Дополнительные сведения о стандартах упаковки Python см. в документации по PyPA.

Примеры

Следующий пример определяет зависимости PyPI и зависимости томов в среде, создает сеанс в этой среде, а затем определяет и вызывает функции, определяемые пользователем, которые используют эти зависимости.

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]

local_deps = [
    # Example library from: https://pypi.org/project/simplejson/#files
    "local:./test/simplejson-3.20.2-py3-none-any.whl",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Автоматическое управление зависимостями UDF

Это важно

Эта функция доступна в общедоступной предварительной версии и требуется Databricks Connect для Python 18.1 или более поздней версии, Python 3.12 на локальном компьютере и кластер под управлением Databricks Runtime 18.1 или более поздней версии. Чтобы использовать эту функцию, включите предварительный просмотр расширенных пользовательских функций Python в каталоге Unity в рабочей области.

API Databricks Connect withAutoDependencies() включает автоматическое обнаружение и отправку локальных модулей и общедоступных зависимостей PyPI, используемых в выражениях import в ваших UDF. Он удаляет необходимость вручную указывать зависимости.

Следующий код включает автоматическое управление зависимостями:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Метод withAutoDependencies() принимает следующие параметры:

  • upload_local: если задано значение True, локальные модули, импортированные в ваши пользовательские определяемые функции (UDF), автоматически обнаруживаются, упаковываются и передаются в песочницу UDF.
  • use_index: если задано значение True, общедоступные зависимости PyPI, используемые в ваших UDF, автоматически обнаруживаются и устанавливаются в вычислительных ресурсах Azure Databricks. Процесс обнаружения использует установленные пакеты на локальном компьютере для определения версий, обеспечения согласованности между локальной средой и удаленной средой выполнения.

Ограничения

  • Динамические импорты (например, importlib.import_module("foo")не поддерживаются).
  • Пакеты пространства имен (например, azure.eventhub и google.cloud.aiplatform) не поддерживаются.
  • Зависимости, установленные через прямые URL-ссылки, не поддерживаются. К ним относятся установленные из локальных файлов wheel.
  • Зависимости, установленные из индексов частных пакетов, не поддерживаются. Пакеты, установленные таким образом, не могут отличаться от пакетов, установленных от общедоступного PyPI.
  • Обнаружение зависимостей не работает в оболочке Python. Поддерживаются только скрипты Python, оболочка IPython и Jupyter Notebook.

Примеры

В следующем примере показано автоматическое управление зависимостями как с локальными модулями, так и с пакетами PyPI. В этом примере требуется, чтобы вы установили simplejson и dice (используете pip install simplejson dice).

Сначала создайте локальные вспомогательные модули:

# my_helper.py
def double(x):
    return 2 * x
# my_json.py
import simplejson

def loads(x):
    return simplejson.loads(x)

def dumps(x):
    return simplejson.dumps(x)

Затем в основном скрипте импортируйте эти модули и используйте их в определяемых пользователем файлах:

# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType

import my_json
from my_helper import double

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    return my_json.loads(my_json.dumps(double(x)))

@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
    return x + y + (dc.roll("d6")[0] / 6)

df = spark.range(1, 10)
df = df.withColumns({
    "doubled": double_and_json_parse(col("id")),
    "summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()

Logging

Чтобы вывести обнаруженные зависимости, задайте переменной среды значение SPARK_CONNECT_LOG_LEVEL, info или debug. Кроме того, настройте платформу ведения журнала Python:

import logging
logging.basicConfig(level=logging.INFO)

Соответствующие журналы создаются databricks.connect.auto_dependencies модулем, например:

DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0

Базовая среда Python

Функции пользователя выполняются на вычислительных мощностях Databricks, а не на клиенте. Базовая среда Python, в которой выполняются UDF, зависит от вычислительных ресурсов Databricks.

Для кластеров базовая среда Python — это среда Python версии среды выполнения Databricks, запущенная в кластере. Версия Python и список пакетов в этой базовой среде находятся в разделе "Системная среда " и " Установленные библиотеки Python " заметок о выпуске Databricks Runtime.

Для бессерверных вычислений базовая среда Python соответствует версии бессерверной среды в соответствии со следующей таблицей. Версии Databricks Connect, не перечисленные в этой таблице, пока не поддерживают бессерверные или достигли конца поддержки. См. матрицу поддержки версий и конечные версии Databricks Connect.

Версия Databricks Connect Бессерверная среда UDF
18.0, Python 3.12 Версия 5
17.2–17.3, Python 3.12 Версия 4
16.4.1 до ниже 17, Python 3.12 Версия 3
От 15.4.10 до ниже 16, Python 3.12 Версия 3
15.4.10 до версии ниже 16, Python 3.11 Версия 2