"Funzioni API di pandas"
Le API della funzione pandas consentono di applicare direttamente una funzione nativa Python che accetta e restituisce istanze pandas a un dataframe PySpark. Analogamente a pandas funzioni definite dall'utente, le API di funzione usano anche Apache Arrow per trasferire dati e pandas per lavorare con i dati; Tuttavia, gli hint per il tipo Python sono facoltativi nelle API della funzione pandas.
Esistono tre tipi di API per le funzioni pandas:
- Mappa raggruppata
- Mappa
- Mappa co-raggruppata
Le API delle funzioni pandas sfruttano la stessa logica interna utilizzata dall'esecuzione delle UDF pandas. Condividono caratteristiche come PyArrow, tipi SQL supportati e configurazioni.
Per ulteriori informazioni, vedere l'articolo sul blog , UDF Pandas e indicazioni di tipo Python nella prossima versione di Apache Spark 3.0.
Mappa raggruppata
Trasformi i tuoi dati raggruppati usando groupBy().applyInPandas()
per implementare il pattern "split-apply-combine". Split-apply-combine è costituito da tre passaggi:
- Suddividere i dati in gruppi usando
DataFrame.groupBy
. - Applicare una funzione a ogni gruppo. L'input e l'output della funzione sono entrambi
pandas.DataFrame
. I dati di input contengono tutte le righe e le colonne per ogni gruppo. - Combinare i risultati in un nuovo
DataFrame
.
Per usare groupBy().applyInPandas()
, è necessario definire quanto segue:
- Funzione Python che definisce il calcolo per ogni gruppo
- Un oggetto
StructType
o una stringa che definisce lo schema dell'outputDataFrame
Le etichette di colonna del pandas.DataFrame
restituito devono corrispondere ai nomi dei campi nello schema di output definito se specificati come stringhe oppure devono corrispondere ai tipi di dati di campo in base alla posizione, se non stringhe, ad esempio indici interi. Vedere pandas.DataFrame per sapere come etichettare le colonne durante la costruzione di un pandas.DataFrame
.
Tutti i dati per un gruppo vengono caricati in memoria prima dell'applicazione della funzione. Ciò può causare eccezioni di memoria insufficiente, soprattutto se le dimensioni del gruppo sono asimmetriche. La configurazione per maxRecordsPerBatch non viene applicata ai gruppi ed è necessario assicurarsi che i dati raggruppati si adattino alla memoria disponibile.
Nell'esempio seguente viene illustrato come usare groupby().apply()
per sottrarre la media da ogni valore del gruppo.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.GroupedData.applyInPandas.
Mappa
È possibile eseguire operazioni di mapping con istanze pandas DataFrame.mapInPandas()
per trasformare un iteratore di pandas.DataFrame
in un altro iteratore di pandas.DataFrame
che rappresenta il dataframe PySpark corrente e restituisce il risultato come dataframe PySpark.
La funzione sottostante accetta e restituisce un iteratore di pandas.DataFrame
. Può restituire l'output di lunghezza arbitraria, a differenza di alcune UDF pandas, ad esempio Series to Series.
Nell'esempio seguente viene illustrato come usare mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.DataFrame.mapInPandas.
Mappa co-raggruppata
Per le operazioni di mappa raggruppate con istanze pandas, usare DataFrame.groupby().cogroup().applyInPandas()
per raggruppare due istanze PySpark DataFrame
in base a una chiave comune e quindi applicare una funzione Python a ogni cogroup, come illustrato di seguito.
- Eseguire lo shuffling dei dati in modo che i gruppi di ogni DataFrame che condividono una chiave siano cogruppati insieme.
- Applicare una funzione a ogni cogroup. L'input della funzione è due
pandas.DataFrame
(con una tupla facoltativa che rappresenta la chiave). Il risultato della funzione è unpandas.DataFrame
. - Combina i
pandas.DataFrame
da tutti i gruppi in un nuovo PySparkDataFrame
.
Per usare groupBy().cogroup().applyInPandas()
, è necessario definire quanto segue:
- Funzione Python che definisce il calcolo per ogni cogroup.
- Oggetto
StructType
o stringa che definisce lo schema dell'output PySparkDataFrame
.
Le etichette di colonna del pandas.DataFrame
restituito devono corrispondere ai nomi dei campi nello schema di output definito se specificati come stringhe oppure devono corrispondere ai tipi di dati di campo in base alla posizione, se non stringhe, ad esempio indici interi. Vedere pandas.DataFrame per sapere come etichettare le colonne durante la costruzione di un pandas.DataFrame
.
Tutti i dati per un cogroup vengono caricati in memoria prima dell'applicazione della funzione. Ciò può causare eccezioni di memoria insufficiente, soprattutto se le dimensioni del gruppo sono asimmetriche. La configurazione per maxRecordsPerBatch non viene applicata ed è necessario assicurarsi che i dati raggruppati si adattino alla memoria disponibile.
Nell'esempio seguente viene illustrato come usare groupby().cogroup().applyInPandas()
per eseguire un asof join
tra due set di dati.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.PandasCogroupedOps.applyInPandas.