Pandas の関数 API
pandas 関数 API を使用すると、Pandas インスタンスを受け取って PySpark DataFrame に出力する Python ネイティブ関数を直接適用できます。 pandas ユーザー定義関数と同様に、関数 API では、Apache Arrow を使用してデータを転送し、pandas を使用してデータを操作することもできます。ただし、Pandas 関数 API では Python 型ヒントは省略可能です。
pandas 関数 API には、次の 3 種類があります。
- グループ化されたマップ
- 地図
- グループ化されたマップ
pandas 関数 API は、pandas UDF の実行で使用されるのと同じ内部ロジックを利用します。 PyArrow、サポートされている SQL 型、構成などの特性を共有します。
詳細については、Apache Spark 3.0の今後のリリース
グループ化されたマップ
groupBy().applyInPandas()
を使用してグループ化されたデータを変換し、"split-apply-combine" パターンを実装します。 「Split-apply-combine」は、3 つのステップで成り立っています。
DataFrame.groupBy
を使用して、データをグループに分割します。- 各グループに関数を適用します。 関数の入力と出力はどちらも
pandas.DataFrame
。 入力データには、各グループのすべての行と列が含まれます。 - 結果を新しい
DataFrame
に結合します。
groupBy().applyInPandas()
を使用するには、次を定義する必要があります。
- 各グループの計算を定義する Python 関数
- 出力
DataFrame
のスキーマを定義するStructType
オブジェクトまたは文字列
返される pandas.DataFrame
の列ラベルは、文字列として指定されている場合は、定義された出力スキーマ内のフィールド名と一致するか、整数インデックスなどの文字列でない場合は、フィールドのデータ型と位置で一致する必要があります。 「pandas」を参照してください。DataFramepandas.DataFrame
を構築するときに列にラベルを付ける方法について説明します。
関数が適用される前に、グループのすべてのデータがメモリに読み込まれます。 これにより、特にグループ のサイズが偏っている場合に、メモリ不足の例外が発生する可能性があります。 maxRecordsPerBatch の構成はグループには適用されず、グループ化されたデータが使用可能なメモリに収まるようにする必要があります。
次の例は、groupby().apply()
を使用して、グループ内の各値から平均値を減算する方法を示しています。
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
詳しくは、pyspark.sql.GroupedData.applyInPandasをご覧ください。
地図
pandas.DataFrame
の反復子を現在の PySpark DataFrame を表し、結果を PySpark DataFrame として返す pandas.DataFrame
の別の反復子に変換するには、DataFrame.mapInPandas()
して pandas インスタンスでマップ操作を実行します。
基になる関数は、pandas.DataFrame
の反復子を受け取って出力します。 "系列から系列" などの一部の Pandas UDF とは対照的に、任意の長さの出力を返すことができます。
次の例は、mapInPandas()
の使用方法を示しています。
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
詳しい使用方法については、pyspark.sql.DataFrame.mapInPandasを参照してください。
グループ化されたマップ
pandas インスタンスを使用したグループ化されたマップ操作の場合は、DataFrame.groupby().cogroup().applyInPandas()
を使用して 2 つの PySpark DataFrame
を共通キーでグループ化し、次に示すように各グループに Python 関数を適用します。
- キーを共有する各 DataFrame のグループがグループ化されるように、データをシャッフルします。
- 各コグループに関数を適用します。 関数の入力は 2 つの
pandas.DataFrame
です (キーを表すタプルは省略可能)。 関数の出力はpandas.DataFrame
です。 - すべてのグループの
pandas.DataFrame
を新しい PySparkDataFrame
に結合します。
groupBy().cogroup().applyInPandas()
を使用するには、次を定義する必要があります。
- 各コグループの計算を定義する Python 関数。
- 出力 PySpark
DataFrame
のスキーマを定義するStructType
オブジェクトまたは文字列。
返される pandas.DataFrame
の列ラベルは、文字列として指定されている場合は、定義された出力スキーマ内のフィールド名と一致するか、整数インデックスなどの文字列でない場合は、フィールドのデータ型と位置で一致する必要があります。 「pandas」を参照してください。DataFramepandas.DataFrame
を構築するときに列にラベルを付ける方法について説明します。
コグループのすべてのデータは、関数が適用される前にメモリに読み込まれます。 これにより、特にグループ のサイズが偏っている場合に、メモリ不足の例外が発生する可能性があります。 maxRecordsPerBatch の構成は適用されず、グループ化されたデータが使用可能なメモリに収まるようにする必要があります。
次の例は、groupby().cogroup().applyInPandas()
を使用して 2 つのデータセット間で asof join
を実行する方法を示しています。
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
詳細については、pyspark.sql.PandasCogroupedOps.applyInPandasを参照してください。