Python 使用者定義資料表函式 (UDDF)
使用者定義的數據表函式 (UDTF) 可讓您註冊傳回數據表而非純量值的函式。 不同於從每個呼叫傳回單一結果值的純量函式,每個 UDTF 都會在 SQL 語句的 FROM
子句中叫用,並以輸出傳回整個數據表。
每個 UDTF 呼叫都可以接受零個或多個自變數。 這些自變數可以是代表整個輸入數據表的純量表達式或數據表自變數。
基本 UDTF 語法
Apache Spark 將 Python UDTF 實作為具有強制性 eval
方法的 Python 類別,該方法使用 yield
來發出輸出數據列。
若要使用類別作為 UDTF,您必須匯入 PySpark udtf
函式。 Databricks 建議使用這個函式作為裝飾器,並使用 returnType
選項明確指定欄位名稱和類型(除非類別定義了一個 analyze
方法,如稍後章節所述)。
下列 UDTF 會使用兩個整數自變數的固定清單來建立資料表:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
註冊 UDTF
UDTF 會註冊到本地 SparkSession
,並在筆記本或作業層級隔離。
您無法將 UDDF 註冊為 Unity 目錄中的物件,而 UDDF 不能與 SQL 倉儲搭配使用。
您可以將 UDTF 註冊至目前的 SparkSession
,以便搭配 函式 spark.udtf.register()
在 SQL 查詢中使用。 提供 SQL 函式和 Python UDTF 類別的名稱。
spark.udtf.register("get_sum_diff", GetSumDiff)
呼叫已註冊的 UDTF
註冊之後,您可以使用 %sql
magic 命令或 spark.sql()
函式,在 SQL 中使用 UDTF:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
使用 Apache Arrow
如果您的 UDTF 收到少量的數據做為輸入,但會輸出大型數據表,Databricks 建議使用 Apache Arrow。 您可以在宣告 UDTF 時指定 useArrow
參數來啟用它:
@udtf(returnType="c1: int, c2: int", useArrow=True)
可變參數清單 - *args 和 **kwargs
您可以使用 Python *args
或 **kwargs
語法,並實作邏輯來處理未指定的輸入值數目。
下列範例會傳回相同的結果,同時明確檢查自變數的輸入長度和類型:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
以下是相同的範例,但使用關鍵詞自變數:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
在註冊時定義靜態架構
UDTF 會傳回包含數據行名稱和型別排序序列之輸出架構的數據列。 如果所有查詢的 UDTF 架構一律保持不變,您可以在 @udtf
裝飾項目之後指定靜態、固定的架構。 它必須是 StructType
:
StructType().add("c1", StringType())
或代表結構類型的 DDL 字串:
c1: string
在函數調用時間計算動態架構
UDDF 也可以根據輸入自變數的值,以程式設計方式計算每個呼叫的輸出架構。 若要這樣做,請定義一個稱為 analyze
的靜態方法,以接受與特定 UDTF 呼叫的自變數相對應的零個或多個參數。
analyze
方法的每個自變數都是 AnalyzeArgument
類別的實例,其中包含下列欄位:
AnalyzeArgument 類別欄位 |
描述 |
---|---|
dataType |
輸入參數的類型作為 DataType 。 對於輸入數據表自變數,這是代表數據表數據行的 StructType 。 |
value |
輸入參數的值作為 Optional[Any] 。 對於非常數的表格參數或文字純量參數而言,這是 None 。 |
isTable |
輸入參數是否為一個表格作為 BooleanType 。 |
isConstantExpression |
輸入自變數是否為常數折疊表示式,做為 BooleanType 。 |
analyze
方法會傳回 AnalyzeResult
類別的實例,其中包含結果數據表的架構做為 StructType
加上一些選擇性字段。 如果 UDTF 接受輸入表格參數,則 AnalyzeResult
也可以包含一種要求的方法,以分割和排序輸入表格的行,如稍後所述。
AnalyzeResult 類別欄位 |
描述 |
---|---|
schema |
結果資料表的架構為 StructType 。 |
withSinglePartition |
是否要將所有輸入數據列傳送至與 BooleanType 相同的 UDTF 類別實例。 |
partitionBy |
如果設定為非空白,分割表達式的每個唯一值組合的所有數據列都會由UDTF類別的個別實例取用。 |
orderBy |
如果設定為非空白,此設定會指定每個分區內資料列的排序。 |
select |
如果設定為非空,這是一個運算式序列,由UDTF指定,並由Catalyst對輸入 TABLE 引數中的數據列進行評估。 UDTF 會依列出的順序,接收清單中每個名稱的一個輸入屬性。 |
本 analyze
範例會針對輸入字串自變數中的每個單字傳回一個輸出數據行。
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
將狀態轉送至未來的 eval
呼叫
analyze
方法可作為執行初始化的便利位置,然後將結果轉送至相同 UDTF 呼叫的未來 eval
方法調用。
若要這樣做,請建立 AnalyzeResult
的子類別,並從 analyze
方法傳回子類別的實例。
然後,將額外的自變數新增至 __init__
方法,以接受該實例。
此 analyze
範例會傳回常數輸出架構,但在未來 __init__
方法呼叫所取用的結果元數據中新增自定義資訊:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
產生輸出數據列
eval
方法會針對輸入數據表自變數的每個數據列執行一次(如果未提供任何數據表自變數,則只執行一次),後面接著最後一個叫用 terminate
方法。 方法會透過產生元組、清單或 pyspark.sql.Row
物件,輸出符合結果模式的零行或多行資料。
此範例會透過提供一個含有三個元素的元組來傳回資料列:
def eval(self, x, y, z):
yield (x, y, z)
您也可以省略括弧:
def eval(self, x, y, z):
yield x, y, z
新增尾端逗號以傳回僅有一欄的資料列:
def eval(self, x, y, z):
yield x,
您也可以產生 pyspark.sql.Row
物件。
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
此範例會使用 Python 清單,從 terminate
方法產生輸出資料列。 您可以從 UDTF 評估中的先前步驟,將此狀態儲存在 類別內,以達到此目的。
def terminate(self):
yield [self.x, self.y, self.z]
將純量自變數傳遞至UDTF
您可以將純量參數作為常值表達式傳遞至 UDTF,此常值表達式由文字值或基於文字值的函式組成。 例如:
SELECT * FROM udtf(42, group => upper("finance_department"));
將數據表自變數傳遞至UDTF
除了純量輸入自變數之外,Python UDF 還可以接受輸入數據表做為自變數。 單一 UDTF 也可以接受數據表自變數和多個純量自變數。
然後,任何 SQL 查詢都可以使用 TABLE
關鍵詞提供輸入數據表,後面接著括在適當數據表標識符周圍的括弧,例如 TABLE(t)
。 或者,您可以傳遞資料表子查詢,例如 TABLE(SELECT a, b, c FROM t)
或 TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
。
然後,輸入表格自變數會表示為 pyspark.sql.Row
方法的 eval
自變數,並針對輸入表格中的每一行呼叫 eval
方法。 您可以使用標準的 PySpark 欄位註解來處理每個資料列中的欄位。 下列範例示範明確匯入 PySpark Row
類型,然後篩選 id
欄位上傳遞的數據表:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
若要查詢函式,請使用 TABLE
SQL 關鍵詞:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
從函數調用中的輸入行確定分區方法
使用數據表自變數呼叫 UDTF 時,任何 SQL 查詢都可以根據一或多個輸入資料表數據行的值,將輸入數據表分割成數個 UDTF 呼叫。
若要指定分割區,請在 PARTITION BY
自變數之後,在函數調用中使用 TABLE
子句。
這保證了具有分割列之值的每個唯一組合的所有輸入資料行,僅會由UDTF類別的一個實例取用。
請注意,除了簡單的欄位參考之外,PARTITION BY
子句也可以根據輸入表格的欄位接受任意表達式。 例如,您可以指定字串的 LENGTH
、從日期擷取月份,或串連兩個值。
您也可以指定 WITH SINGLE PARTITION
,而不是 PARTITION BY
,以僅要求一個分割區,其中所有輸入數據行必須由 UDTF 類別的一個且僅有一個實例取用。
在每個分割區中,您可以選擇性地指定輸入數據列的必要順序,因為 UDTF 的 eval
方法會取用它們。 若要這樣做,請在上述 ORDER BY
或 PARTITION BY
子句後面提供 WITH SINGLE PARTITION
子句。
例如,請考慮以下的 UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
在輸入資料表上呼叫 UDTF 時,您可以以多種方式指定資料分割選項:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
從 analyze
方法指定輸入列的分區
請注意,當在 SQL 查詢中呼叫 UDTF 時,針對上述每一種分割輸入表格的方式,UDTF 的 analyze
方法都有對應的方式來自動指定相同的分割方法。
- 您可以更新
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
方法來設定字段analyze
,並使用partitionBy=[PartitioningColumn("a")]
來呼叫函式,而不是將UDTF呼叫為SELECT * FROM udtf(TABLE(t))
。 - 同樣地,您無需在 SQL 查詢中指定
TABLE(t) WITH SINGLE PARTITION ORDER BY b
,而是可以讓analyze
設定欄位withSinglePartition=true
和orderBy=[OrderingColumn("b")]
,然後只需傳遞TABLE(t)
。 - 您可以讓
TABLE(SELECT a FROM t)
設定analyze
,然後只傳遞select=[SelectedColumn("a")]
,而不是在 SQL 查詢中傳遞TABLE(t)
。
在下列範例中,analyze
傳回常數輸出架構、從輸入數據表中選取數據行的子集,並指定根據 date
數據行的值,將輸入數據表分割成數個 UDTF 呼叫:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])