Funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDF)
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej w środowisku Databricks Runtime 14.3 LTS lub nowszym.
Funkcja tabeli zdefiniowana przez użytkownika (UDTF) umożliwia rejestrowanie funkcji, które zwracają tabele zamiast wartości skalarnych. W przeciwieństwie do funkcji skalarnych, które zwracają pojedynczą wartość wyniku z każdego wywołania, każdy element UDTF jest wywoływany w klauzuli instrukcji FROM
SQL i zwraca całą tabelę jako dane wyjściowe.
Każde wywołanie UDTF może akceptować zero lub więcej argumentów. Te argumenty mogą być wyrażeniami skalarnych lub argumentami tabeli reprezentującymi całe tabele wejściowe.
Podstawowa składnia udTF
Platforma Apache Spark implementuje funkcje UDF języka Python jako klasy języka Python z obowiązkową eval
metodą używaną yield
do emitowania wierszy wyjściowych.
Aby użyć klasy jako funkcji UDTF, należy zaimportować funkcję PySpark udtf
. Usługa Databricks zaleca używanie tej funkcji jako dekoratora i jawnego określania nazw pól i typów przy użyciu returnType
opcji (chyba że klasa definiuje metodę analyze
zgodnie z opisem w późniejszej sekcji).
Następujący program UDTF tworzy tabelę przy użyciu stałej listy dwóch argumentów liczb całkowitych:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Rejestrowanie funkcji UDTF
Funkcje zdefiniowane przez użytkownika są rejestrowane w środowisku lokalnym SparkSession
i są izolowane na poziomie notesu lub zadania.
Nie można zarejestrować funkcji UDTFs jako obiektów w wykazie aparatu Unity, a funkcje UDF nie mogą być używane z magazynami SQL.
Możesz zarejestrować funkcję UDTF do bieżącej SparkSession
funkcji do użycia w zapytaniach SQL za pomocą funkcji spark.udtf.register()
. Podaj nazwę funkcji SQL i klasę UDTF języka Python.
spark.udtf.register("get_sum_diff", GetSumDiff)
Wywoływanie zarejestrowanego udTF
Po zarejestrowaniu można użyć funkcji UDTF w języku SQL przy użyciu %sql
polecenia magic lub spark.sql()
funkcji:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Korzystanie ze strzałki Apache
Jeśli funkcja UDTF odbiera niewielką ilość danych jako dane wejściowe, ale generuje dużą tabelę, usługa Databricks zaleca użycie narzędzia Apache Arrow. Można ją włączyć, określając useArrow
parametr podczas deklarowania funkcji UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Listy argumentów zmiennych — *args i **kwargs
Możesz użyć języka Python *args
lub **kwargs
składni i zaimplementować logikę do obsługi nieokreślonej liczby wartości wejściowych.
Poniższy przykład zwraca ten sam wynik podczas jawnego sprawdzania długości danych wejściowych i typów argumentów:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Oto ten sam przykład, ale przy użyciu argumentów słów kluczowych:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definiowanie schematu statycznego w czasie rejestracji
Funkcja UDTF zwraca wiersze ze schematem wyjściowym obejmującym uporządkowaną sekwencję nazw kolumn i typów. Jeśli schemat UDTF zawsze powinien pozostać taki sam dla wszystkich zapytań, można określić statyczny, stały schemat po dekoratorze @udtf
. Musi to być element :StructType
StructType().add("c1", StringType())
Lub ciąg DDL reprezentujący typ struktury:
c1: string
Obliczanie schematu dynamicznego w czasie wywołania funkcji
Funkcje zdefiniowane przez użytkownika mogą również programowo obliczać schemat danych wyjściowych dla każdego wywołania w zależności od wartości argumentów wejściowych. W tym celu zdefiniuj metodę statyczną o nazwie analyze
, która akceptuje zero lub więcej parametrów odpowiadających argumentom podanym do określonego wywołania UDTF.
Każdy argument analyze
metody jest wystąpieniem AnalyzeArgument
klasy, która zawiera następujące pola:
AnalyzeArgument pole klasy |
opis |
---|---|
dataType |
Typ argumentu wejściowego jako DataType . W przypadku argumentów tabeli wejściowej StructType jest to reprezentacja kolumn tabeli. |
value |
Wartość argumentu wejściowego jako Optional[Any] . None Dotyczy to argumentów tabeli lub argumentów skalarnych literału, które nie są stałe. |
isTable |
Czy argument wejściowy jest tabelą jako .BooleanType |
isConstantExpression |
Czy argument wejściowy jest wyrażeniem składanym stałym jako BooleanType . |
Metoda analyze
zwraca wystąpienie AnalyzeResult
klasy, które zawiera schemat tabeli wyników jako StructType
plus kilka pól opcjonalnych. Jeśli funkcja UDTF akceptuje argument tabeli wejściowej, AnalyzeResult
może również zawierać żądany sposób partycjonowania i porządkować wiersze tabeli wejściowej w kilku wywołaniach UDTF, zgodnie z opisem w dalszej części.
AnalyzeResult pole klasy |
opis |
---|---|
schema |
Schemat tabeli wyników jako StructType . |
withSinglePartition |
Czy wysyłać wszystkie wiersze wejściowe do tego samego wystąpienia klasy UDTF co BooleanType . |
partitionBy |
Jeśli ustawiono wartość niepustą, wszystkie wiersze z każdą unikatową kombinacją wartości wyrażeń partycjonowania są używane przez oddzielne wystąpienie klasy UDTF. |
orderBy |
Jeśli ustawiono wartość niepustą, określa kolejność wierszy w ramach każdej partycji. |
select |
Jeśli jest ustawiona wartość niepusta, jest to sekwencja wyrażeń, które funkcja UDTF określa dla katalizatora w celu obliczenia względem kolumn w wejściowym argumencie TABLE. Funkcja UDTF odbiera jeden atrybut wejściowy dla każdej nazwy na liście w kolejności, w której są wymienione. |
Ten analyze
przykład zwraca jedną kolumnę wyjściową dla każdego wyrazu w argumencie ciągu wejściowego.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Przekazywanie stanu do przyszłych eval
wywołań
Metoda analyze
może służyć jako wygodne miejsce do przeprowadzenia inicjowania, a następnie przekazać wyniki do przyszłych eval
wywołań metody dla tego samego wywołania UDTF.
W tym celu utwórz podklasę i zwróć wystąpienie podklasy AnalyzeResult
podklasy analyze
z metody .
Następnie dodaj dodatkowy argument do __init__
metody , aby zaakceptować to wystąpienie.
Ten analyze
przykład zwraca stały schemat danych wyjściowych, ale dodaje informacje niestandardowe w metadanych wyników, które mają być używane przez przyszłe __init__
wywołania metod:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Zwracanie wierszy wyjściowych
Metoda eval
jest uruchamiana raz dla każdego wiersza argumentu tabeli wejściowej (lub tylko raz, jeśli nie podano argumentu tabeli), po którym następuje jedno wywołanie terminate
metody na końcu. Metoda zwraca zero lub więcej wierszy, które są zgodne ze schematem wyników, generując krotki, listy lub pyspark.sql.Row
obiekty.
Ten przykład zwraca wiersz, podając krotkę trzech elementów:
def eval(self, x, y, z):
yield (x, y, z)
Można również pominąć nawiasy:
def eval(self, x, y, z):
yield x, y, z
Dodaj przecinek końcowy, aby zwrócić wiersz z tylko jedną kolumną:
def eval(self, x, y, z):
yield x,
Można również uzyskać pyspark.sql.Row
obiekt.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
W tym przykładzie terminate
zwracane są wiersze wyjściowe z metody przy użyciu listy języka Python. Stan wewnątrz klasy można przechowywać we wcześniejszych krokach w ocenie udTF w tym celu.
def terminate(self):
yield [self.x, self.y, self.z]
Przekazywanie argumentów skalarnych do funkcji UDTF
Argumenty skalarne można przekazać do formatu UDTF jako wyrażenia stałe składające się z wartości literałów lub funkcji na ich podstawie. Na przykład:
SELECT * FROM udtf(42, group => upper("finance_department"));
Przekazywanie argumentów tabeli do funkcji UDTF
Funkcje UDF języka Python mogą akceptować tabelę danych wejściowych jako argument oprócz argumentów wejściowych skalarnych. Pojedynczy protokół UDTF może również akceptować argument tabeli i wiele argumentów skalarnych.
Następnie dowolne zapytanie SQL może podać tabelę wejściową przy użyciu słowa kluczowego TABLE
, po którym następują nawiasy otaczające odpowiedni identyfikator tabeli, na przykład TABLE(t)
. Alternatywnie możesz przekazać podzapytywanie tabeli, na przykład TABLE(SELECT a, b, c FROM t)
lub TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
Argument tabeli wejściowej jest następnie reprezentowany jako pyspark.sql.Row
argument eval
metody z jednym wywołaniem eval
metody dla każdego wiersza w tabeli wejściowej. Możesz użyć standardowych adnotacji pól kolumn PySpark do interakcji z kolumnami w każdym wierszu. W poniższym przykładzie pokazano jawne zaimportowanie typu PySpark Row
, a następnie filtrowanie przekazanej tabeli w id
polu:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Aby wysłać zapytanie do funkcji, użyj słowa kluczowego TABLE
SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Określanie partycjonowania wierszy wejściowych z wywołań funkcji
Podczas wywoływania funkcji UDTF z argumentem tabeli dowolne zapytanie SQL może podzielić tabelę wejściową na kilka wywołań UDTF na podstawie wartości co najmniej jednej kolumny tabeli wejściowej.
Aby określić partycję, użyj PARTITION BY
klauzuli w wywołaniu funkcji po argumencie TABLE
.
Gwarantuje to, że wszystkie wiersze wejściowe z każdą unikatową kombinacją wartości kolumn partycjonowania będą używane przez dokładnie jedno wystąpienie klasy UDTF.
Należy pamiętać, że oprócz prostych odwołań do kolumn klauzula PARTITION BY
akceptuje również dowolne wyrażenia na podstawie kolumn tabeli wejściowej. Można na przykład określić LENGTH
ciąg, wyodrębnić miesiąc z daty lub połączyć dwie wartości.
Można również określić WITH SINGLE PARTITION
zamiast PARTITION BY
żądać tylko jednej partycji, w której wszystkie wiersze wejściowe muszą być używane przez dokładnie jedno wystąpienie klasy UDTF.
W ramach każdej partycji można opcjonalnie określić wymaganą kolejność wierszy wejściowych, ponieważ metoda UDTF eval
je używa. W tym celu podaj klauzulę ORDER BY
po klauzuli PARTITION BY
lub WITH SINGLE PARTITION
opisanej powyżej.
Rozważmy na przykład następujące funkcje UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Opcje partycjonowania można określić podczas wywoływania funkcji UDTF za pośrednictwem tabeli wejściowej na różne sposoby:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Określanie partycjonowania wierszy wejściowych z analyze
metody
Należy pamiętać, że dla każdego z powyższych sposobów partycjonowania tabeli wejściowej podczas wywoływania funkcji UDF w zapytaniach SQL istnieje odpowiedni sposób, aby metoda UDTF analyze
automatycznie określała tę samą metodę partycjonowania.
- Zamiast wywoływać funkcję UDTF jako
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
, możesz zaktualizowaćanalyze
metodę , aby ustawić polepartitionBy=[PartitioningColumn("a")]
i po prostu wywołać funkcję przy użyciu poleceniaSELECT * FROM udtf(TABLE(t))
. - Za pomocą tego samego tokenu, zamiast określać
TABLE(t) WITH SINGLE PARTITION ORDER BY b
w zapytaniu SQL, można ustawićanalyze
polawithSinglePartition=true
, aorderBy=[OrderingColumn("b")]
następnie po prostu przekazaćTABLE(t)
polecenie . - Zamiast przekazywać
TABLE(SELECT a FROM t)
zapytanie SQL, możesz ustawićanalyze
,select=[SelectedColumn("a")]
a następnie przekazać polecenieTABLE(t)
.
W poniższym przykładzie analyze
zwraca stały schemat danych wyjściowych, wybiera podzbiór kolumn z tabeli wejściowej i określa, że tabela wejściowa jest partycjonowana na kilka wywołań UDTF na podstawie wartości date
kolumny:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])