다음을 통해 공유


pandas 함수 API

pandas 함수 API를 사용하면 pandas 인스턴스를 가져와 PySpark DataFrame에 출력하는 Python 네이티브 함수를 직접 적용할 수 있습니다. pandas 사용자 정의 함수와 마찬가지로 함수 API는 Apache Arrow를 사용하여 데이터를 전송하고 pandas를 사용하여 데이터 작업을 수행합니다. 그러나 Python 형식 힌트는 pandas 함수 API에서 선택 사항입니다.

pandas 함수 API에는 다음 세 가지 유형이 있습니다.

  • 그룹화된 맵
  • 지도
  • 공동 그룹화된 맵

pandas 함수 API는 pandas UDF 실행에서 사용하는 것과 동일한 내부 논리를 활용합니다. PyArrow, 지원되는 SQL 형식 및 구성과 같은 특성을 공유합니다.

자세한 내용은 Apache Spark 3.0의 예정된 릴리스에서 새 Pandas UDF 및 Python 유형 힌트 블로그 게시물을 참조하세요.

그룹화된 맵

를 사용하여 groupBy().applyInPandas() 그룹화된 데이터를 변환하여 "split-apply-combine" 패턴을 구현합니다. Split-apply-combine은 다음 세 단계로 구성됩니다.

  • 를 사용하여 DataFrame.groupBy데이터를 그룹으로 분할합니다.
  • 각 그룹에 함수를 적용합니다. 함수의 입력 및 출력은 모두 pandas.DataFrame입니다. 입력 데이터에는 각 그룹에 대한 모든 행과 열이 포함됩니다.
  • 결과를 새 DataFrame로 결합합니다.

를 사용 groupBy().applyInPandas()하려면 다음을 정의해야 합니다.

  • 각 그룹에 대한 계산을 정의하는 Python 함수
  • StructType 출력의 스키마를 정의하는 개체 또는 문자열DataFrame

반환 pandas.DataFrame 된 의 열 레이블은 문자열로 지정된 경우 정의된 출력 스키마의 필드 이름과 일치하거나 문자열이 아닌 경우 위치별로 필드 데이터 형식과 일치해야 합니다(예: 정수 인덱스). pandas를 참조하세요. 를 생성할 때 열에 레이블을 지정하는 방법에 대한 DataFrame입니다pandas.DataFrame.

그룹에 대한 모든 데이터는 함수가 적용되기 전에 메모리에 로드됩니다. 특히 그룹 크기가 기울어진 경우 메모리 부족 예외가 발생할 수 있습니다. maxRecordsPerBatch에 대한 구성은 그룹에 적용되지 않으며 그룹화된 데이터가 사용 가능한 메모리에 맞는지 확인하는 것은 사용자에게 달려 있습니다.

다음 예제에서는 를 사용하여 groupby().apply() 그룹의 각 값에서 평균을 뺍니다.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

자세한 사용법은 pyspark.sql.GroupedData.applyInPandas를 참조하세요.

지도

의 반복기를 pandas.DataFrame 현재 PySpark DataFrame을 나타내는 의 pandas.DataFrame 다른 반복기로 변환하고 결과를 PySpark DataFrame으로 반환하기 위해 pandas 인스턴스 DataFrame.mapInPandas() 를 사용하여 맵 작업을 수행합니다.

기본 함수는 의 pandas.DataFrame반복기를 사용하고 출력합니다. Series to Series와 같은 일부 pandas UDF와 달리 임의 길이의 출력을 반환할 수 있습니다.

다음 예제에서는 를 사용하는 mapInPandas()방법을 보여줍니다.

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

자세한 사용법은 pyspark.sql.DataFrame.mapInPandas를 참조하세요.

공동 그룹화된 맵

pandas 인스턴스를 사용하여 공동 그룹화된 맵 작업의 경우 를 사용하여 DataFrame.groupby().cogroup().applyInPandas() 공통 키로 두 PySpark DataFrames를 공동 그룹화한 다음, 다음과 같이 각 공동 그룹에 Python 함수를 적용합니다.

  • 키를 공유하는 각 DataFrame의 그룹이 함께 그룹화되도록 데이터를 섞습니다.
  • 각 공동 그룹에 함수를 적용합니다. 함수의 입력은 2 pandas.DataFrame 입니다(키를 나타내는 선택적 튜플 포함). 함수의 출력은 입니다 pandas.DataFrame.
  • pandas.DataFrame모든 그룹의 를 새 PySpark DataFrame로 결합합니다.

를 사용 groupBy().cogroup().applyInPandas()하려면 다음을 정의해야 합니다.

  • 각 공동 그룹에 대한 계산을 정의하는 Python 함수입니다.
  • StructType 출력 PySpark DataFrame의 스키마를 정의하는 개체 또는 문자열입니다.

반환 pandas.DataFrame 된 의 열 레이블은 문자열로 지정된 경우 정의된 출력 스키마의 필드 이름과 일치하거나 문자열이 아닌 경우 위치별로 필드 데이터 형식과 일치해야 합니다(예: 정수 인덱스). pandas를 참조하세요. 를 생성할 때 열에 레이블을 지정하는 방법에 대한 DataFrame입니다pandas.DataFrame.

함수가 적용되기 전에 공동 그룹에 대한 모든 데이터가 메모리에 로드됩니다. 특히 그룹 크기가 기울어진 경우 메모리 부족 예외가 발생할 수 있습니다. maxRecordsPerBatch에 대한 구성은 적용되지 않으며, 공동 그룹화된 데이터가 사용 가능한 메모리에 맞는지 확인하는 것은 사용자에게 달려 있습니다.

다음 예제에서는 를 사용하여 groupby().cogroup().applyInPandas() 두 데이터 세트 간에 를 asof join 수행하는 방법을 보여 줍니다.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

자세한 사용법은 pyspark.sql.PandasCogroupedOps.applyInPandas를 참조하세요.