Condividi tramite


Usare SparkR

SparkR è un pacchetto R che fornisce un front-end leggero per l'uso di Apache Spark da R. SparkR fornisce un'implementazione di frame di dati distribuita che supporta operazioni come selezione, filtro, aggregazione e così via. SparkR supporta anche l'apprendimento automatico distribuito con MLlib.

Usare SparkR tramite definizioni dei processi batch di Spark o con notebook interattivi di Microsoft Fabric.

Il supporto per R è disponibile solo su Spark3.1 o versioni successive. R su Spark 2.4 non è supportato.

Prerequisiti

  • Aprire o creare un notebook. Per istruzioni, vedere Come usare i notebook di Microsoft Fabric.

  • Impostare l'opzione del linguaggio su SparkR (R) per modificare il linguaggio primario.

  • Collegare il notebook a un lakehouse. Sul lato sinistro, selezionare Aggiungi per aggiungere un lakehouse esistente o per crearne uno nuovo.

Leggere e scrivere i DataFramce SparkR

Leggere un DataFrame SparkR da un data.frame R locale

Il modo più semplice per creare un DataFrame è convertire un data.frame R locale in un DataFrame Spark.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Leggere e scrivere un DataFrame SparkR da Lakehouse

I dati possono essere archiviati nel file system locale dei nodi del cluster. I metodi generici per leggere e scrivere un DataFrame SparkR da Lakehouse sono read.df e write.df. Questi metodi richiedono il percorso del file da caricare e il tipo di origine dati. SparkR supporta la lettura di file CSV, JSON, testo e Parquet in modo nativo.

Per leggere e scrivere in una Lakehouse, per prima cosa occorre aggiungerla alla sessione. Sul lato sinistro del notebook, selezionare Aggiungi per aggiungere una Lakehouse esistente o per crearne una nuova.

Nota

Per accedere ai file Lakehouse usando pacchetti Spark, come read.df o write.df, usare il percorso file system distribuito di Azure o il percorso relativo per Spark. Nell'explorer di Lakehouse, fare clic con il pulsante destro del mouse sui file o sulla cartella a cui si vuole accedere e copiarne il percorso del file system distribuito di Azure o il percorso relativo per Spark dal menu contestuale.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric ha tidyverse preinstallato. È possibile accedere ai file Lakehouse utilizzando i pacchetti R familiari, come la lettura e la scrittura di file Lakehouse usando readr::read_csv() e readr::write_csv().

Nota

Per accedere ai file di Lakehouse usando i pacchetti R, è necessario usare il percorso del file API. Nell'explorer di Lakehouse, fare clic con il pulsante destro del mouse sul file o sulla cartella a cui si vuole accedere e copiarne il percorso del file API dal menu contestuale.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

È anche possibile leggere un DataFrame SparkR sul Lakehouse usando query SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Operazioni dei DataFrame

I DataFrame SparkR supportano molte funzioni per eseguire l'elaborazione dei dati strutturati. Ecco alcuni esempi di base. Un elenco completo è disponibile nella documentazione API di SparkR.

Selezionare righe e colonne

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Raggruppamento e aggregazioni

I frame di dati SparkR supportano molte funzioni di uso comune per aggregare i dati dopo il loro raggruppamento. Ad esempio, è possibile calcolare un istogramma del tempo di attesa nel set di dati faithful, come illustrato di seguito

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operazioni su colonne

SparkR offre molte funzioni che possono essere applicate direttamente alle colonne per l'aggregazione e l'elaborazione dei dati. L'esempio seguente mostra come usare le funzioni aritmetiche di base.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Applicare funzioni definite dall'utente

SparkR supporta diversi tipi di funzioni definite dall'utente:

Eseguire una funzione in un set di dati di grandi dimensioni con dapply o dapplyCollect

dapply

Applicare una funzione a ciascuna partizione di un SparkDataFrame. La funzione da applicare a ogni partizione del SparkDataFrame deve avere un solo parametro, al quale verrà passato un data.frame corrispondente a ciascuna partizione. L'output della funzione deve essere un data.frame. Lo schema specifica il formato delle righe del SparkDataFrame risultante. Deve corrispondere ai tipi di dati del valore restituito.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Analogamente a dapply, applicare una funzione a ciascuna partizione di un SparkDataFrame e restituire il risultato. L'output della funzione deve essere un data.frame. Tuttavia, questa volta, non è necessario fornire lo schema. Si noti che dapplyCollect può avere esito negativo se gli output della funzione vengono eseguiti su tutte le partizioni non possono essere trasferiti nel driver ed essere contenuti nella memoria del driver.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Eseguire una funzione su un set di dati di grandi dimensioni raggruppando per una o più colonne di input con gapply o gapplyCollect

gapply

Applicare una funzione a ciascun gruppo di un SparkDataFrame. La funzione deve essere applicata a ogni gruppo del SparkDataFrame e deve avere solo due parametri: la chiave di raggruppamento e data.frame R corrispondente a quella chiave. I gruppi vengono scelti da una o più colonne del SparkDataFrames. L'output della funzione deve essere un data.frame. Lo schema specifica il formato delle righe del SparkDataFrame risultante. Deve rappresentare lo schema di output della funzione R basato sui tipi di dati di Spark. I nomi delle colonne del data.frame restituito vengono impostati dall'utente.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Analogamente a gapply, applicare una funzione a ciascun gruppo di un SparkDataFrame e restituire il risultato nel data.frame R. L'output della funzione deve essere un data.frame. Ma non è necessario fornire lo schema. Si noti che gapplyCollect può avere esito negativo se gli output della funzione vengono eseguiti su tutte le partizioni non possono essere trasferiti nel driver ed essere contenuti nella memoria del driver.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Eseguire funzioni locali R distribuite con spark.lapply

spark.lapply

Analogamente a lapply in R nativo, spark.lapply esegue una funzione su un elenco di elementi e distribuisce i calcoli con Spark. Applica una funzione in modo simile a doParallel o lapply sugli elementi di un elenco. I risultati di tutti i calcoli devono poter essere contenute in un singolo computer. In caso contrario, è possibile eseguire operazioni come df <- createDataFrame(list) e successivamente usare dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Eseguire query SQL da SparkR

Un DataFrame SparkR può anche essere registrato come visualizzazione temporanea che consente l'esecuzione di query SQL sui relativi dati. La funzione sql consente alle applicazioni l'esecuzione di query SQL a livello di codice e restituisce il risultato come un DataFrame SparkR.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Apprendimento automatico

SparkR espone la maggior parte degli algoritmi MLLib. SparkR usa MLlib per eseguire il training del modello.

L'esempio seguente mostra come fare a creare un modello GLM gaussiano usando SparkR. Per eseguire la regressione lineare, impostare la famiglia su "gaussian". Per eseguire la regressione logistica, impostare la famiglia su "binomial". Quando si usa SparkML GLM SparkR esegue automaticamente la codifica one-hot delle funzionalità categoriche in modo che non sia necessario farlo manualmente. Oltre alle caratteristiche di tipo String e Double, è anche possibile adattare il modello alle caratteristiche MLlib Vector, per garantire la compatibilità con altri componenti MLlib.

Per altre informazioni sugli algoritmi di Machine Learning supportati, vedere la documentazione per SparkR e MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)