Condividi tramite


"Funzioni API di pandas"

Le API della funzione pandas consentono di applicare direttamente una funzione nativa Python che accetta e restituisce istanze pandas a un dataframe PySpark. Analogamente a pandas funzioni definite dall'utente, le API di funzione usano anche Apache Arrow per trasferire dati e pandas per lavorare con i dati; Tuttavia, gli hint per il tipo Python sono facoltativi nelle API della funzione pandas.

Esistono tre tipi di API per le funzioni pandas:

  • Mappa raggruppata
  • Mappa
  • Mappa co-raggruppata

Le API delle funzioni pandas sfruttano la stessa logica interna utilizzata dall'esecuzione delle UDF pandas. Condividono caratteristiche come PyArrow, tipi SQL supportati e configurazioni.

Per ulteriori informazioni, vedere l'articolo sul blog , UDF Pandas e indicazioni di tipo Python nella prossima versione di Apache Spark 3.0.

Mappa raggruppata

Trasformi i tuoi dati raggruppati usando groupBy().applyInPandas() per implementare il pattern "split-apply-combine". Split-apply-combine è costituito da tre passaggi:

  • Suddividere i dati in gruppi usando DataFrame.groupBy.
  • Applicare una funzione a ogni gruppo. L'input e l'output della funzione sono entrambi pandas.DataFrame. I dati di input contengono tutte le righe e le colonne per ogni gruppo.
  • Combinare i risultati in un nuovo DataFrame.

Per usare groupBy().applyInPandas(), è necessario definire quanto segue:

  • Funzione Python che definisce il calcolo per ogni gruppo
  • Un oggetto StructType o una stringa che definisce lo schema dell'output DataFrame

Le etichette di colonna del pandas.DataFrame restituito devono corrispondere ai nomi dei campi nello schema di output definito se specificati come stringhe oppure devono corrispondere ai tipi di dati di campo in base alla posizione, se non stringhe, ad esempio indici interi. Vedere pandas.DataFrame per sapere come etichettare le colonne durante la costruzione di un pandas.DataFrame.

Tutti i dati per un gruppo vengono caricati in memoria prima dell'applicazione della funzione. Ciò può causare eccezioni di memoria insufficiente, soprattutto se le dimensioni del gruppo sono asimmetriche. La configurazione per maxRecordsPerBatch non viene applicata ai gruppi ed è necessario assicurarsi che i dati raggruppati si adattino alla memoria disponibile.

Nell'esempio seguente viene illustrato come usare groupby().apply() per sottrarre la media da ogni valore del gruppo.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.GroupedData.applyInPandas.

Mappa

È possibile eseguire operazioni di mapping con istanze pandas DataFrame.mapInPandas() per trasformare un iteratore di pandas.DataFrame in un altro iteratore di pandas.DataFrame che rappresenta il dataframe PySpark corrente e restituisce il risultato come dataframe PySpark.

La funzione sottostante accetta e restituisce un iteratore di pandas.DataFrame. Può restituire l'output di lunghezza arbitraria, a differenza di alcune UDF pandas, ad esempio Series to Series.

Nell'esempio seguente viene illustrato come usare mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.DataFrame.mapInPandas.

Mappa co-raggruppata

Per le operazioni di mappa raggruppate con istanze pandas, usare DataFrame.groupby().cogroup().applyInPandas() per raggruppare due istanze PySpark DataFramein base a una chiave comune e quindi applicare una funzione Python a ogni cogroup, come illustrato di seguito.

  • Eseguire lo shuffling dei dati in modo che i gruppi di ogni DataFrame che condividono una chiave siano cogruppati insieme.
  • Applicare una funzione a ogni cogroup. L'input della funzione è due pandas.DataFrame (con una tupla facoltativa che rappresenta la chiave). Il risultato della funzione è un pandas.DataFrame.
  • Combina i pandas.DataFrameda tutti i gruppi in un nuovo PySpark DataFrame.

Per usare groupBy().cogroup().applyInPandas(), è necessario definire quanto segue:

  • Funzione Python che definisce il calcolo per ogni cogroup.
  • Oggetto StructType o stringa che definisce lo schema dell'output PySpark DataFrame.

Le etichette di colonna del pandas.DataFrame restituito devono corrispondere ai nomi dei campi nello schema di output definito se specificati come stringhe oppure devono corrispondere ai tipi di dati di campo in base alla posizione, se non stringhe, ad esempio indici interi. Vedere pandas.DataFrame per sapere come etichettare le colonne durante la costruzione di un pandas.DataFrame.

Tutti i dati per un cogroup vengono caricati in memoria prima dell'applicazione della funzione. Ciò può causare eccezioni di memoria insufficiente, soprattutto se le dimensioni del gruppo sono asimmetriche. La configurazione per maxRecordsPerBatch non viene applicata ed è necessario assicurarsi che i dati raggruppati si adattino alla memoria disponibile.

Nell'esempio seguente viene illustrato come usare groupby().cogroup().applyInPandas() per eseguire un asof join tra due set di dati.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.PandasCogroupedOps.applyInPandas.