演習 - Azure Synapse パイプライン内で Notebook を統合する
このユニットでは、Azure Synapse Spark ノートブックを作成して、マッピング データ フローによって読み込まれたデータを分析および変換し、そのデータをデータ レイクに格納します。 ノートブックでデータ レイクに書き込まれたデータのフォルダー名を定義する文字列パラメーターを受け取るパラメーター セルを作成します。
その後、このノートブックを Synapse パイプラインに追加し、一意のパイプライン実行 ID をノートブック パラメーターに渡します。これにより、パイプライン実行と、ノートブック アクティビティによって保存されたデータを関連付けることができます。
最後に、Synapse Studio の [監視] ハブを使用して、パイプライン実行を監視し、実行 ID を取得してから、データ レイクに格納されている対応するファイルを見つけます。
Apache Spark とノートブックについて
Apache Spark は、ビッグデータ分析アプリケーションのパフォーマンスを向上させるメモリ内処理をサポートする並列処理フレームワークです。 Azure Synapse Analytics の Apache Spark は、Apache Spark を Microsoft がクラウドに実装したものです。
Azure Synapse Studio の Apache Spark ノートブックは、ライブ コード、視覚化、説明テキストを含むファイルを作成するための Web インターフェイスです。 ノートブックは、アイデアを確認し、簡単な実験を使用してデータから分析情報を得るのに最適な場所です。 また、ノートブックは、データの準備、データの視覚化、機械学習、およびその他のビッグ データのシナリオで広く使用されています。
Synapse Spark ノートブックを作成する
Synapse Analytics で、ユーザー プロファイル データを処理、結合、インポートするためのマッピング データ フローが既に作成されているとします。 ここで、ユーザーに好まれ、かつ上位に選ばれており、過去 12 か月で最も購入数が多い製品という条件に基づいて、ユーザーごとに上位 5 つの製品を検索したいと考えています。 その後、全体で上位 5 つの製品を計算したいと考えています。
この演習では、これらの計算を行うために Synapse Spark ノートブックを作成します。
Synapse Analytics Studio (https://web.azuresynapse.net/) を開き、[データ] ハブに移動します。
[リンク] タブ (1) を選択し、[Azure Data Lake Storage Gen2] の下の Data Lake Storage のプライマリ アカウント (2) を展開します。 [wwi-02] コンテナー (3) を選択し、top-products フォルダー (4) を開きます。 任意の Parquet ファイル (5) を右クリックし、[新しいノートブック] メニュー項目 (6)、[Load to DataFrame](データフレームに読み込む) (7) の順に選択します。 フォルダーが表示されない場合は、[更新] を選択します。
Refresh
ノートブックが Spark プールにアタッチされていることを確認します。
Parquet ファイル名を
*.parquet
(1) に置き換えて、top-products
フォルダー内のすべての Parquet ファイルを選択します。 たとえば、パスはabfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
のようになります。ノートブックを実行するには、ノートブック ツール バーの [すべて実行] を選択します。
注意
Spark プールでノートブックを初めて実行すると、Synapse によって新しいセッションが作成されます。 これには、3 分から 5 分ほどかかる可能性があります。
注意
セルだけを実行するには、セルの上にポインターを合わせ、セルの左側にある [セルの実行] アイコンを選択するか、セルを選んでから Ctrl + Enter キーを押します。
[+] ボタンを選び、[コード セル] 項目を選んで、下に新しいセルを作成します。 [+] ボタンは、左側のノートブック セルの下にあります。 また、ノートブック ツールバーの [+ セル] メニューを展開して、[コード セル] 項目を選択することもできます。
新しいセルで次のコマンドを実行して
topPurchases
という新しいデータフレームを設定し、top_purchases
という名前の新しい一時ビューを作成して、最初の 100 行を表示します。topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
出力は次のようになります。
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
SQL を使用して新しい一時ビューを作成するには、新しいセルで次のコマンドを実行します。
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
注意
このクエリの出力はありません。
このクエリでは、
top_purchases
一時ビューをソースとして使用し、row_number() over
メソッドを適用して、ItemsPurchasedLast12Months
が最大である各ユーザーのレコードに行番号を適用します。where
句は結果をフィルター処理するため、IsTopProduct
とIsPreferredProduct
の両方が true に設定されている最大 5 つの製品のみを取得することになります。 これにより、Azure Cosmos DB に格納されているユーザー プロファイルに従って、ユーザーごとに購入数の最も多い上位 5 つの製品が得られ、それらの製品は "さらに" お気に入りの製品としても識別されます。新しいセルで次のコマンドを実行し、前のセルで作成した
top_5_products
一時ビューの結果を格納する新しいデータフレームを作成して表示します。top5Products = sqlContext.table("top_5_products") top5Products.show(100)
好まれている上位 5 つの製品をユーザーごとに表示した次のような出力が表示されます。
顧客に好まれ、購入数が最も多い製品という条件に基づいた全体で上位 5 つの製品を計算します。 これを行うには、新しいセルで次のコマンドを実行します。
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
このセルでは、上位 5 つの好まれている商品を商品 ID 別にグループ化し、その製品が過去 12 か月に購入された合計数を計算し、その値を降順で並べ替えて、上位 5 つの結果を返しました。 出力は次のようになります。
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
パラメーター セルを作成する
Azure Synapse パイプラインでは、パラメーター セルを検索し、このセルを、実行時に渡されるパラメーターの既定値として扱います。 実行エンジンにより、既定値を上書きするために、入力パラメーターを含んだ新しいセルがパラメーター セルの下に追加されます。 パラメーター セルが指定されていない場合、挿入されたセルがノートブックの上部に挿入されます。
ここでは、このノートブックをパイプラインから実行します。 Parquet ファイルに名前を付けるために使用される
runId
変数の値を設定するパラメーターを渡す必要があります。 新しいセルで次のコマンドを実行します。import uuid # Generate random GUID runId = uuid.uuid4()
Spark に付属されている
uuid
ライブラリを使用して、ランダムな GUID を生成します。 パイプラインによって渡されたパラメーターを使用して、runId
変数をオーバーライドする必要があります。 このためには、これをパラメーター セルとして切り替える必要があります。セル (1) の右上隅にあるアクションの省略記号 (...) を選択してから、[Toggle parameter cell](パラメーター セルを切り替える) (2) を選びます。
このオプションに切り替えると、セルに [パラメーター] タグが表示されます。
新しいセルに次のコードを貼り付けて、Data Lake のプライマリ アカウントの
/top5-products/
パスで、Parquet ファイル名としてrunId
変数を使用します。 Data Lake のプライマリ アカウントで、パスのYOUR_DATALAKE_NAME
を置き換えます。 これを見つけるには、ページの上部にあるセル 1(1) までスクロールします。 パス (2) から Data Lake Storage アカウントをコピーします。 この値をYOUR_DATALAKE_NAME
の代わりとして、新しいセル内のパス (3) に貼り付けてから、そのセルでコマンドを実行します。%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
ファイルがデータ レイクに書き込まれたことを確認します。 [データ] ハブに移動して、[リンク] タブ (1) を選択します。 Data Lake Storage のプライマリ アカウントを展開してから、[wwi-02] コンテナー (2) を選択します。 top5-products フォルダー (3) に移動します。 ディレクトリに、ファイル名 (4) として GUID を含む Parquet ファイルのフォルダーが表示されるはずです。
このディレクトリは、以前は存在していなかったため、ノートブック セル内のデータフレームに対する Parquet の書き込みメソッドによって作成されました。
ノートブックを Synapse パイプラインに追加する
この演習の冒頭で説明したマッピング データ フローに戻り、データ フローがオーケストレーション プロセスの一部として実行された後にこのノートブックを実行するとします。 これを行うには、このノートブックを新しいノートブック アクティビティとしてパイプラインに追加します。
ノートブックに戻ります。 ノートブックの右上隅にある [プロパティ] (1) を選択してから、[名前] (2) に「
Calculate Top 5 Products
」と入力します。ノートブックの右上隅にある [パイプラインに追加] (1) を選択してから、[既存のパイプライン] (2) を選びます。
[Write User Profile Data to ASA](ユーザー プロファイル データを ASA に書き込む) パイプライン (1) を選んでから、[追加] *(2) を選択します。
Synapse Studio により、ノートブック アクティビティがパイプラインに追加されます。 [ノートブック] アクティビティが [データ フロー] アクティビティの右側に配置されるように再配置します。 [データ フロー] アクティビティを選択し、パイプライン接続の成功アクティビティを示す緑色のボックスを [ノートブック] アクティビティにドラッグします。
成功アクティビティの矢印により、[データ フロー] アクティビティが正常に実行された後、[ノートブック] アクティビティを実行するようにパイプラインに示されます。
[ノートブック] アクティビティ (1)、[設定] タブ (2) の順に選択し、[基本パラメーター] (3) を展開してから、[+ 新規] (4) を選択します。 [名前] フィールド (5) に「
runId
」と入力します。 [型] (6) として [文字列] を選択します。 [値] として、[動的なコンテンツの追加] (7) を選択します。[システム変数] (1) で、[Pipeline run ID](パイプラインの実行 ID) を選択します。 これにより、動的コンテンツ ボックス (2) に
@pipeline().RunId
が追加されます。 [完了] (3) を選択してダイアログ ボックスを閉じます。パイプラインの実行 ID の値は、各パイプライン実行に割り当てられた一意の GUID です。 この値を
runId
ノートブック パラメーターとして渡して、Parquet ファイルの名前に使用します。 パイプライン実行の履歴を調べて、各パイプライン実行用に作成されている特定の Parquet ファイルを見つけることができます。[すべて発行]、[発行] の順に選択して、変更を保存します。
発行が完了したら、[トリガーの追加] (1)、[Trigger now](今すぐトリガー) (2) の順に選択して、更新されたパイプラインを実行します。
[OK] を選択してトリガーを実行します。
パイプラインの実行を監視します
[監視] ハブを使用すると、SQL、Apache Spark、パイプラインの現在および過去のアクティビティを監視できます。
[監視] ハブに移動します。
[パイプラインの実行] (1) を選択し、パイプライン実行が正常に完了する (2) まで待ちます。 場合によっては、ビューを更新する (3) 必要があります。
パイプラインのアクティビティの実行を表示するには、そのパイプラインの名前を選択します。
[データ フロー] アクティビティと新しい[ノートブック] アクティビティ (1) の両方に注目してください。 [Pipeline run ID](パイプラインの実行 ID) の値 (2) をメモします。 これを、ノートブックによって生成された Parquet ファイルの名前と比較します。 [Calculate Top 5 Products] ノートブック名を選択して、その詳細を表示します (3)。
ここで、ノートブック実行の詳細を確認します。 [再生] (1) を選択して、ジョブ (2) の進行状況の再生を監視することができます。 下部では、さまざまなフィルター オプション (3) を使用して [診断] と [ログ] を表示できます。 右側には、実行時間、Livy ID、Spark プールの詳細など、実行の詳細が表示されます。 ジョブの [詳細の表示] リンクを選択すると、その詳細 (5) が表示されます。
新しいタブで、ステージの詳細を表示できる Spark アプリケーション UI が開きます。 [DAG Visualization](DAG の視覚化) を展開すると、ステージの詳細が表示されます。
[データ] ハブに戻ります。
[リンク] タブ (1) を選択してから、Data Lake Storage のプライマリ アカウントの [wwi-02] コンテナー (2) を選び、top5-products フォルダー (3) に移動して、名前がパイプラインの実行 ID と一致する Parquet ファイルのフォルダーが存在していることを確認します。
ご覧のとおり、前述のパイプラインの実行 ID と名前が一致するファイルがあります。
これらの値が一致するのは、[ノートブック] アクティビティの
runId
パラメーターにパイプラインの実行 ID を渡したためです。