Udostępnij za pośrednictwem


Funkcje zdefiniowane przez użytkownika w programie Databricks Connect dla języka Scala

Nota

W tym artykule opisano program Databricks Connect dla środowiska Databricks Runtime 14.1 lub nowszego.

W tym artykule opisano sposób wykonywania funkcji zdefiniowanych przez użytkownika za pomocą programu Databricks Connect dla języka Scala. Usługa Databricks Connect umożliwia łączenie popularnych środowisk IDE, serwerów notesów i aplikacji niestandardowych z klastrami usługi Azure Databricks. Aby uzyskać wersję tego artykułu w języku Python, zobacz funkcje zdefiniowane przez użytkownika w programie Databricks Connect dla języka Python.

Nota

Przed rozpoczęciem korzystania z usługi Databricks Connect należy skonfigurować klienta usługi Databricks Connect.

W przypadku środowiska Databricks Runtime 14.1 lub nowszego program Databricks Connect dla języka Scala obsługuje uruchamianie funkcji zdefiniowanych przez użytkownika (UDF).

Aby uruchomić funkcję zdefiniowaną przez użytkownika (UDF), skompilowana klasa i wymagane biblioteki JAR muszą zostać przekazane do klastra. Interfejs API addCompiledArtifacts() może służyć do określania skompilowanej klasy i plików JAR, które muszą zostać przekazane.

Nota

Język Scala używany przez klienta musi być zgodny z wersją języka Scala w klastrze usługi Azure Databricks. Aby sprawdzić wersję środowiska Scala klastra, zobacz sekcję "Środowisko systemowe" dla wersji środowiska Databricks Runtime klastra w wersji środowiska Databricks Runtime i zgodności.

Poniższy program Scala konfiguruje prostą funkcję użytkownika (UDF), która kwadratuje wartości w kolumnie.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

W poprzednim przykładzie, ponieważ funkcja UDF jest w pełni zawarta w Main, dodawany jest tylko skompilowany artefakt Main. Jeśli funkcja UDF rozprzestrzenia się na inne klasy lub używa bibliotek zewnętrznych (tj. jednostek JAR), należy również uwzględnić wszystkie te biblioteki.

Po zainicjowaniu sesji Spark można przesyłać kolejne skompilowane klasy i pliki JAR przy użyciu interfejsu API spark.addArtifact().

Nota

Podczas przesyłania plików JAR wszystkie przechodnie zależności JAR muszą być dołączone do załadowania. Interfejsy API nie wykonują żadnego automatycznego wykrywania zależności przechodnich.

Typizowane interfejsy API zestawu danych

Ten sam mechanizm opisany w poprzedniej sekcji dla funkcji zdefiniowanych przez użytkownika dotyczy również typowanych interfejsów API zestawów danych.

Typizowane interfejsy API zestawu danych umożliwiają uruchamianie przekształceń, takich jak mapowanie, filtrowanie i agregacje w wynikowych zestawach danych. Są one również wykonywane podobnie jak funkcje UDF w klastrze usługi Databricks.

Poniższa aplikacja Scala używa interfejsu API map() do modyfikowania liczby w kolumnie wynikowej na prefiksowany ciąg.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

W tym przykładzie użyto interfejsu API map(), ale dotyczy to również innych typowych interfejsów API zestawu danych, takich jak filter(), mapPartitions()itp.