使用 sparklyr
sparklyr 是 Apache Spark 的 R 接口。 它提供能使用熟悉的 R 接口与 Spark 交互的机制。 可以通过 Spark 批处理作业定义或交互式 Microsoft Fabric 笔记本使用 sparklyr。
sparklyr
通常与其他 tidyverse 包(例如 dplyr)一起使用。 Microsoft Fabric 每一次运行时发布时都会分发 sparklyr 和 tidyverse 的最新稳定版。 可以导入它们并开始使用 API。
先决条件
获取 Microsoft Fabric 订阅。 或者注册免费的 Microsoft Fabric 试用版。
登录 Microsoft Fabric。
使用主页左侧的体验切换器切换到 Synapse 数据科学体验。
打开或创建笔记本。 请参阅如何使用 Microsoft Fabric 笔记本,了解如何操作。
通过将语言选项设置为 Spark (R) 来更改主要语言。
将笔记本附加到湖屋。 选择左侧的“添加”以添加现有湖屋或创建湖屋。
将 sparklyr 连接到 Synapse Spark 群集
在 spark_connect()
中使用以下连接方法建立 sparklyr
连接。 我们支持名为 synapse
的新连接方法,该方法可用于连接到现有的 Spark 会话。 这样大大减少了 sparklyr
会话开启时间。 此外,我们还将此连接方法加入了开源 sparklyr 项目。 使用 method = "synapse"
,可以在同一会话中使用 sparklyr
和 SparkR
,还能轻松地在两者间共享数据。
# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)
使用 sparklyr 读取数据
新的 Spark 会话不包含任何数据。 第一步是将数据加载到 Spark 会话的内存中,或使 Spark 指向数据的位置,以便它可以按需访问数据。
# load the sparklyr package
library(sparklyr)
# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)
head(mtcars_tbl)
使用 sparklyr
,还可以使用 ABFS 路径从 Lakehouse 文件 write
和 read
数据。 若要读取和写入湖屋,请先将其添加到会话。 在笔记本左侧,选择“添加”以添加现有的湖屋或创建湖屋。
若要查找 ABFS 路径,请右键单击湖屋中的“文件”文件夹,然后选择“复制 ABFS 路径”。 粘贴路径以替换此代码中的 abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files
:
temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"
# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')
# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv)
head(mtcarsDF)
使用 sparklyr 操作数据
sparklyr
提供了多种方法来处理 Spark 中的数据,使用:
dplyr
命令- SparkSQL
- Spark 的功能转换器
使用 dplyr
可以使用熟悉的 dplyr
命令在 Spark 中准备数据。 命令在 Spark 内部运行,因此 R 和 Spark 之间没有不必要的数据传输。
单击“使用 dplyr
操作数据”,查看有关将 dplyr 与 Spark 配合使用的额外文档。
# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)
cargroup <- group_by(mtcars_tbl, cyl) %>%
count() %>%
arrange(desc(n))
cargroup
sparklyr
和 dplyr
为我们将 R 命令转换为 Spark SQL。 若要查看生成的查询,请使用 show_query()
:
# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)
使用 SQL
还可以直接对 Spark 群集中的表执行 SQL 查询。 spark_connection()
对象实现 Spark 的 DBI 接口,因此可以使用 dbGetQuery()
执行 SQL 并将结果作为 R 数据帧返回:
library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")
使用功能转换器
上述两种方法都依赖于 SQL 语句。 Spark 提供的命令使某些数据转换更加方便,而无需使用 SQL。
例如,ft_binarizer()
命令简化了新列的创建过程,该列指示另一列的值是否高于特定阈值。
可以通过 sparklyr
从引用 -FT 中找到可用的 Spark 功能转换器的完整列表。
mtcars_tbl %>%
ft_binarizer("mpg", "over_20", threshold = 20) %>%
select(mpg, over_20) %>%
head(5)
在 sparklyr
和 SparkR
之间共享数据
使用 method = "synapse"
将 sparklyr
连接到 synapse spark 群集时,可以在同一会话中使用 sparklyr
和 SparkR
,还能轻松地在两者间共享数据。 可以在 sparklyr
中创建 spark 表,并从 SparkR
读取它。
# load the sparklyr package
library(sparklyr)
# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)
# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")
head(mtcars_sparklr)
机器学习
以下示例使用 ml_linear_regression()
来拟合线性回归模型。 我们使用内置的 mtcars
数据集,看看是否可以根据汽车的重量 (wt
) 以及发动机包含的汽缸数量 (cyl
) 来预测汽车的油耗 (mpg
)。 我们假设在每种情况下,mpg
和每个特征之间的关系都是线性的。
生成测试和训练数据集
使用拆分:70% 用于训练,30% 用于测试模型。 采用此比率会产生不同的模型。
# split the dataframe into test and training dataframes
partitions <- mtcars_tbl %>%
select(mpg, wt, cyl) %>%
sdf_random_split(training = 0.7, test = 0.3, seed = 2023)
定型模型
训练逻辑回归模型。
fit <- partitions$training %>%
ml_linear_regression(mpg ~ .)
fit
现在,使用 summary()
详细了解模型的质量,以及每个预测器的统计意义。
summary(fit)
使用模型
可以通过调用 ml_predict()
在测试数据集上应用模型。
pred <- ml_predict(fit, partitions$test)
head(pred)
有关通过 sparklyr 提供的 Spark ML 模型的列表,请访问引用 - ML
断开与 Spark 群集的连接
可以调用 spark_disconnect()
或选择笔记本功能区顶部的“停止会话”按钮来结束 Spark 会话。
spark_disconnect(sc)
相关内容
详细了解 R 功能: