Определяемые пользователем функции в Databricks Connect для Scala
Заметка
В этой статье рассматривается Databricks Connect, начиная с версии Databricks Runtime 14.1.
В этой статье описывается, как выполнять определяемые пользователем функции с помощью Databricks Connect для Scala. Databricks Connect позволяет подключать популярные IDE, серверы блокнотов и пользовательские приложения к кластерам Azure Databricks. Сведения о версии Python этой статьи см. в определяемых пользователем функций в Databricks Connect для Python.
Заметка
Прежде чем начать использовать Databricks Connect, необходимо настроить клиент Databricks Connect.
Для Databricks Runtime 14.1 и более поздних версий Databricks Connect для Scala поддерживает выполнение пользовательских функций.
Чтобы запустить UDF, скомпилированный класс и JAR, необходимые UDF, должны быть отправлены в кластер.
API addCompiledArtifacts()
можно использовать для указания скомпилированных классов и JAR-файлов, которые должны быть отправлены.
Заметка
Скала, используемая клиентом, должна соответствовать версии Scala в кластере Azure Databricks. Чтобы проверить версию Scala кластера, см. раздел "Системная среда" для версии среды выполнения Databricks для кластера в версиях заметок о выпуске Databricks Runtime исовместимости.
Следующая программа Scala настраивает простую UDF, которая квадратирует значения в столбце.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
В предыдущем примере, так как UDF полностью содержится в Main
, добавляется только скомпилированный артефакт Main
.
Если UDF распространяется по другим классам или использует внешние библиотеки (т. е. JAR), все эти библиотеки также должны быть включены.
Когда сеанс Spark уже инициализирован, можно отправить дополнительные скомпилированные классы и JAR с помощью API spark.addArtifact()
.
Заметка
При загрузке JAR все транзитивные зависимости должны быть приложены к загрузке. API не выполняют автоматическое обнаружение транзитивных зависимостей.
API типизированного набора данных
Тот же механизм, описанный в предыдущем разделе для пользовательских определяемых функций, также применяется к API типизированных наборов данных.
API типизированного набора данных позволяет выполнять преобразования, такие как сопоставление, фильтрация и агрегации над результирующим набором данных. Они также выполняются аналогично пользовательским функциям (UDFs) в кластере Databricks.
Следующее приложение Scala использует API map()
для изменения числа в столбце результатов в префиксированную строку.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Хотя в этом примере используется API map()
, это также относится к другим типизированным API набора данных, таким как filter()
, mapPartitions()
и т. д.