次の方法で共有


HDInsight で Apache Hive と Apache Hadoop を使用して X データを分析する

Apache Hive を使用して X データを処理する方法を説明します。 結果として、特定の単語が含まれた最も多くのツイートを送信した X ユーザーのリストが返されます。

重要

このドキュメントの手順は、HDInsight 3.6 でテストされています。

データを取得する

X では、REST API を使用して、JavaScript Object Notation (JSON) ドキュメントとして各ツイートのデータを取得できます。 OAuth が必要です。

X アプリケーションを作成する

  1. Web ブラウザーで、https://developer.x.com にサインインします。 X アカウントを持っていない場合は、[今すぐ登録] リンクを選択します。

  2. [Create New App] を選択します。

  3. 名前説明Web サイトを入力します。 [Website] フィールドの URL を構成することができます。 次のテーブルは使用する値のサンプルを示しています。

    フィールド
    名前 MyHDInsightApp
    説明 MyHDInsightApp
    Web サイト https://www.myhdinsightapp.com
  4. [Yes, I agree] を選択して、[Create your Twitter application] を選択します。

  5. [Permissions] タブを選択します。既定のアクセス許可は 読み取り専用です。

  6. [Keys and Access Tokens] タブをクリックします。

  7. [Create my access token] を選択します。

  8. ページの右上隅にある [Test OAuth] を選択します。

  9. コンシューマー キーコンシューマー シークレットアクセス トークンアクセス トークン シークレットを書き留めます。

ツイートをダウンロードする

次の Python コードは、X から 10,000 個のツイートをダウンロードし、tweets.txt という名前のファイルに保存します。

Note

Python が既にインストールされているので、次の手順は HDInsight クラスターで実行します。

  1. ssh コマンドを使用してクラスターに接続します。 次のコマンドを編集して CLUSTERNAME をクラスターの名前に置き換えてから、そのコマンドを入力します。

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. 次のコマンドを使用して、TweepyProgressbar、およびその他の必要なパッケージをインストールします。

    sudo apt install python-dev libffi-dev libssl-dev
    sudo apt remove python-openssl
    python -m pip install virtualenv
    mkdir gettweets
    cd gettweets
    virtualenv gettweets
    source gettweets/bin/activate
    pip install tweepy progressbar pyOpenSSL requests[security]
    
  3. 次のコマンドを使用して、gettweets.py という名前のファイルを作成します。

    nano gettweets.py
    
  4. 次のコードを編集して、Your consumer secretYour consumer keyYour access token、および Your access token secret を、X アプリケーションの関連する情報に置き換えます。 次に、編集したコードを gettweets.py ファイルの内容として貼り付けます。

    #!/usr/bin/python
    
    from tweepy import Stream, OAuthHandler
    from tweepy.streaming import StreamListener
    from progressbar import ProgressBar, Percentage, Bar
    import json
    import sys
    
    #X app information
    consumer_secret='Your consumer secret'
    consumer_key='Your consumer key'
    access_token='Your access token'
    access_token_secret='Your access token secret'
    
    #The number of tweets we want to get
    max_tweets=100
    
    #Create the listener class that receives and saves tweets
    class listener(StreamListener):
        #On init, set the counter to zero and create a progress bar
        def __init__(self, api=None):
            self.num_tweets = 0
            self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()
    
        #When data is received, do this
        def on_data(self, data):
            #Append the tweet to the 'tweets.txt' file
            with open('tweets.txt', 'a') as tweet_file:
                tweet_file.write(data)
                #Increment the number of tweets
                self.num_tweets += 1
                #Check to see if we have hit max_tweets and exit if so
                if self.num_tweets >= max_tweets:
                    self.pbar.finish()
                    sys.exit(0)
                else:
                    #increment the progress bar
                    self.pbar.update(self.num_tweets)
            return True
    
        #Handle any errors that may occur
        def on_error(self, status):
            print status
    
    #Get the OAuth token
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    #Use the listener class for stream processing
    twitterStream = Stream(auth, listener())
    #Filter for these topics
    twitterStream.filter(track=["azure","cloud","hdinsight"])
    

    ヒント

    よく使われているキーワードを追跡するには、最後の行のトピック フィルターを調整してください。 スクリプトの実行時によく使用されているキーワードを使用すると、高速にデータをキャプチャできます。

  5. Ctrl + X キーを押した後、Y キーを押してファイルを保存します。

  6. 次のコマンドを使用してファイルを実行し、ツイートをダウンロードします。

    python gettweets.py
    

    進行状況のインジケーターが表示されます。 このインジケーターは、ツイートのダウンロードの進行状況を 100% になるまでカウントします。

    Note

    進行が遅い場合は、フィルターを変更してトレンド トピックを追跡することをお勧めします。 フィルターしたトピックに関するツイートが多いほど、必要な 100 ツイートをすばやく取得できます。

データのアップロード

HDInsight のストレージにデータをアップロードするには、次のコマンドを使用します。

hdfs dfs -mkdir -p /tutorials/x/data
hdfs dfs -put tweets.txt /tutorials/x/data/tweets.txt

クラスター内のすべてのノードがアクセスできる場所にデータが保存されます。

HiveQL ジョブの実行

  1. 次のコマンドを使用して、HiveQL ステートメントを含むファイルを作成します。

    nano x.hql
    

    ファイルの内容として、次のテキストを使用します。

    set hive.exec.dynamic.partition = true;
    set hive.exec.dynamic.partition.mode = nonstrict;
    -- Drop table, if it exists
    DROP TABLE tweets_raw;
    -- Create it, pointing toward the tweets logged from X
    CREATE EXTERNAL TABLE tweets_raw (
        json_response STRING
    )
    STORED AS TEXTFILE LOCATION '/tutorials/x/data';
    -- Drop and recreate the destination table
    DROP TABLE tweets;
    CREATE TABLE tweets
    (
        id BIGINT,
        created_at STRING,
        created_at_date STRING,
        created_at_year STRING,
        created_at_month STRING,
        created_at_day STRING,
        created_at_time STRING,
        in_reply_to_user_id_str STRING,
        text STRING,
        contributors STRING,
        retweeted STRING,
        truncated STRING,
        coordinates STRING,
        source STRING,
        retweet_count INT,
        url STRING,
        hashtags array<STRING>,
        user_mentions array<STRING>,
        first_hashtag STRING,
        first_user_mention STRING,
        screen_name STRING,
        name STRING,
        followers_count INT,
        listed_count INT,
        friends_count INT,
        lang STRING,
        user_location STRING,
        time_zone STRING,
        profile_image_url STRING,
        json_response STRING
    );
    -- Select tweets from the imported data, parse the JSON,
    -- and insert into the tweets table
    FROM tweets_raw
    INSERT OVERWRITE TABLE tweets
    SELECT
        cast(get_json_object(json_response, '$.id_str') as BIGINT),
        get_json_object(json_response, '$.created_at'),
        concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
        substr (get_json_object(json_response, '$.created_at'),27,4)),
        substr (get_json_object(json_response, '$.created_at'),27,4),
        case substr (get_json_object(json_response,    '$.created_at'),5,3)
            when "Jan" then "01"
            when "Feb" then "02"
            when "Mar" then "03"
            when "Apr" then "04"
            when "May" then "05"
            when "Jun" then "06"
            when "Jul" then "07"
            when "Aug" then "08"
            when "Sep" then "09"
            when "Oct" then "10"
            when "Nov" then "11"
            when "Dec" then "12" end,
        substr (get_json_object(json_response, '$.created_at'),9,2),
        substr (get_json_object(json_response, '$.created_at'),12,8),
        get_json_object(json_response, '$.in_reply_to_user_id_str'),
        get_json_object(json_response, '$.text'),
        get_json_object(json_response, '$.contributors'),
        get_json_object(json_response, '$.retweeted'),
        get_json_object(json_response, '$.truncated'),
        get_json_object(json_response, '$.coordinates'),
        get_json_object(json_response, '$.source'),
        cast (get_json_object(json_response, '$.retweet_count') as INT),
        get_json_object(json_response, '$.entities.display_url'),
        array(
            trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
        array(
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        get_json_object(json_response, '$.user.screen_name'),
        get_json_object(json_response, '$.user.name'),
        cast (get_json_object(json_response, '$.user.followers_count') as INT),
        cast (get_json_object(json_response, '$.user.listed_count') as INT),
        cast (get_json_object(json_response, '$.user.friends_count') as INT),
        get_json_object(json_response, '$.user.lang'),
        get_json_object(json_response, '$.user.location'),
        get_json_object(json_response, '$.user.time_zone'),
        get_json_object(json_response, '$.user.profile_image_url'),
        json_response
    WHERE (length(json_response) > 500);
    
  2. Ctrl + X キーを押した後、Y キーを押してファイルを保存します。

  3. 次のコマンドを使用して、ファイルに含まれている HiveQL を実行します。

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i x.hql
    

    このコマンドは、x.hql ファイルを実行します。 クエリが完了すると、jdbc:hive2//localhost:10001/> プロンプトが表示されます。

  4. Beeline プロンプトで次のクエリを使用して、データがインポートされたことを確認します。

    SELECT name, screen_name, count(1) as cc
    FROM tweets
    WHERE text like "%Azure%"
    GROUP BY name,screen_name
    ORDER BY cc DESC LIMIT 10;
    

    このクエリでは、メッセージ テキストに Azure という単語が含まれた最大 10 個のツイートが返されます。

    Note

    gettweets.py スクリプトのフィルターを変更した場合は、Azure を、使用したフィルターのいずれかで置き換えてください。

次のステップ

ここでは、構造化されていない JSON データ セットを構造化された Apache Hive テーブルに変換する方法を学習しました。 HDInsight での Hive の詳細については、次のドキュメントを参照してください。