Databricks Connect for Scala 中的使用者定義函式
注意
本文介紹適用於 Databricks Runtime 14.1 及以上版本的 Databricks Connect。
本文說明如何使用適用於 Scala 的 Databricks Connect 來執行使用者定義函式。 Databricks Connect 可讓您將熱門的 IDE、Notebook 伺服器和自定義應用程式連線到 Azure Databricks 叢集。 如需本文的 Python 版本,請參閱 Databricks Connect for Python中的
注意
開始使用 Databricks Connect 之前,您必須先 設定 Databricks Connect 用戶端。
針對 Databricks Runtime 14.1 和更新版本,Databricks Connect for Scala 支援執行使用者定義的函式 (UDF)。
若要執行 UDF,必須將 UDF 所需的已編譯類別和 JAR 上傳至叢集。
addCompiledArtifacts()
API 可用來指定必須上傳的已編譯類別和 JAR 檔案。
注意
用戶端所使用的 Scala 必須符合 Azure Databricks 叢集上的 Scala 版本。 若要檢查叢集的 Scala 版本,請參閱 Databricks Runtime 版本和相容性資訊中有關叢集 Databricks Runtime 版本的「系統環境」一節。
下列 Scala 程式會設定簡單的 UDF,以將數據行中的值平方。
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
在上述範例中,由於 UDF 完全包含在 Main
中,因此只會新增 Main
的已編譯成品。
如果UDF散佈於其他類別,或使用外部連結庫(亦即JAR),則也應該包含所有這些連結庫。
當 Spark 工作階段已初始化時,可以使用 spark.addArtifact()
API 來上傳進一步編譯的類別和 JAR。
注意
上傳 JAR 時,必須包含所有可轉移的相依性 JAR 才能上傳。 API 不會執行任何可轉移相依性的自動偵測。
具類型的數據集 API
上一節中針對UDF所述的相同機制也適用於具類型的數據集API。
具類型的數據集 API 可讓其中一個 API 在產生的數據集上執行轉換,例如地圖、篩選和匯總。 與 UDF 類似,這些操作也會在 Databricks 叢集上執行。
下列 Scala 應用程式會使用 map()
API,將結果數據行中的數位修改為前置字串。
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
雖然此範例會使用 map()
API,但這也適用於其他具類型的數據集 API,例如 filter()
、mapPartitions()
等。