共用方式為


在 R 中使用 DataFrame 和資料表

本文說明如何使用 SparkR 、sparklyr 和 dplyr R 套件來處理 R data.frame s、 Spark DataFrame 和 記憶體內部 資料表。

請注意,當您使用 SparkR、sparklyr 和 dplyr 時,您可能會發現您可以使用所有這些套件完成特定作業,而且您可以使用最熟悉的套件。 例如,若要執行查詢,您可以呼叫 、 sparklyr::sdf_sql 、 和 dplyr::selectSparkR::sql 函式。 在其他時候,您可能只要完成一或兩個這些套件的作業,而您選擇的作業取決於您的使用案例。 例如,您呼叫 sparklyr::sdf_quantile 的方式與呼叫 dplyr::percentile_approx 的方式稍有不同,即使這兩個函式都會計算分位數。

您可以使用 SQL 作為 SparkR 與 sparklyr 之間的橋接器。 例如,您可以使用 SparkR::sql 來查詢使用 sparklyr 建立的資料表。 您可以使用 sparklyr::sdf_sql 來查詢使用 SparkR 建立的資料表。 程式 dplyr 代碼在執行之前,一律會轉譯為記憶體中的 SQL。 另 請參閱 API 互通性 SQL 翻譯

載入 SparkR、sparklyr 和 dplyr

SparkR、sparklyr 和 dplyr 套件包含在 Azure Databricks 叢集 上安裝的 Databricks Runtime 中。 因此,您不需要呼叫一般 install.package ,才能開始呼叫這些套件。 不過,您仍必須先載入 library 這些套件。 例如,從 Azure Databricks 工作區的 R 筆記本 內,在 Notebook 資料格中執行下列程式碼以載入 SparkR、sparklyr 和 dplyr:

library(SparkR)
library(sparklyr)
library(dplyr)

連線 sparklyr 到叢集

載入 sparklyr 之後,您必須呼叫 sparklyr::spark_connect 以連線到叢集,並指定 databricks 連接方法。 例如,在筆記本資料格中執行下列程式碼,以連線到裝載筆記本的叢集:

sc <- spark_connect(method = "databricks")

相反地,Azure Databricks 筆記本已在叢集上建立 SparkSession ,以便與 SparkR 搭配使用,因此您不需要呼叫 SparkR::sparkR.session ,才能開始呼叫 SparkR。

將 JSON 資料檔案上傳至您的工作區

本文中的許多程式碼範例都是以 Azure Databricks 工作區中特定位置的資料為基礎,其中包含特定資料行名稱和資料類型。 此程式碼範例的資料來源自于 GitHub 內名為 的 book.json JSON 檔案。 若要取得此檔案,並將它上傳至您的工作區:

  1. 移至 GitHub 上的 books.json 檔案,並使用文字編輯器將其內容複寫到本機電腦上某處名為 books.json 的檔案。
  2. 在 Azure Databricks 工作區提要欄位中,按一下 [目錄 ]。
  3. 按一下 [ 建立資料表 ]。
  4. 在 [ 上傳檔案 ] 索引標籤上 books.json ,將檔案從本機電腦卸載至 [要上傳的 檔案] 方塊。 或選取 按一下以流覽 ,然後從本機電腦流覽至 books.json 檔案。

根據預設,Azure Databricks 會使用 路徑 /FileStore/tables/books.json ,將本機 books.json 檔案上傳至 工作區中的 DBFS 位置。

請勿按一下 [使用 UI 建立資料表] 或 [ 在筆記本 中建立資料表]。 本文中的程式碼範例會使用此 DBFS 位置中上傳 books.json 檔案中的資料。

將 JSON 資料讀入 DataFrame

使用 sparklyr::spark_read_json 將上傳的 JSON 檔案讀取至 DataFrame、指定連接、JSON 檔案的路徑,以及資料內部資料表表示的名稱。 在此範例中,您必須指定 book.json 檔案包含多行。 在這裡指定資料行的架構是選擇性的。 否則,sparklyr 預設會推斷資料行的架構。 例如,在筆記本資料格中執行下列程式碼,將上傳的 JSON 檔案資料讀入名為 jsonDF 的資料框架中:

jsonDF <- spark_read_json(
  sc      = sc,
  name    = "jsonTable",
  path    = "/FileStore/tables/books.json",
  options = list("multiLine" = TRUE),
  columns = c(
    author    = "character",
    country   = "character",
    imageLink = "character",
    language  = "character",
    link      = "character",
    pages     = "integer",
    title     = "character",
    year      = "integer"
  )
)

您可以使用 SparkR::headSparkR::showsparklyr::collect 來列印 DataFrame 的第一個資料列。 根據預設, head 預設會列印前六個數據列。 showcollect 列印前 10 個數據列。 例如,在筆記本資料格中執行下列程式碼,以列印名為 jsonDF 之 DataFrame 的第一個資料列:

head(jsonDF)

# Source: spark<?> [?? x 8]
#   author                  country        image…¹ langu…² link  pages title  year
#   <chr>                   <chr>          <chr>   <chr>   <chr> <int> <chr> <int>
# 1 Chinua Achebe           Nigeria        images… English "htt…   209 Thin…  1958
# 2 Hans Christian Andersen Denmark        images… Danish  "htt…   784 Fair…  1836
# 3 Dante Alighieri         Italy          images… Italian "htt…   928 The …  1315
# 4 Unknown                 Sumer and Akk… images… Akkadi… "htt…   160 The … -1700
# 5 Unknown                 Achaemenid Em… images… Hebrew  "htt…   176 The …  -600
# 6 Unknown                 India/Iran/Ir… images… Arabic  "htt…   288 One …  1200
# … with abbreviated variable names ¹​imageLink, ²​language

show(jsonDF)

# Source: spark<jsonTable> [?? x 8]
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

collect(jsonDF)

# A tibble: 100 × 8
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

執行 SQL 查詢,以及從資料表寫入和讀取

您可以使用 dplyr 函式在 DataFrame 上執行 SQL 查詢。 例如,在筆記本資料格中執行下列程式碼,以使用 dplyr::group_by ,並從 dployr::count 名為 jsonDF 的 DataFrame 取得計數。 使用 dplyr::arrangedplyr::desc 依計數以遞減順序排序結果。 然後預設會列印前 10 個數據列。

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n))

# Source:     spark<?> [?? x 2]
# Ordered by: desc(n)
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Gustave Flaubert           2
#  8 Homer                      2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows

然後 sparklyr::spark_write_table ,您可以使用 將結果寫入 Azure Databricks 中的資料表。 例如,在筆記本資料格中執行下列程式碼以重新執行查詢,然後將結果寫入名為 json_books_agg 的資料表:

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n)) %>%
  spark_write_table(
    name = "json_books_agg",
    mode = "overwrite"
  )

若要確認資料表已建立,您可以搭配 SparkR::showDF 使用 sparklyr::sdf_sql 來顯示資料表的資料。 例如,在筆記本資料格中執行下列程式碼,以查詢資料表到 DataFrame,然後使用 sparklyr::collect 預設列印 DataFrame 的前 10 個數據列:

collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

您也可以使用 sparklyr::spark_read_table 來執行類似的事情。 例如,在筆記本資料格中執行下列程式碼,以查詢名為 jsonDF 的先前 DataFrame,然後依預設使用 sparklyr::collect 來列印 DataFrame 的前 10 個數據列:

fromTable <- spark_read_table(
  sc   = sc,
  name = "json_books_agg"
)

collect(fromTable)

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

在 DataFrame 中新增資料行和計算資料行值

您可以使用 dplyr 函式,將資料行新增至 DataFrame,以及計算資料行的值。

例如,在筆記本資料格中執行下列程式碼,以取得名為 jsonDF 的資料框架內容。 使用 dplyr::mutate 新增名為 today 的資料行,並將這個新資料行填入目前的時間戳記。 然後將這些內容寫入名為 withDate 的新 DataFrame,並使用 dplyr::collect 預設列印新 DataFrame 的前 10 個數據列。

注意

dplyr::mutate 只接受符合 Hive 內建函式 (也稱為 UDF) 和內建彙總函式 (也稱為 UDAF) 的引數。 如需一般資訊,請參閱 Hive 函式 。 如需本節中日期相關函式的相關資訊,請參閱 Date Functions

withDate <- jsonDF %>%
  mutate(today = current_timestamp())

collect(withDate)

# A tibble: 100 × 9
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:32:59
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:32:59
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:32:59
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:32:59
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:32:59
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:32:59
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:32:59
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:32:59
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

現在,使用 dplyr::mutate 將兩個數據行新增至 DataFrame 的內容 withDate 。 新的 monthyear 資料行包含資料行中的 today 數值月份和年份。 然後將這些內容寫入名為 withMMyyyy 的新 DataFrame,並搭配 dplyr::collect 使用 dplyr::select ,並依預設列印 author 新 DataFrame 前十個數據列的 、 titlemonth 和資料 year 行:

withMMyyyy <- withDate %>%
  mutate(month = month(today),
         year  = year(today))

collect(select(withMMyyyy, c("author", "title", "month", "year")))

# A tibble: 100 × 4
#    author                  title                                     month  year
#    <chr>                   <chr>                                     <int> <int>
#  1 Chinua Achebe           Things Fall Apart                             9  2022
#  2 Hans Christian Andersen Fairy tales                                   9  2022
#  3 Dante Alighieri         The Divine Comedy                             9  2022
#  4 Unknown                 The Epic Of Gilgamesh                         9  2022
#  5 Unknown                 The Book Of Job                               9  2022
#  6 Unknown                 One Thousand and One Nights                   9  2022
#  7 Unknown                 Njál's Saga                                   9  2022
#  8 Jane Austen             Pride and Prejudice                           9  2022
#  9 Honoré de Balzac        Le Père Goriot                                9  2022
# 10 Samuel Beckett          Molloy, Malone Dies, The Unnamable, the …     9  2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

現在,使用 dplyr::mutate 將兩個數據行新增至 DataFrame 的內容 withMMyyyy 。 新 formatted_date 資料行包含 yyyy-MM-dd 資料行中的 today 部分,而新 day 資料行則包含新 formatted_date 資料行中的數值日。 然後將這些內容寫入名為 withUnixTimestamp 的新 DataFrame,並搭配 dplyr::collect 使用 dplyr::select ,並依預設列印 title 新 DataFrame 前十個數據列的 、 formatted_date 和資料 day 行:

withUnixTimestamp <- withMMyyyy %>%
  mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
         day            = dayofmonth(formatted_date))

collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))

# A tibble: 100 × 3
#    title                                           formatted_date   day
#    <chr>                                           <chr>          <int>
#  1 Things Fall Apart                               2022-09-27        27
#  2 Fairy tales                                     2022-09-27        27
#  3 The Divine Comedy                               2022-09-27        27
#  4 The Epic Of Gilgamesh                           2022-09-27        27
#  5 The Book Of Job                                 2022-09-27        27
#  6 One Thousand and One Nights                     2022-09-27        27
#  7 Njál's Saga                                     2022-09-27        27
#  8 Pride and Prejudice                             2022-09-27        27
#  9 Le Père Goriot                                  2022-09-27        27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27        27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

建立暫存檢視

您可以在記憶體中建立以現有 DataFrame 為基礎的具名暫存檢視。 例如,在筆記本資料格中執行下列程式碼,以用來 SparkR::createOrReplaceTempView 取得前面名為 jsonTable DataFrame 的內容,並將暫存檢視從中命名為 timestampTable 。 然後,使用 sparklyr::spark_read_table 來讀取暫存檢視的內容。 使用 sparklyr::collect 預設列印臨時表的前 10 個數據列:

createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")

spark_read_table(
  sc = sc,
  name = "timestampTable"
) %>% collect()

# A tibble: 100 × 10
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:11:56
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:11:56
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:11:56
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:11:56
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:11:56
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:11:56
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:11:56
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:11:56
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
#   names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names

在 DataFrame 上執行統計分析

您可以使用 sparklyr 和 dplyr 進行統計分析。

例如,建立要執行統計資料的 DataFrame。 若要這樣做,請在筆記本資料格中執行下列程式碼,以用來 sparklyr::sdf_copy_to 將 R 內建的資料集內容 iris 寫入名為 iris 的資料框架。 使用 sparklyr::sdf_collect 預設列印臨時表的前 10 個數據列:

irisDF <- sdf_copy_to(
  sc        = sc,
  x         = iris,
  name      = "iris",
  overwrite = TRUE
)

sdf_collect(irisDF, "row-wise")

# A tibble: 150 × 5
#    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
#           <dbl>       <dbl>        <dbl>       <dbl> <chr>
#  1          5.1         3.5          1.4         0.2 setosa
#  2          4.9         3            1.4         0.2 setosa
#  3          4.7         3.2          1.3         0.2 setosa
#  4          4.6         3.1          1.5         0.2 setosa
#  5          5           3.6          1.4         0.2 setosa
#  6          5.4         3.9          1.7         0.4 setosa
#  7          4.6         3.4          1.4         0.3 setosa
#  8          5           3.4          1.5         0.2 setosa
#  9          4.4         2.9          1.4         0.2 setosa
# 10          4.9         3.1          1.5         0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows

現在使用 dplyr::group_by 依資料行分組資料 Species 列。 搭配 使用 dplyr::summarize ,依 資料行 Species 的 25th、50th、75 和 100th 分位數 Sepal_Length 計算摘要 dplyr::percentile_approx 統計資料。 使用 sparklyr::collect 列印結果:

注意

dplyr::summarize 只接受符合 Hive 內建函式 (也稱為 UDF) 和內建彙總函式 (也稱為 UDAF) 的引數。 如需一般資訊,請參閱 Hive 函式 。 如需 的相關資訊 percentile_approx ,請參閱 內建彙總函式(UDAF)。

quantileDF <- irisDF %>%
  group_by(Species) %>%
  summarize(
    quantile_25th = percentile_approx(
      Sepal_Length,
      0.25
    ),
    quantile_50th = percentile_approx(
      Sepal_Length,
      0.50
    ),
    quantile_75th = percentile_approx(
      Sepal_Length,
      0.75
    ),
    quantile_100th = percentile_approx(
      Sepal_Length,
      1.0
    )
  )

collect(quantileDF)

# A tibble: 3 × 5
#   Species    quantile_25th quantile_50th quantile_75th quantile_100th
#   <chr>              <dbl>         <dbl>         <dbl>          <dbl>
# 1 virginica            6.2           6.5           6.9            7.9
# 2 versicolor           5.6           5.9           6.3            7
# 3 setosa               4.8           5             5.2            5.8

例如,您可以使用 sparklyr::sdf_quantile 來計算類似的結果:

print(sdf_quantile(
  x = irisDF %>%
    filter(Species == "virginica"),
  column = "Sepal_Length",
  probabilities = c(0.25, 0.5, 0.75, 1.0)
))

# 25%  50%  75% 100%
# 6.2  6.5  6.9  7.9