Fungsi agregat yang ditentukan pengguna (UDAF)
Berlaku untuk: Databricks Runtime
Fungsi agregat yang ditentukan pengguna (UDAF) adalah rutinitas yang dapat diprogram pengguna yang bertindak pada beberapa baris sekaligus dan mengembalikan satu nilai agregat sebagai hasilnya. Dokumentasi ini mencantumkan kelas yang diperlukan untuk membuat dan mendaftarkan UDAF. Ini juga berisi contoh yang menunjukkan cara menentukan dan mendaftarkan UDAF di Scala dan memanggilnya di Spark SQL.
Agregator
SintaksAggregator[-IN, BUF, OUT]
Kelas dasar untuk agregasi yang ditentukan pengguna, yang dapat digunakan dalam operasi Himpunan Data untuk mengambil semua elemen grup dan menguranginya menjadi satu nilai.
IN: Jenis input untuk agregasi.
BUF: Jenis nilai perantara pengurangan.
OUT: Jenis hasil output akhir.
bufferEncoder: Encoder[BUF]
Encoder untuk jenis nilai menengah.
finish(reduction: BUF): OUT
Mengubah output pengurangan.
merge(b1: BUF, b2: BUF): BUF
Gabungkan dua nilai perantara.
outputEncoder: Encoder[OUT]
Encoder untuk jenis nilai output akhir.
reduce(b: BUF, a: IN): BUF
Agregat nilai
a
input ke dalam nilai perantara saat ini. Untuk performa, fungsi dapat memodifikasib
dan mengembalikannya alih-alih membangun objek baru untukb
.nol: BUF
Nilai awal hasil perantara untuk agregasi ini.
Contoh
Fungsi agregat yang ditentukan pengguna yang ditentukan pengguna yang aman
Agregasi yang ditentukan pengguna untuk Himpunan Data yang diketik dengan kuat berputar di Aggregator
sekitar kelas abstrak.
Misalnya, rata-rata jenis aman yang ditentukan pengguna dapat terlihat seperti:
Fungsi agregat yang ditentukan pengguna yang tidak dititipkan
Agregasi yang ditik, seperti yang dijelaskan di atas, juga dapat didaftarkan sebagai UDF agregasi yang tidak ditiru untuk digunakan dengan DataFrames. Misalnya, rata-rata yang ditentukan pengguna untuk DataFrames yang tidak ditiru dapat terlihat seperti:
Scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Long, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, data: Long): Average = {
buffer.sum += data
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// The Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// The Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
// Register the function to access it
spark.udf.register("myAverage", functions.udaf(MyAverage))
val df = spark.read.format("json").load("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
Jawa
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions;
public static class Average implements Serializable {
private long sum;
private long count;
// Constructors, getters, setters...
}
public static class MyAverage extends Aggregator<Long, Average, Double> {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
public Average zero() {
return new Average(0L, 0L);
}
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
public Average reduce(Average buffer, Long data) {
long newSum = buffer.getSum() + data;
long newCount = buffer.getCount() + 1;
buffer.setSum(newSum);
buffer.setCount(newCount);
return buffer;
}
// Merge two intermediate values
public Average merge(Average b1, Average b2) {
long mergedSum = b1.getSum() + b2.getSum();
long mergedCount = b1.getCount() + b2.getCount();
b1.setSum(mergedSum);
b1.setCount(mergedCount);
return b1;
}
// Transform the output of the reduction
public Double finish(Average reduction) {
return ((double) reduction.getSum()) / reduction.getCount();
}
// The Encoder for the intermediate value type
public Encoder<Average> bufferEncoder() {
return Encoders.bean(Average.class);
}
// The Encoder for the final output value type
public Encoder<Double> outputEncoder() {
return Encoders.DOUBLE();
}
}
// Register the function to access it
spark.udf().register("myAverage", functions.udaf(new MyAverage(), Encoders.LONG()));
Dataset<Row> df = spark.read().format("json").load("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
SQL
-- Compile and place UDAF MyAverage in a JAR file called `MyAverage.jar` in /tmp.
CREATE FUNCTION myAverage AS 'MyAverage' USING JAR '/tmp/MyAverage.jar';
SHOW USER FUNCTIONS;
+------------------+
| function|
+------------------+
| default.myAverage|
+------------------+
CREATE TEMPORARY VIEW employees
USING org.apache.spark.sql.json
OPTIONS (
path "examples/src/main/resources/employees.json"
);
SELECT * FROM employees;
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
SELECT myAverage(salary) as average_salary FROM employees;
+--------------+
|average_salary|
+--------------+
| 3750.0|
+--------------+