pandas 함수 API들
pandas 함수 API를 사용하면 pySpark DataFrame에 pandas 인스턴스를 가져와 출력하는 Python 네이티브 함수를 직접 적용할 수 있습니다. pandas 사용자 정의 함수와 유사하게, 함수 API도 Apache Arrow를 사용하여 데이터를 전달하고 pandas로 데이터 작업을 수행합니다. 그러나 Python 형식 힌트는 pandas 함수 API에서 선택 사항입니다.
세 가지 유형의 pandas 함수 API가 있습니다.
- 그룹화된 지도
- 지도
- 공동 그룹화된 맵
pandas 함수 API는 pandas UDF 실행에서 사용하는 것과 동일한 내부 논리를 활용합니다. PyArrow, 지원되는 SQL 형식 및 구성과 같은 특성을 공유합니다.
자세한 내용은 Apache Spark 3.0예정된 릴리스에서 새 Pandas UDF 및 Python 유형 힌트를
그룹화된 지도
그룹화된 데이터를 groupBy().applyInPandas()
을 사용하여 변환하고, 이로써 "split-apply-combine" 패턴을 구현합니다. 분할-적용-결합은 다음 세 단계로 구성됩니다.
-
DataFrame.groupBy
사용하여 데이터를 그룹으로 분할합니다. - 각 그룹에 함수를 적용합니다. 함수의 입력 및 출력은 모두
pandas.DataFrame
. 입력 데이터에는 각 그룹에 대한 모든 행과 열이 포함됩니다. - 결과를
DataFrame
에 새롭게 결합합니다.
groupBy().applyInPandas()
사용하려면 다음을 정의해야 합니다.
- 각 그룹에 대한 계산을 정의하는 Python 함수
- 출력
DataFrame
스키마를 정의하는StructType
개체 또는 문자열입니다.
반환된 pandas.DataFrame
열 레이블은 문자열로 지정된 경우 정의된 출력 스키마의 필드 이름과 일치하거나 문자열이 아닌 경우 위치별 필드 데이터 형식(예: 정수 인덱스)을 일치시켜야 합니다.
pandas.DataFrame을 참조하여 pandas.DataFrame
생성할 때 열에 레이블을 지정하는 방법을 확인하십시오.
그룹의 모든 데이터는 함수가 적용되기 전에 메모리에 로드됩니다. 특히 그룹 크기가 기울어진 경우 메모리 부족 예외가 발생할 수 있습니다. maxRecordsPerBatch 대한 구성은 그룹에 적용되지 않으며 그룹화된 데이터가 사용 가능한 메모리에 맞는지 확인하는 것은 사용자에게 달려 있습니다.
다음 예제에서는 groupby().apply()
를 사용하여 그룹의 각 값에서 평균을 뺍니다.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
자세한 사용법은 pyspark.sql.GroupedData.applyInPandas참조하세요.
지도
DataFrame.mapInPandas()
반복기를 현재 PySpark DataFrame을 나타내는 다른 pandas.DataFrame
반복기로 변환하고 결과를 PySpark DataFrame으로 반환하기 위해 pandas.DataFrame
pandas 인스턴스로 맵 작업을 수행합니다.
기본 함수는 pandas.DataFrame
반복자를 입력받고 출력합니다. Series to Series와 같은 일부 pandas UDF와 달리 임의 길이의 출력을 반환할 수 있습니다.
다음 예제에서는 mapInPandas()
사용하는 방법을 보여줍니다.
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
자세한 사용법은 pyspark.sql.DataFrame.mapInPandas참조하세요.
공동 그룹화된 맵
pandas 인스턴스를 사용하여 코그룹화된 맵 작업을 수행하려면 DataFrame.groupby().cogroup().applyInPandas()
을(를) 사용하여 두 PySpark DataFrame
을(를) 공통 키로 코그룹화한 다음, 각 코그룹에 Python 함수를 적용합니다.
- 키를 공유하는 각 DataFrame의 그룹이 함께 그룹화되도록 데이터를 섞습니다.
- 각 공동 그룹에 함수를 적용합니다. 함수의 입력은 두 개의
pandas.DataFrame
(키를 나타내는 선택적 튜플 포함)입니다. 함수의 출력은pandas.DataFrame
입니다. - 모든 그룹의
pandas.DataFrame
을 결합하여 새 PySparkDataFrame
을 만듭니다.
groupBy().cogroup().applyInPandas()
사용하려면 다음을 정의해야 합니다.
- 각 공동 그룹에 대한 계산을 정의하는 Python 함수입니다.
- 출력 PySpark
DataFrame
스키마를 정의하는StructType
개체 또는 문자열입니다.
반환된 pandas.DataFrame
열 레이블은 문자열로 지정된 경우 정의된 출력 스키마의 필드 이름과 일치하거나 문자열이 아닌 경우 위치별 필드 데이터 형식(예: 정수 인덱스)을 일치시켜야 합니다.
pandas.DataFrame을 참조하십시오. DataFrame을 생성할 때 열에 레이블을 지정하는 방법에 대한 pandas.DataFrame
.
함수가 적용되기 전에 코 그룹에 대한 모든 데이터가 메모리에 로드됩니다. 특히 그룹 크기가 기울어진 경우 메모리 부족 예외가 발생할 수 있습니다. maxRecordsPerBatch 대한 구성은 적용되지 않으며, 공동 그룹화된 데이터가 사용 가능한 메모리에 맞는지 확인하는 것은 사용자에게 달려 있습니다.
다음 예제에서는 groupby().cogroup().applyInPandas()
사용하여 두 데이터 세트 간의 asof join
수행하는 방법을 보여 줍니다.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
자세한 사용법은 pyspark.sql.PandasCogroupedOps.applyInPandas참조하세요.