Compartir vía


Funciones definidas por el usuario en Databricks Connect para Scala

Nota:

En este artículo se habla de Databricks Connect para Databricks Runtime 14.1 y versiones posteriores.

En este artículo se describe cómo ejecutar funciones definidas por el usuario con Databricks Connect para Scala. Databricks Connect le permite conectar los clústeres de Azure Databricks a entornos de desarrollo integrado populares, servidores de cuadernos y otras aplicaciones personalizadas. Para obtener la versión de Python de este artículo, consulte Funciones definidas por el usuario en Databricks Connect para Python.

Nota:

Antes de empezar a usar Databricks Connect, es necesario configurar el cliente de Databricks Connect.

Para Databricks Runtime 14.1 y versiones posteriores, Databricks Connect para Scala admite la ejecución de funciones definidas por el usuario (UDF).

Para ejecutar una UDF, la clase compilada y los JAR que requiere la UDF deben cargarse en el clúster. La API de addCompiledArtifacts() se puede usar para especificar la clase compilada y los archivos JAR que se deben cargar.

Nota:

La versión de Scala usada por el cliente debe coincidir con la versión de Scala del clúster de Azure Databricks. Para comprobar la versión de Scala del clúster, consulte la sección "Entorno del sistema" para conocer la versión de Databricks Runtime del clúster en Versiones y compatibilidad de las notas de la versión de Databricks Runtime.

El siguiente programa de Scala configura una UDF simple que cuadra los valores de una columna.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

En el ejemplo anterior, dado que la UDF está totalmente contenida en Main, solo se agrega el artefacto compilado de Main. Si la UDF se distribuye en otras clases o usa bibliotecas externas (es decir, archivos JAR), también se deben incluir todas estas bibliotecas.

Cuando la sesión de Spark ya está inicializada, se pueden cargar más clases compiladas y JAR mediante la API de spark.addArtifact().

Nota:

Al cargar archivos JAR, se deben incluir todos los archivos JAR de dependencia transitiva para la carga. Las API no realizan ninguna detección automática de dependencias transitivas.

API de conjunto de datos con tipo

El mismo mecanismo descrito en la sección anterior para las UDF también se aplica a las API de conjunto de datos con tipo.

Las API de conjunto de datos con tipo permiten ejecutar transformaciones como asignaciones, filtros y agregaciones en los conjuntos de datos resultantes. También se ejecutan de forma similar a las UDF en el clúster de Databricks.

La siguiente aplicación de Scala usa la API de map() para modificar un número de una columna de resultados en una cadena con prefijo.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Aunque en este ejemplo se usa la API de map(), esto también se aplica a otras API de conjunto de datos con tipo, como filter(), mapPartitions(), etc.