Funkcje skalarne zdefiniowane przez użytkownika — Scala
Ten artykuł zawiera przykłady funkcji zdefiniowanej przez użytkownika (UDF). Przedstawiono w nim sposób rejestrowania funkcji zdefiniowanych przez użytkownika, wywoływania funkcji zdefiniowanych przez użytkownika i zastrzeżeń dotyczących kolejności oceny podexpressionów w usłudze Spark SQL. Aby uzyskać więcej informacji, zobacz Zewnętrzne funkcje skalarne zdefiniowane przez użytkownika (UDF).
Uwaga
Funkcje zdefiniowane przez użytkownika języka Scala w zasobach obliczeniowych z obsługą wykazu aparatu Unity z trybem dostępu współdzielonego wymagają środowiska Databricks Runtime 14.2 lub nowszego.
Rejestrowanie funkcji jako funkcji zdefiniowanej przez użytkownika
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
Wywoływanie funkcji zdefiniowanej przez użytkownika w usłudze Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test
Używanie funkcji UDF z ramkami danych
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))
Kolejność oceny i sprawdzanie wartości null
Usługa Spark SQL (w tym sql i interfejsy API ramek danych i zestawów danych) nie gwarantuje kolejności obliczania podexpressionów. W szczególności dane wejściowe operatora lub funkcji nie muszą być oceniane od lewej do prawej ani w innej stałej kolejności. Na przykład wyrażenia logiczne AND
i OR
nie mają semantyki od lewej do prawej "zwarcie".
W związku z tym niebezpieczne jest poleganie na skutkach ubocznych lub kolejności obliczania wyrażeń logicznych oraz kolejności WHERE
klauzul i HAVING
, ponieważ takie wyrażenia i klauzule można zmienić kolejność podczas optymalizacji zapytań i planowania. W szczególności jeśli funkcja UDF opiera się na semantyce zwarciowej w języku SQL w celu sprawdzania wartości null, nie ma gwarancji, że sprawdzanie wartości null nastąpi przed wywołaniem funkcji zdefiniowanej przez użytkownika. Na przykład:
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
Ta WHERE
klauzula nie gwarantuje wywołania funkcji zdefiniowanej strlen
przez użytkownika po odfiltrowaniu wartości null.
Aby wykonać odpowiednie sprawdzanie wartości null, zalecamy wykonanie jednej z następujących czynności:
- Upewnij się, że funkcja UDF jest świadoma wartości null i sprawdza wartość null wewnątrz samej funkcji zdefiniowanej przez użytkownika
- Używanie
IF
wyrażeń lubCASE WHEN
do sprawdzania wartości null i wywoływanie funkcji zdefiniowanej przez użytkownika w gałęzi warunkowej
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Typizowane interfejsy API zestawu danych
Uwaga
Ta funkcja jest obsługiwana w klastrach z obsługą wykazu aparatu Unity z trybem dostępu współdzielonego w środowisku Databricks Runtime 15.4 lub nowszym.
Typizowane interfejsy API zestawu danych umożliwiają uruchamianie przekształceń, takich jak mapowanie, filtrowanie i agregacje w wynikowych zestawach danych z funkcją zdefiniowaną przez użytkownika.
Na przykład następująca aplikacja Scala używa interfejsu map()
API do modyfikowania liczby w kolumnie wynikowej na prefiksowany ciąg.
spark.range(3).map(f => s"row-$f").show()
W tym przykładzie użyto interfejsu map()
API, ale dotyczy to również innych typowych interfejsów API zestawu danych, takich jak filter()
, , mapPartitions()
foreach()
, foreachPartition()
, reduce()
i flatMap()
.