pandas 함수 API
pandas 함수 API를 사용하면 pandas 인스턴스를 가져와 PySpark DataFrame에 출력하는 Python 네이티브 함수를 직접 적용할 수 있습니다. pandas 사용자 정의 함수와 마찬가지로 함수 API는 Apache Arrow를 사용하여 데이터를 전송하고 pandas를 사용하여 데이터 작업을 수행합니다. 그러나 Python 형식 힌트는 pandas 함수 API에서 선택 사항입니다.
pandas 함수 API에는 다음 세 가지 유형이 있습니다.
- 그룹화된 맵
- 지도
- 공동 그룹화된 맵
pandas 함수 API는 pandas UDF 실행에서 사용하는 것과 동일한 내부 논리를 활용합니다. PyArrow, 지원되는 SQL 형식 및 구성과 같은 특성을 공유합니다.
자세한 내용은 Apache Spark 3.0의 예정된 릴리스에서 새 Pandas UDF 및 Python 유형 힌트 블로그 게시물을 참조하세요.
그룹화된 맵
를 사용하여 groupBy().applyInPandas()
그룹화된 데이터를 변환하여 "split-apply-combine" 패턴을 구현합니다. Split-apply-combine은 다음 세 단계로 구성됩니다.
- 를 사용하여
DataFrame.groupBy
데이터를 그룹으로 분할합니다. - 각 그룹에 함수를 적용합니다. 함수의 입력 및 출력은 모두
pandas.DataFrame
입니다. 입력 데이터에는 각 그룹에 대한 모든 행과 열이 포함됩니다. - 결과를 새
DataFrame
로 결합합니다.
를 사용 groupBy().applyInPandas()
하려면 다음을 정의해야 합니다.
- 각 그룹에 대한 계산을 정의하는 Python 함수
-
StructType
출력의 스키마를 정의하는 개체 또는 문자열DataFrame
반환 pandas.DataFrame
된 의 열 레이블은 문자열로 지정된 경우 정의된 출력 스키마의 필드 이름과 일치하거나 문자열이 아닌 경우 위치별로 필드 데이터 형식과 일치해야 합니다(예: 정수 인덱스).
pandas를 참조하세요. 를 생성할 때 열에 레이블을 지정하는 방법에 대한 DataFrame입니다pandas.DataFrame
.
그룹에 대한 모든 데이터는 함수가 적용되기 전에 메모리에 로드됩니다. 특히 그룹 크기가 기울어진 경우 메모리 부족 예외가 발생할 수 있습니다. maxRecordsPerBatch에 대한 구성은 그룹에 적용되지 않으며 그룹화된 데이터가 사용 가능한 메모리에 맞는지 확인하는 것은 사용자에게 달려 있습니다.
다음 예제에서는 를 사용하여 groupby().apply()
그룹의 각 값에서 평균을 뺍니다.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
자세한 사용법은 pyspark.sql.GroupedData.applyInPandas를 참조하세요.
지도
의 반복기를 pandas.DataFrame
현재 PySpark DataFrame을 나타내는 의 pandas.DataFrame
다른 반복기로 변환하고 결과를 PySpark DataFrame으로 반환하기 위해 pandas 인스턴스 DataFrame.mapInPandas()
를 사용하여 맵 작업을 수행합니다.
기본 함수는 의 pandas.DataFrame
반복기를 사용하고 출력합니다. Series to Series와 같은 일부 pandas UDF와 달리 임의 길이의 출력을 반환할 수 있습니다.
다음 예제에서는 를 사용하는 mapInPandas()
방법을 보여줍니다.
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
자세한 사용법은 pyspark.sql.DataFrame.mapInPandas를 참조하세요.
공동 그룹화된 맵
pandas 인스턴스를 사용하여 공동 그룹화된 맵 작업의 경우 를 사용하여 DataFrame.groupby().cogroup().applyInPandas()
공통 키로 두 PySpark DataFrame
s를 공동 그룹화한 다음, 다음과 같이 각 공동 그룹에 Python 함수를 적용합니다.
- 키를 공유하는 각 DataFrame의 그룹이 함께 그룹화되도록 데이터를 섞습니다.
- 각 공동 그룹에 함수를 적용합니다. 함수의 입력은 2
pandas.DataFrame
입니다(키를 나타내는 선택적 튜플 포함). 함수의 출력은 입니다pandas.DataFrame
. -
pandas.DataFrame
모든 그룹의 를 새 PySparkDataFrame
로 결합합니다.
를 사용 groupBy().cogroup().applyInPandas()
하려면 다음을 정의해야 합니다.
- 각 공동 그룹에 대한 계산을 정의하는 Python 함수입니다.
-
StructType
출력 PySparkDataFrame
의 스키마를 정의하는 개체 또는 문자열입니다.
반환 pandas.DataFrame
된 의 열 레이블은 문자열로 지정된 경우 정의된 출력 스키마의 필드 이름과 일치하거나 문자열이 아닌 경우 위치별로 필드 데이터 형식과 일치해야 합니다(예: 정수 인덱스).
pandas를 참조하세요. 를 생성할 때 열에 레이블을 지정하는 방법에 대한 DataFrame입니다pandas.DataFrame
.
함수가 적용되기 전에 공동 그룹에 대한 모든 데이터가 메모리에 로드됩니다. 특히 그룹 크기가 기울어진 경우 메모리 부족 예외가 발생할 수 있습니다. maxRecordsPerBatch에 대한 구성은 적용되지 않으며, 공동 그룹화된 데이터가 사용 가능한 메모리에 맞는지 확인하는 것은 사용자에게 달려 있습니다.
다음 예제에서는 를 사용하여 groupby().cogroup().applyInPandas()
두 데이터 세트 간에 를 asof join
수행하는 방법을 보여 줍니다.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
자세한 사용법은 pyspark.sql.PandasCogroupedOps.applyInPandas를 참조하세요.