Adatelemzési folyamat üzembe helyezése
Az adatfolyamok sok adatelemzési megoldást tartalmaznak. Ahogy a neve is sugallja, az adatfolyamatok szükség szerint nyers adatokat vesznek fel, tisztítanak és alakítanak át, majd általában számításokat vagy összesítéseket hajtanak végre a feldolgozott adatok tárolása előtt. A feldolgozott adatokat ügyfelek, jelentések vagy API-k használják fel. Az adatfolyamnak ismétlődő eredményeket kell biztosítania, akár ütemezés szerint, akár új adatok aktiválásakor.
Ez a cikk bemutatja, hogyan működtetheti az adatfolyamokat az megismételhetőség érdekében a HDInsight Hadoop-fürtökön futó Oozie használatával. A példaforgatókönyv végigvezeti egy olyan adatfolyamon, amely előkészíti és feldolgozza a légitársaságok repülési idősoradatait.
A következő forgatókönyvben a bemeneti adatok egy egy hónapra vonatkozó repülési adatokat tartalmazó, egy kötegből álló fájl. Ezek a repülési adatok olyan információkat tartalmaznak, mint a kiindulási és a cél repülőtér, a repült mérföldek, az indulási és érkezési időpontok stb. Ezzel a folyamattal az a cél, hogy összegezze a légitársaságok napi teljesítményét, ahol minden légitársaságnak van egy sora minden napra az átlagos indulási és érkezési késésekkel percek alatt, valamint az adott napon repült összes mérfölddel.
YEAR | MONTH | DAY_OF_MONTH | FUVAROZÓ | AVG_DEP_DELAY | AVG_ARR_DELAY | TOTAL_DISTANCE |
---|---|---|---|---|---|---|
2017 | 0 | 3 | AA | 10.142229 | 7.862926 | 2644539 |
2017 | 0 | 3 | AS | 9.435449 | 5.482143 | 572289 |
2017 | 0 | 3 | DL | 6.935409 | -2.1893024 | 1909696 |
A példafolyamat megvárja, amíg egy új időszak repülési adatai meg nem érkeznek, majd ezeket a részletes repülési információkat az Apache Hive adattárházban tárolja hosszú távú elemzések céljából. A folyamat egy sokkal kisebb adatkészletet is létrehoz, amely csak a napi repülési adatokat foglalja össze. Ezeket a napi repülési összefoglaló adatokat a rendszer elküldi egy SQL Database-nek, hogy jelentéseket biztosítson, például egy webhelyhez.
Az alábbi ábra a példafolyamatot szemlélteti.
Az Apache Oozie-megoldás áttekintése
Ez a folyamat egy HDInsight Hadoop-fürtön futó Apache Oozie-t használ.
Az Oozie a folyamatokat műveletek, munkafolyamatok és koordinátorok szerint írja le. A műveletek határozzák meg a ténylegesen végrehajtandó munkát, például hive-lekérdezések futtatását. A munkafolyamatok határozzák meg a műveletek sorrendjét. A koordinátorok határozzák meg a munkafolyamat futtatásának ütemezését. A koordinátorok az új adatok rendelkezésre állását is megvárhatják, mielőtt elindítanák a munkafolyamat egy példányát.
Az alábbi ábra a példa Oozie-folyamat magas szintű kialakítását mutatja be.
Azure-erőforrások kiépítése
Ehhez a folyamathoz egy Azure SQL Database és egy HDInsight Hadoop-fürt szükséges ugyanazon a helyen. Az Azure SQL Database a folyamat által létrehozott összesítő adatokat és az Oozie Metadata Store-t is tárolja.
Azure SQL Database kiépítése
Hozzon létre egy Azure SQL Database-adatbázist. Lásd: Azure SQL Database létrehozása az Azure Portalon.
Annak érdekében, hogy a HDInsight-fürt hozzáférhessen a csatlakoztatott Azure SQL Database-hez, konfigurálja az Azure SQL Database tűzfalszabályait, hogy az Azure-szolgáltatások és erőforrások hozzáférhessenek a kiszolgálóhoz. Ezt a lehetőséget az Azure Portalon a Kiszolgálói tűzfal beállítása, majd az On (On) lehetőség kiválasztásával engedélyezheti az Azure-szolgáltatások és -erőforrások számára az Azure SQL Database-kiszolgáló eléréséhez. További információ: IP-tűzfalszabályok létrehozása és kezelése.
A Lekérdezésszerkesztővel hajtsa végre a következő SQL-utasításokat a
dailyflights
folyamat egyes futtatásaiból származó összesített adatokat tároló tábla létrehozásához.CREATE TABLE dailyflights ( YEAR INT, MONTH INT, DAY_OF_MONTH INT, CARRIER CHAR(2), AVG_DEP_DELAY FLOAT, AVG_ARR_DELAY FLOAT, TOTAL_DISTANCE FLOAT ) GO CREATE CLUSTERED INDEX dailyflights_clustered_index on dailyflights(YEAR,MONTH,DAY_OF_MONTH,CARRIER) GO
Az Azure SQL Database készen áll.
Apache Hadoop-fürt kiépítése
Hozzon létre egy Apache Hadoop-fürtöt egy egyéni metaadattárral. A fürt portálról való létrehozása során, a Storage lapon győződjön meg arról, hogy a Metaadattár beállításai között válassza ki az SQL Database-t. További információ a metaadattár kiválasztásáról: Egyéni metaadattár kiválasztása a fürt létrehozásakor. A fürtlétrehozással kapcsolatos további információkért tekintse meg a HDInsight linuxos használatának első lépéseit.
SSH-bújtatás beállításának ellenőrzése
Ha az Oozie webkonzol használatával szeretné megtekinteni a koordinátor és a munkafolyamat-példányok állapotát, állítson be egy SSH-alagutat a HDInsight-fürthöz. További információ: SSH-alagút.
Feljegyzés
A Chrome és a Foxy Proxy bővítmény használatával is tallózhat a fürt webes erőforrásai között az SSH-alagútban. Konfigurálja úgy, hogy az összes kérést az alagút 9876-os portján lévő gazdagépen localhost
keresztül proxyzhassa. Ez a módszer kompatibilis a Windows 10-en futó Bash néven is ismert Linuxos Windows-alrendszer.
Futtassa a következő parancsot egy SSH-alagút fürthöz való megnyitásához, ahol
CLUSTERNAME
a fürt neve található:ssh -C2qTnNf -D 9876 sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Az alagút működésének ellenőrzéséhez navigáljon a fejcsomóponton található Ambarihoz a következő tallózással:
http://headnodehost:8080
Az Oozie webkonzol Ambarin belüli eléréséhez lépjen az Oozie>gyorshivatkozások> [Aktív kiszolgáló] >Oozie webes felhasználói felületére.
A Hive konfigurálása
Adatok feltöltése
Töltsön le egy példa CSV-fájlt, amely egy hónapig tartalmazza a repülési adatokat. Töltse le a ZIP-fájlját
2017-01-FlightData.zip
a HDInsight GitHub-adattárból, és bontsa ki a CSV-fájlba2017-01-FlightData.csv
.Másolja ezt a CSV-fájlt a HDInsight-fürthöz csatolt Azure Storage-fiókba, és helyezze a
/example/data/flights
mappába.Az SCP használatával másolja a fájlokat a helyi gépről a HDInsight-fürtfejcsomópont helyi tárolójára.
scp ./2017-01-FlightData.csv sshuser@CLUSTERNAME-ssh.azurehdinsight.net:2017-01-FlightData.csv
Az ssh paranccsal csatlakozzon a fürthöz. Szerkessze az alábbi parancsot a fürt nevére cserélve
CLUSTERNAME
, majd írja be a parancsot:ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Az ssh-munkamenetben a HDFS paranccsal másolja a fájlt a központi csomópont helyi tárolójából az Azure Storage-ba.
hadoop fs -mkdir /example/data/flights hdfs dfs -put ./2017-01-FlightData.csv /example/data/flights/2017-01-FlightData.csv
Táblák létrehozása
A mintaadatok már elérhetők. A folyamathoz azonban két Hive-táblára van szükség a feldolgozáshoz, egyet a bejövő adatokhoz (rawFlights
) és egyet az összegzett adatokhoz (flights
). Ezeket a táblákat az alábbiak szerint hozhatja létre az Ambariban.
Jelentkezzen be az Ambariba a következőre navigálva
http://headnodehost:8080
: .A szolgáltatások listájában válassza a Hive lehetőséget.
Válassza a Hive View 2.0 címke melletti Ugrás a nézetre lehetőséget.
A lekérdezés szövegterületére illessze be a következő utasításokat a
rawFlights
tábla létrehozásához. ArawFlights
táblázat az Azure Storage mappájában lévő/example/data/flights
CSV-fájlokhoz biztosít egy sémaolvasásra alkalmas sémát.CREATE EXTERNAL TABLE IF NOT EXISTS rawflights ( YEAR INT, MONTH INT, DAY_OF_MONTH INT, FL_DATE STRING, CARRIER STRING, FL_NUM STRING, ORIGIN STRING, DEST STRING, DEP_DELAY FLOAT, ARR_DELAY FLOAT, ACTUAL_ELAPSED_TIME FLOAT, DISTANCE FLOAT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( "separatorChar" = ",", "quoteChar" = "\"" ) LOCATION '/example/data/flights'
A tábla létrehozásához válassza a Végrehajtás lehetőséget.
A
flights
táblázat létrehozásához cserélje le a lekérdezés szövegterületének szövegét az alábbi utasításokra. Aflights
tábla egy Hive által felügyelt tábla, amely év, hónap és hónap szerint particionálja a bele betöltött adatokat. Ez a táblázat tartalmazza az összes előzmény repülési adatot, és a legalacsonyabb részletesség szerepel a járatonként egy sor forrásadataiban.SET hive.exec.dynamic.partition.mode=nonstrict; CREATE TABLE flights ( FL_DATE STRING, CARRIER STRING, FL_NUM STRING, ORIGIN STRING, DEST STRING, DEP_DELAY FLOAT, ARR_DELAY FLOAT, ACTUAL_ELAPSED_TIME FLOAT, DISTANCE FLOAT ) PARTITIONED BY (YEAR INT, MONTH INT, DAY_OF_MONTH INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( "separatorChar" = ",", "quoteChar" = "\"" );
A tábla létrehozásához válassza a Végrehajtás lehetőséget.
Az Oozie-munkafolyamat létrehozása
A folyamatok általában egy adott időintervallum szerint dolgozzák fel az adatokat kötegekben. Ebben az esetben a folyamat naponta dolgozza fel a repülési adatokat. Ez a megközelítés lehetővé teszi, hogy a bemeneti CSV-fájlok naponta, hetente, havonta vagy évente érkezzenek.
A minta munkafolyamat három fő lépésben dolgozza fel a repülési adatokat nap mint nap:
- Futtasson egy Hive-lekérdezést az adott nap dátumtartományának adatainak kinyeréséhez a tábla által
rawFlights
képviselt forrás CSV-fájlból, és szúrja be az adatokat aflights
táblába. - Hive-lekérdezés futtatásával dinamikusan létrehozhat egy átmeneti táblát a Hive-ben a napra vonatkozóan, amely tartalmazza a repülési adatok nap és szolgáltató szerint összegzett másolatát.
- Az Apache Sqoop használatával másolja az összes adatot a Hive napi átmeneti táblájából az Azure SQL Database céltáblájára
dailyflights
. A Sqoop beolvassa a forrássorokat az Azure Storage-ban található Hive-tábla mögötti adatokból, és JDBC-kapcsolat használatával betölti őket az SQL Database-be.
Ezt a három lépést egy Oozie-munkafolyamat koordinálja.
A helyi munkaállomásról hozzon létre egy .
job.properties
Használja az alábbi szöveget a fájl kezdő tartalmaként. Ezután frissítse az adott környezet értékeit. A szöveg alatti táblázat összefoglalja az egyes tulajdonságokat, és jelzi, hogy hol találhatók a saját környezet értékei.nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net jobTracker=[ACTIVERESOURCEMANAGER]:8050 queueName=default oozie.use.system.libpath=true appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie oozie.wf.application.path=${appBase}/load_flights_by_day hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql hiveDailyTableName=dailyflights${year}${month}${day} hiveDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/${year}/${month}/${day} sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]" sqlDatabaseTableName=dailyflights year=2017 month=01 day=03
Tulajdonság Értékforrás nameNode A HDInsight-fürthöz csatolt Azure Storage-tároló teljes elérési útja. jobTracker Az aktív fürt YARN-főcsomópontjának belső állomásneve. Az Ambari kezdőlapján válassza a YARN elemet a szolgáltatások listájából, majd válassza az Active Resource Manager lehetőséget. A gazdagépnév URI a lap tetején jelenik meg. Fűzze hozzá a 8050-ös portot. queueName A Hive-műveletek ütemezéséhez használt YARN-üzenetsor neve. Hagyja meg az alapértelmezett beállítást. oozie.use.system.libpath Hagyja igazként. appBase Az Azure Storage almappájának elérési útja, ahol üzembe helyezi az Oozie-munkafolyamatot és a támogató fájlokat. oozie.wf.application.path A futtatandó Oozie-munkafolyamat workflow.xml
helye.hiveScriptLoadPartition Az Azure Storage elérési útja a Hive-lekérdezésfájlhoz hive-load-flights-partition.hql
.hiveScriptCreateDailyTable Az Azure Storage elérési útja a Hive-lekérdezésfájlhoz hive-create-daily-summary-table.hql
.hiveDailyTableName Az előkészítési táblához használandó dinamikusan létrehozott név. hiveDataFolder Az Azure Storage elérési útja az előkészítési tábla által tárolt adatokhoz. sqlDatabase Csatlakozás ionString A JDBC szintaxisa kapcsolati sztring az Azure SQL Database-hez. sqlDatabaseTableName Annak a táblának a neve az Azure SQL Database-ben, amelybe összegző sorokat szúr be. Hagyja meg a dailyflights
.Év Annak a napnak az évösszetevője, amelyre a repülési összegzések ki vannak számítva. Hagyja változatlanul. hónap Annak a napnak a hónapösszetevője, amelyre a repülési összegzések ki vannak számítva. Hagyja változatlanul. Nap Annak a napnak a hónapösszetevője, amelyre a repülési összegzések ki vannak számítva. Hagyja változatlanul. A helyi munkaállomásról hozzon létre egy .
hive-load-flights-partition.hql
Használja az alábbi kódot a fájl tartalmaként.SET hive.exec.dynamic.partition.mode=nonstrict; INSERT OVERWRITE TABLE flights PARTITION (YEAR, MONTH, DAY_OF_MONTH) SELECT FL_DATE, CARRIER, FL_NUM, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, ACTUAL_ELAPSED_TIME, DISTANCE, YEAR, MONTH, DAY_OF_MONTH FROM rawflights WHERE year = ${year} AND month = ${month} AND day_of_month = ${day};
Az Oozie változók a szintaxist
${variableName}
használják. Ezek a változók ajob.properties
fájlban vannak beállítva. Az Oozie futásidőben helyettesíti a tényleges értékeket.A helyi munkaállomásról hozzon létre egy .
hive-create-daily-summary-table.hql
Használja az alábbi kódot a fájl tartalmaként.DROP TABLE ${hiveTableName}; CREATE EXTERNAL TABLE ${hiveTableName} ( YEAR INT, MONTH INT, DAY_OF_MONTH INT, CARRIER STRING, AVG_DEP_DELAY FLOAT, AVG_ARR_DELAY FLOAT, TOTAL_DISTANCE FLOAT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '${hiveDataFolder}'; INSERT OVERWRITE TABLE ${hiveTableName} SELECT year, month, day_of_month, carrier, avg(dep_delay) avg_dep_delay, avg(arr_delay) avg_arr_delay, sum(distance) total_distance FROM flights GROUP BY year, month, day_of_month, carrier HAVING year = ${year} AND month = ${month} AND day_of_month = ${day};
Ez a lekérdezés létrehoz egy átmeneti táblát, amely csak egy napig tárolja az összesített adatokat. Jegyezze fel a Standard kiadás LECT utasítást, amely kiszámítja az átlagos késéseket és a szállítók által naponta megtett távolságok összegét. A táblába beszúrt adatok egy ismert helyen (a hiveDataFolder változó által jelzett útvonalon) tároltak, hogy a következő lépésben a Sqoop forrásaként lehessen használni őket.
A helyi munkaállomásról hozzon létre egy .
workflow.xml
Használja az alábbi kódot a fájl tartalmaként. A fenti lépések az Oozie-munkafolyamat-fájlban külön műveletekként vannak kifejezve.<workflow-app name="loadflightstable" xmlns="uri:oozie:workflow:0.5"> <start to = "RunHiveLoadFlightsScript"/> <action name="RunHiveLoadFlightsScript"> <hive xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <script>${hiveScriptLoadPartition}</script> <param>year=${year}</param> <param>month=${month}</param> <param>day=${day}</param> </hive> <ok to="RunHiveCreateDailyFlightTableScript"/> <error to="fail"/> </action> <action name="RunHiveCreateDailyFlightTableScript"> <hive xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <script>${hiveScriptCreateDailyTable}</script> <param>hiveTableName=${hiveDailyTableName}</param> <param>year=${year}</param> <param>month=${month}</param> <param>day=${day}</param> <param>hiveDataFolder=${hiveDataFolder}/${year}/${month}/${day}</param> </hive> <ok to="RunSqoopExport"/> <error to="fail"/> </action> <action name="RunSqoopExport"> <sqoop xmlns="uri:oozie:sqoop-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.compress.map.output</name> <value>true</value> </property> </configuration> <arg>export</arg> <arg>--connect</arg> <arg>${sqlDatabaseConnectionString}</arg> <arg>--table</arg> <arg>${sqlDatabaseTableName}</arg> <arg>--export-dir</arg> <arg>${hiveDataFolder}/${year}/${month}/${day}</arg> <arg>-m</arg> <arg>1</arg> <arg>--input-fields-terminated-by</arg> <arg>"\t"</arg> <archive>mssql-jdbc-7.0.0.jre8.jar</archive> </sqoop> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message> </kill> <end name="end"/> </workflow-app>
A két Hive-lekérdezést az elérési út az Azure Storage-ban éri el, a fennmaradó változó értékeket pedig a job.properties
fájl adja meg. Ez a fájl úgy konfigurálja a munkafolyamatot, hogy a 2017. január 3-i dátumra fusson.
Az Oozie-munkafolyamat üzembe helyezése és futtatása
Használja a bash-munkamenet SCP-jét az Oozie-munkafolyamat (workflow.xml
), a Hive-lekérdezések (hive-load-flights-partition.hql
és ) és hive-create-daily-summary-table.hql
a feladatkonfiguráció (job.properties
) üzembe helyezéséhez. Oozie-ban csak a job.properties
fájl létezhet a fejcsomópont helyi tárolójában. Minden más fájlt HDFS-ben, ebben az esetben az Azure Storage-ban kell tárolni. A munkafolyamat által használt Sqoop-művelet egy JDBC-illesztőtől függ az SQL Database-vel való kommunikációhoz, amelyet át kell másolni a fő csomópontról a HDFS-be.
Hozza létre az
load_flights_by_day
almappát a felhasználó elérési útja alatt a fő csomópont helyi tárolójában. A nyitott ssh-munkamenetben hajtsa végre a következő parancsot:mkdir load_flights_by_day
Másolja az aktuális könyvtárban lévő összes fájlt (és
workflow.xml
job.properties
fájlokat) azload_flights_by_day
almappába. A helyi munkaállomáson hajtsa végre a következő parancsot:scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:load_flights_by_day
Munkafolyamat-fájlok másolása a HDFS-be. A nyitott ssh-munkamenetben hajtsa végre a következő parancsokat:
cd load_flights_by_day hadoop fs -mkdir -p /oozie/load_flights_by_day hdfs dfs -put ./* /oozie/load_flights_by_day
Másolja
mssql-jdbc-7.0.0.jre8.jar
a helyi fő csomópontról a HDFS munkafolyamat-mappájába. Szükség szerint módosítsa a parancsot, ha a fürt más jar-fájlt tartalmaz. Szükség szerint módosítsaworkflow.xml
, hogy egy másik jar-fájlt tükrözze. A nyitott ssh-munkamenetben hajtsa végre a következő parancsot:hdfs dfs -put /usr/share/java/sqljdbc_7.0/enu/mssql-jdbc*.jar /oozie/load_flights_by_day
Futtassa a munkafolyamatot. A nyitott ssh-munkamenetben hajtsa végre a következő parancsot:
oozie job -config job.properties -run
Figyelje meg az állapotot az Oozie webkonzol használatával. Az Ambariban válassza az Oozie, a Gyorshivatkozások, majd az Oozie webkonzol lehetőséget. A Munkafolyamat-feladatok lapon válassza a Minden feladat lehetőséget.
Ha az állapot SUCC Enterprise kiadás DED, a beszúrt sorok megtekintéséhez kérdezze le az SQL Database táblát. Az Azure Portalon lépjen az SQL Database paneljére, válassza az Eszközök lehetőséget, és nyissa meg a Lekérdezésszerkesztő.
SELECT * FROM dailyflights
Most, hogy a munkafolyamat egyetlen tesztnapon fut, ezt a munkafolyamatot egy koordinátorral burkolhatja, amely ütemezi a munkafolyamatot, hogy naponta fusson.
A munkafolyamat futtatása koordinátorral
Ha úgy szeretné ütemezni ezt a munkafolyamatot, hogy naponta (vagy egy dátumtartományban minden nap) fusson, használhat egy koordinátort. A koordinátort egy XML-fájl határozza meg, például coordinator.xml
:
<coordinator-app name="daily_export" start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
<datasets>
<dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC">
<uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template>
<done-flag></done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="event_input1" dataset="ds_input1">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${appBase}/load_flights_by_day</app-path>
<configuration>
<property>
<name>year</name>
<value>${coord:formatTime(coord:nominalTime(), 'yyyy')}</value>
</property>
<property>
<name>month</name>
<value>${coord:formatTime(coord:nominalTime(), 'MM')}</value>
</property>
<property>
<name>day</name>
<value>${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>hiveScriptLoadPartition</name>
<value>${hiveScriptLoadPartition}</value>
</property>
<property>
<name>hiveScriptCreateDailyTable</name>
<value>${hiveScriptCreateDailyTable}</value>
</property>
<property>
<name>hiveDailyTableNamePrefix</name>
<value>${hiveDailyTableNamePrefix}</value>
</property>
<property>
<name>hiveDailyTableName</name>
<value>${hiveDailyTableNamePrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}${coord:formatTime(coord:nominalTime(), 'MM')}${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>hiveDataFolderPrefix</name>
<value>${hiveDataFolderPrefix}</value>
</property>
<property>
<name>hiveDataFolder</name>
<value>${hiveDataFolderPrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}/${coord:formatTime(coord:nominalTime(), 'MM')}/${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>sqlDatabaseConnectionString</name>
<value>${sqlDatabaseConnectionString}</value>
</property>
<property>
<name>sqlDatabaseTableName</name>
<value>${sqlDatabaseTableName}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
Mint látható, a koordinátor többsége csak a konfigurációs információkat továbbítja a munkafolyamat-példánynak. Van azonban néhány fontos elem, amelyeket fel kell hívni.
1. pont: Maga
start
az elem ésend
azcoordinator-app
attribútumok vezérlik a koordinátor által futtatott időintervallumot.<coordinator-app ... start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" ...>
A koordinátor feladata a műveletek ütemezése a
start
end
dátumtartományon belül, azfrequency
attribútum által megadott intervallumnak megfelelően. Minden ütemezett művelet a konfigurált módon futtatja a munkafolyamatot. A fenti koordinátordefinícióban a koordinátor úgy van konfigurálva, hogy 2017. január 1-től 2017. január 5-ig futtassa a műveleteket. A gyakoriságot az Oozie kifejezés nyelvének gyakorisági kifejezése${coord:days(1)}
egy napra állítja be. Ez azt eredményezi, hogy a koordinátor naponta egyszer ütemez egy műveletet (és így a munkafolyamatot). A korábbi dátumtartományok esetében, mint ebben a példában, a művelet ütemezése késedelem nélkül fog futni. A művelet futtatásának kezdőnapját névleges időnek nevezzük. Ha például 2017. január 1-jével szeretné feldolgozni az adatokat, a koordinátor 2017.01.01. 01.00:00 GMT névleges időpontban ütemezi a műveletet.2. pont: A munkafolyamat dátumtartományán belül az
dataset
elem meghatározza, hogy hol kell keresni a HDFS-ben egy adott dátumtartomány adataihoz, és konfigurálja, hogy az Oozie hogyan határozza meg, hogy az adatok még rendelkezésre állnak-e feldolgozásra.<dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC"> <uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template> <done-flag></done-flag> </dataset>
A HDFS-ben az adatok elérési útja dinamikusan épül fel az elemben
uri-template
megadott kifejezésnek megfelelően. Ebben a koordinátorban az adatkészlettel egynapos gyakoriságot is használnak. Míg a koordinátori elem kezdő és záró dátumai szabályozzák a műveletek ütemezését (és meghatározza azok névleges időpontját), azinitial-instance
adatkészleten ésfrequency
az adatkészleten az összeállításuri-template
során használt dátum számítását is szabályozza. Ebben az esetben állítsa be a kezdeti példányt egy nappal a koordinátor kezdete előtt, hogy biztosan az első nap (2017. január 1.) értékét vegye fel. Az adathalmaz dátumszámítása a (2016.12.31.) értékrőlinitial-instance
halad előre az adathalmaz gyakoriságának növekményeiben (egy nap), amíg meg nem találja azt a legutóbbi dátumot, amely nem felel meg a koordinátor által beállított névleges időnek (2017-01-01T00:00:00:00 GMT az első művelethez).Az üres
done-flag
elem azt jelzi, hogy amikor az Oozie a kijelölt időpontban ellenőrzi a bemeneti adatok jelenlétét, az Oozie egy könyvtár vagy fájl jelenlétével határozza meg az adatokat. Ebben az esetben egy csv-fájl jelenik meg. Ha csv-fájl van jelen, az Oozie feltételezi, hogy az adatok készen állnak, és elindít egy munkafolyamat-példányt a fájl feldolgozásához. Ha nincs csv-fájl, az Oozie feltételezi, hogy az adatok még nem állnak készen, és a munkafolyamat futtatása várakozási állapotba kerül.3. pont: Az
data-in
elem megadja azt az időbélyeget, amelyet névleges időként kell használni a társított adathalmaz értékeinekuri-template
cseréjekor.<data-in name="event_input1" dataset="ds_input1"> <instance>${coord:current(0)}</instance> </data-in>
Ebben az esetben állítsa a példányt a kifejezésre
${coord:current(0)}
, amely a műveletnek a koordinátor által eredetileg ütemezett névleges idejét használja. Más szóval, amikor a koordinátor 2017.01.01-i névleges idő szerint ütemezi a műveletet, akkor 2017. 01. 01. az URI-sablon ÉV (2017) és MONTH (01) változóinak helyére kerül. Miután kiszámította az URI-sablont ehhez a példányhoz, az Oozie ellenőrzi, hogy elérhető-e a várt könyvtár vagy fájl, és ennek megfelelően ütemezi a munkafolyamat következő futtatását.
Az előző három pont egy olyan helyzetet eredményez, amelyben a koordinátor napról napra ütemezi a forrásadatok feldolgozását.
1. pont: A koordinátor a 2017-01-01 névleges dátummal kezdődik.
2. pont: Oozie a rendelkezésre álló adatokat keresi.
sourceDataFolder/2017-01-FlightData.csv
3. pont: Amikor Oozie megtalálja a fájlt, ütemezi a munkafolyamat egy példányát, amely feldolgozza az adatokat 2017. január 1-jei időpontra. Oozie ezután folytatja a feldolgozást 2017.01.02. Ez az értékelés legfeljebb 2017-01-05-ig ismétlődik, de nem.
A munkafolyamatokhoz hasonlóan a koordinátor konfigurációja is egy job.properties
fájlban van definiálva, amely a munkafolyamat által használt beállítások szuperhalmazával rendelkezik.
nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net
jobTracker=[ACTIVERESOURCEMANAGER]:8050
queueName=default
oozie.use.system.libpath=true
appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie
oozie.coord.application.path=${appBase}
sourceDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/
hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql
hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql
hiveDailyTableNamePrefix=dailyflights
hiveDataFolderPrefix=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/
sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]"
sqlDatabaseTableName=dailyflights
A fájlban job.properties
csak a következő új tulajdonságok jelennek meg:
Tulajdonság | Értékforrás |
---|---|
oozie.coord.application.path | A futtatandó Oozie-koordinátort tartalmazó fájl helyét coordinator.xml jelzi. |
hiveDailyTableNamePrefix | Az előkészítési tábla táblanevének dinamikus létrehozásakor használt előtag. |
hiveDataFolderPrefix | Annak az elérési útnak az előtagja, amelyben az összes előkészítési tábla tárolása történik. |
Az Oozie-koordinátor üzembe helyezése és futtatása
A folyamat koordinátorral való futtatásához a munkafolyamathoz hasonlóan kell haladnia, kivéve, ha egy olyan mappából dolgozik, amely egy szinttel a munkafolyamatot tartalmazó mappa felett található. Ez a mappakonvenció elkülöníti a koordinátorokat a lemez munkafolyamataitól, így egy koordinátort társíthat különböző gyermek-munkafolyamatokhoz.
A helyi gépről származó SCP használatával másolja a koordinátor fájljait a fürt fő csomópontjának helyi tárolójához.
scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:~
SSH a fő csomópontba.
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Másolja a koordinátor fájljait a HDFS-be.
hdfs dfs -put ./* /oozie/
Futtassa a koordinátort.
oozie job -config job.properties -run
Ellenőrizze az állapotot az Oozie webkonzol használatával, ezúttal válassza a Koordinátori feladatok lapot, majd a Minden feladatot.
Válasszon ki egy koordinátorpéldányt az ütemezett műveletek listájának megjelenítéséhez. Ebben az esetben négy olyan műveletet kell látnia, amelyek névleges ideje 2017. január 1-től 2017. január 4-ig tart.
A listában szereplő műveletek a munkafolyamat egy olyan példányának felelnek meg, amely egy napnyi adatot dolgoz fel, ahol az adott nap kezdetét a névleges idő jelzi.