演習 - Kafka データを Jupyter Notebook にストリーム配信し、データをウィンドウ処理する

完了

Kafka クラスターでは現在、そのログにデータを書き込んでおり、そのデータは Spark Structured Streaming を使用して処理できます。

複製したサンプルには Spark ノートブックが組み込まれているので、そのノートブックを使用するには、それを Spark クラスターにアップロードする必要があります。

Python ノートブックを Spark クラスターにアップロードする

  1. Azure portal で、[ホーム] > [HDInsight クラスター] の順にクリックしてから、先ほど作成した Spark クラスター (Kafka クラスターではありません) を選択します。

  2. [クラスター ダッシュボード] ペインで、[Jupyter Notebook] をクリックします。

    Jupyter Notebook を開く

  3. 資格情報の入力を求められたら、管理者のユーザー名を入力し、クラスターの作成時に作成したパスワードを入力します。 Jupyter の Web サイトが表示されます。

  4. [PySpark] をクリックしてから、[PySpark] ページの [アップロード] をクリックします。

  5. GitHub からサンプルをダウンロードした場所に移動し、RealTimeStocks.ipynb ファイルを選択してから、[開く]、[アップロード] の順にクリックした後、インターネット ブラウザーで [最新の情報に更新] をクリックします。

  6. ノートブックが PySpark フォルダーにアップロードされたら、[RealTimeStocks.ipynb] をクリックして、そのノートブックをブラウザーで開きます。

  7. ノートブックの最初のセルを実行します。そのためには、そのセル内にカーソルを置き、Shift+Enter キーを押してセルを実行します。

    次の画面のキャプションに示すように、[ライブラリとパッケージの構成] セルに "Spark アプリケーションを開始中" のメッセージと追加情報が表示されると、セルは正常に完了します。

    Jupyter Notebook でのライブラリの構成

  8. [Kafka への接続の設定] セルの .option("kafka.bootstrap.servers", "") 行で、2 番目の引用符セットの間に kafka ブローカーを入力します。 次に例を示します。 .option("kafka.bootstrap.servers", "wn0-kafka.mdlamldganads.gx.internal.cloudapp.net:9092")。次に、Shift + Enter キーを押してセルを実行します。

    [Kafka への接続の設定] セルに次のメッセージが表示されると、セルは正常に完了します。"inputDf: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]" Spark では、readStream API を使用してデータを読み取ります。

    Kafka への接続の設定

  9. [Kafka からストリーミング データフレームへの読み取り] セルを選択してから、Shift+Enter キーを押してセルを実行します。

    セルに次のメッセージが表示されると、セルは正常に完了します。"stockDf: org.apache.spark.sql.DataFrame = [symbol: string, time: string ... 2 more fields]"

    Kafka からストリーミング データフレームへの読み取り

  10. [コンソールへのストリーミング データフレームの出力] セルを選択してから、Shift+Enter キーを押してセルを実行します。

    セルに次のような情報が表示されると、セルは正常に完了します。 出力には、マイクロ バッチで渡された各セルの値が表示されます (1 秒につき 1 つのバッチ)。

    コンソールへのストリーミング データフレームの出力

  11. [ウィンドウ処理された株式の最安値と最高値] セルを選択してから、Shift+Enter キーを押してセルを実行します。

    セルに定義された 4 秒間ウィンドウで各株式の最高値と最安値が表示されると、セルは正常に完了します。 前のユニットで説明したように、特定の時間ウィンドウに関する情報を提供することは、Spark Structured Streaming を使用することで得られる利点の 1 つです。

    最小値と最大値の集計関数を使用した例

  12. [ウィンドウ内の株価のすべての値を収集する] セルを選択してから、Shift+Enter キーを押してセルを実行します。

    テーブル内の株式の価格表が表示されると、セルは正常に完了します。 OutputMode が完了すると、すべてのデータが表示されます。

    合計値の集計関数を使用した例

このユニットでは、Jupyter Notebook を Spark クラスターにアップロードし、それを Kafka クラスターに接続しました。また、Python プロデューサー ファイルによって作成されたストリーミング データを Spark ノートブックに出力しました。さらに、ストリーミング データ用のウィンドウを定義して、そのウィンドウに株価の高値と安値を表示し、テーブル内のすべての株価を表示しました。 これで、Spark と Kafka を使用した構造化ストリーミングが正常に実行されました。