Analizzare i dati X usando Apache Hive e Apache Hadoop in HDInsight
Informazioni su come usare Apache Hive per elaborare i dati X. Il risultato è un elenco di utenti X che hanno inviato la maggior parte dei tweet che contengono una determinata parola.
I passaggi descritti in questo documento sono stati testati in HDInsight 3.6.
Ottenere i dati
X consente di recuperare i dati per ogni tweet come documento JSON (JavaScript Object Notation) tramite un'API REST. OAuth .
Creare un'applicazione X
Da un Web browser accedere a Selezionare il collegamento Iscrizione ora se non si ha un account X.
Seleziona Crea nuova app.
Compilare i campi Name, Description, Website. Per il campo Website è possibile creare un URL fittizio. Nella tabella seguente vengono mostrati alcuni valori di esempio da usare:
Campo Valore Nome MyHDInsightApp Descrizione MyHDInsightApp Sito Web
Selezionare Sì, Accetto e quindi selezionare Crea l'applicazione Twitter.
Selezionare la scheda Autorizzazioni . L'autorizzazione predefinita è Sola lettura.
Seleziona la scheda Chiavi e token di accesso.
Seleziona Crea il mio token di accesso.
Selezionare Test OAuth nell'angolo superiore destro della pagina.
Compilare i campi Consumer key, Consumer secret, Access token e Access token secret.
Scaricare tweet
Il codice Python seguente scarica 10.000 tweet da X e li salva in un file denominato tweets.txt.
I passaggi indicati di seguito vengono eseguiti nel cluster HDInsight, poiché Python è già installato.
Usare il comando ssh per connettersi al cluster. Modificare il comando seguente sostituendo CLUSTERNAME con il nome del cluster in uso e quindi immettere il comando:
Usare i comandi seguenti per installare Tweepy, Progress bar e altri pacchetti necessari:
sudo apt install python-dev libffi-dev libssl-dev sudo apt remove python-openssl python -m pip install virtualenv mkdir gettweets cd gettweets virtualenv gettweets source gettweets/bin/activate pip install tweepy progressbar pyOpenSSL requests[security]
Usare il comando seguente per creare un file denominato
Modificare il codice seguente sostituendo
Your consumer secret
,Your consumer key
,Your access token
eYour access token secret
con le informazioni pertinenti dell'applicazione X. Incollare quindi il codice modificato come contenuto del file .#!/usr/bin/python from tweepy import Stream, OAuthHandler from tweepy.streaming import StreamListener from progressbar import ProgressBar, Percentage, Bar import json import sys #X app information consumer_secret='Your consumer secret' consumer_key='Your consumer key' access_token='Your access token' access_token_secret='Your access token secret' #The number of tweets we want to get max_tweets=100 #Create the listener class that receives and saves tweets class listener(StreamListener): #On init, set the counter to zero and create a progress bar def __init__(self, api=None): self.num_tweets = 0 self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start() #When data is received, do this def on_data(self, data): #Append the tweet to the 'tweets.txt' file with open('tweets.txt', 'a') as tweet_file: tweet_file.write(data) #Increment the number of tweets self.num_tweets += 1 #Check to see if we have hit max_tweets and exit if so if self.num_tweets >= max_tweets: self.pbar.finish() sys.exit(0) else: #increment the progress bar self.pbar.update(self.num_tweets) return True #Handle any errors that may occur def on_error(self, status): print status #Get the OAuth token auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) #Use the listener class for stream processing twitterStream = Stream(auth, listener()) #Filter for these topics twitterStream.filter(track=["azure","cloud","hdinsight"])
Modificare il filtro di argomenti nell'ultima riga per tenere traccia delle parole chiave comuni. L'uso delle parole chiave più diffuse al momento dell'esecuzione dello script consente di acquisire più velocemente i dati.
Usare Ctrl + X, quindi premere Y per salvare il file.
Usare il comando seguente per aprire il file e scaricare i tweet:
Viene visualizzato un indicatore di stato che mostra un valore fino al 100% mentre vengono scaricati i tweet.
Se l'indicatore di stato impiega molto tempo per avanzare, è necessario modificare il filtro per monitorare gli argomenti di tendenza. Quando sono presenti molti tweet sull'argomento nel filtro, è possibile ottenere rapidamente i 100 tweet necessari.
Caricare i dati
Usare i comandi seguenti per aggiornare i dati nell'archiviazione HDInsight:
hdfs dfs -mkdir -p /tutorials/x/data
hdfs dfs -put tweets.txt /tutorials/x/data/tweets.txt
Questi comandi archiviano i dati in un percorso accessibile a tutti i nodi del cluster.
Eseguire il processo di HiveQL
Usare il comando seguente per creare un file contenente istruzioni HiveQL:
nano x.hql
Usare il testo seguente come contenuto del file:
set hive.exec.dynamic.partition = true; set hive.exec.dynamic.partition.mode = nonstrict; -- Drop table, if it exists DROP TABLE tweets_raw; -- Create it, pointing toward the tweets logged from X CREATE EXTERNAL TABLE tweets_raw ( json_response STRING ) STORED AS TEXTFILE LOCATION '/tutorials/x/data'; -- Drop and recreate the destination table DROP TABLE tweets; CREATE TABLE tweets ( id BIGINT, created_at STRING, created_at_date STRING, created_at_year STRING, created_at_month STRING, created_at_day STRING, created_at_time STRING, in_reply_to_user_id_str STRING, text STRING, contributors STRING, retweeted STRING, truncated STRING, coordinates STRING, source STRING, retweet_count INT, url STRING, hashtags array<STRING>, user_mentions array<STRING>, first_hashtag STRING, first_user_mention STRING, screen_name STRING, name STRING, followers_count INT, listed_count INT, friends_count INT, lang STRING, user_location STRING, time_zone STRING, profile_image_url STRING, json_response STRING ); -- Select tweets from the imported data, parse the JSON, -- and insert into the tweets table FROM tweets_raw INSERT OVERWRITE TABLE tweets SELECT cast(get_json_object(json_response, '$.id_str') as BIGINT), get_json_object(json_response, '$.created_at'), concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ', substr (get_json_object(json_response, '$.created_at'),27,4)), substr (get_json_object(json_response, '$.created_at'),27,4), case substr (get_json_object(json_response, '$.created_at'),5,3) when "Jan" then "01" when "Feb" then "02" when "Mar" then "03" when "Apr" then "04" when "May" then "05" when "Jun" then "06" when "Jul" then "07" when "Aug" then "08" when "Sep" then "09" when "Oct" then "10" when "Nov" then "11" when "Dec" then "12" end, substr (get_json_object(json_response, '$.created_at'),9,2), substr (get_json_object(json_response, '$.created_at'),12,8), get_json_object(json_response, '$.in_reply_to_user_id_str'), get_json_object(json_response, '$.text'), get_json_object(json_response, '$.contributors'), get_json_object(json_response, '$.retweeted'), get_json_object(json_response, '$.truncated'), get_json_object(json_response, '$.coordinates'), get_json_object(json_response, '$.source'), cast (get_json_object(json_response, '$.retweet_count') as INT), get_json_object(json_response, '$.entities.display_url'), array( trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))), array( trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))), trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))), get_json_object(json_response, '$.user.screen_name'), get_json_object(json_response, '$'), cast (get_json_object(json_response, '$.user.followers_count') as INT), cast (get_json_object(json_response, '$.user.listed_count') as INT), cast (get_json_object(json_response, '$.user.friends_count') as INT), get_json_object(json_response, '$.user.lang'), get_json_object(json_response, '$.user.location'), get_json_object(json_response, '$.user.time_zone'), get_json_object(json_response, '$.user.profile_image_url'), json_response WHERE (length(json_response) > 500);
Premere Ctrl + X, quindi Y per salvare il file.
Usare il comando riportato di seguito per eseguire lo script HiveQL contenuto nel file:
beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i x.hql
Questo comando esegue il file x.hql . Dopo il completamento della query, viene visualizzato un prompt
.Dal prompt dei comandi Beeline usare la query seguente per verificare che i dati siano stati importati:
SELECT name, screen_name, count(1) as cc FROM tweets WHERE text like "%Azure%" GROUP BY name,screen_name ORDER BY cc DESC LIMIT 10;
Viene restituito un massimo di 10 tweet contenenti la parola Azure nel testo del messaggio.
Se è stato modificato il filtro nello script
, sostituire Azure con uno dei filtri usati.
Passaggi successivi
Si è appreso come trasformare un set di dati JSON non strutturato in una tabella Apache Hive strutturata. Per altre informazioni su Hive in HDInsight, vedere i documenti seguenti: