Sdílet prostřednictvím


Analýza dat X pomocí Apache Hivu a Apache Hadoopu ve službě HDInsight

Naučte se používat Apache Hive ke zpracování dat X. Výsledkem je seznam uživatelů X, kteří odeslali nejvíce tweetů, které obsahují určité slovo.

Důležité

Kroky v tomto dokumentu byly testovány ve službě HDInsight 3.6.

Získání dat

X umožňuje načíst data pro každý tweet jako dokument JSON (JavaScript Object Notation) prostřednictvím rozhraní REST API. Pro ověřování v rozhraní API se vyžaduje OAuth .

Vytvoření aplikace X

  1. Ve webovém prohlížeči se přihlaste k https://developer.x.com. Pokud nemáte účet X, vyberte odkaz Pro registraci.

  2. Vyberte položku Vytvořit novou aplikaci.

  3. Zadejte název, popis, web. Můžete vytvořit adresu URL pole Web . Následující tabulka uvádí několik ukázkových hodnot, které se mají použít:

    Pole Hodnota
    Name MyHDInsightApp
    Popis MyHDInsightApp
    Web https://www.myhdinsightapp.com
  4. Vyberte Ano, souhlasím a pak vyberte Vytvořit aplikaci Twitter.

  5. Vyberte kartu Oprávnění. Výchozí oprávnění je jen pro čtení.

  6. Vyberte kartu Klíče a přístupové tokeny.

  7. Vyberte Vytvořit můj přístupový token.

  8. V pravém horním rohu stránky vyberte Test OAuth .

  9. Poznamenejte si uživatelský klíč, tajný klíč příjemce, přístupový token a tajný klíč přístupového tokenu.

Stažení tweetů

Následující kód Pythonu stáhne z X 10 000 tweetů a uloží je do souboru s názvem tweets.txt.

Poznámka:

V clusteru HDInsight se provádějí následující kroky, protože Python je už nainstalovaný.

  1. Pomocí příkazu ssh se připojte ke clusteru. Upravte následující příkaz nahrazením clusteru názvem clusteru a zadáním příkazu:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. K instalaci tweepy, indikátoru průběhu a dalších požadovaných balíčků použijte následující příkazy:

    sudo apt install python-dev libffi-dev libssl-dev
    sudo apt remove python-openssl
    python -m pip install virtualenv
    mkdir gettweets
    cd gettweets
    virtualenv gettweets
    source gettweets/bin/activate
    pip install tweepy progressbar pyOpenSSL requests[security]
    
  3. Pomocí následujícího příkazu vytvořte soubor s názvem gettweets.py:

    nano gettweets.py
    
  4. Upravte níže uvedený kód nahrazením Your consumer secret, Your access tokenYour consumer keya Your access token secret odpovídajícími informacemi z vaší aplikace X. Potom vložte upravený kód jako obsah souboru gettweets.py .

    #!/usr/bin/python
    
    from tweepy import Stream, OAuthHandler
    from tweepy.streaming import StreamListener
    from progressbar import ProgressBar, Percentage, Bar
    import json
    import sys
    
    #X app information
    consumer_secret='Your consumer secret'
    consumer_key='Your consumer key'
    access_token='Your access token'
    access_token_secret='Your access token secret'
    
    #The number of tweets we want to get
    max_tweets=100
    
    #Create the listener class that receives and saves tweets
    class listener(StreamListener):
        #On init, set the counter to zero and create a progress bar
        def __init__(self, api=None):
            self.num_tweets = 0
            self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()
    
        #When data is received, do this
        def on_data(self, data):
            #Append the tweet to the 'tweets.txt' file
            with open('tweets.txt', 'a') as tweet_file:
                tweet_file.write(data)
                #Increment the number of tweets
                self.num_tweets += 1
                #Check to see if we have hit max_tweets and exit if so
                if self.num_tweets >= max_tweets:
                    self.pbar.finish()
                    sys.exit(0)
                else:
                    #increment the progress bar
                    self.pbar.update(self.num_tweets)
            return True
    
        #Handle any errors that may occur
        def on_error(self, status):
            print status
    
    #Get the OAuth token
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    #Use the listener class for stream processing
    twitterStream = Stream(auth, listener())
    #Filter for these topics
    twitterStream.filter(track=["azure","cloud","hdinsight"])
    

    Tip

    Upravte filtr témat na posledním řádku a sledujte oblíbená klíčová slova. Použití klíčových slov oblíbených v době, kdy skript spouštíte, umožňuje rychlejší zachycení dat.

  5. K uložení souboru použijte kombinaci kláves Ctrl+X a potom Y .

  6. Ke spuštění souboru a stažení tweetů použijte následující příkaz:

    python gettweets.py
    

    Zobrazí se indikátor průběhu. Při stahování tweetů se počítá až 100 %.

    Poznámka:

    Pokud přechod na indikátor průběhu trvá příliš dlouho, měli byste změnit filtr tak, aby sledoval populární témata. Pokud filtr obsahuje mnoho tweetů o tématu, můžete rychle získat 100 tweetů, které potřebujete.

Nahrání dat

K nahrání dat do úložiště HDInsight použijte následující příkazy:

hdfs dfs -mkdir -p /tutorials/x/data
hdfs dfs -put tweets.txt /tutorials/x/data/tweets.txt

Tyto příkazy ukládají data do umístění, ke kterému mají přístup všechny uzly v clusteru.

Spuštění úlohy HiveQL

  1. Pomocí následujícího příkazu vytvořte soubor obsahující příkazy HiveQL :

    nano x.hql
    

    Jako obsah souboru použijte následující text:

    set hive.exec.dynamic.partition = true;
    set hive.exec.dynamic.partition.mode = nonstrict;
    -- Drop table, if it exists
    DROP TABLE tweets_raw;
    -- Create it, pointing toward the tweets logged from X
    CREATE EXTERNAL TABLE tweets_raw (
        json_response STRING
    )
    STORED AS TEXTFILE LOCATION '/tutorials/x/data';
    -- Drop and recreate the destination table
    DROP TABLE tweets;
    CREATE TABLE tweets
    (
        id BIGINT,
        created_at STRING,
        created_at_date STRING,
        created_at_year STRING,
        created_at_month STRING,
        created_at_day STRING,
        created_at_time STRING,
        in_reply_to_user_id_str STRING,
        text STRING,
        contributors STRING,
        retweeted STRING,
        truncated STRING,
        coordinates STRING,
        source STRING,
        retweet_count INT,
        url STRING,
        hashtags array<STRING>,
        user_mentions array<STRING>,
        first_hashtag STRING,
        first_user_mention STRING,
        screen_name STRING,
        name STRING,
        followers_count INT,
        listed_count INT,
        friends_count INT,
        lang STRING,
        user_location STRING,
        time_zone STRING,
        profile_image_url STRING,
        json_response STRING
    );
    -- Select tweets from the imported data, parse the JSON,
    -- and insert into the tweets table
    FROM tweets_raw
    INSERT OVERWRITE TABLE tweets
    SELECT
        cast(get_json_object(json_response, '$.id_str') as BIGINT),
        get_json_object(json_response, '$.created_at'),
        concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
        substr (get_json_object(json_response, '$.created_at'),27,4)),
        substr (get_json_object(json_response, '$.created_at'),27,4),
        case substr (get_json_object(json_response,    '$.created_at'),5,3)
            when "Jan" then "01"
            when "Feb" then "02"
            when "Mar" then "03"
            when "Apr" then "04"
            when "May" then "05"
            when "Jun" then "06"
            when "Jul" then "07"
            when "Aug" then "08"
            when "Sep" then "09"
            when "Oct" then "10"
            when "Nov" then "11"
            when "Dec" then "12" end,
        substr (get_json_object(json_response, '$.created_at'),9,2),
        substr (get_json_object(json_response, '$.created_at'),12,8),
        get_json_object(json_response, '$.in_reply_to_user_id_str'),
        get_json_object(json_response, '$.text'),
        get_json_object(json_response, '$.contributors'),
        get_json_object(json_response, '$.retweeted'),
        get_json_object(json_response, '$.truncated'),
        get_json_object(json_response, '$.coordinates'),
        get_json_object(json_response, '$.source'),
        cast (get_json_object(json_response, '$.retweet_count') as INT),
        get_json_object(json_response, '$.entities.display_url'),
        array(
            trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
        array(
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        get_json_object(json_response, '$.user.screen_name'),
        get_json_object(json_response, '$.user.name'),
        cast (get_json_object(json_response, '$.user.followers_count') as INT),
        cast (get_json_object(json_response, '$.user.listed_count') as INT),
        cast (get_json_object(json_response, '$.user.friends_count') as INT),
        get_json_object(json_response, '$.user.lang'),
        get_json_object(json_response, '$.user.location'),
        get_json_object(json_response, '$.user.time_zone'),
        get_json_object(json_response, '$.user.profile_image_url'),
        json_response
    WHERE (length(json_response) > 500);
    
  2. Stisknutím kombinace kláves Ctrl+X uložte soubor stisknutím klávesy Y .

  3. Ke spuštění HiveQL obsaženého v souboru použijte následující příkaz:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i x.hql
    

    Tento příkaz spustí soubor x.hql . Po dokončení dotazu se zobrazí jdbc:hive2//localhost:10001/> výzva.

  4. Z příkazového řádku beeline pomocí následujícího dotazu ověřte, že se data naimportovala:

    SELECT name, screen_name, count(1) as cc
    FROM tweets
    WHERE text like "%Azure%"
    GROUP BY name,screen_name
    ORDER BY cc DESC LIMIT 10;
    

    Tento dotaz vrátí maximálně 10 tweetů obsahujících slovo Azure v textu zprávy.

    Poznámka:

    Pokud jste filtr ve gettweets.py skriptu změnili, nahraďte Azure jedním z použitých filtrů.

Další kroky

Naučili jste se transformovat nestrukturovanou datovou sadu JSON na strukturovanou tabulku Apache Hive . Další informace o Hivu ve službě HDInsight najdete v následujících dokumentech: