Uživatelem definované funkce v Databricks Connect pro Scala
Poznámka
Tento článek popisuje Databricks Connect pro Databricks Runtime 14.1 a novější.
Tento článek popisuje, jak spouštět uživatelem definované funkce pomocí Databricks Connect pro Scala. Databricks Connect umožňuje připojit integrovaná vývojová prostředí, servery poznámkových bloků a vlastní aplikace k clusterům Azure Databricks. Python verzi tohoto článku najdete v Uživatelsky definované funkce v Databricks Connect pro Python.
Poznámka
Než začnete používat Databricks Connect, musíte nastavit klienta Databricks Connect.
Pro Databricks Runtime 14.1 a výše Databricks Connect pro Scala podporuje spouštění uživatelsky definovaných funkcí (UDF).
Aby bylo možné spustit UDF, musí být do clusteru nahraná kompilovaná třída a jar, které vyžaduje UDF.
Rozhraní API addCompiledArtifacts()
lze použít k určení kompilované třídy a souborů JAR, které se musí nahrát.
Poznámka
Scala používaná klientem musí odpovídat verzi Scala v clusteru Azure Databricks. Pokud chcete zkontrolovat verzi jazyka Scala clusteru, podívejte se do části "Systémové prostředí" u verze Databricks Runtime pro váš cluster v poznámkách k verzím Databricks Runtime a k otázkám kompatibility v a.
Následující program v jazyce Scala nastaví jednoduchou uživatelskou definovanou funkci, která ve sloupci umocní hodnoty.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
V předchozím příkladu, protože UDF je plně obsažen v Main
, je přidán pouze zkompilovaný artefakt Main
.
Pokud se funkce definovaná uživatelem rozprostírá na jiné třídy nebo používá externí knihovny (tj. JARy), měly by být zahrnuty i všechny tyto knihovny.
Pokud je relace Sparku již inicializována, je možné pomocí rozhraní API spark.addArtifact()
nahrát další kompilované třídy a jary.
Poznámka
Při nahrávání JAR souborů je nutné zahrnout všechny přidružené tranzitivní závislosti JARů. Rozhraní API neprovádějí automatickou detekci tranzitivních závislostí.
Typed Dataset APIs
Stejný mechanismus popsaný v předchozí části pro funkce definované uživatelem platí také pro typované rozhraní API pro datové sady.
Rozhraní typové Dataset API umožňují spouštět transformace, jako jsou mapování, filtrování a agregace, na výsledných datových sadách. Ty se také spouští podobně jako UDF v clusteru Databricks.
Následující aplikace Scala používá rozhraní API map()
k úpravě čísla ve výsledkovém sloupci na řetězec s předponou.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
I když tento příklad používá rozhraní API map()
, platí to také pro jiná typovaná rozhraní API datové sady, jako jsou filter()
, mapPartitions()
atd.