Bagikan melalui


Fungsi yang ditentukan pengguna di Databricks Connect untuk Python

Nota

Artikel ini membahas Databricks Connect untuk Databricks Runtime 13.3 ke atas.

Databricks Connect for Python mendukung fungsi yang ditentukan pengguna (UDF). Ketika operasi DataFrame yang menyertakan UDF dijalankan, UDF diserialisasikan oleh Databricks Connect dan dikirim ke server sebagai bagian dari permintaan.

Untuk informasi tentang UDF untuk Databricks Connect for Scala, lihat Fungsi yang ditentukan pengguna di Databricks Connect for Scala.

Nota

Karena fungsi yang ditentukan pengguna diserialisasikan dan dideserialisasi, versi Python klien harus cocok dengan versi Python pada komputasi Azure Databricks. Untuk versi yang didukung, lihat matriks dukungan versi .

Menentukan UDF

Untuk membuat UDF di Databricks Connect for Python, gunakan salah satu fungsi yang didukung berikut:

Misalnya, Python berikut menyiapkan UDF sederhana yang mengkuadratkan nilai dalam kolom.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Mengelola dependensi UDF

Penting

Fitur ini ada di Pratinjau Umum dan memerlukan Databricks Connect untuk Python 16.4 atau lebih tinggi, dan kluster yang menjalankan Databricks Runtime 16.4 atau lebih tinggi. Untuk menggunakan fitur ini, aktifkan pratinjau Enhanced Python UDFs di Unity Catalog di ruang kerja Anda.

Databricks Connect mendukung penentuan dependensi Python yang diperlukan untuk UDF. Dependensi ini diinstal pada komputasi Databricks sebagai bagian dari lingkungan Python UDF.

Fitur ini memungkinkan pengguna untuk menentukan dependensi yang dibutuhkan UDF selain paket yang disediakan di lingkungan dasar. Ini juga dapat digunakan untuk menginstal versi paket yang berbeda dari apa yang disediakan di lingkungan dasar.

Dependensi dapat dipasang dari sumber berikut:

  • Paket PyPI
    • Paket PyPI dapat ditentukan sesuai dengan PEP 508, misalnya, dice, pyjokes<1 atau simplejson==3.19.*.
  • Paket yang disimpan dalam volume Katalog Unity
    • Distribusi bawaan (.whl) dan distribusi sumber (.tar.gz) didukung.
    • Paket volume Unity Catalog dapat ditentukan sebagai dbfs:<path>, misalnya, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl atau dbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz.
    • Pengguna harus diberikan izin READ_FILE pada file pada volume re:[UC]. Memberikan izin ini kepada semua pengguna akun secara otomatis mengaktifkan ini untuk pengguna baru.
  • Paket lokal, folder, dan file Python
    • Distribusi bawaan lokal (.whl), distribusi sumber (.tar.gz), folder dan file Python dapat ditentukan sebagai local:<path>, misalnya: local:/path/to/my_private_dep-3.20.2-py3-none-any.whl, , local:/path/to/my_private_dep-4.0.0.tar.gz, local:/path/to/my_folderlocal:/path/to/my_file.py.
    • Jalur absolut dan relatif didukung, misalnya: local:/path/to/my_file.py atau local:./path/to/my_file.py.

Untuk menyertakan dependensi kustom di UDF Anda, tentukan di lingkungan menggunakan withDependencies, lalu gunakan lingkungan tersebut untuk membuat sesi Spark. Dependensi diinstal pada komputasi Databricks Anda dan akan tersedia di semua UDF yang menggunakan sesi Spark ini.

Kode berikut menyatakan paket dice PyPI sebagai dependensi:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Atau, untuk menentukan dependensi roda dalam volume:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Perilaku di buku catatan dan pekerjaan Databricks

Dalam buku catatan dan pekerjaan, dependensi UDF perlu diinstal langsung di REPL. Databricks Connect memvalidasi lingkungan REPL Python dengan memverifikasi bahwa semua dependensi yang ditentukan sudah diinstal dan melemparkan pengecualian jika ada yang tidak diinstal. Lingkungan notebook menjalankan validasi untuk dependensi volume PyPI dan Unity Catalog, tetapi tidak untuk dependensi lokal.

Keterbatasan

  • Dukungan dependensi UDF untuk pyspark.sql.streaming.DataStreamWriter.foreach memerlukan Databricks Connect untuk Python 18.0 atau lebih tinggi, dan kluster yang menjalankan Databricks Runtime 18.0 atau lebih tinggi.
  • Dukungan dependensi UDF untuk pyspark.sql.streaming.DataStreamWriter.foreachBatch memerlukan Databricks Connect untuk Python 18.0 atau lebih tinggi, dan kluster yang menjalankan Databricks Runtime 18.0 atau lebih tinggi. Fitur ini tidak didukung pada tanpa server.
  • Dukungan dependensi UDF untuk paket lokal, folder, dan file Python memerlukan Databricks Connect untuk Python 18.1 atau lebih tinggi, dan kluster yang menjalankan Databricks Runtime 18.1 atau lebih tinggi.
  • Dependensi UDF tidak didukung untuk UDF agregasi Pandas pada fungsi jendela.
  • Paket volume Unity Catalog dan paket lokal harus dikemas mengikuti spesifikasi kemasan Python standar dari PEP-427 atau yang lebih baru untuk distribusi berbentuk wheel dan PEP-241 atau yang lebih baru untuk distribusi sumber berbentuk tar. Untuk informasi selengkapnya tentang standar pengemasan Python, lihat dokumentasi PyPA.

Contoh

Contoh berikut mendefinisikan dependensi PyPI dan volume di lingkungan, membuat sesi dengan lingkungan tersebut, lalu menentukan dan memanggil UDF yang menggunakan dependensi tersebut:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]

local_deps = [
    # Example library from: https://pypi.org/project/simplejson/#files
    "local:./test/simplejson-3.20.2-py3-none-any.whl",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Manajemen otomatis dependensi UDF

Penting

Fitur ini ada di Pratinjau Umum dan memerlukan Databricks Connect untuk Python 18.1 atau lebih tinggi, Python 3.12 pada komputer lokal Anda, dan kluster yang menjalankan Databricks Runtime 18.1 atau lebih tinggi. Untuk menggunakan fitur ini, aktifkan pratinjau Enhanced Python UDFs di Unity Catalog di ruang kerja Anda.

API Databricks Connect withAutoDependencies() memungkinkan penemuan dan pengunggahan otomatis modul lokal dan dependensi PyPI publik yang digunakan dalam pernyataan impor di UDF Anda. Ini menghapus kebutuhan untuk menentukan dependensi secara manual.

Kode berikut memungkinkan manajemen dependensi otomatis:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Metode withAutoDependencies() ini menerima parameter berikut:

  • upload_local: Saat diatur ke True, modul lokal yang diimpor di UDF Anda secara otomatis ditemukan, dibungkus, dan diunggah ke sandbox UDF.
  • use_index: Ketika diatur ke True, paket dependensi PyPI publik yang digunakan dalam UDF Anda akan secara otomatis ditemukan dan kemudian diinstal pada sistem komputasi Azure Databricks. Proses penemuan menggunakan paket yang diinstal pada komputer lokal Anda untuk menentukan versi, memastikan konsistensi antara lingkungan lokal Anda dan lingkungan eksekusi jarak jauh.

Keterbatasan

  • Impor dinamis (misalnya, importlib.import_module("foo")) tidak didukung.
  • Paket namespace (misalnya, azure.eventhub dan google.cloud.aiplatform) tidak didukung.
  • Dependensi yang diinstal menggunakan referensi URL langsung tidak didukung. Ini termasuk yang diinstal dari file wheel lokal.
  • Dependensi yang diinstal dari indeks paket privat tidak didukung. Paket yang diinstal dengan cara ini tidak dapat dibedakan dari paket yang diinstal dari PyPI publik.
  • Penemuan dependensi tidak berfungsi di shell Python. Hanya skrip Python, shell IPython, dan Jupyter Notebook yang didukung.

Contoh

Contoh berikut menunjukkan manajemen dependensi otomatis dengan modul lokal dan paket PyPI. Contoh ini mengharuskan Anda telah menginstal simplejson dan dice (menggunakan pip install simplejson dice).

Pertama, buat modul pembantu lokal:

# my_helper.py
def double(x):
    return 2 * x
# my_json.py
import simplejson

def loads(x):
    return simplejson.loads(x)

def dumps(x):
    return simplejson.dumps(x)

Kemudian, dalam skrip utama Anda, impor modul ini dan gunakan dalam UDF:

# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType

import my_json
from my_helper import double

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    return my_json.loads(my_json.dumps(double(x)))

@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
    return x + y + (dc.roll("d6")[0] / 6)

df = spark.range(1, 10)
df = df.withColumns({
    "doubled": double_and_json_parse(col("id")),
    "summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()

Penebangan kayu

Untuk menghasilkan dependensi yang ditemukan, atur SPARK_CONNECT_LOG_LEVEL variabel lingkungan ke info atau debug. Atau, konfigurasikan kerangka kerja pengelogan Python:

import logging
logging.basicConfig(level=logging.INFO)

Log yang relevan dihasilkan oleh modul databricks.connect.auto_dependencies, misalnya:

DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0

Lingkungan dasar Python

UDF dijalankan pada komputasi Databricks dan bukan pada klien. Lingkungan Python dasar tempat UDF dijalankan tergantung pada komputasi Databricks.

Untuk kluster, lingkungan Python dasar adalah lingkungan Python dari versi Databricks Runtime yang berjalan pada kluster. Versi Python dan daftar paket di lingkungan dasar ini ditemukan di bawah lingkungan Sistem dan bagian pustaka Python terinstal dari catatan rilis Databricks Runtime.

Untuk komputasi tanpa server, lingkungan Python dasar sesuai dengan versi lingkungan tanpa server sesuai dengan tabel berikut. Versi Databricks Connect yang tidak tercantum dalam tabel ini belum mendukung tanpa server atau telah mencapai akhir dukungan. Lihat matriks dukungan versi dan versi Databricks Connect yang tidak lagi didukung.

Versi Databricks Connect Lingkungan tanpa server UDF
18.0, Python 3.12 Versi 5
17.2 hingga 17.3, Python 3.12 Versi 4
16.4.1 hingga di bawah 17, Python 3.12 Versi 3
15.4.10 hingga di bawah 16, Python 3.12 Versi 3
15.4.10 ke bawah 16, Python 3.11 Versi 2