Udostępnij za pośrednictwem


Interfejsy API funkcji biblioteki pandas

Interfejsy API funkcji pandas umożliwiają bezpośrednie zastosowanie natywnej funkcji języka Python, która pobiera wystąpienia biblioteki pandas i generuje je do ramki danych PySpark. Podobnie jak w przypadku funkcji zdefiniowanych przez użytkownika biblioteki pandas, interfejsy API funkcji używają również apache Arrow do przesyłania danych i biblioteki pandas do pracy z danymi; jednak wskazówki dotyczące typu języka Python są opcjonalne w interfejsach API funkcji biblioteki pandas.

Istnieją trzy typy interfejsów API funkcji biblioteki pandas:

  • Pogrupowana mapa
  • Mapa
  • Mapa współgrupowana

Interfejsy API funkcji pandas korzystają z tej samej logiki wewnętrznej używanej przez funkcję UDF biblioteki pandas. Mają one cechy, takie jak PyArrow, obsługiwane typy SQL i konfiguracje.

Aby uzyskać więcej informacji, zobacz wpis na blogu Nowe UDF Pandas i wskazówki dotyczące typów w Pythonie w nadchodzącej wersji Apache Spark 3.0.

Pogrupowana mapa

Transformujesz swoje zgrupowane dane za pomocą groupBy().applyInPandas(), aby zastosować wzorzec „split-apply-combine”. Split-apply-combine składa się z trzech kroków:

  • Podziel dane na grupy przy użyciu DataFrame.groupBy.
  • Zastosuj funkcję w każdej grupie. Dane wejściowe i wyjściowe funkcji są pandas.DataFrame. Dane wejściowe zawierają wszystkie wiersze i columns dla każdej grupy.
  • Połącz wyniki w nową DataFrame.

Aby użyć groupBy().applyInPandas(), należy zdefiniować następujące elementy:

  • Funkcja języka Python, która definiuje obliczenia dla każdej grupy
  • Obiekt StructType lub ciąg definiujący schema danych wyjściowych DataFrame

Etykiety column zwróconych pandas.DataFrame muszą być zgodne z nazwami pól w zdefiniowanych danych wyjściowych schema, jeśli są określone jako ciągi, lub pasują do typów danych pól według pozycji, jeśli nie ciągi, na przykład indeksy liczb całkowitych. Zobacz pandas.DataFrame, jak etykietować columns podczas konstruowania pandas.DataFrame.

Wszystkie dane dla grupy są ładowane do pamięci przed zastosowaniem funkcji. To może prowadzić do wyjątków braku pamięci, zwłaszcza jeśli rozmiary grup są nierównomierne. Konfiguracja dla maxRecordsPerBatch nie jest stosowana w grupach i to do Ciebie należy zapewnienie, że zgrupowane dane mieszczą się w dostępnej pamięci.

W poniższym przykładzie pokazano, jak użyć groupby().apply(), aby odjąć średnią z każdej wartości w grupie.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Aby uzyskać szczegółowe informacje o użyciu, zobacz pyspark.sql.GroupedData.applyInPandas.

Mapa

Operacje mapowania z wystąpieniami biblioteki pandas są wykonywane przez DataFrame.mapInPandas() w celu przekształcenia iteratora pandas.DataFrame do innego iteratora pandas.DataFrame, który reprezentuje bieżącą ramkę danych PySpark i zwraca wynik jako ramkę danych PySpark.

Funkcja bazowa pobiera i generuje iterator pandas.DataFrame. Może zwracać dane wyjściowe dowolnej długości w przeciwieństwie do niektórych funkcji zdefiniowanych przez użytkownika biblioteki pandas, takich jak Seria do serii.

W poniższym przykładzie pokazano, jak używać mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Aby uzyskać szczegółowe informacje o użyciu, zobacz pyspark.sql.DataFrame.mapInPandas.

Mapa współgrupowana

W przypadku współgrupowanych operacji mapowania z wystąpieniami biblioteki pandas użyj DataFrame.groupby().cogroup().applyInPandas(), aby współgrupować dwa DataFramePySpark za pomocą wspólnego klucza, a następnie zastosować funkcję języka Python do każdej grupy, jak pokazano poniżej:

  • Przetasuj dane tak, aby grupy każdej ramki danych, które współużytkują klucz, są ze sobą zgrupowane.
  • Zastosuj funkcję do każdej cogrupy. Dane wejściowe funkcji to dwa pandas.DataFrame (z opcjonalną krotką reprezentującą klucz). Dane wyjściowe funkcji to pandas.DataFrame.
  • Połącz pandas.DataFrameze wszystkich grup w nową strukturę PySpark DataFrame.

Aby użyć groupBy().cogroup().applyInPandas(), należy zdefiniować następujące elementy:

  • Funkcja języka Python, która definiuje obliczenia dla każdej cogroup.
  • Obiekt StructType lub ciąg definiujący schema danych wyjściowych PySpark DataFrame.

Etykiety column zwróconych pandas.DataFrame muszą być zgodne z nazwami pól w zdefiniowanych danych wyjściowych schema, jeśli etykiety są określone jako ciągi znaków. Jeśli nie są ciągami, muszą pasować do typów danych pól według pozycji, na przykład indeksy liczb całkowitych. Zobacz pandas.DataFrame, aby dowiedzieć się, jak etykietować columns podczas konstruowania pandas.DataFrame.

Wszystkie dane dla cogroup są ładowane do pamięci przed zastosowaniem funkcji. Może to prowadzić do wyjątków związanych z brakiem pamięci, zwłaszcza jeśli rozmiary grup są nierówne. Konfiguracja dla maxRecordsPerBatch nie jest stosowana i to od Ciebie zależy, aby pogrupowane wspólnie dane zmieściły się w dostępnej pamięci.

W poniższym przykładzie pokazano, jak używać groupby().cogroup().applyInPandas() do wykonywania asof join między dwoma zestawami danych.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Aby uzyskać szczegółowe informacje o użyciu, zobacz pyspark.sql.PandasCogroupedOps.applyInPandas.