Utilizar o SparkR
O SparkR é um pacote R que fornece um frontend leve para usar o Apache Spark da R. O SparkR fornece uma implementação de quadro de dados distribuído que suporta operações como seleção, filtragem, agregação, etc. O SparkR também suporta aprendizado de máquina distribuído usando MLlib.
Use o SparkR por meio de definições de trabalho em lote do Spark ou com blocos de anotações interativos do Microsoft Fabric.
O suporte a R só está disponível no Spark3.1 ou superior. R no Spark 2.4 não é suportado.
Pré-requisitos
Obtenha uma assinatura do Microsoft Fabric. Ou inscreva-se para uma avaliação gratuita do Microsoft Fabric.
Entre no Microsoft Fabric.
Use o seletor de experiência no canto inferior esquerdo da página inicial para alternar para o Fabric.
Abra ou crie um bloco de notas. Para saber como, consulte Como usar blocos de anotações do Microsoft Fabric.
Defina a opção de idioma como SparkR (R) para alterar o idioma principal.
Ligue o seu bloco de notas a uma casa no lago. No lado esquerdo, selecione Adicionar para adicionar uma casa de lago existente ou para criar uma casa de lago.
Ler e gravar DataFrames SparkR
Ler um DataFrame SparkR a partir de um data.frame R local
A maneira mais simples de criar um DataFrame é converter um R data.frame local em um Spark DataFrame.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Leia e escreva o SparkR DataFrame da Lakehouse
Os dados podem ser armazenados no sistema de arquivos local de nós de cluster. Os métodos gerais para ler e escrever um DataFrame SparkR da Lakehouse é read.df
e write.df
. Esses métodos tomam o caminho para o arquivo a ser carregado e o tipo de fonte de dados. O SparkR suporta a leitura de arquivos CSV, JSON, texto e Parquet nativamente.
Para ler e escrever em uma Lakehouse, primeiro adicione-a à sua sessão. No lado esquerdo do bloco de anotações, selecione Adicionar para adicionar um Lakehouse existente ou criar um Lakehouse.
Nota
Para acessar arquivos do Lakehouse usando pacotes do Spark, como read.df
ou write.df
, use seu caminho ADFS ou caminho relativo para o Spark. No Lakehouse explorer, clique com o botão direito do mouse nos arquivos ou pastas que você deseja acessar e copie seu caminho ADFS ou caminho relativo para o Spark no menu contextual.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
O Microsoft Fabric foi tidyverse
pré-instalado. Você pode acessar arquivos Lakehouse em seus pacotes R familiares, como ler e escrever arquivos Lakehouse usando readr::read_csv()
e readr::write_csv()
.
Nota
Para acessar arquivos do Lakehouse usando pacotes R, você precisa usar o caminho da API de arquivo. No Lakehouse explorer, clique com o botão direito do mouse no arquivo ou pasta que você deseja acessar e copie seu caminho da API de arquivo do menu contextual.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Você também pode ler um Dataframe SparkR em sua Lakehouse usando consultas SparkSQL.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
Operações DataFrame
Os SparkR DataFrames suportam muitas funções para fazer processamento de dados estruturados. Aqui estão alguns exemplos básicos. Uma lista completa pode ser encontrada nos documentos da API do SparkR.
Selecionar linhas e colunas
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Agrupamento e agregação
Os quadros de dados SparkR suportam muitas funções comumente usadas para agregar dados após o agrupamento. Por exemplo, podemos calcular um histograma do tempo de espera no conjunto de dados fiel, como mostrado abaixo
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Operações de coluna
O SparkR fornece muitas funções que podem ser aplicadas diretamente a colunas para processamento e agregação de dados. O exemplo a seguir mostra o uso de funções aritméticas básicas.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Aplicar função definida pelo usuário
O SparkR suporta vários tipos de funções definidas pelo usuário:
Executar uma função em um conjunto de dados grande com dapply
ou dapplyCollect
dapply
Aplique uma função a cada partição de um SparkDataFrame
arquivo . A função a ser aplicada a cada partição do e deve ter apenas um parâmetro, para o SparkDataFrame
qual um data.frame corresponde a cada partição será passada. A saída da função deve ser um data.frame
arquivo . O esquema especifica o formato de linha do arquivo SparkDataFrame
. Ele deve corresponder aos tipos de dados do valor retornado.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Como dapply, aplique uma função a cada partição de um SparkDataFrame
e colete o resultado de volta. A saída da função deve ser um data.frame
arquivo . Mas, desta vez, o esquema não precisa ser passado. Observe que dapplyCollect
pode falhar se as saídas da função executadas em toda a partição não podem ser puxadas para o driver e caber na memória do driver.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Executar uma função em um grande agrupamento de conjuntos de dados por coluna(s) de entrada com gapply
ou gapplyCollect
gapply
Aplicar uma função a cada grupo de um SparkDataFrame
arquivo . A função deve ser aplicada a cada grupo do e deve ter apenas dois parâmetros: chave de SparkDataFrame
agrupamento e R data.frame
correspondente a essa chave. Os grupos são escolhidos a partir da SparkDataFrames
(s) coluna(s). A saída da função deve ser um data.frame
arquivo . O esquema especifica o formato de linha do arquivo SparkDataFrame
. Ele deve representar o esquema de saída da função R dos tipos de dados do Spark. Os nomes das colunas retornadas data.frame
são definidos pelo usuário.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
Como gapply
, aplica uma função a cada grupo de a SparkDataFrame
e coleta o resultado de volta para R data.frame
. A saída da função deve ser um data.frame
arquivo . Mas, o esquema não precisa ser passado. Observe que gapplyCollect
pode falhar se as saídas da função executadas em toda a partição não podem ser puxadas para o driver e caber na memória do driver.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Execute funções R locais distribuídas com spark.lapply
spark.lapply
Semelhante ao lapply
R nativo, spark.lapply
executa uma função sobre uma lista de elementos e distribui os cálculos com o Spark. Aplica uma função de maneira semelhante ou doParallel
lapply
a elementos de uma lista. Os resultados de todos os cálculos devem caber em uma única máquina. Se não for esse o caso, eles podem fazer algo parecido df <- createDataFrame(list)
e depois usar dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Executar consultas SQL a partir do SparkR
Um DataFrame SparkR também pode ser registrado como uma exibição temporária que permite executar consultas SQL sobre seus dados. A função sql permite que os aplicativos executem consultas SQL programaticamente e retorna o resultado como um DataFrame SparkR.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Aprendizagem automática
O SparkR expõe a maioria dos algoritmos MLLib. Sob o capô, o SparkR usa MLlib para treinar o modelo.
O exemplo a seguir mostra como criar um modelo GLM gaussiano usando SparkR. Para executar a regressão linear, defina family como "gaussian"
. Para executar a regressão logística, defina família como "binomial"
. Ao usar o SparkML, GLM
o SparkR executa automaticamente uma codificação a quente de recursos categóricos para que não precise ser feita manualmente. Além dos recursos do tipo String e Double, também é possível ajustar os recursos do MLlib Vetor, para compatibilidade com outros componentes MLlib.
Para saber mais sobre quais algoritmos de aprendizado de máquina são suportados, você pode visitar a documentação do SparkR e MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)