다음을 통해 공유


Python UDF(사용자 정의 테이블 함수)

Important

이 기능은 Databricks Runtime 14.3 LTS 이상의 공개 미리 보기 에 있습니다.

UDTF(사용자 정의 테이블 함수)를 사용하면 스칼라 값 대신 테이블을 반환하는 함수를 등록할 수 있습니다. 각 호출에서 단일 결과 값을 반환하는 스칼라 함수와 달리 각 UDTF는 SQL 문의 FROM 절에서 호출되고 전체 테이블을 출력으로 반환합니다.

각 UDTF 호출은 0개 이상의 인수를 수락할 수 있습니다. 이러한 인수는 전체 입력 테이블을 나타내는 스칼라 식 또는 테이블 인수일 수 있습니다.

기본 UDTF 구문

Apache Spark는 출력 행을 내보내는 데 사용하는 필수 eval 메서드를 사용하여 yield Python UDF를 Python 클래스로 구현합니다.

클래스를 UDTF로 사용하려면 PySpark udtf 함수를 가져와야 합니다. Databricks는 이 함수를 데코레이터로 사용하고 옵션을 사용하여 returnType 필드 이름 및 형식을 명시적으로 지정하는 것이 좋습니다(클래스가 이후 섹션에 설명된 대로 메서드를 analyze 정의하지 않는 한).

다음 UDTF는 두 정수 인수의 고정 목록을 사용하여 테이블을 만듭니다.

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

UDTF 등록

UDF는 로컬 SparkSession 에 등록되고 Notebook 또는 작업 수준에서 격리됩니다.

Unity 카탈로그에서 UDF를 개체로 등록할 수 없으며 SQL 웨어하우스에서 UDF를 사용할 수 없습니다.

함수spark.udtf.register()를 사용하여 SQL 쿼리에서 사용하기 위해 UDTF를 현재 SparkSession 에 등록할 수 있습니다. SQL 함수 및 Python UDTF 클래스의 이름을 제공합니다.

spark.udtf.register("get_sum_diff", GetSumDiff)

등록된 UDTF 호출

등록되면 매직 명령 또는 spark.sql() 함수를 사용하여 SQL에서 UDTF를 %sql 사용할 수 있습니다.

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Apache 화살표 사용

UDTF가 소량의 데이터를 입력으로 수신하지만 큰 테이블을 출력하는 경우 Databricks는 Apache Arrow를 사용하는 것이 좋습니다. UDTF를 선언할 때 매개 변수를 useArrow 지정하여 사용하도록 설정할 수 있습니다.

@udtf(returnType="c1: int, c2: int", useArrow=True)

변수 인수 목록 - *args 및 **kwargs

Python *args 또는 **kwargs 구문을 사용하고 논리를 구현하여 지정되지 않은 수의 입력 값을 처리할 수 있습니다.

다음 예제에서는 인수의 입력 길이 및 형식을 명시적으로 확인하면서 동일한 결과를 반환합니다.

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

다음은 동일한 예제이지만 키워드 인수를 사용하는 예제입니다.

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

등록 시 정적 스키마 정의

UDTF는 순서가 지정된 열 이름 및 형식 시퀀스로 구성된 출력 스키마가 있는 행을 반환합니다. 모든 쿼리에 대해 UDTF 스키마가 항상 동일하게 유지되어야 하는 경우 데코레이터 뒤의 고정된 고정 스키마를 @udtf 지정할 수 있습니다. 다음 중 하나여야 합니다.StructType

StructType().add("c1", StringType())

또는 구조체 형식을 나타내는 DDL 문자열입니다.

c1: string

함수 호출 시 동적 스키마 컴퓨팅

UDF는 입력 인수의 값에 따라 각 호출에 대해 프로그래밍 방식으로 출력 스키마를 계산할 수도 있습니다. 이렇게 하려면 특정 UDTF 호출에 제공된 인수에 해당하는 매개 변수를 0개 이상 허용하는 정 analyze 적 메서드를 정의합니다.

메서드의 analyze 각 인수는 다음 필드를 포함하는 클래스의 AnalyzeArgument 인스턴스입니다.

AnalyzeArgument 클래스 필드 설명
dataType 입력 인수의 형식입니다 DataType. 입력 테이블 인수의 경우 테이블 StructType 의 열을 나타내는 인수입니다.
value 입력 인수의 값입니다 Optional[Any]. None 이는 상수가 아닌 테이블 인수 또는 리터럴 스칼라 인수에 대한 것입니다.
isTable 입력 인수가 테이블인지 여부를 지정합니다 BooleanType.
isConstantExpression 입력 인수가 상수 접이식 식 BooleanType인지 여부를 나타냅니다.

메서드는 analyze 결과 테이블의 AnalyzeResult 스키마와 일부 선택적 필드가 포함된 클래스의 인스턴스를 StructType 반환합니다. UDTF가 입력 테이블 인수 AnalyzeResult 를 수락하는 경우 나중에 설명한 대로 여러 UDTF 호출에서 입력 테이블의 행을 분할하고 정렬하는 요청된 방법을 포함할 수도 있습니다.

AnalyzeResult 클래스 필드 설명
schema 결과 테이블의 스키마입니다 StructType.
withSinglePartition 모든 입력 행을 동일한 UDTF 클래스 인스턴스로 BooleanType보낼지 여부를 나타냅니다.
partitionBy 비어있지 않은 것으로 설정하면 분할 식의 각 고유한 값 조합이 있는 모든 행이 UDTF 클래스의 별도 인스턴스에서 사용됩니다.
orderBy 비어있지 않은 행으로 설정하면 각 파티션 내의 행 순서가 지정됩니다.
select 비어있지 않은 것으로 설정하면 UDTF가 Catalyst가 입력 TABLE 인수의 열에 대해 평가하도록 지정하는 식의 시퀀스입니다. UDTF는 목록의 각 이름에 대해 나열된 순서대로 하나의 입력 특성을 받습니다.

다음은 analyze 입력 문자열 인수의 각 단어에 대해 하나의 출력 열을 반환하는 예제입니다.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

이후 eval 호출에 상태 전달

이 메서드는 analyze 초기화를 수행한 다음 동일한 UDTF 호출에 대한 이후 eval 메서드 호출에 결과를 전달하는 편리한 위치로 사용될 수 있습니다.

이렇게 하려면 서브클래스를 AnalyzeResult 만들고 메서드에서 서브클래스의 인스턴스를 analyze 반환합니다. 그런 다음 메서드에 인수를 __init__ 추가하여 해당 인스턴스를 수락합니다.

다음은 analyze 상수 출력 스키마를 반환하지만 이후 __init__ 메서드 호출에서 사용할 결과 메타데이터에 사용자 지정 정보를 추가하는 예제입니다.

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

출력 행 수율

이 메서드는 eval 입력 테이블 인수의 각 행에 대해 한 번 실행되며(또는 테이블 인수가 제공되지 않은 경우 한 번만), 끝에 메서드를 한 번 호출 terminate 합니다. 두 메서드는 튜플, 목록 또는 pyspark.sql.Row 개체를 생성하여 결과 스키마를 준수하는 0개 이상의 행을 출력합니다.

이 예제에서는 다음 세 가지 요소의 튜플을 제공하여 행을 반환합니다.

def eval(self, x, y, z):
  yield (x, y, z)

괄호를 생략할 수도 있습니다.

def eval(self, x, y, z):
  yield x, y, z

열이 하나만 있는 행을 반환하는 후행 쉼표 추가:

def eval(self, x, y, z):
  yield x,

개체를 생성할 pyspark.sql.Row 수도 있습니다.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

다음은 Python 목록을 사용하여 메서드에서 출력 행을 terminate 생성하는 예제입니다. 이 목적을 위해 UDTF 평가의 이전 단계에서 클래스 내에 상태를 저장할 수 있습니다.

def terminate(self):
  yield [self.x, self.y, self.z]

UDTF에 스칼라 인수 전달

스칼라 인수를 UDTF에 리터럴 값 또는 함수를 기반으로 하는 상수 식으로 전달할 수 있습니다. 예시:

SELECT * FROM udtf(42, group => upper("finance_department"));

UDTF에 테이블 인수 전달

Python UDF는 스칼라 입력 인수 외에도 입력 테이블을 인수로 수락할 수 있습니다. 단일 UDTF는 테이블 인수와 여러 스칼라 인수를 수락할 수도 있습니다.

그런 다음 모든 SQL 쿼리는 키워드를 사용하여 TABLE 입력 테이블을 제공할 수 있으며, 그 다음에는 다음과 같이 TABLE(t)적절한 테이블 식별자를 둘러싼 괄호가 옵니다. 또는 테이블 하위 쿼리(예 TABLE(SELECT a, b, c FROM t) : 또는 TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).)를 전달할 수 있습니다.

그런 다음 입력 테이블 인수는 입력 테이블의 eval 각 행에 대해 메서드를 한 번 호출하여 eval 메서드에 대한 인수로 pyspark.sql.Row 표시됩니다. 표준 PySpark 열 필드 주석을 사용하여 각 행의 열과 상호 작용할 수 있습니다. 다음 예제에서는 PySpark Row 형식을 명시적으로 가져온 다음, 필드에서 전달된 테이블을 id 필터링하는 방법을 보여 줍니다.

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

함수를 쿼리하려면 SQL 키워드를 TABLE 사용합니다.

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

함수 호출에서 입력 행 분할 지정

테이블 인수를 사용하여 UDTF를 호출할 때 SQL 쿼리는 하나 이상의 입력 테이블 열 값에 따라 여러 UDTF 호출에서 입력 테이블을 분할할 수 있습니다.

파티션을 지정하려면 인수 뒤의 PARTITION BY 함수 호출에서 절을 TABLE 사용합니다. 이렇게 하면 분할 열의 각 고유한 값 조합이 있는 모든 입력 행이 정확히 하나의 UDTF 클래스 인스턴스에서 사용됩니다.

이 절은 단순 열 참조 PARTITION BY 외에도 입력 테이블 열을 기반으로 하는 임의의 식을 허용합니다. 예를 들어 문자열을 LENGTH 지정하거나, 날짜에서 월을 추출하거나, 두 값을 연결할 수 있습니다.

모든 입력 행을 정확히 하나의 UDTF 클래스 인스턴스에서 사용해야 하는 파티션을 하나만 요청하는 대신 PARTITION BY 지정할 WITH SINGLE PARTITION 수도 있습니다.

각 파티션 내에서 UDTF의 메서드에서 입력 행을 사용할 때 입력 행의 eval 필수 순서를 선택적으로 지정할 수 있습니다. 이렇게 하려면 위에서 설명한 ORDER BY 또는 WITH SINGLE PARTITION 절 다음에 절을 PARTITION BY 입력합니다.

예를 들어 다음 UDTF를 고려합니다.

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

입력 테이블을 통해 UDTF를 호출할 때 다음과 같은 방법으로 분할 옵션을 지정할 수 있습니다.

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

메서드에서 입력 행 분할 지정 analyze

SQL 쿼리에서 UDTF를 호출할 때 입력 테이블을 분할하는 위의 각 방법에 대해 UDTF의 analyze 메서드가 동일한 분할 방법을 자동으로 지정하는 해당 방법이 있습니다.

  • UDTF를 UDTF로 SELECT * FROM udtf(TABLE(t) PARTITION BY a)호출하는 대신, 메서드를 analyze 업데이트하여 필드를 partitionBy=[PartitioningColumn("a")] SELECT * FROM udtf(TABLE(t))설정하고 .
  • 동일한 토큰을 사용하여 SQL 쿼리에서 지정하는 TABLE(t) WITH SINGLE PARTITION ORDER BY b 대신 필드를 withSinglePartition=true orderBy=[OrderingColumn("b")] 설정한 다음 전달할 TABLE(t)수 있습니다analyze.
  • SQL 쿼리를 전달하는 TABLE(SELECT a FROM t) 대신 집합 select=[SelectedColumn("a")] 을 만들고 analyze 그냥 전달할 TABLE(t)수 있습니다.

다음 예제 analyze 에서는 상수 출력 스키마를 반환하고, 입력 테이블에서 열의 하위 집합을 선택하고, 입력 테이블이 열 값 date 에 따라 여러 UDTF 호출에서 분할되도록 지정합니다.

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])