Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu makalede Python kullanıcı tanımlı işlev (UDF) örnekleri yer alır. UDF'lerin nasıl kaydedileceklerini, UDF'lerin nasıl çağrılacaklarını gösterir ve Spark SQL'de alt ifadelerin değerlendirme sırası hakkında uyarılar sağlar.
Databricks Runtime 14.0 ve üzerinde, skaler değerler yerine tüm ilişkileri döndüren işlevleri kaydetmek için Python kullanıcı tanımlı tablo işlevlerini (UDF) kullanabilirsiniz. bkz. Python kullanıcı tanımlı tablo işlevleri (UDF).
Uyarı
Databricks Runtime 12.2 LTS ve altında Python UDF'leri ve Pandas UDF'leri standart erişim modunu kullanan Unity Kataloğu işlemlerinde desteklenmez. Skaler Python UDF'leri ve Pandas UDF'leri tüm erişim modları için Databricks Runtime 13.3 LTS ve üzerinde desteklenir.
Databricks Runtime 13.3 LTS ve üzerinde SQL söz dizimlerini kullanarak skaler Python UDF'lerini Unity Kataloğu'na kaydedebilirsiniz. Bkz. Unity Kataloğundaki Kullanıcı Tanımlı Fonksiyonlar (UDF).
İşlevi UDF olarak kaydetme
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
İsteğe bağlı olarak UDF'nizin dönüş türünü ayarlayabilirsiniz. Varsayılan dönüş türü şeklindedir StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL'de UDF'yi çağırma
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
DataFrame'lerle UDF kullanma
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Alternatif olarak, anotasyon söz dizimini kullanarak aynı UDF'yi bildirebilirsiniz.
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
UDF ile çeşitlemeler
Değişken için PySpark türü VariantType olarak belirtilir ve değerler VariantVal türündedir. Çeşitlemeler hakkında bilgi için bkz. Değişken verilerini sorgulama.
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
Değerlendirme sırası ve null denetimi
Spark SQL (SQL ve DataFrame ile Veri Kümesi API'si dahil) alt ifadelerin değerlendirilme sırasını garanti etmez. Özellikle, bir işlecin veya işlevin girişleri mutlaka soldan sağa veya başka bir sabit sırada değerlendirilmez. Örneğin, mantıksal AND ve OR ifadelerde soldan sağa "kısa devre" semantiği yoktur.
Bu nedenle, boole ifadelerinin yan etkilerine veya değerlendirme sırasına, ayrıca WHERE ve HAVING tümcelerinin dizilimine güvenmek tehlikelidir; çünkü bu ifadeler ve tümceler, sorgu iyileştirme ve planlama sırasında yeniden düzenlenebilir. Özellikle, bir UDF null denetim için SQL'de kısa devre semantiği kullanıyorsa, UDF'yi çağırmadan önce null denetimin gerçekleşeceğinin garantisi yoktur. Örneğin,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Bu WHERE madde, null değerleri filtreledikten sonra UDF'nin çağrılmasını garanti etmez strlen.
Doğru null denetimi gerçekleştirmek için aşağıdakilerden birini yapmanızı öneririz:
- UDF'nin kendisini null algılayan hale getirin ve UDF'nin içinde null denetimi yapın
- Null denetimi yapmak ve koşullu bir dalda UDF'yi çağırmak için
IFveyaCASE WHENifadelerini kullanın.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Skaler Python UDF'lerinde hizmet kimlik bilgileri
Skaler Python UDF'leri, dış bulut hizmetlerine güvenli bir şekilde erişmek için Unity Kataloğu hizmeti kimlik bilgilerini kullanabilir. Bu, bulut tabanlı belirteçleştirme, şifreleme veya gizli dizi yönetimi gibi işlemleri doğrudan veri dönüşümlerinizle tümleştirmek için kullanışlıdır.
Skaler Python UDF'leri için hizmet kimlik bilgileri yalnızca SQL ambarı ve genel işlemde desteklenir.
Uyarı
Skaler Python UDF'lerindeki hizmet kimlik bilgileri Databricks Runtime 17.1 ve üzerini gerektirir.
Hizmet kimlik bilgileri oluşturmak için bkz. Hizmet kimlik bilgileri oluşturma.
Uyarı
Hizmet kimlik bilgileri için UDF'ye özgü API:
UDF'lerde, hizmet kimlik bilgilerine erişmek için kullanın databricks.service_credentials.getServiceCredentialsProvider() .
Bu, UDF yürütme bağlamlarında bulunmayan not defterlerinde kullanılan işlevden dbutils.credentials.getServiceCredentialsProvider() farklıdır.
Hizmet kimlik bilgilerine erişmek için UDF mantığınızdaki yardımcı programını kullanarak databricks.service_credentials.getServiceCredentialsProvider() bulut SDK'larını uygun kimlik bilgileriyle başlatın. Tüm kod UDF gövdesinde kapsüllenmelidir.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
Hizmet kimlik bilgileri izinleri
UDF'yi oluşturan, Unity Kataloğu hizmeti kimlik bilgileri üzerinde ACCESS iznine sahip olmalıdır.
Ayrılmış kümeler olarak da bilinen No-PE kapsamında çalışan UDF'ler, hizmet kimlik bilgileri üzerinde YÖNET izinleri gerektirir.
Varsayılan kimlik bilgileri
Skaler Python UDF'lerinde kullanıldığında Databricks, işlem ortamı değişkenindeki varsayılan hizmet kimlik bilgilerini otomatik olarak kullanır. Bu davranış, UDF kodunuzda kimlik bilgileri diğer adlarını açıkça yönetmeden dış hizmetlere güvenli bir şekilde başvurmanızı sağlar. Bkz . İşlem kaynağı için varsayılan hizmet kimlik bilgilerini belirtme
Varsayılan kimlik bilgisi desteği yalnızca Standart ve Ayrılmış erişim modu kümelerinde kullanılabilir. DBSQL'de kullanılamaz.
Sağlayıcıyı kullanmak için azure-identity paketini yüklemeniz gerekir. Paketi yüklemek için bkz . Not defteri kapsamlı Python kitaplıkları veya İşlem kapsamlı kitaplıklar.
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
Görev yürütme bağlamını al
Kullanıcının kimliği, küme etiketleri, spark görev kimliği ve daha fazlası gibi bağlam bilgilerini almak için TaskContext PySpark API'sini kullanın. Bkz. UDF'de görev bağlamı alma.
Sınırlamalar
PySpark UDF'leri için aşağıdaki sınırlamalar geçerlidir:
Dosya erişim kısıtlamaları: Databricks Runtime 14.2 ve altında, paylaşılan kümelerdeki PySpark UDF'leri Git klasörlerine, çalışma alanı dosyalarına veya Unity Katalog Birimlerine erişemez.
Yayın değişkenleri: Standart erişim modu kümelerindeki PySpark UDF'leri ve sunucusuz işlem yayın değişkenlerini desteklemez.
Hizmet kimlik bilgileri: Hizmet kimlik bilgileri yalnızca Batch Unity Kataloğu Python UDF'lerinde ve Skaler Python UDF'lerinde kullanılabilir. Standart Unity Kataloğu Python UDF'lerinde desteklenmez.
Hizmet kimlik bilgileri: Hizmet kimlik bilgileri yalnızca sunucusuz ortam sürüm 3 veya üzeri kullanılırken sunucusuz işlemde kullanılabilir. bkz. Sunucusuz ortam sürümleri.
- Sunucusuz üzerinde bellek sınırı: Sunucusuz işlemdeki PySpark UDF'leri, PySpark UDF başına 1 GB bellek sınırına sahiptir. Bu sınırın aşılması, UDF_PYSPARK_USER_CODE_ERROR türünde bir hatayla sonuçlanır . MEMORY_LIMIT_SERVERLESS.
- Standart erişim modunda bellek sınırı: Standart erişim modundaki PySpark UDF'leri, seçilen örnek türünün kullanılabilir belleğine göre bir bellek sınırına sahiptir. Kullanılabilir belleğin aşılması, UDF_PYSPARK_USER_CODE_ERROR türünde bir hatayla sonuçlanır . MEMORY_LIMIT.