Použití SparkR
SparkR je balíček R, který poskytuje lehký front-end pro použití Apache Sparku z R. SparkR poskytuje implementaci distribuovaného datového rámce, která podporuje operace, jako je výběr, filtrování, agregace atd. SparkR podporuje také distribuované strojové učení pomocí knihovny MLlib.
Používejte SparkR prostřednictvím definic dávkových úloh Sparku nebo s interaktivními poznámkovými bloky Microsoft Fabric.
Podpora jazyka R je dostupná jenom ve Sparku 3.1 nebo novějším. R ve Sparku 2.4 se nepodporuje.
Požadavky
Získejte předplatné Microsoft Fabric. Nebo si zaregistrujte bezplatnou zkušební verzi Microsoft Fabricu.
Přihlaste se k Microsoft Fabric.
Pomocí přepínače prostředí na levé straně domovské stránky přepněte na prostředí Synapse Datová Věda.
Otevřete nebo vytvořte poznámkový blok. Postup najdete v tématu Použití poznámkových bloků Microsoft Fabric.
Nastavte možnost jazyka na SparkR (R) a změňte primární jazyk.
Připojte poznámkový blok k jezeru. Na levé straně vyberte Přidat, pokud chcete přidat existující jezerní dům nebo vytvořit jezero.
Čtení a zápis datových rámců SparkR
Čtení datového rámce SparkR z místního datového rámce R
Nejjednodušší způsob, jak vytvořit datový rámec, je převést místní R data.frame na datový rámec Spark.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Čtení a zápis datového rámce SparkR z Lakehouse
Data mohou být uložena v místním systému souborů uzlů clusteru. Obecné metody čtení a zápisu datového rámce SparkR z Lakehouse je read.df
a write.df
. Tyto metody přebírají cestu pro načtení souboru a typ zdroje dat. SparkR nativně podporuje čtení souborů CSV, JSON, textu a Parquet.
Pokud chcete číst a zapisovat do Lakehouse, nejprve ho přidejte do relace. Na levé straně poznámkového bloku vyberte Přidat a přidejte existující lakehouse nebo vytvořte Lakehouse.
Poznámka:
Pokud chcete získat přístup k souborům Lakehouse pomocí balíčků Sparku, například read.df
nebo write.df
, použijte jeho cestu ADFS nebo relativní cestu pro Spark. V Průzkumníku Lakehouse klikněte pravým tlačítkem na soubory nebo složku, ke které chcete získat přístup, a zkopírujte jeho cestu ADFS nebo relativní cestu pro Spark z místní nabídky.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric je tidyverse
předinstalovaný. K souborům Lakehouse můžete přistupovat ve známých balíčcích R, jako je čtení a zápis souborů Lakehouse pomocí readr::read_csv()
a readr::write_csv()
.
Poznámka:
Pokud chcete získat přístup k souborům Lakehouse pomocí balíčků R, musíte použít cestu k rozhraní File API. V Průzkumníku Lakehouse klikněte pravým tlačítkem na soubor nebo složku, ke které chcete získat přístup, a zkopírujte jeho cestu k rozhraní File API z místní nabídky.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Datový rámec SparkR ve službě Lakehouse můžete také číst pomocí dotazů SparkSQL.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
Operace datového rámce
Datové rámce SparkR podporují mnoho funkcí pro zpracování strukturovaných dat. Tady je několik základních příkladů. Úplný seznam najdete v dokumentaci k rozhraní API SparkR.
Výběr řádků a sloupců
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Seskupení a agregace
Datové rámce SparkR podporují mnoho běžně používaných funkcí pro agregaci dat po seskupení. Můžeme například vypočítat histogram doby čekání v věrné datové sadě, jak je znázorněno níže.
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Operace se sloupci
SparkR poskytuje mnoho funkcí, které lze přímo použít u sloupců pro zpracování a agregaci dat. Následující příklad ukazuje použití základních aritmetických funkcí.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Použití uživatelem definované funkce
SparkR podporuje několik druhů uživatelem definovaných funkcí:
Spuštění funkce u velké datové sady s dapply
nebo dapplyCollect
dapply
Použití funkce pro každý oddíl oddílu SparkDataFrame
. Funkce, která se má použít pro každý oddíl oddílu SparkDataFrame
a měla by mít pouze jeden parametr, do kterého bude předána hodnota data.frame každému oddílu. Výstup funkce by měl být .data.frame
Schéma určuje formát řádku výsledného objektu SparkDataFrame
. Musí odpovídat datovým typům vrácených hodnot.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Podobně jako dapply použijte funkci na každý oddíl oddílu SparkDataFrame
a shromážděte výsledek zpět. Výstupem funkce by měl být data.frame
. Tentokrát ale není nutné předat schéma. Všimněte si, že dapplyCollect
pokud výstupy funkce běží ve všech oddílech, nelze na ovladač načíst a vejít do paměti ovladače.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Spuštění funkce pro velké seskupení datových sad podle vstupních sloupců s gapply
nebo gapplyCollect
gapply
Použití funkce pro každou skupinu .SparkDataFrame
Funkce se použije pro každou skupinu a SparkDataFrame
měla by mít pouze dva parametry: seskupovací klíč a R data.frame
odpovídající danému klíči. Skupiny se vyberou ze SparkDataFrames
sloupců. Výstupem funkce by měl být data.frame
. Schéma určuje formát řádku výsledného SparkDataFrame
souboru . Musí reprezentovat výstupní schéma funkce R z datových typů Sparku. Názvy vrácených data.frame
sloupců jsou nastaveny uživatelem.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
Podobně jako gapply
funkce použije funkci pro každou skupinu a SparkDataFrame
shromáždí výsledek zpět do R data.frame
. Výstupem funkce by měl být data.frame
. Schéma ale není nutné předat. Všimněte si, že gapplyCollect
pokud výstupy funkce běží ve všech oddílech, nelze na ovladač načíst a vejít do paměti ovladače.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Spouštění místních funkcí R distribuovaných pomocí spark.lapply
spark.lapply
lapply
Podobně jako v nativním jazyce R spark.lapply
spustí funkci nad seznamem prvků a distribuuje výpočty pomocí Sparku. Použije funkci způsobem, který je podobný doParallel
prvkům seznamu nebo lapply
prvkům seznamu. Výsledky všechvýpočtůch Pokud tomu tak není, mohou udělat něco jako df <- createDataFrame(list)
a pak použít dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Spouštění dotazů SQL ze SparkR
Datový rámec SparkR lze také zaregistrovat jako dočasné zobrazení, které umožňuje spouštět dotazy SQL na jeho data. Funkce SQL umožňuje aplikacím programově spouštět dotazy SQL a vrací výsledek jako datový rámec SparkR.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Strojové učení
SparkR zveřejňuje většinu algoritmů MLLib. SparkR pod kapotou používá K trénování modelu MLlib.
Následující příklad ukazuje, jak pomocí SparkR sestavit model Gaussian GLM. Chcete-li spustit lineární regresi, nastavte rodinu na "gaussian"
. Pokud chcete spustit logistickou regresi, nastavte rodinu na "binomial"
hodnotu . Pokud používáte SparkML GLM
SparkR, automaticky provádí kódování kategorických funkcí s jedním žhavým kódováním, aby se nemuselo provádět ručně. Kromě funkcí typu String a Double je také možné přizpůsobit funkce MLlib Vector, aby byly kompatibilní s jinými komponentami knihovny MLlib.
Další informace o podporovaných algoritmech strojového učení najdete v dokumentaci pro SparkR a MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)