Sdílet prostřednictvím


Uživatelem definované funkce v Databricks Connect pro Scala

Poznámka

Tento článek popisuje Databricks Connect pro Databricks Runtime 14.1 a novější.

Tento článek popisuje, jak spouštět uživatelem definované funkce pomocí Databricks Connect pro Scala. Databricks Connect umožňuje připojit integrovaná vývojová prostředí, servery poznámkových bloků a vlastní aplikace k clusterům Azure Databricks. Python verzi tohoto článku najdete v Uživatelsky definované funkce v Databricks Connect pro Python.

Poznámka

Než začnete používat Databricks Connect, musíte nastavit klienta Databricks Connect.

Pro Databricks Runtime 14.1 a výše Databricks Connect pro Scala podporuje spouštění uživatelsky definovaných funkcí (UDF).

Aby bylo možné spustit UDF, musí být do clusteru nahraná kompilovaná třída a jar, které vyžaduje UDF. Rozhraní API addCompiledArtifacts() lze použít k určení kompilované třídy a souborů JAR, které se musí nahrát.

Poznámka

Scala používaná klientem musí odpovídat verzi Scala v clusteru Azure Databricks. Pokud chcete zkontrolovat verzi jazyka Scala clusteru, podívejte se do části "Systémové prostředí" u verze Databricks Runtime pro váš cluster v poznámkách k verzím Databricks Runtime a k otázkám kompatibility v a.

Následující program v jazyce Scala nastaví jednoduchou uživatelskou definovanou funkci, která ve sloupci umocní hodnoty.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

V předchozím příkladu, protože UDF je plně obsažen v Main, je přidán pouze zkompilovaný artefakt Main. Pokud se funkce definovaná uživatelem rozprostírá na jiné třídy nebo používá externí knihovny (tj. JARy), měly by být zahrnuty i všechny tyto knihovny.

Pokud je relace Sparku již inicializována, je možné pomocí rozhraní API spark.addArtifact() nahrát další kompilované třídy a jary.

Poznámka

Při nahrávání JAR souborů je nutné zahrnout všechny přidružené tranzitivní závislosti JARů. Rozhraní API neprovádějí automatickou detekci tranzitivních závislostí.

Typed Dataset APIs

Stejný mechanismus popsaný v předchozí části pro funkce definované uživatelem platí také pro typované rozhraní API pro datové sady.

Rozhraní typové Dataset API umožňují spouštět transformace, jako jsou mapování, filtrování a agregace, na výsledných datových sadách. Ty se také spouští podobně jako UDF v clusteru Databricks.

Následující aplikace Scala používá rozhraní API map() k úpravě čísla ve výsledkovém sloupci na řetězec s předponou.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

I když tento příklad používá rozhraní API map(), platí to také pro jiná typovaná rozhraní API datové sady, jako jsou filter(), mapPartitions()atd.