Usar o SparkR
O SparkR é um pacote do R que fornece um front-end leve para usar o Apache Spark do R. O SparkR fornece uma implementação de quadro de dados distribuído que dá suporte a operações como seleção, filtragem, agregação, etc. O SparkR também dá suporte ao aprendizado de máquina distribuído usando o MLlib.
Use o SparkR por meio de definições de trabalho em lote do Spark ou com notebooks interativos do Microsoft Fabric.
O suporte ao R só está disponível no Spark3.1 ou superior. Não há suporte para R no Spark 2.4.
Pré-requisitos
Obtenha uma assinatura do Microsoft Fabric. Ou, inscreva-se para uma avaliação gratuita do Microsoft Fabric.
Entre no Microsoft Fabric.
Use o alternador de experiência no lado esquerdo da sua página inicial para mudar para a experiência de Ciência de Dados Synapse.
Abrir ou criar um notebook. Para saber como, consulte Como usar notebooks do Microsoft Fabric.
Defina a opção de idioma para SparkR (R) para mudar o idioma principal.
Anexe o notebook a um lakehouse. No lado esquerdo, selecione Adicionar para adicionar um lakehouse existente ou para criar um.
Ler e gravar DataFrames do SparkR
Criar um DataFrame do SparkR de um data.frame do R local
A maneira mais simples de criar um DataFrame é converter um data.frame do R local em um DataFrame do Spark.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Ler e gravar um DataFrame do SparkR do Lakehouse
Os dados podem ser armazenados no sistema de arquivos local de nós de cluster. Os métodos gerais para ler e gravar um DataFrame do SparkR do Lakehouse são read.df
e write.df
. Esses métodos usam o caminho de carregamento do arquivo e o tipo de fonte de dados. O SparkR dá suporte à leitura de arquivos CSV, JSON, texto e Parquet nativamente.
Para ler e gravar em um Lakehouse, primeiro adicione-o à sua sessão. No lado esquerdo, selecione Adicionar para adicionar um lakehouse existente ou criar um.
Observação
Para acessar arquivos do Lakehouse usando pacotes do Spark, como read.df
ou write.df
, use seu caminho do ADFS ou caminho relativo para Spark. No Gerenciador do Lakehouse, clique com o botão direito do mouse nos arquivos ou na pasta que você deseja acessar e copie seu caminho do ADFS ou caminho relativo para Spark do menu contextual.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
O Microsoft Fabric tem tidyverse
pré-instalado. Você pode acessar arquivos do Lakehouse em seus pacotes de R familiares, como ler e gravar arquivos do Lakehouse usando readr::read_csv()
e readr::write_csv()
.
Observação
Para acessar arquivos do Lakehouse usando pacotes do R, você precisa usar o caminho da API do Arquivo. No Gerenciador do Lakehouse, clique com o botão direito do mouse no arquivo ou na pasta que você deseja acessar e copie o caminho da API do Arquivo no menu contextual.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Você também pode ler um Dataframe do SparkR em seu Lakehouse usando consultas SparkSQL.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
Operações de DataFrame
Os DataFrames do SparkR dão suporte a várias funções para realizar o processamento de dados estruturado. Veja aqui alguns exemplos básicos. Há uma lista completa na documentação de API do SparkR.
Selecionar linhas e colunas
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Agrupamento e agregação
Os DataFrames do SparkR aceitam várias funções usadas comumente para agregar dados após o agrupamento. Por exemplo, podemos calcular um histograma do tempo de espera no conjunto de dados fiel, conforme mostrado abaixo
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Column operations
O SparkR fornece várias funções que podem ser aplicadas diretamente a colunas para processamento de dados e agregação. O exemplo a seguir mostra o uso de funções aritméticas básicas.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Aplicar função definida pelo usuário
O SparkR dá suporte a vários tipos de funções definidas pelo usuário:
Executar uma função em um conjunto de dados grande com dapply
ou dapplyCollect
dapply
Aplique uma função a cada partição de um SparkDataFrame
. A função será aplicada a cada partição do SparkDataFrame
e deve ter apenas um parâmetro, ao qual um data.frame que corresponde a cada partição será passado. A saída da função deve ser um data.frame
. O esquema especifica o formato de linha do resultante de um SparkDataFrame
. Ele deve corresponder aos tipos de dados do valor retornado.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Como dapply, aplique uma função a cada partição de um SparkDataFrame
e colete o resultado de volta. A saída da função deve ser um data.frame
. Mas, desta vez, o esquema não precisa ser passado. Observe que dapplyCollect
pode falhar se as saídas da função executadas em toda a partição não puderem ser trazidas para o driver e couberem na memória do driver.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Executar uma função em um agrupamento de conjunto de dados grande por colunas de entrada com gapply
ou gapplyCollect
gapply
Aplique uma função a cada grupo de um SparkDataFrame
. A função deve ser aplicada a cada grupo do SparkDataFrame
e deve ter apenas dois parâmetros: chave de agrupamento e data.frame
de R correspondentes a essa chave. Os grupos são escolhidos de colunas do SparkDataFrames
. A saída da função deve ser um data.frame
. O esquema especifica o formato de linha do resultante de um SparkDataFrame
. Ele deve representar o esquema de saída da função R dos tipos de dados do Spark. Os nomes de coluna do data.frame
retornado são definidos pelo usuário.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
Assim como gapply
, aplica uma função a cada grupo de um SparkDataFrame
e coleta o resultado de volta para o data.frame
de R. A saída da função deve ser um data.frame
. No entanto, o esquema não precisa ser passado. Observe que gapplyCollect
pode falhar se as saídas da função executadas em toda a partição não puderem ser trazidas para o driver e couberem na memória do driver.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Executar funções do R local distribuídas com spark.lapply
spark.lapply
Semelhante ao lapply
no R nativo, spark.lapply
executa uma função em uma lista de elementos e distribui as computações com o Spark. Aplica uma função de maneira semelhante a doParallel
ou lapply
a elementos de uma lista. Os resultados de todas as computações devem caber em um único computador. Se esse não for o caso, eles poderão fazer algo como df <- createDataFrame(list)
e, em seguida, usar dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Executar consultas SQL do SparkR
Um DataFrame do SparkR também pode ser registrado como uma exibição temporária que permite executar consultas SQL em seus dados. A função sql permite que os aplicativos executem consultas SQL programaticamente e retorna o resultado como um DataFrame do SparkR.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Aprendizado de máquina
O SparkR expõe a maioria dos algoritmos de MLLib. Nos bastidores, o SparkR usa MLlib para treinar o modelo.
O exemplo a seguir mostra como criar um modelo GLM gaussiano usando o SparkR. Para executar a regressão linear, defina a família como "gaussian"
. Para executar a regressão logística, defina a família como "binomial"
. Ao usar o SparkML GLM
, o SparkR executa automaticamente a codificação one-hot de recursos categóricos para que isso não precise ser feito manualmente. Além dos recursos do tipo String e Double, também é possível se adaptar aos recursos de vetor do MLlib, para compatibilidade com outros componentes do MLlib.
Para saber mais sobre quais algoritmos de aprendizado de máquina têm suporte, acesse a documentação do SparkR e do MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)