ما هي الوظائف المعرفة من قبل المستخدم (UDFs)؟
تسمح لك الوظائف المعرفة من قبل المستخدم (UDFs) بإعادة استخدام التعليمات البرمجية التي توسع الوظائف المضمنة على Azure Databricks ومشاركتها. استخدم UDFs لتنفيذ مهام معينة، مثل العمليات الحسابية المعقدة أو التحويلات أو معالجة البيانات المخصصة.
ملاحظة
على المجموعات ذات وضع الوصول المشترك، يتم دعم Python scalar UDFs في Databricks Runtime 13.3 LTS وما فوق، بينما يتم دعم Scala UDFs في Databricks Runtime 14.2 وما فوق.
يمكن تسجيل Python scalar UDFs في كتالوج Unity باستخدام بناء جملة SQL في Databricks Runtime 13.3 LTS وما فوق. راجع الدالات المعرفة من قبل المستخدم (UDFs) في كتالوج Unity.
استخدم UDFs للمنطق الذي يصعب التعبير به باستخدام وظائف Apache Spark المضمنة. تم تحسين وظائف Apache Spark المضمنة للمعالجة الموزعة وتقدم أداء أفضل بشكل عام على نطاق واسع. لمزيد من المعلومات، راجع الدالات.
توصي Databricks ب UDFs للاستعلامات المخصصة، وتنقية البيانات يدويا، وتحليل البيانات الاستكشافية، والعمليات على مجموعات البيانات الصغيرة إلى المتوسطة الحجم. تتضمن حالات الاستخدام الشائعة ل UDFs تشفير البيانات وفك تشفيرها والتجزئة وتحليل JSON والتحقق من الصحة.
استخدم أساليب Apache Spark للعمليات على مجموعات البيانات الكبيرة جدا وأي أحمال عمل يتم تشغيلها بانتظام أو باستمرار، بما في ذلك مهام ETL وعمليات الدفق.
يتم تسجيل UDFs التي تم إنشاؤها باستخدام SQL في كتالوج Unity ولديها أذونات مقترنة، بينما تستند UDFs التي تم إنشاؤها داخل دفتر ملاحظاتك إلى جلسة العمل ويتم تحديد نطاقها إلى SparkSession الحالي.
يمكنك تحديد UDFs المستندة إلى جلسة العمل والوصول إليها باستخدام أي لغة يدعمها Azure Databricks. يمكن أن تكون UDFs عددية أو غير عددية.
ملاحظة
تتوفر حاليا فقط SQL وPython UDFs العددية المسجلة في كتالوج Unity في DBSQL.
تعمل UDFs العددية على صف واحد وترجع قيمة واحدة لكل صف. يستخدم المثال التالي UDF عدديا لحساب طول كل اسم في name
عمود وإضافة القيمة في عمود name_length
جديد :
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
لتنفيذ ذلك في دفتر ملاحظات Databricks باستخدام PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
لمزيد من المعلومات، راجع الوظائف المعرفة من قبل المستخدم (UDFs) في كتالوج Unity والوظائف العددية المعرفة من قبل المستخدم - Python.
تعمل الدالات التجميعية المعرفة من قبل المستخدم (UDAFs) على صفوف متعددة وترجع نتيجة مجمعة واحدة. في المثال التالي، يتم تعريف UDAF الذي يجمع الدرجات.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
راجع وظائف pandas المعرفة من قبل المستخدم لوظائف Python والدالات التجميعية المعرفة من قبل المستخدم - Scala.
ملاحظة
تتوفر Python UDTFs في Databricks Runtime 14.3 LTS وما فوق.
يمكن أن ترجع دالات الجدول المعرفة من قبل المستخدم Python (UDTFs) صفوف وأعمدة متعددة لكل صف إدخال. في المثال التالي، تتوافق كل قيمة في عمود النتيجة مع قائمة الفئات. يتم تعريف UDTF لتقسيم القائمة المفصولة بفواصل إلى صفوف متعددة. راجع وظائف الجدول المعرفة من قبل المستخدم Python (UDTFs)
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: float, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
- الوظائف المضمنة وUDFs SQL هي الخيار الأكثر فعالية المتوفرة.
- تكون Scala UDFs أسرع بشكل عام لأنها تنفذ داخل جهاز Java الظاهري (JVM) وتتجنب الحمل الزائد لنقل البيانات داخل وخارج JVM.
- تميل Python UDFs وPandas UDFs إلى أن تكون أبطأ من Scala UDFs لأنها تتطلب تسلسل البيانات ونقلها من JVM إلى مترجم Python. Pandas UDFs أسرع حتى 100x من Python UDFs لأنها تستخدم سهم Apache لتقليل تكاليف التسلسل.