다음을 통해 공유


pandas 함수 API들

pandas 함수 API를 사용하면 pySpark DataFrame에 pandas 인스턴스를 가져와 출력하는 Python 네이티브 함수를 직접 적용할 수 있습니다. pandas 사용자 정의 함수와 유사하게, 함수 API도 Apache Arrow를 사용하여 데이터를 전달하고 pandas로 데이터 작업을 수행합니다. 그러나 Python 형식 힌트는 pandas 함수 API에서 선택 사항입니다.

세 가지 유형의 pandas 함수 API가 있습니다.

  • 그룹화된 지도
  • 지도
  • 공동 그룹화된 맵

pandas 함수 API는 pandas UDF 실행에서 사용하는 것과 동일한 내부 논리를 활용합니다. PyArrow, 지원되는 SQL 형식 및 구성과 같은 특성을 공유합니다.

자세한 내용은 Apache Spark 3.0예정된 릴리스에서 새 Pandas UDF 및 Python 유형 힌트를 블로그 게시물을 참조하세요.

그룹화된 지도

그룹화된 데이터를 groupBy().applyInPandas()을 사용하여 변환하고, 이로써 "split-apply-combine" 패턴을 구현합니다. 분할-적용-결합은 다음 세 단계로 구성됩니다.

  • DataFrame.groupBy사용하여 데이터를 그룹으로 분할합니다.
  • 각 그룹에 함수를 적용합니다. 함수의 입력 및 출력은 모두 pandas.DataFrame. 입력 데이터에는 각 그룹에 대한 모든 행과 열이 포함됩니다.
  • 결과를 DataFrame에 새롭게 결합합니다.

groupBy().applyInPandas()사용하려면 다음을 정의해야 합니다.

  • 각 그룹에 대한 계산을 정의하는 Python 함수
  • 출력 DataFrame 스키마를 정의하는 StructType 개체 또는 문자열입니다.

반환된 pandas.DataFrame 열 레이블은 문자열로 지정된 경우 정의된 출력 스키마의 필드 이름과 일치하거나 문자열이 아닌 경우 위치별 필드 데이터 형식(예: 정수 인덱스)을 일치시켜야 합니다. pandas.DataFrame을 참조하여 pandas.DataFrame생성할 때 열에 레이블을 지정하는 방법을 확인하십시오.

그룹의 모든 데이터는 함수가 적용되기 전에 메모리에 로드됩니다. 특히 그룹 크기가 기울어진 경우 메모리 부족 예외가 발생할 수 있습니다. maxRecordsPerBatch 대한 구성은 그룹에 적용되지 않으며 그룹화된 데이터가 사용 가능한 메모리에 맞는지 확인하는 것은 사용자에게 달려 있습니다.

다음 예제에서는 groupby().apply()를 사용하여 그룹의 각 값에서 평균을 뺍니다.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

자세한 사용법은 pyspark.sql.GroupedData.applyInPandas참조하세요.

지도

DataFrame.mapInPandas() 반복기를 현재 PySpark DataFrame을 나타내는 다른 pandas.DataFrame 반복기로 변환하고 결과를 PySpark DataFrame으로 반환하기 위해 pandas.DataFrame pandas 인스턴스로 맵 작업을 수행합니다.

기본 함수는 pandas.DataFrame반복자를 입력받고 출력합니다. Series to Series와 같은 일부 pandas UDF와 달리 임의 길이의 출력을 반환할 수 있습니다.

다음 예제에서는 mapInPandas()사용하는 방법을 보여줍니다.

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

자세한 사용법은 pyspark.sql.DataFrame.mapInPandas참조하세요.

공동 그룹화된 맵

pandas 인스턴스를 사용하여 코그룹화된 맵 작업을 수행하려면 DataFrame.groupby().cogroup().applyInPandas()을(를) 사용하여 두 PySpark DataFrame을(를) 공통 키로 코그룹화한 다음, 각 코그룹에 Python 함수를 적용합니다.

  • 키를 공유하는 각 DataFrame의 그룹이 함께 그룹화되도록 데이터를 섞습니다.
  • 각 공동 그룹에 함수를 적용합니다. 함수의 입력은 두 개의 pandas.DataFrame(키를 나타내는 선택적 튜플 포함)입니다. 함수의 출력은 pandas.DataFrame입니다.
  • 모든 그룹의 pandas.DataFrame을 결합하여 새 PySpark DataFrame을 만듭니다.

groupBy().cogroup().applyInPandas()사용하려면 다음을 정의해야 합니다.

  • 각 공동 그룹에 대한 계산을 정의하는 Python 함수입니다.
  • 출력 PySpark DataFrame스키마를 정의하는 StructType 개체 또는 문자열입니다.

반환된 pandas.DataFrame 열 레이블은 문자열로 지정된 경우 정의된 출력 스키마의 필드 이름과 일치하거나 문자열이 아닌 경우 위치별 필드 데이터 형식(예: 정수 인덱스)을 일치시켜야 합니다. pandas.DataFrame을 참조하십시오. DataFrame을 생성할 때 열에 레이블을 지정하는 방법에 대한 pandas.DataFrame.

함수가 적용되기 전에 코 그룹에 대한 모든 데이터가 메모리에 로드됩니다. 특히 그룹 크기가 기울어진 경우 메모리 부족 예외가 발생할 수 있습니다. maxRecordsPerBatch 대한 구성은 적용되지 않으며, 공동 그룹화된 데이터가 사용 가능한 메모리에 맞는지 확인하는 것은 사용자에게 달려 있습니다.

다음 예제에서는 groupby().cogroup().applyInPandas() 사용하여 두 데이터 세트 간의 asof join 수행하는 방법을 보여 줍니다.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

자세한 사용법은 pyspark.sql.PandasCogroupedOps.applyInPandas참조하세요.