sparklyr を使用する
sparklyr は、Apache Spark への R インターフェイスです。 これによって、使い慣れた R インターフェイスを使用して Spark と対話するメカニズムが得られます。 Spark バッチ ジョブ定義を通じて、または対話型の Microsoft Fabric ノートブックで sparklyr を使用できます。
sparklyr
は、通常、dplyr などの他の tidyverse パッケージと共に使用されます。 Microsoft Fabric では、ランタイムのリリースごとに sparklyr と tidyverse の最新の安定したバージョンが配布されます。 それらをインポートすることで、API の使用を開始できます。
前提条件
Microsoft Fabric サブスクリプションを取得します。 または、無料の Microsoft Fabric 試用版にサインアップします。
Microsoft Fabric にサインインします。
ホーム ページの左側にある環境スイッチャーを使って、Synapse Data Science 環境に切り替えます。
ノートブックを開くか作成します。 方法については、「Microsoft Fabric ノートブックの使用方法」をご覧ください。
言語オプションを [SparkR (R)] に設定することで、主要言語を変更します。
ノートブックをレイクハウスにアタッチします。 左側にある [追加] を選択して、既存のレイクハウスを追加するか、レイクハウスを作成します。
Sparklyr を Synapse Spark クラスターに接続する
sparklyr
接続を確立するには、spark_connect()
で次の接続方法を使用します。 synapse
という新しい接続方法がサポートされています。これにより、既存の Spark セッションに接続できます。 これにより、sparklyr
セッションの開始時間が大幅に短縮されます。 さらに、この接続方法を、オープン ソース化された sparklyr プロジェクトに提供しました。 method = "synapse"
を使用すると、同じセッションで sparklyr
と SparkR
の両方を使用でき、簡単に両者の間でデータを共有できます。
# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)
sparklyr を使用してデータを読み取る
新しい Spark セッションにはデータは含まれません。 最初の手順は、Spark セッションのメモリにデータを読み込むか、Spark がオンデマンドでデータにアクセスできるように Spark をデータの場所にポイントすることです。
# load the sparklyr package
library(sparklyr)
# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)
head(mtcars_tbl)
sparklyr
を使用すると、ABFS パスを使用して Lakehouse ファイルのデータを write
および read
することもできます。 Lakehouse を読んで書き込むには、まずそれをセッションに追加します。 ノートブックの左側で [追加] を選択して、既存のレイクハウスを追加するか、レイクハウスを作成します。
ABFS パスを見つけるには、Lakehouse の [ファイル] フォルダーを右クリックし、[ABFS パスのコピー] を選択します。 次のコードに abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files
と置き換えるパスを貼り付けます。
temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"
# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')
# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv)
head(mtcarsDF)
sparklyr を使用してデータを操作する
sparklyr
には、次を使用して Spark 内のデータを処理する複数の方法が用意されています。
dplyr
コマンド- SparkSQL
- Spark のフィーチャー トランスフォーマー
dplyr
を使用します
使い慣れた dplyr
コマンドを使用して、Spark 内のデータを準備できます。 コマンドは Spark 内で実行されるため、R と Spark の間で不要なデータ転送はありません。
[でデータを操作する]dplyr
をクリックすると、Spark での dplyr の使用に関する追加のドキュメントが表示されます。
# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)
cargroup <- group_by(mtcars_tbl, cyl) %>%
count() %>%
arrange(desc(n))
cargroup
sparklyr
と dplyr
は、R コマンドを Spark SQL に変換します。 結果のクエリを表示するには、show_query()
を使用します。
# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)
SQL を使用する
Spark クラスター内のテーブルに対して SQL クエリを直接実行することもできます。 spark_connection()
オブジェクトは Spark 用の DBI インターフェイスを実装するため、dbGetQuery()
を使用して SQL を実行し、次のように結果を R データ フレームとして返すことができます。
library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")
フィーチャー トランスフォーマーを使用する
上記のメソッドはどちらも SQL ステートメントに依存しています。 Spark には、SQL を使用せずに、一部のデータ変換をより便利にするコマンドが用意されています。
たとえば、ft_binarizer()
コマンドを使用すると、別の列の値が特定のしきい値を超えているかどうかを示す新しい列の作成が簡略化されます。
Spark Feature Transformers の完全な一覧は、リファレンス -FT の sparklyr
から入手できます。
mtcars_tbl %>%
ft_binarizer("mpg", "over_20", threshold = 20) %>%
select(mpg, over_20) %>%
head(5)
sparklyr
と SparkR
の間でデータを共有する
method = "synapse"
を使用して sparklyr
を Synapse Spark クラスターに接続すると、同じセッションで sparklyr
と SparkR
の両方を使用でき、簡単に両者の間でデータを共有できます。 sparklyr
で Spark テーブルを作成し、SparkR
から読み取ることができます。
# load the sparklyr package
library(sparklyr)
# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)
# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")
head(mtcars_sparklr)
機械学習
線形回帰モデルに適合するために ml_linear_regression()
を使用する例を次に示します。 組み込みの mtcars
データセットを使用し、その重量 (wt
) とエンジンに含まれるシリンダーの数 (cyl
) に基づいて自動車の燃料消費量 (mpg
) を予測できるかどうかを確認します。 各ケースでは、mpg
と各フィーチャーの関係が線形であると仮定します。
テストおよびトレーニング データセットを生成する
分割を使用します。トレーニングには 70%、モデルのテストには 30% を使用します。 この比率を変えると、異なるモデルになります。
# split the dataframe into test and training dataframes
partitions <- mtcars_tbl %>%
select(mpg, wt, cyl) %>%
sdf_random_split(training = 0.7, test = 0.3, seed = 2023)
モデルをトレーニングする
ロジスティック回帰モデルをトレーニングします。
fit <- partitions$training %>%
ml_linear_regression(mpg ~ .)
fit
次に、summary()
を使用して、モデルの品質と、各予測子の統計的有意性についてもう少し学習します。
summary(fit)
モデルを使用する
ml_predict()
を呼び出すことで、テスト データセットにモデルを適用できます。
pred <- ml_predict(fit, partitions$test)
head(pred)
sparklyr で使用できる Spark ML モデルの一覧については、「リファレンス - ML」を参照してください
Spark クラスターからの切断
spark_disconnect()
を呼び出すか、ノートブック リボンの上部にある [セッションの停止] ボタンを選択すると、Spark セッションが終了します。
spark_disconnect(sc)
関連するコンテンツ
R 機能の詳細については、以下を参照してください。