Note
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de changer d’annuaire.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de changer d’annuaire.
Les API de fonction pandas vous permettent d’appliquer directement une fonction native Python qui accepte et génère des instances pandas à un DataFrame PySpark. Similaires aux fonctions définies par l'utilisateur de pandas, les API de fonction utilisent également Apache Arrow pour transférer des données et pandas pour travailler avec les données ; cependant, les indications de type Python sont facultatives dans les API de fonction pandas.
Il existe trois types d’API des fonctions pandas :
- Carte groupée
- Carte
- Carte cogroupée
Les API de fonction pandas tirent parti de la même logique interne que celle utilisée par l’exécution de pandas UDF. Ils partagent des caractéristiques telles que PyArrow, les types SQL pris en charge et les configurations.
Pour plus d'informations, consultez l'article de blog Nouveaux UDF Pandas et indices de type Python dans la prochaine version d'Apache Spark 3.0.
Carte groupée
Vous transformez vos données groupées en utilisant groupBy().applyInPandas() pour implémenter le modèle “split-apply-combine”. Le processus se déroule en trois étapes :
- Fractionnez les données en groupes à l’aide de
DataFrame.groupBy. - Appliquez une fonction sur chaque groupe. L'entrée et la sortie de la fonction sont toutes deux
pandas.DataFrame. Les données d’entrée contiennent toutes les lignes et colonnes pour chaque groupe. - Combiner les résultats dans un nouveau
DataFrame.
Pour utiliser groupBy().applyInPandas(), vous devez définir les éléments suivants :
- Fonction Python qui définit le calcul pour chaque groupe
- Objet
StructTypeou chaîne qui définit le schéma de la sortieDataFrame
Les étiquettes de colonne du pandas.DataFrame retourné doivent correspondre aux noms de champs dans le schéma de sortie défini s’ils sont spécifiés en tant que chaînes, ou correspondre aux types de données de champ par position si ce n’est pas les chaînes, par exemple, les index entiers. Voir pandas.DataFrame pour savoir comment étiqueter les colonnes lors de la construction d’un pandas.DataFrame.
Toutes les données d’un groupe sont chargées en mémoire avant l’application de la fonction. Cela peut entraîner des exceptions de mémoire insuffisante, en particulier si les groupes sont de taille asymétrique. La configuration de maxRecordsPerBatch n’est pas appliquée sur les groupes et vous devez vous assurer que les données groupées s’intègrent dans la mémoire disponible.
L’exemple suivant montre comment utiliser groupby().apply() pour soustraire la moyenne de chaque valeur du groupe.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Pour une utilisation détaillée, consultez pyspark.sql.GroupedData.applyInPandas.
Carte
Vous effectuez des opérations cartographiques avec des instances pandas en DataFrame.mapInPandas() afin de transformer un itérateur de pandas.DataFrame en un autre itérateur de pandas.DataFrame qui représente le DataFrame PySpark actuel et retourne le résultat sous la forme d’un DataFrame PySpark.
La fonction sous-jacente prend et génère un itérateur de pandas.DataFrame. Elle peut retourner une sortie d’une longueur arbitraire, contrairement à certaines fonctions définies par l’utilisateur pandas comme la fonction de série à série.
L'exemple suivant montre comment utiliser mapInPandas() :
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Pour une utilisation détaillée, consultez pyspark.sql.DataFrame.mapInPandas.
Carte cogroupée
Pour les opérations de mappage cogroupées avec les instances de pandas, utilisez DataFrame.groupby().cogroup().applyInPandas() pour cogrouper deux éléments DataFramePySpark par une clé commune, puis appliquez une fonction Python à chaque cogroupe, comme montré :
- Mélangez les données de manière à ce que les groupes de chaque DataFrame qui partagent une clé soient regroupés.
- Appliquez une fonction à chaque cogroupe. L'entrée de la fonction est constituée de deux
pandas.DataFrame(accompagnée d'un tuple facultatif représentant la clé). La sortie de la fonction est unpandas.DataFrame. - Combinez les
pandas.DataFrames de tous les groupes dans un nouveau PySparkDataFrame.
Pour utiliser groupBy().cogroup().applyInPandas(), vous devez définir les éléments suivants :
- Fonction Python qui définit le calcul de chaque cogroupe.
- Objet
StructTypeou chaîne qui définit le schéma de la sortie PySparkDataFrame.
Les étiquettes de colonne du pandas.DataFrame retourné doivent correspondre aux noms de champs dans le schéma de sortie défini s’ils sont spécifiés en tant que chaînes, ou correspondre aux types de données de champ par position si ce n’est pas les chaînes, par exemple, les index entiers. Voir pandas.DataFrame pour savoir comment étiqueter les colonnes lors de la construction d’un pandas.DataFrame.
Toutes les données d’un cogroupe sont chargées en mémoire avant l’application de la fonction. Cela peut entraîner des exceptions de mémoire insuffisante, en particulier si les groupes sont de taille asymétrique. La configuration de maxRecordsPerBatch n’est pas appliquée et vous devez vous assurer que les données cogroupées s’intègrent dans la mémoire disponible.
L’exemple suivant montre comment utiliser groupby().cogroup().applyInPandas() pour effectuer une asof join entre deux jeux de données.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Pour une utilisation détaillée, consultez pyspark.sql.PandasCogroupedOps.applyInPandas.