Azure Databricks の PySpark
Azure Databricks は、ビッグ データと機械学習用の統合分析エンジンである Apache Spark 上に構築されています。 PySpark は、学習、実装、保守が容易な柔軟な言語である Python プログラミング言語を使用して Apache Spark とインターフェイスするのに役立ちます。 また、Databricks でのデータ視覚化のための多くのオプションも用意されています。 PySpark は、Python と Apache Spark の機能を組み合わせた機能です。
この記事では、Databricks での PySpark の基礎の概要について説明します。
Spark の概念の紹介
PySpark を使い始める前に、Apache Spark の主要な概念を理解しておくことが重要です。
DataFrames
DataFrame は、Apache Spark の主となるオブジェクトです。 DataFrame は、名前付きの列に編成されたデータセットです。 DataFrame は、スプレッドシートや SQL テーブルのような、一連のレコード (テーブルの行のようなもの) とさまざまな型の列から成る、2 次元のラベル付きデータ構造と考えることができます。 DataFrame が提供する豊富な機能セット (列の選択、フィルター、結合、集計など) を使って、データの操作と分析の一般的なタスクを効率よく実行できます。
DataFrame の重要な要素には、次のようなものがあります。
- スキーマ: スキーマは、DataFrame の列の名前と型を定義します。 データ形式には、スキーマの定義と適用に関するさまざまなセマンティクスがあります。 データ ソースには、スキーマ情報を提供するものや、手動のスキーマ定義に依存するものや、スキーマを推論できるものがあります。 スキーマは、ユーザーが手動で定義することも、データ ソースから読み取ることもできます。
- 行: Spark では、DataFrame 内のレコードは
Row
オブジェクトとして表されます。 Delta Lake などの基になるデータ形式ではデータの格納に列が使われますが、Spark では、最適化のため、行を使ってデータのキャッシュとシャッフルが行われます。 - 列: Spark の列はスプレッドシートの列に似ていますが、文字列や整数などの単純な型だけでなく、配列、マップ、null などの複雑な型も表すことができます。 データ ソースの列を選択、操作、または削除するクエリを記述できます。 使用できるデータ ソースには、テーブル、ビュー、ファイル、または他の DataFrame が含まれます。 データセットや DataFrame から列が削除されることはありません。
.drop
変換やselect
ステートメントでの省略により、結果から省略されるだけです。
データ処理
Apache Spark は、遅延評価を使って、DataFrame で定義された変換やアクションを処理します。 これらの概念は、Spark でのデータ処理を理解するための基本です。
変換: Spark では処理ロジックを変換として表現します。これは、DataFrame を使ってデータを読み込んで操作するための手順です。 一般的な変換としては、データの読み取り、結合、集計、型キャストなどがあります。 Azure Databricks での変換については、「データを変換する」をご覧ください。
遅延評価: Spark は、変換で指定されているロジックを評価するための最も効率的な物理プランを特定して、データ処理を最適化します。 ただし、Spark はアクションが呼び出されるまで変換に対して影響を与えません。 Spark は、指定されている順序で正確に各変換を評価するのではなく、アクションですべての変換の計算がトリガーされるまで待ちます。 これは遅延評価または遅延読み込みと呼ばれ、Spark は実行を、定義されたときにすぐに実行するのではなく、遅延方式で処理するため、複数の操作を "チェーン" できます。
Note
遅延評価とは、DataFrame が論理クエリを、メモリ内の結果としてではなく、データ ソースに対する一連の命令として格納することを意味します。 これは、Pandas の DataFrame で使われるモデルである即時実行とは大きく異なります。
アクション: アクションは、1 つ以上の DataFrame で一連の変換の結果を計算するよう Spark に指示します。 アクションの操作は値を返し、次のいずれかになります。
- コンソールまたはエディターでデータを出力するアクション (
display
やshow
など) - データを収集するアクション (
Row
オブジェクトを返します) (take(n)
、first
、head
など) - データ ソースに書き込むアクション (
saveAsTable
など) - 計算をトリガーする集計 (
count
など)
重要
運用データ パイプラインには、通常、データ書き込みアクションのみが存在する必要があります。 他のすべてのアクションはクエリの最適化を妨げ、ボトルネックにつながる可能性があります。
DataFrame が不変であるとは何を意味するか?
DataFrame は、1 つ以上のデータ ソースに対して定義された変換とアクションのコレクションですが、最終的に Apache Spark はクエリを元のデータ ソースに解決するため、データ自体は変更されず、DataFrame が変更されることはありません。 つまり、DataFrame は "不変" です。 このため、変換の実行後に返される新しい DataFrame に後続の操作でアクセスするには、それを変数に保存する必要があります。 変換の中間ステップを評価する場合は、アクションを呼び出します。
API とライブラリ
Spark のすべての API と同様に、PySpark には、次のような強力な機能を有効にしてサポートする多くの API とライブラリが用意されています。
- Spark SQL と DataFrames を使用したリレーショナル クエリによる構造化データの処理。 Spark SQL を使用すると、SQL クエリと Spark プログラムを混在できます。 Spark DataFrames を使用すると、Python と SQL を使用してデータの読み取り、書き込み、変換、分析を効率的に行うことができます。つまり、常に Spark の全機能を活用できます。 PySpark の概要を参照してください。
- 構造化ストリーミングを使用したストリームのスケーラブルな処理。 ストリーミング計算は、静的データに対するバッチ計算を表現するのと同じ方法で表現でき、ストリーミング データの到着が続くにつれて、Spark SQL エンジンによって増分的および継続的に実行されます。 「構造化ストリーミングの概要」を参照してください。
- Spark 上の Pandas API を使用して Apache Spark で動作する Pandas データ構造とデータ分析ツール。 Spark 上の Pandas API を使用すると、Pandas (テスト、小規模なデータセット) と Spark (運用、分散データセット) で動作する単一のコードベースを使用して、複数のノードに分散された Pandas ワークロードを任意のサイズにスケーリングできます。 「Spark 上の Pandas API の概要」を参照してください。
- 機械学習 (MLLib) を使用した機械学習アルゴリズム。 MLlib は Spark 上に構築されたスケーラブルな機械学習ライブラリであり、ユーザーが実用的な機械学習パイプラインを作成および調整するのに役立つ API の統一されたセットを提供します。 「機械学習ライブラリの概要」を参照してください。
- GraphX を使用したグラフとグラフ並列計算。 GraphX では、各頂点とエッジにプロパティがアタッチされた新しい有向マルチグラフが導入され、グラフの計算演算子、アルゴリズム、ビルダーが公開され、グラフ分析タスクが簡略化されます。 「GraphX の概要」を参照してください。
Spark のチュートリアル
Databricks での PySpark の使用例については、次の記事を参照してください。
Apache Spark のドキュメントにも、Spark を学習するための次のようなクイックスタートとガイドがあります。
- PySpark DataFrames のクイックスタート
- Spark SQL の概要
- 構造化ストリーミング プログラミング ガイド
- Spark 上の Pandas API のクイックスタート
- 機械学習ライブラリ プログラミング ガイド
PySpark のリファレンス
Azure Databricks では、PySpark API とそれに対応するリファレンスの独自のバージョンが保持されています。このリファレンスは、次のセクションで確認できます。