Verwenden von SparkR
SparkR ist ein R-Paket, das ein schlankes Front-End für die Nutzung von Apache Spark in R bereitstellt. SparkR ermöglicht eine verteilte Datenrahmenimplementierung, die Vorgänge wie Auswählen, Filtern, Aggregieren usw. unterstützt. SparkR unterstützt außerdem verteiltes maschinelles Lernen mithilfe von MLlib.
Verwenden Sie SparkR über Spark-Batchauftragsdefinitionen oder mit interaktiven Microsoft Fabric-Notebooks.
R-Unterstützung ist nur in Spark3.1 oder höher verfügbar. R in Spark 2.4 wird nicht unterstützt.
Voraussetzungen
Erwerben Sie ein Microsoft Fabric-Abonnement. Registrieren Sie sich alternativ für eine kostenlose Microsoft Fabric-Testversion.
Melden Sie sich bei Microsoft Fabric an.
Wechseln Sie zur Synapse-Data Science-Benutzeroberfläche, indem Sie den Umschalter für die Benutzeroberfläche auf der linken Seite Ihrer Homepage verwenden.
Öffnen oder erstellen Sie ein Notebook. Informationen dazu finden Sie unter Verwenden von Microsoft Fabric-Notebooks.
Legen Sie zum Ändern der primären Sprache die Sprachoption auf SparkR (R) fest.
Verbinden Sie Ihr Notebook mit einem Lakehouse. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.
Lesen und Schreiben von SparkR-Dataframes
Erstellen eines SparkR-Dataframe aus einem lokalen R-Dataframe (data.frame)
Die einfachste Möglichkeit zum Erstellen eines Dataframe besteht in der Konvertierung eines lokalen R-Dataframe (data.frame) in SparkDataFrame.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Lesen und Schreiben von SparkR Dataframes aus Lakehouse
Daten können im lokalen Dateisystem von Clusterknoten gespeichert werden. Die allgemeinen Methoden zum Lesen und Schreiben eines SparkR-Dataframe aus Lakehouse sind read.df
und write.df
. Diese Methoden verwenden den Pfad der zu ladenden Datei und den Typ der Datenquelle. SparkR unterstützt nativ das Lesen von CSV-, JSON-, Text- und Parquet-Dateien.
Um in einem Lakehouse zu lesen und zu schreiben, fügen Sie das Lakehouse zuerst zu Ihrer Sitzung hinzu. Wählen Sie auf der linken Seite des Notebooks die Option Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein neues zu erstellen.
Hinweis
Um mithilfe von Spark-Paketen wie read.df
oder write.df
auf Lakehouse-Dateien zuzugreifen, verwenden Sie den ADFS-Pfad oder den relativen Pfad für Spark. Klicken Sie im Lakehouse-Explorer mit der rechten Maustaste auf die Dateien oder den Ordner, auf die bzw. den Sie zugreifen möchten, und kopieren Sie den ADFS-Pfad oder den relativen Pfad für Spark aus dem Kontextmenü.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
In Microsoft Fabric ist tidyverse
vorinstalliert. Sie können in Ihren vertrauten R-Paketen auf Lakehouse-Dateien zugreifen und können z. B. Lakehouse-Dateien mit readr::read_csv()
und readr::write_csv()
lesen und schreiben.
Hinweis
Um mithilfe von R-Paketen auf Lakehouse-Dateien zuzugreifen, müssen Sie den Datei-API-Pfad verwenden. Klicken Sie im Lakehouse-Explorer mit der rechten Maustaste auf die Datei oder den Ordner, auf die bzw. den Sie zugreifen möchten, und kopieren Sie den Datei-API-Pfad aus dem Kontextmenü.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Sie können auch einen SparkR-Dataframe in Ihrem Lakehouse mithilfe von SparkSQL-Abfragen lesen.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
DataFrame-Vorgänge
Spark-DataFrames unterstützen viele Funktionen für die strukturierte Datenverarbeitung. Es folgen einige einfache Beispiele. Eine vollständige Liste finden Sie in der API-Dokumentation für SparkR.
Auswählen von Zeilen und Spalten
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Gruppierung und Aggregation
SparkR-Dataframes unterstützen viele gängige Funktionen zum Aggregieren von Daten nach der Gruppierung. Beispielsweise können wir wie unten gezeigt ein Histogramm der Wartezeit im originalgetreuen Dataset berechnen.
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Spaltenvorgänge (Column operations)
SparkR bietet viele Funktionen zur Datenverarbeitung und -aggregation, die direkt auf Spalten angewendet werden können. Das folgende Beispiel zeigt die Verwendung grundlegender arithmetischer Funktionen.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Anwenden einer benutzerdefinierten Funktion
SparkR unterstützt verschiedene Arten von benutzerdefinierten Funktionen:
Ausführen einer Funktion für ein großes Dataset mit dapply
oder dapplyCollect
dapply
Wenden Sie eine Funktion auf jede Partition eines SparkDataFrame
an. Die Funktion, die auf jede Partition von SparkDataFrame
angewendet werden soll, sollte nur über einen Parameter verfügen. An diesen wird ein Dataframe übergeben, der jeder Partition entspricht. Die Ausgabe der Funktion sollte ein data.frame
sein. Das Schema gibt das Zeilenformat des resultierenden SparkDataFrame
an. Es muss mit den Datentypen des zurückgegebenen Werts übereinstimmen.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Wenden Sie wie bei dapply eine Funktion auf jede Partition eines SparkDataFrame
an, und rufen Sie das Ergebnis ab. Die Ausgabe der Funktion sollte ein data.frame
sein. Dieses Mal muss das Schema jedoch nicht übergeben werden. Beachten Sie, dass dapplyCollect
fehlschlagen kann, wenn die Ausgaben der Funktion, die auf der gesamten Partition ausgeführt werden, nicht in den Treiber gepullt werden können und nicht in den Treiberspeicher passen.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Ausführen einer Funktion für ein großes Dataset mit Gruppierung nach Eingabespalte(n) mit gapply
oder gapplyCollect
gapply
Wenden Sie eine Funktion auf jede SparkDataFrame
-Gruppe an. Die Funktion soll auf jede Gruppe von SparkDataFrame
angewendet werden und sollte nur zwei Parameter haben: den Gruppierungsschlüssel und den data.frame
von R, der diesem Schlüssel entspricht. Die Gruppen werden aus den SparkDataFrames
-Spalten ausgewählt. Die Ausgabe der Funktion sollte ein data.frame
sein. Das Schema gibt das Zeilenformat des resultierenden SparkDataFrame
an. Es muss das Ausgabeschema der R-Funktion aus den Spark-Datentypen entsprechen. Die Spaltennamen des zurückgegebenen data.frame
werden vom Benutzer bzw. der Benutzerin festgelegt.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
Wie bei gapply
wird eine Funktion auf jede SparkDataFrame
-Gruppe angewendet, und die Ergebnisse werden zurück an R-data.frame
übergeben. Die Ausgabe der Funktion sollte ein data.frame
sein. Das Schema muss jedoch nicht übergeben werden. Beachten Sie, dass gapplyCollect
fehlschlagen kann, wenn die Ausgaben der Funktion, die auf der gesamten Partition ausgeführt werden, nicht in den Treiber gepullt werden können und nicht in den Treiberspeicher passen.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Ausführen lokaler R-Funktionen, verteilt mit „spark.lapply“
spark.lapply
Ähnlich wie bei lapply
wird in nativen R-Elementen von spark.lapply
eine Funktion für eine Liste von Elementen ausgeführt, und die Berechnungen werden mit Spark verteilt. Es wird eine Funktion ähnlich wie bei doParallel
und lapply
auf Elemente einer Liste angewendet. Der Speicherplatz eines einzelnen Computers sollte für alle Berechnungen ausreichend sein. Wenn dies nicht der Fall ist, können sie beispielsweise df <- createDataFrame(list)
anwenden und anschließend dapply
verwenden.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Ausführen von SQL-Abfragen über SparkR
Ein SparkR-Dataframe kann auch als temporäre Ansicht registriert werden, mit der Sie SQL-Abfragen über die Daten ausführen können. Die SQL-Funktion ermöglicht es Anwendungen, SQL-Abfragen programmgesteuert auszuführen, und gibt das Ergebnis als SparkR-Dataframe zurück.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Machine Learning
SparkR macht die meisten MLLib-Algorithmen verfügbar. Im Hintergrund verwendet SparkR die MLlib, um das Modell zu trainieren.
Das folgende Beispiel zeigt, wie Sie mit SparkR ein gaußsches generalisiertes lineares Modell (GLM) erstellen. Legen Sie zum Ausführen einer linearen Regression „family“ auf "gaussian"
fest. Legen Sie zum Ausführen einer logistischen Regression „family“ auf "binomial"
fest. Bei Verwendung von SparkML GLM
führt SparkR automatisch eine One-Hot-Codierung kategorischer Features durch, sodass diese nicht manuell erfolgen muss. Neben den Typfeatures „String“ und „Double“ ist es auch möglich, MLlib Vector-Features einzupassen, um die Kompatibilität mit anderen MLlib-Komponenten zu gewährleisten.
Weitere Informationen dazu, welche Algorithmen für maschinelles Lernen unterstützt werden, finden Sie in der Dokumentation für SparkR und MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)