Aracılığıyla paylaş


Python için Databricks Connect'te kullanıcı tanımlı işlevler

Not

Bu makale Databricks Runtime 13.3 ve üzeri için Databricks Connect'i kapsar.

Python için Databricks Connect , kullanıcı tanımlı işlevleri (UDF) destekler. UDF'leri içeren bir DataFrame işlemi yürütürse, UDF'ler Databricks Connect tarafından serileştirilir ve isteğin bir parçası olarak sunucuya gönderilir.

Scala için Databricks Connect UDF'leri hakkında bilgi için bkz. Scala için Databricks Connect'te kullanıcı tanımlı işlevler.

Not

Kullanıcı tanımlı işlev hem serileştirildiği hem de seri durumundan çıkarıldığı için, istemcinin Python sürümünün Azure Databricks'teki işlem üzerinde kullanılan Python sürümüyle bire bir aynı olması gerekir. Desteklenen sürümler için bkz. sürüm destek matrisi.

UDF tanımlama

Python için Databricks Connect'te UDF oluşturmak için aşağıdaki desteklenen işlevlerden birini kullanın:

Örneğin, aşağıdaki Python kodu bir sütundaki değerlerin karesini alan basit bir kullanıcı tanımlı fonksiyonu (UDF) oluşturur.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Bağımlılıkları olan UDF'ler

Önemli

Bu özellik Genel Önizleme aşamasındadır ve Python 16.4 veya üzeri için Databricks Connect ve Databricks Runtime 16.4 veya üzerini çalıştıran bir küme gerektirir. Bu özelliği kullanmak için çalışma alanınızdaki Unity Kataloğu'nda Gelişmiş Python UDF'lerinin önizlemesini etkinleştirin.

Databricks Connect, UDF'ler için gereken Python bağımlılıklarını belirtmeyi destekler. Bu bağımlılıklar, UDF'nin Python ortamının bir parçası olarak Databricks hesaplama kaynakları üzerinde kurulur.

Bu özellik, kullanıcıların temel ortamda sağlanan paketlere ek olarak UDF'nin ihtiyaç duyduğu bağımlılıkları belirtmesine olanak tanır. Temel ortamda sağlanandan paketin farklı bir sürümünü yüklemek için de kullanılabilir.

Bağımlılıklar aşağıdaki kaynaklardan yüklenebilir:

  • PyPI paketleri
    • PyPI paketleri PEP 508'e göre belirtilebilir, örneğin, diceveya pyjokes<1simplejson==3.19.*.
  • Unity Kataloğu birimlerinde depolanan dosyalar
    • Hem tekerlek paketleri (.whl) hem de gzipped tar dosyaları (.tar.gz) desteklenir. Kullanıcıya re:[UC] birimindeki dosya üzerinde izin verilmelidir READ_FILE .
    • Unity Kataloğu birimlerinden paketleri yüklerken, UDF'leri çağırmak için kullanıcıların kaynak birim üzerinde izni olmalıdır READ VOLUME . Bu iznin tüm hesap kullanıcılarına verilmesi, bunu yeni kullanıcılar için otomatik olarak etkinleştirir.
    • Unity Kataloğu birim dosyaları dbfs:<path> olarak belirtilmelidir; örneğin, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl veya dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

UDF'nize özel bağımlılıkları eklemek için, withDependencies kullanarak bir ortamda belirtin ve ardından bu ortamı kullanarak bir Spark oturumu oluşturun. Bağımlılıklar Databricks işleminize yüklenir ve bu Spark oturumunu kullanan tüm UDF'lerde kullanılabilir.

Aşağıdaki kod, PyPI paketini dice bağımlılık olarak bildirir:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Veya bir hacimdeki tekerleğin bağımlılığını belirtmek için:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Databricks not defterlerinde ve işlerinde davranış

Not defterlerinde ve işlerde UDF bağımlılıklarının doğrudan REPL'ye yüklenmesi gerekir. Databricks Connect, belirtilen tüm bağımlılıkların zaten yüklü olduğunu doğrulayarak REPL Python ortamını doğrular ve yüklü değilse bir özel durum oluşturur.

Not defteri ortamı doğrulaması hem PyPI hem de Unity Kataloğu birim bağımlılıkları için gerçekleşir. Hacim bağımlılıklarının, wheel dosyaları için PEP-427 veya daha yeni sürümlerden standart Python paketleme belirtimlerine ve kaynak dağıtım dosyaları için PEP-241 veya daha yeni sürümlerine göre paketlenmesi gerekir. Python paketleme standartları hakkında daha fazla bilgi için PyPA belgelerine bakın.

Sınırlamalar

  • Yerel geliştirme makinenizde Python tekerleği veya kaynak dağıtımı gibi dosyalar doğrudan bağımlılık olarak belirtilemez. Önce Unity Katalog birimlerine yüklenmelidir.
  • UDF bağımlılığı ve pyspark.sql.streaming.DataStreamWriter.foreachpyspark.sql.streaming.DataStreamWriter.foreachBatch desteği için, Python 18.0 veya üzeri için Databricks Connect ve Databricks Runtime 18.0 veya üzerini çalıştıran bir küme gereklidir.
  • UDF bağımlılıkları, pencere işlevleri üzerinden pandas aggregation UDF'leri için desteklenmez.

Örnekler

Aşağıdaki örnek PyPI'yi ve bir ortamdaki bağımlılıkları tanımlar, bu ortamla bir oturum oluşturur, ardından bu bağımlılıkları kullanan UDF'leri tanımlar ve çağırır:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Python temel ortamı

UDF'ler, istemcide değil, Databricks hesaplama ortamında yürütülür. UDF'lerin yürütüldüğü temel Python ortamı, Databricks hesaplama ortamına bağlıdır.

Kümeler için temel Python ortamı, kümede çalışan Databricks Runtime sürümünün Python ortamıdır. Python sürümü ve bu temel ortamdaki paketlerin listesi Databricks Runtime sürüm notlarınınSistem ortamı ve Yüklü Python kitaplıkları bölümlerinde bulunur.

Sunucusuz işlem için temel Python ortamı, aşağıdaki tabloya göre sunucusuz ortam sürümüne karşılık gelir.

Databricks Connect sürümü UDF sunucusuz ortam
17.0 - 17.3, Python 3.12 Sürüm 4
16.4.1'den 17'ye kadar, Python 3.12 Sürüm 3
15.4.10'dan 16'nın altına kadar, Python 3.12 Sürüm 3
15.4.10 sürümünden 16 sürümüne kadar, Python 3.11 Sürüm 2
15.4.0 - 15.4.9 ve 16.0 - 16.3 En son sunucusuz işlem. Kararlı bir Python ortamı için lütfen 15.4.10 LTS ve üzeri veya 16.4.1 LTS ve üzeri sürümlerine geçin.