Fungsi agregat yang ditentukan pengguna (UDAF)

Berlaku untuk:check ditandai ya Databricks Runtime

Fungsi agregat yang ditentukan pengguna (UDAF) adalah rutinitas yang dapat diprogram pengguna yang bertindak pada beberapa baris sekaligus dan mengembalikan satu nilai agregat sebagai hasilnya. Dokumentasi ini mencantumkan kelas yang diperlukan untuk membuat dan mendaftarkan UDAF. Ini juga berisi contoh yang menunjukkan cara menentukan dan mendaftarkan UDAF di Scala dan memanggilnya di Spark SQL.

Agregator

SintaksAggregator[-IN, BUF, OUT]

Kelas dasar untuk agregasi yang ditentukan pengguna, yang dapat digunakan dalam operasi Himpunan Data untuk mengambil semua elemen grup dan menguranginya menjadi satu nilai.

  • IN: Jenis input untuk agregasi.

  • BUF: Jenis nilai perantara pengurangan.

  • OUT: Jenis hasil output akhir.

  • bufferEncoder: Encoder[BUF]

    Encoder untuk jenis nilai menengah.

  • finish(reduction: BUF): OUT

    Mengubah output pengurangan.

  • merge(b1: BUF, b2: BUF): BUF

    Gabungkan dua nilai perantara.

  • outputEncoder: Encoder[OUT]

    Encoder untuk jenis nilai output akhir.

  • reduce(b: BUF, a: IN): BUF

    Agregat nilai a input ke dalam nilai perantara saat ini. Untuk performa, fungsi dapat memodifikasi b dan mengembalikannya alih-alih membangun objek baru untuk b.

  • nol: BUF

    Nilai awal hasil perantara untuk agregasi ini.

Contoh

Fungsi agregat yang ditentukan pengguna yang ditentukan pengguna yang aman

Agregasi yang ditentukan pengguna untuk Himpunan Data yang diketik dengan kuat berputar di Aggregator sekitar kelas abstrak. Misalnya, rata-rata jenis aman yang ditentukan pengguna dapat terlihat seperti:

Fungsi agregat yang ditentukan pengguna yang tidak dititipkan

Agregasi yang ditik, seperti yang dijelaskan di atas, juga dapat didaftarkan sebagai UDF agregasi yang tidak ditiru untuk digunakan dengan DataFrames. Misalnya, rata-rata yang ditentukan pengguna untuk DataFrames yang tidak ditiru dapat terlihat seperti:

Scala

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Long, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, data: Long): Average = {
    buffer.sum += data
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // The Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // The Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Register the function to access it
spark.udf.register("myAverage", functions.udaf(MyAverage))

val df = spark.read.format("json").load("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Jawa

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions;

public static class Average implements Serializable  {
    private long sum;
    private long count;

    // Constructors, getters, setters...

}

public static class MyAverage extends Aggregator<Long, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Long data) {
    long newSum = buffer.getSum() + data;
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // The Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // The Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

// Register the function to access it
spark.udf().register("myAverage", functions.udaf(new MyAverage(), Encoders.LONG()));

Dataset<Row> df = spark.read().format("json").load("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

SQL

-- Compile and place UDAF MyAverage in a JAR file called `MyAverage.jar` in /tmp.
CREATE FUNCTION myAverage AS 'MyAverage' USING JAR '/tmp/MyAverage.jar';

SHOW USER FUNCTIONS;
+------------------+
|          function|
+------------------+
| default.myAverage|
+------------------+

CREATE TEMPORARY VIEW employees
USING org.apache.spark.sql.json
OPTIONS (
    path "examples/src/main/resources/employees.json"
);

SELECT * FROM employees;
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

SELECT myAverage(salary) as average_salary FROM employees;
+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------+