PySpark の基本
この記事では、PySpark の使用法を示す簡単な例について説明します。 基本的な Apache Spark の概念を理解しており、コンピューティングに接続された Azure Databricks ノートブックでコマンドを実行していることを前提としています。 サンプル データを使用して DataFrame を作成し、このデータに対して行操作や列操作などの基本的な変換を実行し、複数の DataFrame を結合してこのデータを集計し、このデータを視覚化して、テーブルまたはファイルに保存します。
データをアップロードする
この記事の一部の例では、Databricks が提供するサンプル データを使用して、DataFrame を使用したデータの読み込み、変換、保存を行う方法を示します。 Databricks にまだ存在していない独自のデータを使用する場合は、まずそれをアップロードし、そこから DataFrame を作成できます。 「ファイルのアップロードを使用してテーブルを作成または変更する」と「Unity Catalog ボリュームにファイルをアップロードする」をご覧ください。
Databricks サンプル データについて
Databricks は、samples
カタログと /databricks-datasets
ディレクトリにサンプル データを提供します。
samples
カタログ内のサンプル データにアクセスするには、形式samples.<schema-name>.<table-name>
を使用します。 この記事では、架空の企業からのデータが含まれるsamples.tpch
スキーマのテーブルを使用します。customer
テーブルには顧客に関する情報が含まれており、orders
にはそれらの顧客による注文に関する情報が含まれています。dbutils.fs.ls
を使用して、/databricks-datasets
のデータを調べます。 Spark SQL または DataFrame を使用し、ファイルのパスを使ってこの場所のデータのクエリを実行します。 Databricks が提供するサンプル データの詳細については、「サンプル データセット」を参照してください。
データ型のインポート
多くの PySpark 操作では、SQL 関数を使用するか、ネイティブ Spark 型と対話する必要があります。 必要な関数と型のみを直接インポートすることも、モジュール全体をインポートすることもできます。
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
インポートされた関数によっては Python の組み込み関数をオーバーライドする場合があるため、一部のユーザーはエイリアスを使用してこれらのモジュールをインポートすることを選びます。 次の例は、Apache Spark のコード例で使用される一般的なエイリアスを示しています。
import pyspark.sql.types as T
import pyspark.sql.functions as F
データ型の包括的な一覧については、Spark のデータ型に関するページを参照してください。
PySpark SQL 関数の包括的な一覧については、Spark の関数に関するページを参照してください。
DataFrame の作成
DataFrame を作成する方法はいくつかあります。 通常、テーブルやファイルのコレクションなどのデータ ソースに対して DataFrame を定義します。 次に、Apache Spark の基本概念のセクションで説明されているように、display
などのアクションを使用して、実行する変換をトリガーします。 display
メソッドは DataFrame を出力します。
指定した値を使用して DataFrame を作成する
指定した値を使用して DataFrame を作成するには、行がタプルのリストとして表現される createDataFrame
メソッドを使用します。
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
出力では、df_children
の列のデータ型が自動的に推論されることに注目してください。 または、スキーマを追加して型を指定することもできます。 スキーマは、名前、データ型、null 値が含まれるかどうかを示すブール値フラグを指定する StructFields
で構成される StructType
を使用して定義されます。 pyspark.sql.types
からデータ型をインポートする必要があります。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Unity Catalog のテーブルから DataFrame を作成する
Unity Catalog 内のテーブルから DataFrame を作成するには、形式 <catalog-name>.<schema-name>.<table-name>
を使用してテーブルを特定する table
メソッドを使用します。 左側のナビゲーションバーの [カタログ] をクリックし、カタログ エクスプローラーを使用してテーブルに移動します。 それをクリックし、[テーブル パスのコピー] を選んでテーブル パスをノートブックに挿入します。
次の例ではテーブル samples.tpch.customer
を読み込みますが、代わりに独自のテーブルへのパスを指定することもできます。
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
アップロードしたファイルから DataFrame を作成する
Unity Catalog ボリュームにアップロードしたファイルから DataFrame を作成するには、read
プロパティを使用します。 このメソッドは DataFrameReader
を返します。これを使用して適切な形式を読み取ることができます。 左側の小さなサイド バーにあるカタログ オプションをクリックし、カタログ ブラウザーを使用してファイルを見つけます。 それを選び、[ボリュームのファイル パスをコピー] をクリックします。
次の例では *.csv
ファイルから読み取りますが、DataFrameReader
は他の多くの形式でのファイルのアップロードをサポートしています。 DataFrameReader メソッドに関するページを参照してください。
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Unity Catalog ボリュームの詳細については、「Unity Catalog ボリュームとは」を参照してください。
JSON 応答から DataFrame を作成する
REST API によって返された JSON 応答ペイロードから DataFrame を作成するには、Python requests
パッケージを使用して応答のクエリと解析を行います。 パッケージを使用するには、インポートする必要があります。 この例では、米国食品医薬品局の医薬品申請データベースのデータを使用します。
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Databricks での JSON やその他の半構造化データの操作については、「半構造化データをモデル化する」を参照してください。
JSON フィールドまたはオブジェクトを選択する
変換された JSON から特定のフィールドまたはオブジェクトを選ぶには、[]
表記を使用します。 たとえば、それ自体が製品の配列である products
フィールドを選ぶには、次のようにします。
display(df_drugs.select(df_drugs["products"]))
メソッド呼び出しを連鎖して複数のフィールドをトラバースすることもできます。 たとえば、医薬品申請の最初の製品のブランド名を出力するには、次のようにします。
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
ファイルから DataFrame を作成する
ファイルからの DataFrame の作成を示すために、この例では /databricks-datasets
ディレクトリの CSV データを読み込みます。
サンプル データセットに移動するには、Databricks Utilties ファイル システム コマンドを使用できます。 次の例では、dbutils
を使用して、/databricks-datasets
で使用できるデータセットを一覧表示します。
display(dbutils.fs.ls('/databricks-datasets'))
または、次の例に示すように、%fs
を使用して Databricks CLI ファイル システム コマンドにアクセスできます。
%fs ls '/databricks-datasets'
ファイルまたはファイルのディレクトリから DataFrame を作成するには、load
メソッドでパスを指定します。
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
DataFrame を使用したデータ変換
DataFrame を使用すると、データの並べ替え、フィルター処理、集計を行う組み込みメソッドを使用してデータを簡単に変換できます。 多くの変換は DataFrame のメソッドとして指定されておらず、代わりに spark.sql.functions
パッケージで提供されます。 Databricks Spark SQL 関数に関するページを参照してください。
列の操作
Spark には、多くの基本的な列操作が用意されています。
ヒント
DataFrame 内のすべての列を出力するには、columns
(たとえば df_customer.columns
) を使用します。
列を選択する
select
と col
を使用して特定の列を選択できます。 col
関数は pyspark.sql.functions
サブモジュール内にあります。
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
文字列として定義された式を受け取る expr
を使用して列を参照することもできます。
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
SQL 式を受け入れる selectExpr
を使用することもできます。
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
文字列リテラルを使用して列を選択するには、以下を行います。
df_customer.select(
"c_custkey",
"c_acctbal"
)
特定の DataFrame から列を明示的に選択するには、[]
演算子または .
演算子を使用します (.
演算子は、整数で始まる列、またはスペースや特殊文字を含む列の選択には使用できません)。これは、一部の列が同じ名前を持つ DataFrame を結合する場合に特に役立ちます。
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
列を作成する
新しい列を作成するには、withColumn
メソッドを使用します。 次の例では、顧客の勘定残高 c_acctbal
が 1000
を超えているかどうかに基づいて、ブール値を含む新しい列を作成します。
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
列名の変更
列の名前を変更するには、既存の列名と新しい列名を受け入れる withColumnRenamed
メソッドを使用します。
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
alias
メソッドは、集計の一部として列の名前を変更する場合に特に役立ちます。
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
列の型のキャスト
場合によっては、DataFrame 内の 1 つ以上の列のデータ型を変更する必要があります。 これを行うには、cast
メソッドを使用して列のデータ型間で変換します。 次の例は、col
メソッドを使用して列を参照し、列を整数から文字列型に変換する方法を示しています。
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
列を削除する
列を削除するには、select または select(*) except
で列を省略するか、drop
メソッドを使用します。
df_customer_flag_renamed.drop("balance_flag_renamed")
複数の列を一度に削除することもできます。
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
行の操作
Spark には、多くの基本的な行操作が用意されています。
行のフィルター処理
行をフィルター処理するには、DataFrame で filter
または where
メソッドを使用して、特定の行のみを返します。 フィルター処理する列を特定するには、col
メソッドまたは列として評価される式を使用します。
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
複数の条件でフィルター処理するには、論理演算子を使用します。 たとえば、&
と |
は、それぞれ AND
と OR
の条件を有効にできます。 次の例では、c_nationkey
が 20
に等しく、c_acctbal
が 1000
より大きい行をフィルター処理します。
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
重複する行の削除
行の重複を除外するには、一意の行のみを返す distinct
を使用します。
df_unique = df_customer.distinct()
null 値の処理
null 値を処理するには、na.drop
メソッドを使用して null 値を含む行を削除します。 このメソッドでは、削除する行が、1 つでも null 値を含む (any
) のか、すべてが null 値である (all
) のかを指定できます。
あらゆる null 値を削除するには、次のいずれかの例を使用します。
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
代わりに、すべて null 値のものが含まれる行のみをフィルターで除外する場合は、次を使用します。
df_customer_no_nulls = df_customer.na.drop("all")
次に示すように、これを列のサブセットに適用するには、これを指定します。
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
欠損値を埋めるには、fill
メソッドを使用します。 これをすべての列に適用するか、列のサブセットに適用するかを選択できます。 以下の例では、勘定残高 c_acctbal
の値が null である勘定残高には、0
が入力されます。
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
文字列を他の値に置き換えるには、replace
メソッドを使用します。 以下の例では、空のアドレス文字列は単語 UNKNOWN
に置き換えられます。
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
行の追加
行を追加するには、union
メソッドを使用して新しい DataFrame を作成する必要があります。 次の例では、先ほど作成した DataFrame df_that_one_customer
と df_filtered_customer
が結合され、3 人の顧客を含む DataFrame が返されます。
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Note
DataFrame をテーブルに書き込んでから新しい行を追加することで、DataFrame を結合することもできます。 運用ワークロードの場合、ターゲット テーブルへのデータ ソースの増分処理により、データのサイズが大きくなるにつれて待機時間とコンピューティング コストが大幅に削減されます。 「Databricks レイクハウスにデータを取り込む」を参照してください。
行の並べ替え
重要
並べ替えは大規模になるとコストがかかる場合があり、並べ替えられたデータを格納し、そのデータを Spark で再読み込みする場合、順序は保証されません。 並べ替えを意図的に使用していることを確認します。
1 つ以上の列で行を並べ替えるには、sort
メソッドまたは orderBy
メソッドを使用します。 既定では、これらのメソッドは昇順に並べ替えられます。
df_customer.orderBy(col("c_acctbal"))
降順でフィルター処理するには、desc
を使用します。
df_customer.sort(col("c_custkey").desc())
次の例は、2 つの列で並べ替える方法を示しています。
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
DataFrame の並べ替え後に返される行数を制限するには、limit
メソッドを使用します。 次の例では、上位 10
件の結果のみを表示します。
display(df_sorted.limit(10))
DataFrame を結合する
複数の DataFrame を結合するには、join
メソッドを使用します。 how
(結合の種類) および on
(結合のベースとなる列) パラメーターで DataFrame を結合する方法を指定できます。 一般的な結合の種類は次のとおりです。
inner
: これは既定の結合の種類であり、DataFrame 全体でon
パラメーターに一致する行のみを保持する DataFrame を返します。left
: 最初に指定された DataFrame のすべての行と、2 番目に指定された DataFrame の行のうち、最初のものと一致する行のみが保持されます。outer
: 外部結合では、一致に関係なく両方の DataFrame のすべての行が保持されます。
結合の詳細については、「Azure Databricks での結合の操作」を参照してください。 PySpark でサポートされている結合の一覧については、DataFrame 結合に関するページを参照してください。
次の例は、orders
DataFrame の各行が customers
DataFrame の対応する行と結合された 1 つの DataFrame を返します。 すべての注文が正確に 1 人の顧客に対応することが想定されるため、内部結合が使用されます。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
複数の条件で結合するには、&
や |
などのブール演算子を使用して、それぞれ AND
と OR
を指定します。 次の例では、さらなる条件を追加して、500,000
より大きい o_totalprice
を持つ行のみをフィルター処理します。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
データの集計
DataFrame 内のデータを集計するには、SQL の GROUP BY
と同様に、groupBy
メソッドを使用してグループ化する列を指定し、agg
メソッドを使用して集計を指定します。 avg
、sum
、max
、min
などの一般的な集計を pyspark.sql.functions
からインポートします。 次の例は、市場セグメント別の平均顧客残高を示しています。
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
一部の集計はアクションです。これは、集計が計算をトリガーすることを意味します。 この場合、結果を出力するために他のアクションを使用する必要はありません。
DataFrame 内の行をカウントするには、count
メソッドを使用します。
df_customer.count()
呼び出しのチェーン
DataFrame を変換するメソッドは DataFrame を返し、Spark はアクションが呼び出されるまで変換に対して作用しません。 この遅延評価は、利便性と可読性を高めるために複数のメソッドを連鎖できることを意味します。 次の例は、フィルター処理、集計、順序付けを連鎖する方法を示しています。
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
DataFrame を視覚化する
ノートブックで DataFrame を視覚化するには、DataFrame の左上隅にあるテーブルの横の + 記号をクリックし、[視覚化] を選んで DataFrame に基づいて 1 つ以上のグラフを追加します。 視覚化の詳細については、「Databricks ノートブックでの視覚化」を参照してください。
display(df_order)
追加の視覚化を実行するには、Databricks では Spark 用 pandas API を使用することをお勧めします。 .pandas_api()
を使用すると、Spark DataFrame の対応する Pandas API にキャストできます。 詳細については、「Spark の Pandas API」を参照してください。
データを保存する
データを変換したら、DataFrameWriter
メソッドを使用してデータを保存できます。 これらのメソッドの完全な一覧は、DataFrameWriter に関するページにあります。 次のセクションでは、DataFrame をテーブルとして、およびデータ ファイルのコレクションとして保存する方法を示します。
DataFrame をテーブルとして保存する
DataFrame を Unity Catalog のテーブルとして保存するには、write.saveAsTable
メソッドを使用し、<catalog-name>.<schema-name>.<table-name>
の形式でパスを指定します。
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
DataFrame を CSV として書き込む
DataFrame を *.csv
形式で書き込むには、write.csv
メソッドを使用して形式とオプションを指定します。 既定では、指定されたパスにデータが存在する場合、書き込み操作は失敗します。 次のモードのいずれかを指定して、別のアクションを実行できます。
overwrite
は、ターゲット パス内のすべての既存のデータを DataFrame の内容で上書きします。append
は、DataFrame のコンテンツをターゲット パス内のデータに追加します。- データがターゲット パスに存在する場合、
ignore
は書き込みを警告なしで失敗します。
次の例は、CSV ファイルとして DataFrame コンテンツでデータを上書きする方法を示しています。
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
次のステップ
Databricks で Spark 機能を活用するには、以下を参照してください。