APIs de função do Pandas
As APIs de função do Pandas permitem que você aplique diretamente uma função nativa do Python, que recebe e gera instâncias pandas, a um PySpark DataFrame. Semelhante às funções definidas pelo usuário do Pandas, as APIs de função também usam o Apache Arrow para transferir dados e pandas para trabalhar com os dados; no entanto, as dicas de tipo Python são opcionais em APIs de função do Pandas.
Há três tipos de APIs de função do Pandas:
- Mapeamento agrupado
- Mapeamento
- Mapeamento coagrupado
As APIs de função do Pandas aproveitam a mesma lógica interna que a execução UDF do Pandas usa. Elas compartilham características, como PyArrow, tipos de SQL com suporte e as configurações.
Para obter mais informações, confira a postagem do blog Novas UDFs do Pandas e dicas de tipo Python na próxima versão do Apache Spark 3.0.
Mapeamento agrupado
Você transforma seus dados agrupados usando groupBy().applyInPandas()
para implementar o padrão “dividir-aplicar-combinar”. Dividir-aplicar-combinar consiste em três etapas:
- Dividir os dados em grupos usando
DataFrame.groupBy
. - Aplicar uma função em cada grupo. A entrada e a saída da função são
pandas.DataFrame
. Os dados de entrada contêm todas as linhas e colunas para cada grupo. - Combinar os resultados em um novo
DataFrame
.
Para usar groupBy().applyInPandas()
, você deve definir o seguinte:
- Uma função Python que define a computação para cada grupo
- Um objeto ou uma cadeia de caracteres
StructType
que define o esquema da saídaDataFrame
Os rótulos de coluna do pandas.DataFrame
retornado devem corresponder aos nomes de campo no esquema de saída definido, se especificado como cadeias de caracteres, ou corresponder aos tipos de dados de campo por posição, se não forem cadeias de caracteres, por exemplo, índices inteiros. Confira pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame
.
Todos os dados de um grupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada em grupos e cabe a você garantir que os dados agrupados caibam na memória disponível.
O exemplo a seguir mostra como usar groupby().apply()
para subtrair a média de cada valor no grupo.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Para uso detalhado, confira pyspark.sql.GroupedData.applyInPandas.
Mapeamento
Você executa operações de mapa com instâncias de pandas por DataFrame.mapInPandas()
para transformar um iterador de pandas.DataFrame
em outro iterador de pandas.DataFrame
que representa o PySpark DataFrame atual e retorna o resultado como um PySpark DataFrame.
A função subjacente recebe e gera um iterador de pandas.DataFrame
. Ele pode retornar a saída de comprimento arbitrário em contraste com alguns UDFs do Pandas, como de série para série.
O exemplo a seguir mostra como usar mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Para uso detalhado, confira pyspark.sql.DataFrame.mapInPandas.
Mapeamento coagrupado
Para operações de mapeamentos coagrupados com instâncias do Pandas, use DataFrame.groupby().cogroup().applyInPandas()
para coagrupar dois PySpark DataFrame
s sejam coagrupados por uma chave comum e, em seguida, aplique uma função Python a cada cogrupo conforme mostrado:
- Embaralhe os dados de forma que os grupos de cada DataFrame que compartilham uma chave sejam coagrupados.
- Aplique uma função a cada cogrupo. A entrada da função é dois
pandas.DataFrame
(com uma tupla opcional representando a chave). A saída da função é umpandas.DataFrame
. - Combine os
pandas.DataFrame
s de todos os grupos em um novo PySparkDataFrame
.
Para usar groupBy().cogroup().applyInPandas()
, você deve definir o seguinte:
- Uma função Python que define a computação para cada cogrupo.
- Um objeto ou uma cadeia de caracteres
StructType
que define o esquema da saída PySparkDataFrame
.
Os rótulos de coluna do pandas.DataFrame
retornado devem corresponder aos nomes de campo no esquema de saída definido, se especificado como cadeias de caracteres, ou corresponder aos tipos de dados de campo por posição, se não forem cadeias de caracteres, por exemplo, índices inteiros. Confira pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame
.
Todos os dados de um cogrupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada e cabe a você garantir que os dados coagrupados se ajustem à memória disponível.
O exemplo a seguir mostra como usar para groupby().cogroup().applyInPandas()
executar um asof join
entre dois conjuntos de dados.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Para uso detalhado, confira pyspark.sql.PandasCogroupedOps.applyInPandas.