Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Fungsi yang ditentukan pengguna (UDF) pandas - juga dikenal sebagai UDF vektor - adalah fungsi yang ditentukan pengguna yang menggunakan Apache Arrow untuk mentransfer data dan pandas untuk bekerja dengan data. UDF pandas memungkinkan operasi dalam vektor yang dapat meningkatkan performa hingga 100x dibandingkan dengan UDF Python yang hanya satu baris pada satu waktu.
Untuk informasi latar belakang, lihat posting blog UDF Pandas Baru dan Petunjuk Jenis Python di Rilis Apache Spark 3.0 mendatang.
Anda mendefinisikan UDF pandas menggunakan kata kunci pandas_udf
sebagai dekorator dan membungkus fungsi dengan petunjuk jenis Python.
Artikel ini menjelaskan berbagai jenis UDF pandas dan juga menunjukkan cara menggunakan UDF pandas dengan petunjuk jenis.
Seri ke Seri UDF
Anda menggunakan UDF pandas Seri ke Seri untuk melakukan vektor operasi skalar.
Anda dapat menggunakannya dengan API seperti select
dan withColumn
.
Fungsi Python harus mengambil Seri pandas sebagai input dan mengembalikan Seri pandas dengan panjang yang sama. Anda juga harus menentukannya pada petunjuk jenis Python. Spark menjalankan UDF panda dengan membagi kolom menjadi batch, memanggil fungsi untuk setiap batch sebagai subset data, lalu menggabungkan hasilnya.
Contoh berikut menunjukkan cara membuat Pandas UDF yang menghitung perkalian dari 2 kolom.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
Iterator Seri ke Iterator Seri UDF
Iterator UDF sama dengan skalar UDF pandas, kecuali:
- Fungsi Python
- Mengambil iterator batch sebagai input, bukan batch input tunggal.
- Mengembalikan iterator batch output, bukan batch output tunggal.
- Panjang seluruh output dalam iterator harus sama dengan panjang seluruh input.
- UDF Pandas yang terbungkus mengambil satu kolom Spark sebagai input.
Anda harus menentukan petunjuk jenis Python sebagai Iterator[pandas.Series]
->Iterator[pandas.Series]
.
UDF pandas ini berguna ketika eksekusi UDF memerlukan inisialisasi pada beberapa kondisi, misalnya, memuat file model pembelajaran mesin untuk menerapkan inferensi ke setiap batch input.
Contoh berikut menunjukkan cara membuat UDF pandas dengan dukungan iterator.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
Iterator dari beberapa Seri ke Iterator Seri UDF
Iterator dari beberapa Seri ke Iterator Seri UDF memiliki karakteristik dan batasan yang sama dengan Iterator Seri ke Iterator Seri UDF. Fungsi yang telah ditentukan akan mengambil iterator batch dan mengeluarkan iterator batch. Ini juga berguna ketika eksekusi UDF memerlukan inisialisasi pada beberapa kondisi.
Perbedaannya adalah:
- Fungsi Python yang mendasarinya akan mengambil iterator dari tuple Seri pandas.
- UDF panda yang dibungkus mengambil beberapa kolom Spark sebagai input.
Anda menentukan petunjuk jenis sebagai Iterator[Tuple[pandas.Series, ...]]
->Iterator[pandas.Series]
.
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
Seri ke skalar UDF
Seri ke skalar UDF pandas mirip dengan fungsi agregat Spark.
Seri ke UDF pandas skalar menentukan agregasi dari satu atau beberapa Seri pandas ke nilai skalar, di mana setiap Seri pandas mewakili kolom Spark.
Anda menggunakan UDF pandas skalar Seri dengan API seperti select
, withColumn
, groupBy.agg
, dan pyspark.sql.Window.
Anda menyatakan petunjuk jenis sebagai pandas.Series, ...
->Any
. Jenis pengembaliannya harus berupa jenis data primitif, dan skalar yang dikembalikan dapat berupa jenis primitif Python, misalnya, int
atau float
, atau jenis data NumPy, seperti numpy.int64
atau numpy.float64
.
Any
idealnya adalah jenis skalar khusus.
Jenis UDF ini tidak mendukung agregasi parsial dan semua data untuk setiap grup dimuat ke dalam memori.
Contoh berikut menunjukkan cara menggunakan UDF jenis ini untuk menghitung mean dengan operasi select
, groupBy
, dan window
:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Untuk penggunaan terperinci, lihat pyspark.sql.functions.pandas_udf.
Penggunaan
Mengatur ukuran batch Arrow
Catatan
Konfigurasi ini tidak berdampak pada komputasi yang dikonfigurasi dengan mode akses bersama dan Databricks Runtime 13.3 LTS hingga 14.2.
Partisi data di Spark diubah menjadi kumpulan baris Arrow, yang dapat menyebabkan penggunaan memori tinggi untuk sementara pada JVM. Untuk menghindari kemungkinan dari pengecualian memori, Anda dapat menyesuaikan ukuran batch baris Arrow dengan mengatur konfigurasi spark.sql.execution.arrow.maxRecordsPerBatch
ke bilangan bulat yang menentukan jumlah baris maksimum untuk setiap batch. Nilai default adalah 10.000 baris per batch. Jika jumlah kolom besar, nilai harus disesuaikan. Dengan menggunakan batas ini, setiap partisi data dibagi menjadi 1 batch rekaman atau lebih untuk diproses.
Stempel waktu dengan semantik zona waktu
Spark secara internal menyimpan tanda waktu sebagai nilai UTC, dan data tanda waktu yang dibawa tanpa zona waktu tertentu dikonversi sebagai waktu lokal ke UTC dengan resolusi mikrosekond.
Saat data tanda waktu diekspor atau ditampilkan di Spark, zona waktu sesi digunakan untuk melokalisasi nilai tanda waktu. Zona waktu sesi diatur dengan konfigurasi spark.sql.session.timeZone
dan default ke zona waktu lokal sistem JVM. pandas menggunakan jenis datetime64
dengan resolusi nanodetik, datetime64[ns]
, dengan zona waktu opsional untuk setiap kolom.
Ketika data stempel waktu ditransfer dari Spark ke pandas, data dikonversi menjadi nanodetik, dan setiap kolom dikonversi ke zona waktu sesi Spark, lalu dilokalkan ke zona waktu tersebut. Proses ini menghapus zona waktu dan menampilkan nilai sebagai waktu lokal. Ini terjadi saat memanggil toPandas()
atau pandas_udf
dengan kolom tanda waktu.
Ketika data stempel waktu ditransfer dari pandas ke Spark, data tersebut dikonversi ke mikrodetik UTC. Ini terjadi ketika memanggil createDataFrame
dengan DataFrame pandas atau saat mengembalikan stempel waktu dari UDF pandas. Konversi ini dilakukan secara otomatis untuk memastikan Spark memiliki data dalam format yang diharapkan, jadi konversi ini tidak perlu dilakukan sendiri. Nilai nanodetik apa pun dipotong.
UDF standar memuat data stempel waktu sebagai objek tanggalwaktu Python, yang berbeda dari stempel waktu pandas. Untuk mendapatkan performa terbaik, kami sarankan Anda menggunakan fungsionalitas rangkaian waktu panda saat bekerja dengan tanda waktu di UDF panda. Untuk detailnya, lihat Fungsi Deret Waktu / Tanggal.
Contoh notebook
Notebook berikut mengilustrasikan peningkatan performa yang dapat Anda capai dengan UDF pandas:
notebook tolok ukur UDF pandas
Dapatkan buku catatan