Aracılığıyla paylaş


Veri analizi işlem hattını kullanıma hazır hale getirme

Veri işlem hatlarının altında birçok veri analizi çözümü bulunur. Adından da anlaşılacağı gibi, bir veri işlem hattı ham verileri alır, gerektiğinde temizler ve yeniden şekillendirır ve ardından işlenen verileri depolamadan önce genellikle hesaplamalar veya toplamalar yapar. İşlenen veriler istemciler, raporlar veya API'ler tarafından kullanılır. Veri işlem hattı, zamanlamaya göre veya yeni veriler tarafından tetiklendiğinde yinelenebilir sonuçlar sağlamalıdır.

Bu makalede, HDInsight Hadoop kümelerinde çalışan Oozie'yi kullanarak veri işlem hatlarınızı tekrarlanabilirlik için nasıl kullanıma hazır hale getirmeniz açıklanır. Örnek senaryo, havayolu uçuş zaman serisi verilerini hazırlayan ve işleyen bir veri işlem hattında size yol gösterir.

Aşağıdaki senaryoda, giriş verileri bir ay boyunca toplu uçuş verilerini içeren düz bir dosyadır. Bu uçuş verileri, çıkış ve varış havaalanı, uçurulan mil sayısı, kalkış ve varış saatleri gibi bilgileri içerir. Bu işlem hattının amacı, her havayolunun her gün için dakika cinsinden ortalama kalkış ve varış gecikmeleri ile bir satıra sahip olduğu ve o gün akan toplam mil sayısı olan günlük havayolu performansını özetlemektir.

YEAR MONTH DAY_OF_MONTH TAŞIYICI AVG_DEP_DELAY AVG_ARR_DELAY TOTAL_DISTANCE
2017 1 3 AA 10.142229 7.862926 2644539
2017 1 3 AS 9.435449 5.482143 572289
2017 1 3 DL 6.935409 -2.1893024 1909696

Örnek işlem hattı, yeni bir zaman diliminin uçuş verileri gelene kadar bekler ve ardından bu ayrıntılı uçuş bilgilerini uzun vadeli analizler için Apache Hive veri ambarınıza depolar. İşlem hattı ayrıca yalnızca günlük uçuş verilerini özetleyen çok daha küçük bir veri kümesi oluşturur. Bu günlük uçuş özeti verileri, web sitesi gibi raporlar sağlamak üzere bir SQL Veritabanı gönderilir.

Aşağıdaki diyagramda örnek işlem hattı gösterilmektedir.

HDI flight example data pipeline overview.

Apache Oozie çözümüne genel bakış

Bu işlem hattı, HDInsight Hadoop kümesinde çalışan Apache Oozie kullanır.

Oozie işlem hatlarını eylemler, iş akışları ve koordinatörler açısından açıklar. Eylemler, Hive sorgusu çalıştırma gibi gerçekleştirilecek fiili çalışmayı belirler. İş akışları eylem dizisini tanımlar. Koordinatörler, iş akışının ne zaman çalıştırılacağını belirler. Koordinatörler, iş akışının bir örneğini başlatmadan önce yeni verilerin kullanılabilirliğini de bekleyebilir.

Aşağıdaki diyagramda bu örnek Oozie işlem hattının üst düzey tasarımı gösterilmektedir.

Oozie Flight example Data Pipeline.

Azure kaynaklarını sağlama

Bu işlem hattı, aynı konumda bir Azure SQL Veritabanı ve hdInsight Hadoop kümesi gerektirir. Azure SQL Veritabanı hem işlem hattı tarafından üretilen özet verileri hem de Oozie Meta Veri Deposu'nı depolar.

Sağlama Azure SQL Veritabanı

  1. bir Azure SQL Veritabanı oluşturun. Bkz. Azure portalında Azure SQL Veritabanı oluşturma.

  2. HDInsight kümenizin bağlı Azure SQL Veritabanı erişebildiğinden emin olmak için Azure SQL Veritabanı güvenlik duvarı kurallarını Azure hizmetlerinin ve kaynaklarının sunucuya erişmesine izin verecek şekilde yapılandırın. Bu seçeneği Azure portalında Sunucu güvenlik duvarını ayarla'yı seçip Azure hizmetlerinin ve kaynaklarının Azure SQL Veritabanı için bu sunucuya erişmesine izin ver'in altında ON seçeneğini belirleyerek etkinleştirebilirsiniz. Daha fazla bilgi için bkz. IP güvenlik duvarı kurallarını oluşturma ve yönetme.

  3. İşlem hattının her çalıştırmasından özetlenmiş verileri depolayacak tabloyu oluşturmak dailyflights üzere aşağıdaki SQL deyimlerini yürütmek için Sorgu düzenleyicisini kullanın.

    CREATE TABLE dailyflights
    (
        YEAR INT,
        MONTH INT,
        DAY_OF_MONTH INT,
        CARRIER CHAR(2),
        AVG_DEP_DELAY FLOAT,
        AVG_ARR_DELAY FLOAT,
        TOTAL_DISTANCE FLOAT
    )
    GO
    
    CREATE CLUSTERED INDEX dailyflights_clustered_index on dailyflights(YEAR,MONTH,DAY_OF_MONTH,CARRIER)
    GO
    

Azure SQL Veritabanı artık hazır.

Apache Hadoop Kümesi Sağlama

Özel bir meta veri deposuyla Apache Hadoop kümesi oluşturun. Portaldan küme oluşturma sırasında, Depolama sekmesinden Meta veri deposu ayarları altında SQL Veritabanı seçtiğinizden emin olun. Meta veri deposu seçme hakkında daha fazla bilgi için bkz . Küme oluşturma sırasında özel meta veri deposu seçme. Küme oluşturma hakkında daha fazla bilgi için bkz . Linux üzerinde HDInsight kullanmaya başlama.

SSH tünel kurulumunu doğrulama

Koordinatörünüzün ve iş akışı örneklerinizin durumunu görüntülemek için Oozie Web Konsolu'nu kullanmak için HDInsight kümenize bir SSH tüneli ayarlayın. Daha fazla bilgi için bkz . SSH Tüneli.

Not

Kümenizin web kaynaklarına SSH tüneli boyunca göz atmak için Chrome'ı Foxy Proxy uzantısıyla da kullanabilirsiniz. Bunu, tünelin 9876 numaralı bağlantı noktasındaki konak localhost aracılığıyla tüm isteklere ara sunucu olarak yapılandırın. Bu yaklaşım, Windows 10'da Bash olarak da bilinen Linux için Windows Alt Sistemi ile uyumludur.

  1. Kümenizin adı olan CLUSTERNAME kümenize bir SSH tüneli açmak için aşağıdaki komutu çalıştırın:

    ssh -C2qTnNf -D 9876 sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Şu konuma göz atarak baş düğümünüzün Ambari'sine giderek tünelin çalıştığını doğrulayın:

    http://headnodehost:8080

  3. Ambari'nin içinden Oozie Web Konsolu'na erişmek için Oozie>Hızlı Bağlantılar> [Etkin sunucu] >Oozie Web Kullanıcı Arabirimi'ne gidin.

Hive'ı yapılandırma

Verileri karşıya yükleme

  1. Bir ay boyunca uçuş verilerini içeren örnek bir CSV dosyasını indirin. ZIP dosyasını 2017-01-FlightData.zip HDInsight GitHub deposundan indirin ve CSV dosyasına 2017-01-FlightData.csvaçın.

  2. Bu CSV dosyasını HDInsight kümenize bağlı Azure Depolama hesabına kopyalayın ve klasörüne /example/data/flights yerleştirin.

    1. Dosyaları yerel makinenizden HDInsight küme baş düğümünüzün yerel depolama alanına kopyalamak için SCP kullanın.

      scp ./2017-01-FlightData.csv sshuser@CLUSTERNAME-ssh.azurehdinsight.net:2017-01-FlightData.csv
      
    2. Kümenize bağlanmak için ssh komutunu kullanın. öğesini kümenizin adıyla değiştirerek CLUSTERNAME aşağıdaki komutu düzenleyin ve komutunu girin:

      ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
      
    3. Ssh oturumunuzda HDFS komutunu kullanarak dosyayı baş düğüm yerel depolama alanından Azure Depolama'a kopyalayın.

      hadoop fs -mkdir /example/data/flights
      hdfs dfs -put ./2017-01-FlightData.csv /example/data/flights/2017-01-FlightData.csv
      

Tablo oluştur

Örnek veriler artık kullanılabilir. Ancak işlem hattı, işlem için biri gelen veriler () ve biri özetlenmiş verilerflights (rawFlights) için iki Hive tablosu gerektirir. Ambari'de bu tabloları aşağıdaki gibi oluşturun.

  1. adresine giderek Ambari'de http://headnodehost:8080oturum açın.

  2. Hizmet listesinden Hive'ı seçin.

    Apache Ambari services list selecting Hive.

  3. Hive Görünümü 2.0 etiketinin yanındaki Görünüme Git'i seçin.

    Ambari Apache Hive summary list.

  4. Sorgu metni alanına aşağıdaki deyimleri yapıştırarak rawFlights tabloyu oluşturun. Tablo, rawFlights Azure Depolama klasöründeki CSV dosyaları /example/data/flights için okunan bir şema sağlar.

    CREATE EXTERNAL TABLE IF NOT EXISTS rawflights (
        YEAR INT,
        MONTH INT,
        DAY_OF_MONTH INT,
        FL_DATE STRING,
        CARRIER STRING,
        FL_NUM STRING,
        ORIGIN STRING,
        DEST STRING,
        DEP_DELAY FLOAT,
        ARR_DELAY FLOAT,
        ACTUAL_ELAPSED_TIME FLOAT,
        DISTANCE FLOAT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES
    (
        "separatorChar" = ",",
        "quoteChar"     = "\""
    )
    LOCATION '/example/data/flights'
    
  5. Tabloyu oluşturmak için Yürüt'e tıklayın.

    hdi ambari services hive query.

  6. Tabloyu oluşturmak flights için, sorgu metin alanındaki metni aşağıdaki deyimlerle değiştirin. Tablo flights , içine yüklenen verileri yıla, aya ve ayın gününe göre bölümleyen Hive tarafından yönetilen bir tablodur. Bu tablo, her uçuş için bir satırın kaynak verilerinde en düşük ayrıntı düzeyine sahip olan tüm geçmiş uçuş verilerini içerir.

    SET hive.exec.dynamic.partition.mode=nonstrict;
    
    CREATE TABLE flights
    (
        FL_DATE STRING,
        CARRIER STRING,
        FL_NUM STRING,
        ORIGIN STRING,
        DEST STRING,
        DEP_DELAY FLOAT,
        ARR_DELAY FLOAT,
        ACTUAL_ELAPSED_TIME FLOAT,
        DISTANCE FLOAT
    )
    PARTITIONED BY (YEAR INT, MONTH INT, DAY_OF_MONTH INT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES 
    (
        "separatorChar" = ",",
        "quoteChar"     = "\""
    );
    
  7. Tabloyu oluşturmak için Yürüt'e tıklayın.

Oozie iş akışını oluşturma

İşlem hatları genellikle verileri belirli bir zaman aralığına göre toplu olarak işler. Bu durumda işlem hattı, uçuş verilerini günlük olarak işler. Bu yaklaşım giriş CSV dosyalarının günlük, haftalık, aylık veya yıllık olarak ulaşmasını sağlar.

Örnek iş akışı, üç ana adımda günlük olarak uçuş verilerini işler:

  1. Tablo tarafından rawFlights temsil edilen kaynak CSV dosyasından o günün tarih aralığına ait verileri ayıklamak ve verileri tabloya flights eklemek için bir Hive sorgusu çalıştırın.
  2. Hive'da gün ve taşıyıcıya göre özetlenen uçuş verilerinin bir kopyasını içeren bir hazırlama tablosu oluşturmak için bir Hive sorgusu çalıştırın.
  3. Hive'daki günlük hazırlama tablosundaki tüm verileri Azure SQL Veritabanı hedef dailyflights tabloya kopyalamak için Apache Sqoop kullanın. Sqoop, Azure Depolama'da bulunan Hive tablosunun arkasındaki verilerden kaynak satırları okur ve bunları JDBC bağlantısı kullanarak SQL Veritabanı yükler.

Bu üç adım bir Oozie iş akışı tarafından koordine edilir.

  1. Yerel iş istasyonunuzda adlı job.propertiesbir dosya oluşturun. Dosyanın başlangıç içeriği olarak aşağıdaki metni kullanın. Ardından, belirli ortamınız için değerleri güncelleştirin. Metnin altındaki tablo özelliklerin her birini özetler ve kendi ortamınız için değerleri nerede bulabileceğinizi gösterir.

    nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net
    jobTracker=[ACTIVERESOURCEMANAGER]:8050
    queueName=default
    oozie.use.system.libpath=true
    appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie
    oozie.wf.application.path=${appBase}/load_flights_by_day
    hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql
    hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql
    hiveDailyTableName=dailyflights${year}${month}${day}
    hiveDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/${year}/${month}/${day}
    sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]"
    sqlDatabaseTableName=dailyflights
    year=2017
    month=01
    day=03
    
    Özellik Değer kaynağı
    nameNode HDInsight kümenize bağlı Azure Depolama Kapsayıcısının tam yolu.
    jobTracker Etkin kümenizin YARN baş düğümü için iç konak adı. Ambari giriş sayfasında, hizmetler listesinden YARN'ı ve ardından Etkin Resource Manager'ı seçin. Ana bilgisayar adı URI'si sayfanın üst kısmında görüntülenir. 8050 numaralı bağlantı noktasını ekleme.
    queueName Hive eylemlerini zamanlarken kullanılan YARN kuyruğunun adı. Varsayılan değerde bırakın.
    oozie.use.system.libpath Doğru olarak bırak.
    appBase Azure'da Oozie iş akışını ve destekleyici dosyaları dağıttığınız alt klasörün yolu Depolama.
    oozie.wf.application.path Çalıştırılacak Oozie iş akışının workflow.xml konumu.
    hiveScriptLoadPartition Azure'daki yol Hive sorgu dosyasına hive-load-flights-partition.hqlDepolama.
    hiveScriptCreateDailyTable Azure'daki yol Hive sorgu dosyasına hive-create-daily-summary-table.hqlDepolama.
    hiveDailyTableName Hazırlama tablosu için kullanılacak dinamik olarak oluşturulan ad.
    hiveDataFolder Azure'daki yol, hazırlama tablosunun içerdiği verilere Depolama.
    sqlDatabase Bağlan ionString JDBC söz dizimi Azure SQL Veritabanı bağlantı dizesi.
    sqlDatabaseTableName özet satırların eklendiği Azure SQL Veritabanı içindeki tablonun adı. olarak dailyflightsbırakın.
    yıl Uçuş özetlerinin hesaplandığı günün yıl bileşeni. Olduğu gibi bırakın.
    aya Uçuş özetlerinin hesaplandığı günün ay bileşeni. Olduğu gibi bırakın.
    gün Uçuş özetlerinin hesaplandığı günün ayın günü bileşeni. Olduğu gibi bırakın.
  2. Yerel iş istasyonunuzda adlı hive-load-flights-partition.hqlbir dosya oluşturun. Dosyanın içeriği olarak aşağıdaki kodu kullanın.

    SET hive.exec.dynamic.partition.mode=nonstrict;
    
    INSERT OVERWRITE TABLE flights
    PARTITION (YEAR, MONTH, DAY_OF_MONTH)
    SELECT 
          FL_DATE,
          CARRIER,
          FL_NUM,
          ORIGIN,
          DEST,
          DEP_DELAY,
          ARR_DELAY,
          ACTUAL_ELAPSED_TIME,
          DISTANCE,
        YEAR,
          MONTH,
          DAY_OF_MONTH
    FROM rawflights
    WHERE year = ${year} AND month = ${month} AND day_of_month = ${day};
    

    Oozie değişkenleri söz dizimini ${variableName}kullanır. Bu değişkenler dosyasında ayarlanır job.properties . Oozie çalışma zamanında gerçek değerlerin yerini alır.

  3. Yerel iş istasyonunuzda adlı hive-create-daily-summary-table.hqlbir dosya oluşturun. Dosyanın içeriği olarak aşağıdaki kodu kullanın.

    DROP TABLE ${hiveTableName};
    CREATE EXTERNAL TABLE ${hiveTableName}
    (
        YEAR INT,
          MONTH INT,
          DAY_OF_MONTH INT,
          CARRIER STRING,
          AVG_DEP_DELAY FLOAT,
          AVG_ARR_DELAY FLOAT,
          TOTAL_DISTANCE FLOAT
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '${hiveDataFolder}';
    INSERT OVERWRITE TABLE ${hiveTableName}
    SELECT     year, month, day_of_month, carrier, avg(dep_delay) avg_dep_delay, 
            avg(arr_delay) avg_arr_delay, sum(distance) total_distance 
    FROM flights
    GROUP BY year, month, day_of_month, carrier 
    HAVING year = ${year} AND month = ${month} AND day_of_month = ${day};
    

    Bu sorgu, yalnızca bir gün için özetlenmiş verileri depolayacak bir hazırlama tablosu oluşturur, operatör tarafından günlük ortalama gecikmeleri ve toplam mesafeyi hesaplayan SELECT deyimini not alın. Bu tabloya eklenen veriler, bir sonraki adımda Sqoop kaynağı olarak kullanılabilmesi için bilinen bir konumda (hiveDataFolder değişkeni tarafından belirtilen yol) depolanır.

  4. Yerel iş istasyonunuzda adlı workflow.xmlbir dosya oluşturun. Dosyanın içeriği olarak aşağıdaki kodu kullanın. Yukarıdaki adımlar Oozie iş akışı dosyasında ayrı eylemler olarak ifade edilir.

    <workflow-app name="loadflightstable" xmlns="uri:oozie:workflow:0.5">
        <start to = "RunHiveLoadFlightsScript"/>
        <action name="RunHiveLoadFlightsScript">
            <hive xmlns="uri:oozie:hive-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                </configuration>
                <script>${hiveScriptLoadPartition}</script>
                <param>year=${year}</param>
                <param>month=${month}</param>
                <param>day=${day}</param>
            </hive>
            <ok to="RunHiveCreateDailyFlightTableScript"/>
            <error to="fail"/>
        </action>
    
        <action name="RunHiveCreateDailyFlightTableScript">
            <hive xmlns="uri:oozie:hive-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                </configuration>
                <script>${hiveScriptCreateDailyTable}</script>
                <param>hiveTableName=${hiveDailyTableName}</param>
                <param>year=${year}</param>
                <param>month=${month}</param>
                <param>day=${day}</param>
                <param>hiveDataFolder=${hiveDataFolder}/${year}/${month}/${day}</param>
            </hive>
            <ok to="RunSqoopExport"/>
            <error to="fail"/>
        </action>
    
        <action name="RunSqoopExport">
            <sqoop xmlns="uri:oozie:sqoop-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
                </configuration>
                <arg>export</arg>
                <arg>--connect</arg>
                <arg>${sqlDatabaseConnectionString}</arg>
                <arg>--table</arg>
                <arg>${sqlDatabaseTableName}</arg>
                <arg>--export-dir</arg>
                <arg>${hiveDataFolder}/${year}/${month}/${day}</arg>
                <arg>-m</arg>
                <arg>1</arg>
                <arg>--input-fields-terminated-by</arg>
                <arg>"\t"</arg>
                <archive>mssql-jdbc-7.0.0.jre8.jar</archive>
                </sqoop>
            <ok to="end"/>
            <error to="fail"/>
        </action>
        <kill name="fail">
            <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message>
        </kill>
        <end name="end"/>
    </workflow-app>
    

İki Hive sorgusuna Azure Depolama'daki yolları tarafından erişilir ve kalan değişken değerleri dosya tarafından job.properties sağlanır. Bu dosya, iş akışını 3 Ocak 2017 tarihinde çalışacak şekilde yapılandırır.

Oozie iş akışını dağıtma ve çalıştırma

Oozie iş akışınızı (workflow.xml), Hive sorgularını (hive-load-flights-partition.hql ve ) ve hive-create-daily-summary-table.hqliş yapılandırmasını (job.properties ) dağıtmak için bash oturumunuzdan SCP kullanın. Oozie'de baş düğümün job.properties yerel depolamasında yalnızca dosya bulunabilir. Diğer tüm dosyalar HDFS'de( bu örnekte Azure Depolama) depolanmalıdır. İş akışı tarafından kullanılan Sqoop eylemi, SQL Veritabanı ile iletişim kurmak için bir JDBC sürücüsüne bağlıdır ve bu sürücü baş düğümden HDFS'ye kopyalanmalıdır.

  1. Baş düğümün load_flights_by_day yerel depolama alanında kullanıcının yolunun altında alt klasörü oluşturun. Açık ssh oturumunuzda aşağıdaki komutu yürütebilirsiniz:

    mkdir load_flights_by_day
    
  2. Geçerli dizindeki tüm dosyaları ( workflow.xml ve job.properties dosyaları) alt klasöre load_flights_by_day kopyalayın. Yerel iş istasyonunuzda aşağıdaki komutu yürütür:

    scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:load_flights_by_day
    
  3. İş akışı dosyalarını HDFS'ye kopyalayın. Açık ssh oturumunuzda aşağıdaki komutları yürütebilirsiniz:

    cd load_flights_by_day
    hadoop fs -mkdir -p /oozie/load_flights_by_day
    hdfs dfs -put ./* /oozie/load_flights_by_day
    
  4. Yerel baş düğümden HDFS'deki iş akışı klasörüne kopyalayın mssql-jdbc-7.0.0.jre8.jar . Kümeniz farklı bir jar dosyası içeriyorsa komutu gerektiği gibi düzeltin. workflow.xml Farklı bir jar dosyasını yansıtmak için gerektiği şekilde düzeltin. Açık ssh oturumunuzda aşağıdaki komutu yürütebilirsiniz:

    hdfs dfs -put /usr/share/java/sqljdbc_7.0/enu/mssql-jdbc*.jar /oozie/load_flights_by_day
    
  5. İş akışını çalıştırma Açık ssh oturumunuzda aşağıdaki komutu yürütebilirsiniz:

    oozie job -config job.properties -run
    
  6. Oozie Web Konsolu'nu kullanarak durumu gözlemleyin. Ambari'nin içinden Oozie, Hızlı Bağlantılar ve ardından Oozie Web Konsolu'nu seçin. İş Akışı İşleri sekmesinin altında Tüm İşler'i seçin.

    hdi oozie web console workflows.

  7. Durum BAŞARILI olduğunda, eklenen satırları görüntülemek için SQL Veritabanı tablosunu sorgular. Azure portalını kullanarak SQL Veritabanı bölmeye gidin, Araçlar'ı seçin ve Sorgu Düzenleyicisi açın.

    SELECT * FROM dailyflights
    

Artık iş akışı tek bir test günü için çalıştığına göre, iş akışını günlük çalışacak şekilde zamanlayan bir koordinatörle sarmalayabilirsiniz.

İş akışını bir koordinatörle çalıştırma

Bu iş akışını günlük (veya tarih aralığındaki tüm günler) çalışacak şekilde zamanlamak için bir koordinatör kullanabilirsiniz. Koordinatör bir XML dosyası tarafından tanımlanır, örneğin coordinator.xml:

<coordinator-app name="daily_export" start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
    <datasets>
        <dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC">
            <uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template>
            <done-flag></done-flag>
        </dataset>
    </datasets>
    <input-events>
        <data-in name="event_input1" dataset="ds_input1">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>
    <action>
        <workflow>
            <app-path>${appBase}/load_flights_by_day</app-path>
            <configuration>
                <property>
                    <name>year</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'yyyy')}</value>
                </property>
                <property>
                    <name>month</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'MM')}</value>
                </property>
                <property>
                    <name>day</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>hiveScriptLoadPartition</name>
                    <value>${hiveScriptLoadPartition}</value>
                </property>
                <property>
                    <name>hiveScriptCreateDailyTable</name>
                    <value>${hiveScriptCreateDailyTable}</value>
                </property>
                <property>
                    <name>hiveDailyTableNamePrefix</name>
                    <value>${hiveDailyTableNamePrefix}</value>
                </property>
                <property>
                    <name>hiveDailyTableName</name>
                    <value>${hiveDailyTableNamePrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}${coord:formatTime(coord:nominalTime(), 'MM')}${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>hiveDataFolderPrefix</name>
                    <value>${hiveDataFolderPrefix}</value>
                </property>
                <property>
                    <name>hiveDataFolder</name>
                    <value>${hiveDataFolderPrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}/${coord:formatTime(coord:nominalTime(), 'MM')}/${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>sqlDatabaseConnectionString</name>
                    <value>${sqlDatabaseConnectionString}</value>
                </property>
                <property>
                    <name>sqlDatabaseTableName</name>
                    <value>${sqlDatabaseTableName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>

Gördüğünüz gibi, koordinatörün çoğunluğu yalnızca yapılandırma bilgilerini iş akışı örneğine geçiriyor. Ancak, çağrılacak birkaç önemli öğe vardır.

  • 1. Nokta: start Öğenin üzerindeki coordinator-app ve end öznitelikleri, koordinatörün çalıştığı zaman aralığını denetler.

    <coordinator-app ... start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" ...>
    

    Koordinatör, özniteliği tarafından belirtilen aralığa göre ve end tarih aralığındaki eylemleri start zamanlamakla frequency sorumludur. Zamanlanan her eylem de iş akışını yapılandırıldığı gibi çalıştırır. Yukarıdaki koordinatör tanımında, koordinatör 1 Ocak 2017 ile 5 Ocak 2017 dönemindeki eylemleri çalıştıracak şekilde yapılandırılmıştır. Sıklık, Oozie İfade Dili frekans ifadesi ${coord:days(1)}tarafından bir gün olarak ayarlanır. Bu, koordinatörün günde bir eylem (ve dolayısıyla iş akışı) zamanlamasını sağlar. Bu örnekte olduğu gibi geçmişteki tarih aralıkları için eylem gecikme olmadan çalışacak şekilde zamanlanır. Bir eylemin çalıştırılacak zamanlandığı tarihin başlangıcına nominal saat adı verilir. Örneğin, 1 Ocak 2017 verilerini işlemek için koordinatör, 2017-01-01T00:00:00 GMT nominal zamanına sahip bir eylem zamanlar.

  • 2. Nokta: İş akışının tarih aralığında, dataset öğesi belirli bir tarih aralığına ait verilerin HDFS'de nereye bakileceğini belirtir ve Oozie'nin verilerin henüz işlenmek üzere kullanılabilir olup olmadığını nasıl belirleyeceğini yapılandırır.

    <dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC">
        <uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template>
        <done-flag></done-flag>
    </dataset>
    

    HDFS'deki verilerin yolu, öğesinde uri-template sağlanan ifadeye göre dinamik olarak oluşturulur. Bu koordinatörde, veri kümesiyle bir gün sıklığı da kullanılır. Koordinatör öğesindeki başlangıç ve bitiş tarihleri, eylemlerin zamanlandığı zamanı denetlerken (ve bunların nominal saatlerini tanımlar), initial-instance veri kümesindeki ve frequency değeri, oluşturmada kullanılan tarihin hesaplanması işlemini uri-templatedenetler. Bu durumda, ilk günün (1 Ocak 2017) değerindeki verileri aldığından emin olmak için ilk örneği koordinatörün başlangıcından bir gün öncesine ayarlayın. Veri kümesinin tarih hesaplaması, koordinatör tarafından ayarlanan nominal süreyi (ilk eylem için 2017-01-01T00:00:00 GMT) geçirmeyen en son tarihi bulana kadar veri kümesi sıklığı (bir gün) artışlarıyla ilerleyen (12/31/2016) değerinden initial-instance ileri doğru ilerler.

    Boş done-flag öğesi, Oozie belirtilen zamanda giriş verilerinin varlığını denetlediğinde, Oozie'nin bir dizin veya dosya bulunup bulunmadığını belirler. Bu durumda, bir csv dosyasının varlığıdır. Bir csv dosyası varsa, Oozie verilerin hazır olduğunu varsayar ve dosyayı işlemek için bir iş akışı örneği başlatır. Csv dosyası yoksa, Oozie verilerin henüz hazır olmadığını ve iş akışı çalıştırmasının bekleme durumuna geçtiğini varsayar.

  • 3. Nokta: öğesi, data-in ilişkili veri kümesi için içindeki değerleri uri-template değiştirirken nominal süre olarak kullanılacak belirli zaman damgasını belirtir.

    <data-in name="event_input1" dataset="ds_input1">
        <instance>${coord:current(0)}</instance>
    </data-in>
    

    Bu durumda, örneğini ifadesi ${coord:current(0)}olarak ayarlayın. Bu ifade, başlangıçta koordinatör tarafından zamanlandığı gibi eylemin nominal zamanını kullanmaya çevrilir. Başka bir deyişle, koordinatör eylemi 01/01/2017 nominal zamanıyla çalışacak şekilde zamanladığında, URI şablonundaki YIL (2017) ve AY (01) değişkenlerini değiştirmek için 01/01/2017 kullanılır. Bu örnek için URI şablonu hesaplandıktan sonra, Oozie beklenen dizinin veya dosyanın kullanılabilir olup olmadığını denetler ve iş akışının bir sonraki çalıştırmasını buna göre zamanlar.

Yukarıdaki üç nokta, koordinatörün kaynak verilerin günlük olarak işlenmesini zamanladığı bir durum elde etmek için birleştirilir.

  • 1. Nokta: Koordinatör 2017-01-01 nominal tarihiyle başlar.

  • 2. Nokta: Oozie, içinde sourceDataFolder/2017-01-FlightData.csvbulunan verileri arar.

  • 3. Nokta: Oozie bu dosyayı bulduğunda, 1 Ocak 2017'ye ait verileri işleyecek iş akışının bir örneğini zamanlar. Oozie daha sonra 2017-01-02 için işlemeye devam eder. Bu değerlendirme 2017-01-05'e kadar yinelenmez ancak yinelenmez.

İş akışlarında olduğu gibi, bir koordinatörün yapılandırması, iş akışı tarafından kullanılan ayarların üst kümesine sahip bir dosyada job.properties tanımlanır.

nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net
jobTracker=[ACTIVERESOURCEMANAGER]:8050
queueName=default
oozie.use.system.libpath=true
appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie
oozie.coord.application.path=${appBase}
sourceDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/
hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql
hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql
hiveDailyTableNamePrefix=dailyflights
hiveDataFolderPrefix=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/
sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]"
sqlDatabaseTableName=dailyflights

Bu job.properties dosyada sunulan tek yeni özellikler şunlardır:

Özellik Değer kaynağı
oozie.coord.application.path Çalıştırılacak Oozie koordinatörünün bulunduğu dosyanın konumunu coordinator.xml gösterir.
hiveDailyTableNamePrefix Hazırlama tablosunun tablo adını dinamik olarak oluştururken kullanılan ön ek.
hiveDataFolderPrefix Tüm hazırlama tablolarının depolanacağı yolun ön eki.

Oozie Düzenleyicisi'ni dağıtma ve çalıştırma

İşlem hattını bir koordinatörle çalıştırmak için, iş akışınızın bulunduğu klasörün bir düzey üzerindeki bir klasörden çalışmanız dışında iş akışıyla benzer şekilde ilerleyin. Bu klasör kuralı, bir koordinatörü farklı alt iş akışlarıyla ilişkilendirebilmeniz için koordinatörleri disk üzerindeki iş akışlarından ayırır.

  1. Koordinatör dosyalarını kümenizin baş düğümünün yerel depolama alanına kopyalamak için yerel makinenizden SCP kullanın.

    scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:~
    
  2. Baş düğümünüzün içine SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Düzenleyici dosyalarını HDFS'ye kopyalayın.

    hdfs dfs -put ./* /oozie/
    
  4. Koordinatörü çalıştırın.

    oozie job -config job.properties -run
    
  5. Oozie Web Konsolu'nu kullanarak durumu doğrulayın, bu kez Koordinatör İşleri sekmesini ve ardından Tüm işler'i seçin.

    Oozie Web Console Coordinator Jobs.

  6. Zamanlanmış eylemlerin listesini görüntülemek için bir koordinatör örneği seçin. Bu durumda, 1 Ocak 2017 ile 4 Ocak 2017 arasında nominal süreleri olan dört eylem görmeniz gerekir.

    Oozie Web Console Coordinator Job.

    Bu listedeki her eylem, iş akışının bir günlük verileri işleyen bir örneğine karşılık gelir ve burada o günün başlangıcı nominal saatle gösterilir.

Sonraki adımlar

Apache Oozie Belgeleri