Spark データフレーム内のデータを操作する

完了

Spark では、"耐障害性分散データセット" (RDD) と呼ばれるデータ構造がネイティブで使用されます。ただし、RDD で直接動作するコードを記述 "できる" ものの、Spark で構造化データを操作するために最もよく使用されるデータ構造は、Spark SQL ライブラリの一部として提供される "データフレーム"です。 Spark のデータフレームは、ユビキタス Pandas Python ライブラリのデータフレームと似ていますが、Spark の分散処理環境で動作するように最適化されています。

注意

データフレーム API に加えて、Spark SQL では、Java と Scala でサポートされている厳密に型指定された "データセット" API が提供されます。 このモジュールでは、Dataframe API に焦点を当てます。

データフレームにデータを読み込む

仮説の例を見て、データフレームを使用してデータを操作する方法を確認しましょう。 レイクハウスの Files/data フォルダー内の products.csv という名前のコンマ区切りテキスト ファイルに、次のデータがあるとします。

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

スキーマの推論

Spark ノートブックで、次の PySpark コードを使用してデータフレームにファイル データを読み込み、最初の 10 行を表示できます。

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

先頭の %%pyspark 行は "マジック" と呼ばれ、このセルで使用される言語が PySpark であることを Spark に伝えます。 ノートブック インターフェイスのツール バーで既定として使用する言語を選択し、マジックを使用して特定のセルの選択をオーバーライドできます。 たとえば、製品データの同等の Scala コードの例を次に示します。

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

マジック %%spark は Scala を指定するために使用されます。

これらのコード サンプルの両方で、次のような出力が生成されます。

ProductID ProductName カテゴリ ListPrice
771 Mountain-100 Silver, 38 マウンテン バイク 3399.9900
772 Mountain-100 Silver, 42 マウンテン バイク 3399.9900
773 Mountain-100 Silver, 44 マウンテン バイク 3399.9900
... ... ... ...

明示的なスキーマの指定

前の例では、CSV ファイルの最初の行に列名が含まれており、Spark により、含まれているデータから各列のデータ型を推論できました。 また、データの明示的なスキーマを指定することもできます。これは、次の CSV の例のように、データ ファイルに列名が含まれていない場合に便利です。

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

次の PySpark の例は、product-data.csv という名前のファイルからデータフレームを読み込むスキーマをこの形式で指定する方法を示しています。

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

ここでも、結果は次のようになります。

ProductID ProductName カテゴリ ListPrice
771 Mountain-100 Silver, 38 マウンテン バイク 3399.9900
772 Mountain-100 Silver, 42 マウンテン バイク 3399.9900
773 Mountain-100 Silver, 44 マウンテン バイク 3399.9900
... ... ... ...

ヒント

明示的なスキーマを指定すると、パフォーマンスも向上します。

データフレームのフィルター処理とグループ化を行う

Dataframe クラスのメソッドを使用して、含まれているデータをフィルター処理、並べ替え、グループ化、操作できます。 たとえば、次のコード例では、select メソッドを使用して、前の例の製品データを含む df データフレームから ProductID および ListPrice 列を取得します。

pricelist_df = df.select("ProductID", "ListPrice")

このコード例の結果は次のようになります。

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

ほとんどのデータ操作メソッドと同様に、select は新しいデータフレーム オブジェクトを返します。

ヒント

データフレームから列のサブセットを選択することは一般的な操作であり、次の短い構文を使用して実現することもできます。

pricelist_df = df["ProductID", "ListPrice"]

メソッドを "チェーン" して、変換されたデータフレームを作成する一連の操作を実行できます。 たとえば、次のコード例では、selectメソッドと where メソッドをチェーンして、Mountain Bikes または Road Bikes のカテゴリを持つ製品に対して ProductName 列と ListPrice 列を含む新しいデータフレームを作成します。

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

このコード例の結果は次のようになります。

ProductName カテゴリ ListPrice
Mountain-100 Silver, 38 マウンテン バイク 3399.9900
Road-750 Black, 52 ロード バイク 539.9900
... ... ...

データをグループ化して集計するには、groupBy メソッドと集計関数を使用します。 たとえば、次の PySpark コードでは、各カテゴリの製品数をカウントします。

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

このコード例の結果は次のようになります。

カテゴリ count
ヘッドセット 3
ホイール 14
マウンテン バイク 32
... ...

データフレームの保存

Spark を使用して生データを変換し、結果を保存してさらに分析やダウンストリーム処理を行うことがよくあります。 次のコード例では、データフレームをデータ レイク内の Parquet ファイルに保存し、同じ名前の既存のファイルを置き換えます。

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Note

Parquet 形式は、通常、分析ストアへの追加の分析またはインジェストに使用するデータ ファイルに適しています。 Parquet は、ほとんどの大規模な Data Analytics システムでサポートされている非常に効率的な形式です。 実際、データ変換の要件が、単に別の形式 (CSV など) から Parquet にデータを変換することだけである場合があります。

出力ファイルのパーティション分割

パーティション分割は、Spark でワーカー ノード全体のパフォーマンスを最大化できるようにする最適化の手法です。 不要なディスク IO を排除して、クエリ内のデータをフィルター処理すると、パフォーマンスがより一層向上します。

データフレームをパーティション分割されたファイルのセットとして保存するには、データの書き込み時に partitionBy メソッドを使用します。 次の例では、bikes_df データフレーム (mountain bikesroad bikes というカテゴリの製品データを含む) を保存し、カテゴリ別にデータをパーティション分割しています。

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

データフレームのパーティション分割時に生成されるフォルダー名には、パーティション分割列の名前と値が column=value 形式で含まれているため、このコード例では、次のサブフォルダーを含む bike_data という名前のフォルダーを作成します。

  • Category=Mountain Bikes
  • Category=Road Bikes

各サブフォルダーには、適切なカテゴリの製品データを含む 1 つ以上の parquet ファイルが含まれています。

Note

データは複数の列でパーティション分割できます。これにより、パーティション キーごとにフォルダーの階層が作成されます。 たとえば、販売注文データを年と月でパーティション分割して、フォルダー階層に各年の値のフォルダーが含まれ、さらに各月の値のサブフォルダーが含まれるようにすることができます。

パーティション分割されたデータの読み込み

パーティション分割されたデータをデータフレームに読み込むときは、パーティション分割されたフィールドの明示的な値またはワイルドカードを指定することで、階層内の任意のフォルダーからデータを読み込むことができます。 次の例では、 Road Bikes カテゴリの製品のデータを読み込みます。

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Note

ファイル パスで指定されたパーティション分割列は、結果のデータフレームでは省略されます。 この例のクエリによって生成される結果には Category 列は含まれません。すべての行のカテゴリは Road Bikes になります。