pandas işlev API'leri

pandas işlev API'leri, pandas örneklerini alıp PySpark DataFrame'e veren bir Python yerel işlevini doğrudan uygulamanızı sağlar. Benzer şekilde, pandas kullanıcı tanımlı fonksiyonlar, fonksiyon API'leri de verileri aktarmak için Apache Arrow ve verilerle çalışmak için pandas kullanır; ancak Python tür ipuçları pandas fonksiyon API'lerinde isteğe bağlıdır.

Pandas işlev API'lerinin üç türü vardır:

  • Gruplandırılmış harita
  • Harita
  • Eş gruplama haritası

pandas işlev API'leri, pandas UDF yürütmesinin kullandığı iç mantığı kullanır. PyArrow, desteklenen SQL türleri ve yapılandırmalar gibi özellikleri paylaşırlar.

Daha fazla bilgi için, Apache Spark 3.0'ın Yaklaşan Sürümü'ndeki Yeni Pandas UDF'leri ve Python Türü İpuçları hakkında blog gönderisine bakın.

Gruplandırılmış harita

"Split-apply-combine" desenini uygulamak için groupBy().applyInPandas() kullanarak gruplandırılmış verilerinizi dönüştürebilirsiniz. "Split-apply-combine" üç adımdan oluşur:

  • DataFrame.groupBykullanarak verileri gruplara bölün.
  • Her gruba bir işlev uygulayın. Fonksiyonun hem girişi hem de çıkışı pandas.DataFrame. Giriş verileri, her grubun tüm satırlarını ve sütunlarını içerir.
  • Yeni bir DataFrameiçin sonuçları birleştirin.

groupBy().applyInPandas()kullanmak için aşağıdakileri tanımlamanız gerekir:

  • Her grup için hesaplamayı tanımlayan bir Python işlevi
  • Çıktı StructType şemasını tanımlayan bir DataFrame nesnesi veya dize

Döndürülen pandas.DataFrame sütun etiketleri, eğer dizeler olarak belirtilmişse tanımlı çıktı şemasındaki alan adlarıyla eşleşmeli veya dizeler değilse, örneğin tamsayı dizinleri gibi, konumlarına göre alan veri türleriyle eşleşmelidir. Bkz. .

Bir grubun tüm verileri, işlev uygulanmadan önce belleğe yüklenir. Bu, özellikle grup boyutları dengesizse bellek hatalarına yol açabilir. maxRecordsPerBatch yapılandırması gruplara uygulanmaz ve gruplandırılmış verilerin kullanılabilir belleğe sığdığından emin olmak size bağlıdır.

Aşağıdaki örnekte, gruptaki her değerden ortalamayı çıkarmak için groupby().apply() nasıl kullanılacağı gösterilmektedir.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Ayrıntılı kullanım için bkz. pyspark.sql.GroupedData.applyInPandas.

Harita

DataFrame.mapInPandas() yineleyicisini geçerli PySpark DataFrame'i temsil eden ve sonucu PySpark DataFrame olarak döndüren başka bir pandas.DataFrame yineleyicisine dönüştürmek için pandas.DataFrame pandas örnekleriyle eşleme işlemleri gerçekleştirirsiniz.

Temel alınan işlev pandas.DataFrameyineleyicisini alır ve çıktı olarak verir. Seriden Seriye gibi bazı pandas UDF'lerinin aksine rastgele uzunlukta çıkış döndürebilir.

Aşağıdaki örnekte mapInPandas()nasıl kullanılacağı gösterilmektedir:

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Ayrıntılı kullanım için bkz. pyspark.sql.DataFrame.mapInPandas.

Eş gruplama haritası

Pandas örnekleriyle gruplandırılmış eşleme işlemleri için, iki PySpark DataFrame.groupby().cogroup().applyInPandas()'yi ortak bir anahtarla gruplandırmak ve ardından her bir cogroup'a, gösterildiği şekilde, bir Python işlevi uygulamak için DataFrame kullanın.

  • Bir anahtarı paylaşan her DataFrame grubunun birlikte gruplandırılması için verileri karıştırın.
  • Her bir cogroup'a bir işlev uygulayın. İşlevin girişi iki pandas.DataFrame (anahtarı temsil eden isteğe bağlı bir tuple ile). İşlevin çıkışı bir pandas.DataFrame'dur.
  • Tüm gruplardan gelen pandas.DataFrame'ları yeni bir PySpark DataFrameiçinde birleştirin.

groupBy().cogroup().applyInPandas()kullanmak için aşağıdakileri tanımlamanız gerekir:

  • Her bir cogroup için hesaplamayı tanımlayan bir Python işlevi.
  • StructType nesnesi veya PySpark DataFrameçıktısının şemasını tanımlayan bir dize.

Döndürülen pandas.DataFrame sütun etiketleri, eğer dizeler olarak belirtilmişse tanımlı çıktı şemasındaki alan adlarıyla eşleşmeli veya dizeler değilse, örneğin tamsayı dizinleri gibi, konumlarına göre alan veri türleriyle eşleşmelidir. Bkz. .

İşlev uygulanmadan önce bir ortak grubun tüm verileri belleğe yüklenir. Bu, özellikle grup boyutları dengesizse bellek hatalarına yol açabilir. maxRecordsPerBatch yapılandırması uygulanmaz ve birlikte gruplandırılmış verilerin kullanılabilir belleğe sığmasını sağlamak size bağlıdır.

Aşağıdaki örnekte, iki veri kümesi arasında bir groupby().cogroup().applyInPandas() gerçekleştirmek için asof join nasıl kullanılacağı gösterilmektedir.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Ayrıntılı kullanım için bkz. pyspark.sql.PandasCogroupedOps.applyInPandas.