在 HDInsight 上使用 Apache Hive 與 Apache Hadoop 分析 Twitter 資料

了解如何使用 Apache Hive 來處理 Twitter 資料。 結果是一份傳送了最多包含特定文字之推文的 Twitter 使用者清單。

重要

本文件中的步驟已在 HDInsight 3.6 上進行過測試。

取得資料

Twitter 可讓您透過 REST API 抓取每則推文資料,作為 JavaScript 物件標記法 (JSON)。 OAuth (英文)。

建立 Twitter 應用程式

  1. 從網頁瀏覽器登入 https://developer.twitter.com。 如果您沒有 Twitter 帳戶,請選取 [立即註冊] 連結。

  2. 選取建立新應用程式

  3. 輸入 [名稱]、[說明]、[網站]。 您可以在 [網站] 欄位中自行設定 URL。 下表列出部分要使用的範例值:

    欄位
    名稱 MyHDInsightApp
    描述 MyHDInsightApp
    網站 https://www.myhdinsightapp.com
  4. 依序選取 [是,我同意] 和 [建立 Twitter 應用程式]

  5. 選取 [權限] 索引標籤。預設權限為 [唯讀]

  6. 選取金鑰和存取權杖索引標籤。

  7. 選取建立我的存取權杖

  8. 選取位於頁面右上角的 [測試 OAuth]

  9. 記下消費者金鑰消費者祕密存取權杖存取權杖祕密

下載的推文

下列 Python 程式碼會從 Twitter 下載 10,000 則推文,並儲存到名為 tweets.txt的檔案。

注意

由於已安裝 Python,下列步驟會在 HDInsight 叢集上執行。

  1. 使用 ssh 命令來連線到您的叢集。 編輯以下命令並將 CLUSTERNAME 取代為您叢集的名稱,然後輸入命令:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. 使用下列命令安裝 TweepyProgressbar 和其他必要套件:

    sudo apt install python-dev libffi-dev libssl-dev
    sudo apt remove python-openssl
    python -m pip install virtualenv
    mkdir gettweets
    cd gettweets
    virtualenv gettweets
    source gettweets/bin/activate
    pip install tweepy progressbar pyOpenSSL requests[security]
    
  3. 使用以下命令建立名為 gettweets.py 的檔案:

    nano gettweets.py
    
  4. 編輯下列程式碼,將 Your consumer secretYour consumer keyYour access tokenYour access token secret 改為您 Twitter 應用程式中的相關資訊, 然後貼上編輯完畢的程式碼作為 gettweets.py 檔案的內容。

    #!/usr/bin/python
    
    from tweepy import Stream, OAuthHandler
    from tweepy.streaming import StreamListener
    from progressbar import ProgressBar, Percentage, Bar
    import json
    import sys
    
    #Twitter app information
    consumer_secret='Your consumer secret'
    consumer_key='Your consumer key'
    access_token='Your access token'
    access_token_secret='Your access token secret'
    
    #The number of tweets we want to get
    max_tweets=100
    
    #Create the listener class that receives and saves tweets
    class listener(StreamListener):
        #On init, set the counter to zero and create a progress bar
        def __init__(self, api=None):
            self.num_tweets = 0
            self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()
    
        #When data is received, do this
        def on_data(self, data):
            #Append the tweet to the 'tweets.txt' file
            with open('tweets.txt', 'a') as tweet_file:
                tweet_file.write(data)
                #Increment the number of tweets
                self.num_tweets += 1
                #Check to see if we have hit max_tweets and exit if so
                if self.num_tweets >= max_tweets:
                    self.pbar.finish()
                    sys.exit(0)
                else:
                    #increment the progress bar
                    self.pbar.update(self.num_tweets)
            return True
    
        #Handle any errors that may occur
        def on_error(self, status):
            print status
    
    #Get the OAuth token
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    #Use the listener class for stream processing
    twitterStream = Stream(auth, listener())
    #Filter for these topics
    twitterStream.filter(track=["azure","cloud","hdinsight"])
    

    提示

    調整最後一行上的主題篩選條件,以追蹤熱門關鍵字。 使用您執行指令碼當時熱門的關鍵字,可以更快擷取資料。

  5. 依序按 Ctrl + X,然後 Y 儲存檔案。

  6. 使用以下命令執行檔案,並下載推文:

    python gettweets.py
    

    進度指示器隨即出現。 隨著推文的下載,其進度會推進到 100%。

    注意

    如果需要花費很長的時間來讓進度列往前移動,則您應該變更篩選來追蹤趨勢主題。 當您的篩選中有許多關於該主題的推文時,您就能快速取得所需的 100 則推文。

上傳資料

若要將資料下載到 HDInsight 儲存體,請使用下列命令:

hdfs dfs -mkdir -p /tutorials/twitter/data
hdfs dfs -put tweets.txt /tutorials/twitter/data/tweets.txt

這些命令會將資料儲存在叢集中的所有節點都能存取的位置。

執行 HiveQL 工作

  1. 使用以下命令建立包含 HiveQL 陳述式的檔案:

    nano twitter.hql
    

    使用下列文字做為檔案的內容:

    set hive.exec.dynamic.partition = true;
    set hive.exec.dynamic.partition.mode = nonstrict;
    -- Drop table, if it exists
    DROP TABLE tweets_raw;
    -- Create it, pointing toward the tweets logged from Twitter
    CREATE EXTERNAL TABLE tweets_raw (
        json_response STRING
    )
    STORED AS TEXTFILE LOCATION '/tutorials/twitter/data';
    -- Drop and recreate the destination table
    DROP TABLE tweets;
    CREATE TABLE tweets
    (
        id BIGINT,
        created_at STRING,
        created_at_date STRING,
        created_at_year STRING,
        created_at_month STRING,
        created_at_day STRING,
        created_at_time STRING,
        in_reply_to_user_id_str STRING,
        text STRING,
        contributors STRING,
        retweeted STRING,
        truncated STRING,
        coordinates STRING,
        source STRING,
        retweet_count INT,
        url STRING,
        hashtags array<STRING>,
        user_mentions array<STRING>,
        first_hashtag STRING,
        first_user_mention STRING,
        screen_name STRING,
        name STRING,
        followers_count INT,
        listed_count INT,
        friends_count INT,
        lang STRING,
        user_location STRING,
        time_zone STRING,
        profile_image_url STRING,
        json_response STRING
    );
    -- Select tweets from the imported data, parse the JSON,
    -- and insert into the tweets table
    FROM tweets_raw
    INSERT OVERWRITE TABLE tweets
    SELECT
        cast(get_json_object(json_response, '$.id_str') as BIGINT),
        get_json_object(json_response, '$.created_at'),
        concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
        substr (get_json_object(json_response, '$.created_at'),27,4)),
        substr (get_json_object(json_response, '$.created_at'),27,4),
        case substr (get_json_object(json_response,    '$.created_at'),5,3)
            when "Jan" then "01"
            when "Feb" then "02"
            when "Mar" then "03"
            when "Apr" then "04"
            when "May" then "05"
            when "Jun" then "06"
            when "Jul" then "07"
            when "Aug" then "08"
            when "Sep" then "09"
            when "Oct" then "10"
            when "Nov" then "11"
            when "Dec" then "12" end,
        substr (get_json_object(json_response, '$.created_at'),9,2),
        substr (get_json_object(json_response, '$.created_at'),12,8),
        get_json_object(json_response, '$.in_reply_to_user_id_str'),
        get_json_object(json_response, '$.text'),
        get_json_object(json_response, '$.contributors'),
        get_json_object(json_response, '$.retweeted'),
        get_json_object(json_response, '$.truncated'),
        get_json_object(json_response, '$.coordinates'),
        get_json_object(json_response, '$.source'),
        cast (get_json_object(json_response, '$.retweet_count') as INT),
        get_json_object(json_response, '$.entities.display_url'),
        array(
            trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
        array(
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        get_json_object(json_response, '$.user.screen_name'),
        get_json_object(json_response, '$.user.name'),
        cast (get_json_object(json_response, '$.user.followers_count') as INT),
        cast (get_json_object(json_response, '$.user.listed_count') as INT),
        cast (get_json_object(json_response, '$.user.friends_count') as INT),
        get_json_object(json_response, '$.user.lang'),
        get_json_object(json_response, '$.user.location'),
        get_json_object(json_response, '$.user.time_zone'),
        get_json_object(json_response, '$.user.profile_image_url'),
        json_response
    WHERE (length(json_response) > 500);
    
  2. 依序按 Ctrl + X,然後 Y 儲存檔案。

  3. 使用以下命令執行包含於檔案中的 HiveQL:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i twitter.hql
    

    此命令會執行 twitter.hql 檔案。 當查詢完成時,您會看到 jdbc:hive2//localhost:10001/> 提示字元。

  4. 從 beeline 提示字元中,使用下列查詢來確認資料已匯入︰

    SELECT name, screen_name, count(1) as cc
    FROM tweets
    WHERE text like "%Azure%"
    GROUP BY name,screen_name
    ORDER BY cc DESC LIMIT 10;
    

    這個查詢會傳回最多 10 則訊息內容中包含文字 Azure 的推文。

    注意

    如果您變更了 gettweets.py 指令碼中的篩選條件,請將 Azure 取代為您使用的其中一個篩選條件。

下一步

您已經學會如何將非結構化的 JSON 資料集轉換成結構化的 Apache Hive 資料表。 若要深入了解 HDInsight 上的 Hive,請參閱下列文件: