Python UDF(사용자 정의 테이블 함수)
Important
이 기능은 Databricks Runtime 14.3 LTS 이상의 공개 미리 보기 에 있습니다.
UDTF(사용자 정의 테이블 함수)를 사용하면 스칼라 값 대신 테이블을 반환하는 함수를 등록할 수 있습니다. 각 호출에서 단일 결과 값을 반환하는 스칼라 함수와 달리 각 UDTF는 SQL 문의 FROM
절에서 호출되고 전체 테이블을 출력으로 반환합니다.
각 UDTF 호출은 0개 이상의 인수를 수락할 수 있습니다. 이러한 인수는 전체 입력 테이블을 나타내는 스칼라 식 또는 테이블 인수일 수 있습니다.
기본 UDTF 구문
Apache Spark는 출력 행을 내보내는 데 사용하는 필수 eval
메서드를 사용하여 yield
Python UDF를 Python 클래스로 구현합니다.
클래스를 UDTF로 사용하려면 PySpark udtf
함수를 가져와야 합니다. Databricks는 이 함수를 데코레이터로 사용하고 옵션을 사용하여 returnType
필드 이름 및 형식을 명시적으로 지정하는 것이 좋습니다(클래스가 이후 섹션에 설명된 대로 메서드를 analyze
정의하지 않는 한).
다음 UDTF는 두 정수 인수의 고정 목록을 사용하여 테이블을 만듭니다.
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
UDTF 등록
UDF는 로컬 SparkSession
에 등록되고 Notebook 또는 작업 수준에서 격리됩니다.
Unity 카탈로그에서 UDF를 개체로 등록할 수 없으며 SQL 웨어하우스에서 UDF를 사용할 수 없습니다.
함수spark.udtf.register()
를 사용하여 SQL 쿼리에서 사용하기 위해 UDTF를 현재 SparkSession
에 등록할 수 있습니다. SQL 함수 및 Python UDTF 클래스의 이름을 제공합니다.
spark.udtf.register("get_sum_diff", GetSumDiff)
등록된 UDTF 호출
등록되면 매직 명령 또는 spark.sql()
함수를 사용하여 SQL에서 UDTF를 %sql
사용할 수 있습니다.
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Apache 화살표 사용
UDTF가 소량의 데이터를 입력으로 수신하지만 큰 테이블을 출력하는 경우 Databricks는 Apache Arrow를 사용하는 것이 좋습니다. UDTF를 선언할 때 매개 변수를 useArrow
지정하여 사용하도록 설정할 수 있습니다.
@udtf(returnType="c1: int, c2: int", useArrow=True)
변수 인수 목록 - *args 및 **kwargs
Python *args
또는 **kwargs
구문을 사용하고 논리를 구현하여 지정되지 않은 수의 입력 값을 처리할 수 있습니다.
다음 예제에서는 인수의 입력 길이 및 형식을 명시적으로 확인하면서 동일한 결과를 반환합니다.
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
다음은 동일한 예제이지만 키워드 인수를 사용하는 예제입니다.
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
등록 시 정적 스키마 정의
UDTF는 순서가 지정된 열 이름 및 형식 시퀀스로 구성된 출력 스키마가 있는 행을 반환합니다. 모든 쿼리에 대해 UDTF 스키마가 항상 동일하게 유지되어야 하는 경우 데코레이터 뒤의 고정된 고정 스키마를 @udtf
지정할 수 있습니다. 다음 중 하나여야 합니다.StructType
StructType().add("c1", StringType())
또는 구조체 형식을 나타내는 DDL 문자열입니다.
c1: string
함수 호출 시 동적 스키마 컴퓨팅
UDF는 입력 인수의 값에 따라 각 호출에 대해 프로그래밍 방식으로 출력 스키마를 계산할 수도 있습니다. 이렇게 하려면 특정 UDTF 호출에 제공된 인수에 해당하는 매개 변수를 0개 이상 허용하는 정 analyze
적 메서드를 정의합니다.
메서드의 analyze
각 인수는 다음 필드를 포함하는 클래스의 AnalyzeArgument
인스턴스입니다.
AnalyzeArgument 클래스 필드 |
설명 |
---|---|
dataType |
입력 인수의 형식입니다 DataType . 입력 테이블 인수의 경우 테이블 StructType 의 열을 나타내는 인수입니다. |
value |
입력 인수의 값입니다 Optional[Any] . None 이는 상수가 아닌 테이블 인수 또는 리터럴 스칼라 인수에 대한 것입니다. |
isTable |
입력 인수가 테이블인지 여부를 지정합니다 BooleanType . |
isConstantExpression |
입력 인수가 상수 접이식 식 BooleanType 인지 여부를 나타냅니다. |
메서드는 analyze
결과 테이블의 AnalyzeResult
스키마와 일부 선택적 필드가 포함된 클래스의 인스턴스를 StructType
반환합니다. UDTF가 입력 테이블 인수 AnalyzeResult
를 수락하는 경우 나중에 설명한 대로 여러 UDTF 호출에서 입력 테이블의 행을 분할하고 정렬하는 요청된 방법을 포함할 수도 있습니다.
AnalyzeResult 클래스 필드 |
설명 |
---|---|
schema |
결과 테이블의 스키마입니다 StructType . |
withSinglePartition |
모든 입력 행을 동일한 UDTF 클래스 인스턴스로 BooleanType 보낼지 여부를 나타냅니다. |
partitionBy |
비어있지 않은 것으로 설정하면 분할 식의 각 고유한 값 조합이 있는 모든 행이 UDTF 클래스의 별도 인스턴스에서 사용됩니다. |
orderBy |
비어있지 않은 행으로 설정하면 각 파티션 내의 행 순서가 지정됩니다. |
select |
비어있지 않은 것으로 설정하면 UDTF가 Catalyst가 입력 TABLE 인수의 열에 대해 평가하도록 지정하는 식의 시퀀스입니다. UDTF는 목록의 각 이름에 대해 나열된 순서대로 하나의 입력 특성을 받습니다. |
다음은 analyze
입력 문자열 인수의 각 단어에 대해 하나의 출력 열을 반환하는 예제입니다.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
이후 eval
호출에 상태 전달
이 메서드는 analyze
초기화를 수행한 다음 동일한 UDTF 호출에 대한 이후 eval
메서드 호출에 결과를 전달하는 편리한 위치로 사용될 수 있습니다.
이렇게 하려면 서브클래스를 AnalyzeResult
만들고 메서드에서 서브클래스의 인스턴스를 analyze
반환합니다.
그런 다음 메서드에 인수를 __init__
추가하여 해당 인스턴스를 수락합니다.
다음은 analyze
상수 출력 스키마를 반환하지만 이후 __init__
메서드 호출에서 사용할 결과 메타데이터에 사용자 지정 정보를 추가하는 예제입니다.
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
출력 행 수율
이 메서드는 eval
입력 테이블 인수의 각 행에 대해 한 번 실행되며(또는 테이블 인수가 제공되지 않은 경우 한 번만), 끝에 메서드를 한 번 호출 terminate
합니다. 두 메서드는 튜플, 목록 또는 pyspark.sql.Row
개체를 생성하여 결과 스키마를 준수하는 0개 이상의 행을 출력합니다.
이 예제에서는 다음 세 가지 요소의 튜플을 제공하여 행을 반환합니다.
def eval(self, x, y, z):
yield (x, y, z)
괄호를 생략할 수도 있습니다.
def eval(self, x, y, z):
yield x, y, z
열이 하나만 있는 행을 반환하는 후행 쉼표 추가:
def eval(self, x, y, z):
yield x,
개체를 생성할 pyspark.sql.Row
수도 있습니다.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
다음은 Python 목록을 사용하여 메서드에서 출력 행을 terminate
생성하는 예제입니다. 이 목적을 위해 UDTF 평가의 이전 단계에서 클래스 내에 상태를 저장할 수 있습니다.
def terminate(self):
yield [self.x, self.y, self.z]
UDTF에 스칼라 인수 전달
스칼라 인수를 UDTF에 리터럴 값 또는 함수를 기반으로 하는 상수 식으로 전달할 수 있습니다. 예시:
SELECT * FROM udtf(42, group => upper("finance_department"));
UDTF에 테이블 인수 전달
Python UDF는 스칼라 입력 인수 외에도 입력 테이블을 인수로 수락할 수 있습니다. 단일 UDTF는 테이블 인수와 여러 스칼라 인수를 수락할 수도 있습니다.
그런 다음 모든 SQL 쿼리는 키워드를 사용하여 TABLE
입력 테이블을 제공할 수 있으며, 그 다음에는 다음과 같이 TABLE(t)
적절한 테이블 식별자를 둘러싼 괄호가 옵니다. 또는 테이블 하위 쿼리(예 TABLE(SELECT a, b, c FROM t)
: 또는 TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.)를 전달할 수 있습니다.
그런 다음 입력 테이블 인수는 입력 테이블의 eval
각 행에 대해 메서드를 한 번 호출하여 eval
메서드에 대한 인수로 pyspark.sql.Row
표시됩니다. 표준 PySpark 열 필드 주석을 사용하여 각 행의 열과 상호 작용할 수 있습니다. 다음 예제에서는 PySpark Row
형식을 명시적으로 가져온 다음, 필드에서 전달된 테이블을 id
필터링하는 방법을 보여 줍니다.
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
함수를 쿼리하려면 SQL 키워드를 TABLE
사용합니다.
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
함수 호출에서 입력 행 분할 지정
테이블 인수를 사용하여 UDTF를 호출할 때 SQL 쿼리는 하나 이상의 입력 테이블 열 값에 따라 여러 UDTF 호출에서 입력 테이블을 분할할 수 있습니다.
파티션을 지정하려면 인수 뒤의 PARTITION BY
함수 호출에서 절을 TABLE
사용합니다.
이렇게 하면 분할 열의 각 고유한 값 조합이 있는 모든 입력 행이 정확히 하나의 UDTF 클래스 인스턴스에서 사용됩니다.
이 절은 단순 열 참조 PARTITION BY
외에도 입력 테이블 열을 기반으로 하는 임의의 식을 허용합니다. 예를 들어 문자열을 LENGTH
지정하거나, 날짜에서 월을 추출하거나, 두 값을 연결할 수 있습니다.
모든 입력 행을 정확히 하나의 UDTF 클래스 인스턴스에서 사용해야 하는 파티션을 하나만 요청하는 대신 PARTITION BY
지정할 WITH SINGLE PARTITION
수도 있습니다.
각 파티션 내에서 UDTF의 메서드에서 입력 행을 사용할 때 입력 행의 eval
필수 순서를 선택적으로 지정할 수 있습니다. 이렇게 하려면 위에서 설명한 ORDER BY
또는 WITH SINGLE PARTITION
절 다음에 절을 PARTITION BY
입력합니다.
예를 들어 다음 UDTF를 고려합니다.
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
입력 테이블을 통해 UDTF를 호출할 때 다음과 같은 방법으로 분할 옵션을 지정할 수 있습니다.
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
메서드에서 입력 행 분할 지정 analyze
SQL 쿼리에서 UDTF를 호출할 때 입력 테이블을 분할하는 위의 각 방법에 대해 UDTF의 analyze
메서드가 동일한 분할 방법을 자동으로 지정하는 해당 방법이 있습니다.
- UDTF를 UDTF로
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
호출하는 대신, 메서드를analyze
업데이트하여 필드를partitionBy=[PartitioningColumn("a")]
SELECT * FROM udtf(TABLE(t))
설정하고 . - 동일한 토큰을 사용하여 SQL 쿼리에서 지정하는
TABLE(t) WITH SINGLE PARTITION ORDER BY b
대신 필드를withSinglePartition=true
orderBy=[OrderingColumn("b")]
설정한 다음 전달할TABLE(t)
수 있습니다analyze
. - SQL 쿼리를 전달하는
TABLE(SELECT a FROM t)
대신 집합select=[SelectedColumn("a")]
을 만들고analyze
그냥 전달할TABLE(t)
수 있습니다.
다음 예제 analyze
에서는 상수 출력 스키마를 반환하고, 입력 테이블에서 열의 하위 집합을 선택하고, 입력 테이블이 열 값 date
에 따라 여러 UDTF 호출에서 분할되도록 지정합니다.
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])