Spark データフレーム内のデータを操作する
Spark では、"耐障害性分散データセット" (RDD) と呼ばれるデータ構造がネイティブで使用されます。ただし、RDD で直接動作するコードを記述 "できる" ものの、Spark で構造化データを操作するために最もよく使用されるデータ構造は、Spark SQL ライブラリの一部として提供される "データフレーム"です。 Spark のデータフレームは、ユビキタス Pandas Python ライブラリのデータフレームと似ていますが、Spark の分散処理環境で動作するように最適化されています。
注意
データフレーム API に加えて、Spark SQL では、Java と Scala でサポートされている厳密に型指定された "データセット" API が提供されます。 このモジュールでは、Dataframe API に焦点を当てます。
データフレームにデータを読み込む
仮説の例を見て、データフレームを使用してデータを操作する方法を確認しましょう。 レイクハウスの Files/data フォルダー内の products.csv という名前のコンマ区切りテキスト ファイルに、次のデータがあるとします。
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
スキーマの推論
Spark ノートブックで、次の PySpark コードを使用してデータフレームにファイル データを読み込み、最初の 10 行を表示できます。
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
先頭の %%pyspark
行は "マジック" と呼ばれ、このセルで使用される言語が PySpark であることを Spark に伝えます。 ノートブック インターフェイスのツール バーで既定として使用する言語を選択し、マジックを使用して特定のセルの選択をオーバーライドできます。 たとえば、製品データの同等の Scala コードの例を次に示します。
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
マジック %%spark
は Scala を指定するために使用されます。
これらのコード サンプルの両方で、次のような出力が生成されます。
ProductID | ProductName | カテゴリ | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | マウンテン バイク | 3399.9900 |
772 | Mountain-100 Silver, 42 | マウンテン バイク | 3399.9900 |
773 | Mountain-100 Silver, 44 | マウンテン バイク | 3399.9900 |
... | ... | ... | ... |
明示的なスキーマの指定
前の例では、CSV ファイルの最初の行に列名が含まれており、Spark により、含まれているデータから各列のデータ型を推論できました。 また、データの明示的なスキーマを指定することもできます。これは、次の CSV の例のように、データ ファイルに列名が含まれていない場合に便利です。
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
次の PySpark の例は、product-data.csv という名前のファイルからデータフレームを読み込むスキーマをこの形式で指定する方法を示しています。
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
ここでも、結果は次のようになります。
ProductID | ProductName | カテゴリ | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | マウンテン バイク | 3399.9900 |
772 | Mountain-100 Silver, 42 | マウンテン バイク | 3399.9900 |
773 | Mountain-100 Silver, 44 | マウンテン バイク | 3399.9900 |
... | ... | ... | ... |
ヒント
明示的なスキーマを指定すると、パフォーマンスも向上します。
データフレームのフィルター処理とグループ化を行う
Dataframe クラスのメソッドを使用して、含まれているデータをフィルター処理、並べ替え、グループ化、操作できます。 たとえば、次のコード例では、select メソッドを使用して、前の例の製品データを含む df データフレームから ProductID および ListPrice 列を取得します。
pricelist_df = df.select("ProductID", "ListPrice")
このコード例の結果は次のようになります。
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
ほとんどのデータ操作メソッドと同様に、select は新しいデータフレーム オブジェクトを返します。
ヒント
データフレームから列のサブセットを選択することは一般的な操作であり、次の短い構文を使用して実現することもできます。
pricelist_df = df["ProductID", "ListPrice"]
メソッドを "チェーン" して、変換されたデータフレームを作成する一連の操作を実行できます。 たとえば、次のコード例では、selectメソッドと where メソッドをチェーンして、Mountain Bikes または Road Bikes のカテゴリを持つ製品に対して ProductName 列と ListPrice 列を含む新しいデータフレームを作成します。
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
このコード例の結果は次のようになります。
ProductName | カテゴリ | ListPrice |
---|---|---|
Mountain-100 Silver, 38 | マウンテン バイク | 3399.9900 |
Road-750 Black, 52 | ロード バイク | 539.9900 |
... | ... | ... |
データをグループ化して集計するには、groupBy メソッドと集計関数を使用します。 たとえば、次の PySpark コードでは、各カテゴリの製品数をカウントします。
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
このコード例の結果は次のようになります。
カテゴリ | count |
---|---|
ヘッドセット | 3 |
ホイール | 14 |
マウンテン バイク | 32 |
... | ... |
データフレームの保存
Spark を使用して生データを変換し、結果を保存してさらに分析やダウンストリーム処理を行うことがよくあります。 次のコード例では、データフレームをデータ レイク内の Parquet ファイルに保存し、同じ名前の既存のファイルを置き換えます。
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')
Note
Parquet 形式は、通常、分析ストアへの追加の分析またはインジェストに使用するデータ ファイルに適しています。 Parquet は、ほとんどの大規模な Data Analytics システムでサポートされている非常に効率的な形式です。 実際、データ変換の要件が、単に別の形式 (CSV など) から Parquet にデータを変換することだけである場合があります。
出力ファイルのパーティション分割
パーティション分割は、Spark でワーカー ノード全体のパフォーマンスを最大化できるようにする最適化の手法です。 不要なディスク IO を排除して、クエリ内のデータをフィルター処理すると、パフォーマンスがより一層向上します。
データフレームをパーティション分割されたファイルのセットとして保存するには、データの書き込み時に partitionBy メソッドを使用します。 次の例では、bikes_df データフレーム (mountain bikes と road bikes というカテゴリの製品データを含む) を保存し、カテゴリ別にデータをパーティション分割しています。
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")
データフレームのパーティション分割時に生成されるフォルダー名には、パーティション分割列の名前と値が column=value 形式で含まれているため、このコード例では、次のサブフォルダーを含む bike_data という名前のフォルダーを作成します。
- Category=Mountain Bikes
- Category=Road Bikes
各サブフォルダーには、適切なカテゴリの製品データを含む 1 つ以上の parquet ファイルが含まれています。
Note
データは複数の列でパーティション分割できます。これにより、パーティション キーごとにフォルダーの階層が作成されます。 たとえば、販売注文データを年と月でパーティション分割して、フォルダー階層に各年の値のフォルダーが含まれ、さらに各月の値のサブフォルダーが含まれるようにすることができます。
パーティション分割されたデータの読み込み
パーティション分割されたデータをデータフレームに読み込むときは、パーティション分割されたフィールドの明示的な値またはワイルドカードを指定することで、階層内の任意のフォルダーからデータを読み込むことができます。 次の例では、 Road Bikes カテゴリの製品のデータを読み込みます。
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))
Note
ファイル パスで指定されたパーティション分割列は、結果のデータフレームでは省略されます。 この例のクエリによって生成される結果には Category 列は含まれません。すべての行のカテゴリは Road Bikes になります。