Spark を使用してデータ ファイルを操作する
Spark を使用する利点の 1 つは、さまざまなプログラミング言語でコードを記述して実行できることです。これにより、既にお持ちのプログラミング スキルを活用し、特定のタスクに最適な言語を使用することができます。 新しい Azure Databricks Spark ノートブックの既定の言語は PySpark です。これは、データ操作と視覚化に対する強力なサポートにより、データ サイエンティストやアナリストが一般的に使用する Python の Spark 最適化バージョンです。 さらに、Scala (対話形式で使用できる Java 派生言語) やSQL (Spark SQL ライブラリに含まれる一般的に使用される SQL 言語のバリアント) などの言語を使用して、リレーショナル データ構造を操作できます。 ソフトウェア エンジニアは、Java などのフレームワークを使用して Spark 上で実行されるコンパイル済みソリューションを作成することもできます。
データフレームを使用してデータを探索する
Spark では、"耐障害性分散データセット" (RDD) と呼ばれるデータ構造がネイティブで使用されます。ただし、RDD で直接動作するコードを記述 "できる" ものの、Spark で構造化データを操作するために最もよく使用されるデータ構造は、Spark SQL ライブラリの一部として提供される "データフレーム"です。 Spark のデータフレームは、ユビキタス Pandas Python ライブラリのデータフレームと似ていますが、Spark の分散処理環境で動作するように最適化されています。
注意
データフレーム API に加えて、Spark SQL では、Java と Scala でサポートされている厳密に型指定された "データセット" API が提供されます。 このモジュールでは、Dataframe API に焦点を当てます。
データフレームにデータを読み込む
仮説の例を見て、データフレームを使用してデータを操作する方法を確認しましょう。 Databricks File System (DBFS) ストレージの data フォルダーに、products.csv という名前のコンマ区切りのテキスト ファイルに次のデータがあるとします。
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Spark ノートブックでは、次の PySpark コードを使用してデータフレームにデータを読み込み、最初の 10 行を表示できます。
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
先頭の %pyspark
行は "マジック" と呼ばれ、このセルで使用される言語が PySpark であることを Spark に伝えます。 製品データの例と同等の Scala コードを次に示します。
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
マジック %spark
は Scala を指定するために使用されます。
ヒント
Notebook インターフェイスの各セルに使用する言語を選択することもできます。
前に示した両方の例では、次のような出力が生成されます。
ProductID | ProductName | カテゴリ | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | マウンテン バイク | 3399.9900 |
772 | Mountain-100 Silver, 42 | マウンテン バイク | 3399.9900 |
773 | Mountain-100 Silver, 44 | マウンテン バイク | 3399.9900 |
... | ... | ... | ... |
データフレーム スキーマを指定する
前の例では、CSV ファイルの最初の行に列名が含まれており、Spark により、含まれているデータから各列のデータ型を推論できました。 また、データの明示的なスキーマを指定することもできます。これは、次の CSV の例のように、データ ファイルに列名が含まれていない場合に便利です。
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
次の PySpark の例は、product-data.csv という名前のファイルからデータフレームを読み込むスキーマをこの形式で指定する方法を示しています。
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
ここでも、結果は次のようになります。
ProductID | ProductName | カテゴリ | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | マウンテン バイク | 3399.9900 |
772 | Mountain-100 Silver, 42 | マウンテン バイク | 3399.9900 |
773 | Mountain-100 Silver, 44 | マウンテン バイク | 3399.9900 |
... | ... | ... | ... |
データフレームのフィルター処理とグループ化を行う
Dataframe クラスのメソッドを使用して、含まれているデータをフィルター処理、並べ替え、グループ化、操作できます。 たとえば、次のコード例では、select メソッドを使用して、前の例の製品データを含む df データフレームから ProductName 列と ListPrice 列を取得します。
pricelist_df = df.select("ProductID", "ListPrice")
このコード例の結果は次のようになります。
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
ほとんどのデータ操作メソッドと同様に、select は新しいデータフレーム オブジェクトを返します。
ヒント
データフレームから列のサブセットを選択することは一般的な操作であり、次の短い構文を使用して実現することもできます。
pricelist_df = df["ProductID", "ListPrice"]
メソッドを "チェーン" して、変換されたデータフレームを作成する一連の操作を実行できます。 たとえば、次のコード例では、selectメソッドと where メソッドをチェーンして、Mountain Bikes または Road Bikes のカテゴリを持つ製品に対して ProductName 列と ListPrice 列を含む新しいデータフレームを作成します。
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
このコード例の結果は次のようになります。
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
データをグループ化して集計するには、groupBy メソッドと集計関数を使用します。 たとえば、次の PySpark コードでは、各カテゴリの製品数をカウントします。
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
このコード例の結果は次のようになります。
カテゴリ | count |
---|---|
ヘッドセット | 3 |
ホイール | 14 |
マウンテン バイク | 32 |
... | ... |
Spark で SQL 式を使用する
Dataframe API は Spark SQL という名前の Spark ライブラリの一部であり、データ アナリストは SQL 式を使用してデータのクエリと操作を行います。
Spark カタログでデータベース オブジェクトを作成する
Spark カタログは、ビューやテーブルなどのリレーショナル データ オブジェクトのメタストアです。 Spark ランタイムでは、このカタログを使用して、任意の Spark 対応言語で記述されたコードと、一部のデータ アナリストや開発者にとってより自然な SQL 式をシームレスに統合できます。
Spark カタログでクエリを実行するためにデータフレーム内のデータを使用できるようにする最も簡単な方法の 1 つは、次のコード例に示すように、一時ビューを作成することです。
df.createOrReplaceTempView("products")
"ビュー" は一時的なもので、現在のセッションの終了時に自動的に削除されます。 また、カタログに保持される "テーブル" を作成して、Spark SQL を使用してクエリを実行できるデータベースを定義することもできます。
注意
このモジュールでは Spark カタログ テーブルについて詳しく説明しませんが、いくつかの重要な点を確認しておくことをお勧めします。
spark.catalog.createTable
メソッドを使用して、空のテーブルを作成できます。 テーブルは、カタログに関連付けられているストレージの場所に、基になるデータを格納するメタデータ構造です。 テーブルを削除すると、基になるデータも削除されます。- データフレームをテーブルとして保存するには、
saveAsTable
メソッドを使用します。 spark.catalog.createExternalTable
メソッドを使用して "外部" テーブルを作成できます。 外部テーブルではカタログ内のメタデータが定義されますが、外部ストレージの場所 (通常は、データ レイク内のフォルダー) から基になるデータが取得されます。 外部テーブルを削除しても、基になるデータは削除されません。
Spark SQL API を使用してデータのクエリを実行する
任意の言語で記述されたコードで Spark SQL API を使用して、カタログ内のデータに対してクエリを実行できます。 たとえば、次の PySpark コードでは、SQL クエリを使用して、products ビューからデータフレームとしてデータを返します。
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
このコード例の結果は、次の表のようになります。
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
SQL コードを使用する
前の例では、Spark SQL API を使用して、Spark コードに SQL 式を埋め込む方法を示しました。 また、ノートブックで %sql
マジックを使用して、次のようにカタログ内のオブジェクトに対してクエリを行う SQL コードを実行することもできます。
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
この SQL コード例では、次のように、ノートブックにテーブルとして自動的に表示される結果セットが返されます。
カテゴリ | ProductCount |
---|---|
ビブショーツ | 3 |
バイク ラック | 1 |
バイク スタンド | 1 |
... | ... |