次の方法で共有


SparkR を使用する

SparkR は、R から Apache Spark を使用するための軽量フロントエンドを提供する R パッケージです。SparkR には、選択、フィルター処理、集計などの操作をサポートする分散データ フレームの実装が用意されています。また、SparkR では、MLlib を使用した分散機械学習もサポートします。

Spark バッチ ジョブ定義を通じて、または対話型の Fabric ノートブックで SparkR を使用します。

R のサポートは、Spark3.1 以降でのみ使用できます。 Spark 2.4 の R はサポートされていません。

前提条件

  • ノートブックを開くか作成します。 方法については、「Microsoft Fabric ノートブックの使用方法」をご覧ください。

  • 言語オプションを [SparkR (R)] に設定することで、主要言語を変更します。

  • ノートブックをレイクハウスにアタッチします。 左側にある [追加] を選択して、既存のレイクハウスを追加するか、レイクハウスを作成します。

SparkR DataFrames の読み取りと書き込み

ローカルの R data.frame から SparkR データフレームを読み取る

DataFrame を作成する最も簡単な方法は、ローカルの R データフレームを Spark DataFrame に変換することです。

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

レイクハウスからの SparkR DataFrame の読み取りと書き込み

データは、クラスター ノードのローカル ファイルシステムに格納できます。 レイクハウスから SparkR DataFrame の読み取りと書き込みを行うための一般的なメソッドは、read.dfwrite.df です。 これらのメソッドでは、読み込むファイルのパスとデータ ソースの種類を受け取ります。 SparkR では、CSV、JSON、テキスト、Parquet の各ファイルをネイティブに読み取ることができます。

レイクハウスの読み取りと書き込みを行うには、まずレイクハウスをセッションに追加します。 ノートブックの左側で [追加] を選択して、既存のレイクハウスを追加するか、レイクハウスを作成します。

Note

read.dfwrite.df などの Spark パッケージを使用してレイクファイルのファイルにアクセスするには、その ADFS パス または Spark の相対パス を使用します。 レイクハウス エクスプローラーで、アクセスするファイルまたはフォルダーを右クリックし、コンテキスト メニューから ADFS パス または Spark の相対パス をコピーします。

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric に tidyverse がプレインストールされています。 readr::read_csv()readr::write_csv() を使用してレイクハウス ファイルの読み取りと書き込みを行うなど、使い慣れた R パッケージのレイクハウス ファイルにアクセスできます。

Note

R パッケージを使用してレイクハウス ファイルにアクセスするには、ファイルの API パスを使用する必要があります。 レイクハウス エクスプローラーで、アクセスするファイルまたはフォルダーを右クリックし、コンテキスト メニューから ファイル API パス をコピーします。

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

SparkSQL クエリを使用して、レイクハウスの SparkR データフレームを読み取ることもできます。

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

DataFrame 操作

SparkR DataFrame では、構造化データ処理を行うための多くの関数がサポートされています。 ここでは基本的な例の一部を示します。 完全な一覧については「SparkR API のドキュメント」をご覧ください。

行と列の選択

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

グループ化と集計

SparkR DataFrame では、グループ化後にデータを集計するために一般的に使用される関数が多数サポートされています。 たとえば、次に示すように、忠実なデータセットの待機時間のヒストグラムを計算できます

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

列の操作

SparkR には、データ処理と集計のために列に直接適用できる多数の関数が用意されています。 基本的な算術関数の使用例を次に示します。

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

ユーザー定義関数を適用する

SparkRは複数のユーザー定義関数をサポートしています:

dapply または dapplyCollect を使用して、大規模なデータセットで関数を実行する

dapply

SparkDataFrame の各パーティションに関数を適用します。 SparkDataFrame の各パーティションに適用される関数は、パーティションに対応する data.frame を引数として 1 つだけ取る必要があります。 関数の出力は data.frame である必要があります。 スキーマは、結果である SparkDataFrame の行形式を指定します。 戻り値の データ型 と一致する必要があります。

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

dapply と同様に、SparkDataFrame の各パーティションに関数を適用し、結果を収集します。 関数の出力は data.frame である必要があります。 ただし、今回はスキーマを渡す必要はありません。 関数がすべてのパーティションでの出力をドライバーに取り込むことができず、またドライバーのメモリに収まらない場合、dapplyCollect は失敗する可能性があることにご注意ください。

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

gapply または gapplyCollect を使用して、入力列でグループ化された大規模なデータセットに対して関数を実行する

gapply

SparkDataFrame の各複合グループに関数を適用します。 関数は SparkDataFrame の各グループに適用され、グループ化キーとそのキーに対応する R data.frame という 2 つのパラメーターのみを持つ必要があります。 グループは SparkDataFrames 列から選択されます。 関数の出力は data.frame である必要があります。 スキーマは、結果である SparkDataFrame の行形式を指定します。 これは、Spark のデータ型からの R 関数の出力スキーマを表す必要があります。 返される data.frame の列名は、ユーザーによって設定されます。

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

gapply と同様に、SparkDataFrame の各グループに関数を適用し、結果を R data.frame に収集します。 関数の出力は data.frame である必要があります。 ただし、スキーマを渡す必要はありません。 関数がすべてのパーティションでの出力をドライバーに取り込むことができず、またドライバーのメモリに収まらない場合、gapplyCollect は失敗する可能性があることにご注意ください。

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

spark.lapply を使用して分散されたローカル R 関数を実行する

spark.lapply

ネイティブ R の lapply と同様に、 spark.lapply は要素の一覧に対して関数を実行し、Spark を使用して計算を分散します。 doParallel または lapply に似た方法で、リストの要素に関数を適用します。 すべての計算の結果は、1 台のコンピューターに収まる必要があります。 そうでない場合は、df <- createDataFrame(list) のような操作を実行し、dapply を使用できます。

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

SparkR から SQL クエリを実行する

SparkR DataFrame は、データに対して SQL クエリを実行できる一時的なビューとして登録することもできます。 sql 関数を使用すると、アプリケーションで SQL クエリをプログラムで実行し、結果を SparkR DataFrame として返します。

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

機械学習

SparkR では、ほとんどの MLLib アルゴリズムが公開されています。 内部的には、SparkR ではモデルをトレーニングするために MLlib が使用されます。

次の例は、SparkR を使用してガウス GLM モデルを作成する方法を示しています。 線形回帰を実行するには、ファミリを "gaussian" に設定します。 ロジスティック回帰を実行するには、ファミリを "binomial" に設定します。 SparkML GLM を使用するとき、GLM SparkR ではカテゴリの特徴のワンホット エンコードが自動的に実行されることで、手動で行う必要がなくなります。 他の MLlib コンポーネントとの互換性のために、String と Double 型の機能に加えて、MLlib Vector の特徴に合わせることもできます。

サポートされている機械学習アルゴリズムの詳細については、SparkR と MLlib に関するドキュメントを参照してください。

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)