Поделиться через


Определяемые пользователем функции в Databricks Connect для Scala

Заметка

В этой статье рассматривается Databricks Connect, начиная с версии Databricks Runtime 14.1.

В этой статье описывается, как выполнять определяемые пользователем функции с помощью Databricks Connect для Scala. Databricks Connect позволяет подключать популярные IDE, серверы блокнотов и пользовательские приложения к кластерам Azure Databricks. Сведения о версии Python этой статьи см. в определяемых пользователем функций в Databricks Connect для Python.

Заметка

Прежде чем начать использовать Databricks Connect, необходимо настроить клиент Databricks Connect.

Для Databricks Runtime 14.1 и более поздних версий Databricks Connect для Scala поддерживает выполнение пользовательских функций.

Чтобы запустить UDF, скомпилированный класс и JAR, необходимые UDF, должны быть отправлены в кластер. API addCompiledArtifacts() можно использовать для указания скомпилированных классов и JAR-файлов, которые должны быть отправлены.

Заметка

Скала, используемая клиентом, должна соответствовать версии Scala в кластере Azure Databricks. Чтобы проверить версию Scala кластера, см. раздел "Системная среда" для версии среды выполнения Databricks для кластера в версиях заметок о выпуске Databricks Runtime исовместимости.

Следующая программа Scala настраивает простую UDF, которая квадратирует значения в столбце.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

В предыдущем примере, так как UDF полностью содержится в Main, добавляется только скомпилированный артефакт Main. Если UDF распространяется по другим классам или использует внешние библиотеки (т. е. JAR), все эти библиотеки также должны быть включены.

Когда сеанс Spark уже инициализирован, можно отправить дополнительные скомпилированные классы и JAR с помощью API spark.addArtifact().

Заметка

При загрузке JAR все транзитивные зависимости должны быть приложены к загрузке. API не выполняют автоматическое обнаружение транзитивных зависимостей.

API типизированного набора данных

Тот же механизм, описанный в предыдущем разделе для пользовательских определяемых функций, также применяется к API типизированных наборов данных.

API типизированного набора данных позволяет выполнять преобразования, такие как сопоставление, фильтрация и агрегации над результирующим набором данных. Они также выполняются аналогично пользовательским функциям (UDFs) в кластере Databricks.

Следующее приложение Scala использует API map() для изменения числа в столбце результатов в префиксированную строку.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Хотя в этом примере используется API map(), это также относится к другим типизированным API набора данных, таким как filter(), mapPartitions()и т. д.