Udostępnij za pośrednictwem


Analizowanie danych X przy użyciu technologii Apache Hive i Apache Hadoop w usłudze HDInsight

Dowiedz się, jak przetwarzać dane X przy użyciu programu Apache Hive . Wynikiem jest lista użytkowników X, którzy wysłali najwięcej tweetów zawierających określone słowo.

Ważne

Kroki opisane w tym dokumencie zostały przetestowane w usłudze HDInsight 3.6.

Pobieranie danych

Język X umożliwia pobieranie danych dla każdego tweetu jako dokumentu JavaScript Object Notation (JSON) za pośrednictwem interfejsu API REST. Protokół OAuth jest wymagany do uwierzytelniania w interfejsie API.

Tworzenie aplikacji X

  1. W przeglądarce internetowej zaloguj się do https://developer.x.comwitryny . Wybierz link Zarejestruj się teraz, jeśli nie masz konta X.

  2. Wybierz Utwórz nową aplikację

  3. Wprowadź nazwę, opis, witrynę internetową. Możesz utworzyć adres URL pola Witryna internetowa . W poniższej tabeli przedstawiono kilka przykładowych wartości do użycia:

    Pole Wartość
    Nazwisko MyHDInsightApp
    opis MyHDInsightApp
    Witryna internetowa https://www.myhdinsightapp.com
  4. Wybierz pozycję Tak, zgadzam się, a następnie wybierz pozycję Utwórz aplikację twitter.

  5. Wybierz kartę Uprawnienia . Domyślne uprawnienie to Tylko do odczytu.

  6. Wybierz kartę Klucze i tokeny dostępu.

  7. Wybierz opcję Utwórz token dostępu.

  8. Wybierz pozycję Testuj uwierzytelnianie OAuth w prawym górnym rogu strony.

  9. Zapisz klucz klienta, klucz tajny użytkownika, token dostępu i klucz tajny tokenu dostępu.

Pobieranie tweetów

Poniższy kod języka Python pobiera 10 000 tweetów z X i zapisuje je w pliku o nazwie tweets.txt.

Uwaga

Poniższe kroki są wykonywane w klastrze usługi HDInsight, ponieważ język Python jest już zainstalowany.

  1. Użyj polecenia ssh, aby nawiązać połączenie z klastrem. Zmodyfikuj poniższe polecenie, zastępując ciąg CLUSTERNAME nazwą klastra, a następnie wprowadź polecenie:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Użyj następujących poleceń, aby zainstalować tweepy, pasek postępu i inne wymagane pakiety:

    sudo apt install python-dev libffi-dev libssl-dev
    sudo apt remove python-openssl
    python -m pip install virtualenv
    mkdir gettweets
    cd gettweets
    virtualenv gettweets
    source gettweets/bin/activate
    pip install tweepy progressbar pyOpenSSL requests[security]
    
  3. Użyj następującego polecenia, aby utworzyć plik o nazwie gettweets.py:

    nano gettweets.py
    
  4. Zmodyfikuj poniższy kod, zastępując Your consumer secretciąg , Your consumer key, Your access tokeni Your access token secret odpowiednimi informacjami z aplikacji X. Następnie wklej edytowany kod jako zawartość pliku gettweets.py .

    #!/usr/bin/python
    
    from tweepy import Stream, OAuthHandler
    from tweepy.streaming import StreamListener
    from progressbar import ProgressBar, Percentage, Bar
    import json
    import sys
    
    #X app information
    consumer_secret='Your consumer secret'
    consumer_key='Your consumer key'
    access_token='Your access token'
    access_token_secret='Your access token secret'
    
    #The number of tweets we want to get
    max_tweets=100
    
    #Create the listener class that receives and saves tweets
    class listener(StreamListener):
        #On init, set the counter to zero and create a progress bar
        def __init__(self, api=None):
            self.num_tweets = 0
            self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()
    
        #When data is received, do this
        def on_data(self, data):
            #Append the tweet to the 'tweets.txt' file
            with open('tweets.txt', 'a') as tweet_file:
                tweet_file.write(data)
                #Increment the number of tweets
                self.num_tweets += 1
                #Check to see if we have hit max_tweets and exit if so
                if self.num_tweets >= max_tweets:
                    self.pbar.finish()
                    sys.exit(0)
                else:
                    #increment the progress bar
                    self.pbar.update(self.num_tweets)
            return True
    
        #Handle any errors that may occur
        def on_error(self, status):
            print status
    
    #Get the OAuth token
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    #Use the listener class for stream processing
    twitterStream = Stream(auth, listener())
    #Filter for these topics
    twitterStream.filter(track=["azure","cloud","hdinsight"])
    

    Napiwek

    Dostosuj filtr tematów w ostatnim wierszu, aby śledzić popularne słowa kluczowe. Używanie słów kluczowych popularnych w czasie uruchamiania skryptu umożliwia szybsze przechwytywanie danych.

  5. Użyj Ctrl + X, a następnie Y , aby zapisać plik.

  6. Użyj następującego polecenia, aby uruchomić plik i pobrać tweety:

    python gettweets.py
    

    Zostanie wyświetlony wskaźnik postępu. Liczba tweetów wynosi do 100%, ponieważ są pobierane tweety.

    Uwaga

    Jeśli przejście paska postępu trwa długo, należy zmienić filtr, aby śledzić popularne tematy. Jeśli w filtrze znajduje się wiele tweetów dotyczących tematu, możesz szybko uzyskać 100 potrzebnych tweetów.

Przekazywanie danych

Aby przekazać dane do magazynu usługi HDInsight, użyj następujących poleceń:

hdfs dfs -mkdir -p /tutorials/x/data
hdfs dfs -put tweets.txt /tutorials/x/data/tweets.txt

Te polecenia przechowują dane w lokalizacji, do której mogą uzyskiwać dostęp wszystkie węzły w klastrze.

Uruchamianie zadania HiveQL

  1. Użyj następującego polecenia, aby utworzyć plik zawierający instrukcje HiveQL :

    nano x.hql
    

    Użyj następującego tekstu jako zawartości pliku:

    set hive.exec.dynamic.partition = true;
    set hive.exec.dynamic.partition.mode = nonstrict;
    -- Drop table, if it exists
    DROP TABLE tweets_raw;
    -- Create it, pointing toward the tweets logged from X
    CREATE EXTERNAL TABLE tweets_raw (
        json_response STRING
    )
    STORED AS TEXTFILE LOCATION '/tutorials/x/data';
    -- Drop and recreate the destination table
    DROP TABLE tweets;
    CREATE TABLE tweets
    (
        id BIGINT,
        created_at STRING,
        created_at_date STRING,
        created_at_year STRING,
        created_at_month STRING,
        created_at_day STRING,
        created_at_time STRING,
        in_reply_to_user_id_str STRING,
        text STRING,
        contributors STRING,
        retweeted STRING,
        truncated STRING,
        coordinates STRING,
        source STRING,
        retweet_count INT,
        url STRING,
        hashtags array<STRING>,
        user_mentions array<STRING>,
        first_hashtag STRING,
        first_user_mention STRING,
        screen_name STRING,
        name STRING,
        followers_count INT,
        listed_count INT,
        friends_count INT,
        lang STRING,
        user_location STRING,
        time_zone STRING,
        profile_image_url STRING,
        json_response STRING
    );
    -- Select tweets from the imported data, parse the JSON,
    -- and insert into the tweets table
    FROM tweets_raw
    INSERT OVERWRITE TABLE tweets
    SELECT
        cast(get_json_object(json_response, '$.id_str') as BIGINT),
        get_json_object(json_response, '$.created_at'),
        concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
        substr (get_json_object(json_response, '$.created_at'),27,4)),
        substr (get_json_object(json_response, '$.created_at'),27,4),
        case substr (get_json_object(json_response,    '$.created_at'),5,3)
            when "Jan" then "01"
            when "Feb" then "02"
            when "Mar" then "03"
            when "Apr" then "04"
            when "May" then "05"
            when "Jun" then "06"
            when "Jul" then "07"
            when "Aug" then "08"
            when "Sep" then "09"
            when "Oct" then "10"
            when "Nov" then "11"
            when "Dec" then "12" end,
        substr (get_json_object(json_response, '$.created_at'),9,2),
        substr (get_json_object(json_response, '$.created_at'),12,8),
        get_json_object(json_response, '$.in_reply_to_user_id_str'),
        get_json_object(json_response, '$.text'),
        get_json_object(json_response, '$.contributors'),
        get_json_object(json_response, '$.retweeted'),
        get_json_object(json_response, '$.truncated'),
        get_json_object(json_response, '$.coordinates'),
        get_json_object(json_response, '$.source'),
        cast (get_json_object(json_response, '$.retweet_count') as INT),
        get_json_object(json_response, '$.entities.display_url'),
        array(
            trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
        array(
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        get_json_object(json_response, '$.user.screen_name'),
        get_json_object(json_response, '$.user.name'),
        cast (get_json_object(json_response, '$.user.followers_count') as INT),
        cast (get_json_object(json_response, '$.user.listed_count') as INT),
        cast (get_json_object(json_response, '$.user.friends_count') as INT),
        get_json_object(json_response, '$.user.lang'),
        get_json_object(json_response, '$.user.location'),
        get_json_object(json_response, '$.user.time_zone'),
        get_json_object(json_response, '$.user.profile_image_url'),
        json_response
    WHERE (length(json_response) > 500);
    
  2. Naciśnij Ctrl + X, a następnie naciśnij Y , aby zapisać plik.

  3. Użyj następującego polecenia, aby uruchomić plik HiveQL zawarty w pliku:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i x.hql
    

    To polecenie uruchamia plik x.hql . Po zakończeniu zapytania zostanie wyświetlony jdbc:hive2//localhost:10001/> monit.

  4. W wierszu polecenia beeline użyj następującego zapytania, aby sprawdzić, czy dane zostały zaimportowane:

    SELECT name, screen_name, count(1) as cc
    FROM tweets
    WHERE text like "%Azure%"
    GROUP BY name,screen_name
    ORDER BY cc DESC LIMIT 10;
    

    To zapytanie zwraca maksymalnie 10 tweetów zawierających słowo Azure w tekście wiadomości.

    Uwaga

    Jeśli filtr został zmieniony w skrypcie gettweets.py , zastąp platformę Azure jednym z użytych filtrów.

Następne kroki

Wiesz już, jak przekształcić nieustrukturyzowany zestaw danych JSON na ustrukturyzowaną tabelę Apache Hive . Aby dowiedzieć się więcej na temat programu Hive w usłudze HDInsight, zobacz następujące dokumenty: