次の方法で共有


Databricks Connect for Python のユーザー定義関数

手記

この記事では、Databricks Runtime 13.3 以降の Databricks Connect について説明します。

Databricks Connect for Python では、 ユーザー定義関数 (UDF) がサポートされています。 UDF を含む DataFrame 操作が実行されると、UDF は Databricks Connect によってシリアル化され、要求の一部としてサーバーに送信されます。

Databricks Connect for Scala の UDF の詳細については、「 Databricks Connect for Scala のユーザー定義関数」を参照してください。

手記

ユーザー定義関数はシリアル化および逆シリアル化されるため、クライアントの Python バージョンは Azure Databricks コンピューティングの Python バージョンと一致する必要があります。 サポートされているバージョンについては、 バージョンのサポート マトリックスを参照してください。

UDF を定義する

Databricks Connect for Python で UDF を作成するには、次のサポートされている関数のいずれかを使用します。

たとえば、次の Python は、列の値を 2 乗する単純な UDF を設定します。

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

UDF の依存関係を管理する

Von Bedeutung

この機能は パブリック プレビュー 段階であり、Databricks Connect for Python 16.4 以降と、Databricks Runtime 16.4 以降を実行しているクラスターが必要です。 この機能を使用するには、ワークスペースで Unity Catalog のプレビュー機能「Enhanced Python UDFs」 を有効にします。

Databricks Connect では、UDF に必要な Python 依存関係の指定がサポートされています。 これらの依存関係は、UDF の Python 環境の一部として Databricks コンピューティングにインストールされます。

この機能を使用すると、ユーザーは、基本環境で提供されるパッケージに加えて、UDF に必要な依存関係を指定できます。 また、 基本環境で提供されているものとは異なるバージョンのパッケージをインストールするためにも使用できます。

依存関係は、次のソースからインストールできます。

  • PyPI パッケージ
    • PyPI パッケージは、 PEP 508 に従って指定できます ( dicepyjokes<1simplejson==3.19.*など)。
  • Unity カタログ ボリュームに格納されているパッケージ
    • ビルドされたディストリビューション (.whl) とソースディストリビューション (.tar.gz) の両方がサポートされています。
    • Unity カタログ ボリューム パッケージは、dbfs:<path>dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whlなど、dbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gzとして指定できます。
    • ユーザーには、re:[UC] ボリューム内のファイル READ_FILE アクセス許可が付与されている必要があります。 すべてのアカウント ユーザーにこのアクセス許可を付与すると、新しいユーザーに対して自動的に有効になります。
  • ローカル パッケージ、フォルダー、Python ファイル
    • ローカルビルドディストリビューション(.whl)、ソースディストリビューション(.tar.gz)、フォルダー、Pythonファイルは、 local:<path>として指定できます( local:/path/to/my_private_dep-3.20.2-py3-none-any.whllocal:/path/to/my_private_dep-4.0.0.tar.gzlocal:/path/to/my_folderlocal:/path/to/my_file.pyなど)。
    • 絶対パスと相対パスの両方がサポートされています (例: local:/path/to/my_file.py または local:./path/to/my_file.py)。

UDF にカスタム依存関係を含めるには、 withDependenciesを使用して環境で指定し、その環境を使用して Spark セッションを作成します。 依存関係は Databricks コンピューティングにインストールされ、この Spark セッションを使用するすべての UDF で使用できます。

次のコードでは、PyPI パッケージ dice を依存関係として宣言しています。

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

または、ボリューム内のホイールの依存関係を指定するには、次のようにします。

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Databricks ノートブックおよびジョブでの動作

ノートブックとジョブでは、UDF の依存関係を REPL に直接インストールする必要があります。 Databricks Connect は、指定されたすべての依存関係が既にインストールされていることを確認して REPL Python 環境を検証し、インストールされていない場合は例外をスローします。 ノートブック環境の検証は、PyPI と Unity カタログの両方のボリューム依存関係に対して実行されますが、ローカルの依存関係には実行されません。

制限事項

  • pyspark.sql.streaming.DataStreamWriter.foreachに対する UDF 依存関係のサポートには、Databricks Connect for Python 18.0 以降と、Databricks Runtime 18.0 以降を実行しているクラスターが必要です。
  • pyspark.sql.streaming.DataStreamWriter.foreachBatchに対する UDF 依存関係のサポートには、Databricks Connect for Python 18.0 以降と、Databricks Runtime 18.0 以降を実行しているクラスターが必要です。 この機能は、サーバーレスではサポートされていません。
  • ローカル パッケージ、フォルダー、Python ファイルに対する UDF 依存関係のサポートには、Databricks Connect for Python 18.1 以降と、Databricks Runtime 18.1 以降を実行しているクラスターが必要です。
  • ウィンドウ関数に対する Pandas 集計 UDF では、UDF の依存関係はサポートされていません。
  • Unity カタログ ボリューム パッケージとローカル パッケージは、ホイールビルドディストリビューションの 場合は PEP-427 以降の標準 Python パッケージ仕様に従ってパッケージ化し、tar ソースディストリビューションの場合は PEP-241 以降を パッケージ化する必要があります。 Python パッケージ標準の詳細については、 PyPA のドキュメントを参照してください

例示

次の例では、環境で PyPI とボリュームの依存関係を定義し、その環境とのセッションを作成してから、それらの依存関係を使用する UDF を定義して呼び出します。

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]

local_deps = [
    # Example library from: https://pypi.org/project/simplejson/#files
    "local:./test/simplejson-3.20.2-py3-none-any.whl",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

UDF 依存関係の自動管理

Von Bedeutung

この機能は パブリック プレビュー 段階であり、Databricks Connect for Python 18.1 以降、ローカル コンピューター上の Python 3.12、および Databricks Runtime 18.1 以降を実行しているクラスターが必要です。 この機能を使用するには、ワークスペースで Unity Catalog のプレビュー機能「Enhanced Python UDFs」 を有効にします。

Databricks Connect withAutoDependencies() API を使用すると、UDF の import ステートメントで使用されるローカル モジュールとパブリック PyPI 依存関係の自動検出とアップロードが可能になります。 依存関係を手動で指定する必要がなくなります。

次のコードは、依存関係の自動管理を有効にします。

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

withAutoDependencies() メソッドは、次のパラメーターを受け入れます。

  • upload_local: Trueに設定すると、UDF にインポートされたローカル モジュールが自動的に検出され、パッケージ化され、UDF サンドボックスにアップロードされます。
  • use_index: Trueに設定すると、UDF で使用されるパブリック PyPI 依存関係が自動的に検出され、Azure Databricks コンピューティングにインストールされます。 検出プロセスでは、ローカル コンピューターにインストールされているパッケージを使用してバージョンが決定され、ローカル環境とリモート実行環境の間の一貫性が確保されます。

制限事項

  • 動的インポート ( importlib.import_module("foo")など) はサポートされていません。
  • 名前空間パッケージ ( azure.eventhubgoogle.cloud.aiplatformなど) はサポートされていません。
  • 直接 URL 参照を使用してインストールされた依存関係はサポートされていません。 これには、ローカル ホイール ファイルからインストールされたものが含まれます。
  • プライベート パッケージ インデックスからインストールされた依存関係はサポートされていません。 この方法でインストールされたパッケージは、パブリック PyPI からインストールされたパッケージと区別できません。
  • 依存関係の検出は Python シェルでは機能しません。 Python スクリプト、IPython シェル、Jupyter Notebook のみがサポートされています。

例示

次の例では、ローカル モジュールと PyPI パッケージの両方を使用した自動依存関係管理を示します。 この例では、(simplejson を使用して) dicepip install simplejson diceをインストールしている必要があります。

まず、ローカル ヘルパー モジュールを作成します。

# my_helper.py
def double(x):
    return 2 * x
# my_json.py
import simplejson

def loads(x):
    return simplejson.loads(x)

def dumps(x):
    return simplejson.dumps(x)

次に、メイン スクリプトでこれらのモジュールをインポートし、UDF で使用します。

# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType

import my_json
from my_helper import double

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    return my_json.loads(my_json.dumps(double(x)))

@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
    return x + y + (dc.roll("d6")[0] / 6)

df = spark.range(1, 10)
df = df.withColumns({
    "doubled": double_and_json_parse(col("id")),
    "summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()

ログ記録

検出された依存関係を出力するには、 SPARK_CONNECT_LOG_LEVEL 環境変数を info または debugに設定します。 または、Python ログ フレームワークを構成します。

import logging
logging.basicConfig(level=logging.INFO)

関連するログは、 databricks.connect.auto_dependencies モジュールによって出力されます。次に例を示します。

DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0

Python 基本環境

UDF は、クライアントではなく Databricks コンピューティングで実行されます。 UDF が実行される基本 Python 環境は、Databricks コンピューティングによって異なります。

クラスターの場合、基本 Python 環境は、クラスターで実行されている Databricks Runtime バージョンの Python 環境です。 この基本環境の Python バージョンとパッケージの一覧は、Databricks Runtime リリース ノートシステム環境インストールされている Python ライブラリセクションにあります。

サーバーレス コンピューティングの場合、基本 Python 環境は、次の表に従って サーバーレス環境のバージョン に対応します。 この表に記載されていない Databricks Connect バージョンは、サーバーレスをまだサポートしていないか、サポート終了に達しています。 バージョン サポート マトリックスサポート終了の Databricks Connect バージョンを参照してください

Databricks Connect バージョン UDF サーバーレス環境
18.0、Python 3.12 バージョン 5
17.2 から 17.3、Python 3.12 バージョン 4
16.4.1 から 17 未満、Python 3.12 バージョン 3
15.4.10 から 16 未満、Python 3.12 バージョン 3
15.4.10 から 16 未満、Python 3.11 バージョン 2