Condividi tramite


Nozioni di base su PySpark

Questo articolo illustra esempi semplici per illustrare l'utilizzo di PySpark. Si presuppone che si comprendano i concetti fondamentali di Apache Spark e che eseguano comandi in un notebook di Azure Databricks connesso al calcolo. È possibile creare dataframe usando dati di esempio, eseguire trasformazioni di base, incluse operazioni su righe e colonne su questi dati, combinare più dataframe e aggregare questi dati, visualizzarli e quindi salvarli in una tabella o in un file.

Caricare dati

Alcuni esempi in questo articolo usano i dati di esempio forniti da Databricks per illustrare l'uso dei dataframe per caricare, trasformare e salvare i dati. Se si vogliono usare i propri dati non ancora presenti in Databricks, è possibile caricarli prima e crearne uno. Vedere Creare o modificare una tabella usando il caricamento di file e Caricare file in un volume del catalogo Unity.

Informazioni sui dati di esempio di Databricks

Databricks fornisce dati di esempio nel samples catalogo e nella /databricks-datasets directory .

  • Per accedere ai dati di esempio nel samples catalogo, usare il formato samples.<schema-name>.<table-name>. Questo articolo usa tabelle nello samples.tpch schema, che contiene dati di un'azienda fittizia. La customer tabella contiene informazioni sui clienti e orders contiene informazioni sugli ordini effettuati dai clienti.
  • Usare dbutils.fs.ls per esplorare i dati in /databricks-datasets. Usare Spark SQL o DataFrames per eseguire query sui dati in questo percorso usando i percorsi dei file. Per altre informazioni sui dati di esempio forniti da Databricks, vedere Set di dati di esempio.

Importare tipi di dati

Molte operazioni PySpark richiedono l'uso di funzioni SQL o l'interazione con i tipi Spark nativi. È possibile importare direttamente solo le funzioni e i tipi necessari oppure importare l'intero modulo.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Poiché alcune funzioni importate potrebbero eseguire l'override delle funzioni predefinite di Python, alcuni utenti scelgono di importare questi moduli usando un alias. Gli esempi seguenti illustrano un alias comune usato negli esempi di codice Apache Spark:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Per un elenco completo dei tipi di dati, vedere Tipi di dati Spark.

Per un elenco completo delle funzioni SQL di PySpark, vedere Funzioni Spark.

Creare un dataframe

Esistono diversi modi per creare un dataframe. In genere si definisce un dataframe rispetto a un'origine dati, ad esempio una tabella o una raccolta di file. Quindi, come descritto nella sezione Concetti fondamentali di Apache Spark, usare un'azione, ad esempio display, per attivare le trasformazioni da eseguire. Il display metodo restituisce dataframe.

Creare un dataframe con valori specificati

Per creare un dataframe con valori specificati, usare il createDataFrame metodo , dove le righe vengono espresse come elenco di tuple:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Si noti nell'output che i tipi di dati delle colonne di df_children vengono dedotti automaticamente. In alternativa, è possibile specificare i tipi aggiungendo uno schema. Gli schemi vengono definiti utilizzando il StructType che è costituito StructFields da che specificano il nome, il tipo di dati e un flag booleano che indica se contengono o meno un valore Null. È necessario importare i tipi di dati da pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Creare un dataframe da una tabella in Unity Catalog

Per creare un dataframe da una tabella in Unity Catalog, usare il table metodo che identifica la tabella usando il formato <catalog-name>.<schema-name>.<table-name>. Fare clic su Catalogo sulla barra di spostamento a sinistra per usare Esplora cataloghi per passare alla tabella. Fare clic su di esso, quindi selezionare Copia percorso tabella per inserire il percorso della tabella nel notebook.

Nell'esempio seguente viene caricata la tabella samples.tpch.customer, ma in alternativa è possibile specificare il percorso della tabella.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Creare un dataframe da un file caricato

Per creare un dataframe da un file caricato nei volumi del catalogo Unity, usare la read proprietà . Questo metodo restituisce un DataFrameReaderoggetto , che è quindi possibile usare per leggere il formato appropriato. Fare clic sull'opzione catalogo sulla piccola barra laterale a sinistra e usare il browser del catalogo per individuare il file. Selezionarlo, quindi fare clic su Copia percorso del file del volume.

L'esempio seguente legge da un *.csv file, ma DataFrameReader supporta il caricamento di file in molti altri formati. Vedere Metodi DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Per altre informazioni sui volumi del catalogo Unity, vedere Che cosa sono i volumi del catalogo Unity?.

Creare un dataframe da una risposta JSON

Per creare un dataframe da un payload di risposta JSON restituito da un'API REST, usare il pacchetto Python requests per eseguire query e analizzare la risposta. È necessario importare il pacchetto per usarlo. Questo esempio usa i dati del database dell'applicazione di farmaci Stati Uniti Food and Drug Administration.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Per informazioni sull'uso di JSON e altri dati semistrutturati in Databricks, vedere Modellare dati semistrutturati.

Selezionare un campo o un oggetto JSON

Per selezionare un campo o un oggetto specifico dal codice JSON convertito, usare la [] notazione . Ad esempio, per selezionare il products campo che si tratta di una matrice di prodotti:

display(df_drugs.select(df_drugs["products"]))

È anche possibile concatenare le chiamate al metodo per attraversare più campi. Ad esempio, per restituire il nome del marchio del primo prodotto in un'applicazione di droga:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Creare un dataframe da un file

Per illustrare la creazione di un dataframe da un file, questo esempio carica i dati CSV nella /databricks-datasets directory.

Per passare ai set di dati di esempio, è possibile usare i comandi del file system Databricks Utilties . L'esempio seguente usa dbutils per elencare i set di dati disponibili in /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

In alternativa, è possibile usare %fs per accedere ai comandi del file system dell'interfaccia della riga di comando di Databricks, come illustrato nell'esempio seguente:

%fs ls '/databricks-datasets'

Per creare un dataframe da un file o una directory di file, specificare il percorso nel load metodo :

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Trasformare i dati con i dataframe

I dataframe semplificano la trasformazione dei dati usando metodi predefiniti per ordinare, filtrare e aggregare i dati. Molte trasformazioni non vengono specificate come metodi nei dataframe, ma vengono fornite nel spark.sql.functions pacchetto. Vedere Funzioni SQL di Databricks Spark.

Operazioni su colonne

Spark offre molte operazioni di base sulla colonna:

Suggerimento

Per restituire tutte le colonne in un dataframe, usare columns, ad esempio df_customer.columns.

Seleziona colonne

È possibile selezionare colonne specifiche usando select e col. La col funzione si trova nel pyspark.sql.functions modulo secondario.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

È anche possibile fare riferimento a una colonna che expr accetta un'espressione definita come stringa:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

È anche possibile usare selectExpr, che accetta espressioni SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Per selezionare le colonne usando un valore letterale stringa, eseguire le operazioni seguenti:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Per selezionare in modo esplicito una colonna da un dataframe specifico, è possibile usare l'operatore [] o l'operatore . . Non è possibile usare l'operatore . per selezionare le colonne che iniziano con un numero intero o quelle che contengono uno spazio o un carattere speciale. Ciò può essere particolarmente utile quando si unisce dataframe in cui alcune colonne hanno lo stesso nome.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Creare colonne

Per creare una nuova colonna, usare il withColumn metodo . Nell'esempio seguente viene creata una nuova colonna contenente un valore booleano in base al fatto che il saldo c_acctbal dell'account cliente superi 1000:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Rinominare le colonne

Per rinominare una colonna, usare il withColumnRenamed metodo , che accetta i nomi di colonna esistenti e nuovi:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

Il alias metodo è particolarmente utile quando si desidera rinominare le colonne come parte delle aggregazioni:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Eseguire il cast dei tipi di colonna

In alcuni casi è possibile modificare il tipo di dati per una o più colonne nel dataframe. A tale scopo, utilizzare il metodo per eseguire la cast conversione tra tipi di dati di colonna. Nell'esempio seguente viene illustrato come convertire una colonna da un numero intero a un tipo stringa, usando il col metodo per fare riferimento a una colonna:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Rimuovere colonne

Per rimuovere le colonne, è possibile omettere le colonne durante una selezione oppure select(*) except è possibile usare il drop metodo :

df_customer_flag_renamed.drop("balance_flag_renamed")

È anche possibile eliminare più colonne contemporaneamente:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Operazioni su righe

Spark offre molte operazioni di base per le righe:

Filtraggio delle righe

Per filtrare le righe, utilizzare il filter metodo o where in un dataframe per restituire solo determinate righe. Per identificare una colonna da filtrare, usare il col metodo o un'espressione che restituisce una colonna.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Per filtrare in base a più condizioni, usare operatori logici. Ad esempio, & e | abilitare rispettivamente l'utente e AND OR le condizioni. Nell'esempio seguente vengono filtrate le righe in cui c_nationkey è uguale a 20 e c_acctbal è maggiore di 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Rimuovere righe duplicate

Per deduplicare le righe, usare distinct, che restituisce solo le righe univoce.

df_unique = df_customer.distinct()

Gestire i valori Null

Per gestire i valori Null, eliminare righe contenenti valori Null usando il na.drop metodo . Questo metodo consente di specificare se si desidera eliminare righe contenenti any valori Null o all valori Null.

Per eliminare i valori Null, usare uno degli esempi seguenti.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Se invece si desidera filtrare solo le righe contenenti tutti i valori Null, usare quanto segue:

df_customer_no_nulls = df_customer.na.drop("all")

È possibile applicarlo per un subset di colonne specificando questo valore, come illustrato di seguito:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Per compilare i valori mancanti, usare il fill metodo . È possibile scegliere di applicarlo a tutte le colonne o a un subset di colonne. Nell'esempio seguente i saldi di conto con valore Null per il saldo c_acctbal del conto vengono riempiti con 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Per sostituire le stringhe con altri valori, usare il replace metodo . Nell'esempio seguente tutte le stringhe di indirizzo vuote vengono sostituite con la parola UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Accoda righe

Per accodare righe è necessario usare il union metodo per creare un nuovo dataframe. Nell'esempio seguente il dataframe creato in precedenza e df_filtered_customer vengono combinati, che restituisce un dataframe df_that_one_customer con tre clienti:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Nota

È anche possibile combinare dataframe scrivendoli in una tabella e quindi aggiungendo nuove righe. Per i carichi di lavoro di produzione, l'elaborazione incrementale delle origini dati in una tabella di destinazione può ridurre drasticamente la latenza e i costi di calcolo man mano che aumentano le dimensioni dei dati. Vedere Inserire dati in un databricks lakehouse.

Ordinare le righe

Importante

L'ordinamento può essere costoso su larga scala e, se si archiviano i dati ordinati e si ricaricano i dati con Spark, l'ordine non è garantito. Assicurati di essere intenzionale nell'uso dell'ordinamento.

Per ordinare le righe in base a una o più colonne, utilizzare il sort metodo o orderBy . Per impostazione predefinita, questi metodi vengono ordinati in ordine crescente:

df_customer.orderBy(col("c_acctbal"))

Per filtrare in ordine decrescente, usare desc:

df_customer.sort(col("c_custkey").desc())

L'esempio seguente illustra come ordinare in due colonne:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Per limitare il numero di righe da restituire dopo l'ordinamento del dataframe, usare il limit metodo . Nell'esempio seguente vengono visualizzati solo i risultati principali 10 :

display(df_sorted.limit(10))

Creare un join di dataframe

Per unire due o più dataframe, usare il join metodo . È possibile specificare la modalità di unione dei dataframe nel how (tipo di join) e on (su quali colonne basare i parametri di join). I tipi di join comuni includono:

  • inner: si tratta del tipo di join predefinito, che restituisce un dataframe che mantiene solo le righe in cui è presente una corrispondenza per il on parametro nei dataframe.
  • left: mantiene tutte le righe del primo dataframe specificato e solo le righe del secondo dataframe specificato che hanno una corrispondenza con la prima.
  • outer: un outer join mantiene tutte le righe di entrambi i dataframe indipendentemente dalla corrispondenza.

Per informazioni dettagliate sui join, vedere Usare join in Azure Databricks. Per un elenco dei join supportati in PySpark, vedere Join di dataframe.

Nell'esempio seguente viene restituito un singolo dataframe in cui ogni riga del orders dataframe viene unita alla riga corrispondente del customers dataframe. Viene usato un inner join, in quanto si prevede che ogni ordine corrisponda esattamente a un cliente.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Per creare un join in più condizioni, usare rispettivamente operatori booleani come & e | per specificare AND e OR. Nell'esempio seguente viene aggiunta una condizione aggiuntiva, filtrando solo le righe con o_totalprice maggiore di 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Aggregare i dati

Per aggregare i dati in un dataframe, in modo analogo a in GROUP BY SQL, usare il groupBy metodo per specificare le colonne per raggruppare e il agg metodo per specificare le aggregazioni. Importare aggregazioni comuni, tra cui avg, maxsum, e min da pyspark.sql.functions. L'esempio seguente mostra il saldo medio dei clienti per segmento di mercato:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Alcune aggregazioni sono azioni, il che significa che attivano i calcoli. In questo caso non è necessario usare altre azioni per restituire i risultati.

Per contare le righe in un dataframe, usare il count metodo :

df_customer.count()

Concatenamento delle chiamate

I metodi che trasformano i dataframe restituiscono dataframe e Spark non agisce sulle trasformazioni finché non vengono chiamate azioni. Questa valutazione differita consente di concatenare più metodi per praticità e leggibilità. Nell'esempio seguente viene illustrato come concatenare filtri, aggregazioni e ordinamento:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Visualizzare il dataframe

Per visualizzare un dataframe in un notebook, fare clic sul + segno accanto alla tabella in alto a sinistra del dataframe, quindi selezionare Visualizzazione per aggiungere uno o più grafici in base al dataframe. Per informazioni dettagliate sulle visualizzazioni, vedere Visualizzazioni nei notebook di Databricks.

display(df_order)

Per eseguire visualizzazioni aggiuntive, Databricks consiglia di usare l'API Pandas per Spark. .pandas_api() consente di eseguire il cast all'API pandas corrispondente per un dataframe Spark. Per altre informazioni, vedere API Pandas in Spark.

Salvare i dati

Dopo aver trasformato i dati, è possibile salvarli usando i DataFrameWriter metodi . Un elenco completo di questi metodi è disponibile in DataFrameWriter. Le sezioni seguenti illustrano come salvare il dataframe come tabella e come raccolta di file di dati.

Salvare il dataframe come tabella

Per salvare il dataframe come tabella in Unity Catalog, usare il write.saveAsTable metodo e specificare il percorso nel formato <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Scrivere il dataframe come CSV

Per scrivere il dataframe in *.csv formato, usare il write.csv metodo , specificando il formato e le opzioni. Per impostazione predefinita, se i dati sono presenti nel percorso specificato, l'operazione di scrittura ha esito negativo. È possibile specificare una delle modalità seguenti per eseguire un'azione diversa:

  • overwrite sovrascrive tutti i dati esistenti nel percorso di destinazione con il contenuto del dataframe.
  • append aggiunge il contenuto del dataframe ai dati nel percorso di destinazione.
  • ignore in modo invisibile all'utente la scrittura se i dati sono presenti nel percorso di destinazione.

L'esempio seguente illustra la sovrascrittura dei dati con il contenuto del dataframe come file CSV:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Passaggi successivi

Per sfruttare altre funzionalità spark in Databricks, vedere: