واجهات برمجة تطبيقات دالة pandas

تمكنك واجهات برمجة التطبيقات لدالة pandas من تطبيق دالة Python الأصلية مباشرة التي تأخذ مثيلات pandas وتخرجها إلى PySpark DataFrame. على غرار وظائف pandas المعرفة من قبل المستخدم، تستخدم واجهات برمجة تطبيقات الوظائف أيضا Apache سهم لنقل البيانات وpandas للعمل مع البيانات؛ ومع ذلك، تعد تلميحات نوع Python اختيارية في واجهات برمجة تطبيقات وظائف pandas.

هناك ثلاثة أنواع من واجهات برمجة تطبيقات وظائف pandas:

  • خريطة مجمعة
  • خريطه
  • خريطة تجميع مشترك

تستفيد واجهات برمجة تطبيقات دالة pandas من نفس المنطق الداخلي الذي يستخدمه تنفيذ Pandas UDF. وهي تشترك في خصائص مثل PyArrow وأنواع SQL المدعومة والتكوينات.

لمزيد من المعلومات، راجع منشور المدونة New Pandas UDFs وPython Type Hints في الإصدار القادم من Apache Spark 3.0.

خريطة مجمعة

يمكنك تحويل البيانات المجمعة باستخدام groupBy().applyInPandas() لتنفيذ نمط "split-apply-combine". يتكون Split-apply-combine من ثلاث خطوات:

  • تقسيم البيانات إلى مجموعات باستخدام DataFrame.groupBy.
  • تطبيق دالة على كل مجموعة. الإدخال والإخراج للدالة كلاهما pandas.DataFrame. تحتوي بيانات الإدخال على جميع الصفوف والأعمدة لكل مجموعة.
  • ادمج النتائج في جديد DataFrame.

لاستخدام groupBy().applyInPandas()، يجب تحديد ما يلي:

  • دالة Python التي تحدد الحساب لكل مجموعة
  • كائن StructType أو سلسلة تحدد مخطط الإخراج DataFrame

يجب أن تتطابق تسميات الأعمدة التي تم إرجاعها pandas.DataFrame إما مع أسماء الحقول في مخطط الإخراج المحدد إذا تم تحديدها كسلاسل، أو تطابق أنواع بيانات الحقول حسب الموضع إن لم تكن سلاسل، على سبيل المثال، فهارس عدد صحيح. راجع الباندا. DataFrame لكيفية تسمية الأعمدة عند إنشاء pandas.DataFrame.

يتم تحميل جميع بيانات المجموعة في الذاكرة قبل تطبيق الدالة. يمكن أن يؤدي هذا إلى استثناءات نفاد الذاكرة، خاصة إذا تم انحراف أحجام المجموعة. لا يتم تطبيق تكوين maxRecordsPerBatch على المجموعات، الأمر متروك لك للتأكد من احتواء البيانات المجمعة في الذاكرة المتوفرة.

يوضح المثال التالي كيفية استخدام groupby().apply() لطرح الوسط من كل قيمة في المجموعة.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

للحصول على استخدام مفصل، راجع pyspark.sql.GroupedData.applyInPandas.

خريطه

يمكنك تنفيذ عمليات الخريطة باستخدام مثيلات pandas من DataFrame.mapInPandas() أجل تحويل مكرر pandas.DataFrame إلى مكرر pandas.DataFrame آخر يمثل PySpark DataFrame الحالي ويعيد النتيجة ك PySpark DataFrame.

تأخذ الدالة الأساسية ويخرج مكرر ل pandas.DataFrame. يمكن أن ترجع مخرجات الطول العشوائي على النقيض من بعض Pandas UDFs مثل Series to Series.

يوضح المثال التالي كيفية استخدام mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

للحصول على استخدام مفصل، راجع pyspark.sql.DataFrame.mapInPandas.

خريطة تجميع مشترك

لعمليات الخريطة المجمعة مع مثيلات pandas، استخدم DataFrame.groupby().cogroup().applyInPandas() ل تجميع اثنين من PySpark DataFrames بواسطة مفتاح مشترك ثم تطبيق دالة Python على كل مجموعة مشاركة مشتركة كما هو موضح:

  • قم بخلط البيانات بحيث يتم تجميع مجموعات كل DataFrame التي تشترك في مفتاح معا.
  • تطبيق دالة على كل مجموعة مشاركة. إدخال الدالة هو اثنان pandas.DataFrame (مع مجموعة اختيارية تمثل المفتاح). إخراج الدالة هو pandas.DataFrame.
  • ادمج s pandas.DataFrameمن جميع المجموعات في PySpark DataFrameجديد .

لاستخدام groupBy().cogroup().applyInPandas()، يجب تحديد ما يلي:

  • دالة Python التي تحدد الحساب لكل مجموعة مشاركة.
  • كائن StructType أو سلسلة تحدد مخطط إخراج PySpark DataFrame.

يجب أن تتطابق تسميات الأعمدة التي تم إرجاعها pandas.DataFrame إما مع أسماء الحقول في مخطط الإخراج المحدد إذا تم تحديدها كسلاسل، أو تطابق أنواع بيانات الحقول حسب الموضع إن لم تكن سلاسل، على سبيل المثال، فهارس عدد صحيح. راجع الباندا. DataFrame لكيفية تسمية الأعمدة عند إنشاء pandas.DataFrame.

يتم تحميل جميع بيانات مجموعة المشاركة في الذاكرة قبل تطبيق الدالة. يمكن أن يؤدي هذا إلى استثناءات نفاد الذاكرة، خاصة إذا تم انحراف أحجام المجموعة. لا يتم تطبيق تكوين maxRecordsPerBatch والمسؤولية متروكة لك للتأكد من أن البيانات المجمعة تناسب الذاكرة المتوفرة.

يوضح المثال التالي كيفية استخدام groupby().cogroup().applyInPandas() لتنفيذ asof join بين مجموعتي بيانات.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

للحصول على استخدام مفصل، راجع pyspark.sql.PandasCogroupedOps.applyInPandas.