共用方式為


pandas 函式 API

pandas 函式 API 使您能直接將 Python 原生函式套用於 pandas 實例,並在 PySpark DataFrame 中進行輸出。 與 pandas 使用者定義函式類似,函式 API 也會使用 Apache Arrow 來傳輸數據和 pandas 來處理數據:不過,在 pandas 函式 API 中,Python 類型提示是選擇性的。

pandas 函式 API 有這三種類型:

  • 群組地圖
  • 地圖
  • 聯合分組映射

pandas API 函式 利用與 pandas UDF 執行相同的內部邏輯。 它們會共用 PyArrow、支援的 SQL 類型和組態等特性。

如需詳細資訊,請參閱即將發行的 Apache Spark 3.0中的部落格文章 新的 Pandas UDF 和 Python 類型提示。

群組地圖

您可以使用 groupBy().applyInPandas() 來轉換群組數據,以實作「split-apply-combine」模式。 分割-應用-合併包含三個步驟:

  • 使用 DataFrame.groupBy將數據分割成群組。
  • 在每個群組上套用函式。 函式的輸入和輸出都是 pandas.DataFrame。 輸入數據包含每個群組的所有數據行和數據列。
  • 將結果合併成新的 DataFrame

若要使用 groupBy().applyInPandas(),您必須定義下列事項:

  • 定義每個群組計算的 Python 函式
  • 定義輸出架構的 StructType 物件或字串 DataFrame

傳回 pandas.DataFrame 的列標籤必須符合定義的輸出架構中的欄位名稱(如果指定為字串),或者在不為字串時按照位置符合欄位資料類型,例如整數索引。 請參閱 pandas.DataFrame,瞭解如何在建構 pandas.DataFrame時標記欄位。

套用函式之前,群組的所有數據都會載入記憶體中。 這可能會導致記憶體不足的例外狀況,特別是當群組大小扭曲時。 maxRecordsPerBatch 的設定不會套用在群組上,因此您必須確定群組數據符合可用的記憶體。

下列範例示範如何使用 groupby().apply(),從群組中的每個值減去平均值。

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

如需詳細的使用方式,請參閱 pyspark.sql.GroupedData.applyInPandas

地圖

您可以藉由 DataFrame.mapInPandas() 執行 pandas 實例的映射操作,將 pandas.DataFrame 迭代器轉換為另一個代表目前 PySpark DataFrame 的 pandas.DataFrame 迭代器,並以 PySpark DataFrame 傳回結果。

底層函式會接受並輸出 pandas.DataFrame的迭代器。 它可以傳回任意長度的輸出,與某些 pandas UDF 相較之下,例如數列到數列。

下列範例示範如何使用 mapInPandas()

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

如需詳細的使用方式,請參閱 pyspark.sql.DataFrame.mapInPandas

聯合分組映射

針對使用 pandas 實例的共同群組映射操作,請使用 DataFrame.groupby().cogroup().applyInPandas() 將兩個 PySpark DataFrame透過共同索引鍵共同群組在一起,然後將 Python 函式應用於每個共同群組,如下所示:

  • 將數據重新排列,使共用鍵的每個 DataFrame 群組能夠聚集在一起。
  • 將函式套用至每個共同群組。 函式的輸入是兩個 pandas.DataFrame(和一個可選的用來表示鍵的元組)。 函式的輸出是 pandas.DataFrame
  • 將所有群組中的 pandas.DataFrame合併為新的 PySpark DataFrame

若要使用 groupBy().cogroup().applyInPandas(),您必須定義下列事項:

  • Python 函式,定義每個共同群組的計算。
  • StructType 物件或字串,定義輸出 PySpark DataFrame的架構。

傳回 pandas.DataFrame 的列標籤必須符合定義的輸出架構中的欄位名稱(如果指定為字串),或者在不為字串時按照位置符合欄位資料類型,例如整數索引。 請參閱 pandas.DataFrame,瞭解如何在建構 pandas.DataFrame時標記欄位。

在套用函式之前,會將共同群組的所有數據載入記憶體中。 這可能會導致記憶體不足的例外狀況,特別是當群組大小扭曲時。 maxRecordsPerBatch 的設定未被套用,您必須確保共組的數據能符合可用的記憶體。

下列範例示範如何使用 groupby().cogroup().applyInPandas() 在兩個數據集之間執行 asof join

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

如需詳細的使用方式,請參閱 pyspark.sql.PandasCogroupedOps.applyInPandas