وظائف الجدول المعرفة من قبل المستخدم Python (UDTFs)

هام

هذه الميزة موجودة في المعاينة العامة في Databricks Runtime 14.3 LTS وما فوق.

تسمح لك دالة الجدول المعرفة من قبل المستخدم (UDTF) بتسجيل الوظائف التي ترجع الجداول بدلا من القيم العددية. على عكس الدالات العددية التي ترجع قيمة نتيجة واحدة من كل استدعاء، يتم استدعاء كل UDTF في عبارة SQL FROM وإرجاع جدول بأكمله كإخراج.

يمكن لكل استدعاء UDTF قبول صفر أو أكثر من الوسيطات. يمكن أن تكون هذه الوسيطات تعبيرات عددية أو وسيطات جدول تمثل جداول الإدخال بأكملها.

بناء جملة UDTF الأساسي

ينفذ Apache Spark Python UDTFs كفئات Python مع أسلوب إلزامي eval يستخدم yield لإرسال صفوف الإخراج.

لاستخدام الفئة الخاصة بك ك UDTF، يجب استيراد الدالة PySpark udtf . توصي Databricks باستخدام هذه الدالة كمزخرف وتحديد أسماء الحقول وأنواعها بشكل صريح باستخدام returnType الخيار (ما لم تحدد الفئة أسلوبا analyze كما هو موضح في قسم لاحق).

ينشئ UDTF التالي جدولا باستخدام قائمة ثابتة من وسيطتين صحيحتين:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

تسجيل UDTF

يتم تسجيل UDTFs محليا SparkSession ويتم عزلها على مستوى دفتر الملاحظات أو الوظيفة.

لا يمكنك تسجيل UDTFs ككائنات في كتالوج Unity، ولا يمكن استخدام UDTFs مع مستودعات SQL.

يمكنك تسجيل UDTF إلى الحالي SparkSession للاستخدام في استعلامات SQL مع الدالة spark.udtf.register(). أدخل اسما لدالة SQL وفئة Python UDTF.

spark.udtf.register("get_sum_diff", GetSumDiff)

استدعاء UDTF مسجل

بمجرد التسجيل، يمكنك استخدام UDTF في SQL باستخدام الأمر السحري أو spark.sql() الدالة%sql:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

استخدام سهم Apache

إذا تلقى UDTF كمية صغيرة من البيانات كإدخال ولكنه يقوم بإخراج جدول كبير، توصي Databricks باستخدام سهم Apache. يمكنك تمكينه عن طريق تحديد المعلمة useArrow عند الإعلان عن UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

قوائم الوسيطات المتغيرة - *args و**kwargs

يمكنك استخدام Python *args أو **kwargs بناء الجملة وتنفيذ المنطق للتعامل مع عدد غير محدد من قيم الإدخال.

يرجع المثال التالي نفس النتيجة أثناء التحقق صراحة من طول الإدخال وأنواع الوسيطات:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

فيما يلي نفس المثال، ولكن باستخدام وسيطات الكلمة الأساسية:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

تعريف مخطط ثابت في وقت التسجيل

يقوم UDTF بإرجاع صفوف مع مخطط إخراج يتكون من تسلسل مرتب لأسماء الأعمدة وأنواعها. إذا كان يجب أن يظل مخطط UDTF كما هو دائما لجميع الاستعلامات، يمكنك تحديد مخطط ثابت ثابت بعد @udtf المصمم. يجب أن يكون StructTypeإما :

StructType().add("c1", StringType())

أو سلسلة DDL تمثل نوع بنية:

c1: string

حساب مخطط ديناميكي في وقت استدعاء الوظيفة

يمكن ل UDTFs أيضا حساب مخطط الإخراج برمجيا لكل استدعاء اعتمادا على قيم وسيطات الإدخال. للقيام بذلك، حدد أسلوبا ثابتا يسمى analyze يقبل صفرا أو أكثر من المعلمات التي تتوافق مع الوسيطات المقدمة لاستدعاء UDTF المحدد.

كل وسيطة من analyze الأسلوب هي مثيل للفئة AnalyzeArgument التي تحتوي على الحقول التالية:

AnalyzeArgument حقل الفئة ‏‏الوصف
dataType نوع وسيطة الإدخال ك DataType. بالنسبة إلى وسيطات جدول الإدخال، هذا يمثل StructType أعمدة الجدول.
value قيمة وسيطة الإدخال ك Optional[Any]. هذا None لوسائط الجدول أو الوسيطات العددية الحرفية غير الثابتة.
isTable ما إذا كانت وسيطة الإدخال عبارة عن جدول ك BooleanType.
isConstantExpression ما إذا كانت وسيطة الإدخال تعبيرا قابلا للطي الثابت ك BooleanType.

يقوم analyze الأسلوب بإرجاع مثيل للفئة AnalyzeResult ، والذي يتضمن مخطط جدول النتائج بالإضافة StructType إلى بعض الحقول الاختيارية. إذا قبل UDTF وسيطة جدول إدخال، AnalyzeResult فيمكن أن يتضمن أيضا طريقة مطلوبة لتقسيم صفوف جدول الإدخال وترتيبها عبر العديد من استدعاءات UDTF، كما هو موضح لاحقا.

AnalyzeResult حقل الفئة ‏‏الوصف
schema مخطط جدول النتائج ك StructType.
withSinglePartition ما إذا كنت تريد إرسال كافة صفوف الإدخال إلى نفس مثيل فئة UDTF مثل BooleanType.
partitionBy إذا تم تعيينها إلى غير فارغة، يتم استهلاك جميع الصفوف التي تحتوي على كل مجموعة فريدة من قيم تعبيرات التقسيم بواسطة مثيل منفصل من فئة UDTF.
orderBy إذا تم تعيينه إلى غير فارغ، فهذا يحدد ترتيب الصفوف داخل كل قسم.
select إذا تم تعيينه إلى غير فارغ، فهذا تسلسل من التعبيرات التي يحددها UDTF ل Catalyst لتقييمها مقابل الأعمدة في وسيطة input TABLE. يتلقى UDTF سمة إدخال واحدة لكل اسم في القائمة بالترتيب الذي يتم سردها به.

يقوم هذا analyze المثال بإرجاع عمود إخراج واحد لكل كلمة في وسيطة سلسلة الإدخال.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

إعادة توجيه الحالة إلى المكالمات المستقبلية eval

analyze يمكن أن يعمل الأسلوب كمكان مناسب لإجراء التهيئة ثم إعادة توجيه النتائج إلى استدعاءات الأسلوب المستقبلية eval لنفس استدعاء UDTF.

للقيام بذلك، قم بإنشاء فئة فرعية من AnalyzeResult وإرجاع مثيل الفئة الفرعية analyze من الأسلوب . ثم أضف وسيطة إضافية إلى __init__ الأسلوب لقبول هذا المثيل.

يقوم هذا analyze المثال بإرجاع مخطط إخراج ثابت، ولكنه يضيف معلومات مخصصة في بيانات تعريف النتيجة ليتم استهلاكها بواسطة استدعاءات الأسلوب المستقبلية __init__ :

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

صفوف الإخراج الناتجة

eval يتم تشغيل الأسلوب مرة واحدة لكل صف من وسيطة جدول الإدخال (أو مرة واحدة فقط إذا لم يتم توفير وسيطة جدول)، متبوعا ب استدعاء واحد للأسلوب terminate في النهاية. يقوم أي من الأسلوبين بإخراج صفر أو أكثر من الصفوف التي تتوافق مع مخطط النتيجة عن طريق إنتاج مجموعات أو قوائم أو pyspark.sql.Row كائنات.

يقوم هذا المثال بإرجاع صف عن طريق توفير مجموعة من ثلاثة عناصر:

def eval(self, x, y, z):
  yield (x, y, z)

يمكنك أيضا حذف الأقواس:

def eval(self, x, y, z):
  yield x, y, z

أضف فاصلة لاحقة لإرجاع صف بعمود واحد فقط:

def eval(self, x, y, z):
  yield x,

يمكنك أيضا إنتاج كائن pyspark.sql.Row .

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

ينتج عن هذا المثال صفوف إخراج من terminate الأسلوب باستخدام قائمة Python. يمكنك تخزين الحالة داخل الفئة من الخطوات السابقة في تقييم UDTF لهذا الغرض.

def terminate(self):
  yield [self.x, self.y, self.z]

تمرير الوسيطات العددية إلى UDTF

يمكنك تمرير الوسيطات العددية إلى UDTF كتعبيرات ثابتة تتضمن قيما حرفية أو دالات تستند إليها. على سبيل المثال:

SELECT * FROM udtf(42, group => upper("finance_department"));

تمرير وسيطات الجدول إلى UDTF

يمكن أن تقبل Python UDTFs جدول إدخال كوسيطة بالإضافة إلى وسيطات الإدخال العددية. يمكن أن يقبل UDTF واحد أيضا وسيطة جدول ووسيطات عددية متعددة.

ثم يمكن لأي استعلام SQL توفير جدول إدخال باستخدام TABLE الكلمة الأساسية متبوعة بأقواس تحيط بمعرف جدول مناسب، مثل TABLE(t). بدلا من ذلك، يمكنك تمرير الاستعلام الفرعي للجدول، مثل TABLE(SELECT a, b, c FROM t) أو TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

ثم يتم تمثيل وسيطة جدول الإدخال كوسيطة pyspark.sql.Row eval للأسلوب، مع استدعاء واحد للأسلوب eval لكل صف في جدول الإدخال. يمكنك استخدام التعليقات التوضيحية القياسية لحقل عمود PySpark للتفاعل مع الأعمدة في كل صف. يوضح المثال التالي استيراد نوع PySpark Row بشكل صريح ثم تصفية الجدول الذي تم تمريره في id الحقل:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

للاستعلام عن الدالة، استخدم TABLE الكلمة الأساسية SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

تحديد تقسيم صفوف الإدخال من استدعاءات الدالة

عند استدعاء UDTF باستخدام وسيطة جدول، يمكن لأي استعلام SQL تقسيم جدول الإدخال عبر العديد من استدعاءات UDTF استنادا إلى قيم عمود واحد أو أكثر من أعمدة جدول الإدخال.

لتحديد قسم، استخدم العبارة PARTITION BY في استدعاء الدالة بعد الوسيطة TABLE . يضمن هذا أن جميع صفوف الإدخال مع كل تركيبة فريدة من قيم أعمدة التقسيم سيتم استهلاكها بواسطة مثيل واحد بالضبط من فئة UDTF.

لاحظ أنه بالإضافة إلى مراجع الأعمدة البسيطة، تقبل العبارة PARTITION BY أيضا تعبيرات عشوائية استنادا إلى أعمدة جدول الإدخال. على سبيل المثال، يمكنك تحديد LENGTH سلسلة، أو استخراج شهر من تاريخ، أو تسلسل قيمتين.

من الممكن أيضا تحديد WITH SINGLE PARTITION بدلا من PARTITION BY طلب قسم واحد فقط حيث يجب استهلاك جميع صفوف الإدخال بواسطة مثيل واحد بالضبط من فئة UDTF.

داخل كل قسم، يمكنك اختياريا تحديد ترتيب مطلوب لصفوف الإدخال حيث يستهلكها أسلوب UDTF eval . للقيام بذلك، قم بتوفير عبارة ORDER BY بعد عبارة أو WITH SINGLE PARTITION الموضحة PARTITION BY أعلاه.

على سبيل المثال، ضع في اعتبارك UDTF التالي:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

يمكنك تحديد خيارات التقسيم عند استدعاء UDTF عبر جدول الإدخال بطرق muliple:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

تحديد تقسيم صفوف الإدخال من analyze الأسلوب

لاحظ أنه لكل من الطرق المذكورة أعلاه لتقسيم جدول الإدخال عند استدعاء UDTFs في استعلامات SQL، هناك طريقة مقابلة لأسلوب UDTF analyze لتحديد نفس أسلوب التقسيم تلقائيا بدلا من ذلك.

  • بدلا من استدعاء UDTF ك SELECT * FROM udtf(TABLE(t) PARTITION BY a)، يمكنك تحديث analyze الأسلوب لتعيين الحقل partitionBy=[PartitioningColumn("a")] واستدعاء الدالة ببساطة باستخدام SELECT * FROM udtf(TABLE(t)).
  • بنفس الرمز المميز، بدلا من تحديد TABLE(t) WITH SINGLE PARTITION ORDER BY b في استعلام SQL، يمكنك تعيين analyze الحقول withSinglePartition=true orderBy=[OrderingColumn("b")] ثم تمرير TABLE(t)فقط .
  • بدلا من تمرير TABLE(SELECT a FROM t) استعلام SQL، يمكنك تعيين analyze select=[SelectedColumn("a")] ثم تمرير TABLE(t)فقط .

في المثال التالي، analyze إرجاع مخطط إخراج ثابت، وتحديد مجموعة فرعية من الأعمدة من جدول الإدخال، وتحديد أن جدول الإدخال مقسم عبر العديد من استدعاءات UDTF استنادا إلى قيم date العمود:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])