Převod mezi datovými rámci PySpark a pandas
Naučte se převádět datové rámce Apache Sparku do datových rámců pandas a z datových rámců pandas pomocí Apache Arrow v Azure Databricks.
Apache Arrow a PyArrow
Apache Arrow je sloupcový formát dat v paměti používaný v Apache Sparku k efektivnímu přenosu dat mezi procesy JVM a Pythonu. To je užitečné pro vývojáře v Pythonu, kteří pracují s daty pandas a NumPy. Jeho použití ale vyžaduje některé menší změny konfigurace nebo kódu, aby se zajistila kompatibilita a získala největší výhodu.
PyArrow je vazba Pythonu pro Apache Arrow a je nainstalovaná v Databricks Runtime. Informace o verzi PyArrow dostupné v jednotlivých verzích databricks Runtime najdete v poznámkách k verzi a kompatibilitě modulu Databricks Runtime.
Podporované typy SQL
Všechny datové typy Spark SQL jsou podporovány převodem na základě šipky s výjimkou ArrayType
TimestampType
.
MapType
a ArrayType
vnořené StructType
jsou podporovány pouze při použití PyArrow 2.0.0 a vyšší.
StructType
je reprezentován jako místo pandas.DataFrame
pandas.Series
.
Převod datových rámců PySpark na datové rámce pandas a z datových rámců pandas
Šipka je k dispozici jako optimalizace při převodu datového rámce PySpark na datový rámec pandas s datovým rámcem toPandas()
PySpark a při vytváření datového rámce PySpark z datového rámce pandas s createDataFrame(pandas_df)
.
Chcete-li použít Arrow pro tyto metody, set konfiguraci Sparkspark.sql.execution.arrow.pyspark.enabled
pro true
. Tato konfigurace je ve výchozím nastavení povolena, s výjimkou clusterů s vysokou souběžností a také clusterů izolace uživatelů v pracovních prostorech, které mají povolenu funkci Unity Catalog.
Kromě toho by optimalizace, které spark.sql.execution.arrow.pyspark.enabled
povolil, by se mohly vrátit k implementaci bez šipky, pokud dojde k chybě před výpočetem v rámci Sparku. Toto chování můžete řídit pomocí konfigurace spark.sql.execution.arrow.pyspark.fallback.enabled
Sparku .
Příklad
import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))
# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
Použití optimalizace šipky vytvoří stejné výsledky jako v případech, kdy není šipka povolená. I když je to šipka, toPandas()
výsledkem je shromažďování všech záznamů v datovém rámci do programu ovladače a mělo by být provedeno na malé podmnožině dat.
Kromě toho se nepodporují všechny datové typy Sparku a pokud column má nepodporovaný typ, může se vyvolat chyba. Pokud dojde k chybě během createDataFrame()
, Spark vytvoří datový rámec bez šipky.