Compartilhar via


APIs de função do Pandas

As APIs de função do Pandas permitem que você aplique diretamente uma função nativa do Python, que recebe e gera instâncias pandas, a um PySpark DataFrame. Semelhante às funções definidas pelo usuário do Pandas, as APIs de função também usam o Apache Arrow para transferir dados e pandas para trabalhar com os dados; no entanto, as dicas de tipo Python são opcionais em APIs de função do Pandas.

Há três tipos de APIs de função do Pandas:

  • Mapeamento agrupado
  • Mapeamento
  • Mapeamento coagrupado

As APIs de função do Pandas aproveitam a mesma lógica interna que a execução UDF do Pandas usa. Elas compartilham características, como PyArrow, tipos de SQL com suporte e as configurações.

Para obter mais informações, confira a postagem do blog Novas UDFs do Pandas e dicas de tipo Python na próxima versão do Apache Spark 3.0.

Mapeamento agrupado

Você transforma seus dados agrupados usando groupBy().applyInPandas() para implementar o padrão “dividir-aplicar-combinar”. Dividir-aplicar-combinar consiste em três etapas:

  • Dividir os dados em grupos usando DataFrame.groupBy.
  • Aplicar uma função em cada grupo. A entrada e a saída da função são pandas.DataFrame. Os dados de entrada contêm todas as linhas e colunas para cada grupo.
  • Combinar os resultados em um novo DataFrame.

Para usar groupBy().applyInPandas(), você deve definir o seguinte:

  • Uma função Python que define a computação para cada grupo
  • Um objeto ou uma cadeia de caracteres StructType que define o esquema da saída DataFrame

Os rótulos de coluna do pandas.DataFrame retornado devem corresponder aos nomes de campo no esquema de saída definido, se especificado como cadeias de caracteres, ou corresponder aos tipos de dados de campo por posição, se não forem cadeias de caracteres, por exemplo, índices inteiros. Confira pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame.

Todos os dados de um grupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada em grupos e cabe a você garantir que os dados agrupados caibam na memória disponível.

O exemplo a seguir mostra como usar groupby().apply() para subtrair a média de cada valor no grupo.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Para uso detalhado, confira pyspark.sql.GroupedData.applyInPandas.

Mapeamento

Você executa operações de mapa com instâncias de pandas por DataFrame.mapInPandas() para transformar um iterador de pandas.DataFrame em outro iterador de pandas.DataFrame que representa o PySpark DataFrame atual e retorna o resultado como um PySpark DataFrame.

A função subjacente recebe e gera um iterador de pandas.DataFrame. Ele pode retornar a saída de comprimento arbitrário em contraste com alguns UDFs do Pandas, como de série para série.

O exemplo a seguir mostra como usar mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Para uso detalhado, confira pyspark.sql.DataFrame.mapInPandas.

Mapeamento coagrupado

Para operações de mapeamentos coagrupados com instâncias do Pandas, use DataFrame.groupby().cogroup().applyInPandas() para coagrupar dois PySpark DataFrames sejam coagrupados por uma chave comum e, em seguida, aplique uma função Python a cada cogrupo conforme mostrado:

  • Embaralhe os dados de forma que os grupos de cada DataFrame que compartilham uma chave sejam coagrupados.
  • Aplique uma função a cada cogrupo. A entrada da função é dois pandas.DataFrame (com uma tupla opcional representando a chave). A saída da função é um pandas.DataFrame.
  • Combine os pandas.DataFrames de todos os grupos em um novo PySpark DataFrame.

Para usar groupBy().cogroup().applyInPandas(), você deve definir o seguinte:

  • Uma função Python que define a computação para cada cogrupo.
  • Um objeto ou uma cadeia de caracteres StructType que define o esquema da saída PySpark DataFrame.

Os rótulos de coluna do pandas.DataFrame retornado devem corresponder aos nomes de campo no esquema de saída definido, se especificado como cadeias de caracteres, ou corresponder aos tipos de dados de campo por posição, se não forem cadeias de caracteres, por exemplo, índices inteiros. Confira pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame.

Todos os dados de um cogrupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada e cabe a você garantir que os dados coagrupados se ajustem à memória disponível.

O exemplo a seguir mostra como usar para groupby().cogroup().applyInPandas() executar um asof join entre dois conjuntos de dados.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Para uso detalhado, confira pyspark.sql.PandasCogroupedOps.applyInPandas.