Nozioni di base su PySpark
Questo articolo illustra esempi semplici per illustrare l'utilizzo di PySpark. Si presuppone che si comprendano i concetti fondamentali di Apache Spark e che eseguano comandi in un notebook di Azure Databricks connesso al calcolo. È possibile creare dataframe usando dati di esempio, eseguire trasformazioni di base, incluse operazioni su righe e colonne su questi dati, combinare più dataframe e aggregare questi dati, visualizzarli e quindi salvarli in una tabella o in un file.
Caricare dati
Alcuni esempi in questo articolo usano i dati di esempio forniti da Databricks per illustrare l'uso dei dataframe per caricare, trasformare e salvare i dati. Se si vogliono usare i propri dati non ancora presenti in Databricks, è possibile caricarli prima e crearne uno. Vedere Creare o modificare una tabella usando il caricamento di file e Caricare file in un volume del catalogo Unity.
Informazioni sui dati di esempio di Databricks
Databricks fornisce dati di esempio nel samples
catalogo e nella /databricks-datasets
directory .
- Per accedere ai dati di esempio nel
samples
catalogo, usare il formatosamples.<schema-name>.<table-name>
. Questo articolo usa tabelle nellosamples.tpch
schema, che contiene dati di un'azienda fittizia. Lacustomer
tabella contiene informazioni sui clienti eorders
contiene informazioni sugli ordini effettuati dai clienti. - Usare
dbutils.fs.ls
per esplorare i dati in/databricks-datasets
. Usare Spark SQL o DataFrames per eseguire query sui dati in questo percorso usando i percorsi dei file. Per altre informazioni sui dati di esempio forniti da Databricks, vedere Set di dati di esempio.
Importare tipi di dati
Molte operazioni PySpark richiedono l'uso di funzioni SQL o l'interazione con i tipi Spark nativi. È possibile importare direttamente solo le funzioni e i tipi necessari oppure importare l'intero modulo.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
Poiché alcune funzioni importate potrebbero eseguire l'override delle funzioni predefinite di Python, alcuni utenti scelgono di importare questi moduli usando un alias. Gli esempi seguenti illustrano un alias comune usato negli esempi di codice Apache Spark:
import pyspark.sql.types as T
import pyspark.sql.functions as F
Per un elenco completo dei tipi di dati, vedere Tipi di dati Spark.
Per un elenco completo delle funzioni SQL di PySpark, vedere Funzioni Spark.
Creare un dataframe
Esistono diversi modi per creare un dataframe. In genere si definisce un dataframe rispetto a un'origine dati, ad esempio una tabella o una raccolta di file. Quindi, come descritto nella sezione Concetti fondamentali di Apache Spark, usare un'azione, ad esempio display
, per attivare le trasformazioni da eseguire. Il display
metodo restituisce dataframe.
Creare un dataframe con valori specificati
Per creare un dataframe con valori specificati, usare il createDataFrame
metodo , dove le righe vengono espresse come elenco di tuple:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Si noti nell'output che i tipi di dati delle colonne di df_children
vengono dedotti automaticamente. In alternativa, è possibile specificare i tipi aggiungendo uno schema. Gli schemi vengono definiti utilizzando il StructType
che è costituito StructFields
da che specificano il nome, il tipo di dati e un flag booleano che indica se contengono o meno un valore Null. È necessario importare i tipi di dati da pyspark.sql.types
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Creare un dataframe da una tabella in Unity Catalog
Per creare un dataframe da una tabella in Unity Catalog, usare il table
metodo che identifica la tabella usando il formato <catalog-name>.<schema-name>.<table-name>
. Fare clic su Catalogo sulla barra di spostamento a sinistra per usare Esplora cataloghi per passare alla tabella. Fare clic su di esso, quindi selezionare Copia percorso tabella per inserire il percorso della tabella nel notebook.
Nell'esempio seguente viene caricata la tabella samples.tpch.customer
, ma in alternativa è possibile specificare il percorso della tabella.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Creare un dataframe da un file caricato
Per creare un dataframe da un file caricato nei volumi del catalogo Unity, usare la read
proprietà . Questo metodo restituisce un DataFrameReader
oggetto , che è quindi possibile usare per leggere il formato appropriato. Fare clic sull'opzione catalogo sulla piccola barra laterale a sinistra e usare il browser del catalogo per individuare il file. Selezionarlo, quindi fare clic su Copia percorso del file del volume.
L'esempio seguente legge da un *.csv
file, ma DataFrameReader
supporta il caricamento di file in molti altri formati. Vedere Metodi DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Per altre informazioni sui volumi del catalogo Unity, vedere Che cosa sono i volumi del catalogo Unity?.
Creare un dataframe da una risposta JSON
Per creare un dataframe da un payload di risposta JSON restituito da un'API REST, usare il pacchetto Python requests
per eseguire query e analizzare la risposta. È necessario importare il pacchetto per usarlo. Questo esempio usa i dati del database dell'applicazione di farmaci Stati Uniti Food and Drug Administration.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Per informazioni sull'uso di JSON e altri dati semistrutturati in Databricks, vedere Modellare dati semistrutturati.
Selezionare un campo o un oggetto JSON
Per selezionare un campo o un oggetto specifico dal codice JSON convertito, usare la []
notazione . Ad esempio, per selezionare il products
campo che si tratta di una matrice di prodotti:
display(df_drugs.select(df_drugs["products"]))
È anche possibile concatenare le chiamate al metodo per attraversare più campi. Ad esempio, per restituire il nome del marchio del primo prodotto in un'applicazione di droga:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Creare un dataframe da un file
Per illustrare la creazione di un dataframe da un file, questo esempio carica i dati CSV nella /databricks-datasets
directory.
Per passare ai set di dati di esempio, è possibile usare i comandi del file system Databricks Utilties . L'esempio seguente usa dbutils
per elencare i set di dati disponibili in /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
In alternativa, è possibile usare %fs
per accedere ai comandi del file system dell'interfaccia della riga di comando di Databricks, come illustrato nell'esempio seguente:
%fs ls '/databricks-datasets'
Per creare un dataframe da un file o una directory di file, specificare il percorso nel load
metodo :
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Trasformare i dati con i dataframe
I dataframe semplificano la trasformazione dei dati usando metodi predefiniti per ordinare, filtrare e aggregare i dati. Molte trasformazioni non vengono specificate come metodi nei dataframe, ma vengono fornite nel spark.sql.functions
pacchetto. Vedere Funzioni SQL di Databricks Spark.
- Operazioni di colonna
- Operazioni di riga
- Creare un join di dataframe
- Aggregare i dati
- Concatenamento delle chiamate
Operazioni su colonne
Spark offre molte operazioni di base sulla colonna:
- Selezionare le colonne
- Creare colonne
- Rinominare le colonne
- Eseguire il cast dei tipi di colonna
- Rimuovere colonne
Suggerimento
Per restituire tutte le colonne in un dataframe, usare columns
, ad esempio df_customer.columns
.
Seleziona colonne
È possibile selezionare colonne specifiche usando select
e col
. La col
funzione si trova nel pyspark.sql.functions
modulo secondario.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
È anche possibile fare riferimento a una colonna che expr
accetta un'espressione definita come stringa:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
È anche possibile usare selectExpr
, che accetta espressioni SQL:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Per selezionare le colonne usando un valore letterale stringa, eseguire le operazioni seguenti:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Per selezionare in modo esplicito una colonna da un dataframe specifico, è possibile usare l'operatore []
o l'operatore .
. Non è possibile usare l'operatore .
per selezionare le colonne che iniziano con un numero intero o quelle che contengono uno spazio o un carattere speciale. Ciò può essere particolarmente utile quando si unisce dataframe in cui alcune colonne hanno lo stesso nome.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Creare colonne
Per creare una nuova colonna, usare il withColumn
metodo . Nell'esempio seguente viene creata una nuova colonna contenente un valore booleano in base al fatto che il saldo c_acctbal
dell'account cliente superi 1000
:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Rinominare le colonne
Per rinominare una colonna, usare il withColumnRenamed
metodo , che accetta i nomi di colonna esistenti e nuovi:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
Il alias
metodo è particolarmente utile quando si desidera rinominare le colonne come parte delle aggregazioni:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Eseguire il cast dei tipi di colonna
In alcuni casi è possibile modificare il tipo di dati per una o più colonne nel dataframe. A tale scopo, utilizzare il metodo per eseguire la cast
conversione tra tipi di dati di colonna. Nell'esempio seguente viene illustrato come convertire una colonna da un numero intero a un tipo stringa, usando il col
metodo per fare riferimento a una colonna:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Rimuovere colonne
Per rimuovere le colonne, è possibile omettere le colonne durante una selezione oppure select(*) except
è possibile usare il drop
metodo :
df_customer_flag_renamed.drop("balance_flag_renamed")
È anche possibile eliminare più colonne contemporaneamente:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Operazioni su righe
Spark offre molte operazioni di base per le righe:
- Filtra righe
- Rimuovere righe duplicate
- Gestire i valori Null
- Accoda righe
- Ordinare le righe
- Filtra righe
Filtraggio delle righe
Per filtrare le righe, utilizzare il filter
metodo o where
in un dataframe per restituire solo determinate righe. Per identificare una colonna da filtrare, usare il col
metodo o un'espressione che restituisce una colonna.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Per filtrare in base a più condizioni, usare operatori logici. Ad esempio, &
e |
abilitare rispettivamente l'utente e AND
OR
le condizioni. Nell'esempio seguente vengono filtrate le righe in cui c_nationkey
è uguale a 20
e c_acctbal
è maggiore di 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Rimuovere righe duplicate
Per deduplicare le righe, usare distinct
, che restituisce solo le righe univoce.
df_unique = df_customer.distinct()
Gestire i valori Null
Per gestire i valori Null, eliminare righe contenenti valori Null usando il na.drop
metodo . Questo metodo consente di specificare se si desidera eliminare righe contenenti any
valori Null o all
valori Null.
Per eliminare i valori Null, usare uno degli esempi seguenti.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Se invece si desidera filtrare solo le righe contenenti tutti i valori Null, usare quanto segue:
df_customer_no_nulls = df_customer.na.drop("all")
È possibile applicarlo per un subset di colonne specificando questo valore, come illustrato di seguito:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Per compilare i valori mancanti, usare il fill
metodo . È possibile scegliere di applicarlo a tutte le colonne o a un subset di colonne. Nell'esempio seguente i saldi di conto con valore Null per il saldo c_acctbal
del conto vengono riempiti con 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Per sostituire le stringhe con altri valori, usare il replace
metodo . Nell'esempio seguente tutte le stringhe di indirizzo vuote vengono sostituite con la parola UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Accoda righe
Per accodare righe è necessario usare il union
metodo per creare un nuovo dataframe. Nell'esempio seguente il dataframe creato in precedenza e df_filtered_customer
vengono combinati, che restituisce un dataframe df_that_one_customer
con tre clienti:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Nota
È anche possibile combinare dataframe scrivendoli in una tabella e quindi aggiungendo nuove righe. Per i carichi di lavoro di produzione, l'elaborazione incrementale delle origini dati in una tabella di destinazione può ridurre drasticamente la latenza e i costi di calcolo man mano che aumentano le dimensioni dei dati. Vedere Inserire dati in un databricks lakehouse.
Ordinare le righe
Importante
L'ordinamento può essere costoso su larga scala e, se si archiviano i dati ordinati e si ricaricano i dati con Spark, l'ordine non è garantito. Assicurati di essere intenzionale nell'uso dell'ordinamento.
Per ordinare le righe in base a una o più colonne, utilizzare il sort
metodo o orderBy
. Per impostazione predefinita, questi metodi vengono ordinati in ordine crescente:
df_customer.orderBy(col("c_acctbal"))
Per filtrare in ordine decrescente, usare desc
:
df_customer.sort(col("c_custkey").desc())
L'esempio seguente illustra come ordinare in due colonne:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Per limitare il numero di righe da restituire dopo l'ordinamento del dataframe, usare il limit
metodo . Nell'esempio seguente vengono visualizzati solo i risultati principali 10
:
display(df_sorted.limit(10))
Creare un join di dataframe
Per unire due o più dataframe, usare il join
metodo . È possibile specificare la modalità di unione dei dataframe nel how
(tipo di join) e on
(su quali colonne basare i parametri di join). I tipi di join comuni includono:
inner
: si tratta del tipo di join predefinito, che restituisce un dataframe che mantiene solo le righe in cui è presente una corrispondenza per ilon
parametro nei dataframe.left
: mantiene tutte le righe del primo dataframe specificato e solo le righe del secondo dataframe specificato che hanno una corrispondenza con la prima.outer
: un outer join mantiene tutte le righe di entrambi i dataframe indipendentemente dalla corrispondenza.
Per informazioni dettagliate sui join, vedere Usare join in Azure Databricks. Per un elenco dei join supportati in PySpark, vedere Join di dataframe.
Nell'esempio seguente viene restituito un singolo dataframe in cui ogni riga del orders
dataframe viene unita alla riga corrispondente del customers
dataframe. Viene usato un inner join, in quanto si prevede che ogni ordine corrisponda esattamente a un cliente.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Per creare un join in più condizioni, usare rispettivamente operatori booleani come &
e |
per specificare AND
e OR
. Nell'esempio seguente viene aggiunta una condizione aggiuntiva, filtrando solo le righe con o_totalprice
maggiore di 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Aggregare i dati
Per aggregare i dati in un dataframe, in modo analogo a in GROUP BY
SQL, usare il groupBy
metodo per specificare le colonne per raggruppare e il agg
metodo per specificare le aggregazioni. Importare aggregazioni comuni, tra cui avg
, max
sum
, e min
da pyspark.sql.functions
. L'esempio seguente mostra il saldo medio dei clienti per segmento di mercato:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Alcune aggregazioni sono azioni, il che significa che attivano i calcoli. In questo caso non è necessario usare altre azioni per restituire i risultati.
Per contare le righe in un dataframe, usare il count
metodo :
df_customer.count()
Concatenamento delle chiamate
I metodi che trasformano i dataframe restituiscono dataframe e Spark non agisce sulle trasformazioni finché non vengono chiamate azioni. Questa valutazione differita consente di concatenare più metodi per praticità e leggibilità. Nell'esempio seguente viene illustrato come concatenare filtri, aggregazioni e ordinamento:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Visualizzare il dataframe
Per visualizzare un dataframe in un notebook, fare clic sul + segno accanto alla tabella in alto a sinistra del dataframe, quindi selezionare Visualizzazione per aggiungere uno o più grafici in base al dataframe. Per informazioni dettagliate sulle visualizzazioni, vedere Visualizzazioni nei notebook di Databricks.
display(df_order)
Per eseguire visualizzazioni aggiuntive, Databricks consiglia di usare l'API Pandas per Spark. .pandas_api()
consente di eseguire il cast all'API pandas corrispondente per un dataframe Spark. Per altre informazioni, vedere API Pandas in Spark.
Salvare i dati
Dopo aver trasformato i dati, è possibile salvarli usando i DataFrameWriter
metodi . Un elenco completo di questi metodi è disponibile in DataFrameWriter. Le sezioni seguenti illustrano come salvare il dataframe come tabella e come raccolta di file di dati.
Salvare il dataframe come tabella
Per salvare il dataframe come tabella in Unity Catalog, usare il write.saveAsTable
metodo e specificare il percorso nel formato <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Scrivere il dataframe come CSV
Per scrivere il dataframe in *.csv
formato, usare il write.csv
metodo , specificando il formato e le opzioni. Per impostazione predefinita, se i dati sono presenti nel percorso specificato, l'operazione di scrittura ha esito negativo. È possibile specificare una delle modalità seguenti per eseguire un'azione diversa:
overwrite
sovrascrive tutti i dati esistenti nel percorso di destinazione con il contenuto del dataframe.append
aggiunge il contenuto del dataframe ai dati nel percorso di destinazione.ignore
in modo invisibile all'utente la scrittura se i dati sono presenti nel percorso di destinazione.
L'esempio seguente illustra la sovrascrittura dei dati con il contenuto del dataframe come file CSV:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Passaggi successivi
Per sfruttare altre funzionalità spark in Databricks, vedere: