次の方法で共有


チュートリアル: Apache Spark DataFrame を使用してデータを読み込んで変換する

このチュートリアルでは、Azure Databricks で Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API、SparkR SparkDataFrame API を使ってデータを読み込んで変換する方法について説明します。

このチュートリアルを最後まで進めると、DataFrame とは何であるかを理解し、以下のタスクを快適に実行できます。

Python

Apache Spark PySpark API リファレンスも参照してください。

Scala

Apache Spark Scala API リファレンスも参照してください。

R

Apache SparkR API リファレンスも参照してください。

DataFrame とは

DataFrame は、潜在的に異なる型の columns を持つ 2 次元ラベル付きデータ構造です。 DataFrame は、スプレッドシート、SQL table、または系列オブジェクトのディクショナリと考えることができます。 Apache Spark DataFrames には、一般的なデータ分析の問題を効率的に解決できる豊富な set の関数 (selectcolumns、フィルター、join、集計) が用意されています。

Apache Spark DataFrames は、Resilient Distributed Datasets (RDD) に基づいて構築された抽象化です。 Spark DataFrames と Spark SQL では、統合された計画と最適化エンジンを使用して、Azure Databricks でサポートされているすべての言語 (Python、SQL、Scala、R) でほぼ同じパフォーマンスを get できます。

要件

次のチュートリアルを完了するには、次の要件を満たす必要があります。

  • このチュートリアルの例を使用するには、ワークスペースで Unity Catalog を有効にする必要があります。

  • このチュートリアルの例では、Unity Catalogボリューム を使用してサンプル データを格納します。 これらの例を使うには、ボリュームを作成し、そのボリュームのcatalog、schema、ボリュームの名前を使って、例で使われるボリューム パスをsetします。

  • Unity Catalogでは、次のアクセス許可が必要です。

    • このチュートリアルで使うボリュームに対する READ VOLUMEWRITE VOLUME、または ALL PRIVILEGES
    • このチュートリアルに使用する schema に対して USE SCHEMAALL PRIVILEGES
    • このチュートリアルで使用する catalog に対して、USE CATALOG または ALL PRIVILEGES を使用します。

    これらのアクセス許可をsetするには、Databricks 管理者に確認するか、「Unity Catalog の権限とセキュリティ保護可能なオブジェクト」を参照してください。

ヒント

この記事の完成したノートブックについては、「DataFrame チュートリアルのノートブック」を参照してください。

ステップ 1: 変数を定義して CSV ファイルを読み込む

この手順では、このチュートリアルで使用する変数を定義し、赤ちゃんの名前データを含む CSV ファイルを health.data.ny.gov から Unity Catalog ボリュームに読み込みます。

  1. 新規アイコン アイコンをクリックして、新しいノートブックを開きます。 Azure Databricks ノートブックを操作する方法については、「ノートブックの外観をカスタマイズする」を参照してください。

  2. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 Unity Catalog ボリューム用に、<catalog-name><schema-name><volume-name> を catalog、schema、そしてボリューム名に置き換えてください。 <table_name> を、選択した table 名に置き換えます。 このチュートリアルの後半で、この table に赤ちゃんの名前データを読み込みます。

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Shift+Enter キーを押してセルを実行し、新しい空のセルを作成します。

  4. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、Databricks dbutuils コマンドを使用して、health.data.ny.gov から Unity Catalog ボリュームに rows.csv ファイルをコピーします。

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

ステップ 2: DataFrame を作成する

このステップでは、テスト データを含む df1 という名前の DataFrame を作成してから、その内容を表示します。

  1. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、テスト データを含む DataFrame を作成し、DataFrame の内容と schema を表示します。

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

ステップ 3: CSV ファイルから DataFrame にデータを読み込む

この手順では、以前に Unity Catalog ボリュームに読み込んだ CSV ファイルから、df_csv という名前の DataFrame を作成します。 spark.read.csv を参照してください。

  1. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、赤ちゃんの名前のデータを CSV ファイルから DataFrame df_csv に読み込み、DataFrame の内容を表示します。

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

サポートされている多くのファイル形式からデータを読み込むことができます。

ステップ 4: DataFrame を表示して操作する

次の方法を使って、赤ちゃんの名前の DataFrame を表示して操作します。

Apache Spark DataFrame の schema を表示する方法について説明します。 Apache Spark では、schema という用語を使用して、DataFrame 内の columns の名前とデータ型を参照します。

Note

また、Azure Databricks では、schema という用語を使用して、catalogに登録された tables のコレクションを記述します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、2 つの DataFrame のスキーマを表示する .printSchema() メソッドを使用して DataFrame の schema を示します。2 つの DataFrame を結合する準備をします。

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame の column の名前を変更する

DataFrame で column の名前を変更する方法について説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、df1 DataFrame 内のそれぞれの column と一致するように、df1_csv DataFrame 内の column の名前を変更します。 このコードでは、Apache Spark withColumnRenamed() メソッドを使います。

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame を結合する

1 つの DataFrame の行を別のものに追加する新しい DataFrame を作成する方法を説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark union() メソッドを使って、最初の DataFrame df の内容と、CSV ファイルから読み込まれた赤ちゃんの名前のデータを含む DataFrame df_csv を結合します。

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame で行をフィルター処理する

Apache Spark .filter() または .where() メソッドを使用して行をフィルター処理して、データ set で最も一般的な赤ちゃんの名前を確認します。 フィルター処理を使用して、DataFrame で返す行または変更する行のサブセットをselectします。 以下の例に示すように、パフォーマンスや構文に違いはありません。

.filter() メソッドの使用

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark .filter() メソッドを使って、DataFrame 内で数が 50 より多い行を表示します。

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

where() メソッドの使用

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark .where() メソッドを使って、DataFrame 内で数が 50 より多い行を表示します。

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

データフレームから Selectとcolumns を抽出し、頻度で並べます。

select() メソッドで赤ちゃんの名前の頻度を使って、返す DataFrame のcolumnsを指定する方法を説明します。 Apache Spark orderbydesc 関数を使って結果を並べ替えます。

Apache Spark 用の pyspark.sql モジュールでは、SQL 関数がサポートされています。 このチュートリアルで使うこれらの関数の中に、Apache Spark orderBy()desc()expr() 関数があります。 必要に応じてこれらの関数をセッションにインポートし、それらを使用できるようにします。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、desc() 関数をインポートしてから、Apache Spark select() メソッドおよび Apache Spark orderBy()desc() 関数を使って、最も一般的な名前とその数を降順に表示します。

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

サブセットの DataFrame を作成する

既存の DataFrame からサブセット DataFrame を作成する方法を説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark filter メソッドを使い、年、数、性別でデータを制限して新しい DataFrame を作成します。 Apache Spark select() メソッドを使用して、columnsを limit します。 また、Apache Spark orderBy()desc() 関数を使って、数の順で新しい DataFrame を並べ替えます。

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

ステップ 5: DataFrameを保存する

DataFrame を保存する方法を説明します。 DataFrame を table に保存するか、データフレームをファイルまたは複数のファイルに書き込むことができます。

DataFrame を table に保存する

Azure Databricks では、既定ですべての tables に Delta Lake 形式が使用されます。 DataFrame を保存するには、catalog と schemaに対する CREATEtable 特権が必要です。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、このチュートリアルの開始時に定義した変数を使用して、DataFrame の内容を table に保存します。

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

ほとんどの Apache Spark アプリケーションは、大規模なデータ セットに対して分散方式で動作します。 Apache Spark は、1 つのファイルではなく、ファイルのディレクトリを書き出します。 Delta Lake では Parquet フォルダーとファイルを分割します。 多くのデータ システムでは、これらのファイルのディレクトリを読み取ることができます。 Azure Databricks では、ほとんどのアプリケーションでファイル パスに対して tables を使用することをお勧めします。

DataFrame を JSON ファイルに保存する

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、DataFrame を JSON ファイルのディレクトリに保存します。

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame を JSON ファイルから読み取る

Apache Spark spark.read.format() メソッドを使って、ディレクトリから DataFrame に JSON データを読み取る方法を説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、前の例で保存した JSON ファイルが表示されます。

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

追加タスク: PySpark、Scala、R で SQL クエリを実行する

Apache Spark の DataFrame には、SQL と PySpark、Scala、R を組み合わせる次のオプションがあります。このチュートリアル用に作成したのと同じノートブックで、次のコードを実行できます。

SQL クエリとして column を指定する

Apache Spark の selectExpr() メソッドの使用方法を説明します。 これは、SQL 式を受け取って更新された DataFrame を返す select() メソッドのバリエーションです。 このメソッドでは、upper などの SQL 式を使用できます。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark selectExpr() メソッドと SQL upper 式を使用して、文字列 column を大文字に変換します (columnの名前を変更します)。

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

expr() を使用して、column に SQL 構文を使用する

Apache Spark expr() 関数をインポートして使用して、column を指定する任意の場所で SQL 構文を使用する方法について説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、expr() 関数をインポートし、Apache Spark expr() 関数と SQL lower 式を使用して、文字列 column を小文字に変換します (columnの名前を変更します)。

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

spark.sql() 関数を使用して任意の SQL クエリを実行する

Apache Spark の spark.sql() 関数を使って任意の SQL クエリを実行する方法を説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark spark.sql() 関数を使用して、SQL 構文を使用して SQL table に対してクエリを実行します。

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame チュートリアルのノートブック

以下のノートブックには、このチュートリアルのクエリ例が含まれています。

Python

Python を使用する DataFrame チュートリアル

ノートブックをGet

Scala

Scala を使用する DataFrame チュートリアル

ノートブックをGet

R

R を使用する DataFrame チュートリアル

ノートブックをGet

その他のリソース