练习 - 将 Kafka 数据流式传输到 Jupyter Notebook 并为数据设定窗口

已完成

Kafka 群集正在将数据写入其日志,该操作可通过 Spark 结构化流处理。

Spark 笔记本包含在你克隆的示例中,因此你需要将该笔记本上传到 Spark 群集才能使用它。

将 Python 笔记本上传到 Spark 群集

  1. 在 Azure 门户中,单击“主页”>“HDInsight 群集”,然后选择刚刚创建的 Spark 群集(而不是 Kafka 群集)。

  2. 在“群集仪表板”窗格中,单击“Jupyter Notebook”。

    打开 Jupyter Notebook

  3. 提示输入凭据时,请输入管理员的用户名,并输入创建群集时创建的密码。 此时将显示 Jupyter 网站。

  4. 单击“PySpark”,然后在 PySpark 页中单击“上传”。

  5. 导航到你从 GitHub 下载示例的位置,选择 RealTimeStocks.ipynb 文件,依次单击“打开”、“上传”,然后在 Internet 浏览器中单击“刷新”。

  6. 笔记本上传到 PySpark 文件夹后,单击“RealTimeStocks.ipynb”在浏览器中打开笔记本。

  7. 将光标放置在笔记本的第一个单元格中,然后按 Shift+Enter,运行该单元格。

    如果“配置库和包”单元格显示“启动 Spark 应用程序”消息和如以下屏幕截图所示的其他信息,说明该单元格的操作已成功完成。

    在 Jupyter Notebook 中配置库

  8. 在“设置与 Kafka 的连接”单元格中的 .option("kafka.bootstrap.servers", "") 行中,在第二组引号间输入 Kafka 中转站。 例如, .option("kafka.bootstrap.servers", "wn0-kafka.mdlamldganads.gx.internal.cloudapp.net:9092"),然后单击“Shift+Enter”来运行单元格。

    “设置与 Kafka 的连接”单元格显示消息 inputDf: org.apache.spark.sql.DataFrame = [key: binary, value: binary ...5 more fields] 时,说明它已成功完成。 Spark 使用 readStream API 读取数据。

    设置与 Kafka 的连接

  9. 选择“从 Kafka 读取到流式数据帧”单元格,然后单击“Shift+Enter”来运行单元格。

    单元格显示以下消息时,说明它已成功完成:stockDf: org.apache.spark.sql.DataFrame = [symbol: string, time: string ...2 more fields]

    从 Kafka 读取到流式数据帧

  10. 选择“将流式数据帧输出到控制台”单元格,然后单击“Shift+Enter”运行单元格。

    当单元格显示类似下面的信息时,说明已成功完成。 输出显示在微批处理中传递的每个单元格的值,并且批处理每秒一次。

    将流式数据帧输出到控制台

  11. 选择“分时段的股票最小值/最大值”单元格,然后单击“Shift+Enter”运行单元格。

    当单元格在 4 秒的窗口(在单元格中定义)中为每只股票提供最高价格和最低价格时,说明已成功完成该单元格。 如上一个单元所述,使用 Spark 结构化流的一个好处是提供有关特定窗口的信息。

    使用最小值和最大值聚合函数的示例

  12. 选择窗口单元格中的“收集股票的所有值”,然后单击“Shift+Enter”来运行单元格。

    单元格提供一个股价表时,说它已成功完成。 outputMode 已完成,因此显示了所有数据。

    使用汇总聚合函数的示例

在本单元中,你已将 Jupyter Notebook 上传到 Spark 群集,将其连接到 Kafka 群集,将 Python 制作者文件创建的流数据输出到 Spark 笔记本,为流数据定义了窗口,并在该窗口中显示了高低股价,还在表中显示了股票的所有值。 恭喜,你已成功使用 Spark 和 Kafka 执行结构化流!