Bagikan melalui


Fungsi yang ditentukan pengguna di Databricks Connect untuk Python

Nota

Artikel ini membahas Databricks Connect untuk Databricks Runtime 13.3 ke atas.

Databricks Connect for Python mendukung fungsi yang ditentukan pengguna (UDF). Ketika operasi DataFrame yang menyertakan UDF dijalankan, UDF diserialisasikan oleh Databricks Connect dan dikirim ke server sebagai bagian dari permintaan.

Untuk informasi tentang UDF untuk Databricks Connect for Scala, lihat Fungsi yang ditentukan pengguna di Databricks Connect for Scala.

Nota

Karena fungsi yang ditentukan pengguna diserialisasikan dan dideserialisasi, versi Python klien harus cocok dengan versi Python pada komputasi Azure Databricks. Untuk versi yang didukung, lihat matriks dukungan versi .

Menentukan UDF

Untuk membuat UDF di Databricks Connect for Python, gunakan salah satu fungsi yang didukung berikut:

Misalnya, Python berikut menyiapkan UDF sederhana yang mengkuadratkan nilai dalam kolom.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

UDF dengan ketergantungan

Penting

Fitur ini ada di Pratinjau Umum dan memerlukan Databricks Connect untuk Python 16.4 atau lebih tinggi, dan kluster yang menjalankan Databricks Runtime 16.4 atau lebih tinggi. Untuk menggunakan fitur ini, aktifkan pratinjau Enhanced Python UDFs di Unity Catalog di ruang kerja Anda.

Databricks Connect mendukung penentuan dependensi Python yang diperlukan untuk UDF. Dependensi ini diinstal pada komputasi Databricks sebagai bagian dari lingkungan Python UDF.

Fitur ini memungkinkan pengguna untuk menentukan dependensi yang dibutuhkan UDF selain paket yang disediakan di lingkungan dasar. Ini juga dapat digunakan untuk menginstal versi paket yang berbeda dari apa yang disediakan di lingkungan dasar.

Dependensi dapat dipasang dari sumber berikut:

  • Paket PyPI
    • Paket PyPI dapat ditentukan sesuai dengan PEP 508, misalnya, dice, pyjokes<1 atau simplejson==3.19.*.
  • File-file yang disimpan dalam volume Katalog Unity
    • Paket wheel (.whl) dan file tar terkompresi gzip (.tar.gz) didukung. Pengguna harus diberikan izin READ_FILE pada file pada volume re:[UC].
    • Saat menginstal paket dari volume Unity Catalog, untuk memanggil UDF, pengguna memerlukan READ VOLUME izin pada volume sumber. Memberikan izin ini kepada semua pengguna akun secara otomatis mengaktifkan ini untuk pengguna baru.
    • File volume Katalog Unity harus ditentukan sebagai dbfs:<path>, misalnya, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl atau dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

Untuk menyertakan dependensi kustom di UDF Anda, tentukan di lingkungan menggunakan withDependencies, lalu gunakan lingkungan tersebut untuk membuat sesi Spark. Dependensi diinstal pada komputasi Databricks Anda dan akan tersedia di semua UDF yang menggunakan sesi Spark ini.

Kode berikut menyatakan paket dice PyPI sebagai dependensi:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Atau, untuk menentukan dependensi roda dalam volume:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Perilaku di buku catatan dan pekerjaan Databricks

Dalam buku catatan dan pekerjaan, dependensi UDF perlu diinstal langsung di REPL. Databricks Connect memvalidasi lingkungan REPL Python dengan memverifikasi bahwa semua dependensi yang ditentukan sudah diinstal dan melemparkan pengecualian jika ada yang tidak diinstal.

Proses validasi lingkungan kerja notebook dilakukan untuk dependensi volume PyPI dan Unity Catalog. Dependensi volume perlu dikemas mengikuti spesifikasi pengemasan Python standar dari PEP-427 atau yang lebih baru untuk file roda, dan PEP-241 atau yang lebih baru untuk file distribusi sumber. Untuk informasi selengkapnya tentang standar pengemasan Python, lihat dokumentasi PyPA.

Keterbatasan

  • File seperti roda Python atau distribusi sumber pada komputer pengembangan lokal Anda tidak dapat ditentukan langsung sebagai dependensi. Mereka harus terlebih dahulu diunggah ke volume Katalog Unity.
  • Dukungan dependensi UDF untuk pyspark.sql.streaming.DataStreamWriter.foreach dan pyspark.sql.streaming.DataStreamWriter.foreachBatch memerlukan Databricks Connect untuk Python 18.0 atau lebih tinggi, dan kluster yang menjalankan Databricks Runtime 18.0 atau lebih tinggi.
  • Dependensi UDF tidak didukung untuk UDF agregasi Pandas pada fungsi jendela.

Contoh

Contoh berikut mendefinisikan dependensi PyPI dan volume di lingkungan, membuat sesi dengan lingkungan tersebut, lalu menentukan dan memanggil UDF yang menggunakan dependensi tersebut:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Lingkungan dasar Python

UDF dijalankan pada komputasi Databricks dan bukan pada klien. Lingkungan Python dasar tempat UDF dijalankan tergantung pada komputasi Databricks.

Untuk kluster, lingkungan Python dasar adalah lingkungan Python dari versi Databricks Runtime yang berjalan pada kluster. Versi Python dan daftar paket di lingkungan dasar ini ditemukan di bawah lingkungan Sistem dan bagian pustaka Python terinstal dari catatan rilis Databricks Runtime.

Untuk komputasi tanpa server, lingkungan Python dasar sesuai dengan versi lingkungan tanpa server sesuai dengan tabel berikut.

Versi Databricks Connect Lingkungan tanpa server UDF
17.0 hingga 17.3, Python 3.12 Versi 4
16.4.1 hingga di bawah 17, Python 3.12 Versi 3
15.4.10 hingga di bawah 16, Python 3.12 Versi 3
15.4.10 ke bawah 16, Python 3.11 Versi 2
15.4.0 hingga 15.4.9 dan 16.0 hingga 16.3 Komputasi tanpa server terbaru. Harap migrasikan ke 15.4.10 LTS ke atas atau 16.4.1 LTS ke atas untuk lingkungan Python yang stabil.