Condividi tramite


Funzioni definite dall'utente in Databricks Connect per Scala

Nota

Questo articolo illustra Databricks Connect per Databricks Runtime 14.1 e versioni successive.

Questo articolo descrive come eseguire funzioni definite dall'utente con Databricks Connect per Scala. Databricks Connect consente di connettere gli IDE, i server notebook e le applicazioni personalizzate più diffusi ai cluster Azure Databricks. Per la versione Python di questo articolo, vedere Funzioni definite dall'utente in Databricks Connect per Python.

Nota

Prima di iniziare a usare Databricks Connect, è necessario configurare il client Databricks Connect.

Per Databricks Runtime 14.1 e versioni successive, Databricks Connect per Scala supporta l'esecuzione di funzioni definite dall'utente .

Per eseguire una UDF, è necessario caricare nel cluster la classe compilata e i file JAR richiesti dalla UDF. L'API addCompiledArtifacts() può essere usata per specificare la classe compilata e i file JAR da caricare.

Nota

La versione di Scala utilizzata dal client deve corrispondere alla versione di Scala nel cluster Azure Databricks. Per controllare la versione scala del cluster, vedere la sezione "Ambiente di sistema" per la versione di Databricks Runtime del cluster in versioni delle note sulla versione di Databricks Runtime e sulla compatibilità.

Il programma Scala seguente configura una semplice UDF che quadra i valori in una colonna.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

Nell'esempio precedente, poiché la FDU è completamente contenuta all'interno di Main, viene aggiunto solo l'artefatto compilato di Main. Se la funzione definita dall'utente (UDF) si estende ad altre classi o utilizza librerie esterne (ad esempio JAR), è necessario includere anche tutte queste librerie.

Quando la sessione Spark è già inizializzata, è possibile caricare altre classi compilate e jar usando l'API spark.addArtifact().

Nota

Quando si caricano file JAR, per il caricamento devono essere inclusi tutti i JAR di dipendenza transitiva. Le API non eseguono alcun rilevamento automatico delle dipendenze transitive.

API del set di dati tipizzato

Lo stesso meccanismo descritto nella sezione precedente per le funzioni definite dall'utente si applica anche alle API Dataset tipizzate.

Le API dei set di dati tipizzati consentono di eseguire trasformazioni come map, filtro e aggregazioni sui set di dati risultanti. Anche questi vengono eseguiti in modo simile alle funzioni definite dall'utente nel cluster Databricks.

L'applicazione Scala seguente usa l'API map() per modificare un numero in una colonna di risultato in una stringa con prefisso.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Anche se in questo esempio viene usata l'API map(), questo vale anche per altre API del set di dati tipizzato, ad esempio filter(), mapPartitions()e così via.