Megosztás a következőn keresztül:


Adatelemzési folyamat üzembe helyezése

Az adatfolyamok sok adatelemzési megoldást tartalmaznak. Ahogy a neve is sugallja, az adatfolyamatok szükség szerint nyers adatokat vesznek fel, tisztítanak és alakítanak át, majd általában számításokat vagy összesítéseket hajtanak végre a feldolgozott adatok tárolása előtt. A feldolgozott adatokat ügyfelek, jelentések vagy API-k használják fel. Az adatfolyamnak ismétlődő eredményeket kell biztosítania, akár ütemezés szerint, akár új adatok aktiválásakor.

Ez a cikk bemutatja, hogyan működtetheti az adatfolyamokat az megismételhetőség érdekében a HDInsight Hadoop-fürtökön futó Oozie használatával. A példaforgatókönyv végigvezeti egy olyan adatfolyamon, amely előkészíti és feldolgozza a légitársaságok repülési idősoradatait.

A következő forgatókönyvben a bemeneti adatok egy egy hónapra vonatkozó repülési adatokat tartalmazó, egy kötegből álló fájl. Ezek a repülési adatok olyan információkat tartalmaznak, mint a kiindulási és a cél repülőtér, a repült mérföldek, az indulási és érkezési időpontok stb. Ezzel a folyamattal az a cél, hogy összegezze a légitársaságok napi teljesítményét, ahol minden légitársaságnak van egy sora minden napra az átlagos indulási és érkezési késésekkel percek alatt, valamint az adott napon repült összes mérfölddel.

YEAR MONTH DAY_OF_MONTH FUVAROZÓ AVG_DEP_DELAY AVG_ARR_DELAY TOTAL_DISTANCE
2017 0 3 AA 10.142229 7.862926 2644539
2017 0 3 AS 9.435449 5.482143 572289
2017 0 3 DL 6.935409 -2.1893024 1909696

A példafolyamat megvárja, amíg egy új időszak repülési adatai meg nem érkeznek, majd ezeket a részletes repülési információkat az Apache Hive adattárházban tárolja hosszú távú elemzések céljából. A folyamat egy sokkal kisebb adatkészletet is létrehoz, amely csak a napi repülési adatokat foglalja össze. Ezeket a napi repülési összefoglaló adatokat a rendszer elküldi egy SQL Database-nek, hogy jelentéseket biztosítson, például egy webhelyhez.

Az alábbi ábra a példafolyamatot szemlélteti.

HDI flight example data pipeline overview.

Az Apache Oozie-megoldás áttekintése

Ez a folyamat egy HDInsight Hadoop-fürtön futó Apache Oozie-t használ.

Az Oozie a folyamatokat műveletek, munkafolyamatok és koordinátorok szerint írja le. A műveletek határozzák meg a ténylegesen végrehajtandó munkát, például hive-lekérdezések futtatását. A munkafolyamatok határozzák meg a műveletek sorrendjét. A koordinátorok határozzák meg a munkafolyamat futtatásának ütemezését. A koordinátorok az új adatok rendelkezésre állását is megvárhatják, mielőtt elindítanák a munkafolyamat egy példányát.

Az alábbi ábra a példa Oozie-folyamat magas szintű kialakítását mutatja be.

Oozie Flight example Data Pipeline.

Azure-erőforrások kiépítése

Ehhez a folyamathoz egy Azure SQL Database és egy HDInsight Hadoop-fürt szükséges ugyanazon a helyen. Az Azure SQL Database a folyamat által létrehozott összesítő adatokat és az Oozie Metadata Store-t is tárolja.

Azure SQL Database kiépítése

  1. Hozzon létre egy Azure SQL Database-adatbázist. Lásd: Azure SQL Database létrehozása az Azure Portalon.

  2. Annak érdekében, hogy a HDInsight-fürt hozzáférhessen a csatlakoztatott Azure SQL Database-hez, konfigurálja az Azure SQL Database tűzfalszabályait, hogy az Azure-szolgáltatások és erőforrások hozzáférhessenek a kiszolgálóhoz. Ezt a lehetőséget az Azure Portalon a Kiszolgálói tűzfal beállítása, majd az On (On) lehetőség kiválasztásával engedélyezheti az Azure-szolgáltatások és -erőforrások számára az Azure SQL Database-kiszolgáló eléréséhez. További információ: IP-tűzfalszabályok létrehozása és kezelése.

  3. A Lekérdezésszerkesztővel hajtsa végre a következő SQL-utasításokat a dailyflights folyamat egyes futtatásaiból származó összesített adatokat tároló tábla létrehozásához.

    CREATE TABLE dailyflights
    (
        YEAR INT,
        MONTH INT,
        DAY_OF_MONTH INT,
        CARRIER CHAR(2),
        AVG_DEP_DELAY FLOAT,
        AVG_ARR_DELAY FLOAT,
        TOTAL_DISTANCE FLOAT
    )
    GO
    
    CREATE CLUSTERED INDEX dailyflights_clustered_index on dailyflights(YEAR,MONTH,DAY_OF_MONTH,CARRIER)
    GO
    

Az Azure SQL Database készen áll.

Apache Hadoop-fürt kiépítése

Hozzon létre egy Apache Hadoop-fürtöt egy egyéni metaadattárral. A fürt portálról való létrehozása során, a Storage lapon győződjön meg arról, hogy a Metaadattár beállításai között válassza ki az SQL Database-t. További információ a metaadattár kiválasztásáról: Egyéni metaadattár kiválasztása a fürt létrehozásakor. A fürtlétrehozással kapcsolatos további információkért tekintse meg a HDInsight linuxos használatának első lépéseit.

SSH-bújtatás beállításának ellenőrzése

Ha az Oozie webkonzol használatával szeretné megtekinteni a koordinátor és a munkafolyamat-példányok állapotát, állítson be egy SSH-alagutat a HDInsight-fürthöz. További információ: SSH-alagút.

Feljegyzés

A Chrome és a Foxy Proxy bővítmény használatával is tallózhat a fürt webes erőforrásai között az SSH-alagútban. Konfigurálja úgy, hogy az összes kérést az alagút 9876-os portján lévő gazdagépen localhost keresztül proxyzhassa. Ez a módszer kompatibilis a Windows 10-en futó Bash néven is ismert Linuxos Windows-alrendszer.

  1. Futtassa a következő parancsot egy SSH-alagút fürthöz való megnyitásához, ahol CLUSTERNAME a fürt neve található:

    ssh -C2qTnNf -D 9876 sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Az alagút működésének ellenőrzéséhez navigáljon a fejcsomóponton található Ambarihoz a következő tallózással:

    http://headnodehost:8080

  3. Az Oozie webkonzol Ambarin belüli eléréséhez lépjen az Oozie>gyorshivatkozások> [Aktív kiszolgáló] >Oozie webes felhasználói felületére.

A Hive konfigurálása

Adatok feltöltése

  1. Töltsön le egy példa CSV-fájlt, amely egy hónapig tartalmazza a repülési adatokat. Töltse le a ZIP-fájlját 2017-01-FlightData.zip a HDInsight GitHub-adattárból, és bontsa ki a CSV-fájlba 2017-01-FlightData.csv.

  2. Másolja ezt a CSV-fájlt a HDInsight-fürthöz csatolt Azure Storage-fiókba, és helyezze a /example/data/flights mappába.

    1. Az SCP használatával másolja a fájlokat a helyi gépről a HDInsight-fürtfejcsomópont helyi tárolójára.

      scp ./2017-01-FlightData.csv sshuser@CLUSTERNAME-ssh.azurehdinsight.net:2017-01-FlightData.csv
      
    2. Az ssh paranccsal csatlakozzon a fürthöz. Szerkessze az alábbi parancsot a fürt nevére cserélve CLUSTERNAME , majd írja be a parancsot:

      ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
      
    3. Az ssh-munkamenetben a HDFS paranccsal másolja a fájlt a központi csomópont helyi tárolójából az Azure Storage-ba.

      hadoop fs -mkdir /example/data/flights
      hdfs dfs -put ./2017-01-FlightData.csv /example/data/flights/2017-01-FlightData.csv
      

Táblák létrehozása

A mintaadatok már elérhetők. A folyamathoz azonban két Hive-táblára van szükség a feldolgozáshoz, egyet a bejövő adatokhoz (rawFlights) és egyet az összegzett adatokhoz (flights). Ezeket a táblákat az alábbiak szerint hozhatja létre az Ambariban.

  1. Jelentkezzen be az Ambariba a következőre navigálva http://headnodehost:8080: .

  2. A szolgáltatások listájában válassza a Hive lehetőséget.

    Apache Ambari services list selecting Hive.

  3. Válassza a Hive View 2.0 címke melletti Ugrás a nézetre lehetőséget.

    Ambari Apache Hive summary list.

  4. A lekérdezés szövegterületére illessze be a következő utasításokat a rawFlights tábla létrehozásához. A rawFlights táblázat az Azure Storage mappájában lévő /example/data/flights CSV-fájlokhoz biztosít egy sémaolvasásra alkalmas sémát.

    CREATE EXTERNAL TABLE IF NOT EXISTS rawflights (
        YEAR INT,
        MONTH INT,
        DAY_OF_MONTH INT,
        FL_DATE STRING,
        CARRIER STRING,
        FL_NUM STRING,
        ORIGIN STRING,
        DEST STRING,
        DEP_DELAY FLOAT,
        ARR_DELAY FLOAT,
        ACTUAL_ELAPSED_TIME FLOAT,
        DISTANCE FLOAT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES
    (
        "separatorChar" = ",",
        "quoteChar"     = "\""
    )
    LOCATION '/example/data/flights'
    
  5. A tábla létrehozásához válassza a Végrehajtás lehetőséget.

    hdi ambari services hive query.

  6. A flights táblázat létrehozásához cserélje le a lekérdezés szövegterületének szövegét az alábbi utasításokra. A flights tábla egy Hive által felügyelt tábla, amely év, hónap és hónap szerint particionálja a bele betöltött adatokat. Ez a táblázat tartalmazza az összes előzmény repülési adatot, és a legalacsonyabb részletesség szerepel a járatonként egy sor forrásadataiban.

    SET hive.exec.dynamic.partition.mode=nonstrict;
    
    CREATE TABLE flights
    (
        FL_DATE STRING,
        CARRIER STRING,
        FL_NUM STRING,
        ORIGIN STRING,
        DEST STRING,
        DEP_DELAY FLOAT,
        ARR_DELAY FLOAT,
        ACTUAL_ELAPSED_TIME FLOAT,
        DISTANCE FLOAT
    )
    PARTITIONED BY (YEAR INT, MONTH INT, DAY_OF_MONTH INT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES 
    (
        "separatorChar" = ",",
        "quoteChar"     = "\""
    );
    
  7. A tábla létrehozásához válassza a Végrehajtás lehetőséget.

Az Oozie-munkafolyamat létrehozása

A folyamatok általában egy adott időintervallum szerint dolgozzák fel az adatokat kötegekben. Ebben az esetben a folyamat naponta dolgozza fel a repülési adatokat. Ez a megközelítés lehetővé teszi, hogy a bemeneti CSV-fájlok naponta, hetente, havonta vagy évente érkezzenek.

A minta munkafolyamat három fő lépésben dolgozza fel a repülési adatokat nap mint nap:

  1. Futtasson egy Hive-lekérdezést az adott nap dátumtartományának adatainak kinyeréséhez a tábla által rawFlights képviselt forrás CSV-fájlból, és szúrja be az adatokat a flights táblába.
  2. Hive-lekérdezés futtatásával dinamikusan létrehozhat egy átmeneti táblát a Hive-ben a napra vonatkozóan, amely tartalmazza a repülési adatok nap és szolgáltató szerint összegzett másolatát.
  3. Az Apache Sqoop használatával másolja az összes adatot a Hive napi átmeneti táblájából az Azure SQL Database céltáblájára dailyflights . A Sqoop beolvassa a forrássorokat az Azure Storage-ban található Hive-tábla mögötti adatokból, és JDBC-kapcsolat használatával betölti őket az SQL Database-be.

Ezt a három lépést egy Oozie-munkafolyamat koordinálja.

  1. A helyi munkaállomásról hozzon létre egy .job.properties Használja az alábbi szöveget a fájl kezdő tartalmaként. Ezután frissítse az adott környezet értékeit. A szöveg alatti táblázat összefoglalja az egyes tulajdonságokat, és jelzi, hogy hol találhatók a saját környezet értékei.

    nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net
    jobTracker=[ACTIVERESOURCEMANAGER]:8050
    queueName=default
    oozie.use.system.libpath=true
    appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie
    oozie.wf.application.path=${appBase}/load_flights_by_day
    hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql
    hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql
    hiveDailyTableName=dailyflights${year}${month}${day}
    hiveDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/${year}/${month}/${day}
    sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]"
    sqlDatabaseTableName=dailyflights
    year=2017
    month=01
    day=03
    
    Tulajdonság Értékforrás
    nameNode A HDInsight-fürthöz csatolt Azure Storage-tároló teljes elérési útja.
    jobTracker Az aktív fürt YARN-főcsomópontjának belső állomásneve. Az Ambari kezdőlapján válassza a YARN elemet a szolgáltatások listájából, majd válassza az Active Resource Manager lehetőséget. A gazdagépnév URI a lap tetején jelenik meg. Fűzze hozzá a 8050-ös portot.
    queueName A Hive-műveletek ütemezéséhez használt YARN-üzenetsor neve. Hagyja meg az alapértelmezett beállítást.
    oozie.use.system.libpath Hagyja igazként.
    appBase Az Azure Storage almappájának elérési útja, ahol üzembe helyezi az Oozie-munkafolyamatot és a támogató fájlokat.
    oozie.wf.application.path A futtatandó Oozie-munkafolyamat workflow.xml helye.
    hiveScriptLoadPartition Az Azure Storage elérési útja a Hive-lekérdezésfájlhoz hive-load-flights-partition.hql.
    hiveScriptCreateDailyTable Az Azure Storage elérési útja a Hive-lekérdezésfájlhoz hive-create-daily-summary-table.hql.
    hiveDailyTableName Az előkészítési táblához használandó dinamikusan létrehozott név.
    hiveDataFolder Az Azure Storage elérési útja az előkészítési tábla által tárolt adatokhoz.
    sqlDatabase Csatlakozás ionString A JDBC szintaxisa kapcsolati sztring az Azure SQL Database-hez.
    sqlDatabaseTableName Annak a táblának a neve az Azure SQL Database-ben, amelybe összegző sorokat szúr be. Hagyja meg a dailyflights.
    Év Annak a napnak az évösszetevője, amelyre a repülési összegzések ki vannak számítva. Hagyja változatlanul.
    hónap Annak a napnak a hónapösszetevője, amelyre a repülési összegzések ki vannak számítva. Hagyja változatlanul.
    Nap Annak a napnak a hónapösszetevője, amelyre a repülési összegzések ki vannak számítva. Hagyja változatlanul.
  2. A helyi munkaállomásról hozzon létre egy .hive-load-flights-partition.hql Használja az alábbi kódot a fájl tartalmaként.

    SET hive.exec.dynamic.partition.mode=nonstrict;
    
    INSERT OVERWRITE TABLE flights
    PARTITION (YEAR, MONTH, DAY_OF_MONTH)
    SELECT 
          FL_DATE,
          CARRIER,
          FL_NUM,
          ORIGIN,
          DEST,
          DEP_DELAY,
          ARR_DELAY,
          ACTUAL_ELAPSED_TIME,
          DISTANCE,
        YEAR,
          MONTH,
          DAY_OF_MONTH
    FROM rawflights
    WHERE year = ${year} AND month = ${month} AND day_of_month = ${day};
    

    Az Oozie változók a szintaxist ${variableName}használják. Ezek a változók a job.properties fájlban vannak beállítva. Az Oozie futásidőben helyettesíti a tényleges értékeket.

  3. A helyi munkaállomásról hozzon létre egy .hive-create-daily-summary-table.hql Használja az alábbi kódot a fájl tartalmaként.

    DROP TABLE ${hiveTableName};
    CREATE EXTERNAL TABLE ${hiveTableName}
    (
        YEAR INT,
          MONTH INT,
          DAY_OF_MONTH INT,
          CARRIER STRING,
          AVG_DEP_DELAY FLOAT,
          AVG_ARR_DELAY FLOAT,
          TOTAL_DISTANCE FLOAT
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '${hiveDataFolder}';
    INSERT OVERWRITE TABLE ${hiveTableName}
    SELECT     year, month, day_of_month, carrier, avg(dep_delay) avg_dep_delay, 
            avg(arr_delay) avg_arr_delay, sum(distance) total_distance 
    FROM flights
    GROUP BY year, month, day_of_month, carrier 
    HAVING year = ${year} AND month = ${month} AND day_of_month = ${day};
    

    Ez a lekérdezés létrehoz egy átmeneti táblát, amely csak egy napig tárolja az összesített adatokat. Jegyezze fel a Standard kiadás LECT utasítást, amely kiszámítja az átlagos késéseket és a szállítók által naponta megtett távolságok összegét. A táblába beszúrt adatok egy ismert helyen (a hiveDataFolder változó által jelzett útvonalon) tároltak, hogy a következő lépésben a Sqoop forrásaként lehessen használni őket.

  4. A helyi munkaállomásról hozzon létre egy .workflow.xml Használja az alábbi kódot a fájl tartalmaként. A fenti lépések az Oozie-munkafolyamat-fájlban külön műveletekként vannak kifejezve.

    <workflow-app name="loadflightstable" xmlns="uri:oozie:workflow:0.5">
        <start to = "RunHiveLoadFlightsScript"/>
        <action name="RunHiveLoadFlightsScript">
            <hive xmlns="uri:oozie:hive-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                </configuration>
                <script>${hiveScriptLoadPartition}</script>
                <param>year=${year}</param>
                <param>month=${month}</param>
                <param>day=${day}</param>
            </hive>
            <ok to="RunHiveCreateDailyFlightTableScript"/>
            <error to="fail"/>
        </action>
    
        <action name="RunHiveCreateDailyFlightTableScript">
            <hive xmlns="uri:oozie:hive-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                </configuration>
                <script>${hiveScriptCreateDailyTable}</script>
                <param>hiveTableName=${hiveDailyTableName}</param>
                <param>year=${year}</param>
                <param>month=${month}</param>
                <param>day=${day}</param>
                <param>hiveDataFolder=${hiveDataFolder}/${year}/${month}/${day}</param>
            </hive>
            <ok to="RunSqoopExport"/>
            <error to="fail"/>
        </action>
    
        <action name="RunSqoopExport">
            <sqoop xmlns="uri:oozie:sqoop-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
                </configuration>
                <arg>export</arg>
                <arg>--connect</arg>
                <arg>${sqlDatabaseConnectionString}</arg>
                <arg>--table</arg>
                <arg>${sqlDatabaseTableName}</arg>
                <arg>--export-dir</arg>
                <arg>${hiveDataFolder}/${year}/${month}/${day}</arg>
                <arg>-m</arg>
                <arg>1</arg>
                <arg>--input-fields-terminated-by</arg>
                <arg>"\t"</arg>
                <archive>mssql-jdbc-7.0.0.jre8.jar</archive>
                </sqoop>
            <ok to="end"/>
            <error to="fail"/>
        </action>
        <kill name="fail">
            <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message>
        </kill>
        <end name="end"/>
    </workflow-app>
    

A két Hive-lekérdezést az elérési út az Azure Storage-ban éri el, a fennmaradó változó értékeket pedig a job.properties fájl adja meg. Ez a fájl úgy konfigurálja a munkafolyamatot, hogy a 2017. január 3-i dátumra fusson.

Az Oozie-munkafolyamat üzembe helyezése és futtatása

Használja a bash-munkamenet SCP-jét az Oozie-munkafolyamat (workflow.xml), a Hive-lekérdezések (hive-load-flights-partition.hql és ) és hive-create-daily-summary-table.hqla feladatkonfiguráció (job.properties) üzembe helyezéséhez. Oozie-ban csak a job.properties fájl létezhet a fejcsomópont helyi tárolójában. Minden más fájlt HDFS-ben, ebben az esetben az Azure Storage-ban kell tárolni. A munkafolyamat által használt Sqoop-művelet egy JDBC-illesztőtől függ az SQL Database-vel való kommunikációhoz, amelyet át kell másolni a fő csomópontról a HDFS-be.

  1. Hozza létre az load_flights_by_day almappát a felhasználó elérési útja alatt a fő csomópont helyi tárolójában. A nyitott ssh-munkamenetben hajtsa végre a következő parancsot:

    mkdir load_flights_by_day
    
  2. Másolja az aktuális könyvtárban lévő összes fájlt (és workflow.xml job.properties fájlokat) az load_flights_by_day almappába. A helyi munkaállomáson hajtsa végre a következő parancsot:

    scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:load_flights_by_day
    
  3. Munkafolyamat-fájlok másolása a HDFS-be. A nyitott ssh-munkamenetben hajtsa végre a következő parancsokat:

    cd load_flights_by_day
    hadoop fs -mkdir -p /oozie/load_flights_by_day
    hdfs dfs -put ./* /oozie/load_flights_by_day
    
  4. Másolja mssql-jdbc-7.0.0.jre8.jar a helyi fő csomópontról a HDFS munkafolyamat-mappájába. Szükség szerint módosítsa a parancsot, ha a fürt más jar-fájlt tartalmaz. Szükség szerint módosítsa workflow.xml , hogy egy másik jar-fájlt tükrözze. A nyitott ssh-munkamenetben hajtsa végre a következő parancsot:

    hdfs dfs -put /usr/share/java/sqljdbc_7.0/enu/mssql-jdbc*.jar /oozie/load_flights_by_day
    
  5. Futtassa a munkafolyamatot. A nyitott ssh-munkamenetben hajtsa végre a következő parancsot:

    oozie job -config job.properties -run
    
  6. Figyelje meg az állapotot az Oozie webkonzol használatával. Az Ambariban válassza az Oozie, a Gyorshivatkozások, majd az Oozie webkonzol lehetőséget. A Munkafolyamat-feladatok lapon válassza a Minden feladat lehetőséget.

    hdi oozie web console workflows.

  7. Ha az állapot SUCC Enterprise kiadás DED, a beszúrt sorok megtekintéséhez kérdezze le az SQL Database táblát. Az Azure Portalon lépjen az SQL Database paneljére, válassza az Eszközök lehetőséget, és nyissa meg a Lekérdezésszerkesztő.

    SELECT * FROM dailyflights
    

Most, hogy a munkafolyamat egyetlen tesztnapon fut, ezt a munkafolyamatot egy koordinátorral burkolhatja, amely ütemezi a munkafolyamatot, hogy naponta fusson.

A munkafolyamat futtatása koordinátorral

Ha úgy szeretné ütemezni ezt a munkafolyamatot, hogy naponta (vagy egy dátumtartományban minden nap) fusson, használhat egy koordinátort. A koordinátort egy XML-fájl határozza meg, például coordinator.xml:

<coordinator-app name="daily_export" start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
    <datasets>
        <dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC">
            <uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template>
            <done-flag></done-flag>
        </dataset>
    </datasets>
    <input-events>
        <data-in name="event_input1" dataset="ds_input1">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>
    <action>
        <workflow>
            <app-path>${appBase}/load_flights_by_day</app-path>
            <configuration>
                <property>
                    <name>year</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'yyyy')}</value>
                </property>
                <property>
                    <name>month</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'MM')}</value>
                </property>
                <property>
                    <name>day</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>hiveScriptLoadPartition</name>
                    <value>${hiveScriptLoadPartition}</value>
                </property>
                <property>
                    <name>hiveScriptCreateDailyTable</name>
                    <value>${hiveScriptCreateDailyTable}</value>
                </property>
                <property>
                    <name>hiveDailyTableNamePrefix</name>
                    <value>${hiveDailyTableNamePrefix}</value>
                </property>
                <property>
                    <name>hiveDailyTableName</name>
                    <value>${hiveDailyTableNamePrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}${coord:formatTime(coord:nominalTime(), 'MM')}${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>hiveDataFolderPrefix</name>
                    <value>${hiveDataFolderPrefix}</value>
                </property>
                <property>
                    <name>hiveDataFolder</name>
                    <value>${hiveDataFolderPrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}/${coord:formatTime(coord:nominalTime(), 'MM')}/${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>sqlDatabaseConnectionString</name>
                    <value>${sqlDatabaseConnectionString}</value>
                </property>
                <property>
                    <name>sqlDatabaseTableName</name>
                    <value>${sqlDatabaseTableName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>

Mint látható, a koordinátor többsége csak a konfigurációs információkat továbbítja a munkafolyamat-példánynak. Van azonban néhány fontos elem, amelyeket fel kell hívni.

  • 1. pont: Maga start az elem és end az coordinator-app attribútumok vezérlik a koordinátor által futtatott időintervallumot.

    <coordinator-app ... start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" ...>
    

    A koordinátor feladata a műveletek ütemezése a start end dátumtartományon belül, az frequency attribútum által megadott intervallumnak megfelelően. Minden ütemezett művelet a konfigurált módon futtatja a munkafolyamatot. A fenti koordinátordefinícióban a koordinátor úgy van konfigurálva, hogy 2017. január 1-től 2017. január 5-ig futtassa a műveleteket. A gyakoriságot az Oozie kifejezés nyelvének gyakorisági kifejezése ${coord:days(1)}egy napra állítja be. Ez azt eredményezi, hogy a koordinátor naponta egyszer ütemez egy műveletet (és így a munkafolyamatot). A korábbi dátumtartományok esetében, mint ebben a példában, a művelet ütemezése késedelem nélkül fog futni. A művelet futtatásának kezdőnapját névleges időnek nevezzük. Ha például 2017. január 1-jével szeretné feldolgozni az adatokat, a koordinátor 2017.01.01. 01.00:00 GMT névleges időpontban ütemezi a műveletet.

  • 2. pont: A munkafolyamat dátumtartományán belül az dataset elem meghatározza, hogy hol kell keresni a HDFS-ben egy adott dátumtartomány adataihoz, és konfigurálja, hogy az Oozie hogyan határozza meg, hogy az adatok még rendelkezésre állnak-e feldolgozásra.

    <dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC">
        <uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template>
        <done-flag></done-flag>
    </dataset>
    

    A HDFS-ben az adatok elérési útja dinamikusan épül fel az elemben uri-template megadott kifejezésnek megfelelően. Ebben a koordinátorban az adatkészlettel egynapos gyakoriságot is használnak. Míg a koordinátori elem kezdő és záró dátumai szabályozzák a műveletek ütemezését (és meghatározza azok névleges időpontját), az initial-instance adatkészleten és frequency az adatkészleten az összeállítás uri-templatesorán használt dátum számítását is szabályozza. Ebben az esetben állítsa be a kezdeti példányt egy nappal a koordinátor kezdete előtt, hogy biztosan az első nap (2017. január 1.) értékét vegye fel. Az adathalmaz dátumszámítása a (2016.12.31.) értékről initial-instance halad előre az adathalmaz gyakoriságának növekményeiben (egy nap), amíg meg nem találja azt a legutóbbi dátumot, amely nem felel meg a koordinátor által beállított névleges időnek (2017-01-01T00:00:00:00 GMT az első művelethez).

    Az üres done-flag elem azt jelzi, hogy amikor az Oozie a kijelölt időpontban ellenőrzi a bemeneti adatok jelenlétét, az Oozie egy könyvtár vagy fájl jelenlétével határozza meg az adatokat. Ebben az esetben egy csv-fájl jelenik meg. Ha csv-fájl van jelen, az Oozie feltételezi, hogy az adatok készen állnak, és elindít egy munkafolyamat-példányt a fájl feldolgozásához. Ha nincs csv-fájl, az Oozie feltételezi, hogy az adatok még nem állnak készen, és a munkafolyamat futtatása várakozási állapotba kerül.

  • 3. pont: Az data-in elem megadja azt az időbélyeget, amelyet névleges időként kell használni a társított adathalmaz értékeinek uri-template cseréjekor.

    <data-in name="event_input1" dataset="ds_input1">
        <instance>${coord:current(0)}</instance>
    </data-in>
    

    Ebben az esetben állítsa a példányt a kifejezésre ${coord:current(0)}, amely a műveletnek a koordinátor által eredetileg ütemezett névleges idejét használja. Más szóval, amikor a koordinátor 2017.01.01-i névleges idő szerint ütemezi a műveletet, akkor 2017. 01. 01. az URI-sablon ÉV (2017) és MONTH (01) változóinak helyére kerül. Miután kiszámította az URI-sablont ehhez a példányhoz, az Oozie ellenőrzi, hogy elérhető-e a várt könyvtár vagy fájl, és ennek megfelelően ütemezi a munkafolyamat következő futtatását.

Az előző három pont egy olyan helyzetet eredményez, amelyben a koordinátor napról napra ütemezi a forrásadatok feldolgozását.

  • 1. pont: A koordinátor a 2017-01-01 névleges dátummal kezdődik.

  • 2. pont: Oozie a rendelkezésre álló adatokat keresi.sourceDataFolder/2017-01-FlightData.csv

  • 3. pont: Amikor Oozie megtalálja a fájlt, ütemezi a munkafolyamat egy példányát, amely feldolgozza az adatokat 2017. január 1-jei időpontra. Oozie ezután folytatja a feldolgozást 2017.01.02. Ez az értékelés legfeljebb 2017-01-05-ig ismétlődik, de nem.

A munkafolyamatokhoz hasonlóan a koordinátor konfigurációja is egy job.properties fájlban van definiálva, amely a munkafolyamat által használt beállítások szuperhalmazával rendelkezik.

nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net
jobTracker=[ACTIVERESOURCEMANAGER]:8050
queueName=default
oozie.use.system.libpath=true
appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie
oozie.coord.application.path=${appBase}
sourceDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/
hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql
hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql
hiveDailyTableNamePrefix=dailyflights
hiveDataFolderPrefix=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/
sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]"
sqlDatabaseTableName=dailyflights

A fájlban job.properties csak a következő új tulajdonságok jelennek meg:

Tulajdonság Értékforrás
oozie.coord.application.path A futtatandó Oozie-koordinátort tartalmazó fájl helyét coordinator.xml jelzi.
hiveDailyTableNamePrefix Az előkészítési tábla táblanevének dinamikus létrehozásakor használt előtag.
hiveDataFolderPrefix Annak az elérési útnak az előtagja, amelyben az összes előkészítési tábla tárolása történik.

Az Oozie-koordinátor üzembe helyezése és futtatása

A folyamat koordinátorral való futtatásához a munkafolyamathoz hasonlóan kell haladnia, kivéve, ha egy olyan mappából dolgozik, amely egy szinttel a munkafolyamatot tartalmazó mappa felett található. Ez a mappakonvenció elkülöníti a koordinátorokat a lemez munkafolyamataitól, így egy koordinátort társíthat különböző gyermek-munkafolyamatokhoz.

  1. A helyi gépről származó SCP használatával másolja a koordinátor fájljait a fürt fő csomópontjának helyi tárolójához.

    scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:~
    
  2. SSH a fő csomópontba.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Másolja a koordinátor fájljait a HDFS-be.

    hdfs dfs -put ./* /oozie/
    
  4. Futtassa a koordinátort.

    oozie job -config job.properties -run
    
  5. Ellenőrizze az állapotot az Oozie webkonzol használatával, ezúttal válassza a Koordinátori feladatok lapot, majd a Minden feladatot.

    Oozie Web Console Coordinator Jobs.

  6. Válasszon ki egy koordinátorpéldányt az ütemezett műveletek listájának megjelenítéséhez. Ebben az esetben négy olyan műveletet kell látnia, amelyek névleges ideje 2017. január 1-től 2017. január 4-ig tart.

    Oozie Web Console Coordinator Job.

    A listában szereplő műveletek a munkafolyamat egy olyan példányának felelnek meg, amely egy napnyi adatot dolgoz fel, ahol az adott nap kezdetét a névleges idő jelzi.

Következő lépések

Az Apache Oozie dokumentációja