チュートリアル: Apache Spark DataFrame を使用してデータを読み込んで変換する
このチュートリアルでは、Azure Databricks で Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API、SparkR SparkDataFrame API を使ってデータを読み込んで変換する方法について説明します。
このチュートリアルを最後まで進めると、DataFrame とは何であるかを理解し、以下のタスクを快適に実行できます。
Python
- 変数を定義し、パブリック データを Unity Catalog ボリューム にコピーする
- Python を使用して DataFrame を作成する
- CSV ファイルから DataFrame にデータを読み込む
- DataFrame を表示して操作する
- DataFrame を保存する
- PySpark で SQL クエリを実行する
Apache Spark PySpark API リファレンスも参照してください。
Scala
- 変数を定義し、パブリック データを Unity Catalog ボリューム にコピーする
- Scala を使用して DataFrame を作成する
- CSV ファイルから DataFrame にデータを読み込む
- DataFrame を表示して操作する
- DataFrame を保存する
- Apache Spark で SQL クエリを実行する
Apache Spark Scala API リファレンスも参照してください。
R
- 変数を定義し、パブリック データを Unity Catalog ボリューム にコピーする
- SparkR SparkDataFrames を作成する
- CSV ファイルから DataFrame にデータを読み込む
- DataFrame を表示して操作する
- DataFrame を保存する
- SparkR で SQL クエリを実行する
Apache SparkR API リファレンスも参照してください。
DataFrame とは
DataFrame は、潜在的に異なる型の columns を持つ 2 次元ラベル付きデータ構造です。 DataFrame は、スプレッドシート、SQL table、または系列オブジェクトのディクショナリと考えることができます。 Apache Spark DataFrames には、一般的なデータ分析の問題を効率的に解決できる豊富な set の関数 (selectcolumns、フィルター、join、集計) が用意されています。
Apache Spark DataFrames は、Resilient Distributed Datasets (RDD) に基づいて構築された抽象化です。 Spark DataFrames と Spark SQL では、統合された計画と最適化エンジンを使用して、Azure Databricks でサポートされているすべての言語 (Python、SQL、Scala、R) でほぼ同じパフォーマンスを get できます。
要件
次のチュートリアルを完了するには、次の要件を満たす必要があります。
このチュートリアルの例を使用するには、ワークスペースで Unity Catalog を有効にする必要があります。
このチュートリアルの例では、Unity Catalogボリューム を使用してサンプル データを格納します。 これらの例を使うには、ボリュームを作成し、そのボリュームのcatalog、schema、ボリュームの名前を使って、例で使われるボリューム パスをsetします。
Unity Catalogでは、次のアクセス許可が必要です。
- このチュートリアルで使うボリュームに対する
READ VOLUME
とWRITE VOLUME
、またはALL PRIVILEGES
。 - このチュートリアルに使用する schema に対して
USE SCHEMA
かALL PRIVILEGES
。 - このチュートリアルで使用する catalog に対して、
USE CATALOG
またはALL PRIVILEGES
を使用します。
これらのアクセス許可をsetするには、Databricks 管理者に確認するか、「Unity Catalog の権限とセキュリティ保護可能なオブジェクト」を参照してください。
- このチュートリアルで使うボリュームに対する
ヒント
この記事の完成したノートブックについては、「DataFrame チュートリアルのノートブック」を参照してください。
ステップ 1: 変数を定義して CSV ファイルを読み込む
この手順では、このチュートリアルで使用する変数を定義し、赤ちゃんの名前データを含む CSV ファイルを health.data.ny.gov から Unity Catalog ボリュームに読み込みます。
アイコンをクリックして、新しいノートブックを開きます。 Azure Databricks ノートブックを操作する方法については、「ノートブックの外観をカスタマイズする」を参照してください。
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 Unity Catalog ボリューム用に、
<catalog-name>
、<schema-name>
、<volume-name>
を catalog、schema、そしてボリューム名に置き換えてください。<table_name>
を、選択した table 名に置き換えます。 このチュートリアルの後半で、この table に赤ちゃんの名前データを読み込みます。Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Shift+Enter
キーを押してセルを実行し、新しい空のセルを作成します。次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、Databricks dbutuils コマンドを使用して、health.data.ny.gov から Unity Catalog ボリュームに
rows.csv
ファイルをコピーします。Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
ステップ 2: DataFrame を作成する
このステップでは、テスト データを含む df1
という名前の DataFrame を作成してから、その内容を表示します。
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、テスト データを含む DataFrame を作成し、DataFrame の内容と schema を表示します。
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
ステップ 3: CSV ファイルから DataFrame にデータを読み込む
この手順では、以前に Unity Catalog ボリュームに読み込んだ CSV ファイルから、df_csv
という名前の DataFrame を作成します。 spark.read.csv を参照してください。
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、赤ちゃんの名前のデータを CSV ファイルから DataFrame
df_csv
に読み込み、DataFrame の内容を表示します。Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
サポートされている多くのファイル形式からデータを読み込むことができます。
ステップ 4: DataFrame を表示して操作する
次の方法を使って、赤ちゃんの名前の DataFrame を表示して操作します。
DataFrame schema を印刷する
Apache Spark DataFrame の schema を表示する方法について説明します。 Apache Spark では、schema という用語を使用して、DataFrame 内の columns の名前とデータ型を参照します。
Note
また、Azure Databricks では、schema という用語を使用して、catalogに登録された tables のコレクションを記述します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、2 つの DataFrame のスキーマを表示する
.printSchema()
メソッドを使用して DataFrame の schema を示します。2 つの DataFrame を結合する準備をします。Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
DataFrame の column の名前を変更する
DataFrame で column の名前を変更する方法について説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、
df1
DataFrame 内のそれぞれの column と一致するように、df1_csv
DataFrame 内の column の名前を変更します。 このコードでは、Apache SparkwithColumnRenamed()
メソッドを使います。Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
DataFrame を結合する
1 つの DataFrame の行を別のものに追加する新しい DataFrame を作成する方法を説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
union()
メソッドを使って、最初の DataFramedf
の内容と、CSV ファイルから読み込まれた赤ちゃんの名前のデータを含む DataFramedf_csv
を結合します。Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
DataFrame で行をフィルター処理する
Apache Spark .filter()
または .where()
メソッドを使用して行をフィルター処理して、データ set で最も一般的な赤ちゃんの名前を確認します。 フィルター処理を使用して、DataFrame で返す行または変更する行のサブセットをselectします。 以下の例に示すように、パフォーマンスや構文に違いはありません。
.filter() メソッドの使用
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
.filter()
メソッドを使って、DataFrame 内で数が 50 より多い行を表示します。Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
where() メソッドの使用
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
.where()
メソッドを使って、DataFrame 内で数が 50 より多い行を表示します。Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
データフレームから Selectとcolumns を抽出し、頻度で並べます。
select()
メソッドで赤ちゃんの名前の頻度を使って、返す DataFrame のcolumnsを指定する方法を説明します。 Apache Spark orderby
と desc
関数を使って結果を並べ替えます。
Apache Spark 用の pyspark.sql モジュールでは、SQL 関数がサポートされています。 このチュートリアルで使うこれらの関数の中に、Apache Spark orderBy()
、desc()
、expr()
関数があります。 必要に応じてこれらの関数をセッションにインポートし、それらを使用できるようにします。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、
desc()
関数をインポートしてから、Apache Sparkselect()
メソッドおよび Apache SparkorderBy()
とdesc()
関数を使って、最も一般的な名前とその数を降順に表示します。Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
サブセットの DataFrame を作成する
既存の DataFrame からサブセット DataFrame を作成する方法を説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
filter
メソッドを使い、年、数、性別でデータを制限して新しい DataFrame を作成します。 Apache Sparkselect()
メソッドを使用して、columnsを limit します。 また、Apache SparkorderBy()
とdesc()
関数を使って、数の順で新しい DataFrame を並べ替えます。Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
ステップ 5: DataFrameを保存する
DataFrame を保存する方法を説明します。 DataFrame を table に保存するか、データフレームをファイルまたは複数のファイルに書き込むことができます。
DataFrame を table に保存する
Azure Databricks では、既定ですべての tables に Delta Lake 形式が使用されます。 DataFrame を保存するには、catalog と schemaに対する CREATE
table 特権が必要です。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、このチュートリアルの開始時に定義した変数を使用して、DataFrame の内容を table に保存します。
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
ほとんどの Apache Spark アプリケーションは、大規模なデータ セットに対して分散方式で動作します。 Apache Spark は、1 つのファイルではなく、ファイルのディレクトリを書き出します。 Delta Lake では Parquet フォルダーとファイルを分割します。 多くのデータ システムでは、これらのファイルのディレクトリを読み取ることができます。 Azure Databricks では、ほとんどのアプリケーションでファイル パスに対して tables を使用することをお勧めします。
DataFrame を JSON ファイルに保存する
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、DataFrame を JSON ファイルのディレクトリに保存します。
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
DataFrame を JSON ファイルから読み取る
Apache Spark spark.read.format()
メソッドを使って、ディレクトリから DataFrame に JSON データを読み取る方法を説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、前の例で保存した JSON ファイルが表示されます。
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
追加タスク: PySpark、Scala、R で SQL クエリを実行する
Apache Spark の DataFrame には、SQL と PySpark、Scala、R を組み合わせる次のオプションがあります。このチュートリアル用に作成したのと同じノートブックで、次のコードを実行できます。
SQL クエリとして column を指定する
Apache Spark の selectExpr()
メソッドの使用方法を説明します。 これは、SQL 式を受け取って更新された DataFrame を返す select()
メソッドのバリエーションです。 このメソッドでは、upper
などの SQL 式を使用できます。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
selectExpr()
メソッドと SQLupper
式を使用して、文字列 column を大文字に変換します (columnの名前を変更します)。Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
expr()
を使用して、column に SQL 構文を使用する
Apache Spark expr()
関数をインポートして使用して、column を指定する任意の場所で SQL 構文を使用する方法について説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、
expr()
関数をインポートし、Apache Sparkexpr()
関数と SQLlower
式を使用して、文字列 column を小文字に変換します (columnの名前を変更します)。Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
spark.sql() 関数を使用して任意の SQL クエリを実行する
Apache Spark の spark.sql()
関数を使って任意の SQL クエリを実行する方法を説明します。
次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark
spark.sql()
関数を使用して、SQL 構文を使用して SQL table に対してクエリを実行します。Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Shift+Enter
キーを押してセルを実行してから、次のセルに移動します。
DataFrame チュートリアルのノートブック
以下のノートブックには、このチュートリアルのクエリ例が含まれています。
Python
Python を使用する DataFrame チュートリアル
ノートブックをGet
Scala
Scala を使用する DataFrame チュートリアル
ノートブックをGet
R
R を使用する DataFrame チュートリアル
ノートブックをGet