تحليل بيانات Twitter باستخدام Apache Hive وApache Hadoop على HDInsight

تعرف على كيفية استخدام Apache Hive لمعالجة بيانات Twitter. والنتيجة هي قائمة مستخدمي Twitter الذين أرسلوا معظم التغريدات التي تحتوي على كلمة معينة.

هام

تم اختبار الخطوات الواردة في هذا المستند على HDInsight 3.6.

الحصول على البيانات

يتيح لك Twitter استرداد البيانات لكل تغريدة كمستند JavaScript Object Notation (JSON) من خلال واجهة برمجة تطبيقات REST. OAuth مطلوب للمصادقة على واجهة برمجة التطبيقات.

إنشاء تطبيق Twitter

  1. من مستعرض ويب، سجّل الدخول إلى https://developer.twitter.com. حدد ارتباط التسجيل الآن إذا لم يكن لديك حساب على Twitter.

  2. حدد إنشاء تطبيق جديد.

  3. أدخل الاسم، الوصف، الموقع. يمكنك إنشاء عنوان URL لحقل الموقع. يعرض الجدول التالي بعض نماذج القيم لاستخدامها:

    الحقل القيمة
    الاسم MyHDInsightApp
    الوصف MyHDInsightApp
    موقع الويب https://www.myhdinsightapp.com
  4. حدد نعم، أوافق، ثم حدد إنشاء تطبيق Twitter الخاص بك.

  5. حدد علامة التبويب أذونات. الإذن الافتراضي هو قراءة فقط.

  6. حدد علامة التبويب مفاتيح ورموز الوصول المميزة.

  7. حدد إنشاء رمز الوصول المميز الخاص بي.

  8. حدد اختبار OAuth في الزاوية العلوية اليمنى من الصفحة.

  9. دوّن مفتاح المستهلك، والبيانات السرية للمستهلك، ورمز الوصول المميز، والبيانات السرية لرمز الوصول المميز.

تنزيل التغريدات

يقوم رمز Python التالي بتنزيل 10000 تغريدة من Twitter وحفظها في ملف باسم tweets.txt.

ملاحظة

يتم تنفيذ الخطوات التالية على نظام المجموعة HDInsight، بما أنه تم تثبيت Python بالفعل.

  1. استخدم الأمر ssh للاتصال بنظام المجموعة الخاص بك. قم بتحرير الأمر أدناه عن طريق استبدال اسم نظام المجموعة باسم نظام مجموعتك ثم إدخال الأمر:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. استخدم الأوامر التالية لتثبيت Tweepy، وشريط التقدم، والحزم الأخرى المطلوبة:

    sudo apt install python-dev libffi-dev libssl-dev
    sudo apt remove python-openssl
    python -m pip install virtualenv
    mkdir gettweets
    cd gettweets
    virtualenv gettweets
    source gettweets/bin/activate
    pip install tweepy progressbar pyOpenSSL requests[security]
    
  3. استخدم الأمر التالي لإنشاء ملف باسم gettweets.py:

    nano gettweets.py
    
  4. قم بتحرير التعليمات البرمجية أدناه عن طريق استبدال Your consumer secret، وYour consumer key، وYour access token، وYour access token secret مع المعلومات ذات الصلة من تطبيق Twitter الخاص بك. ثم لصق التعليمات البرمجية المحررة كمحتويات ملف gettweets.py.

    #!/usr/bin/python
    
    from tweepy import Stream, OAuthHandler
    from tweepy.streaming import StreamListener
    from progressbar import ProgressBar, Percentage, Bar
    import json
    import sys
    
    #Twitter app information
    consumer_secret='Your consumer secret'
    consumer_key='Your consumer key'
    access_token='Your access token'
    access_token_secret='Your access token secret'
    
    #The number of tweets we want to get
    max_tweets=100
    
    #Create the listener class that receives and saves tweets
    class listener(StreamListener):
        #On init, set the counter to zero and create a progress bar
        def __init__(self, api=None):
            self.num_tweets = 0
            self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()
    
        #When data is received, do this
        def on_data(self, data):
            #Append the tweet to the 'tweets.txt' file
            with open('tweets.txt', 'a') as tweet_file:
                tweet_file.write(data)
                #Increment the number of tweets
                self.num_tweets += 1
                #Check to see if we have hit max_tweets and exit if so
                if self.num_tweets >= max_tweets:
                    self.pbar.finish()
                    sys.exit(0)
                else:
                    #increment the progress bar
                    self.pbar.update(self.num_tweets)
            return True
    
        #Handle any errors that may occur
        def on_error(self, status):
            print status
    
    #Get the OAuth token
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    #Use the listener class for stream processing
    twitterStream = Stream(auth, listener())
    #Filter for these topics
    twitterStream.filter(track=["azure","cloud","hdinsight"])
    

    تلميح

    قم بضبط تصفية المواضيع على السطر الأخير لتعقب الكلمات الرئيسية الشائعة. يتيح لك استخدم الكلمات الرئيسية الشائعة في الوقت الذي تقوم فيه بتشغيل البرنامج النصي يسمح بحفظ البيانات بصورة أسرع.

  5. استخدم Ctrl + X، ثم Y لحفظ الملف.

  6. استخدم الأمر التالي لتشغيل الملف وتنزيل التغريدات:

    python gettweets.py
    

    يظهر مؤشر تقدم. وهو يعد ما يصل إلى 100% كما يتم تنزيل التغريدات.

    ملاحظة

    إذا كان الأمر يستغرق وقتاً طويلاً حتى يتقدم شريط التقدم، فيجب تغيير عامل التصفية لتعقب الموضوعات الرائجة. عندما يكون هناك العديد من التغريدات حول الموضوع في عامل التصفية لديك، يمكنك الحصول بسرعة على 100 تغريدة مطلوبة.

تحميل البيانات

لتحميل البيانات إلى تخزين HDInsight، استخدم الأوامر التالية:

hdfs dfs -mkdir -p /tutorials/twitter/data
hdfs dfs -put tweets.txt /tutorials/twitter/data/tweets.txt

تُخزن هذه الأوامر البيانات في موقع يمكن من خلاله وصول جميع العُقد في نظام المجموعة.

تشغيل مهمة HiveQL

  1. استخدم الأمر التالي لإنشاء ملف يحتوي على عبارات HiveQL:

    nano twitter.hql
    

    استخدم النص التالي كمحتويات هذا الملف:

    set hive.exec.dynamic.partition = true;
    set hive.exec.dynamic.partition.mode = nonstrict;
    -- Drop table, if it exists
    DROP TABLE tweets_raw;
    -- Create it, pointing toward the tweets logged from Twitter
    CREATE EXTERNAL TABLE tweets_raw (
        json_response STRING
    )
    STORED AS TEXTFILE LOCATION '/tutorials/twitter/data';
    -- Drop and recreate the destination table
    DROP TABLE tweets;
    CREATE TABLE tweets
    (
        id BIGINT,
        created_at STRING,
        created_at_date STRING,
        created_at_year STRING,
        created_at_month STRING,
        created_at_day STRING,
        created_at_time STRING,
        in_reply_to_user_id_str STRING,
        text STRING,
        contributors STRING,
        retweeted STRING,
        truncated STRING,
        coordinates STRING,
        source STRING,
        retweet_count INT,
        url STRING,
        hashtags array<STRING>,
        user_mentions array<STRING>,
        first_hashtag STRING,
        first_user_mention STRING,
        screen_name STRING,
        name STRING,
        followers_count INT,
        listed_count INT,
        friends_count INT,
        lang STRING,
        user_location STRING,
        time_zone STRING,
        profile_image_url STRING,
        json_response STRING
    );
    -- Select tweets from the imported data, parse the JSON,
    -- and insert into the tweets table
    FROM tweets_raw
    INSERT OVERWRITE TABLE tweets
    SELECT
        cast(get_json_object(json_response, '$.id_str') as BIGINT),
        get_json_object(json_response, '$.created_at'),
        concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
        substr (get_json_object(json_response, '$.created_at'),27,4)),
        substr (get_json_object(json_response, '$.created_at'),27,4),
        case substr (get_json_object(json_response,    '$.created_at'),5,3)
            when "Jan" then "01"
            when "Feb" then "02"
            when "Mar" then "03"
            when "Apr" then "04"
            when "May" then "05"
            when "Jun" then "06"
            when "Jul" then "07"
            when "Aug" then "08"
            when "Sep" then "09"
            when "Oct" then "10"
            when "Nov" then "11"
            when "Dec" then "12" end,
        substr (get_json_object(json_response, '$.created_at'),9,2),
        substr (get_json_object(json_response, '$.created_at'),12,8),
        get_json_object(json_response, '$.in_reply_to_user_id_str'),
        get_json_object(json_response, '$.text'),
        get_json_object(json_response, '$.contributors'),
        get_json_object(json_response, '$.retweeted'),
        get_json_object(json_response, '$.truncated'),
        get_json_object(json_response, '$.coordinates'),
        get_json_object(json_response, '$.source'),
        cast (get_json_object(json_response, '$.retweet_count') as INT),
        get_json_object(json_response, '$.entities.display_url'),
        array(
            trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
        array(
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        get_json_object(json_response, '$.user.screen_name'),
        get_json_object(json_response, '$.user.name'),
        cast (get_json_object(json_response, '$.user.followers_count') as INT),
        cast (get_json_object(json_response, '$.user.listed_count') as INT),
        cast (get_json_object(json_response, '$.user.friends_count') as INT),
        get_json_object(json_response, '$.user.lang'),
        get_json_object(json_response, '$.user.location'),
        get_json_object(json_response, '$.user.time_zone'),
        get_json_object(json_response, '$.user.profile_image_url'),
        json_response
    WHERE (length(json_response) > 500);
    
  2. لحفظ الملف، اضغط على Ctrl + X، ثم y.

  3. استخدم الأمر التالي لتشغيل HiveQL المضمنة في الملف:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i twitter.hql
    

    يقوم هذا الأمر بتشغيل الملف twitter.hql. بمجرد اكتمال الاستعلام، تظهر مطالبة jdbc:hive2//localhost:10001/>.

  4. من مطالبة beeline، استخدم الاستعلام التالي للتحقق من استيراد البيانات:

    SELECT name, screen_name, count(1) as cc
    FROM tweets
    WHERE text like "%Azure%"
    GROUP BY name,screen_name
    ORDER BY cc DESC LIMIT 10;
    

    يقوم هذا الاستعلام بإرجاع 10 تغريدات كحد أقصى تحتوي على كلمة Azure في نص الرسالة.

    ملاحظة

    إذا قمت بتغيير عامل التصفية في البرنامج النصي gettweets.py، استبدل Azure بأحد عوامل التصفية التي استخدمتها.

الخطوات التالية

لقد تعلمت كيفية تحويل مجموعة بيانات JSON غير المصنفة إلى جدول Apache Hive مصنف. لمعرفة المزيد حول Apache Hive على HDInsight، راجع المستندات التالية: