Condividi tramite


Funzioni Pandas definite dall'utente

Una funzione definita dall'utente (UDF) pandas, nota anche come funzione definita dall'utente vettorializzata, è una funzione definita dall'utente che usa Apache Arrow per trasferire dati e pandas per lavorare con i dati. Le UDF pandas consentono operazioni vettoriali che possono aumentare le prestazioni fino a 100 volte rispetto alle UDF Python riga per riga.

Per informazioni di base, vedere il post di blog Nuove UDF pandas e hint per i tipi Python nella prossima versione di Apache Spark 3.0.

È possibile definire una funzione pandas definita dall'utente usando la parola chiave pandas_udf come elemento Decorator ed eseguire il wrapping della funzione con un hint per il tipo Python. Questo articolo descrive i diversi tipi di UDF pandas e illustra come usare le UDF pandas con hint di tipo.

UDF da serie a serie

Per vettorizzare le operazioni scalari, usare una UDF pandas da serie a serie. È possibile utilizzarle con API come select e withColumn.

La funzione Python deve accettare una serie pandas come input e restituire una serie pandas con la stessa lunghezza ed è necessario specificarli negli hint per il tipo Python. Spark esegue una funzione pandas definita dall'utente suddividendo le colonne in batch, chiamando la funzione per ogni batch come subset dei dati, quindi concatenando i risultati.

L'esempio seguente illustra come creare una UDF pandas che calcola il prodotto di 2 colonne.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

UDF da iteratore di serie a iteratore di serie

Una UDF iteratore è identica a una funzione UDF pandas scalare, ad eccezione di:

  • La funzione Python
    • Accetta un iteratore di batch anziché un singolo batch di input come input.
    • Restituisce un iteratore di batch di output anziché un singolo batch di output.
  • La lunghezza dell'intero output nell'iteratore deve corrispondere alla lunghezza dell'intero input.
  • La UDF pandas di cui è stato eseguito il wrapping accetta una singola colonna Spark come input.

È necessario specificare l'hint per il tipo Python come Iterator[pandas.Series] ->Iterator[pandas.Series].

Questa UDF pandas è utile quando l'esecuzione UDF richiede l'inizializzazione di uno stato, ad esempio il caricamento di un file del modello di Machine Learning per applicare l'inferenza a ogni batch di input.

L'esempio seguente illustra come creare una UDF pandas con supporto iteratore.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

UDF da iteratore di più serie a iteratore di serie

Una UDF da iteratore di più serie a iteratore di serie presenta caratteristiche e restrizioni simili alla UDF da iteratore di serie a iteratore di serie. La funzione specificata accetta un iteratore di batch e restituisce un iteratore di batch. È utile anche quando l'esecuzione della funzione definita dall'utente richiede l'inizializzazione di uno stato.

Le differenze sono le seguenti:

  • La funzione Python sottostante accetta un iteratore di una tupla di serie pandas.
  • La UDF pandas di cui è stato eseguito il wrapping accetta più colonne Spark come input.

Specificare gli hint di tipo come Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

UDF da serie a scalare

Le UDF pandas da serie a scalari sono simili alle funzioni di aggregazione di Spark. Una UDF pandas da serie a scalare definisce un'aggregazione da una o più serie pandas a un valore scalare, in cui ogni serie pandas rappresenta una colonna Spark. Usare una UDF pandas da serie a scalare con API come select, withColumn, groupBy.agg e pyspark.sql.Window.

Si esprime l'hint di tipo come pandas.Series, ... ->Any. Il tipo restituito deve essere un tipo di dati primitivo e lo scalare restituito può essere un tipo primitivo Python, ad esempio int o float o un tipo di dati NumPy, ad esempio numpy.int64 o numpy.float64. Any deve essere idealmente un tipo scalare specifico.

Questo tipo di UDF non supporta l'aggregazione parziale e tutti i dati per ogni gruppo vengono caricati in memoria.

L'esempio seguente mostra come usare questo tipo di UDF per calcolare la media con le operazioni select, groupBy e window:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.functions.pandas_udf.

Utilizzo

Impostazione delle dimensioni del batch Arrow

Nota

Questa configurazione non ha alcun impatto sul calcolo configurato con la modalità di accesso condiviso e Databricks Runtime LTS da 13.3 a 14.2.

Le partizioni di dati in Spark vengono convertite in batch di record Arrow, che possono causare temporaneamente un utilizzo elevato della memoria nella JVM. Per evitare possibili eccezioni di memoria insufficiente, è possibile modificare le dimensioni dei batch di record Arrow impostando la configurazione spark.sql.execution.arrow.maxRecordsPerBatch su un numero intero che determina il numero massimo di righe per ogni batch. Il valore predefinito è 10.000 record per batch. Se il numero di colonne è elevato, il valore deve essere regolato di conseguenza. Usando questo limite, ogni partizione di dati viene suddivisa in 1 o più batch di record per l'elaborazione.

Timestamp con semantica fuso orario

Spark archivia internamente i timestamp come valori UTC e i dati di timestamp inseriti senza un fuso orario specificato vengono convertiti come ora locale in UTC con risoluzione microsecondo.

Quando i dati di timestamp vengono esportati o visualizzati in Spark, il fuso orario della sessione viene usato per localizzare i valori del timestamp. Il fuso orario della sessione viene impostato con la configurazione spark.sql.session.timeZone e per impostazione predefinita viene impostato il fuso orario locale del sistema JVM. Pandas usa un tipo datetime64 con risoluzione nanosecondo, datetime64[ns] con fuso orario facoltativo per colonna.

Quando i dati timestamp vengono trasferiti da Spark a pandas, vengono convertiti in nanosecondi e ogni colonna viene convertita nel fuso orario della sessione Spark, quindi localizzata in tale fuso orario, che rimuove il fuso orario e visualizza i valori come ora locale. Ciò si verifica quando si chiama toPandas() o pandas_udf con colonne timestamp.

Quando i dati timestamp vengono trasferiti da pandas a Spark, vengono convertiti in microsecondi UTC. Ciò si verifica quando si chiama createDataFrame con un dataframe pandas o quando si restituisce un timestamp da una UDF pandas. Queste conversioni vengono eseguite automaticamente per garantire che Spark abbia dati nel formato previsto, quindi non è necessario eseguire manualmente alcuna di queste conversioni. I valori nanosecondi vengono troncati.

Una funzione definita dall'utente standard carica i dati di timestamp come oggetti datetime Python, che è diverso da un timestamp pandas. Per ottenere le migliori prestazioni, si consiglia di utilizzare la funzionalità serie temporale di pandas quando si lavora con i timestamp in una UDF di pandas. Per informazioni dettagliate, vedere Funzionalità serie temporale/data.

Notebook di esempio

Il notebook seguente illustra i miglioramenti delle prestazioni che è possibile ottenere con le funzioni definite dall'utente pandas:

Notebook benchmark delle UDF pandas

Ottenere il notebook