Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Not
Bu makale Databricks Runtime 13.3 ve üzeri için Databricks Connect'i kapsar.
Python için Databricks Connect , kullanıcı tanımlı işlevleri (UDF) destekler. UDF'leri içeren bir DataFrame işlemi yürütürse, UDF'ler Databricks Connect tarafından serileştirilir ve isteğin bir parçası olarak sunucuya gönderilir.
Scala için Databricks Connect UDF'leri hakkında bilgi için bkz. Scala için Databricks Connect'te kullanıcı tanımlı işlevler.
Not
Kullanıcı tanımlı işlev hem serileştirildiği hem de seri durumundan çıkarıldığı için, istemcinin Python sürümünün Azure Databricks'teki işlem üzerinde kullanılan Python sürümüyle bire bir aynı olması gerekir. Desteklenen sürümler için bkz. sürüm destek matrisi.
UDF tanımlama
Python için Databricks Connect'te UDF oluşturmak için aşağıdaki desteklenen işlevlerden birini kullanın:
- PySpark kullanıcı tanımlı işlevler
- PySpark akış işlevleri
Örneğin, aşağıdaki Python kodu bir sütundaki değerlerin karesini alan basit bir kullanıcı tanımlı fonksiyonu (UDF) oluşturur.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
UDF bağımlılıklarını yönetme
Önemli
Bu özellik Genel Önizleme aşamasındadır ve Python 16.4 veya üzeri için Databricks Connect ve Databricks Runtime 16.4 veya üzerini çalıştıran bir küme gerektirir. Bu özelliği kullanmak için çalışma alanınızdaki Unity Kataloğu'nda Gelişmiş Python UDF'lerinin önizlemesini etkinleştirin.
Databricks Connect, UDF'ler için gereken Python bağımlılıklarını belirtmeyi destekler. Bu bağımlılıklar, UDF'nin Python ortamının bir parçası olarak Databricks hesaplama kaynakları üzerinde kurulur.
Bu özellik, kullanıcıların temel ortamda sağlanan paketlere ek olarak UDF'nin ihtiyaç duyduğu bağımlılıkları belirtmesine olanak tanır. Temel ortamda sağlanandan paketin farklı bir sürümünü yüklemek için de kullanılabilir.
Bağımlılıklar aşağıdaki kaynaklardan yüklenebilir:
- PyPI paketleri
- PyPI paketleri PEP 508'e göre belirtilebilir, örneğin,
diceveyapyjokes<1simplejson==3.19.*.
- PyPI paketleri PEP 508'e göre belirtilebilir, örneğin,
- Unity Kataloğu birimlerinde depolanan paketler
- Hem yerleşik dağıtımlar (
.whl) hem de kaynak dağıtımları (.tar.gz) desteklenir. - Unity Catalog hacim paketleri, örneğin
dbfs:<path>,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whlolarak veyadbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gzbelirtilebilir. - Kullanıcıya re:[UC] birimindeki dosya üzerinde izin verilmelidir
READ_FILE. Bu iznin tüm hesap kullanıcılarına verilmesi, bunu yeni kullanıcılar için otomatik olarak etkinleştirir.
- Hem yerleşik dağıtımlar (
- Yerel paketler, klasörler ve Python dosyaları
- Yerel yerleşik dağıtımlar (
.whl), kaynak dağıtımları (.tar.gz), klasörler ve Python dosyaları olaraklocal:<path>belirtilebilir, örneğin:local:/path/to/my_private_dep-3.20.2-py3-none-any.whl,local:/path/to/my_private_dep-4.0.0.tar.gz,local:/path/to/my_folder,local:/path/to/my_file.py. - Hem mutlak hem de göreli yollar desteklenir, örneğin:
local:/path/to/my_file.pyveyalocal:./path/to/my_file.py.
- Yerel yerleşik dağıtımlar (
UDF'nize özel bağımlılıkları eklemek için, withDependencies kullanarak bir ortamda belirtin ve ardından bu ortamı kullanarak bir Spark oturumu oluşturun. Bağımlılıklar Databricks işleminize yüklenir ve bu Spark oturumunu kullanan tüm UDF'lerde kullanılabilir.
Aşağıdaki kod, PyPI paketini dice bağımlılık olarak bildirir:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Veya bir hacimdeki tekerleğin bağımlılığını belirtmek için:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Databricks not defterlerinde ve işlerinde davranış
Not defterlerinde ve işlerde UDF bağımlılıklarının doğrudan REPL'ye yüklenmesi gerekir. Databricks Connect, belirtilen tüm bağımlılıkların zaten yüklü olduğunu doğrulayarak REPL Python ortamını doğrular ve yüklü değilse bir özel durum oluşturur. Notebook ortamının doğrulaması, hem PyPI hem de Unity Kataloğu hacim bağımlılıkları için çalışır, ancak yerel bağımlılıklar için çalışmaz.
Sınırlamalar
-
pyspark.sql.streaming.DataStreamWriter.foreachUDF bağımlılıkları desteği için, Python 18.0 veya üzeri ile Databricks Connect ve Databricks Runtime 18.0 veya daha üzerini çalıştıran bir küme gerektirir. - UDF bağımlılıkları desteği için
pyspark.sql.streaming.DataStreamWriter.foreachBatch, Python 18.0 veya üzeri için Databricks Connect ve Databricks Runtime 18.0 veya üzerini çalıştıran bir küme gerektirir. Bu özellik sunucusuz olarak desteklenmez. - Yerel paketler, klasörler ve Python dosyaları için UDF bağımlılıkları desteği için Python 18.1 veya üzeri için Databricks Connect ve Databricks Runtime 18.1 veya üzerini çalıştıran bir küme gerekir.
- UDF bağımlılıkları, pencere işlevleri üzerinden pandas aggregation UDF'leri için desteklenmez.
- Unity Kataloğu birim paketleri ve yerel paketler, tekerlekle oluşturulmuş dağıtımlar için PEP-427 veya sonraki sürümlerden standart Python paketleme belirtimlerine ve tar kaynağı dağıtımları için PEP-241 veya sonraki sürümlere göre paketlenmelidir. Python paketleme standartları hakkında daha fazla bilgi için PyPA belgelerine bakın.
Örnekler
Aşağıdaki örnek PyPI'yi ve bir ortamdaki bağımlılıkları tanımlar, bu ortamla bir oturum oluşturur, ardından bu bağımlılıkları kullanan UDF'leri tanımlar ve çağırır:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
UDF bağımlılıklarının otomatik yönetimi
Önemli
Bu özellik Genel Önizleme aşamasındadır ve Python 18.1 veya üzeri için Databricks Connect, yerel makinenizde Python 3.12 ve Databricks Runtime 18.1 veya üzerini çalıştıran bir küme gerektirir. Bu özelliği kullanmak için çalışma alanınızdaki Unity Kataloğu'nda Gelişmiş Python UDF'lerinin önizlemesini etkinleştirin.
Databricks Connect withAutoDependencies() API'si, UDF'lerinizdeki içeri aktarma deyimlerinde kullanılan yerel modüllerin ve genel PyPI bağımlılıklarının otomatik olarak bulunmasını ve karşıya yüklenmesini sağlar. Bağımlılıkları el ile belirtme gereksinimini ortadan kaldırır.
Aşağıdaki kod otomatik bağımlılık yönetimini etkinleştirir:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
withAutoDependencies() yöntemi aşağıdaki parametreleri kabul eder:
-
upload_local: olarakTrueayarlandığında, UDF'lerinizde içeri aktarılan yerel modüller otomatik olarak bulunur, paketlenir ve UDF korumalı alanına yüklenir. -
use_index: olarakTrueayarlandığında, UDF'lerinizde kullanılan genel PyPI bağımlılıkları otomatik olarak bulunur ve Azure Databricks işlemine yüklenir. Bulma işlemi, yerel ortamınızla uzaktan yürütme ortamı arasında tutarlılık sağlayarak sürümleri belirlemek için yerel makinenizde yüklü paketleri kullanır.
Sınırlamalar
- Dinamik içeri aktarmalar (örneğin,
importlib.import_module("foo")) desteklenmez. - Ad alanı paketleri (örneğin,
azure.eventhubvegoogle.cloud.aiplatform) desteklenmez. - Doğrudan URL başvuruları kullanılarak yüklenen bağımlılıklar desteklenmez. Bu, yerel wheel dosyalarından yüklenenleri içerir.
- Özel paket dizinlerinden yüklenen bağımlılıklar desteklenmez. Bu şekilde yüklenen paketler, genel PyPI'dan yüklenen paketlerden ayırt edilemez.
- Bağımlılık bulma, Python kabuğunda çalışmaz. Yalnızca Python betikleri, IPython kabuğu ve Jupyter Not Defterleri desteklenir.
Örnekler
Aşağıdaki örnekte hem yerel modüller hem de PyPI paketleriyle otomatik bağımlılık yönetimi gösterilmektedir. Bu örnek, pip install simplejson dice kullanarak simplejson ve dice'in yüklenmiş olmasını gerektirir.
İlk olarak yerel yardımcı modüller oluşturun:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
Ardından ana betiğinizde bu modülleri içeri aktarıp UDF'lerde kullanın:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Ağaç kesimi
Bulunan bağımlılıkların çıktısını almak için ortam değişkenini SPARK_CONNECT_LOG_LEVEL veya infoolarak debug ayarlayın. Alternatif olarak Python günlüğe kaydetme çerçevesini yapılandırın:
import logging
logging.basicConfig(level=logging.INFO)
İlgili günlükler databricks.connect.auto_dependencies modülü tarafından yayılır, örneğin:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Python temel ortamı
UDF'ler, istemcide değil, Databricks hesaplama ortamında yürütülür. UDF'lerin yürütüldüğü temel Python ortamı, Databricks hesaplama ortamına bağlıdır.
Kümeler için temel Python ortamı, kümede çalışan Databricks Runtime sürümünün Python ortamıdır. Python sürümü ve bu temel ortamdaki paketlerin listesi Databricks Runtime sürüm notlarınınSistem ortamı ve Yüklü Python kitaplıkları bölümlerinde bulunur.
Sunucusuz işlem için temel Python ortamı, aşağıdaki tabloya göre sunucusuz ortam sürümüne karşılık gelir. Bu tabloda listelenmeyen Databricks Connect sürümleri sunucusuz sürümü desteklemez veya destek sonuna ulaşmıştır. Bkz. sürüm destek matrisi ve destek sonu Databricks Connect sürümleri.
| Databricks Connect sürümü | UDF sunucusuz ortam |
|---|---|
| 18.0, Python 3.12 | Sürüm 5 |
| 17.2 - 17.3, Python 3.12 | Sürüm 4 |
| 16.4.1'den 17'ye kadar, Python 3.12 | Sürüm 3 |
| 15.4.10'dan 16'nın altına kadar, Python 3.12 | Sürüm 3 |
| 15.4.10 sürümünden 16 sürümüne kadar, Python 3.11 | Sürüm 2 |