Bagikan melalui


fungsi panda yang ditentukan pengguna

Fungsi yang ditentukan pengguna (UDF) pandas - juga dikenal sebagai UDF vektor - adalah fungsi yang ditentukan pengguna yang menggunakan Apache Arrow untuk mentransfer data dan pandas untuk bekerja dengan data. UDF pandas memungkinkan operasi dalam vektor yang dapat meningkatkan performa hingga 100x dibandingkan dengan UDF Python yang hanya satu baris pada satu waktu.

Untuk informasi latar belakang, lihat posting blog UDF Pandas Baru dan Petunjuk Jenis Python di Rilis Apache Spark 3.0 mendatang.

Anda mendefinisikan UDF pandas menggunakan kata kunci pandas_udf sebagai dekorator dan membungkus fungsi dengan petunjuk jenis Python. Artikel ini menjelaskan berbagai jenis UDF pandas dan juga menunjukkan cara menggunakan UDF pandas dengan petunjuk jenis.

Seri ke Seri UDF

Anda menggunakan UDF pandas Seri ke Seri untuk melakukan vektor operasi skalar. Anda dapat menggunakannya dengan API seperti select dan withColumn.

Fungsi Python harus mengambil Seri pandas sebagai input dan mengembalikan Seri pandas dengan panjang yang sama. Anda juga harus menentukannya pada petunjuk jenis Python. Spark menjalankan UDF pandas dengan membagi kolom menjadi batch, memanggil fungsi untuk setiap batch sebagai subset data, kemudian menggabungkan hasilnya.

Contoh berikut menunjukkan cara membuat UDF pandas yang menghitung produk dari 2 kolom.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterator Seri ke Iterator Seri UDF

Iterator UDF sama dengan skalar UDF pandas, kecuali:

  • Fungsi Python
    • Mengambil iterator batch sebagai input, bukan batch input tunggal.
    • Mengembalikan iterator batch output, bukan batch output tunggal.
  • Panjang seluruh output dalam iterator harus sama dengan panjang seluruh input.
  • UDF pandas yang dibungkus akan mengambil satu kolom Spark sebagai input.

Anda harus menentukan petunjuk jenis Python sebagai Iterator[pandas.Series] ->Iterator[pandas.Series].

UDF pandas ini berguna ketika eksekusi UDF memerlukan inisialisasi pada beberapa kondisi, misalnya, memuat file model pembelajaran mesin untuk menerapkan inferensi ke setiap batch input.

Contoh berikut menunjukkan cara membuat UDF pandas dengan dukungan iterator.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iterator dari beberapa Seri ke Iterator Seri UDF

Iterator dari beberapa Seri ke Iterator Seri UDF memiliki karakteristik dan batasan yang sama dengan Iterator Seri ke Iterator Seri UDF. Fungsi yang telah ditentukan akan mengambil iterator batch dan mengeluarkan iterator batch. Ini juga berguna ketika eksekusi UDF memerlukan inisialisasi pada beberapa kondisi.

Perbedaannya adalah:

  • Fungsi Python yang mendasarinya akan mengambil iterator dari tuple Seri pandas.
  • UDF pandas yang dibungkus akan mengambil beberapa kolom Spark sebagai input.

Anda menentukan petunjuk jenis sebagai Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Seri ke skalar UDF

Seri ke skalar UDF pandas mirip dengan fungsi agregat Spark. Seri ke skalar UDF pandas mendefinisikan agregasi dari satu atau beberapa Seri pandas ke nilai skalar, di mana setiap Seri pandas mewakili satu kolom Spark. Anda menggunakan Seri ke skalar UDF pandas dengan API seperti select, withColumn, groupBy.agg, dan pyspark.sql.Window.

Anda menyatakan petunjuk jenis sebagai pandas.Series, ... ->Any. Jenis pengembaliannya harus berupa jenis data primitif, dan skalar yang dikembalikan dapat berupa jenis primitif Python, misalnya, int atau float, atau jenis data NumPy, seperti numpy.int64 atau numpy.float64. Any idealnya adalah jenis skalar khusus.

Jenis UDF ini tidak mendukung agregasi parsial dan semua data untuk setiap grup dimuat ke dalam memori.

Contoh berikut menunjukkan cara menggunakan UDF jenis ini untuk menghitung mean dengan operasi select, groupBy, dan window:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Untuk penggunaan terperinci, lihat pyspark.sql.functions.pandas_udf.

Penggunaan

Mengatur ukuran batch Arrow

Catatan

Konfigurasi ini tidak berdampak pada komputasi yang dikonfigurasi dengan mode akses bersama dan Databricks Runtime 13.3 LTS hingga 14.2.

Partisi data di Spark diubah menjadi kumpulan baris Arrow, yang dapat menyebabkan penggunaan memori tinggi untuk sementara pada JVM. Untuk menghindari kemungkinan dari pengecualian memori, Anda dapat menyesuaikan ukuran batch baris Arrow dengan mengatur konfigurasi spark.sql.execution.arrow.maxRecordsPerBatch ke bilangan bulat yang menentukan jumlah baris maksimum untuk setiap batch. Nilai default adalah 10.000 baris per batch. Jika jumlah kolom besar, nilainya harus sesuai dengan jumlah tersebut. Dengan menggunakan batas ini, setiap partisi data dibagi menjadi 1 atau beberapa batch baris untuk diproses.

Stempel waktu dengan semantik zona waktu

Spark menyimpan stempel waktu secara internal sebagai nilai UTC, dan data stempel waktu yang dibawa tanpa zona waktu tertentu diubah menjadi waktu lokal ke UTC dengan resolusi mikrodetik.

Saat data stempel waktu diekspor atau ditampilkan di Spark, zona waktu sesi digunakan untuk melokalkan nilai stempel waktu. Zona waktu sesi diatur dengan konfigurasi dan default spark.sql.session.timeZone ke zona waktu lokal sistem JVM. pandas menggunakan jenis datetime64 dengan resolusi nanodetik, datetime64[ns], dengan zona waktu opsional per kolom.

Ketika data stempel waktu ditransfer dari Spark ke pandas, data tersebut akan dikonversi menjadi nanodetik dan setiap kolomnya akan dikonversi ke zona waktu sesi Spark, kemudian dilokalkan ke zona waktu tersebut, yang mana akan menghapus zona waktu dan menampilkan nilai sebagai waktu setempat. Ini terjadi saat memanggil toPandas() atau pandas_udf dengan kolom stempel waktu.

Ketika data stempel waktu ditransfer dari pandas ke Spark, data tersebut dikonversi ke mikrodetik UTC. Ini terjadi ketika memanggil createDataFrame dengan DataFrame pandas atau saat mengembalikan stempel waktu dari UDF pandas. Konversi ini dilakukan secara otomatis untuk memastikan Spark memiliki data dalam format yang diharapkan, jadi konversi ini tidak perlu dilakukan sendiri. Setiap nilai nanodetik dipotong.

UDF standar memuat data stempel waktu sebagai objek tanggalwaktu Python, yang berbeda dari stempel waktu pandas. Untuk mendapatkan performa terbaik, kami sarankan Anda menggunakan fungsi deret waktu pandas saat mengerjakan stempel waktu di UDF pandas. Untuk detailnya, lihat Fungsi Deret Waktu / Tanggal.

Contoh notebook

Notebook berikut mengilustrasikan peningkatan performa yang dapat Anda capai dengan UDF pandas:

notebook tolok ukur UDF pandas

Dapatkan buku catatan