在 HDInsight 上使用 Apache Hive 和 Apache Hadoop 分析 X 數據
瞭解如何使用 Apache Hive 來處理 X 數據。 結果是 X 使用者的清單,這些使用者傳送了包含特定單字的最多推文。
重要
本文件中的步驟已在 HDInsight 3.6 上進行過測試。
取得資料
X 可讓您透過 REST API,將每個推文的數據擷取為 JavaScript 物件表示法 (JSON) 檔。 OAuth (英文)。
建立 X 應用程式
從網頁瀏覽器登入 https://developer.x.com。 如果您沒有 X 帳戶,請選取 [立即註冊] 連結。
選取建立新應用程式。
輸入 [名稱]、[說明]、[網站]。 您可以在 [網站] 欄位中自行設定 URL。 下表列出部分要使用的範例值:
欄位 值 名稱 MyHDInsightApp 描述 MyHDInsightApp 網站 https://www.myhdinsightapp.com
依序選取 [是,我同意] 和 [建立 Twitter 應用程式]。
選取 [權限] 索引標籤。預設權限為 [唯讀] 。
選取金鑰和存取權杖索引標籤。
選取建立我的存取權杖。
選取位於頁面右上角的 [測試 OAuth]。
記下消費者金鑰、消費者祕密、存取權杖和存取權杖祕密。
下載的推文
下列 Python 程式代碼會從 X 下載 10,000 條推文,並將其儲存至名為 tweets.txt 的檔案。
注意
由於已安裝 Python,下列步驟會在 HDInsight 叢集上執行。
使用 ssh 命令來連線到您的叢集。 編輯以下命令並將 CLUSTERNAME 取代為您叢集的名稱,然後輸入命令:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
使用下列命令安裝 Tweepy、Progressbar 和其他必要套件:
sudo apt install python-dev libffi-dev libssl-dev sudo apt remove python-openssl python -m pip install virtualenv mkdir gettweets cd gettweets virtualenv gettweets source gettweets/bin/activate pip install tweepy progressbar pyOpenSSL requests[security]
使用以下命令建立名為 gettweets.py 的檔案:
nano gettweets.py
以 X 應用程式的相關信息取代
Your consumer secret
、Your consumer key
、Your access token
和Your access token secret
,以編輯下列程式代碼。 然後貼上編輯完畢的程式碼作為 gettweets.py 檔案的內容。#!/usr/bin/python from tweepy import Stream, OAuthHandler from tweepy.streaming import StreamListener from progressbar import ProgressBar, Percentage, Bar import json import sys #X app information consumer_secret='Your consumer secret' consumer_key='Your consumer key' access_token='Your access token' access_token_secret='Your access token secret' #The number of tweets we want to get max_tweets=100 #Create the listener class that receives and saves tweets class listener(StreamListener): #On init, set the counter to zero and create a progress bar def __init__(self, api=None): self.num_tweets = 0 self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start() #When data is received, do this def on_data(self, data): #Append the tweet to the 'tweets.txt' file with open('tweets.txt', 'a') as tweet_file: tweet_file.write(data) #Increment the number of tweets self.num_tweets += 1 #Check to see if we have hit max_tweets and exit if so if self.num_tweets >= max_tweets: self.pbar.finish() sys.exit(0) else: #increment the progress bar self.pbar.update(self.num_tweets) return True #Handle any errors that may occur def on_error(self, status): print status #Get the OAuth token auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) #Use the listener class for stream processing twitterStream = Stream(auth, listener()) #Filter for these topics twitterStream.filter(track=["azure","cloud","hdinsight"])
提示
調整最後一行上的主題篩選條件,以追蹤熱門關鍵字。 使用您執行指令碼當時熱門的關鍵字,可以更快擷取資料。
依序按 Ctrl + X,然後 Y 儲存檔案。
使用以下命令執行檔案,並下載推文:
python gettweets.py
進度指示器隨即出現。 隨著推文的下載,其進度會推進到 100%。
注意
如果需要花費很長的時間來讓進度列往前移動,則您應該變更篩選來追蹤趨勢主題。 當您的篩選中有許多關於該主題的推文時,您就能快速取得所需的 100 則推文。
上傳資料
若要將資料下載到 HDInsight 儲存體,請使用下列命令:
hdfs dfs -mkdir -p /tutorials/x/data
hdfs dfs -put tweets.txt /tutorials/x/data/tweets.txt
這些命令會將資料儲存在叢集中的所有節點都能存取的位置。
執行 HiveQL 工作
使用以下命令建立包含 HiveQL 陳述式的檔案:
nano x.hql
使用下列文字做為檔案的內容:
set hive.exec.dynamic.partition = true; set hive.exec.dynamic.partition.mode = nonstrict; -- Drop table, if it exists DROP TABLE tweets_raw; -- Create it, pointing toward the tweets logged from X CREATE EXTERNAL TABLE tweets_raw ( json_response STRING ) STORED AS TEXTFILE LOCATION '/tutorials/x/data'; -- Drop and recreate the destination table DROP TABLE tweets; CREATE TABLE tweets ( id BIGINT, created_at STRING, created_at_date STRING, created_at_year STRING, created_at_month STRING, created_at_day STRING, created_at_time STRING, in_reply_to_user_id_str STRING, text STRING, contributors STRING, retweeted STRING, truncated STRING, coordinates STRING, source STRING, retweet_count INT, url STRING, hashtags array<STRING>, user_mentions array<STRING>, first_hashtag STRING, first_user_mention STRING, screen_name STRING, name STRING, followers_count INT, listed_count INT, friends_count INT, lang STRING, user_location STRING, time_zone STRING, profile_image_url STRING, json_response STRING ); -- Select tweets from the imported data, parse the JSON, -- and insert into the tweets table FROM tweets_raw INSERT OVERWRITE TABLE tweets SELECT cast(get_json_object(json_response, '$.id_str') as BIGINT), get_json_object(json_response, '$.created_at'), concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ', substr (get_json_object(json_response, '$.created_at'),27,4)), substr (get_json_object(json_response, '$.created_at'),27,4), case substr (get_json_object(json_response, '$.created_at'),5,3) when "Jan" then "01" when "Feb" then "02" when "Mar" then "03" when "Apr" then "04" when "May" then "05" when "Jun" then "06" when "Jul" then "07" when "Aug" then "08" when "Sep" then "09" when "Oct" then "10" when "Nov" then "11" when "Dec" then "12" end, substr (get_json_object(json_response, '$.created_at'),9,2), substr (get_json_object(json_response, '$.created_at'),12,8), get_json_object(json_response, '$.in_reply_to_user_id_str'), get_json_object(json_response, '$.text'), get_json_object(json_response, '$.contributors'), get_json_object(json_response, '$.retweeted'), get_json_object(json_response, '$.truncated'), get_json_object(json_response, '$.coordinates'), get_json_object(json_response, '$.source'), cast (get_json_object(json_response, '$.retweet_count') as INT), get_json_object(json_response, '$.entities.display_url'), array( trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))), trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))), array( trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))), trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))), trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))), get_json_object(json_response, '$.user.screen_name'), get_json_object(json_response, '$.user.name'), cast (get_json_object(json_response, '$.user.followers_count') as INT), cast (get_json_object(json_response, '$.user.listed_count') as INT), cast (get_json_object(json_response, '$.user.friends_count') as INT), get_json_object(json_response, '$.user.lang'), get_json_object(json_response, '$.user.location'), get_json_object(json_response, '$.user.time_zone'), get_json_object(json_response, '$.user.profile_image_url'), json_response WHERE (length(json_response) > 500);
依序按 Ctrl + X,然後 Y 儲存檔案。
使用以下命令執行包含於檔案中的 HiveQL:
beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i x.hql
此命令會 執行 x.hql 檔案。 當查詢完成時,您會看到
jdbc:hive2//localhost:10001/>
提示字元。從 beeline 提示字元中,使用下列查詢來確認資料已匯入︰
SELECT name, screen_name, count(1) as cc FROM tweets WHERE text like "%Azure%" GROUP BY name,screen_name ORDER BY cc DESC LIMIT 10;
這個查詢會傳回最多 10 則訊息內容中包含文字 Azure 的推文。
注意
如果您變更了
gettweets.py
指令碼中的篩選條件,請將 Azure 取代為您使用的其中一個篩選條件。
下一步
您已經學會如何將非結構化的 JSON 資料集轉換成結構化的 Apache Hive 資料表。 若要深入了解 HDInsight 上的 Hive,請參閱下列文件: