Partilhar via


Analise dados X usando o Apache Hive e o Apache Hadoop no HDInsight

Saiba como usar o Apache Hive para processar dados X. O resultado é uma lista de X usuários que enviaram mais tweets que contêm uma determinada palavra.

Importante

As etapas deste documento foram testadas no HDInsight 3.6.

Obter os dados

X permite que você recupere os dados de cada tweet como um documento JSON (JavaScript Object Notation) por meio de uma API REST. O OAuth é necessário para autenticação na API.

Criar um aplicativo X

  1. Em um navegador da Web, entre no https://developer.x.com. Selecione o link Inscreva-se agora se não tiver uma conta X.

  2. Selecione Criar Nova Aplicação.

  3. Insira Nome, Descrição, Website. Você pode criar um URL para o campo Site . A tabela a seguir mostra alguns valores de exemplo a serem usados:

    Campo Valor
    Nome MyHDInsightApp
    Description MyHDInsightApp
    Site https://www.myhdinsightapp.com
  4. Selecione Sim, concordo e, em seguida, selecione Criar seu aplicativo do Twitter.

  5. Selecione a guia Permissões . A permissão padrão é Somente leitura.

  6. Selecione o separador Chaves e Tokens de Acesso .

  7. Selecione Criar o meu token de acesso.

  8. Selecione Testar OAuth no canto superior direito da página.

  9. Anote a chave do consumidor, o segredo do consumidor, o token de acesso e o segredo do token de acesso.

Baixar tweets

O código Python a seguir baixa 10.000 tweets do X e os salva em um arquivo chamado tweets.txt.

Nota

As etapas a seguir são executadas no cluster HDInsight, pois o Python já está instalado.

  1. Use o comando ssh para se conectar ao cluster. Edite o comando abaixo substituindo CLUSTERNAME pelo nome do cluster e digite o comando:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Use os seguintes comandos para instalar o Tweepy, a barra de progresso e outros pacotes necessários:

    sudo apt install python-dev libffi-dev libssl-dev
    sudo apt remove python-openssl
    python -m pip install virtualenv
    mkdir gettweets
    cd gettweets
    virtualenv gettweets
    source gettweets/bin/activate
    pip install tweepy progressbar pyOpenSSL requests[security]
    
  3. Use o seguinte comando para criar um arquivo chamado gettweets.py:

    nano gettweets.py
    
  4. Edite o código abaixo substituindo Your consumer secret, Your consumer key, Your access tokene Your access token secret com as informações relevantes do seu aplicativo X. Em seguida, cole o código editado como o conteúdo do arquivo gettweets.py.

    #!/usr/bin/python
    
    from tweepy import Stream, OAuthHandler
    from tweepy.streaming import StreamListener
    from progressbar import ProgressBar, Percentage, Bar
    import json
    import sys
    
    #X app information
    consumer_secret='Your consumer secret'
    consumer_key='Your consumer key'
    access_token='Your access token'
    access_token_secret='Your access token secret'
    
    #The number of tweets we want to get
    max_tweets=100
    
    #Create the listener class that receives and saves tweets
    class listener(StreamListener):
        #On init, set the counter to zero and create a progress bar
        def __init__(self, api=None):
            self.num_tweets = 0
            self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()
    
        #When data is received, do this
        def on_data(self, data):
            #Append the tweet to the 'tweets.txt' file
            with open('tweets.txt', 'a') as tweet_file:
                tweet_file.write(data)
                #Increment the number of tweets
                self.num_tweets += 1
                #Check to see if we have hit max_tweets and exit if so
                if self.num_tweets >= max_tweets:
                    self.pbar.finish()
                    sys.exit(0)
                else:
                    #increment the progress bar
                    self.pbar.update(self.num_tweets)
            return True
    
        #Handle any errors that may occur
        def on_error(self, status):
            print status
    
    #Get the OAuth token
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    #Use the listener class for stream processing
    twitterStream = Stream(auth, listener())
    #Filter for these topics
    twitterStream.filter(track=["azure","cloud","hdinsight"])
    

    Gorjeta

    Ajuste o filtro de tópicos na última linha para rastrear palavras-chave populares. O uso de palavras-chave populares no momento em que você executa o script permite uma captura mais rápida de dados.

  5. Use Ctrl + X e, em seguida , Y para salvar o arquivo.

  6. Use o seguinte comando para executar o arquivo e baixar tweets:

    python gettweets.py
    

    É apresentado um indicador de progresso. Conta até 100% à medida que os tweets são descarregados.

    Nota

    Se estiver demorando muito para a barra de progresso avançar, você deve alterar o filtro para acompanhar os trending topics. Quando há muitos tweets sobre o tema em seu filtro, você pode obter rapidamente os 100 tweets necessários.

Carregue os dados

Para carregar os dados no armazenamento do HDInsight, use os seguintes comandos:

hdfs dfs -mkdir -p /tutorials/x/data
hdfs dfs -put tweets.txt /tutorials/x/data/tweets.txt

Esses comandos armazenam os dados em um local que todos os nós do cluster podem acessar.

Executar o trabalho HiveQL

  1. Use o seguinte comando para criar um arquivo contendo instruções HiveQL :

    nano x.hql
    

    Use o seguinte texto como o conteúdo do arquivo:

    set hive.exec.dynamic.partition = true;
    set hive.exec.dynamic.partition.mode = nonstrict;
    -- Drop table, if it exists
    DROP TABLE tweets_raw;
    -- Create it, pointing toward the tweets logged from X
    CREATE EXTERNAL TABLE tweets_raw (
        json_response STRING
    )
    STORED AS TEXTFILE LOCATION '/tutorials/x/data';
    -- Drop and recreate the destination table
    DROP TABLE tweets;
    CREATE TABLE tweets
    (
        id BIGINT,
        created_at STRING,
        created_at_date STRING,
        created_at_year STRING,
        created_at_month STRING,
        created_at_day STRING,
        created_at_time STRING,
        in_reply_to_user_id_str STRING,
        text STRING,
        contributors STRING,
        retweeted STRING,
        truncated STRING,
        coordinates STRING,
        source STRING,
        retweet_count INT,
        url STRING,
        hashtags array<STRING>,
        user_mentions array<STRING>,
        first_hashtag STRING,
        first_user_mention STRING,
        screen_name STRING,
        name STRING,
        followers_count INT,
        listed_count INT,
        friends_count INT,
        lang STRING,
        user_location STRING,
        time_zone STRING,
        profile_image_url STRING,
        json_response STRING
    );
    -- Select tweets from the imported data, parse the JSON,
    -- and insert into the tweets table
    FROM tweets_raw
    INSERT OVERWRITE TABLE tweets
    SELECT
        cast(get_json_object(json_response, '$.id_str') as BIGINT),
        get_json_object(json_response, '$.created_at'),
        concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
        substr (get_json_object(json_response, '$.created_at'),27,4)),
        substr (get_json_object(json_response, '$.created_at'),27,4),
        case substr (get_json_object(json_response,    '$.created_at'),5,3)
            when "Jan" then "01"
            when "Feb" then "02"
            when "Mar" then "03"
            when "Apr" then "04"
            when "May" then "05"
            when "Jun" then "06"
            when "Jul" then "07"
            when "Aug" then "08"
            when "Sep" then "09"
            when "Oct" then "10"
            when "Nov" then "11"
            when "Dec" then "12" end,
        substr (get_json_object(json_response, '$.created_at'),9,2),
        substr (get_json_object(json_response, '$.created_at'),12,8),
        get_json_object(json_response, '$.in_reply_to_user_id_str'),
        get_json_object(json_response, '$.text'),
        get_json_object(json_response, '$.contributors'),
        get_json_object(json_response, '$.retweeted'),
        get_json_object(json_response, '$.truncated'),
        get_json_object(json_response, '$.coordinates'),
        get_json_object(json_response, '$.source'),
        cast (get_json_object(json_response, '$.retweet_count') as INT),
        get_json_object(json_response, '$.entities.display_url'),
        array(
            trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
        array(
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        get_json_object(json_response, '$.user.screen_name'),
        get_json_object(json_response, '$.user.name'),
        cast (get_json_object(json_response, '$.user.followers_count') as INT),
        cast (get_json_object(json_response, '$.user.listed_count') as INT),
        cast (get_json_object(json_response, '$.user.friends_count') as INT),
        get_json_object(json_response, '$.user.lang'),
        get_json_object(json_response, '$.user.location'),
        get_json_object(json_response, '$.user.time_zone'),
        get_json_object(json_response, '$.user.profile_image_url'),
        json_response
    WHERE (length(json_response) > 500);
    
  2. Prima Ctrl + X e, em seguida, prima Y para guardar o ficheiro.

  3. Use o seguinte comando para executar o HiveQL contido no arquivo:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i x.hql
    

    Este comando executa o arquivo x.hql . Quando a consulta for concluída, você verá um jdbc:hive2//localhost:10001/> prompt.

  4. No prompt beeline, use a seguinte consulta para verificar se os dados foram importados:

    SELECT name, screen_name, count(1) as cc
    FROM tweets
    WHERE text like "%Azure%"
    GROUP BY name,screen_name
    ORDER BY cc DESC LIMIT 10;
    

    Essa consulta retorna um máximo de 10 tweets que contêm a palavra Azure no texto da mensagem.

    Nota

    Se você alterou o filtro no script, substitua gettweets.py o Azure por um dos filtros usados.

Próximos passos

Você aprendeu como transformar um conjunto de dados JSON não estruturado em uma tabela estruturada do Apache Hive . Para saber mais sobre o Hive no HDInsight, consulte os seguintes documentos: