Agregační funkce definované uživatelem (UDAF)

Platí pro: Databricks Runtime

Uživatelem definované agregační funkce (UDAF) jsou uživatelsky programovatelné rutiny, které fungují na více řádcích najednou a v důsledku toho vrací jednu agregovanou hodnotu. Tato dokumentace obsahuje seznam tříd, které jsou potřeba k vytvoření a registraci UDAF. Obsahuje také příklady, které ukazují, jak definovat a zaregistrovat UDAF v jazyce Scala a vyvolat je ve Spark SQL.


Syntaxe: Aggregator[-IN, BUF, OUT]

Základní třída pro uživatelem definované agregace, které lze použít v operacích nad datovými sadami k převzetí všech prvků skupiny a jejich redukci na jednu hodnotu.

  • IN: Vstupní typ agregace.

  • BUF: Typ přechodné hodnoty redukce.

  • OUT: Typ konečného výsledku výstupu.

  • bufferEncoder: Encoder[BUF]

    Kodér pro typ přechodné hodnoty.

  • finish(reduction: BUF): OUT

    Transformuje výstup redukce.

  • merge(b1: BUF, b2: BUF): BUF

    Sloučí dvě přechodné hodnoty.

  • outputEncoder: Encoder[OUT]

    Kodér pro konečný typ výstupní hodnoty.

  • reduce(b: BUF, a: IN): BUF

    Agreguje vstupní hodnotu `a` do aktuální přechodné hodnoty `b`. Kvůli výkonu může funkce upravit `b` a vrátit jej, místo aby vytvářela nový objekt.

  • zero: BUF

    Počáteční hodnota přechodného výsledku pro tuto agregaci.


Uživatelsky definované typově bezpečné agregační funkce

Uživatelem definované agregace pro silně typované datové sady jsou založeny na abstraktní třídě Aggregator. Například typově bezpečný uživatelsky definovaný průměr může vypadat takto:


import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

/** One employee record: a name and a salary. */
case class Employee(
  name: String,
  salary: Long
)

/**
 * Mutable pair of a sum and a count. Both fields are `var`s so that they
 * can be updated in place.
 */
case class Average(
  var sum: Long,
  var count: Long
)

/**
 * Type-safe user-defined average over the `salary` field of `Employee`.
 *
 * The intermediate state is an `Average` (running sum and row count); the
 * final result is the sum divided by the count as a `Double`.
 */
object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)

  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    // The updated buffer is the result of the block.
    buffer
  }

  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    // `b1` now holds the combined totals and is the result.
    b1
  }

  // Transform the output of the reduction.
  // With a zero buffer (sum 0, count 0) this is 0.0 / 0, i.e. NaN.
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

  // The Encoder for the intermediate value type
  val bufferEncoder: Encoder[Average] = Encoders.product

  // The Encoder for the final output value type
  val outputEncoder: Encoder[Double] = Encoders.scalaDouble
}



import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
    private String name;
    private long salary;

    // Constructors, getters, setters...

public static class Average implements Serializable  {
  private long sum;
  private long count;

  // Constructors, getters, setters...

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    return buffer;
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    return b1;
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  // The Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  // The Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds ="json").load(path).as(employeeEncoder);;
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result =;;
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Netypové agregační funkce definované uživatelem

Typované agregace popsané výše lze také zaregistrovat jako netypové agregační funkce (UDF) pro použití s datovými rámci (DataFrame). Například uživatelsky definovaný průměr pro netypové datové rámce může vypadat takto:


import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

/**
 * Mutable pair of a sum and a count. Both fields are `var`s so that they
 * can be updated in place.
 */
case class Average(
  var sum: Long,
  var count: Long
)

/**
 * User-defined average over `Long` values.
 *
 * The intermediate state is an `Average` (running sum and row count); the
 * final result is the sum divided by the count as a `Double`.
 */
object MyAverage extends Aggregator[Long, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)

  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, data: Long): Average = {
    buffer.sum += data
    buffer.count += 1
    // The updated buffer is the result of the block.
    buffer
  }

  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    // `b1` now holds the combined totals and is the result.
    b1
  }

  // Transform the output of the reduction.
  // With a zero buffer (sum 0, count 0) this is 0.0 / 0, i.e. NaN.
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

  // The Encoder for the intermediate value type
  val bufferEncoder: Encoder[Average] = Encoders.product

  // The Encoder for the final output value type
  val outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Register the function to access it
// (`spark` is the active SparkSession, created outside this snippet)
spark.udf.register("myAverage", functions.udaf(MyAverage))

val df = spark.read.format("json").load("examples/src/main/resources/employees.json")
// Expose the DataFrame under the name used by the SQL query below
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+



import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions;

public static class Average implements Serializable  {
    private long sum;
    private long count;

    // Constructors, getters, setters...


public static class MyAverage extends Aggregator<Long, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Long data) {
    long newSum = buffer.getSum() + data;
    long newCount = buffer.getCount() + 1;
    return buffer;
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    return b1;
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  // The Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  // The Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();

// Register the function to access it
// (`spark` is the active SparkSession, created outside this snippet)
spark.udf().register("myAverage", functions.udaf(new MyAverage(), Encoders.LONG()));

Dataset<Row> df = spark.read().format("json").load("examples/src/main/resources/employees.json");
// Expose the DataFrame under the name used by the SQL query below
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+


-- Compile and place UDAF MyAverage in a JAR file called `MyAverage.jar` in /tmp.
CREATE FUNCTION myAverage AS 'MyAverage' USING JAR '/tmp/MyAverage.jar';

-- Confirm that the function is registered.
SHOW USER FUNCTIONS;
-- +------------------+
-- |          function|
-- +------------------+
-- | default.myAverage|
-- +------------------+

-- Expose the JSON sample file as the `employees` view queried below.
CREATE TEMPORARY VIEW employees
USING org.apache.spark.sql.json
OPTIONS (
    path "examples/src/main/resources/employees.json"
);

SELECT * FROM employees;
-- +-------+------+
-- |   name|salary|
-- +-------+------+
-- |Michael|  3000|
-- |   Andy|  4500|
-- | Justin|  3500|
-- |  Berta|  4000|
-- +-------+------+

SELECT myAverage(salary) as average_salary FROM employees;
-- +--------------+
-- |average_salary|
-- +--------------+
-- |        3750.0|
-- +--------------+