Ćwiczenie — przesyłanie strumieniowe danych platformy Kafka do notesu Jupyter i okna danych

Ukończone

Klaster Platformy Kafka zapisuje teraz dane w dzienniku, które można przetworzyć za pośrednictwem przesyłania strumieniowego ze strukturą platformy Spark.

Notes platformy Spark jest uwzględniony w sklonowanym przykładzie, więc musisz przekazać ten notes do klastra Spark, aby go użyć.

Przekazywanie notesu języka Python do klastra Spark

  1. W witrynie Azure Portal kliknij pozycję Główne > klastry usługi HDInsight, a następnie wybierz właśnie utworzony klaster Spark (a nie klaster Platformy Kafka).

  2. W okienku Pulpity nawigacyjne klastra kliknij pozycję Notes Jupyter.

    Opening a Jupyter notebook

  3. Po wyświetleniu monitu o poświadczenia wprowadź nazwę użytkownika administratora i wprowadź hasło utworzone podczas tworzenia klastrów. Zostanie wyświetlona witryna internetowa Jupyter.

  4. Kliknij pozycję PySpark, a następnie na stronie PySpark kliknij pozycję Przekaż.

  5. Przejdź do lokalizacji, w której pobrano przykład z usługi GitHub, wybierz plik RealTimeStocks.ipynb, a następnie kliknij przycisk Otwórz, a następnie kliknij pozycję Przekaż, a następnie kliknij przycisk Odśwież w przeglądarce internetowej.

  6. Po przekazaniu notesu do folderu PySpark kliknij pozycję RealTimeStocks.ipynb, aby otworzyć notes w przeglądarce.

  7. Uruchom pierwszą komórkę w notesie, umieszczając kursor w komórce, a następnie klikając klawisze Shift+Enter, aby uruchomić komórkę.

    Komórka Konfigurowanie bibliotek i pakietów zakończy się pomyślnie po wyświetleniu komunikatu Uruchamianie aplikacji Spark i dodatkowych informacji, jak pokazano na poniższym ekranie podpis.

    Configuring libraries in a Jupyter notebook

  8. W komórce Set-up Połączenie ion to Kafka w wierszu .option("kafka.bootstrap.servers", "") wprowadź brokera platformy Kafka między drugim zestawem cudzysłowów. Na przykład .option("kafka.bootstrap.servers", "wn0-kafka.mdlamldganads.gx.internal.cloudapp.net:9092"), a następnie kliknij klawisze Shift+Enter, aby uruchomić komórkę.

    Konfiguracja Połączenie ion na platformie Kafka zostanie pomyślnie ukończona po wyświetleniu komunikatu inputDf: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 kolejnych pól]. Platforma Spark używa interfejsu API readStream do odczytywania danych.

    Set-up a connection to Kafka

  9. Wybierz komórkę Odczyt z platformy Kafka do ramki danych przesyłania strumieniowego, a następnie kliknij przycisk Shift+Enter, aby uruchomić komórkę.

    Komórka zostanie pomyślnie ukończona po wyświetleniu następującego komunikatu: stockDf: org.apache.spark.sql.DataFrame = [symbol: ciąg, czas: ciąg ... 2 więcej pól]

    Read from Kafka into Streaming Dataframe

  10. Wybierz komórkę Output Streaming Dataframe do konsoli , a następnie kliknij przycisk Shift+Enter, aby uruchomić komórkę.

    Komórka zostanie pomyślnie ukończona po wyświetleniu informacji podobnych do poniższych. Dane wyjściowe pokazują wartość każdej komórki, ponieważ została przekazana w mikrosadowej partii, a jedna partia na sekundę.

    Output a Streaming Dataframe to a Console

  11. Wybierz komórkę Minimalna/Maksymalna w oknie, a następnie kliknij przycisk Shift + Enter, aby uruchomić komórkę.

    Komórka kończy się pomyślnie, gdy zapewnia maksymalną i minimalną cenę dla każdej akcji w 4-sekundowym oknie zdefiniowanym w komórce. Jak wspomniano w poprzedniej lekcji, udostępnianie informacji o określonych oknach czasu jest jedną z korzyści, jakie zyskujesz przy użyciu przesyłania strumieniowego ze strukturą platformy Spark.

    An example of a using a minimum and maximum aggregate function

  12. Wybierz pozycję Zbierz wszystkie wartości dla akcji w komórce okna, a następnie kliknij pozycję Shift + Enter, aby uruchomić komórkę.

    Komórka zostanie pomyślnie ukończona, gdy zawiera tabelę wartości dla zapasów w tabeli. Moduł outputMode został ukończony, aby wyświetlić wszystkie dane.

    An example of a using a total aggregate function

W tej lekcji przekazano notes Jupyter do klastra Spark, połączono go z klastrem Platformy Kafka, wyprowadzono dane przesyłane strumieniowo utworzone przez plik producenta języka Python do notesu platformy Spark, zdefiniowano okno dla danych przesyłania strumieniowego i wyświetlono wysokie i niskie ceny zapasów w tym oknie oraz wyświetlono wszystkie wartości zapasów w tabeli. Gratulacje, pomyślnie wykonano przesyłanie strumieniowe ze strukturą przy użyciu platformy Spark i platformy Kafka!