Använda SparkR
SparkR är ett R-paket som tillhandahåller en lättviktsklientdel för användning av Apache Spark från R. SparkR tillhandahåller en distribuerad implementering av dataramar som stöder åtgärder som val, filtrering, aggregering osv. SparkR stöder även distribuerad maskininlärning med MLlib.
Använd SparkR via Spark batch-jobbdefinitioner eller med interaktiva Microsoft Fabric-notebook-filer.
R-stöd är endast tillgängligt i Spark3.1 eller senare. R i Spark 2.4 stöds inte.
Förutsättningar
Skaffa en Microsoft Fabric-prenumeration. Eller registrera dig för en kostnadsfri utvärderingsversion av Microsoft Fabric.
Logga in på Microsoft Fabric.
Använd upplevelseväxlaren till vänster på startsidan för att växla till Synapse Datavetenskap upplevelse.
Öppna eller skapa en notebook-fil. Mer information finns i Använda Microsoft Fabric-notebook-filer.
Ange språkalternativet SparkR (R) för att ändra det primära språket.
Bifoga anteckningsboken till ett sjöhus. Till vänster väljer du Lägg till för att lägga till ett befintligt sjöhus eller för att skapa ett sjöhus.
Läsa och skriva SparkR DataFrames
Läsa en SparkR DataFrame från en lokal R-dataram
Det enklaste sättet att skapa en DataFrame är att konvertera en lokal R-data.frame till en Spark DataFrame.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Läsa och skriva SparkR DataFrame från Lakehouse
Data kan lagras på det lokala filsystemet för klusternoder. De allmänna metoderna för att läsa och skriva en SparkR DataFrame från Lakehouse är read.df
och write.df
. De här metoderna använder sökvägen för att filen ska läsas in och typen av datakälla. SparkR stöder läsning av CSV-, JSON-, text- och Parquet-filer internt.
Om du vill läsa och skriva till en Lakehouse lägger du först till den i sessionen. Till vänster i anteckningsboken väljer du Lägg till för att lägga till ett befintligt Lakehouse eller skapa ett Lakehouse.
Kommentar
Om du vill komma åt Lakehouse-filer med spark-paket, till exempel read.df
eller write.df
, använder du dess ADFS-sökväg eller relativa sökväg för Spark. I Lakehouse-utforskaren högerklickar du på de filer eller mappar som du vill komma åt och kopierar dess ADFS-sökväg eller relativa sökväg för Spark från snabbmenyn.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric har tidyverse
förinstallerats. Du kan komma åt Lakehouse-filer i dina välbekanta R-paket, till exempel att läsa och skriva Lakehouse-filer med hjälp av readr::read_csv()
och readr::write_csv()
.
Kommentar
Om du vill komma åt Lakehouse-filer med R-paket måste du använda sökvägen till fil-API:et. I Lakehouse-utforskaren högerklickar du på den fil eller mapp som du vill komma åt och kopierar sökvägen till fil-API:et från snabbmenyn.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Du kan också läsa en SparkR-dataram i Lakehouse med hjälp av SparkSQL-frågor.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
DataFrame-åtgärder
SparkR DataFrames har stöd för många funktioner för strukturerad databearbetning. Här följer några grundläggande exempel. En fullständig lista finns i SparkR API-dokumenten.
Markera rader och kolumner
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Gruppering och sammansättning
SparkR-dataramar stöder många vanliga funktioner för att aggregera data efter gruppering. Vi kan till exempel beräkna ett histogram över väntetiden i den trofasta datauppsättningen enligt nedan
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Kolumnåtgärder
SparkR innehåller många funktioner som kan tillämpas direkt på kolumner för databearbetning och aggregering. I följande exempel visas användningen av grundläggande aritmetiska funktioner.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Använda användardefinierad funktion
SparkR stöder flera typer av användardefinierade funktioner:
Kör en funktion på en stor datamängd med dapply
eller dapplyCollect
dapply
Tillämpa en funktion på varje partition i en SparkDataFrame
. Den funktion som ska tillämpas på varje partition av SparkDataFrame
och ska bara ha en parameter, som en data.frame motsvarar varje partition skickas till. Funktionens utdata ska vara en data.frame
. Schemat anger radformatet för den resulterande SparkDataFrame
. Den måste matcha datatyperna för returnerade värden.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Precis som dapply tillämpar du en funktion på varje partition i en SparkDataFrame
och samlar in resultatet tillbaka. Funktionens utdata ska vara en data.frame
. Men den här gången krävs inte schemat för att skickas. Observera att dapplyCollect
det kan misslyckas om utdata från funktionen körs på hela partitionen inte kan hämtas till drivrutinen och får plats i drivrutinsminnet.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Kör en funktion på en stor datamängdsgruppering efter indatakolumner med gapply
eller gapplyCollect
gapply
Tillämpa en funktion på varje grupp av en SparkDataFrame
. Funktionen ska tillämpas på varje grupp av SparkDataFrame
och ska bara ha två parametrar: grupperingsnyckel och R data.frame
som motsvarar den nyckeln. Grupperna väljs från SparkDataFrames
kolumner. Funktionens utdata ska vara en data.frame
. Schemat anger radformatet för det resulterande SparkDataFrame
. Den måste representera R-funktionens utdataschema från Spark-datatyper. Kolumnnamnen för de returnerade data.frame
anges av användaren.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
Som gapply
, tillämpar en funktion på varje grupp i en SparkDataFrame
och samlar in resultatet tillbaka till R data.frame
. Funktionens utdata ska vara en data.frame
. Men schemat krävs inte för att skickas. Observera att gapplyCollect
det kan misslyckas om utdata från funktionen körs på hela partitionen inte kan hämtas till drivrutinen och får plats i drivrutinsminnet.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Kör lokala R-funktioner distribuerade med spark.lapply
spark.lapply
Precis som lapply
i intern R spark.lapply
kör en funktion över en lista över element och distribuerar beräkningen med Spark. Tillämpar en funktion på ett sätt som liknar doParallel
eller lapply
på element i en lista. Resultatet av alla beräkningar bör passa i en enda dator. Om så inte är fallet kan de göra något som liknar df <- createDataFrame(list)
och sedan använda dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Köra SQL-frågor från SparkR
En SparkR DataFrame kan också registreras som en tillfällig vy som gör att du kan köra SQL-frågor över dess data. Sql-funktionen gör det möjligt för program att köra SQL-frågor programmatiskt och returnerar resultatet som en SparkR DataFrame.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Maskininlärning
SparkR exponerar de flesta MLLib-algoritmer. Under huven använder SparkR MLlib för att träna modellen.
I följande exempel visas hur du skapar en Gaussian GLM-modell med SparkR. Om du vill köra linjär regression anger du familj till "gaussian"
. Om du vill köra logistisk regression anger du familj till "binomial"
. När du använder SparkML GLM
SparkR utför automatiskt en frekvent kodning av kategoriska funktioner så att det inte behöver göras manuellt. Utöver funktioner av typen String och Double är det också möjligt att få plats med MLlib Vector-funktioner för kompatibilitet med andra MLlib-komponenter.
Mer information om vilka maskininlärningsalgoritmer som stöds finns i dokumentationen för SparkR och MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)