Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Fungsi yang ditentukan pengguna pandas (UDF)—juga dikenal sebagai UDF tervektor—adalah fungsi yang menggunakan Apache Arrow untuk mentransfer data dan pandas untuk memproses data tersebut. UDF pandas memungkinkan operasi bervektor yang dapat meningkatkan performa hingga 100x dibandingkan dengan UDF Python yang memproses satu baris sekaligus.
Untuk informasi latar belakang, lihat pos blog Pandas UDF Baru dan Petunjuk Jenis Python dalam Rilis Mendatang Apache Spark 3.0.
Anda mendefinisikan UDF pandas menggunakan kata kunci pandas_udf sebagai dekorator dan membungkus fungsi dengan petunjuk jenis Python.
Artikel ini mendeskripsikan berbagai jenis UDF pandas dan menunjukkan cara menggunakan UDF pandas dengan petunjuk tipe.
Seri-ke-Seri UDF
Anda menggunakan UDF pandas Seri ke Seri untuk melakukan vektor operasi skalar.
Anda dapat menggunakannya dengan API seperti select dan withColumn.
Fungsi Python harus menerima Seri pandas sebagai input dan mengembalikan Seri pandas dengan panjang yang sama. Tentukan jenis ini menggunakan petunjuk jenis Python. Spark menjalankan UDF pandas dengan membagi data menjadi batch baris, memanggil fungsi untuk setiap batch, dan kemudian menggabungkan hasilnya.
Contoh berikut menunjukkan cara membuat Pandas UDF yang menghitung perkalian dari 2 kolom.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
Iterator dari Seri menjadi Iterator dari Seri UDF
Iterator UDF sama dengan UDF pandas skalar, kecuali:
- Fungsi Python
- Menggunakan iterator dari beberapa batch sebagai input, bukan batch input tunggal.
- Mengembalikan iterator batch output, bukan batch output tunggal.
- Panjang seluruh output dalam iterator harus sama dengan panjang seluruh input.
- UDF Pandas yang terbungkus mengambil satu kolom Spark sebagai input.
Anda harus menentukan petunjuk jenis Python sebagai Iterator[pandas.Series] ->Iterator[pandas.Series].
UDF pandas ini berguna ketika eksekusi UDF memerlukan inisialisasi pada beberapa kondisi, misalnya, memuat file model pembelajaran mesin untuk menerapkan inferensi ke setiap batch input.
Contoh berikut menunjukkan cara membuat pandas UDF dengan dukungan untuk iterator.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
Iterator dari beberapa Seri ke Iterator Seri UDF
Iterator dari beberapa Seri ke Iterator Seri UDF memiliki karakteristik dan batasan yang sama dengan Iterator Seri ke Iterator Seri UDF. Fungsi yang telah ditentukan akan mengambil iterator batch dan mengeluarkan iterator batch. Ini juga berguna ketika eksekusi UDF memerlukan inisialisasi pada beberapa kondisi.
Perbedaannya adalah:
- Fungsi Python yang mendasarinya akan mengambil iterator dari tuple Seri pandas.
- UDF panda yang dibungkus mengambil beberapa kolom Spark sebagai input.
Anda menentukan petunjuk jenis sebagai Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
Seri ke skalar UDF
UDF pandas dari seri ke skalar mirip dengan fungsi agregat Spark.
Seri ke UDF pandas skalar menentukan agregasi dari satu atau beberapa Seri pandas ke nilai skalar, di mana setiap Seri pandas mewakili kolom Spark.
Anda menggunakan UDF pandas skalar Seri dengan API seperti select, withColumn, groupBy.agg, dan pyspark.sql.Window.
Anda menyatakan petunjuk jenis sebagai pandas.Series, ... ->Any. Jenis pengembaliannya harus berupa jenis data primitif, dan skalar yang dikembalikan dapat berupa jenis primitif Python, misalnya, int atau float, atau jenis data NumPy, seperti numpy.int64 atau numpy.float64.
Any idealnya adalah jenis skalar tertentu.
Jenis UDF ini tidak mendukung agregasi parsial dan semua data untuk setiap grup dimuat ke dalam memori.
Contoh berikut menunjukkan cara menggunakan UDF jenis ini untuk menghitung mean dengan operasi select, groupBy, dan window:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Untuk penggunaan terperinci, lihat pyspark.sql.functions.pandas_udf.
Penggunaan
Mengatur ukuran batch Arrow
Catatan
Konfigurasi ini tidak berdampak pada komputasi yang dikonfigurasi dengan mode akses standar dan Databricks Runtime 13.3 LTS hingga 14.2.
Partisi data di Spark diubah menjadi kumpulan baris Arrow, yang dapat menyebabkan penggunaan memori tinggi untuk sementara pada JVM. Untuk menghindari kemungkinan pengecualian memori habis, Anda dapat menyesuaikan ukuran batch rekord Arrow dengan mengatur spark.sql.execution.arrow.maxRecordsPerBatch ke bilangan bulat yang menentukan jumlah baris maksimum untuk setiap batch. Nilai default adalah 10.000 rekaman per batch. Jika jumlah kolom besar, nilai harus disesuaikan. Dengan menggunakan batas ini, setiap partisi data dibagi menjadi 1 batch rekaman atau lebih untuk diproses.
Stempel waktu dengan semantik zona waktu
Spark secara internal menyimpan tanda waktu sebagai nilai UTC, dan data tanda waktu yang dibawa tanpa zona waktu tertentu dikonversi sebagai waktu lokal ke UTC dengan resolusi mikrosekond.
Saat data tanda waktu diekspor atau ditampilkan di Spark, zona waktu sesi digunakan untuk melokalisasi nilai tanda waktu. Zona waktu sesi diatur dengan konfigurasi spark.sql.session.timeZone dan default ke zona waktu lokal sistem JVM. pandas menggunakan jenis datetime64 dengan resolusi nanodetik, datetime64[ns], dengan zona waktu opsional untuk setiap kolom.
Ketika data stempel waktu ditransfer dari Spark ke pandas, data dikonversi menjadi nanodetik, dan setiap kolom dikonversi ke zona waktu sesi Spark, lalu dilokalkan ke zona waktu tersebut. Proses ini menghapus zona waktu dan menampilkan nilai sebagai waktu lokal. Ini terjadi saat memanggil toPandas() atau pandas_udf dengan kolom tanda waktu.
Ketika data stempel waktu ditransfer dari pandas ke Spark, data tersebut dikonversi ke mikrodetik UTC. Ini terjadi ketika memanggil createDataFrame dengan DataFrame pandas atau saat mengembalikan stempel waktu dari UDF pandas. Konversi ini dilakukan secara otomatis untuk memastikan Spark memiliki data dalam format yang diharapkan, jadi konversi ini tidak perlu dilakukan sendiri. Nilai nanodetik apa pun dipotong.
UDF standar memuat data stempel waktu sebagai objek tanggalwaktu Python, yang berbeda dari stempel waktu pandas. Untuk mendapatkan performa terbaik, kami sarankan Anda menggunakan fungsionalitas rangkaian waktu panda saat bekerja dengan tanda waktu di UDF panda. Untuk detailnya, lihat Fungsi Deret Waktu / Tanggal.
Contoh notebook
Notebook yang berikut ini mengilustrasikan peningkatan performa yang dapat Anda capai dengan pandas UDFs.
notebook tolok ukur UDF pandas
Dapatkan buku catatan