Использование SparkR
SparkR — это пакет R, который предоставляет интерфейс с легким весом для использования Apache Spark из R. SparkR предоставляет реализацию распределенного кадра данных, которая поддерживает такие операции, как выбор, фильтрация, агрегирование и т. д. SparkR также поддерживает распределенное машинное обучение с помощью MLlib.
Используйте SparkR с помощью определений пакетного задания Spark или интерактивных записных книжек Microsoft Fabric.
Поддержка R доступна только в Spark3.1 или более поздней версии. R в Spark 2.4 не поддерживается.
Необходимые компоненты
Получение подписки Microsoft Fabric. Или зарегистрируйте бесплатную пробную версию Microsoft Fabric.
Войдите в Microsoft Fabric.
Используйте переключатель интерфейса в левой нижней части домашней страницы, чтобы перейти на Fabric.
Откройте или создайте записную книжку. Узнайте, как использовать записные книжки Microsoft Fabric.
Задайте для параметра языка значение SparkR (R), чтобы изменить основной язык.
Подключите записную книжку к lakehouse. В левой части нажмите кнопку "Добавить ", чтобы добавить существующее озеро или создать озеро.
Чтение и запись кадров данных SparkR
Чтение кадра данных SparkR из локального кадра данных R.frame
Самый простой способ создания кадра данных — преобразовать локальный кадр данных R.frame в кадр данных Spark.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Чтение и запись Кадра данных SparkR из Lakehouse
Данные можно хранить в локальной файловой системе узлов кластера. Общие методы для чтения и записи кадра данных SparkR из Lakehouse и read.df
write.df
. Эти методы позволяют загрузить файл и тип источника данных. SparkR поддерживает чтение файлов в формате CSV, JSON, файлов текстовом формате и файлов Parquet по умолчанию.
Чтобы прочитать и записать в Lakehouse, сначала добавьте его в сеанс. В левой части записной книжки нажмите кнопку "Добавить", чтобы добавить существующий Lakehouse или создать Lakehouse.
Примечание.
Чтобы получить доступ к файлам Lakehouse с помощью пакетов Spark, таких как read.df
или write.df
, используйте его путь ADFS или относительный путь для Spark. В обозревателе Lakehouse щелкните правой кнопкой мыши файлы или папку, к которым вы хотите получить доступ, и скопируйте путь ADFS или относительный путь к Spark из контекстного меню.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric предварительно tidyverse
установлен. Вы можете получить доступ к файлам Lakehouse в знакомых пакетах R, таких как чтение и запись файлов Lakehouse с помощью readr::read_csv()
и readr::write_csv()
.
Примечание.
Чтобы получить доступ к файлам Lakehouse с помощью пакетов R, необходимо использовать путь к API файлов. В обозревателе Lakehouse щелкните правой кнопкой мыши файл или папку, к которой требуется получить доступ, и скопируйте путь к API файла из контекстного меню.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Вы также можете прочитать кадр данных SparkR в Lakehouse с помощью запросов SparkSQL.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
Операции с кадрами данных
Кадры данных SparkR поддерживают многие функции для структурированной обработки данных. Ниже приводятся несколько простых примеров. Полный список можно найти в документации по API SparkR.
Выбор строк и столбцов
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Группирование и статистический анализ
Кадры данных SparkR поддерживают множество часто используемых функций для агрегирования данных после группировки. Например, можно вычислить гистограмму времени ожидания в верном наборе данных, как показано ниже.
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Операции столбцов
SparkR предоставляет множество функций, которые можно напрямую применять к столбцам для обработки и агрегирования данных. В следующем примере показано применение базовых арифметических функций.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Применение определяемой пользователем функции
SparkR поддерживает несколько типов определяемых пользователем функций:
Запуск функции в большом наборе данных с dapply
помощью или dapplyCollect
dapply
Примените функцию к каждой секции объекта SparkDataFrame
. Функция, применяемая к каждой SparkDataFrame
секции и должна иметь только один параметр, к которому будет передана функция data.frame, соответствующая каждой секции. Выходные данные функции должны быть data.frame
. Схема задает формат строки результирующего SparkDataFrame
объекта. Он должен соответствовать типам данных возвращаемого значения.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Как и dapply, примените функцию к каждой секции и SparkDataFrame
соберите результат обратно. Выходные данные функции должны быть data.frame
. Но на этот раз схема не требуется передавать. Обратите внимание, что может завершиться ошибкой, dapplyCollect
если выходные данные функции выполняются во всех разделах, не могут быть извлечены в драйвер и помещаются в память драйвера.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Выполнение функции в большом наборе данных сгруппированием по входным столбцам или gapply
gapplyCollect
gapply
Примените функцию к каждой группе объекта SparkDataFrame
. Функция должна применяться к каждой SparkDataFrame
группе и должна иметь только два параметра: группирование ключей и R data.frame
, соответствующих этому ключу. Группы выбираются из SparkDataFrames
столбцов. Выходные данные функции должны быть data.frame
. Схема задает формат строки результирующего SparkDataFrame
объекта. Она должна представлять выходную схему функции R из типов данных Spark. Имена возвращаемых data.frame
столбцов задаются пользователем.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
Например gapply
, применяет функцию к каждой SparkDataFrame
группе и собирает результат обратно в R data.frame
. Выходные данные функции должны быть data.frame
. Но для прохождения схемы не требуется. Обратите внимание, что может завершиться ошибкой, gapplyCollect
если выходные данные функции выполняются во всех разделах, не могут быть извлечены в драйвер и помещаются в память драйвера.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Выполнение локальных функций R, распределенных с помощью spark.lapply
spark.lapply
lapply
Как и в машинном коде R, spark.lapply
выполняет функцию по списку элементов и распределяет вычисления с помощью Spark. Применяет функцию таким образом, как и doParallel
lapply
к элементам списка. Результаты всех вычислений должны соответствовать одному компьютеру. Если это не так, они могут сделать что-то подобное df <- createDataFrame(list)
, а затем использовать dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Выполнение запросов SQL из SparkR
Кадр данных SparkR также можно зарегистрировать в качестве временного представления, позволяющего выполнять запросы SQL по своим данным. Функция SQL позволяет приложениям выполнять sql-запросы программным способом и возвращать результат в виде кадра данных SparkR.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Машинное обучение
SparkR включает большую часть алгоритмов MLLib. SparkR применяет MLlib для обучения модели.
В следующем примере показано, как создать модель GLM Gaussian с помощью SparkR. Чтобы выполнить линейную регрессию, установите для параметра family значение "gaussian"
. Чтобы выполнить логистическую регрессию, установите для параметра family значение "binomial"
. При использовании SparkML GLM
SparkR автоматически выполняет одноохотливную кодировку категориальных функций, чтобы ее не нужно было выполнять вручную. Помимо функций строкового и двойного типа, также можно разместить над функциями MLlib Vector для совместимости с другими компонентами MLlib.
Дополнительные сведения о поддерживаемых алгоритмах машинного обучения см. в документации по SparkR и MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)