Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
pandas işlev API'leri, pandas örneklerini alıp PySpark DataFrame'e veren bir Python yerel işlevini doğrudan uygulamanızı sağlar. Benzer şekilde, pandas kullanıcı tanımlı fonksiyonlar, fonksiyon API'leri de verileri aktarmak için Apache Arrow ve verilerle çalışmak için pandas kullanır; ancak Python tür ipuçları pandas fonksiyon API'lerinde isteğe bağlıdır.
Pandas işlev API'lerinin üç türü vardır:
- Gruplandırılmış harita
- Harita
- Eş gruplama haritası
pandas işlev API'leri, pandas UDF yürütmesinin kullandığı iç mantığı kullanır. PyArrow, desteklenen SQL türleri ve yapılandırmalar gibi özellikleri paylaşırlar.
Daha fazla bilgi için, Apache Spark 3.0'ın Yaklaşan Sürümü'ndeki Yeni Pandas UDF'leri ve Python Türü İpuçları hakkında blog gönderisine bakın.
Gruplandırılmış harita
"Split-apply-combine" desenini uygulamak için groupBy().applyInPandas() kullanarak gruplandırılmış verilerinizi dönüştürebilirsiniz. "Split-apply-combine" üç adımdan oluşur:
-
DataFrame.groupBykullanarak verileri gruplara bölün. - Her gruba bir işlev uygulayın. Fonksiyonun hem girişi hem de çıkışı
pandas.DataFrame. Giriş verileri, her grubun tüm satırlarını ve sütunlarını içerir. - Yeni bir
DataFrameiçin sonuçları birleştirin.
groupBy().applyInPandas()kullanmak için aşağıdakileri tanımlamanız gerekir:
- Her grup için hesaplamayı tanımlayan bir Python işlevi
- Çıktı
StructTypeşemasını tanımlayan birDataFramenesnesi veya dize
Döndürülen pandas.DataFrame sütun etiketleri, eğer dizeler olarak belirtilmişse tanımlı çıktı şemasındaki alan adlarıyla eşleşmeli veya dizeler değilse, örneğin tamsayı dizinleri gibi, konumlarına göre alan veri türleriyle eşleşmelidir. Bkz. .
Bir grubun tüm verileri, işlev uygulanmadan önce belleğe yüklenir. Bu, özellikle grup boyutları dengesizse bellek hatalarına yol açabilir. maxRecordsPerBatch
Aşağıdaki örnekte, gruptaki her değerden ortalamayı çıkarmak için groupby().apply() nasıl kullanılacağı gösterilmektedir.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Ayrıntılı kullanım için bkz. pyspark.sql.GroupedData.applyInPandas.
Harita
DataFrame.mapInPandas() yineleyicisini geçerli PySpark DataFrame'i temsil eden ve sonucu PySpark DataFrame olarak döndüren başka bir pandas.DataFrame yineleyicisine dönüştürmek için pandas.DataFrame pandas örnekleriyle eşleme işlemleri gerçekleştirirsiniz.
Temel alınan işlev pandas.DataFrameyineleyicisini alır ve çıktı olarak verir. Seriden Seriye gibi bazı pandas UDF'lerinin aksine rastgele uzunlukta çıkış döndürebilir.
Aşağıdaki örnekte mapInPandas()nasıl kullanılacağı gösterilmektedir:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Ayrıntılı kullanım için bkz. pyspark.sql.DataFrame.mapInPandas.
Eş gruplama haritası
Pandas örnekleriyle gruplandırılmış eşleme işlemleri için, iki PySpark DataFrame.groupby().cogroup().applyInPandas()'yi ortak bir anahtarla gruplandırmak ve ardından her bir cogroup'a, gösterildiği şekilde, bir Python işlevi uygulamak için DataFrame kullanın.
- Bir anahtarı paylaşan her DataFrame grubunun birlikte gruplandırılması için verileri karıştırın.
- Her bir cogroup'a bir işlev uygulayın. İşlevin girişi iki
pandas.DataFrame(anahtarı temsil eden isteğe bağlı bir tuple ile). İşlevin çıkışı birpandas.DataFrame'dur. - Tüm gruplardan gelen
pandas.DataFrame'ları yeni bir PySparkDataFrameiçinde birleştirin.
groupBy().cogroup().applyInPandas()kullanmak için aşağıdakileri tanımlamanız gerekir:
- Her bir cogroup için hesaplamayı tanımlayan bir Python işlevi.
-
StructTypenesnesi veya PySparkDataFrameçıktısının şemasını tanımlayan bir dize.
Döndürülen pandas.DataFrame sütun etiketleri, eğer dizeler olarak belirtilmişse tanımlı çıktı şemasındaki alan adlarıyla eşleşmeli veya dizeler değilse, örneğin tamsayı dizinleri gibi, konumlarına göre alan veri türleriyle eşleşmelidir. Bkz. .
İşlev uygulanmadan önce bir ortak grubun tüm verileri belleğe yüklenir. Bu, özellikle grup boyutları dengesizse bellek hatalarına yol açabilir. maxRecordsPerBatch
Aşağıdaki örnekte, iki veri kümesi arasında bir groupby().cogroup().applyInPandas() gerçekleştirmek için asof join nasıl kullanılacağı gösterilmektedir.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Ayrıntılı kullanım için bkz. pyspark.sql.PandasCogroupedOps.applyInPandas.