Functie-API's van pandas
Met pandas-functie-API's kunt u rechtstreeks een systeemeigen Python-functie toepassen die pandas-exemplaren gebruikt en uitvoert op een PySpark DataFrame. Net als gebruikersgedefinieerde pandas-functies, gebruiken functie-API's ook Apache Arrow om gegevens over te dragen en pandas om met de gegevens te werken; Python type hints zijn echter optioneel in pandas-functie-API's.
Er zijn drie typen pandas-functie-API's:
- Gegroepeerde kaart
- Kaart
- Gegroepeerde kaart
Pandas-functie-API's maken gebruik van dezelfde interne logica die door pandas UDF-uitvoering wordt gebruikt. Ze delen kenmerken zoals PyArrow, ondersteunde SQL-typen en de configuraties.
Zie het blogbericht Nieuwe Pandas UDF's en Python Type Hints in de aanstaande release van Apache Spark 3.0voor meer informatie.
Gegroepeerde kaart
U transformeert uw gegroepeerde gegevens met behulp van groupBy().applyInPandas()
om het patroon split-apply-combine te implementeren. Split-apply-combine bestaat uit drie stappen:
- Splits de gegevens in groepen met behulp van
DataFrame.groupBy
. - Een functie toepassen op elke groep. De invoer en uitvoer van de functie zijn beide
pandas.DataFrame
. De invoergegevens bevatten alle rijen en kolommen voor elke groep. - Combineer de resultaten in een nieuwe
DataFrame
.
Als u groupBy().applyInPandas()
wilt gebruiken, moet u het volgende definiëren:
- Een Python-functie die de berekening voor elke groep definieert
- Een
StructType
-object of een tekenreeks die het schema van de uitvoerDataFrame
definieert
De kolomlabels van de geretourneerde pandas.DataFrame
moeten overeenkomen met de veldnamen in het gedefinieerde uitvoerschema, indien opgegeven als tekenreeksen, of overeenkomen met de veldgegevenstypen op positie als dat niet tekenreeksen zijn, bijvoorbeeld gehele getallen. Zie pandas.DataFrame voor hoe u kolommen labelt bij het maken van een pandas.DataFrame
.
Alle gegevens voor een groep worden in het geheugen geladen voordat de functie wordt toegepast. Dit kan leiden tot uitzonderingen met onvoldoende geheugen, met name als de groepsgrootten scheef zijn. De configuratie voor maxRecordsPerBatch wordt niet toegepast op groepen en het is aan u om ervoor te zorgen dat de gegroepeerde gegevens in het beschikbare geheugen passen.
In het volgende voorbeeld ziet u hoe u groupby().apply()
gebruikt om het gemiddelde van elke waarde in de groep af te trekken.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Zie pyspark.sql.GroupedData.applyInPandasvoor gedetailleerde gebruik.
Kaart
U voert kaartbewerkingen uit met pandas-exemplaren door DataFrame.mapInPandas()
om een iterator van pandas.DataFrame
te transformeren naar een andere iterator van pandas.DataFrame
die het huidige PySpark DataFrame vertegenwoordigt en het resultaat retourneert als een PySpark DataFrame.
De onderliggende functie neemt een iterator van pandas.DataFrame
en voert deze uit. Het kan uitvoer van willekeurige lengte retourneren in tegenstelling tot sommige pandas UDF's, zoals Series to Series.
In het volgende voorbeeld ziet u hoe u mapInPandas()
gebruikt:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Zie pyspark.sql.DataFrame.mapInPandasvoor gedetailleerde gebruik.
Gegroepeerde kaart
Voor gegroepeerde kaartbewerkingen met pandas-exemplaren gebruikt u DataFrame.groupby().cogroup().applyInPandas()
om twee PySpark-DataFrame
s te groeperen met een gemeenschappelijke sleutel en past u vervolgens een Python-functie toe op elke cogroep, zoals wordt weergegeven:
- Verdeel de gegevens zodanig dat de groepen van elk DataFrame die een sleutel delen, samen worden gegroepeerd.
- Pas een functie toe op elke cogroep. De invoer van de functie is twee
pandas.DataFrame
(met een optionele tuple die de sleutel vertegenwoordigt). De uitvoer van de functie is eenpandas.DataFrame
. - Combineer de
pandas.DataFrame
’s van alle groepen tot een nieuwe PySpark-DataFrame
.
Als u groupBy().cogroup().applyInPandas()
wilt gebruiken, moet u het volgende definiëren:
- Een Python-functie waarmee de berekening voor elke cogroep wordt gedefinieerd.
- Een
StructType
-object of een tekenreeks waarmee het schema van de uitvoer pySpark-DataFrame
wordt gedefinieerd.
De kolomlabels van de geretourneerde pandas.DataFrame
moeten overeenkomen met de veldnamen in het gedefinieerde uitvoerschema, indien opgegeven als tekenreeksen, of overeenkomen met de veldgegevenstypen op positie als dat niet tekenreeksen zijn, bijvoorbeeld gehele getallen. Zie pandas.DataFrame voor hoe u kolommen labelt bij het maken van een pandas.DataFrame
.
Alle gegevens voor een cogroep worden in het geheugen geladen voordat de functie wordt toegepast. Dit kan leiden tot uitzonderingen met onvoldoende geheugen, met name als de groepsgrootten scheef zijn. De configuratie voor maxRecordsPerBatch wordt niet toegepast en het is aan u om ervoor te zorgen dat de gegroepeerde gegevens in het beschikbare geheugen passen.
In het volgende voorbeeld ziet u hoe u groupby().cogroup().applyInPandas()
gebruikt om een asof join
uit te voeren tussen twee gegevenssets.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Zie pyspark.sql.PandasCogroupedOps.applyInPandasvoor gedetailleerde gebruik.