练习 - 在 Azure Synapse Pipelines 中集成笔记本
在此单元中,你将创建一个 Azure Synapse Spark 笔记本,用于分析和转换由映射数据流加载的数据,并将数据存储在数据湖中。 创建一个参数单元,该单元格接受一个字符串参数,该参数定义笔记本写入数据湖的数据的文件夹名称。
然后,将此笔记本添加到 Synapse 管道,并将唯一的管道运行 ID 传递到笔记本参数,以便稍后可以将管道运行与笔记本活动保存的数据关联起来。
最后,使用 Synapse Studio 中的监视中心来监视管道运行,获取运行 ID,然后查找存储在数据湖中的相应文件。
关于 Apache Spark 和笔记本
Apache Spark 是并行处理框架,支持使用内存中处理来提升大数据分析应用程序的性能。 Azure Synapse Analytics 中的 Apache Spark 是 Apache Spark 在云中的一种 Microsoft 实现。
Synapse Studio 中的 Apache Spark 笔记本是一个 Web 界面,用于创建包含实时代码、可视化效果和叙述性文本的文件。 笔记本是验证想法并使用快速试验从数据中获取见解的好地方。 笔记本还广泛用于数据准备、数据可视化、机器学习和其他大数据方案。
创建 Synapse Spark 笔记本
假设你在 Synapse Analytics 中创建了一个映射数据流来处理、联接和导入用户配置文件数据。 现在,你需要根据过去 12 个月里的首选以及最受欢迎的产品以及购买量最大的产品,为每个用户找到排名前 5 的产品。 然后,你需要计算出总体排名前 5 的产品。
在此练习中,你将创建 Synapse Spark 笔记本来执行这些计算。
打开 Synapse Analytics Studio (https://web.azuresynapse.net/),然后前往“数据”中心。
选择“链接”选项卡 (1),并展开“Azure Data Lake Storage Gen2”下的“主数据湖存储帐户(2)”。 选择“wwi-02”容器 (3) 并打开“top-products”文件夹 (4)。 右键单击任意 Parquet 文件 (5),选择“新建笔记本”菜单项 (6),然后选择“加载到数据帧(7)”。 如果看不到该文件夹,请选择
Refresh
。确保笔记本附加到 Spark 池。
将 Parquet 文件名替换为
*.parquet
(1) 以选择top-products
文件夹中的所有 Parquet 文件。 例如,路径应类似于:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
。在笔记本工具栏上选择“全部运行”,以执行笔记本。
注意
首次在 Spark 池中运行笔记本时,Synapse 会创建一个新的会话。 这大约需要 3-5 分钟时间。
注意
若要仅运行单元格,将鼠标悬停在单元格上,然后选择单元格左侧的“运行单元格”图标,或者选中单元格,再按下 Ctrl+Enter 键。
通过选择 + 按钮并选择“代码单元格”项在下面创建新的单元格。 “+”按钮位于左侧笔记本单元格下方。 此外,还可以在笔记本工具栏中展开“+ 单元格”菜单,然后选择“代码单元格”项。
在新单元格中运行以下命令,以填充名为
topPurchases
的新数据帧,创建名为top_purchases
的新临时视图,并显示前 100 行:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
输出应如下所示:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
在新单元格中运行以下命令,使用 SQL 创建新的临时视图:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
注意
此查询没有任何输出。
查询使用
top_purchases
临时视图作为源,并应用row_number() over
方法为ItemsPurchasedLast12Months
最大的每个用户的记录应用行号。where
子句将筛选结果,因此我们仅检索最多五个产品,其中IsTopProduct
和IsPreferredProduct
均设置为 true。 根据 Azure Cosmos DB 中存储的用户配置文件,我们获得了每个用户购买最多的前 5 个产品,这些产品还被标识为他们最喜欢的产品。在新单元格中运行以下命令,创建并显示新的数据帧,用于存储在上一单元中创建的
top_5_products
临时视图的结果:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
应会看到如下所示的输出,其中显示了每个用户的前五个首选产品:
根据客户首选且购买最多的产品,计算总体排名前五的产品。 为此,请在新单元格中运行以下命令:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
在此单元格中,我们按产品 ID 对前 5 个首选产品进行了分组,并汇总了过去 12 个月内购买的总项数,按降序对该值进行排序,并返回了前 5 个结果。 输出应类似如下所示:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
创建参数单元格
Azure Synapse 管道查找参数单元格,并将此单元格作为执行时传入的参数的默认单元格。 执行引擎将使用输入参数在参数单元格下面添加新的单元格,以覆盖默认值。 如果未指定参数单元格,则插入的单元格将插入笔记本的顶部。
我们将从管道执行此笔记本。 我们想要传入一个参数,该参数设置
runId
变量值,用于命名 Parquet 文件。 在新单元格中运行以下命令:import uuid # Generate random GUID runId = uuid.uuid4()
我们使用 Spark 随附的
uuid
库来生成随机 GUID。 我们希望使用管道传入的参数替代runId
变量。 为此,我们需要将其切换为参数单元格。选择单元格“(1)”右上角的操作省略号(...),然后选择“切换参数单元格(2)”。
切换此选项后,会在单元格上看到“参数”标记。
将以下代码粘贴到新单元格中,以将
runId
变量用作主数据湖帐户的/top5-products/
路径中的 Parquet 文件名。 将路径中的YOUR_DATALAKE_NAME
替换为主数据湖帐户名称。 为此,请向上滚动到页面“(1)”顶部的“单元格 1”。 从路径“(2)”复制数据湖存储帐户。 粘贴此值作为新单元格内路径“(3)”中YOUR_DATALAKE_NAME
的替换值,然后在该单元格内运行命令。%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
验证是否已将文件写入数据湖。 前往“数据”中心,然后选择“链接”选项卡“(1)”。 展开主数据湖存储帐户,并选择“wwi-02”容器 (2)。 前往“top5-products”文件夹 (3)。 应会在目录中看到 Parquet 文件的文件夹,其中 GUID 为文件名 (4)。
笔记本单元格中数据帧上的 Parquet 写入方法创建了此目录,因为它以前不存在。
将笔记本添加到 Synapse 管道
返回到我们在练习开始时描述的映射数据流,假设你想要在数据流作为业务流程过程的一部分运行后执行此笔记本。 为此,请将此笔记本作为新的笔记本活动添加到管道。
返回到笔记本。 选择笔记本右上角的“属性”(1),然后为“名称(2)”输入
Calculate Top 5 Products
。选择笔记本右上角的“添加到管道”(1),然后选择“现有管道(2)”。
选择“将用户配置文件数据写入 ASA”管道 (1),然后选择“添加(2)”。
Synapse Studio 会将笔记本活动添加到管道。 重新排列“笔记本活动”,使其位于“数据流活动”右侧。 选择“数据流活动”,并将“成功”活动管道连接“绿盒”拖至“笔记本活动”。
“成功”活动箭头指示管道在数据流活动成功运行后执行笔记本活动。
选择“笔记本活动(1)”,选择“设置”选项卡 (2),展开“基本参数(3)”,然后选择“+ 新建(4)”。 在“名称”字段 (5) 中输入
runId
。 为“类型(6)”选择“字符串”。 对于“值”,选择“添加动态内容(7)”。选择“系统变量(1)”下的“管道运行 ID”。 这会将
@pipeline().RunId
添加到动态内容框 (2)。 选择“完成(3)”关闭对话框。管道运行 ID 值是分配给每个管道运行的唯一 GUID。 我们将此值用于 Parquet 文件的名称,方法是将此值作为
runId
笔记本参数传递。 然后,可以查看管道运行历史记录,并找到为每个管道运行创建的特定 Parquet 文件。选择“全部发布”,然后单击“发布”以保存更改。
发布完成后,选择“添加触发器(1)”,然后选择“立即触发(2)”以运行更新后的管道。
选择“确定”以运行触发器。
监视管道运行
监视中心使你可以监视 SQL、Apache Spark 和 Pipelines 的当前和历史活动。
转到“监视”选项卡。
选择“管道运行(1)”并等待管道运行成功完成 (2)。 可能需要刷新 (3) 视图。
选择管道名称,以查看管道的活动运行。
请注意“数据流”活动和新的“笔记本”活动 (1)。 记下“管道运行 ID”值 (2)。 我们会将其与笔记本生成的 Parquet 文件名进行比较。 选择“计算前 5 个产品”笔记本名称,查看其详细信息 (3)。
在这里,我们将看到笔记本运行详细信息。 你可以选择“播放”(1),通过“作业(2)”观看进度播放。 在底部,可以查看具有不同筛选选项 (3) 的“诊断”和“日志”。 在右侧,可以查看运行详细信息,例如持续时间、Livy ID、Spark 池详细信息等。 选择某项作业的“查看详细信息”链接,查看其详细信息 (5)。
Spark 应用程序 UI 将在新选项卡中打开,可在其中查看阶段详细信息。 展开“DAG 可视化效果”以查看阶段详细信息。
前往“数据”中心。
选择“链接”选项卡 (1),然后在主数据湖存储帐户上选择“wwi-02”容器 (2),前“top5-products”文件夹 (3),并验证 Parquet 文件存在一个文件夹,该文件夹名称与“管道运行 ID”相匹配。
如你所见,我们有一个文件,它的名称与前面提到的“管道运行 ID”相匹配:
这些值之所以匹配,是因为我们将管道运行 ID 传入笔记本活动的
runId
参数。