وظائف الجدول المعرفة من قبل المستخدم Python (UDTFs)
هام
هذه الميزة موجودة في المعاينة العامة في Databricks Runtime 14.3 LTS وما فوق.
تسمح لك دالة الجدول المعرفة من قبل المستخدم (UDTF) بتسجيل الوظائف التي ترجع الجداول بدلا من القيم العددية. على عكس الدالات العددية التي ترجع قيمة نتيجة واحدة من كل استدعاء، يتم استدعاء كل UDTF في عبارة SQL FROM
وإرجاع جدول بأكمله كإخراج.
يمكن لكل استدعاء UDTF قبول صفر أو أكثر من الوسيطات. يمكن أن تكون هذه الوسيطات تعبيرات عددية أو وسيطات جدول تمثل جداول الإدخال بأكملها.
ينفذ Apache Spark Python UDTFs كفئات Python مع أسلوب إلزامي eval
يستخدم yield
لإرسال صفوف الإخراج.
لاستخدام الفئة الخاصة بك ك UDTF، يجب استيراد الدالة PySpark udtf
. توصي Databricks باستخدام هذه الدالة كمزخرف وتحديد أسماء الحقول وأنواعها بشكل صريح باستخدام returnType
الخيار (ما لم تحدد الفئة أسلوبا analyze
كما هو موضح في قسم لاحق).
ينشئ UDTF التالي جدولا باستخدام قائمة ثابتة من وسيطتين صحيحتين:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
يتم تسجيل UDTFs محليا SparkSession
ويتم عزلها على مستوى دفتر الملاحظات أو الوظيفة.
لا يمكنك تسجيل UDTFs ككائنات في كتالوج Unity، ولا يمكن استخدام UDTFs مع مستودعات SQL.
يمكنك تسجيل UDTF إلى الحالي SparkSession
للاستخدام في استعلامات SQL مع الدالة spark.udtf.register()
. أدخل اسما لدالة SQL وفئة Python UDTF.
spark.udtf.register("get_sum_diff", GetSumDiff)
بمجرد التسجيل، يمكنك استخدام UDTF في SQL باستخدام الأمر السحري أو spark.sql()
الدالة%sql
:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
إذا تلقى UDTF كمية صغيرة من البيانات كإدخال ولكنه يقوم بإخراج جدول كبير، توصي Databricks باستخدام سهم Apache. يمكنك تمكينه عن طريق تحديد المعلمة useArrow
عند الإعلان عن UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
يمكنك استخدام Python *args
أو **kwargs
بناء الجملة وتنفيذ المنطق للتعامل مع عدد غير محدد من قيم الإدخال.
يرجع المثال التالي نفس النتيجة أثناء التحقق صراحة من طول الإدخال وأنواع الوسيطات:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
فيما يلي نفس المثال، ولكن باستخدام وسيطات الكلمة الأساسية:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
يقوم UDTF بإرجاع صفوف مع مخطط إخراج يتكون من تسلسل مرتب لأسماء الأعمدة وأنواعها. إذا كان يجب أن يظل مخطط UDTF كما هو دائما لجميع الاستعلامات، يمكنك تحديد مخطط ثابت ثابت بعد @udtf
المصمم. يجب أن يكون StructType
إما :
StructType().add("c1", StringType())
أو سلسلة DDL تمثل نوع بنية:
c1: string
يمكن ل UDTFs أيضا حساب مخطط الإخراج برمجيا لكل استدعاء اعتمادا على قيم وسيطات الإدخال. للقيام بذلك، حدد أسلوبا ثابتا يسمى analyze
يقبل صفرا أو أكثر من المعلمات التي تتوافق مع الوسيطات المقدمة لاستدعاء UDTF المحدد.
كل وسيطة من analyze
الأسلوب هي مثيل للفئة AnalyzeArgument
التي تحتوي على الحقول التالية:
AnalyzeArgument حقل الفئة |
الوصف |
---|---|
dataType |
نوع وسيطة الإدخال ك DataType . بالنسبة إلى وسيطات جدول الإدخال، هذا يمثل StructType أعمدة الجدول. |
value |
قيمة وسيطة الإدخال ك Optional[Any] . هذا None لوسائط الجدول أو الوسيطات العددية الحرفية غير الثابتة. |
isTable |
ما إذا كانت وسيطة الإدخال عبارة عن جدول ك BooleanType . |
isConstantExpression |
ما إذا كانت وسيطة الإدخال تعبيرا قابلا للطي الثابت ك BooleanType . |
يقوم analyze
الأسلوب بإرجاع مثيل للفئة AnalyzeResult
، والذي يتضمن مخطط جدول النتائج بالإضافة StructType
إلى بعض الحقول الاختيارية. إذا قبل UDTF وسيطة جدول إدخال، AnalyzeResult
فيمكن أن يتضمن أيضا طريقة مطلوبة لتقسيم صفوف جدول الإدخال وترتيبها عبر العديد من استدعاءات UDTF، كما هو موضح لاحقا.
AnalyzeResult حقل الفئة |
الوصف |
---|---|
schema |
مخطط جدول النتائج ك StructType . |
withSinglePartition |
ما إذا كنت تريد إرسال كافة صفوف الإدخال إلى نفس مثيل فئة UDTF مثل BooleanType . |
partitionBy |
إذا تم تعيينها إلى غير فارغة، يتم استهلاك جميع الصفوف التي تحتوي على كل مجموعة فريدة من قيم تعبيرات التقسيم بواسطة مثيل منفصل من فئة UDTF. |
orderBy |
إذا تم تعيينه إلى غير فارغ، فهذا يحدد ترتيب الصفوف داخل كل قسم. |
select |
إذا تم تعيينه إلى غير فارغ، فهذا تسلسل من التعبيرات التي يحددها UDTF ل Catalyst لتقييمها مقابل الأعمدة في وسيطة input TABLE. يتلقى UDTF سمة إدخال واحدة لكل اسم في القائمة بالترتيب الذي يتم سردها به. |
يقوم هذا analyze
المثال بإرجاع عمود إخراج واحد لكل كلمة في وسيطة سلسلة الإدخال.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
analyze
يمكن أن يعمل الأسلوب كمكان مناسب لإجراء التهيئة ثم إعادة توجيه النتائج إلى استدعاءات الأسلوب المستقبلية eval
لنفس استدعاء UDTF.
للقيام بذلك، قم بإنشاء فئة فرعية من AnalyzeResult
وإرجاع مثيل الفئة الفرعية analyze
من الأسلوب .
ثم أضف وسيطة إضافية إلى __init__
الأسلوب لقبول هذا المثيل.
يقوم هذا analyze
المثال بإرجاع مخطط إخراج ثابت، ولكنه يضيف معلومات مخصصة في بيانات تعريف النتيجة ليتم استهلاكها بواسطة استدعاءات الأسلوب المستقبلية __init__
:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
eval
يتم تشغيل الأسلوب مرة واحدة لكل صف من وسيطة جدول الإدخال (أو مرة واحدة فقط إذا لم يتم توفير وسيطة جدول)، متبوعا ب استدعاء واحد للأسلوب terminate
في النهاية. يقوم أي من الأسلوبين بإخراج صفر أو أكثر من الصفوف التي تتوافق مع مخطط النتيجة عن طريق إنتاج مجموعات أو قوائم أو pyspark.sql.Row
كائنات.
يقوم هذا المثال بإرجاع صف عن طريق توفير مجموعة من ثلاثة عناصر:
def eval(self, x, y, z):
yield (x, y, z)
يمكنك أيضا حذف الأقواس:
def eval(self, x, y, z):
yield x, y, z
أضف فاصلة لاحقة لإرجاع صف بعمود واحد فقط:
def eval(self, x, y, z):
yield x,
يمكنك أيضا إنتاج كائن pyspark.sql.Row
.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
ينتج عن هذا المثال صفوف إخراج من terminate
الأسلوب باستخدام قائمة Python. يمكنك تخزين الحالة داخل الفئة من الخطوات السابقة في تقييم UDTF لهذا الغرض.
def terminate(self):
yield [self.x, self.y, self.z]
يمكنك تمرير الوسيطات العددية إلى UDTF كتعبيرات ثابتة تتضمن قيما حرفية أو دالات تستند إليها. على سبيل المثال:
SELECT * FROM udtf(42, group => upper("finance_department"));
يمكن أن تقبل Python UDTFs جدول إدخال كوسيطة بالإضافة إلى وسيطات الإدخال العددية. يمكن أن يقبل UDTF واحد أيضا وسيطة جدول ووسيطات عددية متعددة.
ثم يمكن لأي استعلام SQL توفير جدول إدخال باستخدام TABLE
الكلمة الأساسية متبوعة بأقواس تحيط بمعرف جدول مناسب، مثل TABLE(t)
. بدلا من ذلك، يمكنك تمرير الاستعلام الفرعي للجدول، مثل TABLE(SELECT a, b, c FROM t)
أو TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
ثم يتم تمثيل وسيطة جدول الإدخال كوسيطة pyspark.sql.Row
eval
للأسلوب، مع استدعاء واحد للأسلوب eval
لكل صف في جدول الإدخال. يمكنك استخدام التعليقات التوضيحية القياسية لحقل عمود PySpark للتفاعل مع الأعمدة في كل صف. يوضح المثال التالي استيراد نوع PySpark Row
بشكل صريح ثم تصفية الجدول الذي تم تمريره في id
الحقل:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
للاستعلام عن الدالة، استخدم TABLE
الكلمة الأساسية SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
عند استدعاء UDTF باستخدام وسيطة جدول، يمكن لأي استعلام SQL تقسيم جدول الإدخال عبر العديد من استدعاءات UDTF استنادا إلى قيم عمود واحد أو أكثر من أعمدة جدول الإدخال.
لتحديد قسم، استخدم العبارة PARTITION BY
في استدعاء الدالة بعد الوسيطة TABLE
.
يضمن هذا أن جميع صفوف الإدخال مع كل تركيبة فريدة من قيم أعمدة التقسيم سيتم استهلاكها بواسطة مثيل واحد بالضبط من فئة UDTF.
لاحظ أنه بالإضافة إلى مراجع الأعمدة البسيطة، تقبل العبارة PARTITION BY
أيضا تعبيرات عشوائية استنادا إلى أعمدة جدول الإدخال. على سبيل المثال، يمكنك تحديد LENGTH
سلسلة، أو استخراج شهر من تاريخ، أو تسلسل قيمتين.
من الممكن أيضا تحديد WITH SINGLE PARTITION
بدلا من PARTITION BY
طلب قسم واحد فقط حيث يجب استهلاك جميع صفوف الإدخال بواسطة مثيل واحد بالضبط من فئة UDTF.
داخل كل قسم، يمكنك اختياريا تحديد ترتيب مطلوب لصفوف الإدخال حيث يستهلكها أسلوب UDTF eval
. للقيام بذلك، قم بتوفير عبارة ORDER BY
بعد عبارة أو WITH SINGLE PARTITION
الموضحة PARTITION BY
أعلاه.
على سبيل المثال، ضع في اعتبارك UDTF التالي:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
يمكنك تحديد خيارات التقسيم عند استدعاء UDTF عبر جدول الإدخال بطرق muliple:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
لاحظ أنه لكل من الطرق المذكورة أعلاه لتقسيم جدول الإدخال عند استدعاء UDTFs في استعلامات SQL، هناك طريقة مقابلة لأسلوب UDTF analyze
لتحديد نفس أسلوب التقسيم تلقائيا بدلا من ذلك.
- بدلا من استدعاء UDTF ك
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
، يمكنك تحديثanalyze
الأسلوب لتعيين الحقلpartitionBy=[PartitioningColumn("a")]
واستدعاء الدالة ببساطة باستخدامSELECT * FROM udtf(TABLE(t))
. - بنفس الرمز المميز، بدلا من تحديد
TABLE(t) WITH SINGLE PARTITION ORDER BY b
في استعلام SQL، يمكنك تعيينanalyze
الحقولwithSinglePartition=true
orderBy=[OrderingColumn("b")]
ثم تمريرTABLE(t)
فقط . - بدلا من تمرير
TABLE(SELECT a FROM t)
استعلام SQL، يمكنك تعيينanalyze
select=[SelectedColumn("a")]
ثم تمريرTABLE(t)
فقط .
في المثال التالي، analyze
إرجاع مخطط إخراج ثابت، وتحديد مجموعة فرعية من الأعمدة من جدول الإدخال، وتحديد أن جدول الإدخال مقسم عبر العديد من استدعاءات UDTF استنادا إلى قيم date
العمود:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])