SparkR gebruiken
SparkR is een R-pakket dat een lichtgewicht front-end biedt voor het gebruik van Apache Spark van R. SparkR biedt een gedistribueerde implementatie van een gegevensframe die ondersteuning biedt voor bewerkingen zoals selectie, filteren, aggregatie, enzovoort. SparkR biedt ook ondersteuning voor gedistribueerde machine learning met MLlib.
SparkR gebruiken via Batch-taakdefinities van Spark of met interactieve Microsoft Fabric-notebooks.
R-ondersteuning is alleen beschikbaar in Spark3.1 of hoger. R in Spark 2.4 wordt niet ondersteund.
Vereisten
Haal een Microsoft Fabric-abonnement op. Of meld u aan voor een gratis proefversie van Microsoft Fabric.
Meld u aan bij Microsoft Fabric.
Gebruik de ervaringswisselaar aan de linkerkant van de startpagina om over te schakelen naar Fabric.
Open of maak een notitieblok. Zie Microsoft Fabric-notebooks gebruiken voor meer informatie.
Stel de taaloptie in op SparkR (R) om de primaire taal te wijzigen.
Koppel uw notitieblok aan een lakehouse. Selecteer aan de linkerkant Toevoegen om een bestaand lakehouse toe te voegen of om een lakehouse te maken.
SparkR DataFrames lezen en schrijven
Een SparkR DataFrame lezen vanuit een lokaal R data.frame
De eenvoudigste manier om een DataFrame te maken, is door een lokale R data.frame te converteren naar een Spark DataFrame.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
SparkR DataFrame lezen en schrijven vanuit Lakehouse
Gegevens kunnen worden opgeslagen in het lokale bestandssysteem van clusterknooppunten. De algemene methoden voor het lezen en schrijven van een SparkR DataFrame uit Lakehouse is read.df
en write.df
. Met deze methoden wordt het pad voor het bestand geladen en het type gegevensbron. SparkR biedt ondersteuning voor het lezen van CSV-, JSON-, tekst- en Parquet-bestanden.
Als u naar een Lakehouse wilt lezen en schrijven, voegt u deze eerst toe aan uw sessie. Selecteer aan de linkerkant van het notitieblok Toevoegen om een bestaand Lakehouse toe te voegen of een Lakehouse te maken.
Notitie
Als u Toegang wilt krijgen tot Lakehouse-bestanden met Behulp van Spark-pakketten, zoals read.df
of write.df
, gebruikt u het ADFS-pad of het relatieve pad voor Spark. Klik in Lakehouse Explorer met de rechtermuisknop op de bestanden of map die u wilt openen en kopieer het bijbehorende ADFS-pad of relatief pad voor Spark vanuit het contextmenu.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric is tidyverse
vooraf geïnstalleerd. U hebt toegang tot Lakehouse-bestanden in uw vertrouwde R-pakketten, zoals het lezen en schrijven van Lakehouse-bestanden met en readr::read_csv()
readr::write_csv()
.
Notitie
Voor toegang tot Lakehouse-bestanden met behulp van R-pakketten moet u het bestands-API-pad gebruiken. Klik in Lakehouse Explorer met de rechtermuisknop op het bestand of de map die u wilt openen en kopieer het bestands-API-pad in het contextmenu.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
U kunt ook een SparkR Dataframe in uw Lakehouse lezen met behulp van SparkSQL-query's.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
DataFrame-bewerkingen
SparkR DataFrames ondersteunen veel functies voor gestructureerde gegevensverwerking. Hier volgen enkele basisvoorbeelden. Een volledige lijst vindt u in de SparkR-API-documenten.
Rijen en kolommen selecteren
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Groepering en aggregatie
SparkR-gegevensframes ondersteunen veel veelgebruikte functies voor het aggregeren van gegevens na groepering. We kunnen bijvoorbeeld een histogram berekenen van de wachttijd in de getrouwe gegevensset, zoals hieronder wordt weergegeven
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Kolombewerkingen
SparkR biedt veel functies die rechtstreeks kunnen worden toegepast op kolommen voor gegevensverwerking en aggregatie. In het volgende voorbeeld ziet u het gebruik van eenvoudige rekenkundige functies.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Door de gebruiker gedefinieerde functie toepassen
SparkR ondersteunt verschillende soorten door de gebruiker gedefinieerde functies:
Een functie uitvoeren op een grote gegevensset met dapply
of dapplyCollect
dapply
Een functie toepassen op elke partitie van een SparkDataFrame
. De functie die moet worden toegepast op elke partitie van de SparkDataFrame
en mag slechts één parameter hebben, waaraan een data.frame overeenkomt met elke partitie wordt doorgegeven. De uitvoer van de functie moet een data.frame
. Schema geeft de rijopmaak van het resulterende a SparkDataFrame
. Deze moet overeenkomen met gegevenstypen van geretourneerde waarde.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Net als bij dapply past u een functie toe op elke partitie van een SparkDataFrame
en verzamelt u het resultaat terug. De uitvoer van de functie moet een data.frame
. Maar deze keer hoeft het schema niet te worden doorgegeven. Houd er rekening mee dat dapplyCollect
dit kan mislukken als de uitvoer van de functie op alle partities niet kan worden opgehaald naar het stuurprogramma en in het geheugen van het stuurprogramma past.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Een functie uitvoeren op een grote gegevenssetgroepering op basis van invoerkolom(s) met gapply
of gapplyCollect
gapply
Een functie toepassen op elke groep van een SparkDataFrame
. De functie moet worden toegepast op elke groep van de SparkDataFrame
en mag slechts twee parameters hebben: groeperingssleutel en R data.frame
die overeenkomen met die sleutel. De groepen worden gekozen uit SparkDataFrames
kolommen. De uitvoer van de functie moet een data.frame
. Schema geeft de rijopmaak van het resulterende SparkDataFrame
. Het moet het uitvoerschema van de R-functie van Spark-gegevenstypen vertegenwoordigen. De kolomnamen van het geretourneerde data.frame
bestand worden ingesteld door de gebruiker.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
Hiermee gapply
past u een functie toe op elke groep van een SparkDataFrame
en verzamelt u het resultaat terug op R data.frame
. De uitvoer van de functie moet een data.frame
. Het schema hoeft echter niet te worden doorgegeven. Houd er rekening mee dat gapplyCollect
dit kan mislukken als de uitvoer van de functie op alle partities niet kan worden opgehaald naar het stuurprogramma en in het geheugen van het stuurprogramma past.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Lokale R-functies uitvoeren die zijn gedistribueerd met spark.lapply
spark.lapply
lapply
Net als in systeemeigen R voert spark.lapply
u een functie uit via een lijst met elementen en distribueert u de berekeningen met Spark. Hiermee past u een functie toe op een manier die vergelijkbaar is met doParallel
of lapply
met elementen van een lijst. De resultaten van alle berekeningen moeten in één machine passen. Als dat niet het geval is, kunnen ze iets als df <- createDataFrame(list)
en dan gebruiken dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
SQL-query's uitvoeren vanuit SparkR
Een SparkR DataFrame kan ook worden geregistreerd als een tijdelijke weergave waarmee u SQL-query's kunt uitvoeren op de gegevens. Met de sql-functie kunnen toepassingen programmatisch SQL-query's uitvoeren en wordt het resultaat geretourneerd als een SparkR DataFrame.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Machinelearning
SparkR maakt de meeste MLLib-algoritmen beschikbaar. SparkR gebruikt MLlib om het model te trainen.
In het volgende voorbeeld ziet u hoe u een Gaussian GLM-model bouwt met behulp van SparkR. Als u lineaire regressie wilt uitvoeren, stelt u de familie in op "gaussian"
. Als u logistieke regressie wilt uitvoeren, stelt u familie in op "binomial"
. Wanneer u SparkML GLM
SparkR gebruikt, wordt automatisch één dynamische codering van categorische functies uitgevoerd, zodat deze niet handmatig hoeft te worden uitgevoerd. Naast tekenreeks- en dubbeltypefuncties is het ook mogelijk om te passen op MLlib Vector-functies, voor compatibiliteit met andere MLlib-onderdelen.
Raadpleeg de documentatie voor SparkR en MLlib voor meer informatie over welke machine learning-algoritmen worden ondersteund.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)