SparkR を使用する
SparkR は、R から Apache Spark を使用するための軽量フロントエンドを提供する R パッケージです。SparkR には、選択、フィルター処理、集計などの操作をサポートする分散データ フレームの実装が用意されています。また、SparkR では、MLlib を使用した分散機械学習もサポートします。
Spark バッチ ジョブ定義を通じて、または対話型の Fabric ノートブックで SparkR を使用します。
R のサポートは、Spark3.1 以降でのみ使用できます。 Spark 2.4 の R はサポートされていません。
前提条件
Microsoft Fabric サブスクリプションを取得します。 または、無料の Microsoft Fabric 試用版にサインアップします。
Microsoft Fabric にサインインします。
ホーム ページの左側にある環境スイッチャーを使って、Synapse Data Science 環境に切り替えます。
ノートブックを開くか作成します。 方法については、「Microsoft Fabric ノートブックの使用方法」をご覧ください。
言語オプションを [SparkR (R)] に設定することで、主要言語を変更します。
ノートブックをレイクハウスにアタッチします。 左側にある [追加] を選択して、既存のレイクハウスを追加するか、レイクハウスを作成します。
SparkR DataFrames の読み取りと書き込み
ローカルの R data.frame から SparkR データフレームを読み取る
DataFrame を作成する最も簡単な方法は、ローカルの R データフレームを Spark DataFrame に変換することです。
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
レイクハウスからの SparkR DataFrame の読み取りと書き込み
データは、クラスター ノードのローカル ファイルシステムに格納できます。 レイクハウスから SparkR DataFrame の読み取りと書き込みを行うための一般的なメソッドは、read.df
と write.df
です。 これらのメソッドでは、読み込むファイルのパスとデータ ソースの種類を受け取ります。 SparkR では、CSV、JSON、テキスト、Parquet の各ファイルをネイティブに読み取ることができます。
レイクハウスの読み取りと書き込みを行うには、まずレイクハウスをセッションに追加します。 ノートブックの左側で [追加] を選択して、既存のレイクハウスを追加するか、レイクハウスを作成します。
Note
read.df
や write.df
などの Spark パッケージを使用してレイクファイルのファイルにアクセスするには、その ADFS パス または Spark の相対パス を使用します。 レイクハウス エクスプローラーで、アクセスするファイルまたはフォルダーを右クリックし、コンテキスト メニューから ADFS パス または Spark の相対パス をコピーします。
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric に tidyverse
がプレインストールされています。 readr::read_csv()
と readr::write_csv()
を使用してレイクハウス ファイルの読み取りと書き込みを行うなど、使い慣れた R パッケージのレイクハウス ファイルにアクセスできます。
Note
R パッケージを使用してレイクハウス ファイルにアクセスするには、ファイルの API パスを使用する必要があります。 レイクハウス エクスプローラーで、アクセスするファイルまたはフォルダーを右クリックし、コンテキスト メニューから ファイル API パス をコピーします。
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
SparkSQL クエリを使用して、レイクハウスの SparkR データフレームを読み取ることもできます。
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
DataFrame 操作
SparkR DataFrame では、構造化データ処理を行うための多くの関数がサポートされています。 ここでは基本的な例の一部を示します。 完全な一覧については「SparkR API のドキュメント」をご覧ください。
行と列の選択
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
グループ化と集計
SparkR DataFrame では、グループ化後にデータを集計するために一般的に使用される関数が多数サポートされています。 たとえば、次に示すように、忠実なデータセットの待機時間のヒストグラムを計算できます
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
列の操作
SparkR には、データ処理と集計のために列に直接適用できる多数の関数が用意されています。 基本的な算術関数の使用例を次に示します。
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
ユーザー定義関数を適用する
SparkRは複数のユーザー定義関数をサポートしています:
dapply
または dapplyCollect
を使用して、大規模なデータセットで関数を実行する
dapply
SparkDataFrame
の各パーティションに関数を適用します。 SparkDataFrame
の各パーティションに適用される関数は、パーティションに対応する data.frame を引数として 1 つだけ取る必要があります。 関数の出力は data.frame
である必要があります。 スキーマは、結果である SparkDataFrame
の行形式を指定します。 戻り値の データ型 と一致する必要があります。
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
dapply と同様に、SparkDataFrame
の各パーティションに関数を適用し、結果を収集します。 関数の出力は data.frame
である必要があります。 ただし、今回はスキーマを渡す必要はありません。 関数がすべてのパーティションでの出力をドライバーに取り込むことができず、またドライバーのメモリに収まらない場合、dapplyCollect
は失敗する可能性があることにご注意ください。
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
gapply
または gapplyCollect
を使用して、入力列でグループ化された大規模なデータセットに対して関数を実行する
gapply
SparkDataFrame
の各複合グループに関数を適用します。 関数は SparkDataFrame
の各グループに適用され、グループ化キーとそのキーに対応する R data.frame
という 2 つのパラメーターのみを持つ必要があります。 グループは SparkDataFrames
列から選択されます。 関数の出力は data.frame
である必要があります。 スキーマは、結果である SparkDataFrame
の行形式を指定します。 これは、Spark のデータ型からの R 関数の出力スキーマを表す必要があります。 返される data.frame
の列名は、ユーザーによって設定されます。
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
gapply
と同様に、SparkDataFrame
の各グループに関数を適用し、結果を R data.frame
に収集します。 関数の出力は data.frame
である必要があります。 ただし、スキーマを渡す必要はありません。 関数がすべてのパーティションでの出力をドライバーに取り込むことができず、またドライバーのメモリに収まらない場合、gapplyCollect
は失敗する可能性があることにご注意ください。
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
spark.lapply を使用して分散されたローカル R 関数を実行する
spark.lapply
ネイティブ R の lapply
と同様に、 spark.lapply
は要素の一覧に対して関数を実行し、Spark を使用して計算を分散します。 doParallel
または lapply
に似た方法で、リストの要素に関数を適用します。 すべての計算の結果は、1 台のコンピューターに収まる必要があります。 そうでない場合は、df <- createDataFrame(list)
のような操作を実行し、dapply
を使用できます。
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
SparkR から SQL クエリを実行する
SparkR DataFrame は、データに対して SQL クエリを実行できる一時的なビューとして登録することもできます。 sql 関数を使用すると、アプリケーションで SQL クエリをプログラムで実行し、結果を SparkR DataFrame として返します。
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
機械学習
SparkR では、ほとんどの MLLib アルゴリズムが公開されています。 内部的には、SparkR ではモデルをトレーニングするために MLlib が使用されます。
次の例は、SparkR を使用してガウス GLM モデルを作成する方法を示しています。 線形回帰を実行するには、ファミリを "gaussian"
に設定します。 ロジスティック回帰を実行するには、ファミリを "binomial"
に設定します。 SparkML GLM を使用するとき、GLM
SparkR ではカテゴリの特徴のワンホット エンコードが自動的に実行されることで、手動で行う必要がなくなります。 他の MLlib コンポーネントとの互換性のために、String と Double 型の機能に加えて、MLlib Vector の特徴に合わせることもできます。
サポートされている機械学習アルゴリズムの詳細については、SparkR と MLlib に関するドキュメントを参照してください。
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)