Aracılığıyla paylaş


Kullanıcı tanımlı işlevler (UDF) nedir?

Kullanıcı tanımlı işlevler (UDF), Azure Databricks'te yerleşik işlevselliği genişleten kodu yeniden kullanmanıza ve paylaşmanıza olanak tanır. Karmaşık hesaplamalar, dönüştürmeler veya özel veri işlemeleri gibi belirli görevleri gerçekleştirmek için UDF'leri kullanın.

UDF ve Apache Spark işlevi ne zaman kullanılır?

Yerleşik Apache Spark işlevleriyle ifade etmek zor olan mantık için UDF'leri kullanın. Yerleşik Apache Spark işlevleri dağıtılmış işleme için iyileştirilmiştir ve büyük ölçekte daha iyi performans sunar. Daha fazla bilgi için bkz. İşlevleri.

Databricks geçici sorgular, el ile veri temizleme, keşif veri analizi ve küçük ve orta ölçekli veri kümelerindeki işlemler için UDF'ler önerir. UDF'ler için yaygın kullanım örnekleri arasında veri şifreleme, şifre çözme, karma oluşturma, JSON ayrıştırma ve doğrulama sayılabilir.

Çok büyük veri kümelerindeki işlemler için Apache Spark yöntemlerini kullanın ve ETL işleri ve akış işlemleri dahil olmak üzere tüm iş yükleri düzenli veya sürekli olarak çalışır.

UDF türlerini anlama

Açıklamayı, örneği ve daha fazla bilgi edinmek için bağlantıyı görmek için aşağıdaki sekmelerden bir UDF türü seçin.

Skaler UDF

Skaler UDF'ler tek bir satırda çalışır ve her satır için tek bir sonuç değeri döndürür. Unity Kataloğu yönetilebilir veya oturum kapsamına alınmış olabilir.

Aşağıdaki örnekte, bir sütundaki her adın uzunluğunu hesaplamak ve değerini yeni namebir name_length sütuna eklemek için skaler UDF kullanılır.

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

Bunu PySpark kullanarak databricks not defterinde uygulamak için:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
  return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

Bkz. Unity Kataloğu'nda kullanıcı tanımlı işlevler (UDF' ler) ve Kullanıcı tanımlı skaler işlevler - Python.

Toplu Skaler UDF'ler

1:1 giriş/çıkış satırı eşliğini korurken verileri toplu işleyin. Bu, büyük ölçekli veri işleme için satır satır işlemleri yükünü azaltır. Toplu iş UDF'leri ayrıca daha verimli çalışmak, kaynakları yeniden kullanmak ve veri öbekleri arasında bağlam gerektiren karmaşık hesaplamaları işlemek için toplu işler arasında durumu korur.

Unity Kataloğu yönetilebilir veya oturum kapsamına alınmış olabilir.

Aşağıdaki Batch Unity Catalog Python UDF, satırlar üzerinde işlem yaparken BMI'yi hesaplar:

+-------------+-------------+
| weight_kg   | height_m    |
+-------------+-------------+
|      90     |     1.8     |
|      77     |     1.6     |
|      50     |     1.5     |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
|  BMI   |
+--------+
|  27.8  |
|  30.1  |
|  22.2  |
+--------+

Bkz. Unity Kataloğu'nda Kullanıcı tanımlı işlevler (UDF'ler) ve Unity Kataloğu'nda Toplu Python Kullanıcı tanımlı işlevler (UDF'ler).

Skaler Olmayan UDF'ler

Skaler olmayan UDF'ler esnek giriş/çıkış oranlarıyla (1:N veya çok:çok) veri kümelerinin/sütunların tamamında çalışır.

Oturum düzeyinde toplu pandas UDF'leri aşağıdaki türlerde olabilir:

  • Seriden Seriye
  • Seri Yineleyiciden Seri Yineleyicisine
  • Birden çok Serinin Yineleyicisinden Seri Yineleyicisine
  • Seriden skalara

Aşağıda, Seriden Seriye pandas UDF örneği verilmiştir.

from pyspark.sql.functions import pandas_udf
import pandas as pd

df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])

@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
  return weight / (height ** 2)

df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()

Bkz. pandas kullanıcı tanımlı işlevler.

UDAF

UDAF'ler birden çok satırda çalışır ve tek bir toplu sonuç döndürür. UDAF'ler yalnızca oturum kapsamına alınır.

Aşağıdaki UDAF örneği, puanları ad uzunluğuna göre toplar.

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
  return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
  .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

Bkz. Python için pandas kullanıcı tanımlı işlevler ve Kullanıcı tanımlı toplama işlevleri - Scala.

UDTF'ler

UDTF, bir veya daha fazla giriş argümanını alır ve her giriş satırı için birden çok satır (ve muhtemelen birden çok sütun) döndürür. Unity Kataloğu yönetilebilir veya oturum kapsamına alınmış olabilir.

Aşağıdaki UDTF, iki tamsayı bağımsız değişkeninin sabit listesini kullanarak bir tablo oluşturur:

CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y
$$;

SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13  | 7    |
+-----+------+

Bunu PySpark kullanarak databricks not defterinde uygulamak için:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Bkz. Unity Kataloğu UDF'leri ve oturum kapsamlı UDF'ler.

Unity Kataloğu yönetilen ve oturum kapsamı belirlenmiş UDF'ler karşılaştırması

Unity Kataloğu Python UDF'leri, Batch Unity Kataloğu Python UDF'leri ve Unity Kataloğu Python UDF'leri, iyileştirilmiş idare, yeniden kullanım ve bulunabilirlik için Unity Kataloğu'nda kalıcıdır. Diğer tüm UDF'ler oturum tabanlıdır; başka bir deyişle bunlar bir not defterinde veya işte tanımlanır ve kapsamı geçerli SparkSession ile belirlenir. Scala veya Python kullanarak oturum kapsamlı UDF'leri tanımlayabilir ve bu UDF'lere erişebilirsiniz.

Unity Kataloğu tarafından yönetilen UDF'ler özet bilgi sayfası

Unity Kataloğu'na tabi UDF'ler, özel işlevlerin farklı bilgi işlem ortamlarında tanımlanmasına, kullanılmasına, güvenli bir şekilde paylaşılmasına ve yönetilmesine olanak tanır. Bkz Unity Kataloğu'nda Kullanıcı tanımlı işlevler (UDF'ler).

UDF türü Desteklenen hesaplama Açıklama
Unity Kataloğu Python UDF
  • Sunucusuz not defterleri ve işler
  • Standart erişim moduyla klasik işlem (Databricks Runtime 13.3 LTS ve üzeri)
  • SQL ambarı (sunucusuz ve profesyonel)
  • Lakeflow Spark Bildirimli İşlem Hatları (klasik ve sunucusuz)
Python'da bir UDF tanımlayın ve bunu idare için Unity Kataloğu'nda kaydedin.
Skaler UDF'ler tek bir satırda çalışır ve her satır için tek bir sonuç değeri döndürür.
Batch Unity Kataloğu Python UDF
  • Sunucusuz not defterleri ve işler
  • Standart erişim moduyla klasik işlem (Databricks Runtime 16.3 ve üzeri)
  • SQL ambarı (sunucusuz ve profesyonel)
Python'da bir UDF tanımlayın ve bunu idare için Unity Kataloğu'nda kaydedin.
Birden çok değer üzerinde toplu işlem gerçekleştirir ve birden çok değer döndürür. Büyük ölçekli veri işleme için satır satır işlemleri yükünü azaltır.
Unity Kataloğu Python UDTF
  • Sunucusuz not defterleri ve işler
  • Standart erişim moduyla klasik işlem (Databricks Runtime 17.1 ve üzeri)
  • SQL ambarı (sunucusuz ve profesyonel)
Python'da bir UDTF tanımlayın ve bunu idare için Unity Kataloğu'nda kaydedin.
UDTF, bir veya daha fazla giriş argümanını alır ve her giriş satırı için birden çok satır (ve muhtemelen birden çok sütun) döndürür.

Kullanıcı tarafından yalıtılmış hesaplama için oturum kapsamlı UDF'ler rehberi

Oturuma özgü UDF'ler bir not defterinde veya işte tanımlanır ve geçerli olan Spark oturumu için kapsamlanır. Scala veya Python kullanarak oturum kapsamlı UDF'leri tanımlayabilir ve bu UDF'lere erişebilirsiniz.

UDF türü Desteklenen hesaplama Açıklama
Python skaler
  • Sunucusuz not defterleri ve işler
  • Standart erişim moduyla klasik işlem (Databricks Runtime 13.3 LTS ve üzeri)
  • Lakeflow Spark Bildirimli İşlem Hatları (klasik ve sunucusuz)
Skaler UDF'ler tek bir satırda çalışır ve her satır için tek bir sonuç değeri döndürür.
Python skaler olmayan
  • Sunucusuz not defterleri ve işler
  • Standart erişim moduyla klasik işlem (Databricks Runtime 14.3 LTS ve üzeri)
  • Lakeflow Spark Bildirimli İşlem Hatları (klasik ve sunucusuz)
Skaler olmayan UDF'ler pandas_udf, mapInPandas, mapInArrow, applyInPandas içerir. Pandas UDF'leri, verileri aktarmak için Apache Arrow'u ve verilerle çalışmak için de pandas'ı kullanır. Pandas UDF'leri, satır satır skaler UDF'lere göre performansı büyük ölçüde artırabilen vektörleştirilmiş işlemleri destekler.
Python UDTF'leri
  • Sunucusuz not defterleri ve işler
  • Standart erişim moduyla klasik işlem (Databricks Runtime 14.3 LTS ve üzeri)
  • Lakeflow Spark Bildirimli İşlem Hatları (klasik ve sunucusuz)
UDTF, bir veya daha fazla giriş argümanını alır ve her giriş satırı için birden çok satır (ve muhtemelen birden çok sütun) döndürür.
Scala skaler UDF'ler
  • Standart erişim moduyla klasik işlem (Databricks Runtime 13.3 LTS ve üzeri)
Skaler UDF'ler tek bir satırda çalışır ve her satır için tek bir sonuç değeri döndürür.
Scala UDAF'leri
  • Ayrılmış erişim moduyla klasik işlem (Databricks Runtime 14.2 LTS ve üzeri)
UDAF'ler birden çok satırda çalışır ve tek bir toplu sonuç döndürür.

Performansla ilgili dikkat edilmesi gerekenler

  • Yerleşik işlevler ve SQL UDF'leri en verimli seçeneklerdir.

  • Scala UDF'leri genellikle Python UDF'lerinden daha hızlıdır.

    • Çözümlenmemiş Scala UDF'leri Java Sanal Makinesi'nde (JVM) çalıştırıldığından, JVM'ye veri taşıma ve JVM'den dışarı taşıma yükünden kaçınır.
    • Yalıtılmış Scala UDF'lerinin JVM'de veri taşıması ve JVM'den dışarı taşıması gerekir, ancak belleği daha verimli işledikleri için python UDF'lerinden daha hızlı olabilirler.
  • Python UDF'leri ve pandas UDF'leri , verileri seri hale getirmeleri ve JVM'den Python yorumlayıcısına taşımaları gerektiğinden Scala UDF'lerinden daha yavaş olma eğilimindedir.

    • Pandas UDF'leri, serileştirme maliyetlerini azaltmak için Apache Arrow kullandıklarından Python UDF'lerinden 100 kat daha hızlıdır.